"""
mpas_tools.mesh.mask
====================
Efficient creation of region and transect masks on MPAS meshes and lon/lat
grids, with support for multiprocessing.
This module provides functions to create region masks (from polygons), transect
masks (from lines), and flood-fill masks on MPAS meshes or longitude/latitude
grids. For large datasets, it is highly recommended to use the ``pool``
argument to parallelize the computation.
Main functions:
- compute_mpas_region_masks
- compute_mpas_transect_masks
- compute_mpas_flood_fill_mask
- compute_lon_lat_region_masks
- compute_projection_grid_region_masks
See function docstrings for details.
"""
import argparse
from functools import partial
import numpy
import progressbar
import shapely.geometry
import xarray as xr
from geometric_features import read_feature_collection
from igraph import Graph
from scipy.spatial import KDTree
from shapely.geometry import GeometryCollection, MultiPolygon, Polygon, box
from shapely.strtree import STRtree
from mpas_tools.cime.constants import constants
from mpas_tools.io import default_nchar, write_netcdf
from mpas_tools.logging import LoggingContext
from mpas_tools.parallel import create_pool
from mpas_tools.transects import (
cartesian_to_lon_lat,
lon_lat_to_cartesian,
subdivide_great_circle,
)
[docs]
def compute_mpas_region_masks(
dsMesh,
fcMask,
maskTypes=('cell', 'vertex'),
logger=None,
pool=None,
chunkSize=1000,
showProgress=False,
subdivisionThreshold=30.0,
):
"""
Create region masks (polygons) on an MPAS mesh.
Parameters
----------
dsMesh : xarray.Dataset
MPAS mesh dataset.
fcMask : geometric_features.FeatureCollection
Feature collection containing polygon regions.
maskTypes : tuple of {'cell', 'edge', 'vertex'}, optional
Which types of masks to create.
logger : logging.Logger, optional
Logger for progress output.
pool : multiprocessing.Pool, optional
Pool for parallel computation. Strongly recommended for large meshes.
chunkSize : int, optional
Number of points to process per chunk.
showProgress : bool, optional
Show a progress bar.
subdivisionThreshold : float, optional
Subdivide large polygons for efficiency (degrees).
Returns
-------
dsMasks : xarray.Dataset
Dataset containing region masks and region names.
"""
suffixes = {'cell': 'Cell', 'edge': 'Edge', 'vertex': 'Vertex'}
dims = {'cell': 'nCells', 'edge': 'nEdges', 'vertex': 'nVertices'}
dsMasks = xr.Dataset()
for maskType in maskTypes:
suffix = suffixes[maskType]
dim = dims[maskType]
lonName = f'lon{suffix}'
latName = f'lat{suffix}'
lat = numpy.rad2deg(dsMesh[latName].values)
# transform longitudes to [-180, 180)
lon = (
numpy.mod(numpy.rad2deg(dsMesh[lonName].values) + 180.0, 360.0)
- 180.0
)
if logger is not None:
logger.info(f' Computing {maskType} masks:')
# create shapely geometry for lon and lat
points = [shapely.geometry.Point(x, y) for x, y in zip(lon, lat)]
regionNames, masks, properties = _compute_region_masks(
fcMask,
points,
logger,
pool,
chunkSize,
showProgress,
subdivisionThreshold,
)
nPoints = len(points)
if logger is not None:
logger.info(' Adding masks to dataset...')
nRegions = len(regionNames)
# create a new data array for masks
masksVarName = f'region{suffix}Masks'
dsMasks[masksVarName] = (
(dim, 'nRegions'),
numpy.zeros((nPoints, nRegions), dtype=numpy.int32),
)
for index in range(nRegions):
mask = masks[index]
dsMasks[masksVarName][:, index] = numpy.array(
mask, dtype=numpy.int32
)
properties['regionNames'] = regionNames
_add_properties(
ds=dsMasks,
properties=properties,
dim='nRegions',
)
if logger is not None:
logger.info(' Done.')
return dsMasks
def entry_point_compute_mpas_region_masks():
"""Entry point for ``compute_mpas_region_masks()``"""
parser = argparse.ArgumentParser()
parser.add_argument(
'-m',
'--mesh_file_name',
dest='mesh_file_name',
type=str,
required=True,
help='An MPAS mesh file',
)
parser.add_argument(
'-g',
'--geojson_file_name',
dest='geojson_file_name',
type=str,
required=True,
help='An Geojson file containing mask regions',
)
parser.add_argument(
'-o',
'--mask_file_name',
dest='mask_file_name',
type=str,
required=True,
help='An output MPAS region masks file',
)
parser.add_argument(
'-t',
'--mask_types',
nargs='+',
dest='mask_types',
type=str,
help='Which type(s) of masks to make: cell, edge or '
'vertex. Default is cell and vertex.',
)
parser.add_argument(
'-c',
'--chunk_size',
dest='chunk_size',
type=int,
default=1000,
help='The number of cells, vertices or edges that are '
'processed in one operation',
)
parser.add_argument(
'--show_progress',
dest='show_progress',
action='store_true',
help='Whether to show a progress bar',
)
parser.add_argument(
'-s',
'--subdivision',
dest='subdivision',
type=float,
default=30.0,
help='A threshold in degrees (lon or lat) above which '
'the mask region will be subdivided into smaller '
'polygons for faster intersection checking',
)
parser.add_argument(
'--process_count',
required=False,
dest='process_count',
type=int,
help='The number of processes to use to compute masks. The '
'default is to use all available cores',
)
parser.add_argument(
'--multiprocessing_method',
dest='multiprocessing_method',
default='forkserver',
help='The multiprocessing method use for python mask creation '
"('fork', 'spawn' or 'forkserver')",
)
parser.add_argument(
'--format', dest='format', type=str, help='NetCDF file format'
)
parser.add_argument(
'--engine', dest='engine', type=str, help='NetCDF output engine'
)
args = parser.parse_args()
dsMesh = xr.open_dataset(
args.mesh_file_name, decode_cf=False, decode_times=False
)
fcMask = read_feature_collection(args.geojson_file_name)
pool = create_pool(
process_count=args.process_count, method=args.multiprocessing_method
)
if args.mask_types is None:
args.mask_types = ('cell', 'vertex')
with LoggingContext('compute_mpas_region_masks') as logger:
dsMasks = compute_mpas_region_masks(
dsMesh=dsMesh,
fcMask=fcMask,
maskTypes=args.mask_types,
logger=logger,
pool=pool,
chunkSize=args.chunk_size,
showProgress=args.show_progress,
subdivisionThreshold=args.subdivision,
)
write_netcdf(
dsMasks,
args.mask_file_name,
format=args.format,
engine=args.engine,
logger=logger,
)
[docs]
def compute_mpas_transect_masks(
dsMesh,
fcMask,
earthRadius,
maskTypes=('cell', 'edge', 'vertex'),
logger=None,
pool=None,
chunkSize=1000,
showProgress=False,
subdivisionResolution=10e3,
addEdgeSign=False,
):
"""
Create transect masks (lines) on an MPAS mesh.
Parameters
----------
dsMesh : xarray.Dataset
MPAS mesh dataset.
fcMask : geometric_features.FeatureCollection
Feature collection containing transects (LineString or
MultiLineString).
earthRadius : float
Earth radius in meters.
maskTypes : tuple of {'cell', 'edge', 'vertex'}, optional
Which types of masks to create.
logger : logging.Logger, optional
Logger for progress output.
pool : multiprocessing.Pool, optional
Pool for parallel computation.
chunkSize : int, optional
Number of points to process per chunk.
showProgress : bool, optional
Show a progress bar.
subdivisionResolution : float, optional
Subdivide coarse transects for efficiency (meters).
addEdgeSign : bool, optional
Whether to add edge sign variable (extra computation).
Returns
-------
dsMasks : xarray.Dataset
Dataset containing transect masks and transect names.
"""
suffixes = {'cell': 'Cell', 'edge': 'Edge', 'vertex': 'Vertex'}
dims = {'cell': 'nCells', 'edge': 'nEdges', 'vertex': 'nVertices'}
dsMasks = xr.Dataset()
for maskType in maskTypes:
suffix = suffixes[maskType]
dim = dims[maskType]
if logger is not None:
logger.info(f' Computing {maskType} masks:')
polygons, nPolygons, duplicatePolygons = _get_polygons(
dsMesh, maskType
)
transectNames, masks, properties, shapes = _compute_transect_masks(
fcMask,
polygons,
logger,
pool,
chunkSize,
showProgress,
subdivisionResolution,
earthRadius,
)
if logger is not None:
if addEdgeSign and maskType == 'edge':
logger.info(' Adding masks and edge signs to dataset...')
else:
logger.info(' Adding masks to dataset...')
nTransects = len(transectNames)
# create a new data array for masks
masksVarName = f'transect{suffix}Masks'
dsMasks[masksVarName] = (
(dim, 'nTransects'),
numpy.zeros((nPolygons, nTransects), dtype=numpy.int32),
)
if addEdgeSign and maskType == 'edge':
dsMasks['transectEdgeMaskSigns'] = (
(dim, 'nTransects'),
numpy.zeros((nPolygons, nTransects), dtype=numpy.int32),
)
for index in range(nTransects):
maskAndDuplicates = masks[index]
mask = maskAndDuplicates[0:nPolygons]
mask[duplicatePolygons] = numpy.logical_or(
mask[duplicatePolygons], maskAndDuplicates[nPolygons:]
)
dsMasks[masksVarName][:, index] = numpy.array(
mask, dtype=numpy.int32
)
if addEdgeSign and maskType == 'edge':
print(transectNames[index])
dsMasks['transectEdgeMaskSigns'][:, index] = (
_compute_edge_sign(dsMesh, mask, shapes[index])
)
properties['transectNames'] = transectNames
_add_properties(
ds=dsMasks,
properties=properties,
dim='nTransects',
)
if logger is not None:
logger.info(' Done.')
return dsMasks
def entry_point_compute_mpas_transect_masks():
"""Entry point for ``compute_mpas_transect_masks()``"""
parser = argparse.ArgumentParser()
parser.add_argument(
'-m',
'--mesh_file_name',
dest='mesh_file_name',
type=str,
required=True,
help='An MPAS mesh file',
)
parser.add_argument(
'-g',
'--geojson_file_name',
dest='geojson_file_name',
type=str,
required=True,
help='An Geojson file containing transects',
)
parser.add_argument(
'-o',
'--mask_file_name',
dest='mask_file_name',
type=str,
required=True,
help='An output MPAS transect masks file',
)
parser.add_argument(
'-t',
'--mask_types',
nargs='+',
dest='mask_types',
type=str,
help='Which type(s) of masks to make: cell, edge or '
'vertex. Default is cell, edge and vertex.',
)
parser.add_argument(
'-c',
'--chunk_size',
dest='chunk_size',
type=int,
default=1000,
help='The number of cells, vertices or edges that are '
'processed in one operation',
)
parser.add_argument(
'--show_progress',
dest='show_progress',
action='store_true',
help='Whether to show a progress bar',
)
parser.add_argument(
'-s',
'--subdivision',
dest='subdivision',
type=float,
help='The maximum resolution (in meters) of segments '
'in a transect. If a transect is too coarse, it '
'will be subdivided. Default is no subdivision.',
)
parser.add_argument(
'--process_count',
required=False,
dest='process_count',
type=int,
help='The number of processes to use to compute masks. The '
'default is to use all available cores',
)
parser.add_argument(
'--multiprocessing_method',
dest='multiprocessing_method',
default='forkserver',
help='The multiprocessing method use for python mask creation '
"('fork', 'spawn' or 'forkserver')",
)
parser.add_argument(
'--add_edge_sign',
dest='add_edge_sign',
action='store_true',
help='Whether to add the transectEdgeMaskSigns variable',
)
parser.add_argument(
'--format', dest='format', type=str, help='NetCDF file format'
)
parser.add_argument(
'--engine', dest='engine', type=str, help='NetCDF output engine'
)
args = parser.parse_args()
dsMesh = xr.open_dataset(
args.mesh_file_name, decode_cf=False, decode_times=False
)
fcMask = read_feature_collection(args.geojson_file_name)
pool = create_pool(
process_count=args.process_count, method=args.multiprocessing_method
)
if args.mask_types is None:
args.mask_types = ('cell', 'edge', 'vertex')
earth_radius = constants['SHR_CONST_REARTH']
with LoggingContext('compute_mpas_transect_masks') as logger:
dsMasks = compute_mpas_transect_masks(
dsMesh=dsMesh,
fcMask=fcMask,
earthRadius=earth_radius,
maskTypes=args.mask_types,
logger=logger,
pool=pool,
chunkSize=args.chunk_size,
showProgress=args.show_progress,
subdivisionResolution=args.subdivision,
addEdgeSign=args.add_edge_sign,
)
write_netcdf(
dsMasks,
args.mask_file_name,
format=args.format,
engine=args.engine,
logger=logger,
)
[docs]
def compute_mpas_flood_fill_mask(
dsMesh, fcSeed, daGrow=None, logger=None, workers=-1
):
"""
Create a mask by flood-filling from seed points on an MPAS mesh.
Parameters
----------
dsMesh : xarray.Dataset
MPAS mesh dataset.
fcSeed : geometric_features.FeatureCollection
Feature collection containing seed points.
daGrow : xarray.DataArray, optional
Mask where flood fill is allowed to grow (default: all ones).
logger : logging.Logger, optional
Logger for progress output.
workers : int, optional
Number of threads for nearest neighbor search.
Returns
-------
dsMasks : xarray.Dataset
Dataset containing the flood-fill mask.
"""
dsMasks = xr.Dataset()
lat = numpy.rad2deg(dsMesh.latCell.values)
# transform longitudes to [-180, 180)
lon = (
numpy.mod(numpy.rad2deg(dsMesh.lonCell.values) + 180.0, 360.0) - 180.0
)
if logger is not None:
logger.info(' Computing flood fill mask on cells:')
seedMask = _compute_seed_mask(fcSeed, lon, lat, workers)
cellsOnCell = dsMesh.cellsOnCell.values - 1
if daGrow is not None:
growMask = daGrow.values
else:
growMask = numpy.ones(dsMesh.sizes['nCells'])
seedMask = _flood_fill_mask(seedMask, growMask, cellsOnCell)
if logger is not None:
logger.info(' Adding masks to dataset...')
# create a new data array for the mask
masksVarName = 'cellSeedMask'
dsMasks[masksVarName] = (
('nCells',),
numpy.array(seedMask, dtype=numpy.int32),
)
if logger is not None:
logger.info(' Done.')
return dsMasks
def entry_point_compute_mpas_flood_fill_mask():
"""Entry point for ``compute_mpas_flood_fill_mask()``"""
parser = argparse.ArgumentParser()
parser.add_argument(
'-m',
'--mesh_file_name',
dest='mesh_file_name',
type=str,
required=True,
help='An MPAS mesh file',
)
parser.add_argument(
'-g',
'--geojson_file_name',
dest='geojson_file_name',
type=str,
required=True,
help='An Geojson file containing points at which to '
'start the flood fill',
)
parser.add_argument(
'-o',
'--mask_file_name',
dest='mask_file_name',
type=str,
required=True,
help='An output MPAS region masks file',
)
parser.add_argument(
'--format', dest='format', type=str, help='NetCDF file format'
)
parser.add_argument(
'--engine', dest='engine', type=str, help='NetCDF output engine'
)
args = parser.parse_args()
dsMesh = xr.open_dataset(
args.mesh_file_name, decode_cf=False, decode_times=False
)
fcSeed = read_feature_collection(args.geojson_file_name)
with LoggingContext('compute_mpas_flood_fill_mask') as logger:
dsMasks = compute_mpas_flood_fill_mask(
dsMesh=dsMesh, fcSeed=fcSeed, logger=logger
)
write_netcdf(
dsMasks,
args.mask_file_name,
format=args.format,
engine=args.engine,
logger=logger,
)
[docs]
def compute_lon_lat_region_masks(
lon,
lat,
fcMask,
logger=None,
pool=None,
chunkSize=1000,
showProgress=False,
subdivisionThreshold=30.0,
):
"""
Create region masks on a 2D longitude/latitude grid.
Parameters
----------
lon : numpy.ndarray
1D array of longitudes (degrees, -180 to 180).
lat : numpy.ndarray
1D array of latitudes (degrees, -90 to 90).
fcMask : geometric_features.FeatureCollection
Feature collection containing polygon regions.
logger : logging.Logger, optional
Logger for progress output.
pool : multiprocessing.Pool, optional
Pool for parallel computation.
chunkSize : int, optional
Number of points to process per chunk.
showProgress : bool, optional
Show a progress bar.
subdivisionThreshold : float, optional
Subdivide large polygons for efficiency (degrees).
Returns
-------
dsMasks : xarray.Dataset
Dataset containing region masks and region names.
"""
dsMasks = xr.Dataset()
# make sure lon is between -180 and 180
lon = numpy.mod(lon + 180.0, 360.0) - 180.0
Lon, Lat = numpy.meshgrid(lon, lat)
shape = Lon.shape
Lon = Lon.ravel()
Lat = Lat.ravel()
# create shapely geometry for lon and lat
points = [shapely.geometry.Point(x, y) for x, y in zip(Lon, Lat)]
regionNames, masks, properties = _compute_region_masks(
fcMask,
points,
logger,
pool,
chunkSize,
showProgress,
subdivisionThreshold,
)
nlon = len(lon)
nlat = len(lat)
if logger is not None:
logger.info(' Adding masks to dataset...')
nRegions = len(regionNames)
# create a new data array for masks
masksVarName = 'regionMasks'
dsMasks[masksVarName] = (
('lat', 'lon', 'nRegions'),
numpy.zeros((nlat, nlon, nRegions), dtype=numpy.int32),
)
for index in range(nRegions):
mask = masks[index]
dsMasks[masksVarName][:, :, index] = numpy.array(
mask.reshape(shape), dtype=numpy.int32
)
properties['regionNames'] = regionNames
_add_properties(
ds=dsMasks,
properties=properties,
dim='nRegions',
)
if logger is not None:
logger.info(' Done.')
return dsMasks
def entry_point_compute_lon_lat_region_masks():
"""Entry point for ``compute_lon_lat_region_masks()``"""
parser = argparse.ArgumentParser()
parser.add_argument(
'-i',
'--grid_file_name',
dest='grid_file_name',
type=str,
required=True,
help='An input lon/lat grid file',
)
parser.add_argument(
'--lon',
dest='lon',
default='lon',
type=str,
help='The name of the longitude coordinate',
)
parser.add_argument(
'--lat',
dest='lat',
default='lat',
type=str,
help='The name of the latitude coordinate',
)
parser.add_argument(
'-g',
'--geojson_file_name',
dest='geojson_file_name',
type=str,
required=True,
help='An Geojson file containing mask regions',
)
parser.add_argument(
'-o',
'--mask_file_name',
dest='mask_file_name',
type=str,
required=True,
help='An output MPAS region masks file',
)
parser.add_argument(
'-c',
'--chunk_size',
dest='chunk_size',
type=int,
default=1000,
help='The number of grid points that are processed in one operation',
)
parser.add_argument(
'--show_progress',
dest='show_progress',
action='store_true',
help='Whether to show a progress bar',
)
parser.add_argument(
'-s',
'--subdivision',
dest='subdivision',
type=float,
default=30.0,
help='A threshold in degrees (lon or lat) above which '
'the mask region will be subdivided into smaller '
'polygons for faster intersection checking',
)
parser.add_argument(
'--process_count',
required=False,
dest='process_count',
type=int,
help='The number of processes to use to compute masks. The '
'default is to use all available cores',
)
parser.add_argument(
'--multiprocessing_method',
dest='multiprocessing_method',
default='forkserver',
help='The multiprocessing method use for python mask creation '
"('fork', 'spawn' or 'forkserver')",
)
parser.add_argument(
'--format', dest='format', type=str, help='NetCDF file format'
)
parser.add_argument(
'--engine', dest='engine', type=str, help='NetCDF output engine'
)
args = parser.parse_args()
dsGrid = xr.open_dataset(
args.grid_file_name, decode_cf=False, decode_times=False
)
lon = dsGrid[args.lon].values
lat = dsGrid[args.lat].values
fcMask = read_feature_collection(args.geojson_file_name)
pool = create_pool(
process_count=args.process_count, method=args.multiprocessing_method
)
with LoggingContext('compute_lon_lat_region_masks') as logger:
dsMasks = compute_lon_lat_region_masks(
lon=lon,
lat=lat,
fcMask=fcMask,
logger=logger,
pool=pool,
chunkSize=args.chunk_size,
showProgress=args.show_progress,
subdivisionThreshold=args.subdivision,
)
write_netcdf(
dsMasks,
args.mask_file_name,
format=args.format,
engine=args.engine,
logger=logger,
)
def compute_projection_grid_region_masks(
lon,
lat,
fcMask,
logger=None,
pool=None,
chunkSize=1000,
showProgress=False,
subdivisionThreshold=30.0,
xdim='x',
ydim='y',
):
"""
Create region masks on a projected (e.g., polar) grid.
Parameters
----------
lon : numpy.ndarray
2D array of longitudes (degrees, -180 to 180).
lat : numpy.ndarray
2D array of latitudes (degrees, -90 to 90).
fcMask : geometric_features.FeatureCollection
Feature collection containing polygon regions.
logger : logging.Logger, optional
Logger for progress output.
pool : multiprocessing.Pool, optional
Pool for parallel computation.
chunkSize : int, optional
Number of points to process per chunk.
showProgress : bool, optional
Show a progress bar.
subdivisionThreshold : float, optional
Subdivide large polygons for efficiency (degrees).
xdim, ydim : str, optional
Names of the x and y dimensions.
Returns
-------
dsMask : xarray.Dataset
Dataset containing region masks and region names.
"""
dsMasks = xr.Dataset()
# make sure -180 <= lon < 180
lon = numpy.mod(lon + 180.0, 360.0) - 180.0
ny, nx = lon.shape
# create shapely geometry for lon and lat
points = [
shapely.geometry.Point(x, y) for x, y in zip(lon.ravel(), lat.ravel())
]
regionNames, masks, properties = _compute_region_masks(
fcMask,
points,
logger,
pool,
chunkSize,
showProgress,
subdivisionThreshold,
)
if logger is not None:
logger.info(' Adding masks to dataset...')
nRegions = len(regionNames)
# create a new data array for masks
masksVarName = 'regionMasks'
dsMasks[masksVarName] = (
(ydim, xdim, 'nRegions'),
numpy.zeros((ny, nx, nRegions), dtype=numpy.int32),
)
for index in range(nRegions):
mask = masks[index]
dsMasks[masksVarName][:, :, index] = numpy.array(
mask.reshape((ny, nx)), dtype=numpy.int32
)
properties['regionNames'] = regionNames
_add_properties(
ds=dsMasks,
properties=properties,
dim='nRegions',
)
if logger is not None:
logger.info(' Done.')
return dsMasks
def entry_point_compute_projection_grid_region_masks():
"""Entry point for ``compute_projection_grid_region_masks()``"""
parser = argparse.ArgumentParser()
parser.add_argument(
'-i',
'--grid_file_name',
dest='grid_file_name',
type=str,
required=True,
help='An input lon/lat grid file',
)
parser.add_argument(
'--lon',
dest='lon',
default='lon',
type=str,
help='The name of the 2D longitude coordinate',
)
parser.add_argument(
'--lat',
dest='lat',
default='lat',
type=str,
help='The name of the 2D latitude coordinate',
)
parser.add_argument(
'-g',
'--geojson_file_name',
dest='geojson_file_name',
type=str,
required=True,
help='An Geojson file containing mask regions',
)
parser.add_argument(
'-o',
'--mask_file_name',
dest='mask_file_name',
type=str,
required=True,
help='An output MPAS region masks file',
)
parser.add_argument(
'-c',
'--chunk_size',
dest='chunk_size',
type=int,
default=1000,
help='The number of grid points that are processed in one operation',
)
parser.add_argument(
'--show_progress',
dest='show_progress',
action='store_true',
help='Whether to show a progress bar',
)
parser.add_argument(
'-s',
'--subdivision',
dest='subdivision',
type=float,
default=30.0,
help='A threshold in degrees (lon or lat) above which '
'the mask region will be subdivided into smaller '
'polygons for faster intersection checking',
)
parser.add_argument(
'--process_count',
required=False,
dest='process_count',
type=int,
help='The number of processes to use to compute masks. The '
'default is to use all available cores',
)
parser.add_argument(
'--multiprocessing_method',
dest='multiprocessing_method',
default='forkserver',
help='The multiprocessing method use for python mask creation '
"('fork', 'spawn' or 'forkserver')",
)
parser.add_argument(
'--format', dest='format', type=str, help='NetCDF file format'
)
parser.add_argument(
'--engine', dest='engine', type=str, help='NetCDF output engine'
)
args = parser.parse_args()
dsGrid = xr.open_dataset(
args.grid_file_name, decode_cf=False, decode_times=False
)
lon = dsGrid[args.lon]
lat = dsGrid[args.lat]
ydim, xdim = lon.dims
fcMask = read_feature_collection(args.geojson_file_name)
pool = create_pool(
process_count=args.process_count, method=args.multiprocessing_method
)
with LoggingContext('compute_lon_lat_region_masks') as logger:
dsMasks = compute_projection_grid_region_masks(
lon=lon.values,
lat=lat.values,
fcMask=fcMask,
logger=logger,
pool=pool,
chunkSize=args.chunk_size,
showProgress=args.show_progress,
subdivisionThreshold=args.subdivision,
xdim=xdim,
ydim=ydim,
)
write_netcdf(
dsMasks,
args.mask_file_name,
format=args.format,
engine=args.engine,
logger=logger,
)
def _compute_mask_from_shapes(
shapes1, shapes2, func, pool, chunkSize, showProgress
):
"""
If multiprocessing, break shapes2 into chunks and use multiprocessing to
apply the given function one chunk at a time
"""
nShapes2 = len(shapes2)
if pool is None:
mask = func(shapes1, shapes2)
else:
nChunks = int(numpy.ceil(nShapes2 / chunkSize))
chunks = []
indices = [0]
for iChunk in range(nChunks):
start = iChunk * chunkSize
end = min((iChunk + 1) * chunkSize, nShapes2)
chunks.append(shapes2[start:end])
indices.append(end)
partial_func = partial(func, shapes1)
if showProgress:
widgets = [
' ',
progressbar.Percentage(),
' ',
progressbar.Bar(),
' ',
progressbar.ETA(),
]
bar = progressbar.ProgressBar(
widgets=widgets, maxval=nChunks
).start()
else:
bar = None
mask = numpy.zeros((nShapes2,), bool)
for iChunk, maskChunk in enumerate(pool.imap(partial_func, chunks)):
mask[indices[iChunk] : indices[iChunk + 1]] = maskChunk
if showProgress:
bar.update(iChunk + 1)
if showProgress:
bar.finish()
return mask
def _add_properties(ds, properties, dim):
"""
Add properties to the dataset from a dictionary of properties
"""
nchar = default_nchar
for name, prop_list in properties.items():
if name not in ds:
if isinstance(prop_list[0], str):
ds[name] = (
(dim,),
numpy.zeros((len(prop_list),), dtype=f'|S{nchar}'),
)
for index, value in enumerate(prop_list):
ds[name][index] = value
else:
ds[name] = ((dim,), prop_list)
def _get_region_names_and_properties(fc):
regionNames = []
for feature in fc.features:
name = feature['properties']['name']
regionNames.append(name)
propertyNames = set()
for feature in fc.features:
for propertyName in feature['properties']:
if propertyName not in [
'name',
'author',
'tags',
'component',
'object',
]:
propertyNames.add(propertyName)
properties = {}
for propertyName in propertyNames:
properties[propertyName] = []
for feature in fc.features:
if propertyName in feature['properties']:
propertyVal = feature['properties'][propertyName]
properties[propertyName].append(propertyVal)
else:
properties[propertyName].append('')
return regionNames, properties
def _compute_region_masks(
fcMask, points, logger, pool, chunkSize, showProgress, threshold
):
"""
Build a region mask file from the given mesh and geojson file defining
a set of regions.
"""
regionNames, properties = _get_region_names_and_properties(fcMask)
masks = []
for feature in fcMask.features:
name = feature['properties']['name']
if logger is not None:
logger.info(f' {name}')
shape = shapely.geometry.shape(feature['geometry'])
shapes = _katana(shape, threshold=threshold)
mask = _compute_mask_from_shapes(
shapes1=shapes,
shapes2=points,
func=_contains,
pool=pool,
chunkSize=chunkSize,
showProgress=showProgress,
)
masks.append(mask)
return regionNames, masks, properties
def _contains(shapes, points):
tree = STRtree(points)
mask = numpy.zeros(len(points), dtype=bool)
for shape in shapes:
indicesInShape = tree.query(shape, predicate='covers')
mask[indicesInShape] = True
return mask
def _katana(geometry, threshold, count=0, maxcount=250):
"""
From https://snorfalorpagus.net/blog/2016/03/13/splitting-large-polygons-for-faster-intersections/
Split a Polygon into two parts across it's shortest dimension
Copyright (c) 2016, Joshua Arnott
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS”
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
"""
bounds = geometry.bounds
width = bounds[2] - bounds[0]
height = bounds[3] - bounds[1]
if max(width, height) <= threshold or count == maxcount:
# either the polygon is smaller than the threshold, or the maximum
# number of recursions has been reached
return [geometry]
if height >= width:
# split left to right
a = box(bounds[0], bounds[1], bounds[2], bounds[1] + height / 2)
b = box(bounds[0], bounds[1] + height / 2, bounds[2], bounds[3])
else:
# split top to bottom
a = box(bounds[0], bounds[1], bounds[0] + width / 2, bounds[3])
b = box(bounds[0] + width / 2, bounds[1], bounds[2], bounds[3])
result = []
for d in (
a,
b,
):
c = geometry.intersection(d)
if isinstance(c, GeometryCollection):
c = c.geoms
else:
c = [c]
for e in c:
if isinstance(e, (Polygon, MultiPolygon)):
result.extend(_katana(e, threshold, count + 1, maxcount))
if count > 0:
return result
# convert multipart into singlepart
final_result = []
for g in result:
if isinstance(g, MultiPolygon):
final_result.extend(g.geoms)
else:
final_result.append(g)
return final_result
def _compute_transect_masks(
fcMask,
polygons,
logger,
pool,
chunkSize,
showProgress,
subdivisionResolution,
earthRadius,
):
"""
Build a transect mask file from the given mesh and geojson file defining
a set of transects.
"""
transectNames, properties = _get_region_names_and_properties(fcMask)
masks = []
shapes = []
for feature in fcMask.features:
name = feature['properties']['name']
if logger is not None:
logger.info(f' {name}')
geom_type = feature['geometry']['type']
if geom_type == 'LineString':
coordinates = [feature['geometry']['coordinates']]
elif geom_type == 'MultiLineString':
coordinates = feature['geometry']['coordinates']
else:
raise ValueError(f'Unexpected geometry type {geom_type}')
new_coords = []
for coords in coordinates:
if subdivisionResolution is None:
new_coords.append(coords)
else:
lon, lat = zip(*coords)
x, y, z = lon_lat_to_cartesian(
lon, lat, earthRadius, degrees=True
)
x, y, z, _, _ = subdivide_great_circle(
x, y, z, subdivisionResolution, earthRadius
)
lon, lat = cartesian_to_lon_lat(
x, y, z, earthRadius, degrees=True
)
new_coords.append([list(a) for a in zip(lon, lat)])
if geom_type == 'LineString':
shape = shapely.geometry.LineString(new_coords[0])
else:
shape = shapely.geometry.MultiLineString(new_coords)
mask = _compute_mask_from_shapes(
shapes1=shape,
shapes2=polygons,
func=_intersects,
pool=pool,
chunkSize=chunkSize,
showProgress=showProgress,
)
masks.append(mask)
shapes.append(shape)
return transectNames, masks, properties, shapes
def _intersects(shape, polygons):
tree = STRtree(polygons)
mask = numpy.zeros(len(polygons), dtype=bool)
indicesInShape = tree.query(shape, predicate='intersects')
mask[indicesInShape] = True
return mask
def _get_polygons(dsMesh, maskType):
if maskType == 'cell':
# polygons are vertices on cell
vertexIndices = dsMesh.verticesOnCell.values - 1
nVerticesOnCell = dsMesh.nEdgesOnCell.values
maxVertices = vertexIndices.shape[1]
for iVertex in range(maxVertices):
mask = iVertex >= nVerticesOnCell
# copy the last valid vertex
vertexIndices[mask, iVertex] = vertexIndices[
mask, nVerticesOnCell[mask] - 1
]
lonVertex = dsMesh.lonVertex.values
latVertex = dsMesh.latVertex.values
lonCenter = dsMesh.lonCell.values
elif maskType == 'vertex':
# polygons are cells on vertex
vertexIndices = dsMesh.cellsOnVertex.values - 1
maxVertices = vertexIndices.shape[1]
firstValid = vertexIndices[:, 0]
for iVertex in range(1, maxVertices):
mask = firstValid < 0
firstValid[mask] = vertexIndices[mask, iVertex]
assert numpy.all(firstValid >= 0)
for iVertex in range(maxVertices):
mask = vertexIndices[:, iVertex] < 0
vertexIndices[mask, iVertex] = firstValid[mask]
lonVertex = dsMesh.lonCell.values
latVertex = dsMesh.latCell.values
lonCenter = dsMesh.lonVertex.values
elif maskType == 'edge':
# oh, dear, this is a bit more complicated. Polygons are kites
# involving both vertices and cell centers
verticesOnEdge = dsMesh.verticesOnEdge - 1
cellsOnEdge = dsMesh.cellsOnEdge - 1
nEdges = dsMesh.sizes['nEdges']
nCells = dsMesh.sizes['nCells']
vertexIndices = -1 * numpy.ones((nEdges, 4), int)
vertexIndices[:, 0] = cellsOnEdge[:, 0]
vertexIndices[:, 1] = verticesOnEdge[:, 0] + nCells
vertexIndices[:, 2] = cellsOnEdge[:, 1]
vertexIndices[:, 3] = verticesOnEdge[:, 1] + nCells
# if there are invalid cells, just point to the next vertex; all
# vertices on cell should be valid
mask = vertexIndices[:, 0] < 0
vertexIndices[mask, 0] = vertexIndices[mask, 1]
mask = vertexIndices[:, 2] < 0
vertexIndices[mask, 2] = vertexIndices[mask, 3]
lonVertex = numpy.append(
dsMesh.lonCell.values, dsMesh.lonVertex.values
)
latVertex = numpy.append(
dsMesh.latCell.values, dsMesh.latVertex.values
)
lonCenter = dsMesh.lonEdge.values
else:
raise ValueError(f'Unknown mask type {maskType}')
assert numpy.all(vertexIndices >= 0)
latVertex = numpy.rad2deg(latVertex)
# transform longitudes to [-180, 180)
lonVertex = numpy.mod(numpy.rad2deg(lonVertex) + 180.0, 360.0) - 180.0
lonCenter = numpy.mod(numpy.rad2deg(lonCenter) + 180.0, 360.0) - 180.0
lon = lonVertex[vertexIndices]
lat = latVertex[vertexIndices]
lon, lat, duplicatePolygons = _copy_dateline_lon_lat_vertices(
lon, lat, lonCenter
)
nPolygons = len(lonCenter)
polygons = []
for index in range(lon.shape[0]):
coords = zip(lon[index, :], lat[index, :])
polygons.append(shapely.geometry.Polygon(coords))
return polygons, nPolygons, duplicatePolygons
def _copy_dateline_lon_lat_vertices(lonVertex, latVertex, lonCenter):
nPolygons, _ = lonVertex.shape
lonDiff = lonVertex - lonCenter.reshape(nPolygons, 1)
# which polygons have vertices that are out of range to the west?
outOfRange = lonDiff < -180.0
duplicatePolygonsEast = numpy.flatnonzero(numpy.any(outOfRange, axis=1))
lonVertex[outOfRange] += 360.0
lonVertexToAdd = lonVertex[duplicatePolygonsEast, :] - 360.0
latVertexToAdd = latVertex[duplicatePolygonsEast, :]
# which polygons have vertices that are out of range to the east?
outOfRange = lonDiff >= 180.0
duplicatePolygonsWest = numpy.flatnonzero(numpy.any(outOfRange, axis=1))
lonVertex[outOfRange] -= 360.0
lonVertexToAdd = numpy.append(
lonVertexToAdd, lonVertex[duplicatePolygonsWest, :] + 360.0, axis=0
)
latVertexToAdd = numpy.append(
latVertexToAdd, latVertex[duplicatePolygonsWest, :], axis=0
)
lonVertex = numpy.append(lonVertex, lonVertexToAdd, axis=0)
latVertex = numpy.append(latVertex, latVertexToAdd, axis=0)
duplicatePolygons = numpy.append(
duplicatePolygonsEast, duplicatePolygonsWest
)
# TODO: we still need to do something about cells that contain the poles
return lonVertex, latVertex, duplicatePolygons
def _compute_seed_mask(fcSeed, lon, lat, workers):
"""
Find the cell centers (points) closes to the given seed points and set
the resulting mask to 1 there
"""
points = numpy.vstack((lon, lat)).T
tree = KDTree(points)
mask = numpy.zeros(len(lon), dtype=numpy.int32)
points = numpy.zeros((len(fcSeed.features), 2))
for index, feature in enumerate(fcSeed.features):
points[index, :] = feature['geometry']['coordinates']
_, indices = tree.query(points, workers=workers)
for index in indices:
mask[index] = 1
return mask
def _flood_fill_mask(seedMask, growMask, cellsOnCell):
"""
Flood fill starting with a mask of seed points
"""
maxNeighbors = cellsOnCell.shape[1]
while True:
neighbors = cellsOnCell[seedMask == 1, :]
maskCount = 0
for iNeighbor in range(maxNeighbors):
indices = neighbors[:, iNeighbor]
# we only want to mask valid neighbors, locations that aren't
# already masked, and locations that we're allowed to flood
indices = indices[indices >= 0]
localMask = numpy.logical_and(
seedMask[indices] == 0, growMask[indices] == 1
)
maskCount += numpy.count_nonzero(localMask)
indices = indices[localMask]
seedMask[indices] = 1
if maskCount == 0:
break
return seedMask
def _compute_edge_sign(dsMesh, edgeMask, shape):
"""Compute the edge sign along a transect"""
edge_indices = numpy.flatnonzero(edgeMask)
voe = dsMesh.verticesOnEdge.isel(nEdges=edge_indices).values - 1
lon = numpy.rad2deg(dsMesh.lonVertex.values[voe])
lon = numpy.mod(lon + 180.0, 360.0) - 180.0
lat = numpy.rad2deg(dsMesh.latVertex.values[voe])
lonEdge = numpy.rad2deg(dsMesh.lonEdge.values[edge_indices])
lonEdge = numpy.mod(lonEdge + 180.0, 360.0) - 180.0
lon, lat, duplicate_edges = _copy_dateline_lon_lat_vertices(
lon, lat, lonEdge
)
nEdges = dsMesh.sizes['nEdges']
nVertices = dsMesh.sizes['nVertices']
# give periodic copies unique edge and vertex indices
edge_indices = numpy.append(
edge_indices, edge_indices[duplicate_edges] + nEdges
)
voe = numpy.append(voe, voe[duplicate_edges, :] + nVertices, axis=0)
unique_vertices = numpy.unique(voe.ravel())
local_voe = numpy.zeros(voe.shape, dtype=numpy.int32)
distance = []
unique_lon = []
unique_lat = []
for local_v, v in enumerate(unique_vertices):
local_mask = voe == v
x = lon[local_mask][0]
y = lat[local_mask][0]
distance.append(shape.project(shapely.geometry.Point(x, y)))
unique_lon.append(x)
unique_lat.append(y)
local_voe[local_mask] = local_v
graph = Graph(
n=len(unique_vertices), edges=zip(local_voe[:, 0], local_voe[:, 1])
)
graph.vs['distance'] = distance
graph.vs['lon'] = unique_lon
graph.vs['lat'] = unique_lat
graph.vs['vertices'] = numpy.arange(len(unique_vertices))
graph.es['edges'] = edge_indices
graph.es['vertices'] = [(v0, v1) for v0, v1 in zip(voe[:, 0], voe[:, 1])]
edgeSign = numpy.zeros(edgeMask.shape, dtype=numpy.int32)
clusters = graph.connected_components()
for cluster_index in range(len(clusters)):
cluster = clusters.subgraph(cluster_index)
distance = cluster.vs['distance']
if len(cluster.es) == 1:
edges = cluster.es.select(0)
edge = edges[0]
if edge.source_vertex['distance'] < edge.target_vertex['distance']:
sign = [1]
else:
sign = [-1]
else:
start = numpy.argmin(distance)
end = numpy.argmax(distance)
indices = cluster.get_shortest_paths(
v=start, to=end, output='epath'
)[0]
edges = cluster.es.select(indices)
if len(edges) == 1:
edge = edges[0]
if (
edge.source_vertex['distance']
< edge.target_vertex['distance']
):
sign = [1]
else:
sign = [-1]
else:
verts = numpy.array(edges['vertices'])
sign = numpy.zeros(len(indices), dtype=numpy.int32)
for index in range(len(indices) - 1):
if verts[index, 1] in verts[index + 1, :]:
sign[index] = 1
elif verts[index, 0] in verts[index + 1, :]:
sign[index] = -1
else:
raise ValueError(f'could not find vertex {index}')
if verts[-1, 0] in verts[-2, :]:
sign[-1] = 1
elif verts[-1, 1] in verts[-2, :]:
sign[-1] = -1
else:
raise ValueError('could not find vertex -1')
sign = numpy.array(sign)
edge_indices = numpy.array(edges['edges'])
valid = numpy.array(edge_indices) < nEdges
edgeSign[edge_indices[valid]] = sign[valid]
duplicate = numpy.logical_not(valid)
edgeSign[edge_indices[duplicate] - nEdges] = sign[duplicate]
return edgeSign