Source code for compass.parallel

import os
import multiprocessing
import subprocess
import warnings


[docs]def get_available_cores_and_nodes(config): """ Get the number of total cores and nodes available for running steps Parameters ---------- config : configparser.ConfigParser Configuration options for the test case Returns ------- cores : int The number of cores available for running steps nodes : int The number of cores available for running steps """ parallel_system = config.get('parallel', 'system') if parallel_system == 'slurm': job_id = os.environ['SLURM_JOB_ID'] args = ['squeue', '--noheader', '-j', job_id, '-o', '%C'] cores = _get_subprocess_int(args) args = ['squeue', '--noheader', '-j', job_id, '-o', '%D'] nodes = _get_subprocess_int(args) elif parallel_system == 'single_node': cores_per_node = config.getint('parallel', 'cores_per_node') cores = min(multiprocessing.cpu_count(), cores_per_node) nodes = 1 else: raise ValueError('Unexpected parallel system: {}'.format( parallel_system)) return cores, nodes
def check_parallel_system(config): """ Check whether we are in an appropriate state for the given queuing system. For systems with Slurm, this means that we need to have an interactive or batch job on a compute node, as determined by the ``$SLURM_JOB_ID`` environment variable. Parameters ---------- config : configparser.ConfigParser Configuration options Raises ------- ValueError If using Slurm and not on a compute node """ parallel_system = config.get('parallel', 'system') if parallel_system == 'slurm': if 'SLURM_JOB_ID' not in os.environ: raise ValueError('SLURM_JOB_ID not defined. You are likely not ' 'on a compute node.') elif parallel_system == 'single_node': pass else: raise ValueError('Unexpected parallel system: {}'.format( parallel_system)) def set_cores_per_node(config): """ If the system has Slurm, find out the ``cpus_per_node`` and set the config option accordingly. Parameters ---------- config : configparser.ConfigParser Configuration options """ parallel_system = config.get('parallel', 'system') if parallel_system == 'slurm': node = os.environ['SLURMD_NODENAME'] args = ['sinfo', '--noheader', '--node', node, '-o', '%c'] old_cpus_per_node = config.getint('parallel', 'cores_per_node') cpus_per_node = _get_subprocess_int(args) config.set('parallel', 'cores_per_node', f'{cpus_per_node}') if old_cpus_per_node != cpus_per_node: warnings.warn(f'Slurm found {cpus_per_node} cpus per node but ' f'config from mache was {old_cpus_per_node}') def _get_subprocess_int(args): value = subprocess.check_output(args) value = int(value.decode('utf-8').strip('\n')) return value