Coverage for src / mafw / steering / serializer.py: 99%
186 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
# Copyright 2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
4"""Serialize steering metadata into TOML documents via tomlkit."""
6from __future__ import annotations
8from typing import TYPE_CHECKING, Any
10import tomlkit
11from tomlkit import aot, document, nl, table
12from tomlkit.items import AoT
14from mafw.db.db_filter import ast_to_string
15from mafw.enumerators import LogicalOp
17from .models import (
18 Condition,
19 ConditionalFilterConfig,
20 DBConfiguration,
21 FieldFilterConfig,
22 GlobalSettings,
23 GroupConfig,
24 ModelFilterConfig,
25 ProcessorConfig,
26 UIConfiguration,
27)
29if TYPE_CHECKING:
30 from mafw.steering.builder import SteeringBuilder, ValidationLevel
def serialize(
    builder: SteeringBuilder,
    *,
    validation_level: ValidationLevel | None = None,
) -> tomlkit.TOMLDocument:
    """Render the builder's steering metadata as a tomlkit document.

    When ``validation_level`` is given, ``builder.validate`` is called with it
    first and, if it reports anything, the first reported issue is raised.
    Without a validation level no validation is performed.

    The sections are written in a fixed order: global settings, database
    configuration, processor tables, groups and finally the user interface.
    """
    # An empty tuple stands in for "nothing to report" when validation is skipped.
    issues = builder.validate(validation_level) if validation_level is not None else ()
    if issues:
        raise issues[0]

    toml_doc = document()
    _add_globals(toml_doc, builder.globals, builder.extra_globals)
    _add_db(toml_doc, builder.db_config)
    _add_processors(toml_doc, builder)
    _add_groups(toml_doc, builder.groups)
    _add_user_interface(toml_doc, builder.ui_config)
    return toml_doc
def _add_globals(doc: tomlkit.TOMLDocument, globals_: GlobalSettings, extra_globals: dict[str, Any]) -> None:
    """Write the top-level (global) keys of the steering document.

    ``processors_to_run`` is always written. The optional settings are written
    only when they are not ``None``. Every entry of ``extra_globals`` is copied
    as-is, and ``create_standard_tables`` is written after the extras. A blank
    line closes the section.
    """
    doc['processors_to_run'] = globals_.processors_to_run

    # Optional settings share the same "skip when None" rule; the tuple fixes
    # the order in which they appear in the document.
    for optional_key in ('analysis_name', 'analysis_description', 'new_only'):
        setting = getattr(globals_, optional_key)
        if setting is not None:
            doc[optional_key] = setting

    for extra_key in extra_globals:
        doc[extra_key] = extra_globals[extra_key]

    standard_tables = globals_.create_standard_tables
    if standard_tables is not None:
        doc['create_standard_tables'] = standard_tables

    doc.add(nl())
68def _add_db(doc: tomlkit.TOMLDocument, db_config: DBConfiguration) -> None:
69 if not db_config.enabled:
70 return
71 db_table = table()
72 for key, value in db_config.attributes.items():
73 db_table[key] = value
74 if db_config.pragmas:
75 pragmas_table = table()
76 for key, value in db_config.pragmas.items():
77 pragmas_table[key] = value
78 db_table['pragmas'] = pragmas_table
79 doc['DBConfiguration'] = db_table
80 doc.add(nl())
83def _add_processors(doc: tomlkit.TOMLDocument, builder: 'SteeringBuilder') -> None:
84 base_names = [name for name in builder.processors if '#' not in name]
85 replica_names = [name for name in builder.processors if '#' in name]
86 for name in base_names:
87 _add_processor_table(doc, builder.processors[name])
88 for name in replica_names:
89 _add_processor_table(doc, builder.processors[name])
def _normalize_logical_op(value: str | LogicalOp) -> LogicalOp | None:
    """Map ``value`` onto a ``LogicalOp`` member, or ``None`` if that fails.

    * A ``LogicalOp`` instance is returned unchanged.
    * A string starting with ``'LogicalOp.'`` is resolved by member *name*
      using the text after that prefix.
    * Any other string is resolved by member *value*.
    * Anything that is neither, or that cannot be resolved, gives ``None``.
    """
    if isinstance(value, LogicalOp):
        return value
    if not isinstance(value, str):
        return None

    qualifier = 'LogicalOp.'
    if value.startswith(qualifier):
        # Qualified spelling: whatever follows the prefix is the member name.
        member_name = value[len(qualifier):]
        try:
            return LogicalOp[member_name]
        except KeyError:
            return None

    # Unqualified spelling: the string is looked up as a member value.
    try:
        return LogicalOp(value)
    except ValueError:
        return None
def _serialize_condition(condition: Condition) -> Any:
    """Turn a ``Condition`` into the value stored under its field key.

    For the ``IS_NULL`` / ``IS_NOT_NULL`` operators the stored value is forced
    to ``True``. An implicit condition is emitted as the bare value; any other
    condition is emitted as a ``{'op': ..., 'value': ...}`` mapping, where
    ``op`` is ``str(condition.operator)``.
    """
    operator = condition.operator
    operator_text = str(operator)
    payload = condition.value

    null_checks = (LogicalOp.IS_NULL, LogicalOp.IS_NOT_NULL)
    if _normalize_logical_op(operator) in null_checks:
        payload = True

    if not condition.is_implicit:
        # Explicit form keeps the operator next to the value.
        return {'op': operator_text, 'value': payload}
    return payload
def _resolve_logic(cfg: Any) -> str | None:
    """Return the logic expression to emit for ``cfg``, or ``None``.

    A dirty configuration with an AST is rendered through ``ast_to_string``;
    otherwise a non-empty ``logic_str_original`` is reused verbatim.
    """
    if cfg.logic_dirty and cfg.logic_ast:
        return ast_to_string(cfg.logic_ast)
    if cfg.logic_str_original:
        return cfg.logic_str_original
    return None


def _fill_filter_table(target: Any, filter_config: Any) -> None:
    """Write the enable flag, logic string and conditions of one filter into ``target``."""
    if not filter_config.enabled:
        # Only the non-default state is written.
        target['__enable__'] = False
    logic = _resolve_logic(filter_config)
    if logic:
        target['__logic__'] = logic
    for field_name, condition in filter_config.conditions.items():
        target[field_name] = _serialize_condition(condition)


def _build_model_filter_table(filter_list: list[Any]) -> Any:
    """Build the filter table of a single model, or ``None`` when it would be empty.

    The table is assembled in three steps: the first ``ModelFilterConfig`` of
    the list (if any), then one sub-table per ``FieldFilterConfig`` keyed by
    its ``field_name``, then all ``ConditionalFilterConfig`` entries under
    ``__conditional__``.
    """
    filter_table = table()
    has_content = False

    # 1. Model filter (base conditions & logic) - only the first one is used.
    model_config = next((f for f in filter_list if isinstance(f, ModelFilterConfig)), None)
    if model_config:
        has_content = True
        _fill_filter_table(filter_table, model_config)

    # 2. Field filters (sub-tables)
    for field_filter in filter_list:
        if isinstance(field_filter, FieldFilterConfig):
            has_content = True
            sub_table = table()
            _fill_filter_table(sub_table, field_filter)
            filter_table[field_filter.field_name] = sub_table

    # 3. Conditionals
    conditionals = [f for f in filter_list if isinstance(f, ConditionalFilterConfig)]
    if conditionals:
        has_content = True
        filter_table['__conditional__'] = _build_conditional_array_from_objects(conditionals)

    return filter_table if has_content else None


def _add_processor_table(doc: tomlkit.TOMLDocument, config: ProcessorConfig) -> None:
    """Write the table of one processor, stored under ``config.name``.

    The table holds, in order: every parameter value, ``__new_only__`` and
    ``__inheritance__`` when they are not ``None``, the processor-level
    ``__logic__`` string when one is available, and a ``__filter__`` table
    when at least one model filter (or the filter-root logic) was produced.
    A blank line is appended after the table.
    """
    proc_table = table()
    for key, param_config in config.parameters.items():
        proc_table[key] = param_config.value
    if config.new_only is not None:
        proc_table['__new_only__'] = config.new_only
    if config.inheritance is not None:
        proc_table['__inheritance__'] = config.inheritance

    filter_root = table()
    filter_added = False

    processor_logic = _resolve_logic(config)
    if processor_logic is not None:
        proc_table['__logic__'] = processor_logic
        if config.has_filter_root:
            # The same expression is mirrored inside the __filter__ table.
            filter_root['__logic__'] = processor_logic
            filter_added = True

    for model_name, filter_list in config.filters.items():
        filter_table = _build_model_filter_table(filter_list)
        if filter_table is not None:
            filter_root[model_name] = filter_table
            filter_added = True

    if filter_added:
        proc_table['__filter__'] = filter_root
    doc[config.name] = proc_table
    doc.add(nl())
def _build_conditional_array_from_objects(conditionals: list[ConditionalFilterConfig]) -> AoT:
    """Convert conditional filter objects into a TOML array of tables.

    Each conditional becomes one table. ``__enable__`` is written only for
    disabled entries and ``name`` only for entries that are not auto-named.
    The condition, then and else parts each contribute a
    ``*_field`` / ``*_op`` / ``*_value`` triple when the corresponding clause
    is truthy.
    """
    # (clause attribute, field key, operator key, value key); the field key is
    # also the name of the attribute holding the field on the conditional.
    sections = (
        ('condition', 'condition_field', 'condition_op', 'condition_value'),
        ('then_clause', 'then_field', 'then_op', 'then_value'),
        ('else_clause', 'else_field', 'else_op', 'else_value'),
    )

    result = aot()
    for conditional in conditionals:
        entry = table()
        if not conditional.enabled:
            entry['__enable__'] = False
        if not conditional.auto_named:
            entry['name'] = conditional.name

        for clause_attr, field_key, op_key, value_key in sections:
            clause = getattr(conditional, clause_attr)
            if clause:
                entry[field_key] = getattr(conditional, field_key)
                entry[op_key] = str(clause.operator)
                # NOTE(review): unlike _serialize_condition, the value is
                # written as stored, also for null-check operators - confirm
                # this is intended.
                entry[value_key] = clause.value

        result.append(entry)
    return result
def _build_conditional_array(conditionals: list[dict[str, Any]]) -> AoT:
    """Convert plain dictionaries into a TOML array of tables, one table per dict.

    Deprecated: kept for compatibility if needed, though unused by the
    object-based serialisation path.
    """
    result = aot()
    for mapping in conditionals:
        row = table()
        for name in mapping:
            row[name] = mapping[name]
        result.append(row)
    return result
247def _add_groups(doc: tomlkit.TOMLDocument, groups: dict[str, GroupConfig]) -> None:
248 for group in groups.values():
249 tbl = table()
250 tbl['processors_to_run'] = group.processors
251 if group.description is not None:
252 tbl['description'] = group.description
253 for attr_key, attr_value in group.attributes.items():
254 tbl[attr_key] = attr_value
255 doc[group.name] = tbl
256 doc.add(nl())
def _add_user_interface(doc: tomlkit.TOMLDocument, ui_config: UIConfiguration) -> None:
    """Write the ``UserInterface`` table with its single ``interface`` key."""
    chosen_interface = ui_config.interface
    section = table()
    section['interface'] = chosen_interface
    doc['UserInterface'] = section