# Copyright 2025–2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""
Tools for reading, writing, and validating MAFw TOML steering files.
:Author: Bulgheroni Antonio
:Description: Utilities to generate and load TOML steering files and related helpers.
"""
import datetime
import logging
import os
import re
from pathlib import Path, PosixPath, WindowsPath
from typing import Any, Mapping, cast
import tomlkit
from tomlkit import TOMLDocument, boolean, comment, document, item, nl, table
from tomlkit.exceptions import ConvertError
from tomlkit.items import Item, String, StringType
from tomlkit.toml_file import TOMLFile
import mafw.mafw_errors
from mafw.__about__ import __version__ as version
from mafw.db.db_configurations import default_conf
from mafw.lazy_import import LazyImportProcessor, ProcessorClassProtocol
from mafw.processor import Processor
from mafw.steering.builder import SteeringBuilder, ValidationLevel
log = logging.getLogger(__name__)
# Pattern for the three supported forms: ``${NAME}``, ``${NAME:-default}`` and ``${NAME:?message}``.
# It is compiled with re.VERBOSE, so the whitespace and the ``#`` notes inside the pattern are ignored by the
# regex engine. The ``value`` group stops at the first ``}``, hence a nested ``${...}`` placed inside a default
# is not captured as one single expression.
ENV_PATTERN = re.compile(
r"""
\$\{ # opening ${
(?P<name>[A-Za-z_][A-Za-z0-9_]*) # variable name
(?:
(?P<op>:-|:\?) # operator (:- or :?)
(?P<value>[^}]*) # default or error message
)?
\} # closing }
""",
re.VERBOSE,
)
"""Regex matching supported environment variable expansion patterns."""
# resolve_string swaps every ``\${`` for this sentinel before expanding and turns it back into ``${`` afterwards,
# so that escaped patterns are never seen by ENV_PATTERN.
ENV_ESCAPE_SENTINEL = '__MAFW_ENV_ESCAPE__{'
"""Sentinel used to preserve escaped variable patterns."""
# Loop bound used by resolve_string; it guards against self-referencing variables that would expand forever.
MAX_ENV_RESOLUTION_PASSES = 10
"""Maximum number of expansion passes applied to a single string."""
[docs]
class PathItem(String):
    """TOML string item whose unwrapped value is a :class:`pathlib.Path`."""

    # The return type intentionally differs from the one declared by the parent class,
    # hence the override check is silenced for the type checker.
    def unwrap(self) -> Path:  # type: ignore[override]
        """
        Return the wrapped value as a path.

        :return: The string stored in the item converted to a Path.
        :rtype: Path
        """
        raw_text = super().unwrap()
        return Path(raw_text)
[docs]
def path_encoder(obj: Any) -> Item:
    """
    Encode a concrete path object as a :class:`PathItem`.

    POSIX paths are stored as single-line basic strings, Windows paths as single-line literal strings. In both
    cases the text is passed through without escaping.

    :param obj: The object to be encoded.
    :type obj: Any
    :return: The TOML item representing the path.
    :rtype: Item
    :raises ConvertError: if `obj` is neither a PosixPath nor a WindowsPath.
    """
    # each supported path flavour is paired with the TOML string type used to store it
    flavours = (
        (PosixPath, StringType.SLB),
        (WindowsPath, StringType.SLL),
    )
    for path_cls, string_type in flavours:
        if isinstance(obj, path_cls):
            return PathItem.from_raw(str(obj), type_=string_type, escape=False)
    raise ConvertError
# Register the custom encoder with tomlkit at import time; it handles PosixPath and WindowsPath instances.
tomlkit.register_encoder(path_encoder)
[docs]
def generate_steering_file(
    output_file: Path | str,
    processors: list[ProcessorClassProtocol] | ProcessorClassProtocol,
    database_conf: dict[str, Any] | None = None,
    db_engine: str = 'sqlite',
) -> None:
    """
    Generates a steering file.

    The document is assembled in this order: general header, database configuration, one table for each processor
    and finally the user interface section. The result is written to `output_file` as UTF-8 text.

    :param output_file: The output filename where the steering file will be saved.
    :type output_file: Path | str
    :param processors: The processors list for which the steering file will be generated.
    :type processors: list[type[Processor] | Processor], type[Processor], Processor
    :param database_conf: The database configuration dictionary
    :type database_conf: dict, Optional
    :param db_engine: A string representing the DB engine to be used. Possible values are: *sqlite*, *postgresql*
        and *mysql*.
    :type db_engine: str
    """
    output_file = Path(output_file)

    doc = _new_toml_doc()
    doc = _add_db_configuration(database_conf, db_engine=db_engine, doc=doc)
    doc = _add_processor_parameters_to_toml_doc(processors, doc)
    doc = _add_user_interface_configuration(doc)

    # A TOML document must be UTF-8 encoded: do not rely on the platform default encoding,
    # otherwise non-ASCII text (e.g. in parameter help strings) may produce an invalid file.
    with open(output_file, 'w', encoding='utf-8') as fp:
        tomlkit.dump(doc, fp)
def _new_toml_doc() -> TOMLDocument:
    """
    Create a fresh TOML document pre-filled with the general steering file header.

    The header is made of a generation timestamp comment, a commented-out ``processors_to_run`` line and the
    general analysis settings.

    :return: The newly created document.
    :rtype: TOMLDocument
    """
    toml_doc = document()

    generated_on = datetime.datetime.now()
    toml_doc.add(comment(f'MAFw steering file generated on {generated_on}'))
    toml_doc.add(nl())
    toml_doc.add(
        comment('uncomment the line below and insert the processors you want to run from the available processor list')
    )
    toml_doc.add(comment('processors_to_run = []'))
    toml_doc.add(nl())
    toml_doc.add(comment('customise the name of the analysis'))

    # general settings, added in exactly this order
    general_settings = (
        ('analysis_name', String.from_raw('mafw analysis', StringType.SLB)),
        ('analysis_description', String.from_raw('Summing up numbers', StringType.MLB)),
        ('new_only', boolean('true')),
        ('mafw_version', String.from_raw(version, StringType.SLB)),
        ('create_standard_tables', boolean('true')),
    )
    for key, toml_item in general_settings:
        toml_doc.add(key, toml_item)

    return toml_doc
[docs]
def _default_db_configuration(db_engine: str) -> dict[str, Any]:
    """Return the default configuration of ``db_engine`` or raise UnknownDBEngine if there is none."""
    if db_engine not in default_conf:
        log.critical('The provided db_engine (%s) is not yet implemented', db_engine)
        raise mafw.mafw_errors.UnknownDBEngine(f'DB engine ({db_engine}) not implemented')
    return default_conf[db_engine]


def _commented_table(values: Mapping[str, Any], note: str) -> Item:
    """Build a TOML table holding ``values`` with ``note`` attached as the table comment."""
    toml_table = table()
    toml_table.comment(note)
    for key, value in values.items():
        toml_table[key] = value
    return toml_table


def _db_parameters_table(parameters: Mapping[str, Any]) -> Item:
    """Build the ``parameters`` table, made of one sub-table for each database backend."""
    params_table = table()
    for backend, params in parameters.items():
        # a backend whose parameters are not a dictionary still gets its (empty) table
        backend_table = table()
        if isinstance(params, dict):
            for param_key, param_value in params.items():
                if param_key == 'pragmas' and isinstance(param_value, dict):
                    backend_table['pragmas'] = _commented_table(
                        param_value, 'Leave these default values, unless you know what you are doing!'
                    )
                else:
                    backend_table[param_key] = param_value
        params_table[backend] = backend_table
    return params_table


def _add_db_configuration(
    database_conf: dict[str, Any] | None = None, db_engine: str = 'sqlite', doc: TOMLDocument | None = None
) -> TOMLDocument:
    """Add the DB configuration to the TOML document

    The expected structure of the database_conf dictionary is one of these:

    .. code-block:: python

        option1 = {
            'DBConfiguration': {
                'URL': 'sqlite:///:memory:',
                'parameters': {
                    'sqlite': {
                        'pragmas': {
                            'journal_mode': 'wal',
                            'cache_size': -64000,
                            'foreign_keys': 1,
                            'synchronous': 0,
                        },
                    },
                },
            }
        }

        option2 = {
            'URL': 'sqlite:///:memory:',
            'authentication': {
                'method': 'env',
                'username': 'POSTGRES_USER',
                'password': 'POSTGRES_PASS',
            },
            'parameters': {
                'postgresql': {
                    'sslmode': 'require',
                },
            },
        }

    In both cases the ``URL`` key is required. A configuration without it is considered invalid and it is replaced
    by the default configuration of `db_engine`.

    :param database_conf: A dictionary with the database configuration. See comments above. If None, then the default
        is used.
    :type database_conf: dict
    :param db_engine: The database engine. It is used to retrieve the default configuration when no configuration
        is provided or when the provided one is invalid. Defaults to sqlite.
    :type db_engine: str, Optional
    :param doc: The TOML document to add the DB configuration. If None, one will be created.
    :type doc: TOMLDocument, Optional
    :return: The modified document.
    :rtype: TOMLDocument
    :raises UnknownDBEngine: if the default configuration is needed (`database_conf` missing or invalid) and the
        db_engine is not yet implemented.
    """
    if doc is None:
        doc = _new_toml_doc()

    if database_conf is None:
        database_conf = _default_db_configuration(db_engine)

    if 'DBConfiguration' in database_conf:
        # it should be option 1: the real configuration is nested and must contain the URL
        nested_conf = database_conf['DBConfiguration']
        is_conf_valid = 'URL' in nested_conf
        if is_conf_valid:
            database_conf = cast(dict[str, Any], nested_conf)
    else:
        # option 2: the URL is expected at the top level
        is_conf_valid = 'URL' in database_conf

    if not is_conf_valid:
        log.error('The provided database configuration is invalid. Adding default configuration')
        database_conf = _default_db_configuration(db_engine)

    db_table = table()
    for key, value in database_conf.items():
        if key == 'authentication' and isinstance(value, dict):
            db_table[key] = _commented_table(
                value, 'Select auth method; see documentation placeholder link: DOC_LINK_PLACEHOLDER'
            )
        elif key == 'parameters' and isinstance(value, dict):
            db_table[key] = _db_parameters_table(value)
        else:
            db_table[key] = value
            if key == 'URL':
                db_table[key].comment(
                    'Change the protocol depending on the DB type. Update this file to the path of your DB.'
                )
            elif key == 'pragmas':
                db_table[key].comment('Leave these default values, unless you know what you are doing!')

    doc.add('DBConfiguration', db_table)
    doc.add(nl())
    return doc
def _add_processor_parameters_to_toml_doc(
    processors: list[ProcessorClassProtocol] | ProcessorClassProtocol, doc: TOMLDocument | None = None
) -> TOMLDocument:
    """
    Add the list of available processors and one parameter table for each of them to a TOML document.

    :param processors: One or more processor classes, processor instances or lazily imported processors.
    :type processors: list[type[Processor] | Processor], type[Processor], Processor
    :param doc: The TOML document to be extended. If None, one will be created.
    :type doc: TOMLDocument, Optional
    :return: The modified document.
    :rtype: TOMLDocument
    :raise TypeError: if the list contains items different from Processor classes and instances.
    """
    processor_list = processors if isinstance(processors, list) else [processors]
    if not processor_validator(processor_list):
        raise TypeError('Only processor instances and classes can be accepted')

    toml_doc = _new_toml_doc() if doc is None else doc

    # first pass: collect the names for the array of all available processors
    available_names = []
    for entry in processor_list:
        if isinstance(entry, LazyImportProcessor):
            available_names.append(entry.plugin_name)
            continue
        available_names.append(entry.name if isinstance(entry, Processor) else entry.__name__)
    toml_doc.add('available_processors', item(available_names))
    toml_doc.add(nl())

    # second pass: one table for each processor
    for entry in processor_list:
        if isinstance(entry, LazyImportProcessor):
            processor_cls = entry._load()
            section_name, description = processor_cls.__name__, processor_cls.__doc__
        elif isinstance(entry, Processor):
            processor_cls = entry.__class__
            section_name, description = entry.name, entry.__doc__
        else:
            processor_cls = cast(type[Processor], entry)
            section_name, description = processor_cls.__name__, processor_cls.__doc__

        section = table()
        # the first non-empty line of the docstring, if any, becomes the table comment
        stripped_lines = (text.strip() for text in (description or '').splitlines())
        summary = next((text for text in stripped_lines if text), None)
        if summary is not None:
            section.comment(summary)

        for schema in processor_cls.parameter_schema():
            section[schema.name] = schema.default
            if not schema.help:
                continue
            # starting from tomlkit 0.15, item can return an union type between Item and OutOfOrderTableProxy
            cast(Item, section.value.item(schema.name)).comment(schema.help)

        toml_doc.add(section_name, section)
        toml_doc.add(nl())

    return toml_doc
[docs]
def processor_validator(processors: list[ProcessorClassProtocol]) -> bool:
    """
    Check that a list contains only processor instances, processor classes or lazily imported processors.

    Classes are recognised as instances of the metaclass of Processor, i.e. ``type(Processor)``.

    :param processors: The list of items to be validated.
    :type processors: list[type[Processor] | Processor]
    :return: True if all items are valid. An empty list is considered valid.
    :rtype: bool
    """
    accepted_types = (Processor, type(Processor), LazyImportProcessor)
    for candidate in processors:
        if not isinstance(candidate, accepted_types):
            return False
    return True
[docs]
def dump_processor_parameters_to_toml(
    processors: list[ProcessorClassProtocol] | ProcessorClassProtocol, output_file: Path | str
) -> None:
    """
    Dumps a toml file with processor parameters.

    This helper function can be used when the parameters of one or many processors have to be dumped to a TOML file.
    For each Processor in the `processors` a table in the TOML file will be added with their parameters in the shape
    of parameter name = value.

    It must be noted that `processors` can be:

    - a list of processor classes (list[type[Processor]])
    - a list of processor instances (list[Processor])
    - one single processor class (type[Processor])
    - one single processor instance (Processor)

    What value of the parameters will be dumped?
    --------------------------------------------

    Good question, have a look at this :ref:`explanation <parameter_dump>`.

    :param processors: One or more processors for which the parameters should be dumped.
    :type processors: list[type[Processor] | Processor] | type[Processor] | Processor
    :param output_file: The name of the output file for the dump.
    :type output_file: Path | str
    :raise KeyAlreadyPresent: if an attempt to add twice, the same processor is made.
    :raise TypeError: if the list contains items different from Processor classes and instances.
    """
    doc = _add_processor_parameters_to_toml_doc(processors)

    # A TOML document must be UTF-8 encoded: do not rely on the platform default encoding.
    with open(output_file, 'w', encoding='utf-8') as fp:
        tomlkit.dump(doc, fp)
def _add_user_interface_configuration(doc: TOMLDocument | None = None) -> TOMLDocument:
    """
    Add the ``UserInterface`` table to a TOML document.

    :param doc: The TOML document to be extended. If None, one will be created.
    :type doc: TOMLDocument, Optional
    :return: The modified document.
    :rtype: TOMLDocument
    """
    target = _new_toml_doc() if doc is None else doc

    section = table()
    section.comment('Specify UI options')
    section['interface'] = 'rich'
    # the comment is attached to the item once it has been stored in the table
    section['interface'].comment('Default "rich", backup "console"')

    target.add('UserInterface', section)
    return target
[docs]
def load_steering_file_legacy(steering_file: Path | str) -> dict[str, Any]:
    """
    Parse a steering file and return its content as it is, without semantic validation.

    :param steering_file: The path to the steering file.
    :type steering_file: Path, str
    :return: The parsed steering dictionary.
    :rtype: dict
    """
    toml_path = Path(steering_file) if isinstance(steering_file, str) else steering_file
    return TOMLFile(toml_path).read().value
[docs]
def load_steering_file(
    steering_file: Path | str, validation_level: ValidationLevel | None = ValidationLevel.SEMANTIC
) -> dict[str, Any]:
    """
    Load a steering file for the execution framework.

    The file is read through a SteeringBuilder, optionally validated, and environment variable patterns found in
    the resulting configuration are resolved before returning.

    :param steering_file: The path to the steering file.
    :type steering_file: Path, str
    :param validation_level: Requested validation tier, or ``None`` to skip validation.
    :type validation_level: ValidationLevel | None
    :return: The configuration dictionary.
    :rtype: dict
    :raise mafw.mafw_errors.InvalidSteeringFile: if the validation level reports at least one issue.
    """
    builder = SteeringBuilder.from_toml(steering_file)

    issues = builder.validate(validation_level) if validation_level is not None else None
    if issues:
        # only the first reported issue is raised
        raise issues[0]

    raw_config = builder.to_config_dict()
    return resolve_config_env(raw_config)
[docs]
def resolve_config_env(config: dict[str, Any], env: Mapping[str, str] | None = None) -> dict[str, Any]:
    """
    Expand the environment variable patterns found in the string values of a configuration dictionary.

    Nested dictionaries and lists are processed recursively.

    :param config: Configuration dictionary to resolve.
    :type config: dict[str, Any]
    :param env: Optional environment mapping; defaults to ``os.environ``.
    :type env: Mapping[str, str] | None
    :return: A new configuration dictionary with resolved values.
    :rtype: dict[str, Any]
    :raises ValueError: If a required variable is missing or expansion does not converge.
    """
    environment = os.environ if env is None else env
    resolved = _resolve_value(config, environment)
    return cast(dict[str, Any], resolved)
def _resolve_value(value: Any, env: Mapping[str, str]) -> Any:
    """
    Recursively resolve the environment variable patterns contained in a value.

    Strings are expanded, dictionaries and lists are rebuilt with their resolved content (dictionary keys are left
    untouched), any other value is returned as it is.

    :param value: The value to resolve.
    :type value: Any
    :param env: Environment mapping to use for substitution.
    :type env: Mapping[str, str]
    :return: The resolved value.
    :rtype: Any
    """
    if isinstance(value, str):
        return resolve_string(value, env)

    if isinstance(value, dict):
        resolved_map = {}
        for key, entry in value.items():
            resolved_map[key] = _resolve_value(entry, env)
        return resolved_map

    if isinstance(value, list):
        return [_resolve_value(entry, env) for entry in value]

    return value
[docs]
def resolve_string(value: str, env: Mapping[str, str]) -> str:
    """
    Resolve environment variables within a string value.

    The expansion is repeated, so that a variable whose value contains other patterns is fully resolved, up to
    ``MAX_ENV_RESOLUTION_PASSES`` passes. A pattern preceded by a backslash (``\\${``) is not expanded and it is
    returned without the backslash.

    :param value: Input string to resolve.
    :type value: str
    :param env: Environment mapping to use for substitution.
    :type env: Mapping[str, str]
    :return: The resolved string.
    :rtype: str
    :raises ValueError: If a required variable is missing or if patterns are still present after the maximum
        number of passes.
    """
    # hide the escaped patterns from the regular expression
    escaped = value.replace(r'\${', ENV_ESCAPE_SENTINEL)

    for _ in range(MAX_ENV_RESOLUTION_PASSES):
        if not ENV_PATTERN.search(escaped):
            break
        escaped = ENV_PATTERN.sub(lambda match: _resolve_match(match, env), escaped)
    else:
        # All the allowed passes have been used. The last one may well have resolved everything, so the
        # expansion is considered not converged only if there is still something left to expand.
        if ENV_PATTERN.search(escaped):
            raise ValueError('Environment variable expansion did not converge.')

    return escaped.replace(ENV_ESCAPE_SENTINEL, '${')
def _resolve_match(match: re.Match[str], env: Mapping[str, str]) -> str:
    """
    Return the replacement text for one matched environment variable pattern.

    A variable present in `env` always wins, even when its value is an empty string. Otherwise the ``:-`` operator
    provides a default value and the ``:?`` operator a custom error message.

    :param match: A match exposing the ``name``, ``op`` and ``value`` groups.
    :type match: re.Match[str]
    :param env: Environment mapping to use for substitution.
    :type env: Mapping[str, str]
    :return: The text replacing the matched pattern.
    :rtype: str
    :raises ValueError: If the variable is not in `env` and no default value is provided.
    """
    var_name, operator, payload = match.group('name', 'op', 'value')

    if var_name in env:
        return env[var_name]

    if operator == ':-':
        # an absent or empty default resolves to the empty string
        return payload or ''

    if operator == ':?':
        message = payload if payload else f'{var_name} is required'
        raise ValueError(message)

    raise ValueError(f"Environment variable '{var_name}' not set")