Source code for mpas_analysis.shared.transects.compute_transect_masks_subtask

# This software is open source software available under the BSD-3 license.
#
# Copyright (c) 2022 Triad National Security, LLC. All rights reserved.
# Copyright (c) 2022 Lawrence Livermore National Security, LLC. All rights
# reserved.
# Copyright (c) 2022 UT-Battelle, LLC. All rights reserved.
#
# Additional copyright and license information can be found in the LICENSE file
# distributed with this code, or at
# https://raw.githubusercontent.com/MPAS-Dev/MPAS-Analysis/main/LICENSE

import os
import xarray as xr

from geometric_features import read_feature_collection, GeometricFeatures
from geometric_features.aggregation import get_aggregator_by_name
import mpas_tools.conversion
from mpas_tools.logging import check_call

from mpas_analysis.shared.analysis_task import AnalysisTask

from mpas_analysis.shared.io.utility import build_config_full_path, \
    make_directories, get_region_mask
from mpas_analysis.shared.io import write_netcdf_with_fill

from mpas_analysis.shared.regions import get_feature_list


def compute_mpas_transect_masks(geojsonFileName, meshFileName, maskFileName,
                                logger=None, processCount=1, chunkSize=1000,
                                subdivisionThreshold=10e3,
                                useMpasMaskCreator=False, dir=None):
    """
    Build a transect mask file from the given MPAS mesh and geojson file
    defining a set of transects.

    Nothing is done if ``maskFileName`` already exists.

    Parameters
    ----------
    geojsonFileName : str
        The geojson file defining the transects

    meshFileName : str
        The file containing the MPAS mesh

    maskFileName : str
        The mask file to create

    logger : optional
        Passed on as ``logger`` to ``mpas_tools.conversion.mask()`` or
        ``mpas_tools.logging.check_call()``

    processCount : int, optional
        Passed to the ``compute_mpas_transect_masks`` command as
        ``--process_count``; not used if ``useMpasMaskCreator`` is true

    chunkSize : int, optional
        Passed to the ``compute_mpas_transect_masks`` command as
        ``--chunk_size``; not used if ``useMpasMaskCreator`` is true

    subdivisionThreshold : float, optional
        Passed to the ``compute_mpas_transect_masks`` command as ``-s``;
        not used if ``useMpasMaskCreator`` is true

    useMpasMaskCreator : bool, optional
        If true, create the mask with ``mpas_tools.conversion.mask()`` and
        write it with ``write_netcdf_with_fill()`` instead of running the
        ``compute_mpas_transect_masks`` command

    dir : str, optional
        Passed on as ``dir`` to ``mpas_tools.conversion.mask()``; only used
        if ``useMpasMaskCreator`` is true
        (presumably a directory for temporary files -- TODO confirm)
    """
    if os.path.exists(maskFileName):
        # the mask was produced previously, nothing to do
        return

    # NOTE(review): an earlier comment here stated that
    # mpas_tools.conversion.mask() had to be used because
    # compute_mpas_transect_masks doesn't produce edge sign, needed for
    # transport transects.  The command below is called with
    # --add_edge_sign and is the default path, so that rationale looks
    # outdated -- confirm before relying on it.
    if useMpasMaskCreator:
        # the context manager closes the mesh file once the mask has been
        # written instead of leaving the handle open
        with xr.open_dataset(meshFileName) as dsMesh:
            fcMask = read_feature_collection(geojsonFileName)
            dsMask = mpas_tools.conversion.mask(dsMesh=dsMesh, fcMask=fcMask,
                                                logger=logger, dir=dir)
            write_netcdf_with_fill(dsMask, maskFileName)
        return

    args = ['compute_mpas_transect_masks',
            '-m', meshFileName,
            '-g', geojsonFileName,
            '-o', maskFileName,
            '-t', 'edge',
            '-s', str(subdivisionThreshold),
            '--chunk_size', str(chunkSize),
            '--process_count', str(processCount),
            '--add_edge_sign']
    check_call(args, logger=logger)
class ComputeTransectMasksSubtask(AnalysisTask):
    """
    An analysis task for computing masks for transects defined by geojson
    features

    Attributes
    ----------
    aggregationFunction : callable
        An aggregation function returned by
        :py:func:`geometric_features.aggregation.get_aggregator_by_name()`

    geojsonFileName : str
        A geojson file, typically from the MPAS ``geometric_features``
        repository, defining the shapes to be masked

    outFileSuffix : str
        The suffix for the resulting mask file

    maskFileName : str
        The name of the output mask file, set in ``setup_and_check()``

    maskSubdirectory : str
        The ``maskSubdirectory`` path from the ``output`` config section,
        set in ``setup_and_check()``

    obsFileName : str
        The first ``restart`` file from the run streams, set in
        ``setup_and_check()`` and used as the mesh file when the mask is
        computed

    subprocessCount : int
        The number of processes used to make the mask

    transectGroup : str
        The name of the transect group
    """

    # Authors
    # -------
    # Xylar Asay-Davis

    def __init__(self, parentTask, transectGroup, subprocessCount=None):
        """
        Construct the analysis task and adds it as a subtask of the
        ``parentTask``.

        Parameters
        ----------
        parentTask : ``AnalysisTask``
            The parent task, used to get the ``taskName``, ``config`` and
            ``componentName``

        transectGroup : str
            The name of a transect group, see
            :py:func:`mpas_analysis.shared.transects.get_transect_info()`

        subprocessCount : int, optional
            The number of processes that can be used to make the mask, default
            is the ``parallelTaskCount`` option from the ``execute`` section
            of the config
        """
        # Authors
        # -------
        # Xylar Asay-Davis

        # the subtask is named after the transect group with spaces removed
        subtaskName = transectGroup.replace(' ', '')

        # call the constructor from the base class (AnalysisTask)
        super().__init__(
            config=parentTask.config,
            taskName=parentTask.taskName,
            subtaskName=subtaskName,
            componentName=parentTask.componentName,
            tags=[])

        if subprocessCount is None:
            self.subprocessCount = self.config.getint(
                'execute', 'parallelTaskCount')
        else:
            self.subprocessCount = subprocessCount

        # these are filled in by setup_and_check()
        self.obsFileName = None
        self.maskSubdirectory = None
        self.maskFileName = None

        self.transectGroup = transectGroup
        self.aggregationFunction, prefix, date = \
            get_aggregator_by_name(self.transectGroup)
        self.outFileSuffix = '{}{}'.format(prefix, date)
        self.geojsonFileName = \
            get_region_mask(self.config,
                            '{}.geojson'.format(self.outFileSuffix))

    def make_transect_mask(self):
        """
        Create the geojson file ``self.geojsonFileName`` by calling the
        aggregation function, unless the file already exists.
        """
        function = self.aggregationFunction
        filename = self.geojsonFileName
        if not os.path.exists(filename):
            gf = GeometricFeatures()
            fc = function(gf)
            fc.to_geojson(filename)

    def expand_transect_names(self, transectNames):
        """
        If ``transectNames`` contains ``'all'``, make sure the geojson file
        exists and then return all the transect names found in the file.
        Otherwise, ``transectNames`` is returned unchanged.

        Parameters
        ----------
        transectNames : list
            A list of transect names

        Returns
        -------
        transectNames : list
            A list of transect names
        """
        if 'all' in transectNames:
            self.make_transect_mask()
            transectNames = get_feature_list(self.geojsonFileName)
        return transectNames

    def setup_and_check(self):
        """
        Perform steps to set up the analysis and check for errors in the setup.

        Raises
        ------
        IOError :
            If no restart file is available from which to read mesh
            information.
        """
        # Authors
        # -------
        # Xylar Asay-Davis

        # first, call setup_and_check from the base class (AnalysisTask),
        # which performs some common setup; this method relies on it for
        # self.runStreams
        super().setup_and_check()

        try:
            self.obsFileName = self.runStreams.readpath('restart')[0]
        except ValueError as err:
            # chain explicitly so the traceback shows the lookup failure as
            # the cause rather than as an error inside this handler
            raise IOError('No MPAS restart file found: need at least one '
                          'restart file to perform region masking.') from err

        self.maskSubdirectory = build_config_full_path(self.config, 'output',
                                                       'maskSubdirectory')
        make_directories(self.maskSubdirectory)

        # first, see if we have cached a mask file name in the region masks
        # directory
        meshName = self.config.get('input', 'mpasMeshName')
        self.maskFileName = get_region_mask(
            self.config, '{}_{}.nc'.format(meshName, self.outFileSuffix))

        if not os.path.exists(self.maskFileName):
            # no cached mask file, so let's see if there's already one in the
            # masks subfolder of the output directory
            self.maskFileName = '{}/{}_{}.nc'.format(self.maskSubdirectory,
                                                     meshName,
                                                     self.outFileSuffix)

        if os.path.exists(self.maskFileName):
            # nothing to do so don't block a bunch of other processes
            self.subprocessCount = 1

    def run_task(self):
        """
        Compute the transect mask file unless it already exists
        """
        # Authors
        # -------
        # Xylar Asay-Davis

        if os.path.exists(self.maskFileName):
            return

        # make the geojson file if it doesn't exist
        self.make_transect_mask()

        # the restart file found in setup_and_check() serves as the mesh file
        compute_mpas_transect_masks(
            self.geojsonFileName, self.obsFileName, self.maskFileName,
            logger=self.logger, processCount=self.subprocessCount,
            dir=self.maskSubdirectory)