# This software is open source software available under the BSD-3 license.
#
# Copyright (c) 2020 Triad National Security, LLC. All rights reserved.
# Copyright (c) 2020 Lawrence Livermore National Security, LLC. All rights
# reserved.
# Copyright (c) 2020 UT-Battelle, LLC. All rights reserved.
#
# Additional copyright and license information can be found in the LICENSE file
# distributed with this code, or at
# https://raw.githubusercontent.com/MPAS-Dev/MPAS-Analysis/master/LICENSE
from __future__ import absolute_import, division, print_function, \
unicode_literals
import os
import xarray as xr
import numpy
import shapely.geometry
import json
from multiprocessing import Pool
import progressbar
from functools import partial
from geometric_features import read_feature_collection
import mpas_tools.conversion
from mpas_analysis.shared.analysis_task import AnalysisTask
from mpas_analysis.shared.io.utility import build_config_full_path, \
make_directories, get_region_mask
from mpas_analysis.shared.io import write_netcdf
[docs]def get_feature_list(geojsonFileName):
'''
Builds a list of features found in the geojson file
'''
# Authors
# -------
# Xylar Asay-Davis
featureList = []
with open(geojsonFileName) as f:
featureData = json.load(f)
for feature in featureData['features']:
name = feature['properties']['name']
featureList.append(name)
return featureList
def compute_mpas_region_masks(geojsonFileName, meshFileName, maskFileName,
featureList=None, logger=None, processCount=1,
chunkSize=1000, showProgress=True,
useMpasMaskCreator=False):
'''
Build a region mask file from the given MPAS mesh and geojson file defining
a set of regions.
'''
if os.path.exists(maskFileName):
return
if useMpasMaskCreator:
dsMesh = xr.open_dataset(meshFileName)
fcMask = read_feature_collection(geojsonFileName)
dsMasks = mpas_tools.conversion.mask(dsMesh=dsMesh, fcMask=fcMask,
logger=logger)
else:
with xr.open_dataset(meshFileName) as dsMesh:
dsMesh = dsMesh[['lonCell', 'latCell']]
latCell = numpy.rad2deg(dsMesh.latCell.values)
# transform longitudes to [-180, 180)
lonCell = numpy.mod(numpy.rad2deg(dsMesh.lonCell.values) + 180.,
360.) - 180.
# create shapely geometry for lonCell and latCell
cellPoints = [shapely.geometry.Point(x, y) for x, y in
zip(lonCell, latCell)]
regionNames, masks, properties, nChar = compute_region_masks(
geojsonFileName, cellPoints, maskFileName, featureList, logger,
processCount, chunkSize, showProgress)
nCells = len(cellPoints)
# create a new data array for masks and another for mask names
if logger is not None:
logger.info(' Creating and writing masks dataset...')
nRegions = len(regionNames)
dsMasks = xr.Dataset()
dsMasks['regionCellMasks'] = (('nRegions', 'nCells'),
numpy.zeros((nRegions, nCells), dtype=bool))
dsMasks['regionNames'] = (('nRegions'),
numpy.zeros((nRegions),
dtype='|S{}'.format(nChar)))
for index in range(nRegions):
regionName = regionNames[index]
mask = masks[index]
dsMasks['regionCellMasks'][index, :] = mask
dsMasks['regionNames'][index] = regionName
for propertyName in properties:
dsMasks[propertyName] = (('nRegions'), properties[propertyName])
write_netcdf(dsMasks, maskFileName)
def compute_lon_lat_region_masks(geojsonFileName, lon, lat, maskFileName,
featureList=None, logger=None, processCount=1,
chunkSize=1000, showProgress=True,
lonDim='lon', latDim='lat'):
'''
Build a region mask file from the given lon, lat and geojson file defining
a set of regions.
'''
if os.path.exists(maskFileName):
return
nLon = len(lon)
nLat = len(lat)
# make sure -180 <= lon < 180
lon = numpy.mod(lon + 180., 360.) - 180.
if lonDim != latDim:
lon, lat = numpy.meshgrid(lon, lat)
# create shapely geometry for lonCell and latCell
cellPoints = [shapely.geometry.Point(x, y) for x, y in
zip(lon.ravel(), lat.ravel())]
regionNames, masks, properties, nChar = compute_region_masks(
geojsonFileName, cellPoints, maskFileName, featureList, logger,
processCount, chunkSize, showProgress)
# create a new data array for masks and another for mask names
if logger is not None:
logger.info(' Creating and writing masks dataset...')
nRegions = len(regionNames)
dsMasks = xr.Dataset()
if lonDim == latDim:
dsMasks['regionCellMasks'] = (('nRegions', lonDim),
numpy.zeros((nRegions, nLon),
dtype=bool))
else:
dsMasks['regionCellMasks'] = (('nRegions', latDim, lonDim),
numpy.zeros((nRegions, nLat, nLon),
dtype=bool))
dsMasks['regionNames'] = (('nRegions'),
numpy.zeros((nRegions),
dtype='|S{}'.format(nChar)))
for index in range(nRegions):
regionName = regionNames[index]
mask = masks[index]
if lonDim == latDim:
dsMasks['regionCellMasks'][index, :] = mask
else:
dsMasks['regionCellMasks'][index, :] = mask.reshape([nLat, nLon])
dsMasks['regionNames'][index] = regionName
for propertyName in properties:
dsMasks['{}Regions'.format(propertyName)] = \
(('nRegions'), properties[propertyName])
write_netcdf(dsMasks, maskFileName)
def compute_region_masks(geojsonFileName, cellPoints, maskFileName,
featureList=None, logger=None, processCount=1,
chunkSize=1000, showProgress=True):
'''
Build a region mask file from the given mesh and geojson file defining
a set of regions.
'''
if os.path.exists(maskFileName):
return
if logger is not None:
logger.info('Creating masks file {}'.format(maskFileName))
if featureList is None:
# get a list of features for use by other tasks (e.g. to determine
# plot names)
featureList = get_feature_list(geojsonFileName)
nCells = len(cellPoints)
with open(geojsonFileName) as f:
featureData = json.load(f)
regionNames = []
for feature in featureData['features']:
name = feature['properties']['name']
if name not in featureList:
continue
regionNames.append(name)
propertyNames = set()
for feature in featureData['features']:
for propertyName in feature['properties']:
if propertyName not in ['name', 'author', 'tags', 'component',
'object']:
propertyNames.add(propertyName)
properties = {}
for propertyName in propertyNames:
properties[propertyName] = []
for feature in featureData['features']:
if propertyName in feature['properties']:
propertyVal = feature['properties'][propertyName]
properties[propertyName].append(propertyVal)
else:
properties[propertyName].append('')
if logger is not None:
logger.info(' Computing masks from {}...'.format(geojsonFileName))
masks = []
nChar = 0
for feature in featureData['features']:
name = feature['properties']['name']
if name not in featureList:
continue
if logger is not None:
logger.info(' {}'.format(name))
shape = shapely.geometry.shape(feature['geometry'])
if processCount == 1:
mask = _contains(shape, cellPoints)
else:
nChunks = int(numpy.ceil(nCells / chunkSize))
chunks = []
indices = [0]
for iChunk in range(nChunks):
start = iChunk * chunkSize
end = min((iChunk + 1) * chunkSize, nCells)
chunks.append(cellPoints[start:end])
indices.append(end)
partial_func = partial(_contains, shape)
pool = Pool(processCount)
if showProgress:
widgets = [' ', progressbar.Percentage(), ' ',
progressbar.Bar(), ' ', progressbar.ETA()]
bar = progressbar.ProgressBar(widgets=widgets,
maxval=nChunks).start()
mask = numpy.zeros((nCells,), bool)
for iChunk, maskChunk in \
enumerate(pool.imap(partial_func, chunks)):
mask[indices[iChunk]:indices[iChunk + 1]] = maskChunk
if showProgress:
bar.update(iChunk + 1)
if showProgress:
bar.finish()
pool.terminate()
nChar = max(nChar, len(name))
masks.append(mask)
return regionNames, masks, properties, nChar # }}}
def _contains(shape, cellPoints):
mask = numpy.array([shape.contains(point) for point in cellPoints],
dtype=bool)
return mask
[docs]class ComputeRegionMasksSubtask(AnalysisTask): # {{{
'''
An analysis tasks for computing cell masks for regions defined by geojson
features
Attributes
----------
geojsonFileName : str
A geojson file, typically from the MPAS ``geometric_features``
repository, defining the shapes to be masked
outFileSuffix : str
The suffix for the resulting mask file
featureList : list of str
A list of features to include or ``None`` for all features
maskFileName : str
The name of the output mask file
maskExists : bool
Whether the mask file already exists
obsFileName : str
The name of an observations file to create masks for. But default,
lon/lat are taken from an MPAS restart file
lonVar, latVar : str
The name of the longitude and latitude variables in ``obsFileName``
meshName : str
The name of the mesh or grid, used as part of the mask file name.
Default is the MPAS mesh name
'''
# Authors
# -------
# Xylar Asay-Davis
[docs] def __init__(self, parentTask, geojsonFileName, outFileSuffix,
featureList=None, subtaskName='computeRegionMasks',
subprocessCount=1, obsFileName=None, lonVar='lon',
latVar='lat', meshName=None, useMpasMaskCreator=False):
# {{{
'''
Construct the analysis task and adds it as a subtask of the
``parentTask``.
Parameters
----------
parentTask : ``AnalysisTask``
The parent task, used to get the ``taskName``, ``config`` and
``componentName``
geojsonFileName : str
A geojson file, typically from the MPAS ``geometric_features``
repository, defining the shapes to be masked
outFileSuffix : str
The suffix for the resulting mask file
featureList : list of str, optional
A list of features to include. Default is all features in all
files
subtaskName : str, optional
The name of the subtask
subprocessCount : int, optional
The number of processes that can be used to make the mask
obsFileName : str, optional
The name of an observations file to create masks for. But default,
lon/lat are taken from an MPAS restart file
lonVar, latVar : str, optional
The name of the longitude and latitude variables in ``obsFileName``
meshName : str, optional
The name of the mesh or grid, used as part of the mask file name.
Default is the MPAS mesh name
useMpasMaskCreator : bool, optional
If ``True``, the mask creator from ``mpas_tools`` will be used
to create the mask. Otherwise, python code is used.
'''
# Authors
# -------
# Xylar Asay-Davis
# call the constructor from the base class (AnalysisTask)
super(ComputeRegionMasksSubtask, self).__init__(
config=parentTask.config,
taskName=parentTask.taskName,
subtaskName=subtaskName,
componentName=parentTask.componentName,
tags=[])
self.geojsonFileName = geojsonFileName
self.outFileSuffix = outFileSuffix
self.featureList = featureList
self.subprocessCount = subprocessCount
self.obsFileName = obsFileName
self.lonVar = lonVar
self.latVar = latVar
self.meshName = meshName
self.useMpasMaskCreator = useMpasMaskCreator
if not self.useMpasMaskCreator:
# because this uses a Pool, it cannot be launched as a separate
# process
self.runDirectly = True
parentTask.add_subtask(self)
# }}}
def setup_and_check(self): # {{{
'''
Perform steps to set up the analysis and check for errors in the setup.
Raises
------
IOError :
If a restart file is not available from which to read mesh
information or if no history files are available from which to
compute the climatology in the desired time range.
'''
# Authors
# -------
# Xylar Asay-Davis
# first, call setup_and_check from the base class (AnalysisTask),
# which will perform some common setup, including storing:
# self.runDirectory , self.historyDirectory, self.plotsDirectory,
# self.namelist, self.runStreams, self.historyStreams,
# self.calendar
super(ComputeRegionMasksSubtask, self).setup_and_check()
self.useMpasMesh = self.obsFileName is None
if self.useMpasMesh:
try:
self.obsFileName = self.runStreams.readpath('restart')[0]
except ValueError:
raise IOError('No MPAS restart file found: need at least one '
'restart file to perform region masking.')
maskSubdirectory = build_config_full_path(self.config, 'output',
'maskSubdirectory')
make_directories(maskSubdirectory)
if self.meshName is None:
self.meshName = self.config.get('input', 'mpasMeshName')
# first, see if we have cached a mask file name in the region masks
# directory
self.maskFileName = get_region_mask(
self.config, '{}_{}.nc'.format(self.meshName, self.outFileSuffix))
if not os.path.exists(self.maskFileName):
# no cached mask file, so let's see if there's already one in the
# masks subfolder of the output directory
maskSubdirectory = build_config_full_path(self.config, 'output',
'maskSubdirectory')
self.maskFileName = '{}/{}_{}.nc'.format(maskSubdirectory,
self.meshName,
self.outFileSuffix)
if os.path.exists(self.maskFileName):
# nothing to do so don't block a bunch of other processes
self.subprocessCount = 1
# }}}
def run_task(self): # {{{
'''
Compute the requested climatologies
'''
# Authors
# -------
# Xylar Asay-Davis
if os.path.exists(self.maskFileName):
return
if self.featureList is None:
# get a list of features for use by other tasks (e.g. to determine
# plot names)
self.featureList = get_feature_list(self.geojsonFileName)
if self.useMpasMesh:
compute_mpas_region_masks(
self.geojsonFileName, self.obsFileName, self.maskFileName,
self.featureList, self.logger, self.subprocessCount,
showProgress=False, useMpasMaskCreator=self.useMpasMaskCreator)
else:
dsGrid = xr.open_dataset(self.obsFileName)
latVar = dsGrid[self.latVar]
lonVar = dsGrid[self.lonVar]
if len(latVar.dims) > 1 or len(lonVar.dims) > 1:
raise ValueError('Masking does not support multidimensional'
'lat/lon with dims {}'.format(latVar.dims))
latDim = latVar.dims[0]
lonDim = lonVar.dims[0]
lat = latVar.values
lon = lonVar.values
compute_lon_lat_region_masks(
self.geojsonFileName, lon, lat, self.maskFileName,
self.featureList, self.logger, self.subprocessCount,
showProgress=False, lonDim=lonDim, latDim=latDim)
# }}}
# vim: foldmethod=marker ai ts=4 sts=4 et sw=4 ft=python