Coverage for src / mafw / tools / toml_tools.py: 96%
146 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
1# Copyright 2025 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5The module provides tools to read / write / modify specific TOML files.
6"""
8import datetime
9import logging
10from pathlib import Path, PosixPath, WindowsPath
11from typing import Any, cast
13import tomlkit
14from tomlkit import TOMLDocument, boolean, comment, document, item, nl, table
15from tomlkit.exceptions import ConvertError
16from tomlkit.items import Item, String, StringType
17from tomlkit.toml_file import TOMLFile
19import mafw.mafw_errors
20from mafw.__about__ import __version__ as version
21from mafw.db.db_configurations import default_conf
22from mafw.lazy_import import LazyImportProcessor, ProcessorClassProtocol
23from mafw.mafw_errors import InvalidSteeringFile
24from mafw.processor import PassiveParameter, Processor
25from mafw.tools.regexp import parse_processor_name
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class PathItem(String):
    """TOML string item whose unwrapped Python value is a :class:`pathlib.Path`."""

    def unwrap(self) -> Path:  # type: ignore[override]  # the parent class unwraps to a plain string
        """Return the stored string converted to a :class:`pathlib.Path`."""
        raw_value = super().unwrap()
        return Path(raw_value)
def path_encoder(obj: Any) -> Item:
    """
    Encode a concrete path object as a :class:`PathItem`.

    A :class:`~pathlib.PosixPath` is stored as a single-line basic string, a :class:`~pathlib.WindowsPath` as a
    single-line literal string. In both cases the string representation of the path is stored without escaping.

    :param obj: The object to be encoded.
    :type obj: Any
    :return: The TOML item representing the path.
    :rtype: Item
    :raises ConvertError: if `obj` is neither a PosixPath nor a WindowsPath.
    """
    if isinstance(obj, PosixPath):
        string_type = StringType.SLB
    elif isinstance(obj, WindowsPath):
        string_type = StringType.SLL
    else:
        # not a path flavour handled here: signal it with ConvertError
        raise ConvertError

    return PathItem.from_raw(str(obj), type_=string_type, escape=False)
# Register path_encoder with tomlkit at import time, so that path objects can be encoded as PathItem.
tomlkit.register_encoder(path_encoder)
def generate_steering_file(
    output_file: Path | str,
    processors: list[ProcessorClassProtocol] | ProcessorClassProtocol,
    database_conf: dict[str, Any] | None = None,
    db_engine: str = 'sqlite',
) -> None:
    """
    Generates a steering file.

    The document is assembled in this order: general header, database configuration, processor parameters and
    user interface configuration. It is then written to `output_file` as UTF-8 text.

    :param output_file: The output filename where the steering file will be saved.
    :type output_file: Path | str
    :param processors: The processors list for which the steering file will be generated.
    :type processors: list[type[Processor] | Processor], type[Processor], Processor
    :param database_conf: The database configuration dictionary
    :type database_conf: dict, Optional
    :param db_engine: A string representing the DB engine to be used. Possible values are: *sqlite*, *postgresql*
        and *mysql*.
    :type db_engine: str
    """
    if isinstance(output_file, str):
        output_file = Path(output_file)

    doc = _new_toml_doc()
    doc = _add_db_configuration(database_conf, db_engine=db_engine, doc=doc)
    doc = _add_processor_parameters_to_toml_doc(processors, doc)
    doc = _add_user_interface_configuration(doc)

    # The TOML specification requires UTF-8: do not rely on the platform default encoding.
    with open(output_file, 'w', encoding='utf-8') as fp:
        tomlkit.dump(doc, fp)
def _new_toml_doc() -> TOMLDocument:
    """
    Create a new TOML document containing the general header of a steering file.

    The header is made of a comment with the generation time, a commented-out ``processors_to_run`` entry and the
    general analysis fields (name, description, new_only flag, MAFw version and create_standard_tables flag).

    :return: The newly created document.
    :rtype: TOMLDocument
    """
    toml_doc = document()

    # leading comments
    generated_on = datetime.datetime.now()
    toml_doc.add(comment(f'MAFw steering file generated on {generated_on}'))
    toml_doc.add(nl())
    toml_doc.add(
        comment('uncomment the line below and insert the processors you want to run from the available processor list')
    )
    toml_doc.add(comment('processors_to_run = []'))
    toml_doc.add(nl())
    toml_doc.add(comment('customise the name of the analysis'))

    # general fields, added in this exact order
    general_fields = (
        ('analysis_name', String.from_raw('mafw analysis', StringType.SLB)),
        ('analysis_description', String.from_raw('Summing up numbers', StringType.MLB)),
        ('new_only', boolean('true')),
        ('mafw_version', String.from_raw(version, StringType.SLB)),
        ('create_standard_tables', boolean('true')),
    )
    for field_name, field_value in general_fields:
        toml_doc.add(field_name, field_value)

    return toml_doc
def _default_db_configuration(db_engine: str) -> dict[str, Any]:
    """
    Return the default database configuration for the given engine.

    :param db_engine: The database engine used as key in the default configuration mapping.
    :type db_engine: str
    :return: The default configuration stored for `db_engine`.
    :rtype: dict
    :raises UnknownDBEngine: if `db_engine` has no default configuration.
    """
    if db_engine not in default_conf:
        log.critical('The provided db_engine (%s) is not yet implemented', db_engine)
        raise mafw.mafw_errors.UnknownDBEngine(f'DB engine ({db_engine}) not implemented')
    return default_conf[db_engine]


def _add_db_configuration(
    database_conf: dict[str, Any] | None = None, db_engine: str = 'sqlite', doc: TOMLDocument | None = None
) -> TOMLDocument:
    """Add the DB configuration to the TOML document

    The expected structure of the database_conf dictionary is one of these two:

    .. code-block:: python

        option1 = {
            'DBConfiguration': {
                'URL': 'sqlite:///:memory:',
                'pragmas': {
                    'journal_mode': 'wal',
                    'cache_size': -64000,
                    'foreign_keys': 1,
                    'synchronous': 0,
                },
            }
        }

        option2 = {
            'URL': 'sqlite:///:memory:',
            'pragmas': {
                'journal_mode': 'wal',
                'cache_size': -64000,
                'foreign_keys': 1,
                'synchronous': 0,
            },
        }

    We will always convert the option1 in option2.

    :param database_conf: A dictionary with the database configuration. See comments above. If None, then the default
        is used.
    :type database_conf: dict
    :param db_engine: The database engine. It is used to retrieve the default configuration when no database
        configuration is provided or when the provided one is invalid. Defaults to sqlite.
    :type db_engine: str, Optional
    :param doc: The TOML document to add the DB configuration. If None, one will be created.
    :type doc: TOMLDocument, Optional
    :return: The modified document.
    :rtype: TOMLDocument
    :raises UnknownDBEngine: if the `database_conf` is missing or invalid and the db_engine is not yet implemented.
    """
    if doc is None:
        doc = _new_toml_doc()

    if database_conf is None:
        database_conf = _default_db_configuration(db_engine)

    # A configuration is valid only if the required URL is present.
    if 'DBConfiguration' in database_conf:
        # it should be option 1: unwrap it into option 2, provided that the URL is there.
        nested_conf = database_conf['DBConfiguration']
        is_conf_valid = 'URL' in nested_conf
        if is_conf_valid:
            database_conf = cast(dict[str, Any], nested_conf)
    else:
        # option 2
        is_conf_valid = 'URL' in database_conf

    if not is_conf_valid:
        log.error('The provided database configuration is invalid. Adding default configuration')
        database_conf = _default_db_configuration(db_engine)

    db_table = table()
    for key, value in database_conf.items():
        db_table[key] = value
        if key == 'URL':
            db_table[key].comment(
                'Change the protocol depending on the DB type. Update this file to the path of your DB.'
            )
        if key == 'pragmas':
            db_table[key].comment('Leave these default values, unless you know what you are doing!')

    doc.add('DBConfiguration', db_table)
    doc.add(nl())

    return doc
def _add_processor_parameters_to_toml_doc(
    processors: list[ProcessorClassProtocol] | ProcessorClassProtocol, doc: TOMLDocument | None = None
) -> TOMLDocument:
    """
    Add the list of available processors and one table per processor to the TOML document.

    Each processor table contains the processor parameters with their values. The first non-empty line of the
    processor documentation is used as comment of the table, and the parameter documentation, when available,
    as comment of the parameter.

    :param processors: One or more processor classes or instances.
    :type processors: list[type[Processor] | Processor], type[Processor], Processor
    :param doc: The TOML document to be extended. If None, one will be created.
    :type doc: TOMLDocument, Optional
    :return: The modified document.
    :rtype: TOMLDocument
    :raises TypeError: if the list contains items different from Processor classes and instances.
    """
    processor_list = processors if isinstance(processors, list) else [processors]

    if not processor_validator(processor_list):
        raise TypeError('Only processor instances and classes can be accepted')

    toml_doc = _new_toml_doc() if doc is None else doc

    def _display_name(candidate: Any) -> Any:
        # lazy processors expose the plugin name, instances their own name, classes the class name
        if isinstance(candidate, LazyImportProcessor):
            return candidate.plugin_name
        if isinstance(candidate, Processor):
            return candidate.name
        return candidate.__name__

    # add an array with all available processors
    toml_doc.add('available_processors', item([_display_name(candidate) for candidate in processor_list]))
    toml_doc.add(nl())

    for entry in processor_list:
        # anything that is not already an instance is called in order to obtain one
        instance = entry if isinstance(entry, Processor) else entry()

        section = table()

        # the first non-empty line of the documentation becomes the comment of the table
        if instance.__doc__:
            stripped_lines = (raw_line.strip() for raw_line in instance.__doc__.splitlines())
            headline = next((text for text in stripped_lines if text), None)
            if headline is not None:
                section.comment(headline)

        # add all parameters to the table, including the help_doc as a comment
        parameter: PassiveParameter[Any]
        for parameter_name, parameter in instance.get_parameters().items():
            section[parameter_name] = parameter.value
            if parameter.doc:
                section.value.item(parameter_name).comment(parameter.doc)

        # add the table to the doc and a new line before going to the next item.
        toml_doc.add(instance.name, section)
        toml_doc.add(nl())

    return toml_doc
def processor_validator(processors: list[ProcessorClassProtocol]) -> bool:
    """
    Check that every item of the list is an acceptable processor.

    An item is accepted when it is an instance of Processor, of the type of the Processor class itself, or of
    LazyImportProcessor.

    :param processors: The list of items to be validated.
    :type processors: list[type[Processor] | Processor]
    :return: True if all items are valid.
    :rtype: bool
    """
    accepted_types = (Processor, type(Processor), LazyImportProcessor)
    return all(isinstance(candidate, accepted_types) for candidate in processors)
def dump_processor_parameters_to_toml(
    processors: list[ProcessorClassProtocol] | ProcessorClassProtocol, output_file: Path | str
) -> None:
    """
    Dumps a toml file with processor parameters.

    This helper function can be used when the parameters of one or many processors have to be dumped to a TOML file.
    For each Processor in the `processors` a table in the TOML file will be added with their parameters in the shape
    of parameter name = value.

    It must be noted that `processors` can be:

        - a list of processor classes (list[type[Processor]])
        - a list of processor instances (list[Processor])
        - one single processor class (type[Processor])
        - one single processor instance (Processor)

    What value of the parameters will be dumped?
    --------------------------------------------

    Good question, have a look at this :ref:`explanation <parameter_dump>`.

    :param processors: One or more processors for which the parameters should be dumped.
    :type processors: list[type[Processor | Processor]] | type[Processor] | Processor
    :param output_file: The name of the output file for the dump.
    :type output_file: Path | str
    :raise KeyAlreadyPresent: if an attempt to add twice, the same processor is made.
    :raise TypeError: if the list contains items different from Processor classes and instances.
    """

    doc = _add_processor_parameters_to_toml_doc(processors)

    # The TOML specification requires UTF-8: do not rely on the platform default encoding.
    with open(output_file, 'w', encoding='utf-8') as fp:
        tomlkit.dump(doc, fp)
def _add_user_interface_configuration(doc: TOMLDocument | None = None) -> TOMLDocument:
    """
    Add the ``UserInterface`` table to the TOML document.

    The table contains the ``interface`` key only, set to ``rich``.

    :param doc: The TOML document to be extended. If None, one will be created.
    :type doc: TOMLDocument, Optional
    :return: The modified document.
    :rtype: TOMLDocument
    """
    toml_doc = _new_toml_doc() if doc is None else doc

    section = table()
    section.comment('Specify UI options')
    section['interface'] = 'rich'
    section['interface'].comment('Default "rich", backup "console"')

    toml_doc.add('UserInterface', section)
    return toml_doc
def load_steering_file(steering_file: Path | str, validate: bool = True) -> dict[str, Any]:
    """
    Load a steering file for the execution framework.

    When `validate` is True, the file must contain the ``processors_to_run`` and ``UserInterface`` entries and,
    for each processor listed in ``processors_to_run``, a section named either after the replica or after the
    base processor.

    .. versionchanged:: v2.0.0
        Introduce support for replica names along with base names in file validation

    :param steering_file: The path to the steering file.
    :type steering_file: Path, str
    :param validate: A flag to validate the content. Defaults to True.
    :type validate: bool, Optional
    :return: The configuration dictionary.
    :rtype: dict
    :raise FileNotFoundError: if steering_file does not exist.
    :raise InvalidSteeringFile: if validation is requested and a required entry or processor section is missing.
    """
    doc = TOMLFile(steering_file).read()

    if validate:
        for field in ('processors_to_run', 'UserInterface'):
            if field not in doc.value:
                log.error('Missing section %s in %s', field, steering_file)
                raise InvalidSteeringFile(f'Missing {field} in {steering_file}')

        for processor in doc['processors_to_run']:  # type: ignore[union-attr]
            # processors_to_run is a list of replica aware processor names.
            # the steering file must contain one configuration section for either the
            # base processor or the replica.
            base_name, _ = parse_processor_name(processor)
            # Check if neither the replica nor the base processor configuration exists
            if processor not in doc.value and base_name not in doc.value:
                log.error('Missing section %s in %s', processor, steering_file)
                raise InvalidSteeringFile(f'Missing {processor} in {steering_file}')

    return doc.value