import os
import mpas_tools.io
import numpy as np
import xarray as xr
from geometric_features import (
FeatureCollection,
GeometricFeatures,
read_feature_collection,
)
from mpas_tools.io import write_netcdf
from mpas_tools.logging import LoggingContext, check_call
from mpas_tools.mesh.conversion import cull
from mpas_tools.mesh.creation.sort_mesh import sort_mesh
from mpas_tools.mesh.cull import cull_dataset, map_culled_to_base
from mpas_tools.mesh.mask import compute_mpas_flood_fill_mask
from mpas_tools.ocean import inject_bathymetry
from mpas_tools.ocean.coastline_alteration import (
add_critical_land_blockages,
add_land_locked_cells_to_mask,
widen_transect_edge_masks,
)
from mpas_tools.viz.paraview_extractor import extract_vtk
from compass.model import make_graph_file
from compass.step import Step
[docs]
class CullMeshStep(Step):
"""
A step for culling a global MPAS-Ocean mesh
Attributes
----------
base_mesh_step : compass.mesh.spherical.SphericalBaseStep
The base mesh step containing input files to this step
with_ice_shelf_cavities : bool
Whether the mesh includes ice-shelf cavities
do_inject_bathymetry : bool
Whether to interpolate bathymetry from a data file so it
can be used as a culling criteria
preserve_floodplain : bool
Whether to leave land cells in the mesh based on bathymetry
specified by do_inject_bathymetry
remap_topography : compass.ocean.mesh.remap_topography.RemapTopography
A step for remapping topography. If provided, the remapped
topography is used to determine the land mask
"""
[docs]
def __init__(self, test_case, base_mesh_step, with_ice_shelf_cavities,
name='cull_mesh', subdir=None, do_inject_bathymetry=False,
preserve_floodplain=False, remap_topography=None):
"""
Create a new step
Parameters
----------
test_case : compass.ocean.tests.global_ocean.mesh.Mesh
The test case this step belongs to
base_mesh_step : compass.mesh.spherical.SphericalBaseStep
The base mesh step containing input files to this step
with_ice_shelf_cavities : bool
Whether the mesh includes ice-shelf cavities
name : str, optional
the name of the step
subdir : str, optional
the subdirectory for the step. The default is ``name``
do_inject_bathymetry : bool, optional
Whether to interpolate bathymetry from a data file so it
can be used as a culling criteria
preserve_floodplain : bool, optional
Whether to leave land cells in the mesh based on bathymetry
specified by do_inject_bathymetry
remap_topography : compass.ocean.mesh.remap_topography.RemapTopography, optional
A step for remapping topography. If provided, the remapped
topography is used to determine the land mask
""" # noqa: E501
super().__init__(test_case, name=name, subdir=subdir,
cpus_per_task=None, min_cpus_per_task=None)
self.base_mesh_step = base_mesh_step
self.remap_topography = remap_topography
for file in ['culled_mesh.nc', 'culled_graph.info',
'critical_passages_mask_final.nc']:
self.add_output_file(filename=file)
if with_ice_shelf_cavities:
self.add_output_file(filename='land_ice_mask.nc')
self.with_ice_shelf_cavities = with_ice_shelf_cavities
self.do_inject_bathymetry = do_inject_bathymetry
self.preserve_floodplain = preserve_floodplain
[docs]
def setup(self):
"""
Set up the test case in the work directory, including downloading any
dependencies.
"""
super().setup()
if self.do_inject_bathymetry:
self.add_input_file(
filename='earth_relief_15s.nc',
target='SRTM15_plus_earth_relief_15s.nc',
database='bathymetry_database')
base_path = self.base_mesh_step.path
base_filename = self.base_mesh_step.config.get(
'spherical_mesh', 'mpas_mesh_filename')
target = os.path.join(base_path, base_filename)
self.add_input_file(filename='base_mesh.nc', work_dir_target=target)
if self.remap_topography is not None:
topo_path = self.remap_topography.path
target = os.path.join(topo_path, 'topography_remapped.nc')
self.add_input_file(filename='topography.nc',
work_dir_target=target)
self.add_output_file('topography_culled.nc')
self.add_output_file('map_culled_to_base.nc')
self.add_input_file(filename='south_pole.geojson',
package='compass.ocean.mesh')
config = self.config
self.cpus_per_task = config.getint('spherical_mesh',
'cull_mesh_cpus_per_task')
self.min_cpus_per_task = config.getint('spherical_mesh',
'cull_mesh_min_cpus_per_task')
def constrain_resources(self, available_resources):
"""
Constrain ``cpus_per_task`` and ``ntasks`` based on the number of
cores available to this step
Parameters
----------
available_resources : dict
The total number of cores available to the step
"""
config = self.config
self.cpus_per_task = config.getint('spherical_mesh',
'cull_mesh_cpus_per_task')
self.min_cpus_per_task = config.getint('spherical_mesh',
'cull_mesh_min_cpus_per_task')
super().constrain_resources(available_resources)
[docs]
def run(self):
"""
Run this step of the test case
"""
with_ice_shelf_cavities = self.with_ice_shelf_cavities
logger = self.logger
config = self.config
# only use progress bars if we're not writing to a log file
use_progress_bar = self.log_filename is None
do_inject_bathymetry = self.do_inject_bathymetry
preserve_floodplain = self.preserve_floodplain
convert_to_cdf5 = config.getboolean('spherical_mesh',
'convert_culled_mesh_to_cdf5')
latitude_threshold = config.getfloat('spherical_mesh',
'latitude_threshold')
sweep_count = config.getint('spherical_mesh', 'sweep_count')
cull_mesh(with_critical_passages=True, logger=logger,
use_progress_bar=use_progress_bar,
preserve_floodplain=preserve_floodplain,
with_cavities=with_ice_shelf_cavities,
process_count=self.cpus_per_task,
convert_to_cdf5=convert_to_cdf5,
latitude_threshold=latitude_threshold,
sweep_count=sweep_count)
if do_inject_bathymetry:
inject_bathymetry(mesh_file='culled_mesh.nc')
[docs]
def cull_mesh(with_cavities=False, with_critical_passages=False,
custom_critical_passages=None, custom_land_blockages=None,
preserve_floodplain=False, logger=None, use_progress_bar=True,
process_count=1, convert_to_cdf5=False, latitude_threshold=43.,
sweep_count=20):
"""
First step of initializing the global ocean:
1. combining Natural Earth land coverage north of 60S with Antarctic
ice coverage or grounded ice coverage from BedMachineAntarctica
2. combining transects defining critical passages (if
``with_critical_passages=True``)
3. combining points used to seed a flood fill of the global ocean.
4. create masks from land coverage
5. add land-locked cells to land coverage mask.
6. create masks from transects (if
``with_critical_passages=True``)
7. cull cells based on land coverage but with transects present
8. create flood-fill mask based on seeds
9. cull cells based on flood-fill mask
10. create masks from transects on the final culled mesh (if
``with_critical_passages=True``)
Parameters
----------
with_cavities : bool, optional
Whether the mesh should include Antarctic ice-shelf cavities from
BedMachine Antarctica
with_critical_passages : bool, optional
Whether the mesh should open the standard critical passages and close
land blockages from geometric_features
custom_critical_passages : str, optional
The name of geojson file with critical passages to open. This file may
be supplied in addition to or instead of the default passages
(``with_critical_passages=True``)
custom_land_blockages : str, optional
The name of a geojson file with critical land blockages to close. This
file may be supplied in addition to or instead of the default blockages
(``with_critical_passages=True``)
preserve_floodplain : bool, optional
Whether to use the ``cellSeedMask`` field in the base mesh to preserve
a floodplain at elevations above z=0
logger : logging.Logger, optional
A logger for the output if not stdout
use_progress_bar : bool, optional
Whether to display progress bars (problematic in logging to a file)
process_count : int, optional
The number of cores to use to create masks (``None`` to use all
available cores)
convert_to_cdf5 : bool, optional
Convert the culled mesh to PNetCDF CDF-5 format
latitude_threshold : float, optional
Minimum latitude, in degrees, for masking land-locked cells
sweep_count : int, optional
Maximum number of sweeps to search for land-locked cells
"""
with LoggingContext(name=__name__, logger=logger) as logger:
_cull_mesh_with_logging(
logger, with_cavities, with_critical_passages,
custom_critical_passages, custom_land_blockages,
preserve_floodplain, use_progress_bar, process_count,
convert_to_cdf5, latitude_threshold, sweep_count)
def _cull_mesh_with_logging(logger, with_cavities, with_critical_passages,
custom_critical_passages, custom_land_blockages,
preserve_floodplain, use_progress_bar,
process_count, convert_to_cdf5, latitude_threshold,
sweep_count):
""" Cull the mesh once the logger is defined for sure """
critical_passages = with_critical_passages or \
(custom_critical_passages is not None)
land_blockages = with_critical_passages or \
(custom_land_blockages is not None)
gf = GeometricFeatures()
# these defaults may have been updated from config options -- pass them
# along to the subprocess
netcdf_format = mpas_tools.io.default_format
netcdf_engine = mpas_tools.io.default_engine
has_remapped_topo = os.path.exists('topography.nc')
if with_cavities and not has_remapped_topo:
raise ValueError('Mesh culling with caviites must be from '
'remapped topography.')
if has_remapped_topo:
_land_mask_from_topo(with_cavities,
topo_filename='topography.nc',
mask_filename='land_mask.nc')
else:
_land_mask_from_geojson(with_cavities=with_cavities,
process_count=process_count,
logger=logger,
mesh_filename='base_mesh.nc',
geojson_filename='land_coverage.geojson',
mask_filename='land_mask.nc')
dsBaseMesh = xr.open_dataset('base_mesh.nc')
dsLandMask = xr.open_dataset('land_mask.nc')
# create seed points for a flood fill of the ocean
# use all points in the ocean directory, on the assumption that they are,
# in fact, in the ocean
fcSeed = gf.read(componentName='ocean', objectType='point',
tags=['seed_point'])
if land_blockages:
if with_critical_passages:
# merge transects for critical land blockages into
# critical_land_blockages.geojson
fcCritBlockages = gf.read(
componentName='ocean', objectType='transect',
tags=['Critical_Land_Blockage'])
else:
fcCritBlockages = FeatureCollection()
if custom_land_blockages is not None:
fcCritBlockages.merge(read_feature_collection(
custom_land_blockages))
# create masks from the transects
fcCritBlockages.to_geojson('critical_blockages.geojson')
args = ['compute_mpas_transect_masks',
'-m', 'base_mesh.nc',
'-g', 'critical_blockages.geojson',
'-o', 'critical_blockages.nc',
'-t', 'cell',
'-s', '10e3',
'--process_count', f'{process_count}',
'--format', netcdf_format,
'--engine', netcdf_engine]
check_call(args, logger=logger)
dsCritBlockMask = xr.open_dataset('critical_blockages.nc')
dsLandMask = add_critical_land_blockages(dsLandMask, dsCritBlockMask)
fcCritPassages = FeatureCollection()
dsPreserve = []
if critical_passages:
if with_critical_passages:
# merge transects for critical passages into fcCritPassages
fcCritPassages.merge(gf.read(componentName='ocean',
objectType='transect',
tags=['Critical_Passage']))
if custom_critical_passages is not None:
fcCritPassages.merge(read_feature_collection(
custom_critical_passages))
# create masks from the transects
fcCritPassages.to_geojson('critical_passages.geojson')
args = ['compute_mpas_transect_masks',
'-m', 'base_mesh.nc',
'-g', 'critical_passages.geojson',
'-o', 'critical_passages.nc',
'-t', 'cell', 'edge',
'-s', '10e3',
'--process_count', f'{process_count}',
'--format', netcdf_format,
'--engine', netcdf_engine]
check_call(args, logger=logger)
dsCritPassMask = xr.open_dataset('critical_passages.nc')
# Alter critical passages to be at least two cells wide, to avoid sea
# ice blockage
dsCritPassMask = widen_transect_edge_masks(
dsCritPassMask, dsBaseMesh,
latitude_threshold=latitude_threshold)
dsPreserve.append(dsCritPassMask)
if preserve_floodplain:
dsPreserve.append(dsBaseMesh)
# fix land locked cells after adding critical land blockages, as these
# can lead to new land-locked cells
dsLandMask = add_land_locked_cells_to_mask(
dsLandMask, dsBaseMesh, latitude_threshold=latitude_threshold,
nSweeps=sweep_count)
write_netcdf(dsLandMask, 'land_mask_with_land_locked_cells.nc')
# cull the mesh based on the land mask
dsCulledMesh = cull(dsBaseMesh, dsMask=dsLandMask,
dsPreserve=dsPreserve, logger=logger, dir='.')
# create a mask for the flood fill seed points
dsSeedMask = compute_mpas_flood_fill_mask(dsMesh=dsCulledMesh,
fcSeed=fcSeed,
logger=logger)
# cull the mesh a second time using a flood fill from the seed points
dsCulledMesh = cull(dsCulledMesh, dsInverse=dsSeedMask, logger=logger,
dir='.')
# sort the cell, edge and vertex indices for better performances
dsCulledMesh = sort_mesh(dsCulledMesh)
out_filename = 'culled_mesh.nc'
if convert_to_cdf5:
write_filename = 'culled_mesh_before_cdf5.nc'
write_netcdf(dsCulledMesh, write_filename)
args = ['ncks', '-5', write_filename, out_filename]
check_call(args, logger=logger)
else:
write_netcdf(dsCulledMesh, out_filename)
# we need to make the graph file after sorting
make_graph_file(mesh_filename='culled_mesh.nc',
graph_filename='culled_graph.info')
if critical_passages:
# make a new version of the critical passages mask on the culled mesh
fcCritPassages.to_geojson('critical_passages.geojson')
args = ['compute_mpas_transect_masks',
'-m', 'culled_mesh.nc',
'-g', 'critical_passages.geojson',
'-o', 'critical_passages_mask_final.nc',
'-t', 'cell',
'-s', '10e3',
'--process_count', f'{process_count}',
'--format', netcdf_format,
'--engine', netcdf_engine]
check_call(args, logger=logger)
if has_remapped_topo:
_cull_topo(with_cavities, process_count, logger, latitude_threshold,
sweep_count, dsPreserve)
if with_cavities:
dsMask = xr.open_dataset('topography_culled.nc')
dsMask = dsMask[['regionCellMasks',]]
if not has_remapped_topo:
# we haven't dealt with cells land-locked next to land and
# land-ice yet
dsMask = add_land_locked_cells_to_mask(
dsMask, dsCulledMesh, latitude_threshold=latitude_threshold,
nSweeps=sweep_count)
landIceMask = dsMask.regionCellMasks.isel(nRegions=0)
dsLandIceMask = xr.Dataset()
dsLandIceMask['landIceMask'] = landIceMask
dsLandIceMask['landIceFloatingMask'] = landIceMask
write_netcdf(dsLandIceMask, 'land_ice_mask.nc')
dsLandIceCulledMesh = cull(dsCulledMesh, dsMask=dsMask, logger=logger,
dir='.')
write_netcdf(dsLandIceCulledMesh, 'no_ISC_culled_mesh.nc')
extract_vtk(ignore_time=True, dimension_list=['maxEdges='],
variable_list=['allOnCells'],
filename_pattern='culled_mesh.nc',
out_dir='culled_mesh_vtk',
use_progress_bar=use_progress_bar)
if with_cavities:
extract_vtk(ignore_time=True, dimension_list=['maxEdges='],
variable_list=['allOnCells'],
filename_pattern='no_ISC_culled_mesh.nc',
out_dir='no_ISC_culled_mesh_vtk',
use_progress_bar=use_progress_bar)
def _cull_topo(with_cavities, process_count, logger, latitude_threshold,
sweep_count, ds_preserve):
ds_topo = xr.open_dataset('topography.nc')
ds_base = xr.open_dataset('base_mesh.nc')
ds_culled = xr.open_dataset('culled_mesh.nc')
ds_map_culled_to_base = map_culled_to_base(ds_base=ds_base,
ds_culled=ds_culled,
workers=process_count)
write_netcdf(ds_map_culled_to_base, 'map_culled_to_base.nc')
if with_cavities:
_flood_fill_and_add_land_ice_mask(ds_topo, ds_base,
ds_map_culled_to_base, ds_preserve,
logger, latitude_threshold,
sweep_count)
ds_topo['ssh'] = ds_topo['landIceDraftObserved']
write_netcdf(ds_topo, 'topography_with_land_ice_mask.nc')
logger.info('Culling topography')
ds_culled_topo = cull_dataset(ds=ds_topo, ds_base_mesh=ds_base,
ds_culled_mesh=ds_culled,
ds_map_culled_to_base=ds_map_culled_to_base,
logger=logger)
write_netcdf(ds_culled_topo, 'topography_culled.nc')
def _land_mask_from_topo(with_cavities, topo_filename, mask_filename):
ds_topo = xr.open_dataset(topo_filename)
ocean_frac = ds_topo.oceanFracObserved
if with_cavities:
# we want the mask to be 1 where there's not ocean
cull_mask = xr.where(ocean_frac < 0.5, 1, 0)
else:
floating_ice_frac = ds_topo.landIceFloatingFracObserved
no_cavities_ocean_frac = ocean_frac - floating_ice_frac
# we want the mask to be 1 where there's not open ocean
cull_mask = xr.where(no_cavities_ocean_frac < 0.5, 1, 0)
cull_mask = cull_mask.expand_dims(dim='nRegions', axis=1)
ds_mask = xr.Dataset()
ds_mask['regionCellMasks'] = cull_mask
write_netcdf(ds_mask, mask_filename)
def _flood_fill_and_add_land_ice_mask(ds_topo, ds_base_mesh,
ds_map_culled_to_base, ds_perserve,
logger, latitude_threshold, sweep_count):
# we don't what land ice where we are preserving critical passages
for ds in ds_perserve:
not_preserve = ds.transectCellMasks.sum(dim='nTransects') == 0
for var in ['landIceFracObserved', 'landIcePressureObserved',
'landIceDraftObserved',
'landIceGroundedFracObserved',
'landIceFloatingFracObserved', 'landIceThkObserved']:
ds_topo[var] = ds_topo[var].where(not_preserve, 0.0)
ds_topo['oceanFracObserved'] = \
ds_topo['oceanFracObserved'].where(not_preserve, 1.0)
land_ice_frac = ds_topo.landIceFracObserved
land_ice_present = xr.where(land_ice_frac > 0.0, 1, 0)
gf = GeometricFeatures()
fc_ocean_seed = gf.read(componentName='ocean', objectType='point',
tags=['seed_point'])
fc_south_pole_seed = read_feature_collection('south_pole.geojson')
# flood fill anywhere land ice is present to remove isolated land ice
ds_mask = compute_mpas_flood_fill_mask(dsMesh=ds_base_mesh,
daGrow=land_ice_present,
fcSeed=fc_south_pole_seed,
logger=logger)
land_ice_present = ds_mask.cellSeedMask
# update land-ice variables and ocean fraction accordingly
for var in ['landIceFracObserved', 'landIcePressureObserved',
'landIceDraftObserved', 'landIceGroundedFracObserved',
'landIceFloatingFracObserved', 'landIceThkObserved']:
ds_topo[var] = ds_topo[var].where(land_ice_present, 0.0)
land_ice_frac = ds_topo.landIceFracObserved
land_ice_mask = xr.where(land_ice_frac > 0.5, 1, 0)
# now, remove land-locked or land-ice-locked cells
map_culled_to_base = ds_map_culled_to_base.mapCulledToBaseCell.values
ncells_base = ds_base_mesh.sizes['nCells']
# the mask is 1 (for land) by default
land_and_land_ice_mask = np.ones(ncells_base, dtype=int)
# where land has not been culled out, the mask is land_ice_mask
land_and_land_ice_mask[map_culled_to_base] = \
land_ice_mask[map_culled_to_base]
ds_mask = xr.Dataset()
ds_mask['regionCellMasks'] = ('nCells', land_and_land_ice_mask)
ds_mask['regionCellMasks'] = \
ds_mask.regionCellMasks.expand_dims(dim='nRegions', axis=1)
ds_mask.to_netcdf('land_and_land_ice_mask.nc')
# re-open from file so regionCellMasks can be assigned to.
ds_mask = xr.open_dataset('land_and_land_ice_mask.nc')
ds_mask = add_land_locked_cells_to_mask(
ds_mask, ds_base_mesh, latitude_threshold=latitude_threshold,
nSweeps=sweep_count)
land_ice_mask = ds_mask.regionCellMasks.isel(nRegions=0)
# finally, flood fill the ocean portion to fill in any holes in the land
# ice
ocean_mask = 1 - land_ice_mask
ds_mask = compute_mpas_flood_fill_mask(dsMesh=ds_base_mesh,
daGrow=ocean_mask,
fcSeed=fc_ocean_seed,
logger=logger)
land_ice_mask = 1 - ds_mask.cellSeedMask
ds_topo['landIceMask'] = land_ice_mask
region_cell_mask = land_ice_mask.expand_dims(dim='nRegions', axis=1)
ds_topo['regionCellMasks'] = region_cell_mask
def _land_mask_from_geojson(with_cavities, process_count, logger,
mesh_filename, geojson_filename, mask_filename):
gf = GeometricFeatures()
# start with the land coverage from Natural Earth
fcLandCoverage = gf.read(componentName='natural_earth',
objectType='region',
featureNames=['Land Coverage'])
# remove the region south of 60S so we can replace it based on ice-sheet
# topography
fcSouthMask = gf.read(componentName='ocean', objectType='region',
featureNames=['Global Ocean 90S to 60S'])
fcLandCoverage = fcLandCoverage.difference(fcSouthMask)
# Add "land" coverage from either the full ice sheet or just the grounded
# part
if with_cavities:
fcAntarcticLand = gf.read(
componentName='bedmachine', objectType='region',
featureNames=['AntarcticGroundedIceCoverage'])
else:
fcAntarcticLand = gf.read(
componentName='bedmachine', objectType='region',
featureNames=['AntarcticIceCoverage'])
fcLandCoverage.merge(fcAntarcticLand)
# save the feature collection to a geojson file
fcLandCoverage.to_geojson(geojson_filename)
# these defaults may have been updated from config options -- pass them
# along to the subprocess
netcdf_format = mpas_tools.io.default_format
netcdf_engine = mpas_tools.io.default_engine
# Create the land mask based on the land coverage, i.e. coastline data
args = ['compute_mpas_region_masks',
'-m', mesh_filename,
'-g', geojson_filename,
'-o', mask_filename,
'-t', 'cell',
'--process_count', f'{process_count}',
'--format', netcdf_format,
'--engine', netcdf_engine]
check_call(args, logger=logger)