# This software is open source software available under the BSD-3 license.
#
# Copyright (c) 2019 Triad National Security, LLC. All rights reserved.
# Copyright (c) 2019 Lawrence Livermore National Security, LLC. All rights
# reserved.
# Copyright (c) 2019 UT-Battelle, LLC. All rights reserved.
#
# Additional copyright and license information can be found in the LICENSE file
# distributed with this code, or at
# https://raw.githubusercontent.com/MPAS-Dev/MPAS-Analysis/master/LICENSE
'''
Functions and classes for performing interpolation

Classes
-------
Remapper - a class that constructs a mapping file containing the indices
and weights needed to perform horizontal interpolation, and that remaps
data sets between the source and destination grids using that file
'''
# Authors
# -------
# Xylar Asay-Davis
import subprocess
import tempfile
from tempfile import TemporaryDirectory
import os
from distutils.spawn import find_executable
import numpy
from scipy.sparse import csr_matrix
import xarray as xr
import sys
from pyremap.descriptor import MpasMeshDescriptor, \
LatLonGridDescriptor, LatLon2DGridDescriptor, ProjectionGridDescriptor, \
PointCollectionDescriptor
class Remapper(object):
'''
A class for remapping fields using a given mapping file. The weights and
indices from the mapping file can be loaded once and reused multiple times
to map several fields between the same source and destination grids.
'''
# Authors
# -------
# Xylar Asay-Davis
def __init__(self, sourceDescriptor, destinationDescriptor,
mappingFileName=None): # {{{
'''
Create the remapper and read weights and indices from the given file
for later use in remapping fields.

Parameters
----------
sourceDescriptor : ``shared.grid.MeshDescriptor``
An object used to write a SCRIP file and to determine the type of
the source mesh or grid.
destinationDescriptor : ``shared.grid.MeshDescriptor``
An object used to write a SCRIP file and to determine the type of
the destination mesh or grid.
mappingFileName : str, optional
The path where the mapping file containing interpolation weights
and indices will be written and/or read. If ``None``,
no interpolation is performed and data sets are returned unchanged.
This is useful if the source and destination grids are determined
to be the same (though the Remapper does not attempt to determine
if this is the case).
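
Examples
--------
A minimal sketch of constructing a remapper; the file names, mesh name
and choice of descriptors here are hypothetical:

>>> sourceDescriptor = MpasMeshDescriptor('mesh.nc',
...                                       meshName='oEC60to30')
>>> destinationDescriptor = LatLonGridDescriptor.read('grid.nc')
>>> remapper = Remapper(sourceDescriptor, destinationDescriptor,
...                     mappingFileName='map_mpas_to_latlon.nc')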
'''
# Authors
# -------
# Xylar Asay-Davis
if isinstance(sourceDescriptor, PointCollectionDescriptor):
raise TypeError("sourceDescriptor of type "
"PointCollectionDescriptor is not supported.")
if not isinstance(sourceDescriptor,
(MpasMeshDescriptor, LatLonGridDescriptor,
LatLon2DGridDescriptor, ProjectionGridDescriptor)):
raise TypeError("sourceDescriptor is not of a recognized type.")
if not isinstance(destinationDescriptor,
(MpasMeshDescriptor, LatLonGridDescriptor,
LatLon2DGridDescriptor, ProjectionGridDescriptor,
PointCollectionDescriptor)):
raise TypeError(
"destinationDescriptor is not of a recognized type.")
self.sourceDescriptor = sourceDescriptor
self.destinationDescriptor = destinationDescriptor
self.mappingFileName = mappingFileName
self.mappingLoaded = False
# }}}
def build_mapping_file(self, method='bilinear', additionalArgs=None,
logger=None, mpiTasks=1, tempdir=None): # {{{
'''
Given a source file defining either an MPAS mesh or a lat-lon grid and
a destination file or set of arrays defining a lat-lon grid, constructs
a mapping file used for interpolation between the source and
destination grids.
Parameters
----------
method : {'bilinear', 'neareststod', 'conserve'}, optional
The method of interpolation used, see documentation for
`ESMF_RegridWeightGen` for details.
additionalArgs : list of str, optional
A list of additional arguments to ``ESMF_RegridWeightGen``
logger : ``logging.Logger``, optional
A logger to which ``ESMF_RegridWeightGen`` output should be redirected
mpiTasks : int, optional
The number of MPI tasks (a number > 1 implies that
ESMF_RegridWeightGen will be called with ``mpirun``)
tempdir : str, optional
A temporary directory for the SCRIP files. By default, a temporary
directory is created, typically in ``/tmp``, but on some systems such
as compute nodes this may not be visible to all processors in the
subsequent ``ESMF_RegridWeightGen`` call
Raises
------
OSError
If ``ESMF_RegridWeightGen`` is not in the system path.
ValueError
If sourceDescriptor or destinationDescriptor is of an unknown type
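
Examples
--------
A sketch of typical usage, assuming ``remapper`` was constructed with a
``mappingFileName`` as in the constructor example:

>>> remapper.build_mapping_file(method='bilinear', mpiTasks=4)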
'''
# Authors
# -------
# Xylar Asay-Davis
if isinstance(self.destinationDescriptor,
PointCollectionDescriptor) and \
method not in ['bilinear', 'neareststod']:
raise ValueError("method {} not supported for destination "
"grid of type PointCollectionDescriptor."
"".format(method))
if self.mappingFileName is None or \
os.path.exists(self.mappingFileName):
# either no mapping file is needed or a valid weight file already
# exists, so there is nothing to do
return
rwgPath = find_executable('ESMF_RegridWeightGen')
if rwgPath is None:
raise OSError('ESMF_RegridWeightGen not found. Make sure the esmf '
'package is installed: \n'
'conda install esmf\n'
'Note: this presumes use of the conda-forge '
'channel.')
# Write source and destination SCRIP files in temporary locations
if tempdir is None:
tempobj = TemporaryDirectory()
tempdir = tempobj.name
else:
tempobj = None
self.sourceDescriptor.to_scrip('{}/src_mesh.nc'.format(tempdir))
self.destinationDescriptor.to_scrip(
'{}/dst_mesh.nc'.format(tempdir))
args = [rwgPath,
'--source', self.sourceDescriptor.scripFileName,
'--destination', self.destinationDescriptor.scripFileName,
'--weight', self.mappingFileName,
'--method', method,
'--netcdf4',
'--no_log']
if mpiTasks > 1:
if 'CONDA_PREFIX' in os.environ:
mpirun_path = '{}/bin/mpirun'.format(os.environ['CONDA_PREFIX'])
else:
mpirun_path = 'mpirun'
args = [mpirun_path, '-np', '{}'.format(mpiTasks)] + args
if self.sourceDescriptor.regional:
args.append('--src_regional')
if self.destinationDescriptor.regional:
args.append('--dst_regional')
if self.sourceDescriptor.regional or \
self.destinationDescriptor.regional:
args.append('--ignore_unmapped')
if additionalArgs is not None:
args.extend(additionalArgs)
if logger is None:
print('running: {}'.format(' '.join(args)))
# make sure any output is flushed before we add output from the
# subprocess
sys.stdout.flush()
sys.stderr.flush()
# throw out the standard output from ESMF_RegridWeightGen, as it's
# rather verbose but keep stderr
DEVNULL = open(os.devnull, 'wb')
subprocess.check_call(args, stdout=DEVNULL)
else:
logger.info('running: {}'.format(' '.join(args)))
for handler in logger.handlers:
handler.flush()
process = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
# throw out the standard output from ESMF_RegridWeightGen, as it's
# rather verbose but keep stderr
if stderr:
stderr = stderr.decode('utf-8')
for line in stderr.split('\n'):
logger.error(line)
if process.returncode != 0:
raise subprocess.CalledProcessError(process.returncode,
' '.join(args))
if tempobj is not None:
tempobj.cleanup()
# }}}
def remap_file(self, inFileName, outFileName, variableList=None,
overwrite=False, renormalize=None, logger=None): # {{{
'''
Remaps the variables in ``inFileName`` using the weights in the mapping
file (by calling ``ncremap`` in a subprocess) and writes the result to
``outFileName``.

Parameters
----------
inFileName : str
The path to the file containing a data set on the source grid
outFileName : str
The path where the data on the destination grid should be written
variableList : list of str, optional
A list of variables to be mapped. By default, all variables are
mapped
overwrite : bool, optional
Whether the destination file should be overwritten if it already
exists. If `False`, and the destination file is already present,
the function does nothing and returns immediately
renormalize : float, optional
A threshold to use to renormalize the data: destination cells where
the fraction of valid source data falls below this threshold are
masked out, and the remaining cells are renormalized by that fraction
logger : ``logging.Logger``, optional
A logger to which ``ncremap`` output should be redirected
Raises
------
OSError
If ``ncremap`` is not in the system path.
ValueError
If ``mappingFileName`` is ``None`` (meaning no remapping is
needed).
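
Examples
--------
A sketch with hypothetical file and variable names:

>>> remapper.remap_file('timeSeries.nc', 'timeSeries_latlon.nc',
...                     variableList=['temperature', 'salinity'])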
'''
# Authors
# -------
# Xylar Asay-Davis
if self.mappingFileName is None:
raise ValueError('No mapping file was given because remapping is '
'not necessary. The calling\n'
'code should simply use the contents of {} '
'directly.'.format(inFileName))
if not overwrite and os.path.exists(outFileName):
# a remapped file already exists, so nothing to do
return
if isinstance(self.sourceDescriptor, (PointCollectionDescriptor)):
raise TypeError('Source grid is a point collection, which is not '
'supported.')
if find_executable('ncremap') is None:
raise OSError('ncremap not found. Make sure the latest nco '
'package is installed: \n'
'conda install nco\n'
'Note: this presumes use of the conda-forge '
'channel.')
args = ['ncremap',
'-i', inFileName,
'-m', self.mappingFileName,
'--vrb=1',
'-o', outFileName]
regridArgs = []
if renormalize is not None:
regridArgs.append('--renormalize={}'.format(renormalize))
if isinstance(self.sourceDescriptor, LatLonGridDescriptor):
regridArgs.extend(['--rgr lat_nm={}'.format(
self.sourceDescriptor.latVarName),
'--rgr lon_nm={}'.format(
self.sourceDescriptor.lonVarName)])
elif isinstance(self.sourceDescriptor, ProjectionGridDescriptor):
regridArgs.extend(['--rgr lat_nm={}'.format(
self.sourceDescriptor.yVarName),
'--rgr lon_nm={}'.format(
self.sourceDescriptor.xVarName)])
if isinstance(self.destinationDescriptor, LatLonGridDescriptor):
regridArgs.extend(['--rgr lat_nm_out={}'.format(
self.destinationDescriptor.latVarName),
'--rgr lon_nm_out={}'.format(
self.destinationDescriptor.lonVarName)])
elif isinstance(self.destinationDescriptor, ProjectionGridDescriptor):
regridArgs.extend(['--rgr lat_dmn_nm={}'.format(
self.destinationDescriptor.yVarName),
'--rgr lon_dmn_nm={}'.format(
self.destinationDescriptor.xVarName),
'--rgr lat_nm_out=lat', '--rgr lon_nm_out=lon'])
if isinstance(self.destinationDescriptor, PointCollectionDescriptor):
regridArgs.extend(['--rgr lat_nm_out=lat', '--rgr lon_nm_out=lon'])
if len(regridArgs) > 0:
args.extend(['-R', ' '.join(regridArgs)])
if isinstance(self.sourceDescriptor, MpasMeshDescriptor):
# Note: using the -C (climatology) flag for now because otherwise
# ncremap tries to add a _FillValue attribute that might
# already be present and quits with an error
args.extend(['-P', 'mpas', '-C'])
if variableList is not None:
args.extend(['-v', ','.join(variableList)])
# set an environment variable to make sure we're not using czender's
# local version of NCO instead of one we have intentionally loaded
env = os.environ.copy()
env['NCO_PATH_OVERRIDE'] = 'No'
if logger is None:
print('running: {}'.format(' '.join(args)))
# make sure any output is flushed before we add output from the
# subprocess
sys.stdout.flush()
sys.stderr.flush()
subprocess.check_call(args, env=env)
else:
logger.info('running: {}'.format(' '.join(args)))
for handler in logger.handlers:
handler.flush()
process = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=env)
stdout, stderr = process.communicate()
if stdout:
stdout = stdout.decode('utf-8')
for line in stdout.split('\n'):
logger.info(line)
if stderr:
stderr = stderr.decode('utf-8')
for line in stderr.split('\n'):
logger.error(line)
if process.returncode != 0:
raise subprocess.CalledProcessError(process.returncode,
' '.join(args))
# }}}
def remap(self, ds, renormalizationThreshold=None): # {{{
'''
Given a source data set, returns a remapped version of the data set,
possibly masked and renormalized.
Parameters
----------
ds : ``xarray.Dataset`` or ``xarray.DataArray``
The dimension(s) along ``self.sourceDimNames`` must match
``self.src_grid_dims`` read from the mapping file.
renormalizationThreshold : float, optional
The minimum weight of a destination cell after remapping, below
which it is masked out, or ``None`` for no renormalization and
masking.
Returns
-------
remappedDs : ``xarray.Dataset`` or ``xarray.DataArray``
Returns a remapped data set (or data array) where dimensions other
than ``self.sourceDimNames`` are the same as in ``ds`` and the
dimension(s) given by ``self.sourceDimNames`` have been replaced by
``self.destinationDimNames``.
Raises
------
ValueError
If the sizes of ``self.sourceDimNames`` in ``ds`` do not match the
source dimensions read in from the mapping file
(``self.src_grid_dims``).
TypeError
If ds is not an ``xarray.Dataset`` or ``xarray.DataArray`` object
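
Examples
--------
A sketch with a hypothetical input file on the source grid:

>>> ds = xr.open_dataset('timeSeries.nc')
>>> dsRemapped = remapper.remap(ds, renormalizationThreshold=0.01)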
'''
# Authors
# -------
# Xylar Asay-Davis
if self.mappingFileName is None:
# No remapping is needed
return ds
self._load_mapping()
for index, dim in enumerate(self.sourceDescriptor.dims):
if self.src_grid_dims[index] != ds.sizes[dim]:
raise ValueError('data set and remapping source dimension {} '
'don\'t have the same size: {} != {}'.format(
dim, self.src_grid_dims[index],
ds.sizes[dim]))
if isinstance(ds, xr.DataArray):
remappedDs = self._remap_data_array(ds, renormalizationThreshold)
elif isinstance(ds, xr.Dataset):
drop = []
for var in ds.data_vars:
if self._check_drop(ds[var]):
drop.append(var)
remappedDs = ds.drop(drop)
remappedDs = remappedDs.apply(self._remap_data_array,
keep_attrs=True,
args=(renormalizationThreshold,))
else:
raise TypeError('ds not an xarray Dataset or DataArray.')
# Update history attribute of netCDF file
if 'history' in remappedDs.attrs:
newhist = '\n'.join([remappedDs.attrs['history'],
' '.join(sys.argv[:])])
else:
newhist = ' '.join(sys.argv[:])
remappedDs.attrs['history'] = newhist
remappedDs.attrs['meshName'] = self.destinationDescriptor.meshName
return remappedDs # }}}
def _load_mapping(self): # {{{
'''
Load weights and indices from a mapping file, if this has not already
been done
'''
# Authors
# -------
# Xylar Asay-Davis
if self.mappingLoaded:
return
dsMapping = xr.open_dataset(self.mappingFileName)
n_a = dsMapping.dims['n_a']
n_b = dsMapping.dims['n_b']
nSourceDims = len(self.sourceDescriptor.dims)
src_grid_rank = dsMapping.dims['src_grid_rank']
nDestinationDims = len(self.destinationDescriptor.dims)
dst_grid_rank = dsMapping.dims['dst_grid_rank']
# check that the mapping file has the right number of dimensions
if nSourceDims != src_grid_rank or \
nDestinationDims != dst_grid_rank:
raise ValueError('The number of source and/or '
'destination dimensions does not\n'
'match the expected number of source and '
'destination dimensions in the mapping\n'
'file. {} != {} and/or {} != {}'.format(
nSourceDims, src_grid_rank,
nDestinationDims, dst_grid_rank))
# grid dimensions need to be reversed because they are in Fortran order
self.src_grid_dims = dsMapping['src_grid_dims'].values[::-1]
self.dst_grid_dims = dsMapping['dst_grid_dims'].values[::-1]
# now, check that each source and destination dimension is right
for index in range(len(self.sourceDescriptor.dims)):
dim = self.sourceDescriptor.dims[index]
dimSize = self.sourceDescriptor.dimSize[index]
checkDimSize = self.src_grid_dims[index]
if dimSize != checkDimSize:
raise ValueError('source mesh descriptor and remapping source '
'dimension {} don\'t have the same size: \n'
'{} != {}'.format(dim, dimSize, checkDimSize))
for index in range(len(self.destinationDescriptor.dims)):
dim = self.destinationDescriptor.dims[index]
dimSize = self.destinationDescriptor.dimSize[index]
checkDimSize = self.dst_grid_dims[index]
if dimSize != checkDimSize:
raise ValueError('dest. mesh descriptor and remapping dest. '
'dimension {} don\'t have the same size: \n'
'{} != {}'.format(dim, dimSize, checkDimSize))
self.frac_b = dsMapping['frac_b'].values
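# the mapping file stores 1-based (Fortran-style) row and col indices
# along with the weights S; convert them to 0-based indices and build a
# sparse matrix so that remapping reduces to a matrix-vector product:
# dst = matrix.dot(src)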
col = dsMapping['col'].values - 1
row = dsMapping['row'].values - 1
S = dsMapping['S'].values
self.matrix = csr_matrix((S, (row, col)), shape=(n_b, n_a))
self.mappingLoaded = True # }}}
def _check_drop(self, dataArray): # {{{
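# a data array with some but not all of the source dimensions can
# neither be remapped nor passed through unchanged, so it should be
# dropped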
sourceDims = self.sourceDescriptor.dims
sourceDimsInArray = [dim in dataArray.dims for dim in sourceDims]
return (numpy.any(sourceDimsInArray) and not
numpy.all(sourceDimsInArray)) # }}}
def _remap_data_array(self, dataArray, renormalizationThreshold): # {{{
'''
Remap a single xarray data array
'''
# Authors
# -------
# Xylar Asay-Davis
sourceDims = self.sourceDescriptor.dims
destDims = self.destinationDescriptor.dims
sourceDimsInArray = [dim in dataArray.dims for dim in sourceDims]
if not numpy.any(sourceDimsInArray):
# no remapping is needed
return dataArray
if not numpy.all(sourceDimsInArray):
# no remapping is possible so the variable array should have been
# dropped
raise ValueError('Data array with some (but not all) required '
'source dims cannot be remapped\n'
'and should have been dropped.')
# make a list of dims and remapAxes
dims = []
remapAxes = []
destDimsAdded = False
for index, dim in enumerate(dataArray.dims):
if dim in sourceDims:
remapAxes.append(index)
if not destDimsAdded:
dims.extend(destDims)
destDimsAdded = True
else:
dims.append(dim)
# make a dict of coords
coordDict = {}
# copy unmodified coords
for coord in dataArray.coords:
sourceDimInCoord = numpy.any([dim in dataArray.coords[coord].dims
for dim in sourceDims])
if not sourceDimInCoord:
coordDict[coord] = {'dims': dataArray.coords[coord].dims,
'data': dataArray.coords[coord].values}
# add dest coords
coordDict.update(self.destinationDescriptor.coords)
# remap the values
field = dataArray.values
mask = numpy.isnan(field)
if numpy.count_nonzero(mask) > 0:
field = numpy.ma.masked_array(field, mask)
remappedField = self._remap_numpy_array(field, remapAxes,
renormalizationThreshold)
arrayDict = {'coords': coordDict,
'attrs': dataArray.attrs,
'dims': dims,
'data': remappedField,
'name': dataArray.name}
# make a new data array
remappedArray = xr.DataArray.from_dict(arrayDict)
return remappedArray # }}}
def _remap_numpy_array(self, inField, remapAxes,
renormalizationThreshold): # {{{
'''
Remap a single numpy array
'''
# Authors
# -------
# Xylar Asay-Davis
# permute the dimensions of inField so the axes to remap are first,
# then flatten the remapping and the extra dimensions separately for
# the matrix multiply
extraAxes = [axis for axis in numpy.arange(inField.ndim)
if axis not in remapAxes]
newShape = [numpy.prod([inField.shape[axis] for axis in remapAxes])]
if len(extraAxes) > 0:
extraShape = [inField.shape[axis] for axis in extraAxes]
newShape.append(numpy.prod(extraShape))
else:
extraShape = []
newShape.append(1)
permutedAxes = remapAxes + extraAxes
# permute axes so the remapped dimension(s) come first and "flatten"
# the remapping dimension
inField = inField.transpose(permutedAxes).reshape(newShape)
masked = (isinstance(inField, numpy.ma.MaskedArray) and
renormalizationThreshold is not None)
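# for a masked input with a renormalization threshold, apply the
# weights separately to the data (with masked entries zeroed out) and
# to the valid-data mask; dividing by the remapped mask renormalizes
# cells that received only partial valid data, and cells whose valid
# fraction falls at or below the threshold are masked out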
if masked:
inMask = numpy.array(numpy.logical_not(inField.mask), float)
outField = self.matrix.dot(inMask * inField)
outMask = self.matrix.dot(inMask)
mask = outMask > renormalizationThreshold
else:
outField = self.matrix.dot(inField)
# make frac_b match the shape of outField
outMask = numpy.reshape(self.frac_b, (len(self.frac_b), 1)).repeat(
newShape[1], axis=1)
mask = outMask > 0.
# normalize the result based on outMask
outField[mask] /= outMask[mask]
outField = numpy.ma.masked_array(outField,
mask=numpy.logical_not(mask))
destRemapDimCount = len(self.dst_grid_dims)
outDimCount = len(extraShape) + destRemapDimCount
# "unflatten" the remapped dimension(s)
destShape = list(self.dst_grid_dims) + extraShape
outField = numpy.reshape(outField, destShape)
# "unpermute" the axes to be in the expected order
index = numpy.amin(remapAxes)
unpermuteAxes = list(numpy.arange(destRemapDimCount, outDimCount))
unpermuteAxes = (unpermuteAxes[0:index] +
list(numpy.arange(destRemapDimCount)) +
unpermuteAxes[index:])
outField = numpy.transpose(outField, axes=unpermuteAxes)
return outField # }}}
def _get_temp_path(): # {{{
'''Returns the name of a temporary NetCDF file'''
return '{}/{}.nc'.format(tempfile._get_default_tempdir(),
next(tempfile._get_candidate_names())) # }}}
# vim: ai ts=4 sts=4 et sw=4 ft=python