# Source code for mafw.steering.builder

#  Copyright 2026 European Union
#  Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
#  SPDX-License-Identifier: EUPL-1.2
"""Editable steering configuration builder independent from the execution engine.

:Author: Bulgheroni Antonio
:Description: Provides helpers to construct steering metadata without running processors.
"""

from __future__ import annotations

from collections.abc import Iterable
from enum import Enum
from pathlib import Path
from typing import Any, cast

import tomlkit
from tomlkit.items import Array, Item, Table
from tomlkit.toml_file import TOMLFile

import mafw.mafw_errors
from mafw.db.db_configurations import default_conf
from mafw.db.db_filter import ExprParser

from .models import (
    Condition,
    ConditionalFilterConfig,
    DBConfiguration,
    FieldFilterConfig,
    FilterConfig,
    GlobalSettings,
    GroupConfig,
    ModelFilterConfig,
    ParameterConfig,
    ParameterSchemaStatus,
    ParameterSource,
    ProcessorConfig,
    ProcessorRef,
    UIConfiguration,
)


class ValidationLevel(Enum):
    """Validation tiers that can be requested from the steering builder."""

    #: Check semantic consistency only.
    SEMANTIC = 'semantic'
    #: Run the complete validation suite.
    FULL = 'full'
class SteeringBuilder:
    """Editable domain model for building MAFw steering files."""

    def __init__(self) -> None:
        # Top-level steering settings (analysis name, run list, flags).
        self.globals = GlobalSettings()
        # Per-processor configuration sections, keyed by full name.
        self.processors: dict[str, ProcessorConfig] = {}
        # Named processor groups.
        self.groups: dict[str, GroupConfig] = {}
        self.db_config = DBConfiguration()
        self.ui_config = UIConfiguration()
        # Unknown top-level keys preserved from a loaded steering file.
        self._extra_globals: dict[str, Any] = {}
        # Original TOML document, kept so formatting metadata can round-trip.
        self._document: tomlkit.TOMLDocument | None = None
        # Populate sensible defaults last, once every attribute exists.
        self.set_default()
[docs] @classmethod def from_toml(cls, path: Path | str) -> 'SteeringBuilder': """Create a builder from an existing steering file while keeping TOML metadata.""" if isinstance(path, str): path = Path(path) doc = TOMLFile(path).read() builder = cls() builder._document = doc builder._load_from_document(doc) return builder
[docs] @classmethod def from_toml_text(cls, text: str) -> 'SteeringBuilder': """Create a builder from TOML text while keeping TOML metadata.""" doc = tomlkit.parse(text) builder = cls() builder._document = doc builder._load_from_document(doc) return builder
def _load_from_document(self, doc: tomlkit.TOMLDocument) -> None: self._extra_globals.clear() # we assume that there is no DBConfiguration section. self.disable_db_configuration() for key, value in doc.items(): if key == 'processors_to_run': self.globals.processors_to_run = self._ensure_str_list(value) continue if key in ('analysis_name', 'analysis_description', 'new_only', 'create_standard_tables'): setattr(self.globals, key, self._toml_to_python(value)) continue if key == 'DBConfiguration': # we got a configuration section for the DB, # we need to enable it (done inside the _parse_db_config self._parse_db_config(value) continue if key == 'UserInterface': self._parse_ui_config(value) continue if isinstance(value, Table): parts = key.split('.') if '__filter__' in parts: self._parse_filter_section(parts, value) continue if 'processors_to_run' in value: self._parse_group(key, value) continue self._parse_processor(key, value) continue self._extra_globals[key] = self._toml_to_python(value) def _ensure_str_list(self, value: Any) -> list[str]: python_value = self._toml_to_python(value) if python_value is None: return [] return [str(item) for item in python_value] def _toml_to_python(self, value: Any) -> Any: if isinstance(value, Table): return {k: self._toml_to_python(v) for k, v in value.items()} if isinstance(value, Array): return [self._toml_to_python(item) for item in value] if isinstance(value, Item): return value.unwrap() return value def _parse_db_config(self, table: Table) -> None: self.enable_db_configuration() self.db_config.attributes.clear() self.db_config.pragmas.clear() self.db_config.url = None for key, value in table.items(): if key == 'pragmas' and isinstance(value, Table): self.db_config.pragmas = self._toml_to_python(value) continue python_value = self._toml_to_python(value) self.db_config.attributes[key] = python_value if key == 'URL': self.db_config.url = python_value def _parse_ui_config(self, table: Table) -> None: interface = 
table.get('interface') if interface is not None: self.ui_config.interface = self._toml_to_python(interface) def _parse_group(self, name: str, table: Table) -> None: processors = self._ensure_str_list(table.get('processors_to_run')) description = self._toml_to_python(table.get('description')) attributes: dict[str, Any] = {} for attr_key, attr_value in table.items(): if attr_key in {'processors_to_run', 'description'}: continue attributes[attr_key] = self._toml_to_python(attr_value) self.groups[name] = GroupConfig( name=name, processors=processors, description=description, attributes=attributes, ) def _parse_processor(self, name: str, table: Table) -> None: config = ProcessorConfig(name=name) for key, value in table.items(): if key == '__filter__' and isinstance(value, Table): logic_str = value.get('__logic__') if logic_str is not None: self._set_logic_expression(config, self._toml_to_python(logic_str)) config.filters = self._load_filters(value) non_table_entries = any(not isinstance(item, Table) for item in value.values()) config.has_filter_root = non_table_entries continue if key == '__logic__': self._set_logic_expression(config, self._toml_to_python(value)) continue if key == '__new_only__': config.new_only = bool(self._toml_to_python(value)) continue if key == '__inheritance__': config.inheritance = bool(self._toml_to_python(value)) continue config.parameters[key] = ParameterConfig( name=key, value=self._toml_to_python(value), source=ParameterSource.CONFIG, status=ParameterSchemaStatus.OK, ) self.processors[name] = config def _parse_condition(self, value: Any) -> Condition: if isinstance(value, dict) and 'op' in value and 'value' in value: return Condition(operator=value['op'], value=value['value'], is_implicit=False) if isinstance(value, list): return Condition(operator='IN', value=value, is_implicit=True) if isinstance(value, str): return Condition(operator='GLOB', value=value, is_implicit=True) return Condition(operator='==', value=value, is_implicit=True) def 
_load_filters(self, table: Table) -> dict[str, list[FilterConfig]]: result: dict[str, list[FilterConfig]] = {} for model_name, model_table in table.items(): if model_name == '__logic__': continue if not isinstance(model_table, Table): continue # Convert TOML table to python dict first to simplify processing model_data = self._toml_to_python(model_table) filters: list[FilterConfig] = [] # 1. Model Filter (base conditions) model_filter = ModelFilterConfig(name=model_name, model=model_name) if '__enable__' in model_data: model_filter.enabled = bool(model_data.pop('__enable__')) # Extract logic for the model if '__logic__' in model_data: self._set_logic_expression(model_filter, model_data.pop('__logic__')) # Extract conditionals if '__conditional__' in model_data: conditionals = model_data.pop('__conditional__') if isinstance(conditionals, list): for cond_data in conditionals: filters.append(self._parse_conditional_config(model_name, cond_data)) # Process remaining fields for field_name, field_value in model_data.items(): if isinstance(field_value, dict) and not ('op' in field_value and 'value' in field_value): # It's a Field Filter (sub-table logic) filters.append(self._parse_field_filter_config(model_name, field_name, field_value)) else: # It's a simple condition for the Model Filter model_filter.conditions[field_name] = self._parse_condition(field_value) filters.insert(0, model_filter) result[model_name] = filters return result def _parse_conditional_config(self, model_name: str, data: dict[str, Any]) -> ConditionalFilterConfig: config = ConditionalFilterConfig( name=data.get('name', ''), model=model_name, auto_named='name' not in data, condition_field=data.get('condition_field', ''), then_field=data.get('then_field', ''), else_field=data.get('else_field'), ) if '__enable__' in data: config.enabled = bool(data.get('__enable__')) # Only create Condition objects if sufficient data is present # Or blindly create them but assume they will be serialized only if present? 
# The test provided incomplete data, which means we should be robust. if 'condition_op' in data or 'condition_value' in data: config.condition = Condition(data.get('condition_op', '=='), data.get('condition_value'), is_implicit=False) if 'then_op' in data or 'then_value' in data: config.then_clause = Condition(data.get('then_op', '=='), data.get('then_value'), is_implicit=False) if 'else_op' in data or 'else_value' in data: config.else_clause = Condition(data.get('else_op', '=='), data.get('else_value'), is_implicit=False) return config def _parse_field_filter_config(self, model_name: str, field_name: str, data: dict[str, Any]) -> FieldFilterConfig: config = FieldFilterConfig(name=field_name, model=model_name, field_name=field_name) if '__enable__' in data: config.enabled = bool(data.pop('__enable__')) if '__logic__' in data: self._set_logic_expression(config, data.pop('__logic__')) for key, value in data.items(): config.conditions[key] = self._parse_condition(value) return config def _parse_filter_section(self, parts: list[str], table: Table) -> None: processor_name = parts[0] config = self._ensure_processor(processor_name) if parts[-1] == '__filter__': logic_str = table.get('__logic__') if logic_str is not None: self._set_logic_expression(config, self._toml_to_python(logic_str)) # This is replacing the whole filter section, effectively merging or resetting? # Existing code seemed to merge. # "fields = config.filters.setdefault(model_name, {})" -> implied merging or raw access. # Here we should probably reload the filters for the affected models. loaded_filters = self._load_filters(table) for model_name, filter_list in loaded_filters.items(): # We overwrite the filters for this model if they exist, or add them. # Since _load_filters creates a fresh list, we replace. 
config.filters[model_name] = filter_list config.has_filter_root = True return if len(parts) < 3: return # [Processor.__filter__.Model] model_name = parts[-1] # We need to parse this table as if it was inside __filter__ # Create a temporary table wrapper to reuse _load_filters or parse manually. # Simpler: convert table to python and parse model_data = self._toml_to_python(table) # We need to update existing filters or create new ones for this model. # If we have existing filters, we should try to preserve them? # The semantics of TOML parsing usually imply "last definition wins" or "merge". # But _load_filters constructs a complete list. # Let's reconstruct the list for this model. # But wait, if we are parsing [Processor.__filter__.Model], we have the full definition for that model here. filters: list[FilterConfig] = [] model_filter = ModelFilterConfig(name=model_name, model=model_name) if '__enable__' in model_data: model_filter.enabled = bool(model_data.pop('__enable__')) if '__logic__' in model_data: self._set_logic_expression(model_filter, model_data.pop('__logic__')) if '__conditional__' in model_data: conditionals = model_data.pop('__conditional__') if isinstance(conditionals, list): for cond_data in conditionals: filters.append(self._parse_conditional_config(model_name, cond_data)) for field_name, field_value in model_data.items(): if isinstance(field_value, dict) and not ('op' in field_value and 'value' in field_value): filters.append(self._parse_field_filter_config(model_name, field_name, field_value)) else: model_filter.conditions[field_name] = self._parse_condition(field_value) filters.insert(0, model_filter) config.filters[model_name] = filters
[docs] def add_processor(self, base_name: str, replica: str | None = None) -> None: """Add a processor reference to the global processors_to_run list.""" ref = ProcessorRef(base_name=base_name, replica=replica) target = ref.full_name if target in self.globals.processors_to_run: return self.globals.processors_to_run.append(target) if replica: self._ensure_processor(base_name) self._ensure_processor(target)
[docs] def remove_processor(self, full_name: str) -> None: """Remove a processor or replica from the run list.""" if full_name in self.globals.processors_to_run: self.globals.processors_to_run.remove(full_name)
[docs] def set_processors_to_run(self, processors: Iterable[str]) -> None: """Overwrite the processors_to_run list.""" self.globals.processors_to_run = [str(p) for p in processors]
[docs] def set_parameter(self, processor_full_name: str, key: str, value: Any) -> None: """Set a processor parameter.""" config = self._ensure_processor(processor_full_name) if key in config.parameters: config.parameters[key].value = value config.parameters[key].source = ParameterSource.CONFIG config.parameters[key].active_override = True else: config.parameters[key] = ParameterConfig( name=key, value=value, source=ParameterSource.CONFIG, status=ParameterSchemaStatus.OK ) config.parameters[key].active_override = True
[docs] def remove_parameter(self, processor_full_name: str, key: str) -> None: """Remove a processor parameter override if present.""" config = self._ensure_processor(processor_full_name) config.parameters.pop(key, None)
[docs] def clear_parameters(self, processor_full_name: str) -> None: """Clear every parameter override for a processor.""" config = self._ensure_processor(processor_full_name) config.parameters.clear()
[docs] def add_replica(self, base_name: str, replica: str) -> None: """Create a replica entry without touching the base configuration.""" self._ensure_processor(base_name) self._ensure_processor(f'{base_name}#{replica}')
[docs] def set_replica_inheritance(self, replica_full_name: str, inheritance: bool | None) -> None: """Toggle the inheritance behaviour for a replica.""" config = self._ensure_processor(replica_full_name) config.inheritance = inheritance
[docs] def set_processor_new_only(self, processor_full_name: str, new_only: bool | None) -> None: """Explicitly set ``__new_only__`` for a processor or replica.""" config = self._ensure_processor(processor_full_name) config.new_only = new_only
[docs] def set_filter_config(self, processor_full_name: str, model_name: str, config: dict[str, Any]) -> None: """Replace the configuration for a given filter model.""" # Convert dict config back to FilterConfig objects # We can reuse the logic from _parse_filter_section mostly, but we have a dict here, not TOML Table. proc = self._ensure_processor(processor_full_name) filters: list[FilterConfig] = [] model_filter = ModelFilterConfig(name=model_name, model=model_name) data = config.copy() # Shallow copy to avoid mutation if '__enable__' in data: model_filter.enabled = bool(data.pop('__enable__')) if '__logic__' in data: model_filter.logic_str_original = data.pop('__logic__') if model_filter.logic_str_original is not None: try: model_filter.logic_ast = ExprParser(model_filter.logic_str_original).parse() except (mafw.db.db_filter.ParseError, ValueError): model_filter.logic_ast = None model_filter.logic_dirty = True if '__conditional__' in data: conditionals = data.pop('__conditional__') if isinstance(conditionals, list): for cond_data in conditionals: filters.append(self._parse_conditional_config(model_name, cond_data)) for field_name, field_value in data.items(): if isinstance(field_value, dict) and not ('op' in field_value and 'value' in field_value): filters.append(self._parse_field_filter_config(model_name, field_name, field_value)) else: model_filter.conditions[field_name] = self._parse_condition(field_value) filters.insert(0, model_filter) proc.filters[model_name] = filters
def _set_logic_expression( self, config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig, logic_text: str | None, *, dirty: bool = False, ) -> None: config.logic_str_original = logic_text if logic_text is None: config.logic_ast = None config.logic_is_valid = True config.logic_dirty = dirty return text = str(logic_text) if not text.strip(): config.logic_ast = None config.logic_is_valid = True config.logic_dirty = dirty return try: config.logic_ast = ExprParser(text).parse() config.logic_is_valid = True except (mafw.db.db_filter.ParseError, ValueError): config.logic_ast = None config.logic_is_valid = False config.logic_dirty = dirty
[docs] def set_filter_field(self, processor_full_name: str, model_name: str, field: str, value: Any) -> None: """Set or update a single field within a filter model.""" proc = self._ensure_processor(processor_full_name) filter_list = proc.filters.setdefault(model_name, [ModelFilterConfig(name=model_name, model=model_name)]) # Find the ModelFilterConfig (it should be there) model_config = next((f for f in filter_list if isinstance(f, ModelFilterConfig)), None) if model_config is None: model_config = ModelFilterConfig(name=model_name, model=model_name) filter_list.insert(0, model_config) model_config.conditions[field] = self._parse_condition(value)
[docs] def remove_filter(self, processor_full_name: str, model_name: str) -> None: """Remove a filter model definition.""" proc = self._ensure_processor(processor_full_name) proc.filters.pop(model_name, None)
[docs] def set_processor_filters( self, processor_full_name: str, filters: dict[str, list[FilterConfig]], logic: str | None ) -> None: """Update the filters and logic for a given processor.""" proc = self._ensure_processor(processor_full_name) proc.filters = filters self.set_filter_logic(processor_full_name, logic)
[docs] def set_filter_logic(self, processor_full_name: str, logic: str | None) -> None: """Set the global ``__logic__`` string for the processor filters.""" proc = self._ensure_processor(processor_full_name) self._set_logic_expression(proc, logic, dirty=True)
[docs] def set_filter_conditionals( self, processor_full_name: str, model_name: str, conditionals: list[dict[str, Any]] | None ) -> None: """Assign ``__conditional__`` blocks to a filter model.""" proc = self._ensure_processor(processor_full_name) filter_list = proc.filters.setdefault(model_name, [ModelFilterConfig(name=model_name, model=model_name)]) # Remove existing conditionals proc.filters[model_name] = [f for f in filter_list if not isinstance(f, ConditionalFilterConfig)] if conditionals: for cond_data in conditionals: proc.filters[model_name].append(self._parse_conditional_config(model_name, cond_data))
[docs] def set_analysis_name(self, name: str | None) -> None: """Set ``analysis_name``.""" self.globals.analysis_name = name
[docs] def set_analysis_description(self, description: str | None) -> None: """Set ``analysis_description``.""" self.globals.analysis_description = description
[docs] def set_new_only(self, value: bool | None) -> None: """Set the top-level ``new_only`` flag.""" self.globals.new_only = value
[docs] def set_create_standard_tables(self, value: bool | None) -> None: """Set ``create_standard_tables``.""" self.globals.create_standard_tables = value
[docs] def set_db_url(self, url: str | None) -> None: """Override the database URL.""" self.db_config.url = url if url is None: self.db_config.attributes.pop('URL', None) else: self.db_config.attributes['URL'] = url
[docs] def set_db_pragmas(self, pragmas: dict[str, Any]) -> None: """Set database pragmas.""" self.db_config.pragmas = dict(pragmas)
[docs] def set_default(self) -> None: """Initialize globals, database, and UI defaults for a fresh builder.""" self.globals.analysis_name = 'analysis-name' self.globals.analysis_description = 'analysis-description' self.globals.new_only = True self.globals.create_standard_tables = True self.set_db_url('sqlite:///:memory:') self.set_db_pragmas(dict(cast(dict[str, Any], default_conf['sqlite']['pragmas']))) self.ui_config.interface = 'rich' self.enable_db_configuration()
[docs] def set_db_attribute(self, key: str, value: Any | None) -> None: """Store a generic key/value pair inside DBConfiguration.""" if value is None: self.db_config.attributes.pop(key, None) return self.db_config.attributes[key] = value
[docs] def enable_db_configuration(self) -> None: """Ensure the DBConfiguration section will be serialized.""" self.db_config.enabled = True
[docs] def disable_db_configuration(self) -> None: """Prevent the DBConfiguration section from being emitted.""" self.db_config.enabled = False
[docs] def is_db_configuration_enabled(self) -> bool: """Return whether the DBConfiguration section should be present.""" return self.db_config.enabled
[docs] def set_ui_interface(self, interface: str) -> None: """Pick the interface used by ``UserInterface``.""" self.ui_config.interface = interface
[docs] def add_group(self, name: str, processors: Iterable[str], description: str | None = None) -> None: """Register a processor group.""" self.groups[name] = GroupConfig(name=name, processors=[str(p) for p in processors], description=description)
[docs] def remove_group(self, name: str) -> None: """Delete a group by name.""" self.groups.pop(name, None)
[docs] def list_processors(self) -> list[str]: """Return every processor section name currently configured.""" return list(self.processors.keys())
[docs] def list_groups(self) -> list[str]: """Return every group section name currently configured.""" return list(self.groups.keys())
@property def extra_globals(self) -> dict[str, Any]: """Return extra top-level globals preserved from the steering file.""" return self._extra_globals @property def document(self) -> tomlkit.TOMLDocument | None: """Return the parsed TOML document this builder originated from.""" return self._document
[docs] def validate( self, validation_level: ValidationLevel = ValidationLevel.SEMANTIC ) -> list['mafw.mafw_errors.ValidationIssue']: """Run steering validation at the requested level and report every issue.""" from .validation import validate as _validate # Avoid circular imports return _validate(self, validation_level)
[docs] def to_config_dict(self) -> dict[str, Any]: """Return a plain dictionary representing the steering configuration.""" if self._document is not None: return self._document.value return self.to_document().value
[docs] def get_processor_config(self, full_name: str) -> ProcessorConfig: """Return the stored configuration for a processor or replica.""" return self.processors[full_name]
[docs] def get_group(self, name: str) -> GroupConfig: """Return the stored configuration for a group section.""" return self.groups[name]
[docs] def to_document(self, *, validation_level: ValidationLevel | None = None) -> tomlkit.TOMLDocument: """Serialize the builder state into a TOML document.""" from .serializer import serialize return serialize(self, validation_level=validation_level)
[docs] def write(self, path: Path | str, *, validation_level: ValidationLevel | None = None) -> None: """Dump the builder to disk.""" if isinstance(path, str): path = Path(path) doc = self.to_document(validation_level=validation_level) with path.open('w', encoding='utf-8') as handle: tomlkit.dump(doc, handle)
def _ensure_processor(self, name: str) -> ProcessorConfig: if name not in self.processors: self.processors[name] = ProcessorConfig(name=name) return self.processors[name]