import argparse
import glob
import inspect
import os
import pickle
import sys
import time
import mpas_tools.io
from mpas_tools.logging import LoggingContext, check_call
from compass.config import CompassConfigParser
from compass.logging import log_function_call, log_method_call
from compass.parallel import (
get_available_parallel_resources,
set_cores_per_node,
)
[docs]
def run_tests(suite_name, quiet=False, is_test_case=False, steps_to_run=None,
              steps_not_to_run=None):
    """
    Run the given test suite or test case

    Parameters
    ----------
    suite_name : str
        The name of the test suite, with or without the ``.pickle`` suffix

    quiet : bool, optional
        Whether step names are not included in the output as the test suite
        progresses

    is_test_case : bool
        Whether this is a test case instead of a full test suite

    steps_to_run : list of str, optional
        A list of the steps to run if this is a test case, not a full suite.
        The default behavior is to run the default steps unless they are in
        ``steps_not_to_run``

    steps_not_to_run : list of str, optional
        A list of steps not to run if this is a test case, not a full suite.
        Typically, these are steps to remove from the defaults

    Raises
    ------
    ValueError
        If ``<suite_name>.pickle`` does not exist in the current directory
        or the suite it contains has no test cases

    SystemExit
        With exit code 1 if one or more test cases failed
    """
    # Allow a suite name to either include or not the .pickle suffix
    if suite_name.endswith('.pickle'):
        # code below assumes no suffix, so remove it
        suite_name = suite_name[:-len('.pickle')]

    # Now open the suite's pickle file
    pickle_filename = f'{suite_name}.pickle'
    if not os.path.exists(pickle_filename):
        raise ValueError(f'The suite "{suite_name}" does not appear to have '
                         f'been set up here.')

    # NOTE(review): unpickling executes arbitrary code; this is only safe
    # for pickle files the user trusts (presumably ones written by compass
    # itself when the suite was set up -- confirm).
    with open(pickle_filename, 'rb') as handle:
        test_suite = pickle.load(handle)

    test_cases = test_suite['test_cases']

    # get the config file for the first test case in the suite; fail with
    # a clear message rather than a bare StopIteration if there is none
    first_test_case = next(iter(test_cases.values()), None)
    if first_test_case is None:
        raise ValueError(f'The suite "{suite_name}" does not contain any '
                         f'test cases.')
    config_filename = os.path.join(first_test_case.work_dir,
                                   first_test_case.config_filename)
    config = CompassConfigParser()
    config.add_from_file(config_filename)
    available_resources = get_available_parallel_resources(config)

    # start logging to stdout/stderr
    with LoggingContext(suite_name) as logger:

        os.environ['PYTHONUNBUFFERED'] = '1'

        if not is_test_case:
            # an existing directory is fine, but any other failure (e.g.
            # missing permissions) should surface here rather than later
            # when the first log file cannot be created
            os.makedirs('case_outputs', exist_ok=True)

        failures = 0
        cwd = os.getcwd()
        suite_start = time.time()
        test_times = dict()
        success_strs = dict()
        for suite_key, test_case in test_cases.items():
            logger.info(f'{suite_key}')

            # results are keyed by the flattened path of the test case,
            # which is also the stem of its log file name
            test_name = test_case.path.replace('/', '_')
            if is_test_case:
                # a test case on its own logs straight to the suite logger
                log_filename = None
                test_logger = logger
            else:
                log_filename = f'{cwd}/case_outputs/{test_name}.log'
                test_logger = None

            success_str, success, test_time = _log_and_run_test(
                test_case, logger, test_logger, quiet, log_filename,
                is_test_case, steps_to_run, steps_not_to_run,
                available_resources)
            success_strs[test_name] = success_str
            if not success:
                failures += 1
            test_times[test_name] = test_time

        suite_time = time.time() - suite_start

        # running test cases changes the working directory
        os.chdir(cwd)

        logger.info('Test Runtimes:')
        for test_name, test_time in test_times.items():
            mins, secs = divmod(round(test_time), 60)
            logger.info(f'{mins:02d}:{secs:02d} {success_strs[test_name]} '
                        f'{test_name}')
        mins, secs = divmod(round(suite_time), 60)
        logger.info(f'Total runtime {mins:02d}:{secs:02d}')

        if failures == 0:
            logger.info('PASS: All passed successfully!')
        else:
            if failures == 1:
                message = '1 test'
            else:
                message = f'{failures} tests'
            logger.error(f'FAIL: {message} failed, see above.')
            sys.exit(1)
[docs]
def run_single_step(step_is_subprocess=False):
    """
    Used by the framework to run a step when ``compass run`` gets called in the
    step's work directory

    The test case and step are loaded from ``step.pickle`` in the current
    directory, and the test case is restricted to running just that step.

    Parameters
    ----------
    step_is_subprocess : bool, optional
        Whether the step is being run as a subprocess of a test case or suite
    """
    with open('step.pickle', 'rb') as pickle_file:
        test_case, step = pickle.load(pickle_file)

    test_case.steps_to_run = [step.name]
    test_case.new_step_log_file = False

    if step_is_subprocess:
        # we are already inside the subprocess, so run the step directly
        # instead of launching yet another subprocess
        step.run_as_subprocess = False

    step_config = CompassConfigParser()
    step_config.add_from_file(step.config_filename)
    resources = get_available_parallel_resources(step_config)

    test_case.config = step_config
    set_cores_per_node(test_case.config, resources['cores_per_node'])

    mpas_tools.io.default_format = step_config.get('io', 'format')
    mpas_tools.io.default_engine = step_config.get('io', 'engine')

    # start logging to stdout/stderr
    logger_name = step.path.replace('/', '_')
    with LoggingContext(name=logger_name) as logger:
        test_case.logger = logger
        # no separate stdout logger when a step is run on its own
        test_case.stdout_logger = None

        log_function_call(function=_run_test, logger=logger)
        logger.info('')
        _run_test(test_case, resources)

        # only perform validation if the step is being run by a user on its
        # own
        validate_here = not step_is_subprocess
        if validate_here:
            logger.info('')
            log_method_call(method=test_case.validate, logger=logger)
            logger.info('')
            test_case.validate()
def main():
    """
    Entry point for ``compass run``: run a test suite, test case or step

    Command-line arguments are parsed from ``sys.argv[2:]``, i.e. the first
    two entries are skipped (presumably the ``compass`` executable and the
    ``run`` sub-command -- confirm against the caller).

    What gets run is decided in this order:

    1. the suite named on the command line, if any
    2. the test case in the current directory if ``test_case.pickle`` exists
    3. the step in the current directory if ``step.pickle`` exists
    4. the only ``*.pickle`` file in the current directory, as a suite

    Raises
    ------
    OSError
        If no suite was given and the current directory has no pickle files

    ValueError
        If no suite was given and more than one pickle file was found
    """
    parser = argparse.ArgumentParser(
        description='Run a test suite, test case or step',
        prog='compass run')
    parser.add_argument("suite", nargs='?',
                        help="The name of a test suite to run. Can exclude "
                             "or include the .pickle filename suffix.")
    parser.add_argument("--steps", dest="steps", nargs='+',
                        help="The steps of a test case to run")
    parser.add_argument("--no-steps", dest="no_steps", nargs='+',
                        help="The steps of a test case not to run, see "
                             "steps_to_run in the config file for defaults.")
    parser.add_argument("-q", "--quiet", dest="quiet", action="store_true",
                        help="If set, step names are not included in the "
                             "output as the test suite progresses. Has no "
                             "effect when running test cases or steps on "
                             "their own.")
    # the trailing space after "that" matters: the adjacent string literals
    # are concatenated and would otherwise read "thata step"
    parser.add_argument("--step_is_subprocess", dest="step_is_subprocess",
                        action="store_true",
                        help="Used internally by compass to indicate that "
                             "a step is being run as a subprocess.")
    args = parser.parse_args(sys.argv[2:])

    if args.suite is not None:
        run_tests(args.suite, quiet=args.quiet)
        return

    if os.path.exists('test_case.pickle'):
        run_tests(suite_name='test_case', quiet=args.quiet, is_test_case=True,
                  steps_to_run=args.steps, steps_not_to_run=args.no_steps)
        return

    if os.path.exists('step.pickle'):
        run_single_step(args.step_is_subprocess)
        return

    # no explicit suite and no test case or step here: fall back to the
    # suite pickle in this directory, provided it is unambiguous
    pickles = glob.glob('*.pickle')
    if not pickles:
        raise OSError('No pickle files were found. Are you sure this is '
                      'a compass suite, test-case or step work directory?')
    if len(pickles) > 1:
        raise ValueError('More than one suite was found. Please specify '
                         'which to run: compass run <suite>')

    suite = os.path.splitext(os.path.basename(pickles[0]))[0]
    run_tests(suite, quiet=args.quiet)
def _update_steps_to_run(steps_to_run, steps_not_to_run, config, steps):
"""
Update the steps to run
"""
if steps_to_run is None:
steps_to_run = config.get('test_case',
'steps_to_run').replace(',', ' ').split()
for step in steps_to_run:
if step not in steps:
raise ValueError(
f'A step "{step}" was requested but is not one of the steps '
f'in this test case:'
f'\n{list(steps)}')
if steps_not_to_run is not None:
for step in steps_not_to_run:
if step not in steps:
raise ValueError(
f'A step "{step}" was flagged not to run but is not one '
f'of the steps in this test case:'
f'\n{list(steps)}')
steps_to_run = [step for step in steps_to_run if step not in
steps_not_to_run]
return steps_to_run
def _print_to_stdout(test_case, message):
"""
Write out a message to stdout if we're not running a single step
"""
if test_case.stdout_logger is not None:
test_case.stdout_logger.info(message)
if test_case.logger != test_case.stdout_logger:
# also write it to the log file
test_case.logger.info(message)
def _log_and_run_test(test_case, logger, test_logger, quiet,  # noqa: C901
                      log_filename, is_test_case, steps_to_run,
                      steps_not_to_run, available_resources):
    """
    Run a test case (its ``run()`` method, its steps and its ``validate()``
    method) and log the outcome

    Exceptions raised by any of the three stages are logged to the test
    logger and mark the test case as failed rather than propagating.

    Parameters
    ----------
    test_case : object
        The test case to run; loggers, config and steps to run are attached
        to it here

    logger : logging.Logger
        The logger that receives the status summary and runtime

    test_logger : logging.Logger or None
        The logger handed to ``LoggingContext`` for the test case's own
        output

    quiet : bool
        If true, step names go to the test logger rather than to ``logger``

    log_filename : str or None
        The log file handed to ``LoggingContext`` and stored on the test case

    is_test_case : bool
        Whether a test case is being run on its own; stored as
        ``test_case.new_step_log_file``

    steps_to_run : list of str or None
        Requested steps, passed on to ``_update_steps_to_run()``

    steps_not_to_run : list of str or None
        Steps to exclude, passed on to ``_update_steps_to_run()``

    available_resources : dict
        Must contain ``'cores_per_node'``; also passed on to ``_run_test()``

    Returns
    -------
    success_str : str
        A colored ``PASS`` or ``FAIL`` string

    success : bool
        Whether execution succeeded and no validation failed

    test_time : float
        The wall-clock time in seconds spent on the test case
    """
    # ANSI fail text: https://stackoverflow.com/a/287944/7728169
    red = '\033[91m'
    green = '\033[92m'
    blue = '\033[94m'
    reset = '\033[0m'
    pass_str = f'{green}PASS{reset}'
    executed_str = f'{green}SUCCESS{reset}'
    fail_str = f'{red}FAIL{reset}'
    error_str = f'{red}ERROR{reset}'

    test_name = test_case.path.replace('/', '_')
    with LoggingContext(test_name, logger=test_logger,
                        log_filename=log_filename) as test_logger:
        # when quiet, just log the step names and any failure messages to
        # the log file; otherwise, log steps to stdout
        test_case.stdout_logger = test_logger if quiet else logger
        test_case.logger = test_logger
        test_case.log_filename = log_filename
        test_case.new_step_log_file = is_test_case

        os.chdir(test_case.work_dir)

        config = CompassConfigParser()
        config.add_from_file(test_case.config_filename)
        test_case.config = config
        set_cores_per_node(test_case.config,
                           available_resources['cores_per_node'])

        mpas_tools.io.default_format = config.get('io', 'format')
        mpas_tools.io.default_engine = config.get('io', 'engine')

        test_case.steps_to_run = _update_steps_to_run(
            steps_to_run, steps_not_to_run, config, test_case.steps)

        test_start = time.time()

        # stage 1: the test case's own run() method
        log_method_call(method=test_case.run, logger=test_logger)
        test_logger.info('')
        run_status = executed_str
        test_pass = True
        try:
            _test_case_run_deprecated(test_case)
        except BaseException:
            run_status = error_str
            test_pass = False
            test_logger.exception('Exception raised in the test '
                                  'case\'s run() method')

        # stage 2: the steps, only if run() succeeded
        if test_pass:
            log_function_call(function=_run_test, logger=test_logger)
            test_logger.info('')
            test_logger.info('Running steps:')
            for step_name in test_case.steps_to_run:
                test_logger.info(f' {step_name}')
            test_logger.info('')
            try:
                _run_test(test_case, available_resources)
            except BaseException:
                run_status = error_str
                test_pass = False
                test_logger.exception('Exception raised while running '
                                      'the steps of the test case')

        # stage 3: validation, only if the steps succeeded
        if test_pass:
            test_logger.info('')
            log_method_call(method=test_case.validate,
                            logger=test_logger)
            test_logger.info('')
            try:
                test_case.validate()
            except BaseException:
                run_status = error_str
                test_pass = False
                test_logger.exception('Exception raised in the test '
                                      'case\'s validate() method')

        # translate whatever validation results exist into status strings
        internal_status = None
        baseline_status = None
        validation = test_case.validation
        if validation is not None:
            internal_pass = validation['internal_pass']
            baseline_pass = validation['baseline_pass']

            if internal_pass is not None:
                if internal_pass:
                    internal_status = pass_str
                else:
                    internal_status = fail_str
                    test_logger.error(
                        'Internal test case validation failed')
                    test_pass = False

            if baseline_pass is not None:
                if baseline_pass:
                    baseline_status = pass_str
                else:
                    baseline_status = fail_str
                    test_logger.error('Baseline validation failed')
                    test_pass = False

        status_lines = [f' test execution: {run_status}']
        if internal_status is not None:
            status_lines.append(f' test validation: {internal_status}')
        if baseline_status is not None:
            status_lines.append(f' baseline comparison: {baseline_status}')
        status = '\n'.join(status_lines)

        if test_pass:
            logger.info(status)
            outcome_str = pass_str
        else:
            logger.error(status)
            if not is_test_case:
                logger.error(f' see: case_outputs/{test_name}.log')
            outcome_str = fail_str

        test_time = time.time() - test_start
        mins, secs = divmod(round(test_time), 60)
        logger.info(f' test runtime: '
                    f'{blue}{mins:02d}:{secs:02d}{reset}')

    return outcome_str, test_pass, test_time
def _run_test(test_case, available_resources):
"""
Run each step of the test case
"""
logger = test_case.logger
cwd = os.getcwd()
for step_name in test_case.steps_to_run:
step = test_case.steps[step_name]
if step.cached:
logger.info(f' * Cached step: {step_name}')
continue
step.config = test_case.config
if test_case.log_filename is not None:
step.log_filename = test_case.log_filename
_print_to_stdout(test_case, f' * step: {step_name}')
try:
if step.run_as_subprocess:
_run_step_as_subprocess(
test_case, step, test_case.new_step_log_file)
else:
_run_step(test_case, step, test_case.new_step_log_file,
available_resources)
except BaseException:
_print_to_stdout(test_case, ' Failed')
raise
os.chdir(cwd)
def _run_step(test_case, step, new_log_file, available_resources):
"""
Run the requested step
"""
logger = test_case.logger
cwd = os.getcwd()
step.constrain_resources(available_resources)
missing_files = list()
for input_file in step.inputs:
if not os.path.exists(input_file):
missing_files.append(input_file)
if len(missing_files) > 0:
raise OSError(
f'input file(s) missing in step {step.name} of '
f'{step.mpas_core.name}/{step.test_group.name}/'
f'{step.test_case.subdir}: {missing_files}')
test_name = step.path.replace('/', '_')
if new_log_file:
log_filename = f'{cwd}/{step.name}.log'
step.log_filename = log_filename
step_logger = None
else:
step_logger = logger
log_filename = None
with LoggingContext(name=test_name, logger=step_logger,
log_filename=log_filename) as step_logger:
step.logger = step_logger
os.chdir(step.work_dir)
# runtime_setup() will perform small tasks that require knowing the
# resources of the task before the step runs (such as creating
# graph partitions)
step_logger.info('')
log_method_call(method=step.runtime_setup, logger=step_logger)
step_logger.info('')
step.runtime_setup()
step_logger.info('')
log_method_call(method=step.run, logger=step_logger)
step_logger.info('')
step.run()
missing_files = list()
for output_file in step.outputs:
if not os.path.exists(output_file):
missing_files.append(output_file)
if len(missing_files) > 0:
raise OSError(
f'output file(s) missing in step {step.name} of '
f'{step.mpas_core.name}/{step.test_group.name}/'
f'{step.test_case.subdir}: {missing_files}')
def _run_step_as_subprocess(test_case, step, new_log_file):
    """
    Run the requested step as a subprocess

    ``compass run --step_is_subprocess`` is launched from the step's work
    directory, with the command logged through the step's logger.

    Parameters
    ----------
    test_case : object
        The test case the step belongs to; its logger is used unless a new
        log file is requested

    step : object
        The step to run

    new_log_file : bool
        Whether the step logs to its own ``<step.name>.log`` file in the
        current directory rather than to the test case's logger
    """
    parent_logger = test_case.logger
    start_dir = os.getcwd()
    logger_name = step.path.replace('/', '_')

    if not new_log_file:
        # keep writing to the test case's logger
        step_logger = parent_logger
        log_filename = None
    else:
        # let the logging context create a logger for the step's own file
        log_filename = f'{start_dir}/{step.name}.log'
        step.log_filename = log_filename
        step_logger = None

    with LoggingContext(name=logger_name, logger=step_logger,
                        log_filename=log_filename) as step_logger:
        os.chdir(step.work_dir)
        command = ['compass', 'run', '--step_is_subprocess']
        check_call(command, step_logger)
def _test_case_run_deprecated(test_case):
method = test_case.run
# get the "child" class and its location (import sequence) from the method
child_class = method.__self__.__class__
# iterate over the classes that the child class descends from to find the
# first one that actually implements the given method.
actual_class = None
# inspect.getmro() returns a list of classes the child class descends from,
# starting with the child class itself and going "back" to the "object"
# class that all python classes descend from.
for cls in inspect.getmro(child_class):
if method.__name__ in cls.__dict__:
actual_class = cls
break
if actual_class is None:
raise ValueError('We could not find test_case.run(). Something is '
'buggy!')
if actual_class.__name__ != 'TestCase':
# the run() method has been overridden. We need to give the user a
# deprecation warning.
actual_location = f'{actual_class.__module__}.{actual_class.__name__}'
test_case.logger.warn(
f'\nWARNING: Overriding the TestCase.run() method is deprecated.\n'
f' Please move the contents of\n'
f' {actual_location}.run() \n'
f' to the runtime_setup() or constrain_resources() methods of '
f'its steps.\n')
test_case.run()