from configparser import RawConfigParser, ConfigParser, ExtendedInterpolation
import os
import inspect
import sys
import numpy as np
import ast
from io import StringIO
try:
from importlib.resources import files as imp_res_files
except ImportError:
# python<=3.8
from importlib_resources import files as imp_res_files
[docs]class MpasConfigParser:
"""
A "meta" config parser that keeps a dictionary of config parsers and their
sources to combine when needed. The custom config parser allows provenance
of the source of different config options and allows the "user" config
options to always take precedence over other config options (even if they
are added later).
Attributes
----------
combined : {None, configparser.ConfigParser}
The combined config options
combined_comments : {None, dict}
The combined comments associated with sections and options
sources : {None, dict}
The source of each section or option
"""
_np_allowed = dict(linspace=np.linspace, xrange=range,
range=range, array=np.array, arange=np.arange,
pi=np.pi, Pi=np.pi, int=int, __builtins__=None)
[docs] def __init__(self):
"""
Make a new (empty) config parser
"""
self._configs = dict()
self._user_config = dict()
self._comments = dict()
self.combined = None
self.combined_comments = None
self.sources = None
[docs] def add_user_config(self, filename):
"""
Add a the contents of a user config file to the parser. These options
take precedence over all other options.
Parameters
----------
filename : str
The relative or absolute path to the config file
"""
self._add(filename, user=True)
[docs] def add_from_file(self, filename):
"""
Add the contents of a config file to the parser.
Parameters
----------
filename : str
The relative or absolute path to the config file
"""
self._add(filename, user=False)
[docs] def add_from_package(self, package, config_filename, exception=True):
"""
Add the contents of a config file to the parser.
Parameters
----------
package : str or Package
The package where ``config_filename`` is found
config_filename : str
The name of the config file to add
exception : bool, optional
Whether to raise an exception if the config file isn't found
"""
try:
path = imp_res_files(package) / config_filename
self._add(path, user=False)
except (ModuleNotFoundError, FileNotFoundError, TypeError):
if exception:
raise
[docs] def get(self, section, option):
"""
Get an option value for a given section.
Parameters
----------
section : str
The name of the config section
option : str
The name of the config option
Returns
-------
value : str
The value of the config option
"""
if self.combined is None:
self.combine()
return self.combined.get(section, option)
[docs] def getint(self, section, option):
"""
Get an option integer value for a given section.
Parameters
----------
section : str
The name of the config section
option : str
The name of the config option
Returns
-------
value : int
The value of the config option
"""
if self.combined is None:
self.combine()
return self.combined.getint(section, option)
[docs] def getfloat(self, section, option):
"""
Get an option float value for a given section.
Parameters
----------
section : str
The name of the config section
option : str
The name of the config option
Returns
-------
value : float
The value of the config option
"""
if self.combined is None:
self.combine()
return self.combined.getfloat(section, option)
[docs] def getboolean(self, section, option):
"""
Get an option boolean value for a given section.
Parameters
----------
section : str
The name of the config section
option : str
The name of the config option
Returns
-------
value : bool
The value of the config option
"""
if self.combined is None:
self.combine()
return self.combined.getboolean(section, option)
[docs] def getlist(self, section, option, dtype=str):
"""
Get an option value as a list for a given section.
Parameters
----------
section : str
The name of the config section
option : str
The name of the config option
dtype : {Type[str], Type[int], Type[float]}
The type of the elements in the list
Returns
-------
value : list
The value of the config option parsed into a list
"""
values = self.get(section, option)
values = [dtype(value) for value in values.replace(',', ' ').split()]
return values
[docs] def getexpression(self, section, option, dtype=None, use_numpyfunc=False):
"""
Get an option as an expression (typically a list, though tuples and
dicts are also available). The expression is required to have valid
python syntax, so that string entries are required to be in single or
double quotes.
Parameters
----------
section : str
The section in the config file
option : str
The option in the config file
dtype : {Type[bool], Type[int], Type[float], Type[list], Type[tuple], Type[str]}, optional
If supplied, each element in a list or tuple, or
each value in a dictionary are cast to this type. This is likely
most useful for ensuring that all elements in a list of numbers are
of type float, rather than int, when the distinction is important.
use_numpyfunc : bool, optional
If ``True``, the expression is evaluated including functionality
from the numpy package (which can be referenced either as ``numpy``
or ``np``).
"""
expression_string = self.get(section, option)
if use_numpyfunc:
assert '__' not in expression_string, \
f'"__" is not allowed in {expression_string} ' \
f'for use_numpyfunc=True'
sanitized_str = expression_string.replace('np.', '') \
.replace('numpy.', '')
result = eval(sanitized_str, MpasConfigParser._np_allowed)
else:
result = ast.literal_eval(expression_string)
if dtype is not None:
if isinstance(result, list):
result = [dtype(element) for element in result]
elif isinstance(result, tuple):
result = (dtype(element) for element in result)
elif isinstance(result, dict):
for key in result:
result[key] = dtype(result[key])
return result
[docs] def has_section(self, section):
"""
Whether the given section is part of the config
Parameters
----------
section : str
The name of the config section
Returns
-------
found : bool
Whether the option was found in the section
"""
if self.combined is None:
self.combine()
return self.combined.has_section(section)
[docs] def has_option(self, section, option):
"""
Whether the given section has the given option
Parameters
----------
section : str
The name of the config section
option : str
The name of the config option
Returns
-------
found : bool
Whether the option was found in the section
"""
if self.combined is None:
self.combine()
return self.combined.has_option(section, option)
[docs] def set(self, section, option, value=None, comment=None, user=False):
"""
Set the value of the given option in the given section. The file from
which this function was called is also retained for provenance.
Parameters
----------
section : str
The name of the config section
option : str
The name of the config option
value : str, optional
The value to set the option to
comment : str, optional
A comment to include with the config option when it is written
to a file
user : bool, optional
Whether this config option was supplied by the user (e.g. through
a command-line flag) and should take priority over other sources
"""
option = option.lower()
calling_frame = inspect.stack(context=2)[1]
filename = os.path.abspath(calling_frame.filename)
if user:
config_dict = self._user_config
else:
config_dict = self._configs
if filename not in config_dict:
config_dict[filename] = RawConfigParser()
config = config_dict[filename]
if not config.has_section(section):
config.add_section(section)
config.set(section, option, value)
self.combined = None
self.combined_comments = None
self.sources = None
if filename not in self._comments:
self._comments[filename] = dict()
if comment is None:
comment = ''
else:
comment = ''.join([f'# {line}\n' for line in comment.split('\n')])
self._comments[filename][(section, option)] = comment
[docs] def write(self, fp, include_sources=True, include_comments=True):
"""
Write the config options to the given file pointer.
Parameters
----------
fp : typing.TestIO
The file pointer to write to.
include_sources : bool, optional
Whether to include a comment above each option indicating the
source file where it was defined
include_comments : bool, optional
Whether to include the original comments associated with each
section or option
"""
if self.combined is None:
self.combine()
for section in self.combined.sections():
section_items = self.combined.items(section=section)
if include_comments and section in self.combined_comments:
fp.write(self.combined_comments[section])
fp.write(f'[{section}]\n\n')
for option, value in section_items:
if include_comments:
fp.write(self.combined_comments[(section, option)])
if include_sources:
source = self.sources[(section, option)]
fp.write(f'# source: {source}\n')
value = str(value).replace('\n', '\n\t').replace('$', '$$')
fp.write(f'{option} = {value}\n\n')
fp.write('\n')
def list_files(self):
"""
Get a list of files contributing to the combined config options
Returns
-------
filenames : list of str
A list of file paths
"""
filenames = list(self._configs.keys()) + list(self._user_config.keys())
return filenames
[docs] def copy(self):
"""
Get a deep copy of the config parser
Returns
-------
config_copy : mpas_tools.config.MpasConfigParser
The deep copy
"""
config_copy = MpasConfigParser()
for filename, config in self._configs.items():
config_copy._configs[filename] = MpasConfigParser._deepcopy(config)
for filename, config in self._user_config.items():
config_copy._user_config[filename] = \
MpasConfigParser._deepcopy(config)
config_copy._comments = dict(self._comments)
return config_copy
[docs] def __getitem__(self, section):
"""
Get get the config options for a given section.
Parameters
----------
section : str
The name of the section to retrieve.
Returns
-------
section_proxy : configparser.SectionProxy
The config options for the given section.
"""
if self.combined is None:
self.combine()
return self.combined[section]
def _add(self, filename, user):
filename = os.path.abspath(filename)
config = RawConfigParser()
if not os.path.exists(filename):
raise FileNotFoundError(f'Config file does not exist: {filename}')
config.read(filenames=filename)
with open(filename) as fp:
comments = self._parse_comments(fp, filename, comments_before=True)
if user:
self._user_config[filename] = config
else:
self._configs[filename] = config
self._comments[filename] = comments
self.combined = None
self.combined_comments = None
self.sources = None
def combine(self):
"""
Combine the config files into one. This is normally handled
automatically.
"""
self.combined = ConfigParser(interpolation=ExtendedInterpolation())
self.sources = dict()
self.combined_comments = dict()
for configs in [self._configs, self._user_config]:
for source, config in configs.items():
for section in config.sections():
if section in self._comments[source]:
self.combined_comments[section] = \
self._comments[source][section]
if not self.combined.has_section(section):
self.combined.add_section(section)
for option, value in config.items(section):
self.sources[(section, option)] = source
self.combined.set(section, option, value)
self.combined_comments[(section, option)] = \
self._comments[source][(section, option)]
@staticmethod
def _parse_comments(fp, filename, comments_before=True):
""" Parse the comments in a config file into a dictionary """
comments = dict()
current_comment = ''
section_name = None
option_name = None
indent_level = 0
for line_number, line in enumerate(fp, start=1):
value = line.strip()
is_comment = value.startswith('#')
if is_comment:
current_comment = current_comment + line
if len(value) == 0 or is_comment:
# end of value
indent_level = sys.maxsize
continue
cur_indent_level = len(line) - len(line.lstrip())
is_continuation = cur_indent_level > indent_level
# a section header or option header?
if section_name is None or option_name is None or \
not is_continuation:
indent_level = cur_indent_level
# is it a section header?
is_section = value.startswith('[') and value.endswith(']')
if is_section:
if not comments_before:
if option_name is None:
comments[section_name] = current_comment
else:
comments[(section_name, option_name)] = \
current_comment
section_name = value[1:-1].strip()
option_name = None
if comments_before:
comments[section_name] = current_comment
current_comment = ''
# an option line?
else:
delimiter_index = value.find('=')
if delimiter_index == -1:
raise ValueError(f'Expected to find "=" on line '
f'{line_number} of {filename}')
if not comments_before:
if option_name is None:
comments[section_name] = current_comment
else:
comments[(section_name, option_name)] = \
current_comment
option_name = value[:delimiter_index].strip().lower()
if comments_before:
comments[(section_name, option_name)] = current_comment
current_comment = ''
return comments
@staticmethod
def _deepcopy(config):
""" Make a deep copy of the ConfigParser object """
config_string = StringIO()
config.write(config_string)
# We must reset the buffer to make it ready for reading.
config_string.seek(0)
new_config = ConfigParser()
new_config.read_file(config_string)
return new_config