# This software is open source software available under the BSD-3 license.
#
# Copyright (c) 2019 Triad National Security, LLC. All rights reserved.
# Copyright (c) 2019 Lawrence Livermore National Security, LLC. All rights
# reserved.
# Copyright (c) 2019 UT-Battelle, LLC. All rights reserved.
#
# Additional copyright and license information can be found in the LICENSE file
# distributed with this code, or at
# https://raw.githubusercontent.com/MPAS-Dev/MPAS-Analysis/master/LICENSE
from __future__ import absolute_import, division, print_function, \
unicode_literals
import os
import xarray
import numpy
import dask
import multiprocessing
from multiprocessing.pool import ThreadPool
import matplotlib.pyplot as plt
from geometric_features import FeatureCollection, read_feature_collection
from mpas_analysis.shared.analysis_task import AnalysisTask
from mpas_analysis.shared.plot import timeseries_analysis_plot, savefig, \
add_inset
from mpas_analysis.shared.io import open_mpas_dataset, write_netcdf
from mpas_analysis.shared.io.utility import build_config_full_path, \
get_files_year_month, decode_strings, get_region_mask
from mpas_analysis.shared.html import write_image_xml
from mpas_analysis.shared.regions import ComputeRegionMasksSubtask, \
get_feature_list
from mpas_analysis.ocean.utility import compute_zmid
[docs]class TimeSeriesOceanRegions(AnalysisTask): # {{{
"""
Performs analysis of the time-series output of regionoal mean temperature,
salinity, etc.
"""
# Authors
# -------
# Xylar Asay-Davis
[docs] def __init__(self, config, controlConfig=None):
# {{{
"""
Construct the analysis task.
Parameters
----------
config : ``MpasAnalysisConfigParser``
Configuration options
controlConfig : ``MpasAnalysisConfigParser``, optional
Configuration options for a control run (if any)
"""
# Authors
# -------
# Xylar Asay-Davis
# first, call the constructor from the base class (AnalysisTask)
super(TimeSeriesOceanRegions, self).__init__(
config=config,
taskName='timeSeriesOceanRegions',
componentName='ocean',
tags=['timeSeries', 'regions', 'antarctic'])
startYear = config.getint('timeSeries', 'startYear')
endYear = config.getint('timeSeries', 'endYear')
regionGroups = config.getExpression(self.taskName, 'regionGroups')
parallelTaskCount = config.getWithDefault('execute',
'parallelTaskCount',
default=1)
for regionGroup in regionGroups:
sectionSuffix = regionGroup[0].upper() + \
regionGroup[1:].replace(' ', '')
fileSuffix = sectionSuffix[0].lower() + sectionSuffix[1:]
sectionName = 'timeSeries{}'.format(sectionSuffix)
regionMaskFile = config.getExpression(sectionName, 'regionMask')
regionMaskFile = get_region_mask(config, regionMaskFile)
regionNames = config.getExpression(sectionName, 'regionNames')
if 'all' in regionNames and os.path.exists(regionMaskFile):
regionNames = get_feature_list(regionMaskFile)
subtaskName = 'compute{}Masks'.format(sectionSuffix)
masksSubtask = ComputeRegionMasksSubtask(
self, regionMaskFile, outFileSuffix=fileSuffix,
featureList=regionNames, subtaskName=subtaskName,
subprocessCount=parallelTaskCount,)
self.add_subtask(masksSubtask)
years = range(startYear, endYear + 1)
# in the end, we'll combine all the time series into one, but we
# create this task first so it's easier to tell it to run after all
# the compute tasks
combineSubtask = CombineRegionalProfileTimeSeriesSubtask(
self, startYears=years, endYears=years,
regionGroup=regionGroup)
# run one subtask per year
for year in years:
computeSubtask = ComputeRegionTimeSeriesSubtask(
self, startYear=year, endYear=year,
masksSubtask=masksSubtask, regionGroup=regionGroup,
regionNames=regionNames)
self.add_subtask(computeSubtask)
computeSubtask.run_after(masksSubtask)
combineSubtask.run_after(computeSubtask)
self.add_subtask(combineSubtask)
for index, regionName in enumerate(regionNames):
fullSuffix = sectionSuffix + '_' + regionName[0].lower() + \
regionName[1:].replace(' ', '')
plotRegionSubtask = PlotRegionTimeSeriesSubtask(
self, regionGroup, regionName, index, controlConfig,
sectionName, fullSuffix)
plotRegionSubtask.run_after(combineSubtask)
self.add_subtask(plotRegionSubtask)
# }}}
# }}}
class ComputeRegionTimeSeriesSubtask(AnalysisTask): # {{{
'''
Compute regional and depth mean at a function of time for a set of MPAS
fields
Attributes
----------
startYear, endYear : int
The beginning and end of the time series to compute
masksSubtask : ``ComputeRegionMasksSubtask``
A task for creating mask files for each region to plot
regionGroup : str
The name of the region group being computed (e.g. "Antarctic Basins")
regionNames : list of str
The names of the regions to compute
'''
# Authors
# -------
# Xylar Asay-Davis
def __init__(self, parentTask, startYear, endYear, masksSubtask,
regionGroup, regionNames): # {{{
'''
Construct the analysis task.
Parameters
----------
parentTask : ``TimeSeriesOceanRegions``
The main task of which this is a subtask
startYear, endYear : int
The beginning and end of the time series to compute
masksSubtask : ``ComputeRegionMasksSubtask``
A task for creating mask files for each region to plot
regionGroup : str
The name of the region group being computed (e.g. "Antarctic
Basins")
regionNames : list of str
The names of the regions to compute
'''
# Authors
# -------
# Xylar Asay-Davis
suffix = regionGroup[0].upper() + regionGroup[1:].replace(' ', '')
# first, call the constructor from the base class (AnalysisTask)
super(ComputeRegionTimeSeriesSubtask, self).__init__(
config=parentTask.config,
taskName=parentTask.taskName,
componentName=parentTask.componentName,
tags=parentTask.tags,
subtaskName='compute{}_{:04d}-{:04d}'.format(suffix, startYear,
endYear))
parentTask.add_subtask(self)
self.startYear = startYear
self.endYear = endYear
self.masksSubtask = masksSubtask
self.regionGroup = regionGroup
self.regionNames = regionNames
parallelTaskCount = self.config.getint('execute', 'parallelTaskCount')
self.subprocessCount = min(parallelTaskCount,
self.config.getint(self.taskName,
'subprocessCount'))
self.daskThreads = min(
multiprocessing.cpu_count(),
self.config.getint(self.taskName, 'daskThreads'))
# }}}
def setup_and_check(self): # {{{
'''
Perform steps to set up the analysis and check for errors in the setup.
Raises
------
ValueError
if timeSeriesStatsMonthly is not enabled in the MPAS run
'''
# Authors
# -------
# Xylar Asay-Davis
# first, call setup_and_check from the base class (AnalysisTask),
# which will perform some common setup, including storing:
# self.runDirectory , self.historyDirectory, self.plotsDirectory,
# self.namelist, self.runStreams, self.historyStreams,
# self.calendar
super(ComputeRegionTimeSeriesSubtask, self).setup_and_check()
self.check_analysis_enabled(
analysisOptionName='config_am_timeseriesstatsmonthly_enable',
raiseException=True)
# }}}
def run_task(self): # {{{
'''
Compute the regional-mean time series
'''
# Authors
# -------
# Xylar Asay-Davis
config = self.config
self.logger.info("\nCompute time series of regional means...")
startDate = '{:04d}-01-01_00:00:00'.format(self.startYear)
endDate = '{:04d}-12-31_23:59:59'.format(self.endYear)
regionGroup = self.regionGroup
sectionSuffix = regionGroup[0].upper() + \
regionGroup[1:].replace(' ', '')
timeSeriesName = sectionSuffix[0].lower() + sectionSuffix[1:]
sectionName = 'timeSeries{}'.format(sectionSuffix)
outputDirectory = '{}/{}/'.format(
build_config_full_path(config, 'output', 'timeseriesSubdirectory'),
timeSeriesName)
try:
os.makedirs(outputDirectory)
except OSError:
pass
outFileName = '{}/{}_{:04d}-{:04d}.nc'.format(
outputDirectory, timeSeriesName, self.startYear, self.endYear)
inputFiles = sorted(self.historyStreams.readpath(
'timeSeriesStatsMonthlyOutput', startDate=startDate,
endDate=endDate, calendar=self.calendar))
years, months = get_files_year_month(inputFiles,
self.historyStreams,
'timeSeriesStatsMonthlyOutput')
variables = config.getExpression(sectionName, 'variables')
variableList = [var['mpas'] for var in variables] + \
['timeMonthly_avg_layerThickness']
outputExists = os.path.exists(outFileName)
outputValid = outputExists
if outputExists:
with open_mpas_dataset(fileName=outFileName,
calendar=self.calendar,
timeVariableNames=None,
variableList=None,
startDate=startDate,
endDate=endDate) as dsOut:
for inIndex in range(dsOut.dims['Time']):
mask = numpy.logical_and(
dsOut.year[inIndex].values == years,
dsOut.month[inIndex].values == months)
if numpy.count_nonzero(mask) == 0:
outputValid = False
break
if outputValid:
self.logger.info(' Time series exists -- Done.')
return
# Load mesh related variables
try:
restartFileName = self.runStreams.readpath('restart')[0]
except ValueError:
raise IOError('No MPAS-O restart file found: need at least one '
'restart file for ocean region time series')
cellsChunk = 32768
timeChunk = 1
datasets = []
for timeIndex, fileName in enumerate(inputFiles):
dsTimeSlice = open_mpas_dataset(
fileName=fileName,
calendar=self.calendar,
variableList=variableList,
startDate=startDate,
endDate=endDate)
datasets.append(dsTimeSlice)
chunk = {'Time': timeChunk, 'nCells': cellsChunk}
if config.has_option(sectionName, 'zmin'):
config_zmin = config.getfloat(sectionName, 'zmin')
else:
config_zmin = None
if config.has_option(sectionName, 'zmax'):
config_zmax = config.getfloat(sectionName, 'zmax')
else:
config_zmax = None
with dask.config.set(schedular='threads',
pool=ThreadPool(self.daskThreads)):
# combine data sets into a single data set
dsIn = xarray.concat(datasets, 'Time').chunk(chunk)
chunk = {'nCells': cellsChunk}
dsRestart = xarray.open_dataset(restartFileName)
dsRestart = dsRestart.isel(Time=0).chunk(chunk)
dsIn['areaCell'] = dsRestart.areaCell
if 'landIceMask' in dsRestart:
# only the region outside of ice-shelf cavities
dsIn['openOceanMask'] = dsRestart.landIceMask == 0
dsIn['zMid'] = compute_zmid(dsRestart.bottomDepth,
dsRestart.maxLevelCell,
dsRestart.layerThickness)
regionMaskFileName = self.masksSubtask.maskFileName
dsRegionMask = xarray.open_dataset(regionMaskFileName)
maskRegionNames = decode_strings(dsRegionMask.regionNames)
datasets = []
regionIndices = []
for regionName in self.regionNames:
self.logger.info(' region: {}'.format(regionName))
regionIndex = maskRegionNames.index(regionName)
regionIndices.append(regionIndex)
chunk = {'nCells': cellsChunk}
dsMask = dsRegionMask.isel(nRegions=regionIndex).chunk(chunk)
cellMask = dsMask.regionCellMasks == 1
if 'openOceanMask' in dsIn:
cellMask = numpy.logical_and(cellMask, dsIn.openOceanMask)
dsRegion = dsIn.where(cellMask, drop=True)
totalArea = dsRegion['areaCell'].sum()
self.logger.info(' totalArea: {} mil. km^2'.format(
1e-12*totalArea.values))
self.logger.info("Don't worry about the following dask "
"warnings.")
if config_zmin is None:
zmin = dsMask.zmin
else:
zmin = config_zmin
if config_zmax is None:
zmax = dsMask.zmax
else:
zmax = config_zmax
depthMask = numpy.logical_and(dsRegion.zMid >= zmin,
dsRegion.zMid <= zmax)
depthMask.compute()
self.logger.info("Dask warnings should be done.")
dsRegion['depthMask'] = depthMask
layerThickness = dsRegion.timeMonthly_avg_layerThickness
dsRegion['volCell'] = (dsRegion.areaCell*layerThickness).where(
depthMask)
totalVol = dsRegion.volCell.sum(dim='nVertLevels').sum(
dim='nCells')
totalVol.compute()
self.logger.info(' totalVol (mil. km^3): {}'.format(
1e-15*totalVol.values))
dsRegion = dsRegion.transpose('Time', 'nCells', 'nVertLevels')
dsOut = xarray.Dataset()
dsOut['totalVol'] = totalVol
dsOut.totalVol.attrs['units'] = 'm^3'
dsOut['totalArea'] = totalArea
dsOut.totalArea.attrs['units'] = 'm^2'
dsOut['zbounds'] = ('nbounds', [zmin, zmax])
dsOut.zbounds.attrs['units'] = 'm'
for var in variables:
outName = var['name']
self.logger.info(' {}'.format(outName))
mpasVarName = var['mpas']
timeSeries = dsRegion[mpasVarName]
units = timeSeries.units
description = timeSeries.long_name
is3d = 'nVertLevels' in timeSeries.dims
if is3d:
timeSeries = \
(dsRegion.volCell*timeSeries.where(depthMask)).sum(
dim='nVertLevels').sum(dim='nCells') / totalVol
else:
timeSeries = \
(dsRegion.areaCell*timeSeries).sum(
dim='nCells') / totalArea
timeSeries.compute()
dsOut[outName] = timeSeries
dsOut[outName].attrs['units'] = units
dsOut[outName].attrs['description'] = description
dsOut[outName].attrs['is3d'] = str(is3d)
datasets.append(dsOut)
# combine data sets into a single data set
dsOut = xarray.concat(datasets, 'nRegions')
dsOut.coords['regionNames'] = dsRegionMask.regionNames.isel(
nRegions=regionIndices)
dsOut.coords['year'] = (('Time'), years)
dsOut['year'].attrs['units'] = 'years'
dsOut.coords['month'] = (('Time'), months)
dsOut['month'].attrs['units'] = 'months'
write_netcdf(dsOut, outFileName)
# }}}
# }}}
class CombineRegionalProfileTimeSeriesSubtask(AnalysisTask): # {{{
'''
Combine individual time series into a single data set
'''
# Authors
# -------
# Xylar Asay-Davis
def __init__(self, parentTask, startYears, endYears, regionGroup): # {{{
'''
Construct the analysis task.
Parameters
----------
parentTask : ``TimeSeriesOceanRegions``
The main task of which this is a subtask
startYears, endYears : list of int
The beginning and end of each time series to combine
regionGroup : str
The name of the region group being computed (e.g. "Antarctic
Basins")
'''
# Authors
# -------
# Xylar Asay-Davis
taskSuffix = regionGroup[0].upper() + regionGroup[1:].replace(' ', '')
subtaskName = 'combine{}TimeSeries'.format(taskSuffix)
# first, call the constructor from the base class (AnalysisTask)
super(CombineRegionalProfileTimeSeriesSubtask, self).__init__(
config=parentTask.config,
taskName=parentTask.taskName,
componentName=parentTask.componentName,
tags=parentTask.tags,
subtaskName=subtaskName)
self.startYears = startYears
self.endYears = endYears
self.regionGroup = regionGroup
# }}}
def run_task(self): # {{{
'''
Combine the time series
'''
# Authors
# -------
# Xylar Asay-Davis
regionGroup = self.regionGroup
timeSeriesName = regionGroup[0].lower() + \
regionGroup[1:].replace(' ', '')
outputDirectory = '{}/{}/'.format(
build_config_full_path(self.config, 'output',
'timeseriesSubdirectory'),
timeSeriesName)
outFileName = '{}/{}_{:04d}-{:04d}.nc'.format(
outputDirectory, timeSeriesName, self.startYears[0],
self.endYears[-1])
if not os.path.exists(outFileName):
inFileNames = []
for startYear, endYear in zip(self.startYears, self.endYears):
inFileName = '{}/{}_{:04d}-{:04d}.nc'.format(
outputDirectory, timeSeriesName, startYear, endYear)
inFileNames.append(inFileName)
ds = xarray.open_mfdataset(inFileNames, combine='nested',
concat_dim='Time', decode_times=False)
# a few variables have become time dependent and shouldn't be
for var in ['totalArea', 'zbounds']:
ds[var] = ds[var].isel(Time=0, drop=True)
write_netcdf(ds, outFileName)
# }}}
# }}}
class PlotRegionTimeSeriesSubtask(AnalysisTask):
"""
Plots time-series output of properties in an ocean region.
Attributes
----------
regionGroup : str
Name of the collection of region to plot
regionName : str
Name of the region to plot
regionIndex : int
The index into the dimension ``nRegions`` of the region to plot
sectionName : str
The section of the config file to get options from
controlConfig : ``MpasAnalysisConfigParser``
The configuration options for the control run (if any)
"""
# Authors
# -------
# Xylar Asay-Davis
def __init__(self, parentTask, regionGroup, regionName, regionIndex,
controlConfig, sectionName, fullSuffix):
# {{{
"""
Construct the analysis task.
Parameters
----------
parentTask : ``AnalysisTask``
The parent task, used to get the ``taskName``, ``config`` and
``componentName``
regionGroup : str
Name of the collection of region to plot
regionName : str
Name of the region to plot
regionIndex : int
The index into the dimension ``nRegions`` of the region to plot
controlConfig : ``MpasAnalysisConfigParser``, optional
Configuration options for a control run (if any)
sectionName : str
The config section with options for this regionGroup
fullSuffix : str
The regionGroup and regionName combined and modified to be
appropriate as a task or file suffix
"""
# Authors
# -------
# Xylar Asay-Davis
# first, call the constructor from the base class (AnalysisTask)
super(PlotRegionTimeSeriesSubtask, self).__init__(
config=parentTask.config,
taskName=parentTask.taskName,
componentName=parentTask.componentName,
tags=parentTask.tags,
subtaskName='plot{}'.format(fullSuffix))
self.regionGroup = regionGroup
self.regionName = regionName
self.regionIndex = regionIndex
self.sectionName = sectionName
self.controlConfig = controlConfig
self.prefix = fullSuffix[0].lower() + fullSuffix[1:]
# }}}
def setup_and_check(self): # {{{
"""
Perform steps to set up the analysis and check for errors in the setup.
Raises
------
IOError
If files are not present
"""
# Authors
# -------
# Xylar Asay-Davis
# first, call setup_and_check from the base class (AnalysisTask),
# which will perform some common setup, including storing:
# self.inDirectory, self.plotsDirectory, self.namelist, self.streams
# self.calendar
super(PlotRegionTimeSeriesSubtask, self).setup_and_check()
self.variables = self.config.getExpression(self.sectionName,
'variables')
self.xmlFileNames = []
for var in self.variables:
self.xmlFileNames.append('{}/{}_{}.xml'.format(
self.plotsDirectory, self.prefix, var['name']))
return # }}}
def run_task(self): # {{{
"""
Plots time-series output of properties in an ocean region.
"""
# Authors
# -------
# Xylar Asay-Davis
self.logger.info("\nPlotting time series of ocean properties of {}"
"...".format(self.regionName))
self.logger.info(' Load time series...')
config = self.config
calendar = self.calendar
regionMaskFile = config.getExpression(self.sectionName, 'regionMask')
regionMaskFile = get_region_mask(config, regionMaskFile)
fcAll = read_feature_collection(regionMaskFile)
fc = FeatureCollection()
for feature in fcAll.features:
if feature['properties']['name'] == self.regionName:
fc.add_feature(feature)
break
baseDirectory = build_config_full_path(
config, 'output', 'timeSeriesSubdirectory')
startYear = config.getint('timeSeries', 'startYear')
endYear = config.getint('timeSeries', 'endYear')
regionGroup = self.regionGroup
timeSeriesName = regionGroup[0].lower() + \
regionGroup[1:].replace(' ', '')
inFileName = '{}/{}/{}_{:04d}-{:04d}.nc'.format(
baseDirectory, timeSeriesName, timeSeriesName, startYear, endYear)
dsIn = xarray.open_dataset(inFileName).isel(nRegions=self.regionIndex)
zbounds = dsIn.zbounds.values
controlConfig = self.controlConfig
plotControl = controlConfig is not None
if plotControl:
controlRunName = controlConfig.get('runs', 'mainRunName')
baseDirectory = build_config_full_path(
controlConfig, 'output', 'timeSeriesSubdirectory')
startYear = controlConfig.getint('timeSeries', 'startYear')
endYear = controlConfig.getint('timeSeries', 'endYear')
inFileName = '{}/{}/{}_{:04d}-{:04d}.nc'.format(
baseDirectory, timeSeriesName, timeSeriesName, startYear,
endYear)
dsRef = xarray.open_dataset(inFileName).isel(
nRegions=self.regionIndex)
zboundsRef = dsRef.zbounds.values
mainRunName = config.get('runs', 'mainRunName')
movingAverageMonths = 1
self.logger.info(' Make plots...')
groupLink = self.regionGroup[0].lower() + \
self.regionGroup[1:].replace(' ', '')
for var in self.variables:
varName = var['name']
mainArray = dsIn[varName]
is3d = mainArray.attrs['is3d'] == 'True'
if is3d:
title = 'Volume-Mean {} in {}'.format(
var['title'], self.regionName)
else:
title = 'Area-Mean {} in {}'.format(var['title'],
self.regionName)
if plotControl:
refArray = dsRef[varName]
xLabel = 'Time (yr)'
yLabel = '{} ({})'.format(var['title'], var['units'])
filePrefix = '{}_{}'.format(self.prefix, varName)
outFileName = '{}/{}.png'.format(self.plotsDirectory, filePrefix)
fields = [mainArray]
lineColors = ['k']
lineWidths = [2.5]
legendText = [mainRunName]
if plotControl:
fields.append(refArray)
lineColors.append('r')
lineWidths.append(1.2)
legendText.append(controlRunName)
if is3d:
if not plotControl or numpy.all(zbounds == zboundsRef):
title = '{} ({} < z < {} m)'.format(title, zbounds[0],
zbounds[1])
else:
legendText[0] = '{} ({} < z < {} m)'.format(
legendText[0], zbounds[0], zbounds[1])
legendText[1] = '{} ({} < z < {} m)'.format(
legendText[1], zboundsRef[0], zboundsRef[1])
fig = timeseries_analysis_plot(config, fields, movingAverageMonths,
title, xLabel, yLabel,
calendar=calendar,
lineColors=lineColors,
lineWidths=lineWidths,
legendText=legendText)
# do this before the inset because otherwise it moves the inset
# and cartopy doesn't play too well with tight_layout anyway
plt.tight_layout()
add_inset(fig, fc, width=2.0, height=2.0)
savefig(outFileName, tight=False)
caption = 'Regional mean of {}'.format(title)
write_image_xml(
config=config,
filePrefix=filePrefix,
componentName='Ocean',
componentSubdirectory='ocean',
galleryGroup='{} Time Series'.format(self.regionGroup),
groupLink=groupLink,
gallery=var['title'],
thumbnailDescription=self.regionName,
imageDescription=caption,
imageCaption=caption)
# }}}
# }}}
# vim: foldmethod=marker ai ts=4 sts=4 et sw=4 ft=python