# This software is open source software available under the BSD-3 license.
#
# Copyright (c) 2022 Triad National Security, LLC. All rights reserved.
# Copyright (c) 2022 Lawrence Livermore National Security, LLC. All rights
# reserved.
# Copyright (c) 2022 UT-Battelle, LLC. All rights reserved.
#
# Additional copyright and license information can be found in the LICENSE file
# distributed with this code, or at
# https://raw.githubusercontent.com/MPAS-Dev/MPAS-Analysis/main/LICENSE
"""
IO utility functions
Phillip J. Wolfram, Xylar Asay-Davis
"""
import glob
import os
import random
import string
from datetime import datetime
import numpy
import shutil
import warnings
[docs]
def paths(*args):
    """
    Returns glob'd paths in list for arbitrary number of function arguments.
    Note, each expanded set of paths is sorted.
    Parameters
    ----------
    *args : list
        A list of arguments to pass to ``glob.glob``
    Returns
    -------
    paths : list of str
        A list of file paths
    """
    # Authors
    # -------
    # Phillip J. Wolfram
    paths = []
    for aargs in args:
        paths += sorted(glob.glob(aargs))
    return paths 
def fingerprint_generator(size=12,
                          chars=string.ascii_uppercase + string.digits):
    """
    Returns a random string that can be used as a unique fingerprint
    Parameters
    ----------
    size : int, optional
        The number of characters in the fingerprint
    chars : list of char, optional
        The fingerprint
    Returns
    -------
    fingerprint : str
        A random string
    Reference
    ---------
    http://stackoverflow.com/questions/2257441/random-string-generation-with-upper-case-letters-and-digits-in-python
    """
    # Authors
    # -------
    # Phillip J. Wolfram
    return ''.join(random.choice(chars) for _ in range(size))
[docs]
def make_directories(path):
    """
    Make the given path if it does not already exist.
    Parameters
    ----------
    path : str
        the path to make
    Returns
    -------
    path : str
        the path unchanged
    """
    # Authors
    # -------
    # Xylar Asay-Davis
    try:
        os.makedirs(path)
    except OSError:
        pass
    return path 
[docs]
def build_config_full_path(config, section, relativePathOption,
                           relativePathSection=None,
                           defaultPath=None,
                           baseDirectoryOption='baseDirectory'):
    """
    Get a full path from a base directory and a relative path
    Parameters
    ----------
    config : tranche.Tranche
        configuration from which to read the path
    section : str
        the name of a section in `config`, which must have an option
        ``baseDirectory``
    relativePathOption : str
        the name of an option in ``section`` of the relative path within
        ``baseDirectory`` (or possibly an absolute path)
    relativePathSection : str, optional
        the name of a section for ``relativePathOption`` if not ``section``
    defaultPath : str, optional
        the name of a path to return if the resulting path doesn't exist.
    baseDirectoryOption : str, optional
        the name of the option in ``section`` for the base directorys
    Returns
    -------
    fullPath : str
        The full path to the given relative path within the given
        ``baseDirectory``
    """
    # Authors
    # -------
    # Xylar Asay-Davis
    if relativePathSection is None:
        relativePathSection = section
    subDirectory = config.get(relativePathSection, relativePathOption)
    if os.path.isabs(subDirectory):
        fullPath = subDirectory
    else:
        fullPath = '{}/{}'.format(config.get(section, baseDirectoryOption),
                                  subDirectory)
    if defaultPath is not None and not os.path.exists(fullPath):
        fullPath = defaultPath
    return fullPath 
def get_region_mask(config, regionMaskFile):
    """
    Get the full path for a region mask with a given file name
    Parameters
    ----------
    config : tranche.Tranche
        configuration from which to read the path
    regionMaskFile : str
        the file name of the region mask, typically a relative path
    Returns
    -------
    fullFileName : str
        The absolute path to the given fileName within the custom or base
        diagnostics directories
    """
    # Authors
    # -------
    # Xylar Asay-Davis
    if os.path.isabs(regionMaskFile):
        fullFileName = regionMaskFile
    else:
        tryCustom = config.get('diagnostics', 'customDirectory') != 'none'
        found = False
        fullFileName = None
        if tryCustom:
            # first see if region mask file is in the custom directory
            regionMaskDirectory = build_config_full_path(
                config, 'diagnostics', 'regionMaskSubdirectory',
                baseDirectoryOption='customDirectory')
            fullFileName = '{}/{}'.format(regionMaskDirectory,
                                          regionMaskFile)
            found = os.path.exists(fullFileName)
        if not found:
            # no, so second see if mapping files are in the base directory
            regionMaskDirectory = build_config_full_path(
                config, 'diagnostics', 'regionMaskSubdirectory',
                baseDirectoryOption='base_path')
            fullFileName = '{}/{}'.format(regionMaskDirectory,
                                          regionMaskFile)
            found = os.path.exists(fullFileName)
        if not found:
            # still not found, point to a local mask directory
            maskSubdirectory = build_config_full_path(config, 'output',
                                                      'maskSubdirectory')
            make_directories(maskSubdirectory)
            fullFileName = '{}/{}'.format(maskSubdirectory,
                                          regionMaskFile)
    return fullFileName
def build_obs_path(config, component, relativePathOption=None,
                   relativePathSection=None, relativePath=None):
    """
    Parameters
    ----------
    config : tranche.Tranche
        configuration from which to read the path
    component : {'ocean', 'seaIce', 'iceberg'}
        the prefix on the ``*Observations`` section in ``config``, which must
        have an option ``obsSubdirectory``
    relativePathOption : str, optional
        the name of an option in `section` of the relative path within
        ``obsSubdirectory`` (or possibly an absolute path)
    relativePathSection : str, optional
        the name of a section for ``relativePathOption`` if not
        ``<component>Observations``
    relativePath : str, optional
        As an alternative to giving the option (and possibly section) of the
        relative path, it can be supplied directly
    Returns
    -------
    fullPath : str
        The full path to the given relative path within the observations
        directory for the given component
    """
    # Authors
    # -------
    # Xylar Asay-Davis
    obsSection = '{}Observations'.format(component)
    if relativePath is None:
        if relativePathSection is None:
            relativePathSection = obsSection
        relativePath = config.get(relativePathSection, relativePathOption)
    if os.path.isabs(relativePath):
        fullPath = relativePath
    else:
        obsSubdirectory = config.get(obsSection, 'obsSubdirectory')
        if os.path.isabs(obsSubdirectory):
            fullPath = '{}/{}'.format(obsSubdirectory, relativePath)
        else:
            basePath = config.get('diagnostics', 'customDirectory')
            fullPath = '{}/{}/{}'.format(basePath, obsSubdirectory,
                                         relativePath)
            if basePath == 'none' or not os.path.exists(fullPath):
                basePath = config.get('diagnostics', 'base_path')
                fullPath = '{}/{}/{}'.format(basePath, obsSubdirectory,
                                             relativePath)
    return fullPath
[docs]
def check_path_exists(path):
    """
    Raise an exception if the given path does not exist.
    Parameters
    ----------
    path : str
        Absolute path
    Raises
    ------
    OSError
        If the path does not exist
    """
    # Authors
    # -------
    # Xylar Asay-Davis
    if not (os.path.isdir(path) or os.path.isfile(path)):
        raise OSError('Path {} not found'.format(path)) 
def get_files_year_month(fileNames, streamsFile, streamName):
    """
    Extract the year and month from file names associated with a stream
    Parameters
    ----------
    fileNames : list of str
        The names of files with a year and month in their names.
    streamsFile : ``StreamsFile``
        The parsed streams file, used to get a template for the
    streamName : str
        The name of the stream with a file-name template for ``fileNames``
    Returns
    -------
    years, months : list of int
        The years and months for each file in ``fileNames``
    """
    # Authors
    # -------
    # Xylar Asay-Davis
    template = streamsFile.read_datetime_template(streamName)
    template = os.path.basename(template)
    dts = [datetime.strptime(os.path.basename(fileName), template) for
           fileName in fileNames]
    years = [dt.year for dt in dts]
    months = [dt.month for dt in dts]
    return years, months
def decode_strings(da):
    """
    Decode to unicode strings an array that might either be char or string type
    in the NetCDF file.
    Parameters
    ----------
    da : ``xarray.DataArray``
        the data array of strings to decode
    Returns
    -------
    strings : list
        The data array as a list of unicode strings
    """
    # Authors
    # -------
    # Xylar Asay-Davis
    if da.dtype.type is numpy.bytes_:
        strings = [bytes.decode(name) for name in da.values]
    else:
        strings = [name for name in da.values]
    return strings
def copyfile(src, dst):
    """ Copy a file, retrying if temporarily unavailable """
    try:
        shutil.copyfile(src, dst)
    except BlockingIOError:
        # this is an occasional problem on Chrysalis.  Try a slow copy
        warnings.warn('Making a slow copy from {} to {}'.format(src, dst))
        with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
            shutil.copyfileobj(fsrc, fdst)