Source code for compass.setup

import argparse
import os
import pickle
import sys
import warnings

from compass import provenance
from compass.config import CompassConfigParser
from compass.io import symlink
from compass.job import write_job_script
from compass.machines import discover_machine
from compass.mpas_cores import get_mpas_cores


def setup_cases(tests=None, numbers=None, config_file=None, machine=None,  # noqa: C901, E501
                work_dir=None, baseline_dir=None, mpas_model_path=None,
                suite_name='custom', cached=None, copy_executable=False):
    """
    Set up one or more test cases

    Parameters
    ----------
    tests : list of str, optional
        Relative paths of the test cases to set up

    numbers : list of str, optional
        Case numbers to setup, as listed from ``compass list``, optionally with
        a suffix ``c`` to indicate that all steps in that test case should be
        cached

    config_file : {str, None}, optional
        Configuration file with custom options for setting up and running test
        cases

    machine : str, optional
        The name of one of the machines with defined config options, which can
        be listed with ``compass list --machines``

    work_dir : str, optional
        A directory that will serve as the base for creating case directories

    baseline_dir : str, optional
        Location of baselines that can be compared to

    mpas_model_path : str, optional
        The relative or absolute path to the root of a branch where the MPAS
        model has been built

    suite_name : str, optional
        The name of the test suite if tests are being set up through a test
        suite or ``'custom'`` if not

    cached : list of list of str, optional
        For each test in ``tests``, which steps (if any) should be cached,
        or a list with "_all" as the first entry if all steps in the test case
        should be cached

    copy_executable : bool, optional
        Whether to copy the MPAS executable to the work directory

    Returns
    -------
    test_cases : dict of compass.TestCase
        A dictionary of test cases, with the relative path in the work
        directory as keys

    Raises
    ------
    ValueError
        If neither a config file nor a machine is available, if no tests or
        numbers are given, if ``cached`` and ``tests`` differ in length, if a
        case number is malformed, negative or out of range, or if a test path
        is unknown

    FileNotFoundError
        If ``config_file`` is given but does not exist
    """
    if machine is None and 'COMPASS_MACHINE' in os.environ:
        machine = os.environ['COMPASS_MACHINE']

    if machine is None:
        machine = discover_machine()

    if config_file is None and machine is None:
        raise ValueError('At least one of config_file and machine is needed.')

    if config_file is not None and not os.path.exists(config_file):
        raise FileNotFoundError(
            f"The user config file wasn't found: {config_file}")

    # empty lists are as useless as None: without this check we would fail
    # much later with a bare StopIteration when looking up the first test case
    if not tests and not numbers:
        raise ValueError('At least one of tests or numbers is needed.')

    if cached is not None:
        if tests is None:
            warnings.warn('Ignoring "cached" argument because "tests" was '
                          'not provided')
        elif len(cached) != len(tests):
            raise ValueError('A list of cached steps must be provided for '
                             'each test in "tests"')

    # validate the case numbers before the test-case registry is built so
    # that bad arguments are reported right away
    selected_numbers = [_parse_case_number(number)
                        for number in (numbers or [])]

    if work_dir is None:
        work_dir = os.getcwd()
    work_dir = os.path.abspath(work_dir)

    # flatten every test case of every test group of every MPAS core into a
    # single dictionary keyed by the test case's relative path
    all_test_cases = dict()
    for core in get_mpas_cores():
        for test_group in core.test_groups.values():
            for test_case in test_group.test_cases.values():
                all_test_cases[test_case.path] = test_case

    test_cases = dict()
    cached_steps = dict()

    if selected_numbers:
        # case numbers are positions in the insertion order of all_test_cases
        keys = list(all_test_cases)
        for index, cache_all in selected_numbers:
            if index >= len(keys):
                raise ValueError(
                    f'test number {index} is out of range. There are '
                    f'only {len(keys)} tests.')
            path = keys[index]
            cached_steps[path] = ['_all'] if cache_all else list()
            test_cases[path] = all_test_cases[path]

    if tests is not None:
        for index, path in enumerate(tests):
            if path not in all_test_cases:
                raise ValueError(
                    f'Test case with path {path} is not in test_cases')
            if cached is not None:
                cached_steps[path] = cached[index]
            else:
                cached_steps[path] = list()
            test_cases[path] = all_test_cases[path]

    # get the MPAS core of the first test case.  We'll assume all tests are
    # for this core
    first_path = next(iter(test_cases))
    mpas_core_name = test_cases[first_path].mpas_core.name
    basic_config = _get_basic_config(config_file, machine, mpas_model_path,
                                     mpas_core_name)

    provenance.write(work_dir, test_cases, config=basic_config)

    print('Setting up test cases:')
    for path, test_case in test_cases.items():
        setup_case(path, test_case, config_file, machine, work_dir,
                   baseline_dir, mpas_model_path,
                   cached_steps=cached_steps[path],
                   copy_executable=copy_executable)

    test_suite = {'name': suite_name,
                  'test_cases': test_cases,
                  'work_dir': work_dir}

    # pickle the test or step dictionary for use at runtime
    pickle_file = os.path.join(work_dir, '{}.pickle'.format(suite_name))
    with open(pickle_file, 'wb') as handle:
        pickle.dump(test_suite, handle, protocol=pickle.HIGHEST_PROTOCOL)

    _symlink_load_script(work_dir)

    max_cores, max_of_min_cores = _get_required_cores(test_cases)

    print(f'target cores: {max_cores}')
    print(f'minimum cores: {max_of_min_cores}')

    if machine is not None:
        write_job_script(basic_config, machine, max_cores, max_of_min_cores,
                         work_dir, suite=suite_name)

    return test_cases


def _parse_case_number(number):
    """
    Split a case number such as ``'12'`` or ``'12c'`` into an integer index
    and a flag for whether all steps of that test case should be cached

    Parameters
    ----------
    number : str or int
        The case number, optionally followed by the suffix ``c``

    Returns
    -------
    index : int
        The non-negative index of the test case

    cache_all : bool
        Whether the ``c`` suffix was present

    Raises
    ------
    ValueError
        If the number is not an integer (with optional ``c`` suffix) or is
        negative
    """
    text = str(number)
    cache_all = text.endswith('c')
    digits = text[:-1] if cache_all else text
    try:
        index = int(digits)
    except ValueError as err:
        raise ValueError(
            f'test number {text!r} is invalid. Expected an integer, '
            f"optionally followed by 'c'.") from err
    if index < 0:
        # a negative index would silently select a test counted from the end
        raise ValueError(
            f'test number {index} is invalid. Test numbers cannot be '
            f'negative.')
    return index, cache_all
def setup_case(path, test_case, config_file, machine, work_dir, baseline_dir,
               mpas_model_path, cached_steps, copy_executable):
    """
    Set up a single test case and all of its steps

    Parameters
    ----------
    path : str
        Relative path of the test case to set up

    test_case : compass.TestCase
        A test case to set up

    config_file : str
        Configuration file with custom options for setting up and running test
        cases

    machine : str
        The name of one of the machines with defined config options, which can
        be listed with ``compass list --machines``

    work_dir : str
        A directory that will serve as the base for creating case directories

    baseline_dir : str
        Location of baselines that can be compared to

    mpas_model_path : str
        The relative or absolute path to the root of a branch where the MPAS
        model has been built

    cached_steps : list of str
        Which steps (if any) should be cached.  If all steps should be cached,
        the first entry is "_all"

    copy_executable : bool
        Whether to copy the MPAS executable to the work directory
    """

    print(' {}'.format(path))

    mpas_core = test_case.mpas_core.name
    config = _get_basic_config(config_file, machine, mpas_model_path,
                               mpas_core)

    # add the config options for the test group (if defined)
    test_group = test_case.test_group.name
    config.add_from_package(f'compass.{mpas_core}.tests.{test_group}',
                            f'{test_group}.cfg', exception=False)

    if copy_executable:
        config.set('setup', 'copy_executable', 'True')

    # add the config options for the test case (if defined)
    config.add_from_package(test_case.__module__,
                            f'{test_case.name}.cfg', exception=False)

    if 'COMPASS_BRANCH' in os.environ:
        compass_branch = os.environ['COMPASS_BRANCH']
        config.set('paths', 'compass_branch', compass_branch)
    else:
        config.set('paths', 'compass_branch', os.getcwd())

    test_case_dir = os.path.join(work_dir, path)
    # an existing directory is fine; any other failure (e.g. permissions)
    # should surface here rather than as a confusing error further down
    os.makedirs(test_case_dir, exist_ok=True)
    test_case.work_dir = test_case_dir
    test_case.base_work_dir = work_dir

    # add config options specific to the test case
    test_case.config = config
    test_case.configure()

    # add the baseline directory for this test case
    if baseline_dir is not None:
        test_case.baseline_dir = os.path.join(baseline_dir, path)

    # set the mpas_model path from the command line if provided; this is
    # repeated after configure(), presumably so the command-line value wins
    if mpas_model_path is not None:
        mpas_model_path = os.path.abspath(mpas_model_path)
        config.set('paths', 'mpas_model', mpas_model_path, user=True)

    config.set('test_case', 'steps_to_run', ' '.join(test_case.steps_to_run))

    # write out the config file
    test_case_config = '{}.cfg'.format(test_case.name)
    test_case.config_filename = test_case_config
    with open(os.path.join(test_case_dir, test_case_config), 'w') as f:
        config.write(f)

    # "_all" as the first entry is shorthand for every step in the test case
    if cached_steps and cached_steps[0] == '_all':
        cached_steps = list(test_case.steps.keys())
    if cached_steps:
        print_steps = ' '.join(cached_steps)
        print(f' steps with cached outputs: {print_steps}')
        for step_name in cached_steps:
            test_case.steps[step_name].cached = True

    # iterate over steps
    for step in test_case.steps.values():
        # make the step directory if it doesn't exist
        step_dir = os.path.join(work_dir, step.path)
        os.makedirs(step_dir, exist_ok=True)

        # each step directory gets a link to the test case's config file
        symlink(os.path.join(test_case_dir, test_case_config),
                os.path.join(step_dir, test_case_config))

        step.work_dir = step_dir
        step.base_work_dir = work_dir
        step.config_filename = test_case_config
        step.config = config

        # set up the step
        step.setup()

        # process input, output, namelist and streams files
        step.process_inputs_and_outputs()

    # wait until we've set up all the steps before pickling because steps may
    # need other steps to be set up
    for step in test_case.steps.values():

        # pickle the test case and step for use at runtime
        pickle_filename = os.path.join(step.work_dir, 'step.pickle')
        with open(pickle_filename, 'wb') as handle:
            pickle.dump((test_case, step), handle,
                        protocol=pickle.HIGHEST_PROTOCOL)

        _symlink_load_script(step.work_dir)

        if machine is not None:
            cores = step.cpus_per_task * step.ntasks
            min_cores = step.min_cpus_per_task * step.min_tasks
            write_job_script(config, machine, cores, min_cores, step.work_dir)

    # pickle the test case as a one-entry suite for use at runtime
    pickle_filename = os.path.join(test_case.work_dir, 'test_case.pickle')
    with open(pickle_filename, 'wb') as handle:
        test_suite = {'name': 'test_case',
                      'test_cases': {test_case.path: test_case},
                      'work_dir': test_case.work_dir}
        pickle.dump(test_suite, handle, protocol=pickle.HIGHEST_PROTOCOL)

    _symlink_load_script(test_case_dir)

    if machine is not None:
        max_cores, max_of_min_cores = _get_required_cores({path: test_case})
        write_job_script(config, machine, max_cores, max_of_min_cores,
                         test_case_dir)
def main():
    """
    Parse the command-line arguments of ``compass setup`` and set up the
    requested test cases by calling ``setup_cases()``
    """
    parser = argparse.ArgumentParser(
        description='Set up one or more test cases', prog='compass setup')
    parser.add_argument("-t", "--test", dest="test",
                        help="Relative path for a test case to set up",
                        metavar="PATH")
    parser.add_argument("-n", "--case_number", nargs='+', dest="case_num",
                        type=str,
                        help="Case number(s) to setup, as listed from "
                             "'compass list'. Can be a space-separated "
                             "list of case numbers. A suffix 'c' indicates "
                             "that all steps in the test should use cached "
                             "outputs.", metavar="NUM")
    parser.add_argument("-f", "--config_file", dest="config_file",
                        help="Configuration file for test case setup",
                        metavar="FILE")
    parser.add_argument("-m", "--machine", dest="machine",
                        help="The name of the machine for loading machine-"
                             "related config options", metavar="MACH")
    parser.add_argument("-w", "--work_dir", dest="work_dir",
                        help="If set, case directories are created in "
                             "work_dir rather than the current directory.",
                        metavar="PATH")
    parser.add_argument("-b", "--baseline_dir", dest="baseline_dir",
                        help="Location of baselines that can be compared to",
                        metavar="PATH")
    parser.add_argument("-p", "--mpas_model", dest="mpas_model",
                        help="The path to the build of the MPAS model for the "
                             "core.",
                        metavar="PATH")
    parser.add_argument("--suite_name", dest="suite_name", default="custom",
                        help="The name to use for the 'custom' test suite "
                             "containing all setup test cases.",
                        metavar="SUITE")
    parser.add_argument("--cached", dest="cached", nargs='+',
                        help="A list of steps in the test case supplied with "
                             "--test that should use cached outputs, or "
                             "'_all' if all steps should be cached",
                        metavar="STEP")
    parser.add_argument("--copy_executable", dest="copy_executable",
                        action="store_true",
                        help="If the MPAS executable should be copied to the "
                             "work directory")

    # skip the first two entries of argv (presumably the ``compass``
    # executable and the ``setup`` subcommand)
    args = parser.parse_args(sys.argv[2:])

    cached = None
    if args.test is None:
        tests = None
    else:
        tests = [args.test]
        # --cached only applies to the single test given with --test
        if args.cached is not None:
            cached = [args.cached]

    setup_cases(tests=tests, numbers=args.case_num,
                config_file=args.config_file, machine=args.machine,
                work_dir=args.work_dir, baseline_dir=args.baseline_dir,
                mpas_model_path=args.mpas_model, suite_name=args.suite_name,
                cached=cached, copy_executable=args.copy_executable)


def _get_required_cores(test_cases):
    """
    Get the maximum number of target cores and the max of min cores

    Parameters
    ----------
    test_cases : dict
        Test cases to examine; only the steps listed in each test case's
        ``steps_to_run`` that are not cached are considered

    Returns
    -------
    max_cores : int
        The largest ``cpus_per_task * ntasks`` over the steps (at least 1)

    max_of_min_cores : int
        The largest ``min_cpus_per_task * min_tasks`` over the steps (at
        least 1)

    Raises
    ------
    ValueError
        If ``ntasks`` or ``cpus_per_task`` of a step is ``None``
    """
    max_cores = 1
    max_of_min_cores = 1
    for test_case in test_cases.values():
        for step_name in test_case.steps_to_run:
            step = test_case.steps[step_name]
            if step.cached:
                # cached steps are skipped when sizing the job
                continue
            if step.ntasks is None:
                raise ValueError(
                    f'The number of tasks (ntasks) was never set for '
                    f'{test_case.path} step {step_name}')
            if step.cpus_per_task is None:
                raise ValueError(
                    f'The number of CPUs per task (cpus_per_task) was never '
                    f'set for {test_case.path} step {step_name}')
            cores = step.cpus_per_task * step.ntasks
            min_cores = step.min_cpus_per_task * step.min_tasks
            max_cores = max(max_cores, cores)
            max_of_min_cores = max(max_of_min_cores, min_cores)

    return max_cores, max_of_min_cores


def _get_basic_config(config_file, machine, mpas_model_path, mpas_core):
    """
    Get a base config parser for the machine and MPAS core but not a specific
    test

    Parameters
    ----------
    config_file : str or None
        A user config file to add, if any

    machine : str or None
        The machine name; ``'default'`` is used for the compass machine config
        file and the ``deploy`` section if this is ``None``

    mpas_model_path : str or None
        Path to the MPAS model build; if given, its absolute path is stored in
        the ``mpas_model`` option of the ``paths`` section

    mpas_core : str
        The name of the MPAS core whose config options should be added

    Returns
    -------
    config : compass.config.CompassConfigParser
        The config parser with the options described above
    """
    config = CompassConfigParser()

    if config_file is not None:
        config.add_user_config(config_file)

    # start with default compass config options
    config.add_from_package('compass', 'default.cfg')

    # add the E3SM config options from mache
    if machine is not None:
        config.add_from_package('mache.machines', f'{machine}.cfg',
                                exception=False)

    # add the compass machine config file
    if machine is None:
        machine = 'default'
    config.add_from_package('compass.machines', f'{machine}.cfg')

    # store name of machine so tests have access to it if needed; machine is
    # never None at this point because it falls back to 'default' above
    config.set('deploy', 'machine', machine)

    if 'COMPASS_BRANCH' in os.environ:
        compass_branch = os.environ['COMPASS_BRANCH']
        config.set('paths', 'compass_branch', compass_branch)
    else:
        config.set('paths', 'compass_branch', os.getcwd())

    # add the config options for the MPAS core
    config.add_from_package(f'compass.{mpas_core}', f'{mpas_core}.cfg')

    # set the mpas_model path from the command line if provided
    if mpas_model_path is not None:
        mpas_model_path = os.path.abspath(mpas_model_path)
        config.set('paths', 'mpas_model', mpas_model_path, user=True)

    return config


def _symlink_load_script(work_dir):
    """
    Make a symlink in ``work_dir`` to the script for loading the compass
    conda env, if the ``LOAD_COMPASS_ENV`` environment variable is set

    Parameters
    ----------
    work_dir : str
        The directory in which to create the ``load_compass_env.sh`` link
    """
    if 'LOAD_COMPASS_ENV' in os.environ:
        script_filename = os.environ['LOAD_COMPASS_ENV']
        symlink(script_filename,
                os.path.join(work_dir, 'load_compass_env.sh'))