Source code for compass.ocean.tests.drying_slope.initial_state

import xarray as xr
from mpas_tools.io import write_netcdf
from mpas_tools.mesh.conversion import convert, cull
from mpas_tools.planar_hex import make_planar_hex_mesh

from compass.ocean.vertical import init_vertical_coord
from compass.step import Step


[docs] class InitialState(Step): """ A step for creating a mesh and initial condition for drying slope test cases """
[docs] def __init__(self, test_case, resolution, name='initial_state', baroclinic=False): """ Create the step Parameters ---------- test_case : compass.ocean.tests.drying_slope.default.Default The test case this step belongs to """ super().__init__(test_case=test_case, name=name, ntasks=1, min_tasks=1, openmp_threads=1) self.resolution = resolution for file in ['base_mesh.nc', 'culled_mesh.nc', 'culled_graph.info', 'initial_state.nc', 'forcing.nc']: self.add_output_file(file) self.add_model_as_input()
[docs] def run(self): """ Run this step of the test case """ config = self.config logger = self.logger resolution = self.resolution # Fetch config options section = config['vertical_grid'] thin_film_thickness = section.getfloat('thin_film_thickness') vert_levels = section.getint('vert_levels') section = config['drying_slope'] nx = section.getint('nx') domain_length = section.getfloat('ly') * 1e3 drying_length = section.getfloat('ly_analysis') * 1e3 plug_width_frac = section.getfloat('plug_width_frac') right_bottom_depth = section.getfloat('right_bottom_depth') left_bottom_depth = section.getfloat('left_bottom_depth') plug_temperature = section.getfloat('plug_temperature') background_temperature = section.getfloat('background_temperature') background_salinity = section.getfloat('background_salinity') coriolis_parameter = section.getfloat('coriolis_parameter') # Check config options if domain_length < drying_length: raise ValueError('Domain is not long enough to capture wetting ' 'front') if right_bottom_depth < left_bottom_depth: raise ValueError('Right boundary must be deeper than left ' 'boundary') # Determine mesh parameters dc = 1e3 * resolution ny = round(domain_length / dc) # This is just for consistency with previous implementations and could # be removed if resolution < 1.: ny += 2 ny = 2 * round(ny / 2) logger.info(' * Make planar hex mesh') ds_mesh = make_planar_hex_mesh(nx=nx, ny=ny, dc=dc, nonperiodic_x=False, nonperiodic_y=True) logger.info(' * Completed Make planar hex mesh') write_netcdf(ds_mesh, 'base_mesh.nc') logger.info(' * Cull mesh') ds_mesh = cull(ds_mesh, logger=logger) logger.info(' * Convert mesh') ds_mesh = convert(ds_mesh, graphInfoFileName='culled_graph.info', logger=logger) logger.info(' * Completed Convert mesh') write_netcdf(ds_mesh, 'culled_mesh.nc') ds = ds_mesh.copy() ds_forcing = ds_mesh.copy() y_min = ds_mesh.yCell.min() y_max = ds_mesh.yCell.max() dc_edge_min = ds_mesh.dcEdge.min() y_cell = ds.yCell max_level_cell = vert_levels bottom_depth = (right_bottom_depth - (y_max - y_cell) / drying_length * (right_bottom_depth - left_bottom_depth)) ds['bottomDepth'] = bottom_depth # Set the water column to dry everywhere ds['ssh'] = -bottom_depth + thin_film_thickness * max_level_cell # We don't use config_tidal_forcing_monochromatic_baseline because the # default value doesn't alter the initial state init_vertical_coord(config, ds) plug_width = domain_length * plug_width_frac y_plug_boundary = y_min + plug_width ds['temperature'] = xr.where(y_cell < y_plug_boundary, plug_temperature, background_temperature) ds['tracer1'] = xr.where(y_cell < y_plug_boundary, 1.0, 0.0) ds['salinity'] = background_salinity * xr.ones_like(y_cell) normalVelocity = xr.zeros_like(ds_mesh.xEdge) normalVelocity, _ = xr.broadcast(normalVelocity, ds.refBottomDepth) normalVelocity = normalVelocity.transpose('nEdges', 'nVertLevels') normalVelocity = normalVelocity.expand_dims(dim='Time', axis=0) ds['normalVelocity'] = normalVelocity ds['fCell'] = coriolis_parameter * xr.ones_like(ds.xCell) ds['fEdge'] = coriolis_parameter * xr.ones_like(ds.xEdge) ds['fVertex'] = coriolis_parameter * xr.ones_like(ds.xVertex) write_netcdf(ds, 'initial_state.nc') # Define the tidal boundary condition over 1-cell width y_tidal_boundary = y_max - dc_edge_min / 2. tidal_forcing_mask = xr.where(y_cell > y_tidal_boundary, 1.0, 0.0) if tidal_forcing_mask.sum() <= 0: raise ValueError('Input mask for tidal case is not set!') ds_forcing['tidalInputMask'] = tidal_forcing_mask write_netcdf(ds_forcing, 'forcing.nc')