Coverage for src / mafw / steering / builder.py: 100%
399 statements
coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
1# Copyright 2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""Editable steering configuration builder independent from the execution engine.
6:Author: Bulgheroni Antonio
7:Description: Provides helpers to construct steering metadata without running processors.
8"""
10from __future__ import annotations
12from collections.abc import Iterable
13from enum import Enum
14from pathlib import Path
15from typing import Any, cast
17import tomlkit
18from tomlkit.items import Array, Item, Table
19from tomlkit.toml_file import TOMLFile
21import mafw.mafw_errors
22from mafw.db.db_configurations import default_conf
23from mafw.db.db_filter import ExprParser
25from .models import (
26 Condition,
27 ConditionalFilterConfig,
28 DBConfiguration,
29 FieldFilterConfig,
30 FilterConfig,
31 GlobalSettings,
32 GroupConfig,
33 ModelFilterConfig,
34 ParameterConfig,
35 ParameterSchemaStatus,
36 ParameterSource,
37 ProcessorConfig,
38 ProcessorRef,
39 UIConfiguration,
40)
class ValidationLevel(Enum):
    """Validation tiers accepted by the steering builder.

    A member of this enumeration is passed to :meth:`SteeringBuilder.validate`
    (and optionally to the serialization helpers) to select how much checking
    is requested.
    """

    # Value strings are part of the runtime contract; keep them unchanged.
    SEMANTIC = 'semantic'
    FULL = 'full'
class SteeringBuilder:
    """Editable domain model for building MAFw steering files.

    The builder stores the global settings, the per-processor configurations,
    the processor groups, the database configuration and the user-interface
    configuration as model objects that can be edited and later serialized
    to a TOML document.
    """

    def __init__(self) -> None:
        """Create an empty builder and populate it with the default values."""
        self.globals = GlobalSettings()
        self.processors: dict[str, ProcessorConfig] = {}
        self.groups: dict[str, GroupConfig] = {}
        self.db_config = DBConfiguration()
        self.ui_config = UIConfiguration()
        self._extra_globals: dict[str, Any] = {}
        self._document: tomlkit.TOMLDocument | None = None
        self.set_default()

    # ------------------------------------------------------------------
    # Construction from existing TOML
    # ------------------------------------------------------------------
    @classmethod
    def from_toml(cls, path: Path | str) -> 'SteeringBuilder':
        """Create a builder from an existing steering file while keeping TOML metadata.

        :param path: Location of the steering file; a string is converted to a ``Path``.
        :return: A builder loaded from the file, holding the parsed document.
        """
        if isinstance(path, str):
            path = Path(path)
        doc = TOMLFile(path).read()
        builder = cls()
        builder._document = doc
        builder._load_from_document(doc)
        return builder

    @classmethod
    def from_toml_text(cls, text: str) -> 'SteeringBuilder':
        """Create a builder from TOML text while keeping TOML metadata.

        :param text: The TOML source to parse.
        :return: A builder loaded from the text, holding the parsed document.
        """
        doc = tomlkit.parse(text)
        builder = cls()
        builder._document = doc
        builder._load_from_document(doc)
        return builder

    def _load_from_document(self, doc: tomlkit.TOMLDocument) -> None:
        """Populate the builder from the top-level entries of *doc*.

        Known global keys update :attr:`globals`; ``DBConfiguration`` and
        ``UserInterface`` update the respective configurations; any other table
        is interpreted as a filter section, a group (when it contains
        ``processors_to_run``) or a processor. Remaining non-table entries are
        preserved in the extra globals.
        """
        self._extra_globals.clear()
        # Assume there is no DBConfiguration section; _parse_db_config re-enables
        # it if one is actually found.
        self.disable_db_configuration()

        for key, value in doc.items():
            if key == 'processors_to_run':
                self.globals.processors_to_run = self._ensure_str_list(value)
                continue
            if key in ('analysis_name', 'analysis_description', 'new_only', 'create_standard_tables'):
                setattr(self.globals, key, self._toml_to_python(value))
                continue
            if key == 'DBConfiguration':
                self._parse_db_config(value)
                continue
            if key == 'UserInterface':
                self._parse_ui_config(value)
                continue
            if isinstance(value, Table):
                parts = key.split('.')
                if '__filter__' in parts:
                    self._parse_filter_section(parts, value)
                    continue
                if 'processors_to_run' in value:
                    self._parse_group(key, value)
                    continue
                self._parse_processor(key, value)
                continue
            self._extra_globals[key] = self._toml_to_python(value)

    def _ensure_str_list(self, value: Any) -> list[str]:
        """Convert *value* to a list of strings; ``None`` yields an empty list."""
        python_value = self._toml_to_python(value)
        if python_value is None:
            return []
        return [str(item) for item in python_value]

    def _toml_to_python(self, value: Any) -> Any:
        """Recursively convert tomlkit items into plain Python objects.

        Tables become dictionaries, arrays become lists, other tomlkit items
        are unwrapped and anything else is returned unchanged.
        """
        if isinstance(value, Table):
            return {k: self._toml_to_python(v) for k, v in value.items()}
        if isinstance(value, Array):
            return [self._toml_to_python(item) for item in value]
        if isinstance(value, Item):
            return value.unwrap()
        return value

    def _parse_db_config(self, table: Table) -> None:
        """Replace the database configuration with the content of *table*.

        The section is enabled, previous attributes, pragmas and URL are
        discarded. A ``pragmas`` sub-table is stored as pragmas; every other
        entry is stored as an attribute, with ``URL`` also mirrored on
        ``db_config.url``.
        """
        self.enable_db_configuration()
        self.db_config.attributes.clear()
        self.db_config.pragmas.clear()
        self.db_config.url = None
        for key, value in table.items():
            if key == 'pragmas' and isinstance(value, Table):
                self.db_config.pragmas = self._toml_to_python(value)
                continue
            python_value = self._toml_to_python(value)
            self.db_config.attributes[key] = python_value
            if key == 'URL':
                self.db_config.url = python_value

    def _parse_ui_config(self, table: Table) -> None:
        """Read the ``interface`` entry of the ``UserInterface`` table, if present."""
        interface = table.get('interface')
        if interface is not None:
            self.ui_config.interface = self._toml_to_python(interface)

    def _parse_group(self, name: str, table: Table) -> None:
        """Register a group section.

        ``processors_to_run`` and ``description`` are stored in dedicated
        fields; every other entry is kept in the group attributes.
        """
        processors = self._ensure_str_list(table.get('processors_to_run'))
        description = self._toml_to_python(table.get('description'))
        attributes: dict[str, Any] = {
            attr_key: self._toml_to_python(attr_value)
            for attr_key, attr_value in table.items()
            if attr_key not in {'processors_to_run', 'description'}
        }
        self.groups[name] = GroupConfig(
            name=name,
            processors=processors,
            description=description,
            attributes=attributes,
        )

    def _parse_processor(self, name: str, table: Table) -> None:
        """Build a :class:`ProcessorConfig` from a processor table and store it.

        The reserved keys ``__filter__``, ``__logic__``, ``__new_only__`` and
        ``__inheritance__`` are interpreted; every other entry becomes a
        parameter coming from the configuration. An existing configuration
        with the same name is replaced.
        """
        config = ProcessorConfig(name=name)
        for key, value in table.items():
            if key == '__filter__' and isinstance(value, Table):
                logic_str = value.get('__logic__')
                if logic_str is not None:
                    self._set_logic_expression(config, self._toml_to_python(logic_str))
                config.filters = self._load_filters(value)
                # The filter root is flagged when the __filter__ table holds
                # at least one entry that is not itself a table.
                config.has_filter_root = any(not isinstance(item, Table) for item in value.values())
                continue
            if key == '__logic__':
                self._set_logic_expression(config, self._toml_to_python(value))
                continue
            if key == '__new_only__':
                config.new_only = bool(self._toml_to_python(value))
                continue
            if key == '__inheritance__':
                config.inheritance = bool(self._toml_to_python(value))
                continue
            config.parameters[key] = ParameterConfig(
                name=key,
                value=self._toml_to_python(value),
                source=ParameterSource.CONFIG,
                status=ParameterSchemaStatus.OK,
            )
        self.processors[name] = config

    def _parse_condition(self, value: Any) -> Condition:
        """Translate a raw filter value into a :class:`Condition`.

        A dictionary with both ``op`` and ``value`` is an explicit condition.
        Otherwise the operator is implied by the type: ``IN`` for lists,
        ``GLOB`` for strings and ``==`` for anything else.
        """
        if isinstance(value, dict) and 'op' in value and 'value' in value:
            return Condition(operator=value['op'], value=value['value'], is_implicit=False)
        if isinstance(value, list):
            return Condition(operator='IN', value=value, is_implicit=True)
        if isinstance(value, str):
            return Condition(operator='GLOB', value=value, is_implicit=True)
        return Condition(operator='==', value=value, is_implicit=True)

    def _build_model_filters(
        self,
        model_name: str,
        model_data: dict[str, Any],
        *,
        logic_dirty: bool = False,
    ) -> list[FilterConfig]:
        """Build the ordered filter list describing one model.

        The returned list starts with the :class:`ModelFilterConfig`, followed
        by the conditional filters and then by the field filters, in the order
        in which they appear in *model_data*.

        :param model_name: Name of the model the filters apply to.
        :param model_data: Plain dictionary with the model filter definition.
            It is copied before the reserved keys are removed, so the caller's
            dictionary is left untouched.
        :param logic_dirty: Value stored in ``logic_dirty`` when the model
            defines a ``__logic__`` expression.
        :return: The list of filter configurations for the model.
        """
        data = dict(model_data)
        model_filter = ModelFilterConfig(name=model_name, model=model_name)
        filters: list[FilterConfig] = [model_filter]

        if '__enable__' in data:
            model_filter.enabled = bool(data.pop('__enable__'))

        if '__logic__' in data:
            self._set_logic_expression(model_filter, data.pop('__logic__'), dirty=logic_dirty)

        # A __conditional__ entry that is not a list is dropped silently.
        conditionals = data.pop('__conditional__', None)
        if isinstance(conditionals, list):
            for cond_data in conditionals:
                filters.append(self._parse_conditional_config(model_name, cond_data))

        for field_name, field_value in data.items():
            is_explicit_condition = (
                isinstance(field_value, dict) and 'op' in field_value and 'value' in field_value
            )
            if isinstance(field_value, dict) and not is_explicit_condition:
                # A sub-table that is not an {op, value} pair is a field filter.
                filters.append(self._parse_field_filter_config(model_name, field_name, field_value))
            else:
                # Anything else is a simple condition of the model filter.
                model_filter.conditions[field_name] = self._parse_condition(field_value)

        return filters

    def _load_filters(self, table: Table) -> dict[str, list[FilterConfig]]:
        """Build the filters for every model sub-table of a ``__filter__`` table.

        The ``__logic__`` entry and any entry that is not a table are skipped.

        :return: Mapping from model name to its list of filter configurations.
        """
        result: dict[str, list[FilterConfig]] = {}
        for model_name, model_table in table.items():
            if model_name == '__logic__' or not isinstance(model_table, Table):
                continue
            result[model_name] = self._build_model_filters(model_name, self._toml_to_python(model_table))
        return result

    def _parse_conditional_config(self, model_name: str, data: dict[str, Any]) -> ConditionalFilterConfig:
        """Build a :class:`ConditionalFilterConfig` from one ``__conditional__`` entry.

        Missing keys are tolerated: the condition, then and else clauses are
        only created when at least their operator or value is present, with
        the operator defaulting to ``==``.
        """
        config = ConditionalFilterConfig(
            name=data.get('name', ''),
            model=model_name,
            auto_named='name' not in data,
            condition_field=data.get('condition_field', ''),
            then_field=data.get('then_field', ''),
            else_field=data.get('else_field'),
        )
        if '__enable__' in data:
            config.enabled = bool(data.get('__enable__'))

        if 'condition_op' in data or 'condition_value' in data:
            config.condition = Condition(data.get('condition_op', '=='), data.get('condition_value'), is_implicit=False)

        if 'then_op' in data or 'then_value' in data:
            config.then_clause = Condition(data.get('then_op', '=='), data.get('then_value'), is_implicit=False)

        if 'else_op' in data or 'else_value' in data:
            config.else_clause = Condition(data.get('else_op', '=='), data.get('else_value'), is_implicit=False)

        return config

    def _parse_field_filter_config(self, model_name: str, field_name: str, data: dict[str, Any]) -> FieldFilterConfig:
        """Build a :class:`FieldFilterConfig` from the sub-table of a single field.

        ``__enable__`` and ``__logic__`` are interpreted; every other entry is
        parsed as a condition. The input dictionary is not modified.
        """
        # Work on a copy: callers such as set_filter_config pass dictionaries
        # owned by user code, which must not lose their reserved keys.
        data = dict(data)
        config = FieldFilterConfig(name=field_name, model=model_name, field_name=field_name)
        if '__enable__' in data:
            config.enabled = bool(data.pop('__enable__'))
        if '__logic__' in data:
            self._set_logic_expression(config, data.pop('__logic__'))

        for key, value in data.items():
            config.conditions[key] = self._parse_condition(value)

        return config

    def _parse_filter_section(self, parts: list[str], table: Table) -> None:
        """Handle a top-level table whose dotted key contains ``__filter__``.

        :param parts: The key split on ``.``; the first element is used as the
            processor name.
        :param table: The table stored under that key.
        """
        processor_name = parts[0]
        config = self._ensure_processor(processor_name)

        if parts[-1] == '__filter__':
            # Whole filter section: read the processor-level logic and replace
            # the filters of every model defined in the table.
            logic_str = table.get('__logic__')
            if logic_str is not None:
                self._set_logic_expression(config, self._toml_to_python(logic_str))

            config.filters.update(self._load_filters(table))
            config.has_filter_root = True
            return

        if len(parts) < 3:
            return

        # Key of the form Processor.__filter__.Model: the table is the full
        # definition for that model and replaces any previous one.
        model_name = parts[-1]
        config.filters[model_name] = self._build_model_filters(model_name, self._toml_to_python(table))

    # ------------------------------------------------------------------
    # Processors and parameters
    # ------------------------------------------------------------------
    def add_processor(self, base_name: str, replica: str | None = None) -> None:
        """Add a processor reference to the global processors_to_run list.

        Nothing happens when the reference is already listed. Otherwise a
        configuration entry is ensured for the reference and, for a replica,
        for its base processor as well.
        """
        ref = ProcessorRef(base_name=base_name, replica=replica)
        target = ref.full_name
        if target in self.globals.processors_to_run:
            return
        self.globals.processors_to_run.append(target)
        if replica:
            self._ensure_processor(base_name)
        self._ensure_processor(target)

    def remove_processor(self, full_name: str) -> None:
        """Remove a processor or replica from the run list.

        Only the run list is touched; a stored processor configuration is kept.
        """
        if full_name in self.globals.processors_to_run:
            self.globals.processors_to_run.remove(full_name)

    def set_processors_to_run(self, processors: Iterable[str]) -> None:
        """Overwrite the processors_to_run list."""
        self.globals.processors_to_run = [str(p) for p in processors]

    def set_parameter(self, processor_full_name: str, key: str, value: Any) -> None:
        """Set a processor parameter.

        An existing parameter gets the new value and the ``CONFIG`` source; a
        missing one is created. In both cases it is marked as an active override.
        """
        config = self._ensure_processor(processor_full_name)
        parameter = config.parameters.get(key)
        if parameter is None:
            parameter = ParameterConfig(
                name=key, value=value, source=ParameterSource.CONFIG, status=ParameterSchemaStatus.OK
            )
            config.parameters[key] = parameter
        else:
            parameter.value = value
            parameter.source = ParameterSource.CONFIG
        parameter.active_override = True

    def remove_parameter(self, processor_full_name: str, key: str) -> None:
        """Remove a processor parameter override if present."""
        config = self._ensure_processor(processor_full_name)
        config.parameters.pop(key, None)

    def clear_parameters(self, processor_full_name: str) -> None:
        """Clear every parameter override for a processor."""
        config = self._ensure_processor(processor_full_name)
        config.parameters.clear()

    def add_replica(self, base_name: str, replica: str) -> None:
        """Create a replica entry without touching the base configuration.

        The replica is stored under the name ``<base_name>#<replica>``.
        """
        self._ensure_processor(base_name)
        self._ensure_processor(f'{base_name}#{replica}')

    def set_replica_inheritance(self, replica_full_name: str, inheritance: bool | None) -> None:
        """Toggle the inheritance behaviour for a replica."""
        config = self._ensure_processor(replica_full_name)
        config.inheritance = inheritance

    def set_processor_new_only(self, processor_full_name: str, new_only: bool | None) -> None:
        """Explicitly set ``__new_only__`` for a processor or replica."""
        config = self._ensure_processor(processor_full_name)
        config.new_only = new_only

    # ------------------------------------------------------------------
    # Filters
    # ------------------------------------------------------------------
    def set_filter_config(self, processor_full_name: str, model_name: str, config: dict[str, Any]) -> None:
        """Replace the configuration for a given filter model.

        :param processor_full_name: Processor or replica owning the filter.
        :param model_name: Model the filter applies to.
        :param config: Plain dictionary describing the model filter, using the
            same reserved keys as the steering file (``__enable__``,
            ``__logic__``, ``__conditional__``). The dictionary and its nested
            per-field dictionaries are not modified.
        """
        proc = self._ensure_processor(processor_full_name)
        # A logic expression supplied here is a user edit, hence flagged dirty.
        proc.filters[model_name] = self._build_model_filters(model_name, config, logic_dirty=True)

    def _set_logic_expression(
        self,
        config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig,
        logic_text: str | None,
        *,
        dirty: bool = False,
    ) -> None:
        """Store a logic expression on *config* and try to parse it.

        ``None`` and blank text are considered valid and produce no AST. Text
        that fails to parse leaves the AST unset and marks the logic invalid.

        :param config: Object receiving the logic attributes.
        :param logic_text: The expression as written by the user.
        :param dirty: Value stored in ``logic_dirty``.
        """
        config.logic_str_original = logic_text
        config.logic_dirty = dirty

        text = None if logic_text is None else str(logic_text)
        if text is None or not text.strip():
            config.logic_ast = None
            config.logic_is_valid = True
            return

        try:
            config.logic_ast = ExprParser(text).parse()
            config.logic_is_valid = True
        except (mafw.db.db_filter.ParseError, ValueError):
            config.logic_ast = None
            config.logic_is_valid = False

    def set_filter_field(self, processor_full_name: str, model_name: str, field: str, value: Any) -> None:
        """Set or update a single field within a filter model."""
        proc = self._ensure_processor(processor_full_name)
        filter_list = proc.filters.setdefault(model_name, [ModelFilterConfig(name=model_name, model=model_name)])

        # The list normally contains a ModelFilterConfig; create one if it does not.
        model_config = next((f for f in filter_list if isinstance(f, ModelFilterConfig)), None)
        if model_config is None:
            model_config = ModelFilterConfig(name=model_name, model=model_name)
            filter_list.insert(0, model_config)

        model_config.conditions[field] = self._parse_condition(value)

    def remove_filter(self, processor_full_name: str, model_name: str) -> None:
        """Remove a filter model definition."""
        proc = self._ensure_processor(processor_full_name)
        proc.filters.pop(model_name, None)

    def set_processor_filters(
        self, processor_full_name: str, filters: dict[str, list[FilterConfig]], logic: str | None
    ) -> None:
        """Update the filters and logic for a given processor."""
        proc = self._ensure_processor(processor_full_name)
        proc.filters = filters
        self.set_filter_logic(processor_full_name, logic)

    def set_filter_logic(self, processor_full_name: str, logic: str | None) -> None:
        """Set the global ``__logic__`` string for the processor filters."""
        proc = self._ensure_processor(processor_full_name)
        self._set_logic_expression(proc, logic, dirty=True)

    def set_filter_conditionals(
        self, processor_full_name: str, model_name: str, conditionals: list[dict[str, Any]] | None
    ) -> None:
        """Assign ``__conditional__`` blocks to a filter model.

        Existing conditional filters of the model are discarded first; passing
        ``None`` or an empty list therefore just removes them.
        """
        proc = self._ensure_processor(processor_full_name)
        filter_list = proc.filters.setdefault(model_name, [ModelFilterConfig(name=model_name, model=model_name)])

        # Keep everything except the existing conditionals.
        kept = [f for f in filter_list if not isinstance(f, ConditionalFilterConfig)]
        proc.filters[model_name] = kept

        if conditionals:
            for cond_data in conditionals:
                kept.append(self._parse_conditional_config(model_name, cond_data))

    # ------------------------------------------------------------------
    # Globals, database and user interface
    # ------------------------------------------------------------------
    def set_analysis_name(self, name: str | None) -> None:
        """Set ``analysis_name``."""
        self.globals.analysis_name = name

    def set_analysis_description(self, description: str | None) -> None:
        """Set ``analysis_description``."""
        self.globals.analysis_description = description

    def set_new_only(self, value: bool | None) -> None:
        """Set the top-level ``new_only`` flag."""
        self.globals.new_only = value

    def set_create_standard_tables(self, value: bool | None) -> None:
        """Set ``create_standard_tables``."""
        self.globals.create_standard_tables = value

    def set_db_url(self, url: str | None) -> None:
        """Override the database URL.

        The ``URL`` attribute is kept in sync: it is removed when *url* is
        ``None`` and set otherwise.
        """
        self.db_config.url = url
        if url is None:
            self.db_config.attributes.pop('URL', None)
        else:
            self.db_config.attributes['URL'] = url

    def set_db_pragmas(self, pragmas: dict[str, Any]) -> None:
        """Set database pragmas from a shallow copy of *pragmas*."""
        self.db_config.pragmas = dict(pragmas)

    def set_default(self) -> None:
        """Initialize globals, database, and UI defaults for a fresh builder."""
        self.globals.analysis_name = 'analysis-name'
        self.globals.analysis_description = 'analysis-description'
        self.globals.new_only = True
        self.globals.create_standard_tables = True
        self.set_db_url('sqlite:///:memory:')
        # set_db_pragmas copies its argument, so the shared defaults are not aliased.
        self.set_db_pragmas(cast(dict[str, Any], default_conf['sqlite']['pragmas']))
        self.ui_config.interface = 'rich'
        self.enable_db_configuration()

    def set_db_attribute(self, key: str, value: Any | None) -> None:
        """Store a generic key/value pair inside DBConfiguration.

        A value of ``None`` removes the attribute instead.
        """
        if value is None:
            self.db_config.attributes.pop(key, None)
            return
        self.db_config.attributes[key] = value

    def enable_db_configuration(self) -> None:
        """Ensure the DBConfiguration section will be serialized."""
        self.db_config.enabled = True

    def disable_db_configuration(self) -> None:
        """Prevent the DBConfiguration section from being emitted."""
        self.db_config.enabled = False

    def is_db_configuration_enabled(self) -> bool:
        """Return whether the DBConfiguration section should be present."""
        return self.db_config.enabled

    def set_ui_interface(self, interface: str) -> None:
        """Pick the interface used by ``UserInterface``."""
        self.ui_config.interface = interface

    # ------------------------------------------------------------------
    # Groups and listings
    # ------------------------------------------------------------------
    def add_group(self, name: str, processors: Iterable[str], description: str | None = None) -> None:
        """Register a processor group, replacing any group with the same name."""
        self.groups[name] = GroupConfig(name=name, processors=[str(p) for p in processors], description=description)

    def remove_group(self, name: str) -> None:
        """Delete a group by name."""
        self.groups.pop(name, None)

    def list_processors(self) -> list[str]:
        """Return every processor section name currently configured."""
        return list(self.processors.keys())

    def list_groups(self) -> list[str]:
        """Return every group section name currently configured."""
        return list(self.groups.keys())

    @property
    def extra_globals(self) -> dict[str, Any]:
        """Return extra top-level globals preserved from the steering file."""
        return self._extra_globals

    @property
    def document(self) -> tomlkit.TOMLDocument | None:
        """Return the parsed TOML document this builder originated from."""
        return self._document

    # ------------------------------------------------------------------
    # Validation and serialization
    # ------------------------------------------------------------------
    def validate(
        self, validation_level: ValidationLevel = ValidationLevel.SEMANTIC
    ) -> list['mafw.mafw_errors.ValidationIssue']:
        """Run steering validation at the requested level and report every issue."""
        from .validation import validate as _validate  # Avoid circular imports

        return _validate(self, validation_level)

    def to_config_dict(self) -> dict[str, Any]:
        """Return a plain dictionary representing the steering configuration.

        When the builder was created from TOML, the value of the stored
        document is returned; otherwise the builder is serialized first.
        """
        # NOTE(review): for a builder loaded from TOML this returns the stored
        # document, so later edits show up only if that document is updated
        # elsewhere -- confirm against the serializer.
        if self._document is not None:
            return self._document.value
        return self.to_document().value

    def get_processor_config(self, full_name: str) -> ProcessorConfig:
        """Return the stored configuration for a processor or replica.

        :raises KeyError: If no configuration is stored under *full_name*.
        """
        return self.processors[full_name]

    def get_group(self, name: str) -> GroupConfig:
        """Return the stored configuration for a group section.

        :raises KeyError: If no group is stored under *name*.
        """
        return self.groups[name]

    def to_document(self, *, validation_level: ValidationLevel | None = None) -> tomlkit.TOMLDocument:
        """Serialize the builder state into a TOML document."""
        from .serializer import serialize  # Imported lazily, like the validation module

        return serialize(self, validation_level=validation_level)

    def write(self, path: Path | str, *, validation_level: ValidationLevel | None = None) -> None:
        """Dump the builder to disk.

        :param path: Destination file; a string is converted to a ``Path``.
        :param validation_level: Forwarded to :meth:`to_document`.
        """
        if isinstance(path, str):
            path = Path(path)
        doc = self.to_document(validation_level=validation_level)
        with path.open('w', encoding='utf-8') as handle:
            tomlkit.dump(doc, handle)

    def _ensure_processor(self, name: str) -> ProcessorConfig:
        """Return the configuration stored under *name*, creating an empty one if missing."""
        if name not in self.processors:
            self.processors[name] = ProcessorConfig(name=name)
        return self.processors[name]