import os
import xarray as xr
from mpas_tools.logging import check_call
from compass.parallel import run_command
def run_model(step, update_pio=True, partition_graph=True,
              graph_file='graph.info', namelist=None, streams=None):
"""
Run the model after determining the number of tasks and threads
Parameters
----------
step : compass.Step
a step
update_pio : bool, optional
Whether to modify the namelist so the number of PIO tasks and the
stride between them is consistent with the number of nodes and tasks
(one PIO task per node).
partition_graph : bool, optional
Whether to partition the domain for the requested number of tasks. If
so, the partitioning executable is taken from the ``partition`` option
of the ``[executables]`` config section.
graph_file : str, optional
The name of the graph file to partition
namelist : str, optional
The name of the namelist file, default is ``namelist.<mpas_core>``
streams : str, optional
The name of the streams file, default is ``streams.<mpas_core>``
"""
    mpas_core = step.mpas_core.name
    ntasks = step.ntasks
    cpus_per_task = step.cpus_per_task
    openmp_threads = step.openmp_threads
    config = step.config
    logger = step.logger

    if namelist is None:
        namelist = f'namelist.{mpas_core}'

    if streams is None:
        streams = f'streams.{mpas_core}'

    if update_pio:
        step.update_namelist_pio(namelist)

    if partition_graph:
        partition(ntasks, config, logger, graph_file=graph_file)

    model = config.get('executables', 'model')
    model_basename = os.path.basename(model)
    args = [f'./{model_basename}', '-n', namelist, '-s', streams]

    run_command(args=args, cpus_per_task=cpus_per_task, ntasks=ntasks,
                openmp_threads=openmp_threads, config=config, logger=logger)
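# Illustrative usage sketch (not part of this module): a step typically calls
# ``run_model()`` from its ``run()`` method once the namelist, streams and
# graph files are in place in the work directory.  The step class below is
# hypothetical; for an ocean-core step the defaults would resolve to
# ``namelist.ocean`` and ``streams.ocean``.
#
#     from compass.model import run_model
#     from compass.step import Step
#
#     class Forward(Step):
#         def run(self):
#             # updates PIO settings, partitions graph.info and launches
#             # the MPAS executable via the configured parallel command
#             run_model(self)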
def partition(ntasks, config, logger, graph_file='graph.info'):
"""
Partition the domain for the requested number of tasks
Parameters
----------
ntasks : int
The number of tasks that the model should be run on
config : compass.config.CompassConfigParser
Configuration options for the test case, used to get the partitioning
executable
logger : logging.Logger
A logger for output from the step that is calling this function
graph_file : str, optional
The name of the graph file to partition
"""
    if ntasks > 1:
        # partitioning is only needed when running on more than one task
        executable = config.get('parallel', 'partition_executable')
        args = [executable, graph_file, f'{ntasks}']
        check_call(args, logger)
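# Illustrative sketch (assuming the default compass configuration, where
# ``partition_executable`` under ``[parallel]`` is Metis' ``gpmetis``): for
# ``ntasks = 4`` the call above amounts to running
#
#     gpmetis graph.info 4
#
# in the step's work directory, which writes a partition file named
# ``graph.info.part.4`` that the model reads at startup.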
def make_graph_file(mesh_filename, graph_filename='graph.info',
                    weight_field=None):
"""
Make a graph file from the MPAS mesh for use in the Metis graph
partitioning software
Parameters
----------
mesh_filename : str
The name of the input MPAS mesh file
graph_filename : str, optional
The name of the output graph file
weight_field : str
The name of a variable in the MPAS mesh file to use as a field of
weights
"""
    with xr.open_dataset(mesh_filename) as ds:
        nCells = ds.sizes['nCells']
        nEdgesOnCell = ds.nEdgesOnCell.values
        cellsOnCell = ds.cellsOnCell.values - 1
        if weight_field is not None:
            if weight_field not in ds:
                raise ValueError(f'weight_field {weight_field} not found in '
                                 f'{mesh_filename}')
            weights = ds[weight_field].values
        else:
            weights = None

    # each graph edge connects two cells, so it is counted twice in the loop
    # below and the total is halved afterward
    nEdges = 0
    for i in range(nCells):
        for j in range(nEdgesOnCell[i]):
            if cellsOnCell[i][j] != -1:
                nEdges = nEdges + 1
    nEdges = nEdges // 2
    with open(graph_filename, 'w+') as graph:
        if weights is None:
            graph.write(f'{nCells} {nEdges}\n')
            for i in range(nCells):
                for j in range(0, nEdgesOnCell[i]):
                    if cellsOnCell[i][j] >= 0:
                        graph.write(f'{cellsOnCell[i][j] + 1} ')
                graph.write('\n')
        else:
            # '010' in the Metis header indicates that vertex weights follow
            graph.write(f'{nCells} {nEdges} 010\n')
            for i in range(nCells):
                graph.write(f'{int(weights[i])} ')
                for j in range(0, nEdgesOnCell[i]):
                    if cellsOnCell[i][j] >= 0:
                        graph.write(f'{cellsOnCell[i][j] + 1} ')
                graph.write('\n')
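# Illustrative sketch (file and variable names are hypothetical): building a
# weighted graph file from an MPAS-Ocean initial condition might look like
#
#     from compass.model import make_graph_file
#
#     make_graph_file(mesh_filename='initial_state.nc',
#                     graph_filename='graph.info',
#                     weight_field='maxLevelCell')
#
# The output follows the Metis graph format: a header line with the number of
# cells and edges (plus the '010' flag when weights are included), then one
# line per cell listing its integer weight (if any) followed by the 1-based
# indices of its neighboring cells.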