Coverage for src / mafw / tools / toml_tools.py: 99%
212 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 09:03 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 09:03 +0000
1# Copyright 2025–2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Tools for reading, writing, and validating MAFw TOML steering files.
7:Author: Bulgheroni Antonio
8:Description: Utilities to generate and load TOML steering files and related helpers.
9"""
11import datetime
12import logging
13import os
14import re
15from pathlib import Path, PosixPath, WindowsPath
16from typing import Any, Mapping, cast
18import tomlkit
19from tomlkit import TOMLDocument, boolean, comment, document, item, nl, table
20from tomlkit.exceptions import ConvertError
21from tomlkit.items import Item, String, StringType
22from tomlkit.toml_file import TOMLFile
24import mafw.mafw_errors
25from mafw.__about__ import __version__ as version
26from mafw.db.db_configurations import default_conf
27from mafw.lazy_import import LazyImportProcessor, ProcessorClassProtocol
28from mafw.processor import Processor
29from mafw.steering.builder import SteeringBuilder, ValidationLevel
31log = logging.getLogger(__name__)
33ENV_PATTERN = re.compile(
34 r"""
35 \$\{ # opening ${
36 (?P<name>[A-Za-z_][A-Za-z0-9_]*) # variable name
37 (?:
38 (?P<op>:-|:\?) # operator (:- or :?)
39 (?P<value>[^}]*) # default or error message
40 )?
41 \} # closing }
42 """,
43 re.VERBOSE,
44)
45"""Regex matching supported environment variable expansion patterns."""
47ENV_ESCAPE_SENTINEL = '__MAFW_ENV_ESCAPE__{'
48"""Sentinel used to preserve escaped variable patterns."""
50MAX_ENV_RESOLUTION_PASSES = 10
51"""Maximum number of expansion passes applied to a single string."""
54class PathItem(String):
55 """TOML item representing a Path"""
57 def unwrap(self) -> Path: # type: ignore[override] # do not know how to do it
58 return Path(super().unwrap())
61def path_encoder(obj: Any) -> Item:
62 """Encoder for PathItem."""
63 if isinstance(obj, PosixPath):
64 return PathItem.from_raw(str(obj), type_=StringType.SLB, escape=False)
65 elif isinstance(obj, WindowsPath):
66 return PathItem.from_raw(str(obj), type_=StringType.SLL, escape=False)
67 else:
68 raise ConvertError
71tomlkit.register_encoder(path_encoder)
74def generate_steering_file(
75 output_file: Path | str,
76 processors: list[ProcessorClassProtocol] | ProcessorClassProtocol,
77 database_conf: dict[str, Any] | None = None,
78 db_engine: str = 'sqlite',
79) -> None:
80 """
81 Generates a steering file.
83 :param output_file: The output filename where the steering file will be save.
84 :type output_file: Path | str
85 :param processors: The processors list for which the steering file will be generated.
86 :type processors: list[type[Processor] | Processor], type[Processor], Processor
87 :param database_conf: The database configuration dictionary
88 :type database_conf: dict, Optional
89 :param db_engine: A string representing the DB engine to be used. Possible values are: *sqlite*, *postgresql*
90 and *mysql*.
91 :type: str
92 """
93 if isinstance(output_file, str):
94 output_file = Path(output_file)
96 doc = _new_toml_doc()
97 doc = _add_db_configuration(database_conf, db_engine=db_engine, doc=doc)
98 doc = _add_processor_parameters_to_toml_doc(processors, doc)
99 doc = _add_user_interface_configuration(doc)
101 with open(output_file, 'w') as fp:
102 tomlkit.dump(doc, fp)
105def _new_toml_doc() -> TOMLDocument:
106 doc = document()
107 doc.add(comment(f'MAFw steering file generated on {datetime.datetime.now()}'))
108 doc.add(nl())
109 doc.add(
110 comment('uncomment the line below and insert the processors you want to run from the available processor list')
111 )
112 doc.add(comment('processors_to_run = []'))
113 doc.add(nl())
114 doc.add(comment('customise the name of the analysis'))
115 doc.add('analysis_name', String.from_raw('mafw analysis', StringType.SLB))
116 doc.add('analysis_description', String.from_raw('Summing up numbers', StringType.MLB))
117 doc.add('new_only', boolean('true'))
118 doc.add('mafw_version', String.from_raw(version, StringType.SLB))
119 doc.add('create_standard_tables', boolean('true'))
120 return doc
123def _add_db_configuration(
124 database_conf: dict[str, Any] | None = None, db_engine: str = 'sqlite', doc: TOMLDocument | None = None
125) -> TOMLDocument:
126 """Add the DB configuration to the TOML document
128 The expected structure of the database_conf dictionary is one of these:
130 .. code-block:: python
132 option1 = {
133 'DBConfiguration': {
134 'URL': 'sqlite:///:memory:',
135 'parameters': {
136 'sqlite': {
137 'pragmas': {
138 'journal_mode': 'wal',
139 'cache_size': -64000,
140 'foreign_keys': 1,
141 'synchronous': 0,
142 },
143 },
144 },
145 }
146 }
148 option2 = {
149 'URL': 'sqlite:///:memory:',
150 'authentication': {
151 'method': 'env',
152 'username': 'POSTGRES_USER',
153 'password': 'POSTGRES_PASS',
154 },
155 'parameters': {
156 'postgresql': {
157 'sslmode': 'require',
158 },
159 },
160 }
162 :param database_conf: A dictionary with the database configuration. See comments above. If None, then the default
163 is used.
164 :type database_conf: dict
165 :param db_engine: The database engine. It is used only in case the provided database configuration is invalid to
166 retrieve the default configuration. Defaults to sqlite.
167 :type db_engine: str, Optional
168 :param doc: The TOML document to add the DB configuration. If None, one will be created.
169 :type doc: TOMLDocument, Optional
170 :return: The modified document.
171 :rtype: TOMLDocument
172 :raises UnknownDBEngine: if the `database_conf` is invalid and the db_engine is not yet implemented.
173 """
174 if doc is None:
175 doc = _new_toml_doc()
177 if database_conf is None:
178 if db_engine in default_conf:
179 database_conf = default_conf[db_engine]
180 else:
181 log.critical('The provided db_engine (%s) is not yet implemented', db_engine)
182 raise mafw.mafw_errors.UnknownDBEngine(f'DB engine ({db_engine} not implemented')
184 is_conf_valid = True
185 if 'DBConfiguration' in database_conf:
186 # it should be option 1. let's check if there is the URL that is required.
187 if 'URL' not in database_conf['DBConfiguration']:
188 # no URL
189 is_conf_valid = False
190 else:
191 database_conf = cast(dict[str, Any], database_conf['DBConfiguration'])
192 else:
193 # option 2
194 if 'URL' not in database_conf:
195 # no URL
196 is_conf_valid = False
198 if not is_conf_valid:
199 log.error('The provided database configuration is invalid. Adding default configuration')
200 if db_engine not in default_conf:
201 log.critical('The provided db_engine (%s) is not yet implemented', db_engine)
202 raise mafw.mafw_errors.UnknownDBEngine(f'DB engine ({db_engine} not implemented')
203 database_conf = default_conf[db_engine]
205 db_table = table()
206 for key, value in database_conf.items():
207 if key == 'authentication' and isinstance(value, dict):
208 auth_table = table()
209 auth_table.comment('Select auth method; see documentation placeholder link: DOC_LINK_PLACEHOLDER')
210 for auth_key, auth_value in value.items():
211 auth_table[auth_key] = auth_value
212 db_table['authentication'] = auth_table
213 continue
214 if key == 'parameters' and isinstance(value, dict):
215 params_table = table()
216 for backend, params in value.items():
217 backend_table = table()
218 if isinstance(params, dict):
219 for param_key, param_value in params.items():
220 if param_key == 'pragmas' and isinstance(param_value, dict):
221 pragmas_table = table()
222 pragmas_table.comment('Leave these default values, unless you know what you are doing!')
223 for pragma_key, pragma_value in param_value.items():
224 pragmas_table[pragma_key] = pragma_value
225 backend_table['pragmas'] = pragmas_table
226 else:
227 backend_table[param_key] = param_value
228 params_table[backend] = backend_table
229 db_table['parameters'] = params_table
230 continue
231 db_table[key] = value
232 if key == 'URL':
233 db_table[key].comment(
234 'Change the protocol depending on the DB type. Update this file to the path of your DB.'
235 )
236 if key == 'pragmas':
237 db_table[key].comment('Leave these default values, unless you know what you are doing!')
239 doc.add('DBConfiguration', db_table)
240 doc.add(nl())
242 return doc
245def _add_processor_parameters_to_toml_doc(
246 processors: list[ProcessorClassProtocol] | ProcessorClassProtocol, doc: TOMLDocument | None = None
247) -> TOMLDocument:
248 if not isinstance(processors, list):
249 processors = [processors]
251 if not processor_validator(processors):
252 raise TypeError('Only processor instances and classes can be accepted')
254 if doc is None:
255 doc = _new_toml_doc()
257 # add an array with all available processors
258 proc_names = []
259 for processor in processors:
260 if isinstance(processor, LazyImportProcessor):
261 proc_names.append(processor.plugin_name)
262 elif isinstance(processor, Processor):
263 proc_names.append(processor.name)
264 else:
265 proc_names.append(processor.__name__)
266 doc.add('available_processors', item(proc_names))
267 doc.add(nl())
269 for p_item in processors:
270 if isinstance(p_item, LazyImportProcessor):
271 processor_cls = p_item._load()
272 section_name = processor_cls.__name__
273 docstring = processor_cls.__doc__
274 elif isinstance(p_item, Processor):
275 processor_cls = p_item.__class__
276 section_name = p_item.name
277 docstring = p_item.__doc__
278 else:
279 processor_cls = cast(type[Processor], p_item)
280 section_name = processor_cls.__name__
281 docstring = processor_cls.__doc__
283 # create a table for the current processor
284 p_table = table()
286 if docstring:
287 lines = docstring.splitlines()
288 for line in lines: 288 ↛ 294line 288 didn't jump to line 294 because the loop on line 288 didn't complete
289 line = line.strip()
290 if line:
291 p_table.comment(line)
292 break
294 for schema in processor_cls.parameter_schema():
295 p_table[schema.name] = schema.default
296 if schema.help:
297 # starting from tomlkit 0.15, item can return an union type between Item and OutOfOrderTableProxy
298 cast(Item, p_table.value.item(schema.name)).comment(schema.help)
300 doc.add(section_name, p_table)
301 doc.add(nl())
303 return doc
306def processor_validator(processors: list[ProcessorClassProtocol]) -> bool:
307 """
308 Validates that all items in the list are valid processor instances or classes.
310 :param processors: The list of items to be validated.
311 :type processors: list[type[Processor] | Processor]
312 :return: True if all items are valid.
313 :rtype: bool
314 """
315 return all([isinstance(p, (Processor, type(Processor), LazyImportProcessor)) for p in processors])
318def dump_processor_parameters_to_toml(
319 processors: list[ProcessorClassProtocol] | ProcessorClassProtocol, output_file: Path | str
320) -> None:
321 """
322 Dumps a toml file with processor parameters.
324 This helper function can be used when the parameters of one or many processors have to be dumped to a TOML file.
325 For each Processor in the `processors` a table in the TOML file will be added with their parameters is the shape of
326 parameter name = value.
328 It must be noted that `processors` can be:
330 - a list of processor classes (list[type[Processor]])
331 - a list of processor instances (list[Processor]])
332 - one single processor class (type[Processor])
333 - one single processor instance (Processor)
335 What value of the parameters will be dumped?
336 --------------------------------------------
338 Good question, have a look at this :ref:`explanation <parameter_dump>`.
340 :param processors: One or more processors for which the parameters should be dumped.
341 :type processors: list[type[Processor | Processor]] | type[Processor] | Processor
342 :param output_file: The name of the output file for the dump.
343 :type output_file: Path | str
344 :raise KeyAlreadyPresent: if an attempt to add twice, the same processor is made.
345 :raise TypeError: if the list contains items different from Processor classes and instances.
346 """
348 doc = _add_processor_parameters_to_toml_doc(processors)
350 with open(output_file, 'w') as fp:
351 tomlkit.dump(doc, fp)
354def _add_user_interface_configuration(doc: TOMLDocument | None = None) -> TOMLDocument:
355 if doc is None:
356 doc = _new_toml_doc()
358 ui_table = table()
359 ui_table.comment('Specify UI options')
360 ui_table['interface'] = 'rich'
361 ui_table['interface'].comment('Default "rich", backup "console"')
362 doc.add('UserInterface', ui_table)
364 return doc
367def load_steering_file_legacy(steering_file: Path | str) -> dict[str, Any]:
368 """
369 Load a steering file without any semantic validation.
371 :param steering_file: The path to the steering file.
372 :type steering_file: Path, str
373 :return: The parsed steering dictionary.
374 :rtype: dict
375 """
376 if isinstance(steering_file, str):
377 steering_file = Path(steering_file)
379 doc = TOMLFile(steering_file).read()
380 return doc.value
383def load_steering_file(
384 steering_file: Path | str, validation_level: ValidationLevel | None = ValidationLevel.SEMANTIC
385) -> dict[str, Any]:
386 """
387 Load a steering file for the execution framework.
389 :param steering_file: The path to the steering file.
390 :type steering_file: Path, str
391 :param validation_level: Requested validation tier, or ``None`` to skip validation.
392 :return: The configuration dictionary.
393 :rtype: dict
394 :raise mafw.mafw_errors.InvalidSteeringFile: if the validation level reports at least one issue.
395 """
396 builder = SteeringBuilder.from_toml(steering_file)
397 if validation_level is not None:
398 issues = builder.validate(validation_level)
399 if issues:
400 raise issues[0]
401 return resolve_config_env(builder.to_config_dict())
404def resolve_config_env(config: dict[str, Any], env: Mapping[str, str] | None = None) -> dict[str, Any]:
405 """
406 Resolve environment variables in every string value of a configuration dictionary.
408 :param config: Configuration dictionary to resolve.
409 :type config: dict[str, Any]
410 :param env: Optional environment mapping; defaults to ``os.environ``.
411 :type env: Mapping[str, str] | None
412 :return: A new configuration dictionary with resolved values.
413 :rtype: dict[str, Any]
414 :raises ValueError: If a required variable is missing or expansion does not converge.
415 """
416 if env is None:
417 env = os.environ
418 return cast(dict[str, Any], _resolve_value(config, env))
421def _resolve_value(value: Any, env: Mapping[str, str]) -> Any:
422 if isinstance(value, str):
423 return resolve_string(value, env)
424 if isinstance(value, dict):
425 return {key: _resolve_value(item, env) for key, item in value.items()}
426 if isinstance(value, list):
427 return [_resolve_value(item, env) for item in value]
428 return value
431def resolve_string(value: str, env: Mapping[str, str]) -> str:
432 """
433 Resolve environment variables within a string value.
435 :param value: Input string to resolve.
436 :type value: str
437 :param env: Environment mapping to use for substitution.
438 :type env: Mapping[str, str]
439 :return: The resolved string.
440 :rtype: str
441 :raises ValueError: If a required variable is missing or expansion does not converge.
442 """
443 escaped = value.replace(r'\${', ENV_ESCAPE_SENTINEL)
445 for _ in range(MAX_ENV_RESOLUTION_PASSES):
446 if not ENV_PATTERN.search(escaped):
447 break
448 escaped = ENV_PATTERN.sub(lambda match: _resolve_match(match, env), escaped)
449 else:
450 raise ValueError('Environment variable expansion did not converge.')
452 return escaped.replace(ENV_ESCAPE_SENTINEL, '${')
455def _resolve_match(match: re.Match[str], env: Mapping[str, str]) -> str:
456 name = match.group('name')
457 op = match.group('op')
458 value = match.group('value') or ''
460 if name in env:
461 return env[name]
462 if op == ':-':
463 return value
464 if op == ':?':
465 raise ValueError(value or f'{name} is required')
466 raise ValueError(f"Environment variable '{name}' not set")