Source code for compass.landice.tests.ensemble_generator.spinup_ensemble

import sys

import numpy as np
from scipy.stats import qmc

from compass.landice.iceshelf_melt import calc_mean_TF
from compass.landice.tests.ensemble_generator.ensemble_manager import (
    EnsembleManager,
)
from compass.landice.tests.ensemble_generator.ensemble_member import (
    EnsembleMember,
)
from compass.testcase import TestCase
from compass.validate import compare_variables


[docs] class SpinupEnsemble(TestCase): """ A test case for performing an ensemble of simulations for uncertainty quantification studies. """
[docs] def __init__(self, test_group): """ Create the test case Parameters ---------- test_group : compass.landice.tests.ensemble_generator.EnsembleGenerator The test group that this test case belongs to """ name = 'spinup_ensemble' super().__init__(test_group=test_group, name=name) # We don't want to initialize all the individual runs # So during init, we only add the run manager self.add_step(EnsembleManager(test_case=self))
[docs] def configure(self): """ Configure a parameter ensemble of MALI simulations. Start by identifying the start and end run numbers to set up from the config. Next, read a pre-defined unit parameter vector that can be used for assigning parameter values to each ensemble member. The main work is using the unit parameter vector to set parameter values for each parameter to be varied, over prescribed ranges. Then create the ensemble member as a step in the test case by calling the EnsembleMember constructor. Finally, add this step to the test case's step_to_run. This normally happens automatically if steps are added to the test case in the test case constructor, but because we waited to add these steps until this configure phase, we must explicitly add the steps to steps_to_run. """ # Define some constants rhoi = 910.0 rhosw = 1028.0 cp_seawater = 3.974e3 latent_heat_ice = 335.0e3 sec_in_yr = 3600.0 * 24.0 * 365.0 c_melt = (rhosw * cp_seawater / (rhoi * latent_heat_ice))**2 section = self.config['ensemble'] # Determine start and end run numbers being requested self.start_run = section.getint('start_run') self.end_run = section.getint('end_run') # Define parameters being sampled and their ranges param_list = ['fric_exp', 'mu_scale', 'stiff_scale', 'von_mises_threshold', 'calv_limit', 'gamma0', 'meltflux'] # Determine how many and which parameters are being used n_params = 0 param_dict = {} for param in param_list: param_dict[param] = {} param_dict[param]['active'] = section.getboolean(f'use_{param}') n_params += param_dict[param]['active'] if n_params == 0: sys.exit("ERROR: At least one parameter must be specified.") # Generate unit parameter vectors - either uniform or Sobol sampling_method = section.get('sampling_method') max_samples = section.getint('max_samples') if max_samples < self.end_run: sys.exit("ERROR: max_samples is exceeded by end_run") if sampling_method == 'sobol': # Generate unit Sobol sequence for number of parameters being used print(f"Generating Sobol sequence for {n_params} parameter(s)") sampler = qmc.Sobol(d=n_params, scramble=True, seed=4) param_unit_values = sampler.random(n=max_samples) elif sampling_method == 'uniform': print(f"Generating uniform sampling for {n_params} parameter(s)") samples = np.linspace(0.0, 1.0, max_samples).reshape(-1, 1) param_unit_values = np.tile(samples, (1, n_params)) else: sys.exit("ERROR: Unsupported sampling method specified.") # Define parameter vectors for each param being used idx = 0 for param in param_list: if param_dict[param]['active']: print('Including parameter ' + param) min_val = section.getfloat(f'{param}_min') max_val = section.getfloat(f'{param}_max') param_dict[param]['vec'] = param_unit_values[:, idx] * \ (max_val - min_val) + min_val idx += 1 else: param_dict[param]['vec'] = np.full((max_samples,), None) # Deal with a few special cases # change units on calving speed limit from m/yr to s/yr if param_dict['calv_limit']['active']: param_dict['calv_limit']['vec'] = \ param_dict['calv_limit']['vec'][:] / sec_in_yr # melt flux needs to be converted to deltaT if param_dict['meltflux']['active']: # First calculate mean TF for this domain iceshelf_area_obs = section.getfloat('iceshelf_area_obs') input_file_path = section.get('input_file_path') TF_file_path = section.get('TF_file_path') mean_TF, iceshelf_area = calc_mean_TF(input_file_path, TF_file_path) # Adjust observed melt flux for ice-shelf area in init. condition print(f'IS area: model={iceshelf_area}, Obs={iceshelf_area_obs}') area_correction = iceshelf_area / iceshelf_area_obs print(f"Ice-shelf area correction is {area_correction}.") if (np.absolute(area_correction - 1.0) > 0.2): print("WARNING: ice-shelf area correction is larger than " "20%. Check data consistency before proceeding.") param_dict['meltflux']['vec'] *= iceshelf_area / iceshelf_area_obs # Set up an array of TF values to use for linear interpolation # Make it span a large enough range to capture deltaT what would # be needed for the range of gamma0 values considered. # Not possible to know a priori, so pick a wide range. TFs = np.linspace(-5.0, 10.0, num=int(15.0 / 0.01)) deltaT_vec = np.zeros(max_samples) # For each run, calculate the deltaT needed to obtain the target # melt flux for ii in range(self.start_run, self.end_run + 1): # spatially averaged version of ISMIP6 melt param.: meltfluxes = (param_dict['gamma0']['vec'][ii] * c_melt * TFs * np.absolute(TFs) * iceshelf_area) * rhoi / 1.0e12 # Gt/yr # interpolate deltaT value. Use nan values outside of range # so out of range results get detected deltaT_vec[ii] = np.interp(param_dict['meltflux']['vec'][ii], meltfluxes, TFs, left=np.nan, right=np.nan) - mean_TF if np.isnan(deltaT_vec[ii]): sys.exit("ERROR: interpolated deltaT out of range. " "Adjust definition of 'TFs'") else: deltaT_vec = [None] * max_samples # add runs as steps based on the run range requested if self.end_run > max_samples: sys.exit("Error: end_run specified in config exceeds maximum " "sample size available in param_vector_filename") for run_num in range(self.start_run, self.end_run + 1): self.add_step(EnsembleMember( test_case=self, run_num=run_num, basal_fric_exp=param_dict['fric_exp']['vec'][run_num], mu_scale=param_dict['mu_scale']['vec'][run_num], stiff_scale=param_dict['stiff_scale']['vec'][run_num], von_mises_threshold=param_dict['von_mises_threshold']['vec'][run_num], # noqa calv_spd_lim=param_dict['calv_limit']['vec'][run_num], gamma0=param_dict['gamma0']['vec'][run_num], meltflux=param_dict['meltflux']['vec'][run_num], deltaT=deltaT_vec[run_num])) # Note: do not add to steps_to_run, because ensemble_manager # will handle submitting and running the runs # Have 'compass run' only run the run_manager but not any actual runs. # This is because the individual runs will be submitted as jobs # by the ensemble manager. self.steps_to_run = ['ensemble_manager',]
# no run() method is needed # no validate() method is needed