import multiprocessing
import os
import subprocess
import warnings
import numpy as np
from mpas_tools.logging import check_call
[docs]
def get_available_parallel_resources(config):
    """
    Get the number of total cores and nodes available for running steps
    Parameters
    ----------
    config : compass.config.CompassConfigParser
        Configuration options for the test case
    Returns
    -------
    available_resources : dict
        A dictionary containing available resources (cores, tasks, nodes
        and cores_per_node)
    """
    parallel_system = config.get('parallel', 'system')
    if parallel_system == 'slurm' and 'SLURM_JOB_ID' not in os.environ:
        parallel_system = 'login'
    if parallel_system == 'slurm':
        job_id = os.environ['SLURM_JOB_ID']
        node = os.environ['SLURMD_NODENAME']
        args = ['sinfo', '--noheader', '--node', node, '-o', '%X']
        sockets_per_node = _get_subprocess_int(args)
        args = ['sinfo', '--noheader', '--node', node, '-o', '%Y']
        cores_per_socket = _get_subprocess_int(args)
        if config.has_option('parallel', 'threads_per_core'):
            threads_per_core = config.getint('parallel', 'threads_per_core')
        else:
            args = ['sinfo', '--noheader', '--node', node, '-o', '%Z']
            threads_per_core = _get_subprocess_int(args)
        cores_per_node = sockets_per_node * cores_per_socket * threads_per_core
        args = ['squeue', '--noheader', '-j', job_id, '-o', '%D']
        nodes = _get_subprocess_int(args)
        cores = cores_per_node * nodes
        mpi_allowed = True
    elif parallel_system == 'login':
        cores = min(multiprocessing.cpu_count(),
                    config.getint('parallel', 'login_cores'))
        cores_per_node = cores
        nodes = 1
        mpi_allowed = False
    elif parallel_system == 'single_node':
        cores = multiprocessing.cpu_count()
        if config.has_option('parallel', 'cores_per_node'):
            cores = min(cores, config.getint('parallel', 'cores_per_node'))
        cores_per_node = cores
        nodes = 1
        mpi_allowed = True
    else:
        raise ValueError(f'Unexpected parallel system: {parallel_system}')
    available_resources = dict(
        cores=cores,
        nodes=nodes,
        cores_per_node=cores_per_node,
        mpi_allowed=mpi_allowed
    )
    if config.has_option('parallel', 'gpus_per_node'):
        available_resources['gpus_per_node'] = \
            
config.getint('parallel', 'gpus_per_node')
    return available_resources 
[docs]
def set_cores_per_node(config, cores_per_node):
    """
    If the system has Slurm, find out the ``cpus_per_node`` and set the config
    option accordingly.
    """
    parallel_system = config.get('parallel', 'system')
    if parallel_system == 'slurm':
        old_cores_per_node = config.getint('parallel', 'cores_per_node')
        config.set('parallel', 'cores_per_node', f'{cores_per_node}')
        if old_cores_per_node != cores_per_node:
            warnings.warn(f'Slurm found {cores_per_node} cpus per node but '
                          f'config from mache was {old_cores_per_node}')
    elif parallel_system == 'single_node':
        if not config.has_option('parallel', 'cores_per_node'):
            config.set('parallel', 'cores_per_node', f'{cores_per_node}') 
[docs]
def run_command(args, cpus_per_task, ntasks, openmp_threads, config, logger):
    """
    Run a subprocess with the given command-line arguments and resources
    Parameters
    ----------
    args : list of str
        The command-line arguments to run in parallel
    cpus_per_task : int
        the number of cores per task the process would ideally use.  If
        fewer cores per node are available on the system, the substep will
        run on all available cores as long as this is not below
        ``min_cpus_per_task``
    ntasks : int
        the number of tasks the process would ideally use.  If too few
        cores are available on the system to accommodate the number of
        tasks and the number of cores per task, the substep will run on
        fewer tasks as long as as this is not below ``min_tasks``
    openmp_threads : int
        the number of OpenMP threads to use
    config : configparser.ConfigParser
        Configuration options for the test case
    logger : logging.Logger
        A logger for output from the step
    """
    env = dict(os.environ)
    env['OMP_NUM_THREADS'] = f'{openmp_threads}'
    if openmp_threads > 1:
        logger.info(f'Running with {openmp_threads} OpenMP threads')
    parallel_executable = config.get('parallel', 'parallel_executable')
    # split the parallel executable into constituents in case it includes flags
    command_line_args = parallel_executable.split(' ')
    parallel_system = config.get('parallel', 'system')
    if parallel_system == 'slurm':
        cores = ntasks * cpus_per_task
        cores_per_node = config.getint('parallel', 'cores_per_node')
        nodes = int(np.ceil(cores / cores_per_node))
        command_line_args.extend(['-c', f'{cpus_per_task}',
                                  '-N', f'{nodes}',
                                  '-n', f'{ntasks}'])
    elif parallel_system == 'single_node':
        command_line_args.extend(['-n', f'{ntasks}'])
    else:
        raise ValueError(f'Unexpected parallel system: {parallel_system}')
    command_line_args.extend(args)
    check_call(command_line_args, logger, env=env) 
def _get_subprocess_int(args):
    value = subprocess.check_output(args)
    value = int(value.decode('utf-8').strip('\n'))
    return value