"""
Arguments processing
This module contains the internal machinery that processes environment variables, configuration
files and command-line parameters.
As of March 22, 2022, configpile is still pretty much influenced by :mod:`argparse`, and the
machinery below supports a subset of :mod:`argparse` functionality. Later on, we may cut ties
with :mod:`argparse`, add our own help/usage message writing, our own Sphinx extension and
encourage extending those processing classes.
.. rubric:: Types
This module uses the following types.
.. py:data:: _Config
Type of the configuration dataclass to construct
"""
from __future__ import annotations
import argparse
import configparser
import sys
import warnings
from configparser import ConfigParser
from dataclasses import dataclass
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Callable,
ClassVar,
Dict,
Generic,
Iterable,
List,
Mapping,
Optional,
Sequence,
Type,
TypeVar,
Union,
)
from typing_extensions import Annotated, get_args, get_origin, get_type_hints
from .arg import Arg, Expander, Param
from .enums import SpecialAction
from .handlers import CLHandler, CLPos, CLSpecialAction, CLStdHandler, KVHandler
from .userr import Err, Res
from .util import ClassDoc, filter_types_single
if TYPE_CHECKING:
from .config import Config
_Config = TypeVar("_Config", bound="Config")
[docs]@dataclass
class State:
"""
Describes the (mutable) state of a configuration being parsed
"""
root_path: Optional[Path] #: Base path to use to resolve relative config file paths
instances: Dict[str, List[Any]] #: Contains the sequence of values for each parameter
config_files_to_process: List[Path] #: Contains a list of configuration files to process
special_action: Optional[SpecialAction] #: Contains a special action if flag was encountered
[docs] def append(self, key: str, value: Any) -> None:
"""
Appends a value to a parameter
No type checking is performed, be careful.
Args:
key: Parameter name
value: Value to append
"""
assert key in self.instances, f"{key} is not a Param name"
self.instances[key] = [*self.instances[key], value]
[docs] @staticmethod
def make(root_path: Optional[Path], params: Iterable[Param[Any]]) -> State:
"""
Creates the initial state, populated with the default values when present
Args:
params: Sequence of parameters
Raises:
ValueError: If a default value cannot be parsed correctly
Returns:
The initial mutable state
"""
instances: Dict[str, List[Any]] = {}
for p in params:
assert p.name is not None, "Arguments have names after initialization"
if p.default_value is not None:
res = p.parser.parse(p.default_value)
if isinstance(res, Err):
raise ValueError(f"Invalid default {p.default_value} for parameter {p.name}")
instances[p.name] = [res]
else:
instances[p.name] = []
return State(root_path, instances, config_files_to_process=[], special_action=None)
[docs]@dataclass(frozen=True)
class IniProcessor:
"""
INI configuration file processor
"""
section_strict: Mapping[str, bool] #: Sections and their strictness
kv_handlers: Mapping[str, KVHandler] #: Handler for key/value pairs
def _process(self, parser: ConfigParser, state: State) -> Sequence[Err]:
"""
Processes a filled ConfigParser
Args:
parser: INI data to parse
state: Mutable state to update
Returns:
Errors that occurred, if any
"""
errors: List[Err] = []
try:
for section_name in parser.sections():
if section_name in self.section_strict:
for key, value in parser[section_name].items():
err: Optional[Err] = None
if key in self.kv_handlers:
res = self.kv_handlers[key].handle(value, state)
if isinstance(res, Err):
err = res
else:
if self.section_strict[section_name]:
err = Err.make(f"Unknown key {key}")
if err is not None:
errors.append(err.in_context(ini_section=section_name))
except configparser.Error:
errors.append(Err.make("Parse error"))
except IOError:
errors.append(Err.make("IO Error"))
return errors
[docs] def process_string(self, ini_contents: str, state: State) -> Optional[Err]:
"""
Processes a configuration file given as a string
Args:
ini_contents: Contents of the INI file
state: Mutable state to update
Returns:
An optional error
"""
errors: List[Err] = []
parser = ConfigParser()
try:
parser.read_string(ini_contents)
errors.extend(self._process(parser, state))
except configparser.Error:
errors.append(Err.make("Parse error"))
except IOError:
errors.append(Err.make("IO Error"))
if errors:
return Err.collect(*errors)
else:
return None
[docs] def process(self, ini_file_path: Path, state: State) -> Optional[Err]:
"""
Processes a configuration file
Args:
ini_path: Path to the INI file
state: Mutable state to update
Returns:
An optional error
"""
errors: List[Err] = []
if not ini_file_path.exists():
return Err.make(f"Config file {ini_file_path} does not exist")
if not ini_file_path.is_file():
return Err.make(f"Path {ini_file_path} is not a file")
parser = ConfigParser()
# disable conversion to lower-case
parser.optionxform = str # type: ignore
try:
with open(ini_file_path, "r", encoding="utf-8") as file:
parser.read_file(file)
errors.extend(self._process(parser, state))
except configparser.Error:
errors.append(Err.make("Parse error"))
except IOError:
errors.append(Err.make("IO Error"))
if errors:
return Err.collect(*errors)
else:
return None
[docs]@dataclass
class ProcessorFactory(Generic[_Config]):
"""
Describes a processor in construction
This factory is passed to the different arguments present in the configuration.
"""
#: List of parameters indexed by their field name
params_by_name: Dict[str, Param[Any]]
#: Argument parser to update, used to display help and for the Sphinx documentation
argument_parser: argparse.ArgumentParser
#: Argument parser group for commands
ap_commands: argparse._ArgumentGroup
#: Argument parser group for required parameters
ap_required: argparse._ArgumentGroup
#: Argument parser group for optional parameters
ap_optional: argparse._ArgumentGroup
#: Handlers for environment variables
env_handlers: Dict[str, KVHandler] # = {}
#: List of INI sections with their corresponding strictness
ini_section_strict: Dict[str, bool]
#: List of handlers for key/value pairs present in INI files
ini_handlers: Dict[str, KVHandler]
#: List of command line flag handlers
cl_flag_handlers: Dict[str, CLHandler]
#: List of positional arguments
cl_positionals: List[Param[Any]]
validators: List[Callable[[_Config], Optional[Err]]]
@staticmethod
def _trim_docstring(docstring: str) -> str:
"""Trims a docstring
Args:
docstring: Docstring to trim according to PEP 257
Returns:
The trimmed docstring
"""
if not docstring:
return ""
# Convert tabs to spaces (following the normal Python rules)
# and split into a list of lines:
lines = docstring.expandtabs().splitlines()
# Determine minimum indentation (first line doesn't count):
indent = sys.maxsize
for line in lines[1:]:
stripped = line.lstrip()
if stripped:
indent = min(indent, len(line) - len(stripped))
# Remove indentation (first line is special):
trimmed = [lines[0].strip()]
if indent < sys.maxsize:
for line in lines[1:]:
trimmed.append(line[indent:].rstrip())
# Strip off trailing and leading blank lines:
while trimmed and not trimmed[-1]:
trimmed.pop()
while trimmed and not trimmed[0]:
trimmed.pop(0)
# Return a single string:
return "\n".join(trimmed)
[docs] @staticmethod
def make(config_type: Type[_Config]) -> ProcessorFactory[_Config]:
"""
Constructs an empty processor factory
Args:
config_type: Configuration to process
Returns:
A processor factory
"""
# fill program name from script invocation
prog = config_type.prog_
if prog is None:
prog = sys.argv[0]
# fill description from class docstring
description: Optional[str] = config_type.description_
if description is None:
description = config_type.__doc__
if description is not None:
description = ProcessorFactory._trim_docstring(description)
argument_parser = argparse.ArgumentParser(
prog=prog,
description=description,
formatter_class=argparse.RawTextHelpFormatter,
add_help=False,
)
argument_parser._action_groups.pop() # pylint: disable=protected-access
return ProcessorFactory(
params_by_name={},
argument_parser=argument_parser,
ap_commands=argument_parser.add_argument_group("commands"),
ap_optional=argument_parser.add_argument_group("optional arguments"),
ap_required=argument_parser.add_argument_group("required arguments"),
env_handlers={},
ini_section_strict={s.name: s.strict for s in config_type.ini_sections_()},
ini_handlers={},
cl_flag_handlers={},
cl_positionals=[],
validators=[*config_type.validators_()],
)
[docs]@dataclass(frozen=True)
class Processor(Generic[_Config]):
"""
Configuration processor
"""
#: Configuration to parse
config_type: Type[_Config]
#: Completed argument parser, used only for documentation purposes (CLI and Sphinx)
argument_parser: argparse.ArgumentParser
#: Environment variable handlers
env_handlers: Mapping[str, KVHandler]
#: INI file processor
ini_processor: IniProcessor
#: Command line arguments handler
cl_handler: CLStdHandler
#: Dictionary of parameters by field name
params_by_name: Mapping[str, Param[Any]]
validators: Sequence[Callable[[_Config], Optional[Err]]]
[docs] @staticmethod
def process_fields(config_type: Type[_Config]) -> Sequence[Arg]:
"""
Returns a sequence of the arguments present in a configuration, with updated data
Args:
config_type: Configuration to process
Returns:
Sequence of arguments
"""
args: List[Arg] = []
docs: ClassDoc[_Config] = ClassDoc.make(config_type)
th = get_type_hints(config_type, include_extras=True)
for name, typ in th.items():
arg: Optional[Arg] = None
if get_origin(typ) is ClassVar:
a = getattr(config_type, name)
if isinstance(a, Arg):
assert isinstance(a, Expander), "Only commands (Cmd) can be class attributes"
arg = a
if get_origin(typ) is Annotated:
param = filter_types_single(Param, get_args(typ))
if param is not None:
arg = param
if arg is not None:
help_lines = docs[name]
if help_lines is None:
hlp = ""
else:
hlp = "\n".join(help_lines)
arg = arg.updated(name, hlp, config_type.env_prefix_)
args.append(arg)
return args
[docs] @staticmethod
def make(
config_type: Type[_Config],
) -> Processor[_Config]:
"""
Creates the processor corresponding to a configuration
"""
pf = ProcessorFactory.make(config_type)
for arg in Processor.process_fields(config_type):
arg.update_processor(pf)
# if these flags are no longer provided by default, update the overview concept notebook
# in the documentation
pf.cl_flag_handlers["-h"] = CLSpecialAction(SpecialAction.HELP)
pf.cl_flag_handlers["--help"] = CLSpecialAction(SpecialAction.HELP)
return Processor(
config_type=config_type,
argument_parser=pf.argument_parser,
env_handlers=pf.env_handlers,
ini_processor=IniProcessor(pf.ini_section_strict, pf.ini_handlers),
cl_handler=CLStdHandler(pf.cl_flag_handlers, CLPos(pf.cl_positionals)),
params_by_name=pf.params_by_name,
validators=pf.validators,
)
def _process_config(self, state: State) -> Optional[Err]:
"""
Processes configuration files if such processing was requested by a handler
Args:
state: Mutable state to update
Returns:
An optional error
"""
paths = state.config_files_to_process
state.config_files_to_process = []
errors: List[Err] = []
for p in paths:
err: Optional[Err] = None
config_path: Optional[Path] = None
if p.is_absolute():
config_path = p
else:
if state.root_path is None:
err = Err.make(
"Relative configuration file path given with no root path known"
)
else:
config_path = state.root_path / p
if config_path is not None:
err = self.ini_processor.process(config_path, state)
if err is not None:
errors.append(err.in_context(ini_file=p))
return Err.collect(*errors)
[docs] def process_ini_contents(self, ini_contents: str) -> Res[_Config]:
"""
Processes the configuration in INI format contained in a string
Args:
ini_contents: Multiline string containing information in INI format
Returns:
The parsed configuration or an error
"""
state = self._state_with_default_values(None)
err = self.ini_processor.process_string(ini_contents, state)
if err is not None:
return err
return self._finish_processing_state(state)
[docs] def process_ini_file(self, ini_file_path: Path) -> Res[_Config]:
"""
Processes the configuration contained in an INI file
Args:
ini_file_path: Path to the file to parse
Returns:
The parsed configuration or an error
"""
state = self._state_with_default_values(None)
err = self.ini_processor.process(ini_file_path, state)
if err is not None:
return err
return self._finish_processing_state(state)
[docs] def process(
self,
cwd: Path,
args: Sequence[str],
env: Mapping[str, str],
) -> Res[Union[_Config, SpecialAction]]:
"""
Processes command-line arguments (deprecated)
See also: :meth:`.process_command_line`
"""
warnings.warn(
"process has been deprecated, use process_command_line instead",
DeprecationWarning,
stacklevel=2,
)
return self.process_command_line(cwd, args, env)
def _finish_processing_state(self, state: State) -> Res[_Config]:
"""
Finishes the processing of the state
This method performs:
- the collection of parameter values
- the final validation using validation methods
Args:
state: State after parsing all elements, must have :attr:`.State.special_action` set
to :data:`None`
Returns:
The parsed configuration or an error
"""
assert state.special_action is None
errors: List[Err] = []
collected: Dict[str, Any] = {}
for name, param in self.params_by_name.items():
instances = state.instances[name]
res = param.collector.collect(instances)
if isinstance(res, Err):
errors.append(res.in_context(param=name))
else:
collected[name] = res
if errors:
return Err.collect1(*errors)
c: _Config = self.config_type(**collected)
validation_error: Optional[Err] = Err.collect(*[f(c) for f in self.validators])
if validation_error is not None:
return validation_error
else:
return c
def _state_with_default_values(self, root_path: Optional[Path]) -> State:
"""
Returns a new state instance with default values populated
Args:
root_path: Optional root path used to resolve configuration file paths
"""
return State.make(root_path, self.params_by_name.values())
[docs] def process_command_line(
self,
cwd: Path,
args: Sequence[str],
env: Mapping[str, str],
) -> Res[Union[_Config, SpecialAction]]:
"""
Processes command-line arguments, configuration files and environment variables
Args:
cwd: Working directory, used as a base for configuration file relative paths
args: Command line arguments to parse
env: Environment variables
Returns:
Either a parsed configuration, a special action to execute, or (a list of) errors
"""
errors: List[Err] = []
state = self._state_with_default_values(cwd)
# process environment variables
for key, value in env.items():
handler = self.env_handlers.get(key)
if handler is not None:
err = handler.handle(value, state)
if err is not None:
errors.append(err.in_context(environment_variable=key))
err = self._process_config(state)
if err is not None:
errors.append(err.in_context(environment_variable=key))
# process command line arguments
rest_args: Sequence[str] = args
while rest_args:
rest_args, err = self.cl_handler.handle(rest_args, state)
if err is not None:
errors.append(err)
err = self._process_config(state)
if err is not None:
errors.append(err)
if state.special_action is not None:
return state.special_action
if errors:
return Err.collect1(*errors)
return self._finish_processing_state(state)