Source code for compass.ocean.tests.global_ocean.files_for_e3sm.diagnostic_masks

import os

import mpas_tools.io
import xarray as xr
from geometric_features import GeometricFeatures
from geometric_features.aggregation import get_aggregator_by_name
from mpas_tools.io import write_netcdf
from mpas_tools.logging import check_call
from mpas_tools.ocean.moc import add_moc_southern_boundary_transects

from compass.io import symlink
from compass.ocean.tests.global_ocean.files_for_e3sm.files_for_e3sm_step import (  # noqa: E501
    FilesForE3SMStep,
)


[docs] class DiagnosticMasks(FilesForE3SMStep): """ A step for creating region masks needed for the Meridional Overturning Circulation analysis member and diagnostics from MPAS-Analysis """
[docs] def __init__(self, test_case): """ Create a step Parameters ---------- test_case : compass.ocean.tests.global_ocean.files_for_e3sm.FilesForE3SM The test case this step belongs to """ # noqa: E501 super().__init__(test_case, name='diagnostics_masks', cpus_per_task=18, min_cpus_per_task=1)
# for now, we won't define any outputs because they include the mesh # short name, which is not known at setup time. Currently, this is # safe because no other steps depend on the outputs of this one.
[docs] def run(self): """ Run this step of the testcase """ super().run() make_diagnostics_files(self.logger, self.mesh_short_name, self.with_ice_shelf_cavities, self.cpus_per_task)
def make_diagnostics_files(logger, mesh_short_name, with_ice_shelf_cavities, cpus_per_task): """ Run this step of the testcase Parameters ---------- logger : logging.Logger A logger for output from the step mesh_short_name : str The E3SM short name of the mesh with_ice_shelf_cavities : bool Whether the mesh has ice-shelf cavities cpus_per_task : int The number of cores to use to build masks """ mask_dir = '../assembled_files/diagnostics/mpas_analysis/region_masks' try: os.makedirs(mask_dir) except FileExistsError: pass ocean_inputdata_dir = \ f'../assembled_files/inputdata/ocn/mpas-o/{mesh_short_name}' moc_mask_dirs = [mask_dir, ocean_inputdata_dir] _make_moc_masks(mesh_short_name, logger, cpus_per_task, moc_mask_dirs) gf = GeometricFeatures() region_groups = ['Antarctic Regions', 'Arctic Ocean Regions', 'Arctic Sea Ice Regions', 'Greenland Regions', 'Ocean Basins', 'Ocean Subbasins', 'ISMIP6 Regions'] if with_ice_shelf_cavities: region_groups.append('Ice Shelves') for region_group in region_groups: function, prefix, date = get_aggregator_by_name(region_group) suffix = f'{prefix}{date}' fc_mask = function(gf) _make_region_masks(mesh_short_name, suffix=suffix, fc_mask=fc_mask, logger=logger, cpus_per_task=cpus_per_task, output_dir=mask_dir) transect_groups = ['Transport Transects'] for transect_group in transect_groups: function, prefix, date = get_aggregator_by_name(transect_group) suffix = f'{prefix}{date}' fc_mask = function(gf) _make_transect_masks(mesh_short_name, suffix=suffix, fc_mask=fc_mask, logger=logger, cpus_per_task=cpus_per_task, output_dir=mask_dir) def _make_region_masks(mesh_name, suffix, fc_mask, logger, cpus_per_task, output_dir): mesh_filename = 'restart.nc' geojson_filename = f'{suffix}.geojson' mask_filename = f'{mesh_name}_{suffix}.nc' fc_mask.to_geojson(geojson_filename) # these defaults may have been updated from config options -- pass them # along to the subprocess netcdf_format = mpas_tools.io.default_format netcdf_engine = mpas_tools.io.default_engine args = ['compute_mpas_region_masks', '-m', mesh_filename, '-g', geojson_filename, '-o', mask_filename, '-t', 'cell', '--process_count', f'{cpus_per_task}', '--format', netcdf_format, '--engine', netcdf_engine] check_call(args, logger=logger) # make links in output directory symlink(os.path.abspath(mask_filename), f'{output_dir}/{mask_filename}') def _make_transect_masks(mesh_name, suffix, fc_mask, logger, cpus_per_task, output_dir, subdivision_threshold=10e3): mesh_filename = 'restart.nc' geojson_filename = f'{suffix}.geojson' mask_filename = f'{mesh_name}_{suffix}.nc' fc_mask.to_geojson(geojson_filename) # these defaults may have been updated from config options -- pass them # along to the subprocess netcdf_format = mpas_tools.io.default_format netcdf_engine = mpas_tools.io.default_engine args = ['compute_mpas_transect_masks', '-m', mesh_filename, '-g', geojson_filename, '-o', mask_filename, '-t', 'edge', '-s', f'{subdivision_threshold}', '--process_count', f'{cpus_per_task}', '--add_edge_sign', '--format', netcdf_format, '--engine', netcdf_engine] check_call(args, logger=logger) symlink(os.path.abspath(mask_filename), f'{output_dir}/{mask_filename}') def _make_moc_masks(mesh_short_name, logger, cpus_per_task, moc_mask_dirs): gf = GeometricFeatures() mesh_filename = 'restart.nc' function, prefix, date = get_aggregator_by_name('MOC Basins') fc_mask = function(gf) suffix = f'{prefix}{date}' geojson_filename = f'{suffix}.geojson' mask_filename = f'{mesh_short_name}_{suffix}.nc' fc_mask.to_geojson(geojson_filename) # these defaults may have been updated from config options -- pass them # along to the subprocess netcdf_format = mpas_tools.io.default_format netcdf_engine = mpas_tools.io.default_engine args = ['compute_mpas_region_masks', '-m', mesh_filename, '-g', geojson_filename, '-o', mask_filename, '-t', 'cell', '--process_count', f'{cpus_per_task}', '--format', netcdf_format, '--engine', netcdf_engine] check_call(args, logger=logger) mask_and_transect_filename = \ f'{mesh_short_name}_mocBasinsAndTransects{date}.nc' ds_mesh = xr.open_dataset(mesh_filename) ds_mask = xr.open_dataset(mask_filename) ds_masks_and_transects = add_moc_southern_boundary_transects( ds_mask, ds_mesh, logger=logger) write_netcdf(ds_masks_and_transects, mask_and_transect_filename, char_dim_name='StrLen') # make links in output directories (both inputdata and diagnostics) for output_dir in moc_mask_dirs: symlink(os.path.abspath(mask_and_transect_filename), f'{output_dir}/{mask_and_transect_filename}')