Source code for compass.ocean.tests.utility.combine_topo

import os
import pathlib
from datetime import datetime
from glob import glob

import netCDF4
import numpy as np
import pyproj
import xarray as xr
from mpas_tools.logging import check_call
from pyremap import ProjectionGridDescriptor, get_lat_lon_descriptor

from compass.parallel import run_command
from compass.step import Step
from compass.testcase import TestCase


class CombineTopo(TestCase):
    """
    A test case for combining GEBCO 2023 with BedMachineAntarctica
    topography datasets
    """

    def __init__(self, test_group, target_grid):
        """
        Create the test case

        Parameters
        ----------
        test_group : compass.ocean.tests.utility.Utility
            The test group that this test case belongs to

        target_grid : str
            either "lat_lon" or "cubed_sphere"
        """
        subdir = os.path.join('combine_topo', target_grid)
        super().__init__(
            test_group=test_group, name='combine_topo', subdir=subdir,
        )
        self.add_step(Combine(test_case=self, target_grid=target_grid))
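
# Illustrative sketch (hypothetical, not part of this module): the Utility
# test group would typically register this test case once per supported
# target grid, along these lines:
#
#     for target_grid in ('lat_lon', 'cubed_sphere'):
#         self.add_test_case(
#             CombineTopo(test_group=self, target_grid=target_grid))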


class Combine(Step):
    """
    A step for combining GEBCO 2023 with BedMachineAntarctica topography
    datasets

    Attributes
    ----------
    target_grid : str
        either "lat_lon" or "cubed_sphere"

    resolution : float
        degrees (float) or face subdivisions (int)

    resolution_name : str
        either 'x.xxxx_degree' or 'neXXX', matching ``resolution``
    """

    def __init__(self, test_case, target_grid):
        """
        Create a new step

        Parameters
        ----------
        test_case : compass.ocean.tests.utility.combine_topo.CombineTopo
            The test case this step belongs to

        target_grid : str
            either "lat_lon" or "cubed_sphere"
        """
        super().__init__(
            test_case, name='combine', ntasks=None, min_tasks=None,
        )
        self.target_grid = target_grid
        self.resolution = None
        self.resolution_name = None

    def setup(self):
        """
        Set up the step in the work directory, including downloading any
        dependencies.
        """
        super().setup()
        config = self.config
        section = config['combine_topo']

        # Get input filenames
        antarctic_filename = section.get('antarctic_filename')
        global_filename = section.get('global_filename')

        # Add bathymetry data input files
        self.add_input_file(
            filename=antarctic_filename,
            target=antarctic_filename,
            database='bathymetry_database',
        )
        self.add_input_file(
            filename=global_filename,
            target=global_filename,
            database='bathymetry_database',
        )

        self._set_res_and_outputs(update=False)

        # Get ntasks and min_tasks
        self.ntasks = section.getint('ntasks')
        self.min_tasks = section.getint('min_tasks')
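
    # A hypothetical [combine_topo] config section covering the options this
    # step reads; the option names all appear in this module, but the values
    # below are illustrative placeholders, not the defaults shipped with
    # compass:
    #
    #     [combine_topo]
    #     antarctic_filename = BedMachineAntarctica-v3.nc
    #     global_filename = GEBCO_2023.nc
    #     ntasks = 128
    #     min_tasks = 16
    #     resolution_latlon = 0.0125
    #     resolution_cubedsphere = 3000
    #     method = conserve
    #     lat_tiles = 3
    #     lon_tiles = 6
    #     renorm_thresh = 1e-3
    #     latmin = -62.0
    #     latmax = -60.0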

    def constrain_resources(self, available_resources):
        """
        Constrain ``cpus_per_task`` and ``ntasks`` based on the number of
        cores available to this step

        Parameters
        ----------
        available_resources : dict
            The total number of cores available to the step
        """
        config = self.config
        self.ntasks = config.getint('combine_topo', 'ntasks')
        self.min_tasks = config.getint('combine_topo', 'min_tasks')
        super().constrain_resources(available_resources)

    def run(self):
        """
        Run this step of the test case
        """
        self._set_res_and_outputs(update=True)
        self._modify_gebco()
        self._modify_bedmachine()
        self._create_target_scrip_file()
        self._remap_gebco()
        self._remap_bedmachine()
        self._combine()
        self._cleanup()
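
    # Hypothetical command-line usage (paths are placeholders): a test case
    # like this one is normally set up and run with the compass CLI, roughly:
    #
    #     compass setup -t ocean/utility/combine_topo/lat_lon -w /path/to/work
    #     cd /path/to/work/ocean/utility/combine_topo/lat_lon
    #     compass run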

    def _set_res_and_outputs(self, update):
        """
        Set or update the resolution and output filenames based on config
        options
        """
        config = self.config
        section = config['combine_topo']

        # Get input filenames
        antarctic_filename = section.get('antarctic_filename')
        global_filename = section.get('global_filename')

        # Parse resolution and update resolution attributes
        if self.target_grid == 'cubed_sphere':
            resolution = section.getint('resolution_cubedsphere')
            if update and resolution == self.resolution:
                # nothing to do
                return
            self.resolution = resolution
            self.resolution_name = f'ne{resolution}'
        elif self.target_grid == 'lat_lon':
            resolution = section.getfloat('resolution_latlon')
            if update and resolution == self.resolution:
                # nothing to do
                return
            self.resolution = resolution
            self.resolution_name = f'{resolution:.4f}_degree'

        # Start over with empty outputs
        self.outputs = []

        # Build output filenames
        datestamp = datetime.now().strftime('%Y%m%d')
        scrip_filename = f'{self.resolution_name}_{datestamp}.scrip.nc'
        combined_filename = '_'.join([
            antarctic_filename.replace('.nc', ''),
            global_filename.replace('.nc', ''),
            self.resolution_name,
            f'{datestamp}.nc',
        ])
        self.add_output_file(filename=scrip_filename)
        self.add_output_file(filename=combined_filename)

        if update:
            # Convert outputs to absolute paths in the work directory
            step_dir = self.work_dir
            self.outputs = [
                os.path.abspath(os.path.join(step_dir, filename))
                for filename in self.outputs
            ]

    def _modify_gebco(self):
        """
        Modify GEBCO to include lon/lat bounds located at grid edges
        """
        logger = self.logger
        logger.info('Adding bounds to GEBCO lat/lon')

        # Parse config
        config = self.config
        in_filename = config.get('combine_topo', 'global_filename')
        out_filename = in_filename.replace('.nc', '_cf.nc')

        # Add lat/lon bounds at cell edges, assuming uniform grid spacing
        gebco = xr.open_dataset(in_filename)
        lat = gebco.lat
        lon = gebco.lon
        dlat = lat.isel(lat=1) - lat.isel(lat=0)
        dlon = lon.isel(lon=1) - lon.isel(lon=0)
        lat_bnds = xr.concat([lat - 0.5 * dlat, lat + 0.5 * dlat],
                             dim='bnds')
        lon_bnds = xr.concat([lon - 0.5 * dlon, lon + 0.5 * dlon],
                             dim='bnds')
        gebco['lat_bnds'] = lat_bnds.transpose('lat', 'bnds')
        gebco['lon_bnds'] = lon_bnds.transpose('lon', 'bnds')
        gebco.lat.attrs['bounds'] = 'lat_bnds'
        gebco.lon.attrs['bounds'] = 'lon_bnds'

        # Write modified GEBCO to netCDF
        _write_netcdf_with_fill_values(gebco, out_filename)
        logger.info(' Done.')
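
    # Toy illustration (not executed anywhere in this step): for a uniform
    # 1-degree axis lat = [-89.5, -88.5, ...], dlat = 1.0, so the bounds
    # computed above are
    #
    #     lat_bnds = [[-90.0, -89.0],
    #                 [-89.0, -88.0],
    #                 ...]
    #
    # i.e. each row holds the cell edges [lat - dlat/2, lat + dlat/2], the
    # CF-style layout that conservative remapping tools rely on.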

    def _modify_bedmachine(self):
        """
        Modify BedMachineAntarctica to compute the fields needed by
        MPAS-Ocean
        """
        logger = self.logger
        logger.info('Modifying BedMachineAntarctica with MPAS-Ocean names')

        # Parse config
        config = self.config
        in_filename = config.get('combine_topo', 'antarctic_filename')
        out_filename = in_filename.replace('.nc', '_mod.nc')

        # Load BedMachine and get ice, ocean and grounded masks
        bedmachine = xr.open_dataset(in_filename)
        mask = bedmachine.mask
        ice_mask = (mask != 0).astype(float)
        ocean_mask = np.logical_or(mask == 0, mask == 3).astype(float)
        grounded_mask = np.logical_or(np.logical_or(mask == 1, mask == 2),
                                      mask == 4).astype(float)

        # Add new variables with MPAS-Ocean names
        bedmachine['bathymetry'] = bedmachine.bed
        bedmachine['thickness'] = bedmachine.thickness
        bedmachine['ice_draft'] = bedmachine.surface - bedmachine.thickness
        bedmachine.ice_draft.attrs['units'] = 'meters'
        bedmachine['ice_mask'] = ice_mask
        bedmachine['grounded_mask'] = grounded_mask
        bedmachine['ocean_mask'] = ocean_mask
        bedmachine['valid_mask'] = xr.ones_like(ocean_mask)

        # Remove all other variables
        varlist = [
            'bathymetry',
            'ice_draft',
            'thickness',
            'ice_mask',
            'grounded_mask',
            'ocean_mask',
            'valid_mask',
        ]
        bedmachine = bedmachine[varlist]

        # Write modified BedMachine to netCDF
        _write_netcdf_with_fill_values(bedmachine, out_filename)
        logger.info(' Done.')

    def _create_gebco_tile(self, lon_tile, lat_tile):
        """
        Create GEBCO tile

        Parameters
        ----------
        lon_tile : int
            tile number along lon dim

        lat_tile : int
            tile number along lat dim
        """
        logger = self.logger

        # Parse config
        config = self.config
        section = config['combine_topo']
        lat_tiles = section.getint('lat_tiles')
        lon_tiles = section.getint('lon_tiles')
        global_name = section.get('global_filename').replace('.nc', '')
        out_filename = f'tiles/{global_name}_tile_{lon_tile}_{lat_tile}.nc'
        logger.info(f' creating {out_filename}')

        # Load GEBCO
        gebco = xr.open_dataset(f'{global_name}_cf.nc')

        # Compute tile sizes along lat and lon
        nlat = gebco.sizes['lat']
        nlon = gebco.sizes['lon']
        nlat_tile = nlat // lat_tiles
        nlon_tile = nlon // lon_tiles

        # Build tile lat/lon indices
        lat_indices = [lat_tile * nlat_tile, (lat_tile + 1) * nlat_tile]
        lon_indices = [lon_tile * nlon_tile, (lon_tile + 1) * nlon_tile]
        if lat_tile == lat_tiles - 1:
            lat_indices[1] = max([lat_indices[1], nlat])
        else:
            lat_indices[1] += 1
        if lon_tile == lon_tiles - 1:
            lon_indices[1] = max([lon_indices[1], nlon])
        else:
            lon_indices[1] += 1
        lat_indices = np.arange(*lat_indices)
        lon_indices = np.arange(*lon_indices)

        # Duplicate top and bottom rows to account for poles
        if lat_tile == 0:
            lat_indices = np.insert(lat_indices, 0, 0)
        if lat_tile == lat_tiles - 1:
            lat_indices = np.append(lat_indices, lat_indices[-1])

        # Select tile from GEBCO
        tile = gebco.isel(lat=lat_indices, lon=lon_indices)
        if lat_tile == 0:
            tile.lat.values[0] = -90.   # Correct south pole
        if lat_tile == lat_tiles - 1:
            tile.lat.values[-1] = 90.   # Correct north pole

        # Write tile to netCDF
        _write_netcdf_with_fill_values(tile, out_filename)

    def _create_bedmachine_scrip_file(self):
        """
        Create SCRIP file for BedMachineAntarctica data using pyremap
        """
        logger = self.logger
        logger.info(' Creating BedMachine SCRIP file')

        # Parse config
        config = self.config
        section = config['combine_topo']
        antarctic_name = section.get('antarctic_filename').replace('.nc', '')
        in_filename = f'{antarctic_name}_mod.nc'
        netcdf4_filename = f'{antarctic_name}.scrip.netcdf4.nc'
        out_filename = f'{antarctic_name}.scrip.nc'

        # Define the BedMachine polar stereographic projection
        projection = pyproj.Proj(
            '+proj=stere +lat_ts=-71.0 +lat_0=-90 +lon_0=0.0 '
            '+k_0=1.0 +x_0=0.0 +y_0=0.0 +ellps=WGS84'
        )

        # Create SCRIP file using pyremap
        bedmachine_descriptor = ProjectionGridDescriptor.read(
            projection, in_filename, 'BedMachineAntarctica500m',
        )
        bedmachine_descriptor.to_scrip(netcdf4_filename)

        # Writing directly in NETCDF3_64BIT_DATA proved prohibitively slow,
        # so use ncks to convert
        args = [
            'ncks', '-O', '-5',
            netcdf4_filename,
            out_filename,
        ]
        check_call(args, logger)

        logger.info(' Done.')

    def _create_target_scrip_file(self):
        """
        Create SCRIP file for either the x.xxxx degree (lat-lon) mesh or
        the NExxx (cubed-sphere) mesh, depending on the value of
        `self.target_grid`

        References:
        https://acme-climate.atlassian.net/wiki/spaces/DOC/pages/872579110/Running+E3SM+on+New+Atmosphere+Grids
        """
        logger = self.logger
        logger.info(f'Create SCRIP file for {self.resolution_name} mesh')

        out_filename = self.outputs[0]
        stem = pathlib.Path(out_filename).stem
        netcdf4_filename = f'{stem}.netcdf4.nc'

        # Build cubed-sphere SCRIP file using tempestremap
        if self.target_grid == 'cubed_sphere':
            # Create EXODUS file
            args = [
                'GenerateCSMesh', '--alt', '--res', f'{self.resolution}',
                '--file', f'{self.resolution_name}.g',
            ]
            check_call(args, logger)

            # Create SCRIP file
            args = [
                'ConvertMeshToSCRIP', '--in', f'{self.resolution_name}.g',
                '--out', netcdf4_filename,
            ]
            check_call(args, logger)

        # Build lat-lon SCRIP file using pyremap
        elif self.target_grid == 'lat_lon':
            descriptor = get_lat_lon_descriptor(
                dLon=self.resolution, dLat=self.resolution,
            )
            descriptor.to_scrip(netcdf4_filename)

        # Writing out directly to NETCDF3_64BIT_DATA is either very slow or
        # unsupported, so use ncks to convert
        args = [
            'ncks', '-O', '-5',
            netcdf4_filename,
            out_filename,
        ]
        check_call(args, logger)

        logger.info(' Done.')

    def _create_weights(self, in_filename, out_filename):
        """
        Create weights file for remapping to `self.target_grid`. Filenames
        are passed as parameters so that the function can be applied to
        GEBCO and BedMachine.

        Parameters
        ----------
        in_filename : str
            source file name

        out_filename : str
            weights file name
        """
        config = self.config
        method = config.get('combine_topo', 'method')

        # Generate weights file
        args = [
            'ESMF_RegridWeightGen',
            '--source', in_filename,
            '--destination', self.outputs[0],
            '--weight', out_filename,
            '--method', method,
            '--netcdf4',
            '--src_regional',
            '--ignore_unmapped',
        ]
        run_command(
            args, self.cpus_per_task, self.ntasks,
            self.openmp_threads, config, self.logger,
        )

    def _remap_to_target(self, in_filename, mapping_filename, out_filename,
                         default_dims=True):
        """
        Remap to `self.target_grid`. Filenames are passed as parameters so
        that the function can be applied to GEBCO and BedMachine.

        Parameters
        ----------
        in_filename : str
            source file name

        mapping_filename : str
            weights file name

        out_filename : str
            remapped file name

        default_dims : bool
            default True; if False, specify the non-default source
            dimensions y, x
        """
        # Build command args
        args = ['ncremap', '-m', mapping_filename, '--vrb=1']

        # Add non-default gridding args
        regrid_args = []
        if not default_dims:
            regrid_args.extend([
                '--rgr lat_nm=y',
                '--rgr lon_nm=x',
            ])
        if self.target_grid == 'lat_lon':
            regrid_args.extend([
                '--rgr lat_nm_out=lat',
                '--rgr lon_nm_out=lon',
                '--rgr lat_dmn_nm=lat',
                '--rgr lon_dmn_nm=lon',
            ])
        if len(regrid_args) > 0:
            args.extend(['-R', ' '.join(regrid_args)])

        # Append input and output file names
        args.extend([in_filename, out_filename])

        # Remap to target grid
        check_call(args, self.logger)
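
    # For reference, the argument list assembled above corresponds roughly to
    # the following command line (file names are placeholders) when remapping
    # BedMachine (default_dims=False) to a lat-lon target:
    #
    #     ncremap -m map.nc --vrb=1 -R '--rgr lat_nm=y --rgr lon_nm=x
    #         --rgr lat_nm_out=lat --rgr lon_nm_out=lon --rgr lat_dmn_nm=lat
    #         --rgr lon_dmn_nm=lon' in.nc out.nc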

    def _remap_gebco(self):
        """
        Remap GEBCO to `self.target_grid`
        """
        logger = self.logger
        logger.info('Remapping GEBCO data')

        # Parse config
        config = self.config
        section = config['combine_topo']
        global_name = section.get('global_filename').replace('.nc', '')
        method = section.get('method')
        lat_tiles = section.getint('lat_tiles')
        lon_tiles = section.getint('lon_tiles')

        # Make tiles directory
        os.makedirs('tiles', exist_ok=True)

        # Initialize combined xarray.Dataset
        gebco_remapped = xr.Dataset()

        # Create tile maps and remapped tiles
        for lat_tile in range(lat_tiles):
            for lon_tile in range(lon_tiles):
                # File names
                tile_suffix = f'tile_{lon_tile}_{lat_tile}.nc'
                tile_filename = f'tiles/{global_name}_{tile_suffix}'
                mapping_filename = (
                    f'tiles/map_{global_name}_to_{self.resolution_name}'
                    f'_{method}_{tile_suffix}'
                )
                remapped_filename = (
                    f'tiles/{global_name}_{self.resolution_name}'
                    f'_{tile_suffix}'
                )

                # Call remapping functions
                self._create_gebco_tile(lon_tile, lat_tile)
                self._create_weights(tile_filename, mapping_filename)
                self._remap_to_target(
                    tile_filename, mapping_filename, remapped_filename,
                )

                # Add tile to remapped global bathymetry
                logger.info(f' adding {remapped_filename}')
                bathy = xr.open_dataset(remapped_filename).elevation
                bathy = bathy.where(bathy.notnull(), 0.)
                if 'bathymetry' in gebco_remapped:
                    gebco_remapped['bathymetry'] = (
                        gebco_remapped.bathymetry + bathy
                    )
                else:
                    gebco_remapped['bathymetry'] = bathy

        # Write combined remapped bathymetry to netCDF
        out_filename = f'{global_name}_{self.resolution_name}.nc'
        logger.info(f' writing {out_filename}')
        _write_netcdf_with_fill_values(gebco_remapped, out_filename)
        logger.info(' Done.')

    def _remap_bedmachine(self):
        """
        Remap BedMachineAntarctica to `self.target_grid`
        """
        logger = self.logger
        logger.info('Remapping BedMachine data')

        # Parse config
        config = self.config
        section = config['combine_topo']
        antarctic_name = section.get('antarctic_filename').replace('.nc', '')
        method = section.get('method')

        # File names
        in_filename = f'{antarctic_name}_mod.nc'
        scrip_filename = f'{antarctic_name}.scrip.nc'
        mapping_filename = (
            f'map_{antarctic_name}_to_{self.resolution_name}_{method}.nc'
        )
        remapped_filename = f'{antarctic_name}_{self.resolution_name}.nc'

        # Call remapping functions
        self._create_bedmachine_scrip_file()
        self._create_weights(scrip_filename, mapping_filename)
        self._remap_to_target(
            in_filename, mapping_filename, remapped_filename,
            default_dims=False,
        )
        logger.info(' Done.')
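
    # The blending in _combine() below is a linear ramp in latitude between
    # the config options latmin and latmax:
    #
    #     alpha(lat) = clip((lat - latmin) / (latmax - latmin), 0, 1)
    #     bathymetry = alpha * gebco + (1 - alpha) * bedmachine
    #
    # so the combined field is pure BedMachine south of latmin and pure
    # GEBCO north of latmax.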

    def _combine(self):
        """
        Combine remapped GEBCO and BedMachine data sets
        """
        logger = self.logger
        logger.info('Combine BedMachineAntarctica and GEBCO')

        # Parse config
        config = self.config
        section = config['combine_topo']
        renorm_thresh = section.getfloat('renorm_thresh')
        latmin = section.getfloat('latmin')
        latmax = section.getfloat('latmax')
        sfx = f'_{self.resolution_name}.nc'
        global_fname = section.get('global_filename').replace('.nc', sfx)
        antarctic_fname = section.get('antarctic_filename').replace(
            '.nc', sfx)

        out_filename = self.outputs[1]
        stem = pathlib.Path(out_filename).stem
        netcdf4_filename = f'{stem}.netcdf4.nc'

        # Load and mask GEBCO
        gebco = xr.open_dataset(global_fname)
        gebco_bathy = gebco.bathymetry
        gebco_bathy = gebco_bathy.where(gebco_bathy.notnull(), 0.)
        gebco_bathy = gebco_bathy.where(gebco_bathy < 0., 0.)

        # Load BedMachine and renormalize its variables wherever the
        # remapped valid fraction exceeds the renormalization threshold
        bedmachine = xr.open_dataset(antarctic_fname)
        denom = bedmachine.valid_mask
        renorm_mask = denom >= renorm_thresh
        denom = xr.where(renorm_mask, denom, 1.0)
        var_names = ['bathymetry', 'thickness', 'ice_draft', 'ice_mask',
                     'grounded_mask', 'ocean_mask']
        for var in var_names:
            attrs = bedmachine[var].attrs
            bedmachine[var] = (bedmachine[var] / denom).where(renorm_mask)
            bedmachine[var].attrs = attrs

        # Mask BedMachine bathymetry
        bed_bathy = bedmachine.bathymetry
        bed_bathy = bed_bathy.where(bed_bathy.notnull(), 0.)
        bed_bathy = bed_bathy.where(bed_bathy < 0., 0.)

        # Blend data sets into combined data set
        combined = xr.Dataset()
        alpha = (gebco.lat - latmin) / (latmax - latmin)
        alpha = np.maximum(np.minimum(alpha, 1.0), 0.0)
        combined['bathymetry'] = (
            alpha * gebco_bathy + (1.0 - alpha) * bed_bathy
        )
        bathy_mask = xr.where(combined.bathymetry < 0., 1.0, 0.0)

        # Add remaining BedMachine variables to combined Dataset
        for field in ['ice_draft', 'thickness']:
            combined[field] = bathy_mask * bedmachine[field]
        combined['water_column'] = combined.ice_draft - combined.bathymetry
        for field in ['bathymetry', 'ice_draft', 'thickness',
                      'water_column']:
            combined[field].attrs['units'] = 'meters'

        # Add masks
        combined['bathymetry_mask'] = bathy_mask
        for field in ['ice_mask', 'grounded_mask', 'ocean_mask']:
            combined[field] = bedmachine[field]

        # Add fill values
        fill_vals = {
            'ice_draft': 0.,
            'thickness': 0.,
            'ice_mask': 0.,
            'grounded_mask': 0.,
            'ocean_mask': bathy_mask,
        }
        for field, fill_val in fill_vals.items():
            valid = combined[field].notnull()
            combined[field] = combined[field].where(valid, fill_val)

        # Save combined bathymetry to netCDF
        _write_netcdf_with_fill_values(combined, netcdf4_filename)

        # Writing directly in NETCDF3_64BIT_DATA proved prohibitively slow,
        # so use ncks to convert
        args = [
            'ncks', '-O', '-5',
            netcdf4_filename,
            out_filename,
        ]
        check_call(args, logger)

        logger.info(' Done.')

    def _cleanup(self):
        """
        Clean up work directory
        """
        logger = self.logger
        logger.info('Cleaning up work directory')

        # Remove PETxxx.RegridWeightGen.Log files
        for f in glob('*.RegridWeightGen.Log'):
            os.remove(f)

        logger.info(' Done.')


def _write_netcdf_with_fill_values(ds, filename, format='NETCDF4'):
    """
    Write an xarray Dataset with netCDF4 fill values where needed
    """
    fill_values = netCDF4.default_fillvals
    encoding = {}
    var_names = list(ds.data_vars.keys()) + list(ds.coords.keys())
    for var_name in var_names:
        # If there's already a fill-value attribute, drop it in favor of
        # the encoding set below
        ds[var_name].attrs.pop('_FillValue', None)
        is_numeric = np.issubdtype(ds[var_name].dtype, np.number)
        if is_numeric:
            # Use the netCDF4 default fill value for this dtype
            dtype = ds[var_name].dtype
            for fill_type in fill_values:
                if dtype == np.dtype(fill_type):
                    encoding[var_name] = {
                        '_FillValue': fill_values[fill_type]}
                    break
        else:
            # Non-numeric variables get no fill value
            encoding[var_name] = {'_FillValue': None}
    ds.to_netcdf(filename, encoding=encoding, format=format)
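
# Minimal usage sketch for the helper above (hypothetical data, not part of
# the test case):
#
#     ds = xr.Dataset(
#         {'bathymetry': ('lat', np.array([-5000., np.nan, -10.]))},
#         coords={'lat': [-60., -59., -58.]},
#     )
#     _write_netcdf_with_fill_values(ds, 'bathy.nc')
#
# Each numeric variable and coordinate is encoded with the netCDF4 default
# fill value for its dtype, so the NaN in 'bathymetry' is stored as the
# float64 default fill value rather than as NaN.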