# This software is open source software available under the BSD-3 license.
#
# Copyright (c) 2020 Triad National Security, LLC. All rights reserved.
# Copyright (c) 2020 Lawrence Livermore National Security, LLC. All rights
# reserved.
# Copyright (c) 2020 UT-Battelle, LLC. All rights reserved.
#
# Additional copyright and license information can be found in the LICENSE file
# distributed with this code, or at
# https://raw.githubusercontent.com/MPAS-Dev/MPAS-Analysis/master/LICENSE
import os
import xarray as xr
import numpy
import shapely.geometry
import json
from multiprocessing import Pool
import progressbar
from functools import partial
import mpas_tools.conversion
from geometric_features import read_feature_collection, GeometricFeatures
from geometric_features.aggregation.ocean import basins, subbasins, antarctic, \
ice_shelves
from mpas_analysis.shared.analysis_task import AnalysisTask
from mpas_analysis.shared.io.utility import build_config_full_path, \
make_directories, get_region_mask
from mpas_analysis.shared.io import write_netcdf


def get_region_info(regionGroup, config):
"""
Get a geojson mask file and the appropriate file suffix for the given
region group.
Parameters
----------
regionGroup : str
The name of a region group to get mask features for, one of
'Antarctic Regions', 'Ocean Basins', 'Ice Shelves', or 'Ocean Subbasins'
config : mpas_analysis.configuration.MpasAnalysisConfigParser
Configuration options
Returns
-------
region : dict
A dictionary of information about the region
filename : str
The name of a geojson file with mask features
suffix : str
A suffix to use for mask files created with these features
"""
regions = {'Antarctic Regions': {'prefix': 'antarcticRegions',
'date': '20200621',
'function': antarctic},
'Ocean Basins': {'prefix': 'oceanBasins',
'date': '20200621',
'function': basins},
'Ice Shelves': {'prefix': 'iceShelves',
'date': '20200621',
'function': ice_shelves},
'Ocean Subbasins': {'prefix': 'oceanSubbasins',
'date': '20201123',
'function': subbasins}}
if regionGroup not in regions:
raise ValueError('Unknown region group {}'.format(regionGroup))
region = regions[regionGroup]
prefix = region['prefix']
date = region['date']
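    # the mask file suffix combines the region-group prefix and date stamp,
    # e.g. 'oceanBasins20200621'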
suffix = '{}{}'.format(prefix, date)
filename = get_region_mask(config, '{}.geojson'.format(suffix))
return region, filename, suffix


def get_feature_list(geojsonFileName):
"""
Builds a list of features found in the geojson file
"""
# Authors
# -------
# Xylar Asay-Davis
featureList = []
with open(geojsonFileName) as f:
featureData = json.load(f)
for feature in featureData['features']:
name = feature['properties']['name']
featureList.append(name)
return featureList


def compute_mpas_region_masks(geojsonFileName, meshFileName, maskFileName,
featureList=None, logger=None, processCount=1,
chunkSize=1000, showProgress=True,
useMpasMaskCreator=False, dir=None):
"""
Build a region mask file from the given MPAS mesh and geojson file defining
a set of regions.
"""
if os.path.exists(maskFileName):
return
if useMpasMaskCreator:
dsMesh = xr.open_dataset(meshFileName)
fcMask = read_feature_collection(geojsonFileName)
dsMasks = mpas_tools.conversion.mask(dsMesh=dsMesh, fcMask=fcMask,
logger=logger, dir=dir)
else:
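        # compute the masks in pure python with shapely, optionally using
        # multiprocessing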
with xr.open_dataset(meshFileName) as dsMesh:
dsMesh = dsMesh[['lonCell', 'latCell']]
latCell = numpy.rad2deg(dsMesh.latCell.values)
# transform longitudes to [-180, 180)
lonCell = numpy.mod(numpy.rad2deg(dsMesh.lonCell.values) + 180.,
360.) - 180.
# create shapely geometry for lonCell and latCell
cellPoints = [shapely.geometry.Point(x, y) for x, y in
zip(lonCell, latCell)]
regionNames, masks, properties, nChar = compute_region_masks(
geojsonFileName, cellPoints, maskFileName, featureList, logger,
processCount, chunkSize, showProgress)
nCells = len(cellPoints)
# create a new data array for masks and another for mask names
if logger is not None:
logger.info(' Creating and writing masks dataset...')
nRegions = len(regionNames)
dsMasks = xr.Dataset()
dsMasks['regionCellMasks'] = (('nRegions', 'nCells'),
numpy.zeros((nRegions, nCells), dtype=bool))
dsMasks['regionNames'] = (('nRegions'),
numpy.zeros((nRegions),
dtype='|S{}'.format(nChar)))
for index in range(nRegions):
regionName = regionNames[index]
mask = masks[index]
dsMasks['regionCellMasks'][index, :] = mask
dsMasks['regionNames'][index] = regionName
for propertyName in properties:
dsMasks[propertyName] = (('nRegions'), properties[propertyName])
write_netcdf(dsMasks, maskFileName)


def compute_lon_lat_region_masks(geojsonFileName, lon, lat, maskFileName,
featureList=None, logger=None, processCount=1,
chunkSize=1000, showProgress=True,
lonDim='lon', latDim='lat'):
"""
Build a region mask file from the given lon, lat and geojson file defining
a set of regions.
"""
if os.path.exists(maskFileName):
return
nLon = len(lon)
nLat = len(lat)
# make sure -180 <= lon < 180
lon = numpy.mod(lon + 180., 360.) - 180.
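    # lon and lat with different dimension names describe a regular 2D grid,
    # so broadcast the 1D coordinates to 2D before masking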
if lonDim != latDim:
lon, lat = numpy.meshgrid(lon, lat)
    # create shapely geometry for lon and lat
cellPoints = [shapely.geometry.Point(x, y) for x, y in
zip(lon.ravel(), lat.ravel())]
regionNames, masks, properties, nChar = compute_region_masks(
geojsonFileName, cellPoints, maskFileName, featureList, logger,
processCount, chunkSize, showProgress)
# create a new data array for masks and another for mask names
if logger is not None:
logger.info(' Creating and writing masks dataset...')
nRegions = len(regionNames)
dsMasks = xr.Dataset()
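    # if lon and lat share a dimension, the points form an unstructured 1D
    # list; otherwise, the masks are 2D fields on the lat/lon grid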
if lonDim == latDim:
dsMasks['regionCellMasks'] = (('nRegions', lonDim),
numpy.zeros((nRegions, nLon),
dtype=bool))
else:
dsMasks['regionCellMasks'] = (('nRegions', latDim, lonDim),
numpy.zeros((nRegions, nLat, nLon),
dtype=bool))
dsMasks['regionNames'] = (('nRegions'),
numpy.zeros((nRegions),
dtype='|S{}'.format(nChar)))
for index in range(nRegions):
regionName = regionNames[index]
mask = masks[index]
if lonDim == latDim:
dsMasks['regionCellMasks'][index, :] = mask
else:
dsMasks['regionCellMasks'][index, :] = mask.reshape([nLat, nLon])
dsMasks['regionNames'][index] = regionName
for propertyName in properties:
dsMasks['{}Regions'.format(propertyName)] = \
(('nRegions'), properties[propertyName])
write_netcdf(dsMasks, maskFileName)


def compute_region_masks(geojsonFileName, cellPoints, maskFileName,
featureList=None, logger=None, processCount=1,
chunkSize=1000, showProgress=True):
"""
Build a region mask file from the given mesh and geojson file defining
a set of regions.
"""
if os.path.exists(maskFileName):
return
if logger is not None:
logger.info('Creating masks file {}'.format(maskFileName))
if featureList is None:
# get a list of features for use by other tasks (e.g. to determine
# plot names)
featureList = get_feature_list(geojsonFileName)
nCells = len(cellPoints)
with open(geojsonFileName) as f:
featureData = json.load(f)
regionNames = []
for feature in featureData['features']:
name = feature['properties']['name']
if name not in featureList:
continue
regionNames.append(name)
propertyNames = set()
for feature in featureData['features']:
for propertyName in feature['properties']:
if propertyName not in ['name', 'author', 'tags', 'component',
'object']:
propertyNames.add(propertyName)
properties = {}
for propertyName in propertyNames:
properties[propertyName] = []
for feature in featureData['features']:
if propertyName in feature['properties']:
propertyVal = feature['properties'][propertyName]
properties[propertyName].append(propertyVal)
else:
properties[propertyName].append('')
if logger is not None:
logger.info(' Computing masks from {}...'.format(geojsonFileName))
masks = []
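    # keep track of the longest region name so the names can be stored as
    # fixed-width strings in the output dataset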
nChar = 0
for feature in featureData['features']:
name = feature['properties']['name']
if name not in featureList:
continue
if logger is not None:
logger.info(' {}'.format(name))
shape = shapely.geometry.shape(feature['geometry'])
if processCount == 1:
mask = _contains(shape, cellPoints)
else:
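            # break the points into chunks to be distributed among the worker
            # processes in the pool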
nChunks = int(numpy.ceil(nCells / chunkSize))
chunks = []
indices = [0]
for iChunk in range(nChunks):
start = iChunk * chunkSize
end = min((iChunk + 1) * chunkSize, nCells)
chunks.append(cellPoints[start:end])
indices.append(end)
partial_func = partial(_contains, shape)
pool = Pool(processCount)
if showProgress:
widgets = [' ', progressbar.Percentage(), ' ',
progressbar.Bar(), ' ', progressbar.ETA()]
bar = progressbar.ProgressBar(widgets=widgets,
max_value=nChunks).start()
else:
bar = None
mask = numpy.zeros((nCells,), bool)
for iChunk, maskChunk in \
enumerate(pool.imap(partial_func, chunks)):
mask[indices[iChunk]:indices[iChunk + 1]] = maskChunk
if showProgress:
bar.update(iChunk + 1)
if showProgress:
bar.finish()
pool.terminate()
nChar = max(nChar, len(name))
masks.append(mask)
return regionNames, masks, properties, nChar # }}}


def _contains(shape, cellPoints):
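    """
    Return a boolean array indicating which of the given shapely points lie
    within the given shapely shape
    """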
mask = numpy.array([shape.contains(point) for point in cellPoints],
dtype=bool)
return mask


class ComputeRegionMasksSubtask(AnalysisTask):  # {{{
"""
An analysis tasks for computing cell masks for regions defined by geojson
features
Attributes
----------
regionGroup : str
The name of one of the supported region groups (see
:py:func:`mpas_analysis.shared.regions.get_region_info()`)
region : dict
A dictionary of information about the region from
:py:func:`mpas_analysis.shared.regions.get_region_info()`
geojsonFileName : str
A geojson file, typically from the MPAS ``geometric_features``
repository, defining the shapes to be masked
outFileSuffix : str
The suffix for the resulting mask file
featureList : list of str
A list of features to include or ``None`` for all features
maskFileName : str
The name of the output mask file
obsFileName : str
The name of an observations file to create masks for. But default,
lon/lat are taken from an MPAS restart file
lonVar, latVar : str
The name of the longitude and latitude variables in ``obsFileName``
meshName : str
The name of the mesh or grid, used as part of the mask file name.
Default is the MPAS mesh name
"""
# Authors
# -------
# Xylar Asay-Davis

    def __init__(self, parentTask, regionGroup, meshName, subprocessCount=1,
obsFileName=None, lonVar='lon', latVar='lat',
useMpasMaskCreator=False):
# {{{
"""
Construct the analysis task and adds it as a subtask of the
``parentTask``.
Parameters
----------
parentTask : ``AnalysisTask``
The parent task, used to get the ``taskName``, ``config`` and
``componentName``
regionGroup : str
The name of one of the supported region groups (see
:py:func:`mpas_analysis.shared.regions.get_region_info()`)
meshName : str
The name of the mesh or grid, used as part of the mask file name.
Default is the MPAS mesh name
subprocessCount : int, optional
The number of processes that can be used to make the mask
obsFileName : str, optional
The name of an observations file to create masks for. But default,
lon/lat are taken from an MPAS restart file
lonVar, latVar : str, optional
The name of the longitude and latitude variables in ``obsFileName``
useMpasMaskCreator : bool, optional
If ``True``, the mask creator from ``mpas_tools`` will be used
to create the mask. Otherwise, python code is used.
"""
# Authors
# -------
# Xylar Asay-Davis
suffix = regionGroup.replace(' ', '')
subtaskName = '{}_{}'.format(meshName, suffix)
# call the constructor from the base class (AnalysisTask)
super(ComputeRegionMasksSubtask, self).__init__(
config=parentTask.config,
taskName=parentTask.taskName,
subtaskName=subtaskName,
componentName=parentTask.componentName,
tags=[])
self.regionGroup = regionGroup
self.featureList = None
self.subprocessCount = subprocessCount
self.obsFileName = obsFileName
self.lonVar = lonVar
self.latVar = latVar
self.meshName = meshName
self.useMpasMaskCreator = useMpasMaskCreator
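        # if no observations file was given, masks will be computed on the
        # MPAS mesh, with lon/lat taken from a restart file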
self.useMpasMesh = self.obsFileName is None
self.maskFileName = None
self.region, self.geojsonFileName, self.outFileSuffix = get_region_info(
self.regionGroup, self.config)
if not self.useMpasMaskCreator:
# because this uses a Pool, it cannot be launched as a separate
# process
self.runDirectly = True
parentTask.add_subtask(self)
# }}}

    def make_region_mask(self):
"""
If the geojson mask file has not already been cached in the diagnostics
or custom diagnostic directories, it will be created in the analysis
output's masks directory.
"""
function = self.region['function']
filename = self.geojsonFileName
if not os.path.exists(filename):
gf = GeometricFeatures()
fc = function(gf)
fc.to_geojson(filename)

    def expand_region_names(self, regionNames):
        """
        If ``regionNames`` contains ``'all'``, make sure the geojson file
        exists and then return all the region names found in the file.

        Parameters
        ----------
        regionNames : list
            A list of region names

        Returns
        -------
        regionNames : list
            A list of region names
        """
if 'all' in regionNames:
self.make_region_mask()
regionNames = get_feature_list(self.geojsonFileName)
return regionNames

    def setup_and_check(self):  # {{{
        """
        Perform steps to set up the analysis and check for errors in the
        setup.

        Raises
        ------
        IOError
            If a restart file is not available from which to read mesh
            information or if no history files are available from which to
            compute the climatology in the desired time range.
        """
# Authors
# -------
# Xylar Asay-Davis
# first, call setup_and_check from the base class (AnalysisTask),
# which will perform some common setup, including storing:
# self.runDirectory , self.historyDirectory, self.plotsDirectory,
# self.namelist, self.runStreams, self.historyStreams,
# self.calendar
super(ComputeRegionMasksSubtask, self).setup_and_check()
if self.useMpasMesh:
try:
self.obsFileName = self.runStreams.readpath('restart')[0]
except ValueError:
raise IOError('No MPAS restart file found: need at least one '
'restart file to perform region masking.')
maskSubdirectory = build_config_full_path(self.config, 'output',
'maskSubdirectory')
make_directories(maskSubdirectory)
self.maskFileName = get_region_mask(
self.config, '{}_{}.nc'.format(self.meshName, self.outFileSuffix))
if not os.path.exists(self.maskFileName):
# no cached mask file, so let's see if there's already one in the
# masks subfolder of the output directory
maskSubdirectory = build_config_full_path(self.config, 'output',
'maskSubdirectory')
self.maskFileName = '{}/{}_{}.nc'.format(maskSubdirectory,
self.meshName,
self.outFileSuffix)
if os.path.exists(self.maskFileName):
# nothing to do so don't block a bunch of other processes
self.subprocessCount = 1
# }}}

    def run_task(self):  # {{{
        """
        Compute the requested region masks
        """
# Authors
# -------
# Xylar Asay-Davis
if os.path.exists(self.maskFileName):
return
# make the geojson file if it doesn't exist
self.make_region_mask()
if self.featureList is None:
# get a list of features for use by other tasks (e.g. to determine
# plot names)
self.featureList = get_feature_list(self.geojsonFileName)
if self.useMpasMesh:
maskSubdirectory = build_config_full_path(self.config, 'output',
'maskSubdirectory')
make_directories(maskSubdirectory)
compute_mpas_region_masks(
self.geojsonFileName, self.obsFileName, self.maskFileName,
self.featureList, self.logger, self.subprocessCount,
showProgress=False, useMpasMaskCreator=self.useMpasMaskCreator,
dir=maskSubdirectory)
else:
dsGrid = xr.open_dataset(self.obsFileName)
latVar = dsGrid[self.latVar]
lonVar = dsGrid[self.lonVar]
if len(latVar.dims) > 1 or len(lonVar.dims) > 1:
                raise ValueError('Masking does not support multidimensional '
                                 'lat/lon with dims {}'.format(latVar.dims))
latDim = latVar.dims[0]
lonDim = lonVar.dims[0]
lat = latVar.values
lon = lonVar.values
compute_lon_lat_region_masks(
self.geojsonFileName, lon, lat, self.maskFileName,
self.featureList, self.logger, self.subprocessCount,
showProgress=False, lonDim=lonDim, latDim=latDim)
# }}}
# vim: foldmethod=marker ai ts=4 sts=4 et sw=4 ft=python