# This software is open source software available under the BSD-3 license.
#
# Copyright (c) 2022 Triad National Security, LLC. All rights reserved.
# Copyright (c) 2022 Lawrence Livermore National Security, LLC. All rights
# reserved.
# Copyright (c) 2022 UT-Battelle, LLC. All rights reserved.
#
# Additional copyright and license information can be found in the LICENSE file
# distributed with this code, or at
# https://raw.githubusercontent.com/MPAS-Dev/MPAS-Analysis/master/LICENSE
"""
IO utility functions
Phillip J. Wolfram, Xylar Asay-Davis
"""
import glob
import os
import random
import string
from datetime import datetime
import numpy
import shutil
import warnings
[docs]def paths(*args):
"""
Returns glob'd paths in list for arbitrary number of function arguments.
Note, each expanded set of paths is sorted.
Parameters
----------
*args : list
A list of arguments to pass to ``glob.glob``
Returns
-------
paths : list of str
A list of file paths
"""
# Authors
# -------
# Phillip J. Wolfram
paths = []
for aargs in args:
paths += sorted(glob.glob(aargs))
return paths
def fingerprint_generator(size=12,
chars=string.ascii_uppercase + string.digits):
"""
Returns a random string that can be used as a unique fingerprint
Parameters
----------
size : int, optional
The number of characters in the fingerprint
chars : list of char, optional
The fingerprint
Returns
-------
fingerprint : str
A random string
Reference
---------
http://stackoverflow.com/questions/2257441/random-string-generation-with-upper-case-letters-and-digits-in-python
"""
# Authors
# -------
# Phillip J. Wolfram
return ''.join(random.choice(chars) for _ in range(size))
[docs]def make_directories(path):
"""
Make the given path if it does not already exist.
Parameters
----------
path : str
the path to make
Returns
-------
path : str
the path unchanged
"""
# Authors
# -------
# Xylar Asay-Davis
try:
os.makedirs(path)
except OSError:
pass
return path
[docs]def build_config_full_path(config, section, relativePathOption,
relativePathSection=None,
defaultPath=None,
baseDirectoryOption='baseDirectory'):
"""
Get a full path from a base directory and a relative path
Parameters
----------
config : mpas_tools.config.MpasConfigParser
configuration from which to read the path
section : str
the name of a section in `config`, which must have an option
``baseDirectory``
relativePathOption : str
the name of an option in ``section`` of the relative path within
``baseDirectory`` (or possibly an absolute path)
relativePathSection : str, optional
the name of a section for ``relativePathOption`` if not ``section``
defaultPath : str, optional
the name of a path to return if the resulting path doesn't exist.
baseDirectoryOption : str, optional
the name of the option in ``section`` for the base directorys
Returns
-------
fullPath : str
The full path to the given relative path within the given
``baseDirectory``
"""
# Authors
# -------
# Xylar Asay-Davis
if relativePathSection is None:
relativePathSection = section
subDirectory = config.get(relativePathSection, relativePathOption)
if os.path.isabs(subDirectory):
fullPath = subDirectory
else:
fullPath = '{}/{}'.format(config.get(section, baseDirectoryOption),
subDirectory)
if defaultPath is not None and not os.path.exists(fullPath):
fullPath = defaultPath
return fullPath
def get_region_mask(config, regionMaskFile):
"""
Get the full path for a region mask with a given file name
Parameters
----------
config : mpas_tools.config.MpasConfigParser
configuration from which to read the path
regionMaskFile : str
the file name of the region mask, typically a relative path
Returns
-------
fullFileName : str
The absolute path to the given fileName within the custom or base
diagnostics directories
"""
# Authors
# -------
# Xylar Asay-Davis
if os.path.isabs(regionMaskFile):
fullFileName = regionMaskFile
else:
tryCustom = config.get('diagnostics', 'customDirectory') != 'none'
found = False
fullFileName = None
if tryCustom:
# first see if region mask file is in the custom directory
regionMaskDirectory = build_config_full_path(
config, 'diagnostics', 'regionMaskSubdirectory',
baseDirectoryOption='customDirectory')
fullFileName = '{}/{}'.format(regionMaskDirectory,
regionMaskFile)
found = os.path.exists(fullFileName)
if not found:
# no, so second see if mapping files are in the base directory
regionMaskDirectory = build_config_full_path(
config, 'diagnostics', 'regionMaskSubdirectory',
baseDirectoryOption='base_path')
fullFileName = '{}/{}'.format(regionMaskDirectory,
regionMaskFile)
found = os.path.exists(fullFileName)
if not found:
# still not found, point to a local mask directory
maskSubdirectory = build_config_full_path(config, 'output',
'maskSubdirectory')
make_directories(maskSubdirectory)
fullFileName = '{}/{}'.format(maskSubdirectory,
regionMaskFile)
return fullFileName
def build_obs_path(config, component, relativePathOption=None,
relativePathSection=None, relativePath=None):
"""
Parameters
----------
config : mpas_tools.config.MpasConfigParser
configuration from which to read the path
component : {'ocean', 'seaIce', 'iceberg'}
the prefix on the ``*Observations`` section in ``config``, which must
have an option ``obsSubdirectory``
relativePathOption : str, optional
the name of an option in `section` of the relative path within
``obsSubdirectory`` (or possibly an absolute path)
relativePathSection : str, optional
the name of a section for ``relativePathOption`` if not
``<component>Observations``
relativePath : str, optional
As an alternative to giving the option (and possibly section) of the
relative path, it can be supplied directly
Returns
-------
fullPath : str
The full path to the given relative path within the observations
directory for the given component
"""
# Authors
# -------
# Xylar Asay-Davis
obsSection = '{}Observations'.format(component)
if relativePath is None:
if relativePathSection is None:
relativePathSection = obsSection
relativePath = config.get(relativePathSection, relativePathOption)
if os.path.isabs(relativePath):
fullPath = relativePath
else:
obsSubdirectory = config.get(obsSection, 'obsSubdirectory')
if os.path.isabs(obsSubdirectory):
fullPath = '{}/{}'.format(obsSubdirectory, relativePath)
else:
basePath = config.get('diagnostics', 'customDirectory')
fullPath = '{}/{}/{}'.format(basePath, obsSubdirectory,
relativePath)
if basePath == 'none' or not os.path.exists(fullPath):
basePath = config.get('diagnostics', 'base_path')
fullPath = '{}/{}/{}'.format(basePath, obsSubdirectory,
relativePath)
return fullPath
[docs]def check_path_exists(path):
"""
Raise an exception if the given path does not exist.
Parameters
----------
path : str
Absolute path
Raises
------
OSError
If the path does not exist
"""
# Authors
# -------
# Xylar Asay-Davis
if not (os.path.isdir(path) or os.path.isfile(path)):
raise OSError('Path {} not found'.format(path))
def get_files_year_month(fileNames, streamsFile, streamName):
"""
Extract the year and month from file names associated with a stream
Parameters
----------
fileNames : list of str
The names of files with a year and month in their names.
streamsFile : ``StreamsFile``
The parsed streams file, used to get a template for the
streamName : str
The name of the stream with a file-name template for ``fileNames``
Returns
-------
years, months : list of int
The years and months for each file in ``fileNames``
"""
# Authors
# -------
# Xylar Asay-Davis
template = streamsFile.read_datetime_template(streamName)
template = os.path.basename(template)
dts = [datetime.strptime(os.path.basename(fileName), template) for
fileName in fileNames]
years = [dt.year for dt in dts]
months = [dt.month for dt in dts]
return years, months
def decode_strings(da):
"""
Decode to unicode strings an array that might either be char or string type
in the NetCDF file.
Parameters
----------
da : ``xarray.DataArray``
the data array of strings to decode
Returns
-------
strings : list
The data array as a list of unicode strings
"""
# Authors
# -------
# Xylar Asay-Davis
if da.dtype.type is numpy.string_:
strings = [bytes.decode(name) for name in da.values]
else:
strings = [name for name in da.values]
return strings
def copyfile(src, dst):
""" Copy a file, retrying if temporarily unavailable """
try:
shutil.copyfile(src, dst)
except BlockingIOError:
# this is an occasional problem on Chrysalis. Try a slow copy
warnings.warn('Making a slow copy from {} to {}'.format(src, dst))
with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
shutil.copyfileobj(fsrc, fdst)