Source code for mpas_analysis.ocean.conservation

# This software is open source software available under the BSD-3 license.
#
# Copyright (c) 2022 Triad National Security, LLC. All rights reserved.
# Copyright (c) 2022 Lawrence Livermore National Security, LLC. All rights
# reserved.
# Copyright (c) 2022 UT-Battelle, LLC. All rights reserved.
#
# Additional copyright and license information can be found in the LICENSE file
# distributed with this code, or at
# https://raw.githubusercontent.com/MPAS-Dev/MPAS-Analysis/main/LICENSE
#
# Author
# -------
# Carolyn Begeman

from distutils.spawn import find_executable
import numpy as np
import matplotlib.pyplot as plt
import os
import subprocess
import xarray as xr

from mpas_analysis.shared.analysis_task import AnalysisTask
from mpas_analysis.shared.constants import constants
from mpas_analysis.shared.html import write_image_xml
from mpas_analysis.shared.io import open_mpas_dataset
from mpas_analysis.shared.io.utility import build_config_full_path, \
    make_directories, get_files_year_month, decode_strings
from mpas_analysis.shared.plot import timeseries_analysis_plot, savefig
from mpas_analysis.shared.timekeeping.utility import date_to_days, \
    days_to_datetime


[docs] class ConservationTask(AnalysisTask): """ This task generates time series plots from output from the conservation analysis member. A number of different plot types are supported, as indicated in the `plotTypes` config option in the `conservation` section. Attributes ---------- config : mpas_tools.config.MpasConfigParser Contains configuration options controlConfig : mpas_tools.config.MpasConfigParser Contains configuration options for a control run, if provided outputFile : str The path to the output file produced by this analysis runDirectory : str The path to the restart files from the main simulation being analyzed historyDirectory : str The path to the history files from the main simulation being analyzed startYear : int The year to start the analysis endYear : int The year to end the analysis inputFiles : list of str The paths to all conservation AM files mainRunName : str The name of the main run from the config file plotTypes : list of str The plot types requested in the config file masterVariableList : dict of key-[list of str] pairs Keys are the supported plot types. Entries are lists of the variables that are needed to produce that plot type. derivedVariableList : dict of key-[list of str] pairs Keys are the derived variables. Entries are lists of variables in the AM output that are needed to derive that variable. xmlFileNames : list of str File names for xml output with full path filePrefixes : list of str File prefixes for xml files variableList : dict of key-[list of str] pairs Keys are the requested plot types. Entries are lists of the variables in the AM output that are needed to produce that plot type. """ # Authors # ------- # Carolyn Begeman
[docs] def __init__(self, config, controlConfig): """ Construct the analysis task. Parameters ---------- config : mpas_tools.config.MpasConfigParser Contains configuration options """ # Authors # ------- # Carolyn Begeman super(ConservationTask, self).__init__( config=config, taskName='oceanConservation', componentName='ocean', tags=['timeSeries', 'conservation']) self.controlConfig = controlConfig
def setup_and_check(self): """ Perform steps to set up the analysis and check for errors in the setup. Also, adds attributes to the task that will be needed later. Raises ------ ValueError: if plot type is not supported, conservation analysis member is inactive or input files are missing """ # Authors # ------- # Carolyn Begeman super(ConservationTask, self).setup_and_check() # Check that the conservation analysis member is active self.check_analysis_enabled( analysisOptionName='config_am_conservationcheck_enable', raiseException=True) # Specify where to put analysis task output config = self.config baseDirectory = build_config_full_path( config, 'output', 'conservationSubdirectory') make_directories(baseDirectory) self.outputFile = f'{baseDirectory}/{self.fullTaskName}.nc' # get a list of conservationCheck output files from the streams file, # reading only those that are between the start and end dates # the run directory contains the restart files self.runDirectory = build_config_full_path(self.config, 'input', 'runSubdirectory') # if the history directory exists, use it; if not, fall back on # runDirectory self.historyDirectory = build_config_full_path( self.config, 'input', f'{self.componentName}HistorySubdirectory', defaultPath=self.runDirectory) self.startYear = self.config.getint('timeSeries', 'startYear') self.endYear = self.config.getint('timeSeries', 'endYear') self.inputFiles = sorted(self.historyStreams.readpath( 'conservationCheckOutput', startDate=f'{self.startYear:04d}-01-01_00:00:00', endDate=f'{self.endYear:04d}-01-01_00:00:00', calendar=self.calendar)) if len(self.inputFiles) == 0: raise IOError(f'No files were found matching {self.inputFiles}') with xr.open_dataset(self.inputFiles[0]) as ds: self.allVariables = list(ds.data_vars.keys()) self.mainRunName = self.config.get('runs', 'mainRunName') self.plotTypes = self.config.getexpression('timeSeriesConservation', 'plotTypes') self.masterVariableList = {'absolute_energy_error': ['absoluteEnergyError'], 'total_energy_flux': ['netEnergyFlux'], 'absolute_salt_error': ['absoluteSaltError'], 'ice_salt_flux': ['netSaltFlux'], 'total_mass_flux': ['netMassFlux'], 'total_mass_change': ['netMassChange'], 'land_ice_mass_change': ['landIceMassChange'], 'land_ice_ssh_change': ['landIceSshChange'], 'land_ice_mass_flux': ['landIceMassFlux'], 'land_ice_mass_flux_components': ['accumulatedIcebergFlux', 'accumulatedLandIceFlux', 'accumulatedRemovedRiverRunoffFlux', 'accumulatedRemovedIceRunoffFlux']} # for each derived variable, which source variables are needed self.derivedVariableList = {'netMassChange': ['massChange'], 'landIceMassFlux': ['accumulatedIcebergFlux', 'accumulatedLandIceFlux', 'accumulatedRemovedRiverRunoffFlux', 'accumulatedRemovedIceRunoffFlux'], 'landIceSshChange': ['accumulatedIcebergFlux', 'accumulatedLandIceFlux', 'accumulatedRemovedRiverRunoffFlux', 'accumulatedRemovedIceRunoffFlux'], 'landIceMassChange': ['accumulatedIcebergFlux', 'accumulatedLandIceFlux', 'accumulatedRemovedRiverRunoffFlux', 'accumulatedRemovedIceRunoffFlux']} # Determine the xml files for each plot and the variables each plot will use self.xmlFileNames = [] self.filePrefixes = {} self.variableList = {} for plot_type in self.plotTypes: if plot_type not in self.masterVariableList.keys(): raise ValueError(f'plot type {plot_type} not supported') filePrefix = f'conservation_{self.mainRunName}_{plot_type}_' \ f'years{self.startYear:04d}-{self.endYear:04d}' self.xmlFileNames.append(f'{self.plotsDirectory}/{filePrefix}.xml') self.filePrefixes[plot_type] = filePrefix self.variableList[plot_type] = self._add_variables(self.masterVariableList[plot_type]) def run_task(self): """ Create an output netCDF file that has all of the requested conservation AM variables in the requested time window. Then generate all requested conservation plot types. """ # Authors # ------- # Carolyn Begeman all_plots_variable_list = [] for plot_type in self.plotTypes: for varname in self.variableList[plot_type]: all_plots_variable_list.append(varname) self._compute_time_series_with_ncrcat(all_plots_variable_list) for plot_type in self.plotTypes: self._make_plot(plot_type) def _add_variables(self, target_variable_list): """ Add one or more variables to extract as a time series. Parameters ---------- variableList : list of str A list of variable names in ``conservationCheck`` to be included in the time series Raises ------ ValueError if this function is called before this task has been set up (so the list of available variables has not yet been set) or if one or more of the requested variables is not available in the ``conservationCheck`` output. """ # Authors # ------- # Xylar Asay-Davis variable_list = [] if self.allVariables is None: raise ValueError('add_variables() can only be called after ' 'setup_and_check() in ConservationTask.\n' 'Presumably tasks were added in the wrong order ' 'or add_variables() is being called in the wrong ' 'place.') for variable in target_variable_list: if variable not in self.allVariables and \ variable not in self.derivedVariableList.keys(): raise ValueError( f'{variable} is not available in conservationCheck' 'output:\n{self.allVariables}') if variable in self.allVariables and variable not in variable_list: variable_list.append(variable) # If it's a derived variable, add all of the variables it depends on if variable in self.derivedVariableList.keys() and \ variable not in variable_list: for var in self.derivedVariableList[variable]: variable_list.append(var) return variable_list def _make_plot(self, plot_type): """ Generate time series plots from conservation AM output. Parameters ---------- plot_type: str The type of plot to generate from conservationCheck variables """ config = self.config filePrefix = self.filePrefixes[plot_type] outFileName = f'{self.plotsDirectory}/{filePrefix}.png' titles = {} titles['total_energy_flux'] = 'Total energy flux' titles['absolute_energy_error'] = 'Energy error' titles['ice_salt_flux'] = 'Salt flux related to land ice and sea ice' titles['absolute_salt_error'] = 'Salt conservation error' titles['total_mass_flux'] = 'Total mass flux' titles['total_mass_change'] = 'Total mass anomaly' titles['land_ice_mass_flux'] = 'Mass flux due to land ice' titles['land_ice_mass_change'] = 'Mass anomaly due to land ice fluxes' titles['land_ice_ssh_change'] = 'SSH anomaly due to land ice fluxes' titles['land_ice_mass_flux_components'] = 'Mass fluxes from land ice' y_labels = {} y_labels['total_energy_flux'] = 'Energy flux (W)' y_labels['absolute_energy_error'] = 'Energy (J)' y_labels['ice_salt_flux'] = 'Salt flux (Gt/yr)' y_labels['absolute_salt_error'] = 'Salt (Gt)' y_labels['total_mass_flux'] = 'Mass flux (Gt/yr)' y_labels['total_mass_change'] = 'Mass (Gt)' y_labels['land_ice_mass_flux'] = 'Mass flux (Gt/yr)' y_labels['land_ice_mass_change'] = 'Mass (Gt)' y_labels['land_ice_ssh_change'] = 'SSH anomaly (mm)' y_labels['land_ice_mass_flux_components'] = 'Mass flux (Gt/yr)' captions = {} captions['total_energy_flux'] = 'Total energy flux' captions['absolute_energy_error'] = 'Absolute energy conservation error' captions['ice_salt_flux'] = 'Salt flux related to land ice and sea ice ' \ '(sea ice salinity flux, sea ice frazil flux, and land ice frazil flux)' captions['absolute_salt_error'] = 'Absolute salt conservation error' captions['total_mass_flux'] = 'Total mass flux' captions['total_mass_change'] = 'Total mass anomaly' captions['land_ice_mass_flux'] = 'Mass flux due to land ice' captions['land_ice_mass_change'] = 'Mass anomaly due to land ice fluxes' captions['land_ice_ssh_change'] = 'SSH anomaly due to land ice fluxes. Assumes a constant ocean area.' captions['land_ice_mass_flux_components'] = 'Mass flux components from land ice' self.logger.info(f' Open conservation file {self.outputFile}...') ds = open_mpas_dataset(fileName=self.outputFile, calendar=self.calendar, variableList=self.variableList[plot_type], timeVariableNames='xtime', startDate=f'{self.startYear:04d}-01-01_00:00:00', endDate=f'{self.endYear:04d}-01-01_00:00:00') if self.controlConfig is not None: baseDirectory = build_config_full_path( self.controlConfig, 'output', 'timeSeriesSubdirectory') controlFileName = f'{baseDirectory}/{self.fullTaskName}.nc' self.logger.info(' Load in conservation for a control run ' f'{controlFileName}...') ds_ref = open_mpas_dataset(fileName=controlFileName, calendar=self.calendar, variableList=self.variableList[plot_type], timeVariableNames='xtime') controlEndYear = self.controlConfig.getint('timeSeries', 'endYear') if self.startYear <= controlEndYear: timeStart = date_to_days(year=self.startYear, month=1, day=1, calendar=self.calendar) timeEnd = date_to_days(year=self.endYear, month=12, day=31, calendar=self.calendar) ds_ref_slice = \ ds_ref.sel(Time=slice(timeStart, timeEnd)) else: self.logger.warning('Control time series ends before the ' 'timeSeries startYear and will not be ' 'plotted.') self.controlConfig = None # make the plot self.logger.info(' Make conservation plots...') xLabel = 'Time (years)' title = titles[plot_type] yLabel = y_labels[plot_type] lineStylesBase = ['-', '--', '-.', ':'] # gather all the variables for this plot type fields = [] legendText = [] lineColors = [] lineStyles = [] for index, varname in enumerate(self.masterVariableList[plot_type]): variable = self._get_variable(ds, varname) fields.append(variable) legend_text = '' if self.controlConfig is not None: legend_text = self.mainRunName if len(self.masterVariableList[plot_type]) > 1: if len(legend_text) > 0: legend_text = f'{legend_text}, ' legend_text = f"{legend_text}{varname.replace('accumulated', '').replace('Flux', '')}" legendText.append(legend_text) lineColors.append(config.get('timeSeries', 'mainColor')) lineStyles.append(lineStylesBase[index]) if self.controlConfig is not None: variable = self._get_variable(ds_ref, varname) fields.append(variable) legend_text = self.controlConfig.get('runs', 'mainRunName') if len(self.masterVariableList[plot_type]) > 1: legend_text = f"{legend_text}, {varname.replace('accumulated', '').replace('Flux', '')}" legendText.append(legend_text) lineColors.append(config.get('timeSeries', 'controlColor')) lineStyles.append(lineStylesBase[index]) lineWidths = [3 for i in fields] if config.has_option('timeSeries', 'movingAveragePoints'): movingAveragePoints = config.getint('timeSeries', 'movingAveragePoints') else: movingAveragePoints = None if config.has_option('timeSeries', 'firstYearXTicks'): firstYearXTicks = config.getint('timeSeries', 'firstYearXTicks') else: firstYearXTicks = None if config.has_option('timeSeries', 'yearStrideXTicks'): yearStrideXTicks = config.getint('timeSeries', 'yearStrideXTicks') else: yearStrideXTicks = None timeseries_analysis_plot(config, fields, calendar=self.calendar, title=title, xlabel=xLabel, ylabel=yLabel, movingAveragePoints=movingAveragePoints, lineColors=lineColors, lineStyles=lineStyles[:len(fields)], lineWidths=lineWidths, legendText=legendText, firstYearXTicks=firstYearXTicks, yearStrideXTicks=yearStrideXTicks) # save the plot to the output file plt.savefig(outFileName) caption = captions[plot_type] write_image_xml( config=self.config, filePrefix=filePrefix, componentName='Ocean', componentSubdirectory='ocean', galleryGroup='Time Series', groupLink='timeseries', gallery='Conservation', thumbnailDescription=title, imageDescription=caption, imageCaption=caption) def _get_variable(self, ds, varname, mks=False): if varname not in self.derivedVariableList: variable = ds[varname] else: # Here we keep the units mks if varname == 'netMassChange': variable = self._get_variable(ds, 'massChange', mks=True) # mass_flux = self._get_variable(ds, 'netMassFlux') # # Assume that the frequency of output is monthly # dt = constants.sec_per_month # # Convert from kg/s to kg # derived_variable = mass_flux.cumsum(axis=0) * dt elif varname == 'landIceMassChange': land_ice_mass_flux = self._get_variable(ds, 'landIceMassFlux', mks=True) # Assume that the frequency of output is monthly dt = constants.sec_per_month # Convert from kg/s to kg/month land_ice_mass_flux = land_ice_mass_flux * dt # Convert from kg/month to kg variable = np.cumsum(land_ice_mass_flux) elif varname == 'landIceMassFlux': variable = self._get_variable(ds, 'accumulatedIcebergFlux', mks=True) + \ self._get_variable(ds, 'accumulatedLandIceFlux', mks=True) + \ self._get_variable(ds, 'accumulatedRemovedRiverRunoffFlux', mks=True) + \ self._get_variable(ds, 'accumulatedRemovedIceRunoffFlux', mks=True) elif varname == 'landIceSshChange': ts_files = sorted(self.historyStreams.readpath( 'timeSeriesStatsMonthlyOutput', startDate=f'{self.startYear:04d}-01-01_00:00:00', endDate=f'{self.endYear:04d}-01-01_00:00:00', calendar=self.calendar)) # Note that here we assume that the area of the ocean is constant in time # to save computational expense because most configurations do not allow # the area of the ocean to change ts_file = ts_files[0] if not os.path.exists(ts_file): raise ValueError(f'Could not find timeMonthlyStats file {ts_file}') var = 'timeMonthly_avg_areaCellGlobal' ds_ts = open_mpas_dataset(fileName=ts_file, calendar=self.calendar, variableList=[var]) A = ds_ts[var].mean() land_ice_mass_change = self._get_variable(ds, 'landIceMassChange', mks=True) rho = self.namelist.getfloat('config_density0') # Convert from to kg to m variable = land_ice_mass_change / (rho * A) else: raise ValueError(f'Attempted to derive non-supported variable {varname}') removed_vars = ['accumulatedRemovedRiverRunoffFlux', 'accumulatedRemovedIceRunoffFlux'] if varname in removed_vars: variable = -variable if not mks: # Here we do all the unit conversion from mks into whatever we want mass_vars = ['initialMass', 'finalMass', 'absoluteMassError', 'relativeMassError', 'massChange', 'landIceMassChange'] salt_vars = ['initialSalt', 'finalSalt', 'absoluteSaltError', 'relativeSaltError'] mass_flux_vars = ['netMassFlux', 'landIceMassFlux'] salt_flux_vars = ['netSaltFlux'] ssh_vars = ['landIceSshChange', 'sshChange'] if (varname in mass_vars) or (varname in salt_vars): # Convert from kg to Gt variable = variable * 1e-12 if (varname in mass_flux_vars) or (varname in salt_flux_vars): # Convert from kg/s to Gt/yr variable = variable * 1e-12 * constants.sec_per_year if varname in ssh_vars: # Convert from m to mm variable = variable * 1e3 return variable def _compute_time_series_with_ncrcat(self, variable_list): """ Uses ncrcat to extact time series from conservationCheckOutput files Raises ------ OSError If ``ncrcat`` is not in the system path. """ if find_executable('ncrcat') is None: raise OSError('ncrcat not found. Make sure the latest nco ' 'package is installed: \n' 'conda install nco\n' 'Note: this presumes use of the conda-forge ' 'channel.') inputFiles = self.inputFiles append = False if os.path.exists(self.outputFile): # make sure all the necessary variables are also present with xr.open_dataset(self.outputFile) as ds: if ds.sizes['Time'] == 0: updateSubset = False else: updateSubset = True for variableName in variable_list: if variableName not in ds.variables: updateSubset = False break if updateSubset: # add only input files with times that aren't already in # the output file append = True fileNames = sorted(self.inputFiles) inYears, inMonths = get_files_year_month( fileNames, self.historyStreams, 'conservationCheckOutput') inYears = np.array(inYears) inMonths = np.array(inMonths) totalMonths = 12 * inYears + inMonths dates = decode_strings(ds.xtime) lastDate = dates[-1] lastYear = int(lastDate[0:4]) lastMonth = int(lastDate[5:7]) lastTotalMonths = 12 * lastYear + lastMonth inputFiles = [] for index, inputFile in enumerate(fileNames): if totalMonths[index] > lastTotalMonths: inputFiles.append(inputFile) if len(inputFiles) == 0: # nothing to do return else: # there is an output file but it has the wrong variables # so we need ot delete it. self.logger.warning('Warning: deleting file {self.outputFile}' ' because it is empty or some variables' ' were missing') os.remove(self.outputFile) variableList = variable_list + ['xtime'] args = ['ncrcat', '-4', '--no_tmp_fl', '-v', ','.join(variableList)] if append: args.append('--record_append') printCommand = '{} {} ... {} {}'.format(' '.join(args), inputFiles[0], inputFiles[-1], self.outputFile) args.extend(inputFiles) args.append(self.outputFile) self.logger.info(f'running: {printCommand}') for handler in self.logger.handlers: handler.flush() process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = process.communicate() if stdout: stdout = stdout.decode('utf-8') for line in stdout.split('\n'): self.logger.info(line) if stderr: stderr = stderr.decode('utf-8') for line in stderr.split('\n'): self.logger.error(line) if process.returncode != 0: raise subprocess.CalledProcessError(process.returncode, ' '.join(args))