Coverage for src / mafw / steering / serializer.py: 98%
205 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 09:03 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 09:03 +0000
1# Copyright 2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""Serialize steering metadata into TOML documents via tomlkit."""
6from __future__ import annotations
8from typing import TYPE_CHECKING, Any
10import tomlkit
11from tomlkit import aot, document, nl, table
12from tomlkit.items import AoT
14from mafw.db.db_filter import ast_to_string
15from mafw.enumerators import LogicalOp
17from .models import (
18 Condition,
19 ConditionalFilterConfig,
20 DBConfiguration,
21 FieldFilterConfig,
22 GlobalSettings,
23 GroupConfig,
24 ModelFilterConfig,
25 ProcessorConfig,
26 UIConfiguration,
27)
29if TYPE_CHECKING:
30 from mafw.steering.builder import SteeringBuilder, ValidationLevel
def serialize(
    builder: SteeringBuilder,
    *,
    validation_level: ValidationLevel | None = None,
) -> tomlkit.TOMLDocument:
    """Render the metadata held by ``builder`` as a TOML document.

    Sections are emitted in a fixed order: global keys, database configuration,
    processor tables, group tables and finally the user-interface table.

    :param builder: Object exposing ``globals``, ``extra_globals``, ``db_config``,
        ``processors``, ``groups``, ``ui_config`` and a ``validate`` method.
    :param validation_level: When not ``None``, ``builder.validate`` is called with
        this level before anything is serialized.
    :return: The populated TOML document.
    :raises Exception: The first item returned by ``builder.validate`` when the
        returned collection is non-empty.
    """
    # Validation is opt-in; only the first reported issue is surfaced to the caller.
    if validation_level is not None and (problems := builder.validate(validation_level)):
        raise problems[0]

    toml_doc = document()
    _add_globals(toml_doc, builder.globals, builder.extra_globals)
    _add_db(toml_doc, builder.db_config)
    _add_processors(toml_doc, builder)
    _add_groups(toml_doc, builder.groups)
    _add_user_interface(toml_doc, builder.ui_config)
    return toml_doc
def _add_globals(doc: tomlkit.TOMLDocument, globals_: GlobalSettings, extra_globals: dict[str, Any]) -> None:
    """Write the top-level keys of the steering file into ``doc``.

    ``processors_to_run`` is always written. The optional settings
    (``analysis_name``, ``analysis_description``, ``new_only`` and
    ``create_standard_tables``) are written only when they are not ``None``.
    A blank line is appended after the last key.

    :param doc: Document receiving the keys.
    :param globals_: Typed global settings.
    :param extra_globals: Additional key/value pairs copied verbatim into ``doc``.
    """
    doc['processors_to_run'] = globals_.processors_to_run

    # The TOML key and the attribute name coincide for each optional setting.
    for setting_name in ('analysis_name', 'analysis_description', 'new_only'):
        setting = getattr(globals_, setting_name)
        if setting is not None:
            doc[setting_name] = setting

    # Extras are written after the settings above and before create_standard_tables.
    for extra_key, extra_value in extra_globals.items():
        doc[extra_key] = extra_value

    standard_tables = globals_.create_standard_tables
    if standard_tables is not None:
        doc['create_standard_tables'] = standard_tables

    doc.add(nl())
def _add_db(doc: tomlkit.TOMLDocument, db_config: DBConfiguration) -> None:
    """Add the ``DBConfiguration`` table to ``doc`` when the database section is enabled.

    The table receives every entry of ``db_config.attributes`` followed by:

    * an ``authentication`` sub-table when ``db_config.authentication`` is non-empty;
    * a ``parameters`` sub-table with one table per backend when
      ``db_config.parameters`` is non-empty; a ``pragmas`` dict inside a backend
      becomes its own nested table;
    * a top-level ``pragmas`` sub-table, written only when neither authentication
      nor parameters are present and ``db_config.pragmas`` is non-empty.

    A blank line is appended after the table. Nothing is written when
    ``db_config.enabled`` is falsy.

    :param doc: Document receiving the table.
    :param db_config: Database configuration to serialize.
    """
    if not db_config.enabled:
        return

    def _as_table(mapping: dict[str, Any]) -> Any:
        # Copy a plain mapping into a fresh TOML table, preserving key order.
        tbl = table()
        for name, item in mapping.items():
            tbl[name] = item
        return tbl

    db_table = _as_table(db_config.attributes)

    has_auth = bool(db_config.authentication)
    has_params = bool(db_config.parameters)

    if has_auth:
        db_table['authentication'] = _as_table(db_config.authentication)

    if has_params:
        params_table = table()
        for backend, backend_params in db_config.parameters.items():
            backend_table = table()
            for option, setting in backend_params.items():
                if option == 'pragmas' and isinstance(setting, dict):
                    backend_table['pragmas'] = _as_table(setting)
                else:
                    backend_table[option] = setting
            params_table[backend] = backend_table
        db_table['parameters'] = params_table

    # Top-level pragmas are only emitted when neither of the sections above exists.
    if not (has_auth or has_params) and db_config.pragmas:
        db_table['pragmas'] = _as_table(db_config.pragmas)

    doc['DBConfiguration'] = db_table
    doc.add(nl())
def _add_processors(doc: tomlkit.TOMLDocument, builder: 'SteeringBuilder') -> None:
    """Add one table per processor, base processors first and replicas afterwards.

    A processor whose name contains ``#`` is treated as a replica. Within each of
    the two groups the iteration order of ``builder.processors`` is kept.

    :param doc: Document receiving the processor tables.
    :param builder: Builder whose ``processors`` mapping is serialized.
    """
    # sorted() is stable and False sorts before True, so names without '#'
    # come first while the relative order inside each group is unchanged.
    ordered_names = sorted(builder.processors, key=lambda name: '#' in name)
    for processor_name in ordered_names:
        _add_processor_table(doc, builder.processors[processor_name])
def _normalize_logical_op(value: str | LogicalOp) -> LogicalOp | None:
    """Map ``value`` onto a ``LogicalOp`` member, or return ``None`` if it cannot be mapped.

    * A ``LogicalOp`` instance is returned unchanged.
    * A string starting with ``LogicalOp.`` is resolved by member name using the
      text after that prefix.
    * Any other string is resolved by member value.
    * Every other input type yields ``None``.

    :param value: Operator given as an enum member or as text.
    :return: The matching member, or ``None`` when no member matches.
    """
    if isinstance(value, LogicalOp):
        return value
    if not isinstance(value, str):
        return None

    prefix = 'LogicalOp.'
    if value.startswith(prefix):
        member_name = value[len(prefix):]
        try:
            return LogicalOp[member_name]
        except KeyError:
            return None

    try:
        return LogicalOp(value)
    except ValueError:
        return None
def _serialize_condition(condition: Condition) -> Any:
    """Turn a ``Condition`` into the value stored under its TOML key.

    For the ``IS_NULL`` / ``IS_NOT_NULL`` operators the stored value is always
    ``True``; otherwise it is ``condition.value``.

    :param condition: Condition to serialize.
    :return: The bare value when ``condition.is_implicit`` is truthy, otherwise a
        dict with the keys ``op`` (``str`` of the operator) and ``value``.
    """
    null_checks = (LogicalOp.IS_NULL, LogicalOp.IS_NOT_NULL)
    is_null_check = _normalize_logical_op(condition.operator) in null_checks
    payload = True if is_null_check else condition.value

    # Implicit conditions are written as the raw value, without an operator.
    if condition.is_implicit:
        return payload

    return {'op': str(condition.operator), 'value': payload}
def _resolve_logic(source: Any) -> Any:
    """Return the logic expression to serialize for ``source``, or ``None`` if it has none.

    When ``source.logic_dirty`` and ``source.logic_ast`` are both truthy the
    expression is regenerated with ``ast_to_string``; otherwise a non-empty
    ``source.logic_str_original`` is reused as is.
    """
    if source.logic_dirty and source.logic_ast:
        return ast_to_string(source.logic_ast)
    if source.logic_str_original:
        return source.logic_str_original
    return None


def _filter_conditions_table(filter_config: ModelFilterConfig | FieldFilterConfig) -> Any:
    """Build the table holding a filter's enable flag, logic expression and conditions."""
    tbl = table()
    if not filter_config.enabled:
        # Only the disabled state is written explicitly.
        tbl['__enable__'] = False
    logic = _resolve_logic(filter_config)
    if logic:
        tbl['__logic__'] = logic
    for field_name, condition in filter_config.conditions.items():
        tbl[field_name] = _serialize_condition(condition)
    return tbl


def _add_processor_table(doc: tomlkit.TOMLDocument, config: ProcessorConfig) -> None:
    """Add the table describing one processor to ``doc``, followed by a blank line.

    The table is stored under ``config.name`` and contains, in this order:

    * every parameter of ``config.parameters`` (its ``value``);
    * ``__new_only__`` and ``__inheritance__`` when they are not ``None``;
    * ``__logic__`` when the processor has a logic expression;
    * ``__filter__`` when at least one filter entry was produced.

    Inside ``__filter__`` each model gets a table with its model-level enable
    flag, logic and conditions, one sub-table per field filter and a
    ``__conditional__`` array for conditional filters. The processor logic is
    also copied to ``__filter__.__logic__`` when ``config.has_filter_root`` is
    truthy.

    :param doc: Document receiving the processor table.
    :param config: Processor configuration to serialize.
    """
    proc_table = table()
    for key, param_config in config.parameters.items():
        proc_table[key] = param_config.value
    if config.new_only is not None:
        proc_table['__new_only__'] = config.new_only
    if config.inheritance is not None:
        proc_table['__inheritance__'] = config.inheritance

    filter_root = table()
    filter_added = False

    processor_logic = _resolve_logic(config)
    if processor_logic is not None:
        proc_table['__logic__'] = processor_logic
        if config.has_filter_root:
            filter_root['__logic__'] = processor_logic
            filter_added = True

    for model_name, filter_list in config.filters.items():
        # 1. Model filter (base conditions and logic): only the first one found is used.
        model_config = next((f for f in filter_list if isinstance(f, ModelFilterConfig)), None)
        if model_config:
            filter_table = _filter_conditions_table(model_config)
            model_added = True
        else:
            filter_table = table()
            model_added = False

        # 2. Field filters become sub-tables keyed by their field name.
        for field_filter in filter_list:
            if isinstance(field_filter, FieldFilterConfig):
                model_added = True
                filter_table[field_filter.field_name] = _filter_conditions_table(field_filter)

        # 3. Conditional filters are collected into a single array of tables.
        conditionals = [f for f in filter_list if isinstance(f, ConditionalFilterConfig)]
        if conditionals:
            model_added = True
            filter_table['__conditional__'] = _build_conditional_array_from_objects(conditionals)

        # A model without any recognised filter entry is not written at all.
        if model_added:
            filter_root[model_name] = filter_table
            filter_added = True

    if filter_added:
        proc_table['__filter__'] = filter_root
    doc[config.name] = proc_table
    doc.add(nl())
def _build_conditional_array_from_objects(conditionals: list[ConditionalFilterConfig]) -> AoT:
    """Convert conditional filter objects into a TOML array of tables.

    Each entry carries ``__enable__ = False`` when the conditional is disabled and
    ``name`` when it was not auto-named. For each of the condition, then and else
    clauses that is truthy, the ``*_field``, ``*_op`` and ``*_value`` keys are
    written, with the operator converted through ``str``.

    :param conditionals: Conditional filters to serialize, in output order.
    :return: Array of tables with one table per conditional.
    """
    # (clause attribute, field key, operator key, value key); the field key is
    # also the name of the attribute holding the field on the conditional.
    clause_layout = (
        ('condition', 'condition_field', 'condition_op', 'condition_value'),
        ('then_clause', 'then_field', 'then_op', 'then_value'),
        ('else_clause', 'else_field', 'else_op', 'else_value'),
    )

    result = aot()
    for conditional in conditionals:
        entry = table()
        if not conditional.enabled:
            entry['__enable__'] = False
        if not conditional.auto_named:
            entry['name'] = conditional.name

        for clause_attr, field_key, op_key, value_key in clause_layout:
            clause = getattr(conditional, clause_attr)
            if clause:
                entry[field_key] = getattr(conditional, field_key)
                entry[op_key] = str(clause.operator)
                entry[value_key] = clause.value

        result.append(entry)
    return result
def _build_conditional_array(conditionals: list[dict[str, Any]]) -> AoT:
    """Convert a list of plain dicts into a TOML array of tables.

    Deprecated: kept for compatibility only; the object-based builder is used by
    the current serialization logic.

    :param conditionals: One mapping per conditional; keys and values are copied verbatim.
    :return: Array of tables with one table per mapping.
    """
    result = aot()
    for mapping in conditionals:
        row = table()
        for name, item in mapping.items():
            row[name] = item
        result.append(row)
    return result
def _add_groups(doc: tomlkit.TOMLDocument, groups: dict[str, GroupConfig]) -> None:
    """Add one table per processor group to ``doc``, each followed by a blank line.

    Every table is stored under the group's ``name`` and holds
    ``processors_to_run``, an optional ``description`` (skipped when ``None``) and
    then all entries of the group's ``attributes``.

    :param doc: Document receiving the group tables.
    :param groups: Groups to serialize; only the mapping's values are used.
    """
    for group_config in groups.values():
        group_table = table()
        group_table['processors_to_run'] = group_config.processors

        description = group_config.description
        if description is not None:
            group_table['description'] = description

        for name, item in group_config.attributes.items():
            group_table[name] = item

        doc[group_config.name] = group_table
        doc.add(nl())
def _add_user_interface(doc: tomlkit.TOMLDocument, ui_config: UIConfiguration) -> None:
    """Add the ``UserInterface`` table, holding the single ``interface`` key, to ``doc``.

    :param doc: Document receiving the table.
    :param ui_config: User-interface configuration providing ``interface``.
    """
    section = table()
    section['interface'] = ui_config.interface
    doc['UserInterface'] = section