Coverage for src / mafw / tools / toml_tools.py: 99%
150 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
1# Copyright 2025–2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5The module provides tools to read / write / modify specific TOML files.
6"""
8import datetime
9import logging
10from pathlib import Path, PosixPath, WindowsPath
11from typing import Any, cast
13import tomlkit
14from tomlkit import TOMLDocument, boolean, comment, document, item, nl, table
15from tomlkit.exceptions import ConvertError
16from tomlkit.items import Item, String, StringType
17from tomlkit.toml_file import TOMLFile
19import mafw.mafw_errors
20from mafw.__about__ import __version__ as version
21from mafw.db.db_configurations import default_conf
22from mafw.lazy_import import LazyImportProcessor, ProcessorClassProtocol
23from mafw.processor import Processor
24from mafw.steering.builder import SteeringBuilder, ValidationLevel
26log = logging.getLogger(__name__)
class PathItem(String):
    """TOML string item whose unwrapped value is a :class:`pathlib.Path`."""

    def unwrap(self) -> Path:  # type: ignore[override]  # do not know how to do it
        """Return the value stored in the string item converted to a ``Path``."""
        raw_value = super().unwrap()
        return Path(raw_value)
def path_encoder(obj: Any) -> Item:
    """
    Encode a concrete path object as a :class:`PathItem`.

    ``PosixPath`` objects are stored with ``StringType.SLB`` and ``WindowsPath`` objects with ``StringType.SLL``.
    In both cases the text of the path is handed over as it is, with escaping switched off.

    :param obj: The object to be encoded.
    :type obj: Any
    :return: The TOML item wrapping the path.
    :rtype: Item
    :raises ConvertError: if `obj` is neither a ``PosixPath`` nor a ``WindowsPath``.
    """
    if isinstance(obj, PosixPath):
        string_type = StringType.SLB
    elif isinstance(obj, WindowsPath):
        string_type = StringType.SLL
    else:
        # not a path: signal tomlkit that this encoder cannot handle the object
        raise ConvertError
    return PathItem.from_raw(str(obj), type_=string_type, escape=False)


# make the encoder known to tomlkit as soon as the module is imported
tomlkit.register_encoder(path_encoder)  # type: ignore[type-var]  # tomlkit's generic type E cannot infer Callable[[Any], Item]
def generate_steering_file(
    output_file: Path | str,
    processors: list[ProcessorClassProtocol] | ProcessorClassProtocol,
    database_conf: dict[str, Any] | None = None,
    db_engine: str = 'sqlite',
) -> None:
    """
    Generates a steering file.

    The document is assembled in this order: general header, database configuration, processor parameters and
    user interface configuration. It is then written to `output_file`, replacing any previous content.

    :param output_file: The output filename where the steering file will be saved.
    :type output_file: Path | str
    :param processors: The processors list for which the steering file will be generated.
    :type processors: list[type[Processor] | Processor], type[Processor], Processor
    :param database_conf: The database configuration dictionary
    :type database_conf: dict, Optional
    :param db_engine: A string representing the DB engine to be used. Possible values are: *sqlite*, *postgresql*
        and *mysql*.
    :type db_engine: str
    :raises UnknownDBEngine: if the default database configuration is needed and `db_engine` is not implemented.
    :raises TypeError: if `processors` contains items that are neither processor classes nor instances.
    """
    output_file = Path(output_file)

    doc = _new_toml_doc()
    doc = _add_db_configuration(database_conf, db_engine=db_engine, doc=doc)
    doc = _add_processor_parameters_to_toml_doc(processors, doc)
    doc = _add_user_interface_configuration(doc)

    # TOML documents must be UTF-8 encoded: do not rely on the platform default encoding.
    with open(output_file, 'w', encoding='utf-8') as fp:
        tomlkit.dump(doc, fp)
def _new_toml_doc() -> TOMLDocument:
    """
    Create a TOML document pre-filled with the general part of a steering file.

    The document starts with a comment carrying the generation time, followed by a commented-out
    ``processors_to_run`` entry and by the general analysis keys.

    :return: The newly created document.
    :rtype: TOMLDocument
    """
    header = document()

    timestamp = datetime.datetime.now()
    header.add(comment(f'MAFw steering file generated on {timestamp}'))
    header.add(nl())

    hint = 'uncomment the line below and insert the processors you want to run from the available processor list'
    header.add(comment(hint))
    header.add(comment('processors_to_run = []'))
    header.add(nl())

    header.add(comment('customise the name of the analysis'))
    general_keys = (
        ('analysis_name', String.from_raw('mafw analysis', StringType.SLB)),
        ('analysis_description', String.from_raw('Summing up numbers', StringType.MLB)),
        ('new_only', boolean('true')),
        ('mafw_version', String.from_raw(version, StringType.SLB)),
        ('create_standard_tables', boolean('true')),
    )
    for key, value in general_keys:
        header.add(key, value)

    return header
def _add_db_configuration(
    database_conf: dict[str, Any] | None = None, db_engine: str = 'sqlite', doc: TOMLDocument | None = None
) -> TOMLDocument:
    """Add the DB configuration to the TOML document

    The expected structure of the database_conf dictionary is one of these two:

    .. code-block:: python

        option1 = {
            'DBConfiguration': {
                'URL': 'sqlite:///:memory:',
                'pragmas': {
                    'journal_mode': 'wal',
                    'cache_size': -64000,
                    'foreign_keys': 1,
                    'synchronous': 0,
                },
            }
        }

        option2 = {
            'URL': 'sqlite:///:memory:',
            'pragmas': {
                'journal_mode': 'wal',
                'cache_size': -64000,
                'foreign_keys': 1,
                'synchronous': 0,
            },
        }

    We will always convert the option1 in option2.

    :param database_conf: A dictionary with the database configuration. See comments above. If None, then the default
        is used.
    :type database_conf: dict
    :param db_engine: The database engine. It is used only to retrieve the default configuration, when no database
        configuration is provided or the provided one is invalid. Defaults to sqlite.
    :type db_engine: str, Optional
    :param doc: The TOML document to add the DB configuration. If None, one will be created.
    :type doc: TOMLDocument, Optional
    :return: The modified document.
    :rtype: TOMLDocument
    :raises UnknownDBEngine: if the default configuration is needed (`database_conf` is None or invalid) and the
        db_engine is not yet implemented.
    """
    if doc is None:
        doc = _new_toml_doc()

    if database_conf is None:
        database_conf = _default_db_configuration(db_engine)

    # option 1 nests the real configuration under 'DBConfiguration': unwrap it to obtain option 2
    if 'DBConfiguration' in database_conf:
        database_conf = cast(dict[str, Any], database_conf['DBConfiguration'])

    # the URL is the only required key, without it the configuration is replaced by the default one
    if 'URL' not in database_conf:
        log.error('The provided database configuration is invalid. Adding default configuration')
        database_conf = _default_db_configuration(db_engine)

    db_table = table()
    for key, value in database_conf.items():
        db_table[key] = value
        if key == 'URL':
            db_table[key].comment(
                'Change the protocol depending on the DB type. Update this file to the path of your DB.'
            )
        if key == 'pragmas':
            db_table[key].comment('Leave these default values, unless you know what you are doing!')

    doc.add('DBConfiguration', db_table)
    doc.add(nl())

    return doc


def _default_db_configuration(db_engine: str) -> dict[str, Any]:
    """Return the default configuration of `db_engine`, raising UnknownDBEngine when there is none."""
    if db_engine not in default_conf:
        log.critical('The provided db_engine (%s) is not yet implemented', db_engine)
        raise mafw.mafw_errors.UnknownDBEngine(f'DB engine ({db_engine}) not implemented')
    return cast(dict[str, Any], default_conf[db_engine])
def _add_processor_parameters_to_toml_doc(
    processors: list[ProcessorClassProtocol] | ProcessorClassProtocol, doc: TOMLDocument | None = None
) -> TOMLDocument:
    """
    Add the available processors array and one parameter table per processor to a TOML document.

    For each processor a table is added whose keys are the names of the entries returned by its
    ``parameter_schema()`` and whose values are their defaults. The first non-empty line of the processor docstring,
    if any, is used as comment of the table.

    :param processors: One processor or a list of processors (classes, instances or lazy import processors).
    :type processors: list[type[Processor] | Processor], type[Processor], Processor
    :param doc: The TOML document to be extended. If None, one will be created.
    :type doc: TOMLDocument, Optional
    :return: The modified document.
    :rtype: TOMLDocument
    :raises TypeError: if at least one item is rejected by :func:`processor_validator`.
    """
    processor_list = processors if isinstance(processors, list) else [processors]

    if not processor_validator(processor_list):
        raise TypeError('Only processor instances and classes can be accepted')

    if doc is None:
        doc = _new_toml_doc()

    def _listed_name(entry: Any) -> Any:
        # name under which the processor appears in the available processors array
        if isinstance(entry, LazyImportProcessor):
            return entry.plugin_name
        if isinstance(entry, Processor):
            return entry.name
        return entry.__name__

    # add an array with all available processors
    doc.add('available_processors', item([_listed_name(entry) for entry in processor_list]))
    doc.add(nl())

    for entry in processor_list:
        if isinstance(entry, LazyImportProcessor):
            target_cls = entry._load()
            title = target_cls.__name__
            description = target_cls.__doc__
        elif isinstance(entry, Processor):
            target_cls = entry.__class__
            title = entry.name
            description = entry.__doc__
        else:
            target_cls = cast(type[Processor], entry)
            title = target_cls.__name__
            description = target_cls.__doc__

        # create a table for the current processor
        section = table()

        if description:
            # only the first non-empty docstring line becomes the table comment
            headline = next((text for raw in description.splitlines() if (text := raw.strip())), None)
            if headline is not None:
                section.comment(headline)

        for parameter in target_cls.parameter_schema():
            section[parameter.name] = parameter.default
            if parameter.help:
                section.value.item(parameter.name).comment(parameter.help)

        doc.add(title, section)
        doc.add(nl())

    return doc
250def processor_validator(processors: list[ProcessorClassProtocol]) -> bool:
251 """
252 Validates that all items in the list are valid processor instances or classes.
254 :param processors: The list of items to be validated.
255 :type processors: list[type[Processor] | Processor]
256 :return: True if all items are valid.
257 :rtype: bool
258 """
259 return all([isinstance(p, (Processor, type(Processor), LazyImportProcessor)) for p in processors])
def dump_processor_parameters_to_toml(
    processors: list[ProcessorClassProtocol] | ProcessorClassProtocol, output_file: Path | str
) -> None:
    """
    Dumps a toml file with processor parameters.

    This helper function can be used when the parameters of one or many processors have to be dumped to a TOML file.
    For each Processor in the `processors` a table in the TOML file will be added with their parameters in the shape
    of parameter name = value.

    It must be noted that `processors` can be:

        - a list of processor classes (list[type[Processor]])
        - a list of processor instances (list[Processor])
        - one single processor class (type[Processor])
        - one single processor instance (Processor)

    What value of the parameters will be dumped?
    --------------------------------------------

    Good question, have a look at this :ref:`explanation <parameter_dump>`.

    :param processors: One or more processors for which the parameters should be dumped.
    :type processors: list[type[Processor] | Processor] | type[Processor] | Processor
    :param output_file: The name of the output file for the dump.
    :type output_file: Path | str
    :raise KeyAlreadyPresent: if an attempt to add the same processor twice is made.
    :raise TypeError: if the list contains items different from Processor classes and instances.
    """

    doc = _add_processor_parameters_to_toml_doc(processors)

    # TOML documents must be UTF-8 encoded: do not rely on the platform default encoding.
    with open(output_file, 'w', encoding='utf-8') as fp:
        tomlkit.dump(doc, fp)
def _add_user_interface_configuration(doc: TOMLDocument | None = None) -> TOMLDocument:
    """
    Add the ``UserInterface`` table to a TOML document.

    :param doc: The TOML document to be extended. If None, one will be created.
    :type doc: TOMLDocument, Optional
    :return: The modified document.
    :rtype: TOMLDocument
    """
    target = _new_toml_doc() if doc is None else doc

    section = table()
    section.comment('Specify UI options')
    section['interface'] = 'rich'
    section['interface'].comment('Default "rich", backup "console"')

    target.add('UserInterface', section)
    return target
def load_steering_file_legacy(steering_file: Path | str) -> dict[str, Any]:
    """
    Read a steering file and return its content, skipping any semantic validation.

    :param steering_file: The path to the steering file.
    :type steering_file: Path, str
    :return: The parsed steering dictionary.
    :rtype: dict
    """
    toml_path = Path(steering_file) if isinstance(steering_file, str) else steering_file
    parsed = TOMLFile(toml_path).read()
    return parsed.value
def load_steering_file(
    steering_file: Path | str, validation_level: ValidationLevel | None = ValidationLevel.SEMANTIC
) -> dict[str, Any]:
    """
    Load a steering file for the execution framework.

    The file is parsed through a :class:`SteeringBuilder`. When a validation level is requested and the validation
    reports issues, the first reported issue is raised.

    :param steering_file: The path to the steering file.
    :type steering_file: Path, str
    :param validation_level: Requested validation tier, or ``None`` to skip validation.
    :type validation_level: ValidationLevel, Optional
    :return: The configuration dictionary.
    :rtype: dict
    :raise mafw.mafw_errors.InvalidSteeringFile: if the validation level reports at least one issue.
    """
    steering_builder = SteeringBuilder.from_toml(steering_file)

    # no validation requested means no issue to report
    reported = steering_builder.validate(validation_level) if validation_level is not None else None
    if reported:
        raise reported[0]

    return steering_builder.to_config_dict()