# Copyright 2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""Editable steering configuration builder independent from the execution engine.
:Author: Bulgheroni Antonio
:Description: Provides helpers to construct steering metadata without running processors.
"""
from __future__ import annotations
from collections.abc import Iterable
from enum import Enum
from pathlib import Path
from typing import Any, cast
import tomlkit
from tomlkit.items import Array, Item, Table
from tomlkit.toml_file import TOMLFile
import mafw.mafw_errors
from mafw.db.db_configurations import default_conf
from mafw.db.db_filter import ExprParser
from .models import (
Condition,
ConditionalFilterConfig,
DBConfiguration,
FieldFilterConfig,
FilterConfig,
GlobalSettings,
GroupConfig,
ModelFilterConfig,
ParameterConfig,
ParameterSchemaStatus,
ParameterSource,
ProcessorConfig,
ProcessorRef,
UIConfiguration,
)
class ValidationLevel(Enum):
    """Validation tiers that can be requested from the steering builder."""

    # Check the steering model for semantic consistency only.
    SEMANTIC = 'semantic'
    # Request the complete validation pass.
    FULL = 'full'
class SteeringBuilder:
    """Editable domain model for building MAFw steering files."""

    def __init__(self) -> None:
        """Initialize an empty builder and apply the library defaults."""
        # Top-level steering settings (analysis metadata, run list, flags).
        self.globals = GlobalSettings()
        # Per-processor sections, keyed by full name ('Base' or 'Base#replica').
        self.processors: dict[str, ProcessorConfig] = {}
        # Named processor groups, keyed by group name.
        self.groups: dict[str, GroupConfig] = {}
        # Database configuration section (its emission can be toggled).
        self.db_config = DBConfiguration()
        # User-interface configuration section.
        self.ui_config = UIConfiguration()
        # Unknown top-level keys preserved verbatim from a loaded steering file.
        self._extra_globals: dict[str, Any] = {}
        # Original TOML document when loaded from file/text; None for fresh builders.
        self._document: tomlkit.TOMLDocument | None = None
        self.set_default()
[docs]
@classmethod
def from_toml(cls, path: Path | str) -> 'SteeringBuilder':
"""Create a builder from an existing steering file while keeping TOML metadata."""
if isinstance(path, str):
path = Path(path)
doc = TOMLFile(path).read()
builder = cls()
builder._document = doc
builder._load_from_document(doc)
return builder
[docs]
@classmethod
def from_toml_text(cls, text: str) -> 'SteeringBuilder':
"""Create a builder from TOML text while keeping TOML metadata."""
doc = tomlkit.parse(text)
builder = cls()
builder._document = doc
builder._load_from_document(doc)
return builder
    def _load_from_document(self, doc: tomlkit.TOMLDocument) -> None:
        """Populate the builder state from a parsed steering document.

        Recognized top-level keys are routed to dedicated sub-parsers; every
        unknown scalar key is preserved in ``_extra_globals`` so it can be
        round-tripped when the file is written back.
        """
        self._extra_globals.clear()
        # we assume that there is no DBConfiguration section.
        self.disable_db_configuration()
        for key, value in doc.items():
            if key == 'processors_to_run':
                self.globals.processors_to_run = self._ensure_str_list(value)
                continue
            if key in ('analysis_name', 'analysis_description', 'new_only', 'create_standard_tables'):
                setattr(self.globals, key, self._toml_to_python(value))
                continue
            if key == 'DBConfiguration':
                # we got a configuration section for the DB,
                # we need to enable it (done inside the _parse_db_config)
                self._parse_db_config(value)
                continue
            if key == 'UserInterface':
                self._parse_ui_config(value)
                continue
            if isinstance(value, Table):
                # Dotted headers such as [Proc.__filter__.Model] arrive here as a
                # single key with '.' separators — split to inspect the parts.
                parts = key.split('.')
                if '__filter__' in parts:
                    self._parse_filter_section(parts, value)
                    continue
                # A table carrying its own run list is a processor group...
                if 'processors_to_run' in value:
                    self._parse_group(key, value)
                    continue
                # ...otherwise it is a plain processor section.
                self._parse_processor(key, value)
                continue
            # Unknown scalar: keep it so it can be re-serialized verbatim.
            self._extra_globals[key] = self._toml_to_python(value)
def _ensure_str_list(self, value: Any) -> list[str]:
python_value = self._toml_to_python(value)
if python_value is None:
return []
return [str(item) for item in python_value]
def _toml_to_python(self, value: Any) -> Any:
if isinstance(value, Table):
return {k: self._toml_to_python(v) for k, v in value.items()}
if isinstance(value, Array):
return [self._toml_to_python(item) for item in value]
if isinstance(value, Item):
return value.unwrap()
return value
def _parse_db_config(self, table: Table) -> None:
self.enable_db_configuration()
self.db_config.attributes.clear()
self.db_config.pragmas.clear()
self.db_config.url = None
for key, value in table.items():
if key == 'pragmas' and isinstance(value, Table):
self.db_config.pragmas = self._toml_to_python(value)
continue
python_value = self._toml_to_python(value)
self.db_config.attributes[key] = python_value
if key == 'URL':
self.db_config.url = python_value
def _parse_ui_config(self, table: Table) -> None:
interface = table.get('interface')
if interface is not None:
self.ui_config.interface = self._toml_to_python(interface)
def _parse_group(self, name: str, table: Table) -> None:
processors = self._ensure_str_list(table.get('processors_to_run'))
description = self._toml_to_python(table.get('description'))
attributes: dict[str, Any] = {}
for attr_key, attr_value in table.items():
if attr_key in {'processors_to_run', 'description'}:
continue
attributes[attr_key] = self._toml_to_python(attr_value)
self.groups[name] = GroupConfig(
name=name,
processors=processors,
description=description,
attributes=attributes,
)
    def _parse_processor(self, name: str, table: Table) -> None:
        """Load a processor section, handling the ``__*__`` sentinel keys.

        ``__filter__``, ``__logic__``, ``__new_only__`` and ``__inheritance__``
        are routed to dedicated fields; every other key becomes a parameter
        override with :attr:`ParameterSource.CONFIG` provenance.
        """
        config = ProcessorConfig(name=name)
        for key, value in table.items():
            if key == '__filter__' and isinstance(value, Table):
                logic_str = value.get('__logic__')
                if logic_str is not None:
                    self._set_logic_expression(config, self._toml_to_python(logic_str))
                config.filters = self._load_filters(value)
                # NOTE(review): a non-table entry directly under [Proc.__filter__]
                # presumably signals that the root filter table carries data of
                # its own — confirm the flag's use against the serializer.
                non_table_entries = any(not isinstance(item, Table) for item in value.values())
                config.has_filter_root = non_table_entries
                continue
            if key == '__logic__':
                self._set_logic_expression(config, self._toml_to_python(value))
                continue
            if key == '__new_only__':
                config.new_only = bool(self._toml_to_python(value))
                continue
            if key == '__inheritance__':
                config.inheritance = bool(self._toml_to_python(value))
                continue
            # Plain key/value pair: a parameter override coming from the file.
            config.parameters[key] = ParameterConfig(
                name=key,
                value=self._toml_to_python(value),
                source=ParameterSource.CONFIG,
                status=ParameterSchemaStatus.OK,
            )
        self.processors[name] = config
def _parse_condition(self, value: Any) -> Condition:
if isinstance(value, dict) and 'op' in value and 'value' in value:
return Condition(operator=value['op'], value=value['value'], is_implicit=False)
if isinstance(value, list):
return Condition(operator='IN', value=value, is_implicit=True)
if isinstance(value, str):
return Condition(operator='GLOB', value=value, is_implicit=True)
return Condition(operator='==', value=value, is_implicit=True)
    def _load_filters(self, table: Table) -> dict[str, list[FilterConfig]]:
        """Parse a ``__filter__`` table into per-model filter lists.

        Each model entry yields a list whose first element is the
        :class:`ModelFilterConfig` holding the base conditions, followed by the
        conditional and field-level filters found in its sub-tables.
        """
        result: dict[str, list[FilterConfig]] = {}
        for model_name, model_table in table.items():
            if model_name == '__logic__':
                # The root-level logic expression is handled by the caller.
                continue
            if not isinstance(model_table, Table):
                # Non-table entries under __filter__ are not model definitions.
                continue
            # Convert TOML table to python dict first to simplify processing
            model_data = self._toml_to_python(model_table)
            filters: list[FilterConfig] = []
            # 1. Model Filter (base conditions)
            model_filter = ModelFilterConfig(name=model_name, model=model_name)
            if '__enable__' in model_data:
                model_filter.enabled = bool(model_data.pop('__enable__'))
            # Extract logic for the model
            if '__logic__' in model_data:
                self._set_logic_expression(model_filter, model_data.pop('__logic__'))
            # Extract conditionals
            if '__conditional__' in model_data:
                conditionals = model_data.pop('__conditional__')
                if isinstance(conditionals, list):
                    for cond_data in conditionals:
                        filters.append(self._parse_conditional_config(model_name, cond_data))
            # Process remaining fields
            for field_name, field_value in model_data.items():
                if isinstance(field_value, dict) and not ('op' in field_value and 'value' in field_value):
                    # It's a Field Filter (sub-table logic)
                    filters.append(self._parse_field_filter_config(model_name, field_name, field_value))
                else:
                    # It's a simple condition for the Model Filter
                    model_filter.conditions[field_name] = self._parse_condition(field_value)
            # The model filter always leads the list, ahead of the others.
            filters.insert(0, model_filter)
            result[model_name] = filters
        return result
def _parse_conditional_config(self, model_name: str, data: dict[str, Any]) -> ConditionalFilterConfig:
config = ConditionalFilterConfig(
name=data.get('name', ''),
model=model_name,
auto_named='name' not in data,
condition_field=data.get('condition_field', ''),
then_field=data.get('then_field', ''),
else_field=data.get('else_field'),
)
if '__enable__' in data:
config.enabled = bool(data.get('__enable__'))
# Only create Condition objects if sufficient data is present
# Or blindly create them but assume they will be serialized only if present?
# The test provided incomplete data, which means we should be robust.
if 'condition_op' in data or 'condition_value' in data:
config.condition = Condition(data.get('condition_op', '=='), data.get('condition_value'), is_implicit=False)
if 'then_op' in data or 'then_value' in data:
config.then_clause = Condition(data.get('then_op', '=='), data.get('then_value'), is_implicit=False)
if 'else_op' in data or 'else_value' in data:
config.else_clause = Condition(data.get('else_op', '=='), data.get('else_value'), is_implicit=False)
return config
def _parse_field_filter_config(self, model_name: str, field_name: str, data: dict[str, Any]) -> FieldFilterConfig:
config = FieldFilterConfig(name=field_name, model=model_name, field_name=field_name)
if '__enable__' in data:
config.enabled = bool(data.pop('__enable__'))
if '__logic__' in data:
self._set_logic_expression(config, data.pop('__logic__'))
for key, value in data.items():
config.conditions[key] = self._parse_condition(value)
return config
    def _parse_filter_section(self, parts: list[str], table: Table) -> None:
        """Load a dotted filter header such as ``[Proc.__filter__]`` or ``[Proc.__filter__.Model]``.

        *parts* is the dotted header split on ``'.'``: the first element is the
        processor name and the last one identifies the addressed level.
        """
        processor_name = parts[0]
        config = self._ensure_processor(processor_name)
        if parts[-1] == '__filter__':
            logic_str = table.get('__logic__')
            if logic_str is not None:
                self._set_logic_expression(config, self._toml_to_python(logic_str))
            # Reload the filters of every model defined in this table. Models
            # already present in the config are replaced with the freshly
            # parsed list: "last definition wins".
            loaded_filters = self._load_filters(table)
            for model_name, filter_list in loaded_filters.items():
                config.filters[model_name] = filter_list
            config.has_filter_root = True
            return
        if len(parts) < 3:
            # Neither [Proc.__filter__] nor [Proc.__filter__.Model]: ignore.
            return
        # [Processor.__filter__.Model]
        model_name = parts[-1]
        # A dotted model header carries the full definition for that model, so
        # the parsing below mirrors the per-model branch of _load_filters.
        model_data = self._toml_to_python(table)
        filters: list[FilterConfig] = []
        model_filter = ModelFilterConfig(name=model_name, model=model_name)
        if '__enable__' in model_data:
            model_filter.enabled = bool(model_data.pop('__enable__'))
        if '__logic__' in model_data:
            self._set_logic_expression(model_filter, model_data.pop('__logic__'))
        if '__conditional__' in model_data:
            conditionals = model_data.pop('__conditional__')
            if isinstance(conditionals, list):
                for cond_data in conditionals:
                    filters.append(self._parse_conditional_config(model_name, cond_data))
        # Remaining keys are either field sub-tables or simple model conditions.
        for field_name, field_value in model_data.items():
            if isinstance(field_value, dict) and not ('op' in field_value and 'value' in field_value):
                filters.append(self._parse_field_filter_config(model_name, field_name, field_value))
            else:
                model_filter.conditions[field_name] = self._parse_condition(field_value)
        filters.insert(0, model_filter)
        config.filters[model_name] = filters
[docs]
def add_processor(self, base_name: str, replica: str | None = None) -> None:
"""Add a processor reference to the global processors_to_run list."""
ref = ProcessorRef(base_name=base_name, replica=replica)
target = ref.full_name
if target in self.globals.processors_to_run:
return
self.globals.processors_to_run.append(target)
if replica:
self._ensure_processor(base_name)
self._ensure_processor(target)
[docs]
def remove_processor(self, full_name: str) -> None:
"""Remove a processor or replica from the run list."""
if full_name in self.globals.processors_to_run:
self.globals.processors_to_run.remove(full_name)
[docs]
def set_processors_to_run(self, processors: Iterable[str]) -> None:
"""Overwrite the processors_to_run list."""
self.globals.processors_to_run = [str(p) for p in processors]
[docs]
def set_parameter(self, processor_full_name: str, key: str, value: Any) -> None:
"""Set a processor parameter."""
config = self._ensure_processor(processor_full_name)
if key in config.parameters:
config.parameters[key].value = value
config.parameters[key].source = ParameterSource.CONFIG
config.parameters[key].active_override = True
else:
config.parameters[key] = ParameterConfig(
name=key, value=value, source=ParameterSource.CONFIG, status=ParameterSchemaStatus.OK
)
config.parameters[key].active_override = True
[docs]
def remove_parameter(self, processor_full_name: str, key: str) -> None:
"""Remove a processor parameter override if present."""
config = self._ensure_processor(processor_full_name)
config.parameters.pop(key, None)
[docs]
def clear_parameters(self, processor_full_name: str) -> None:
"""Clear every parameter override for a processor."""
config = self._ensure_processor(processor_full_name)
config.parameters.clear()
[docs]
def add_replica(self, base_name: str, replica: str) -> None:
"""Create a replica entry without touching the base configuration."""
self._ensure_processor(base_name)
self._ensure_processor(f'{base_name}#{replica}')
[docs]
def set_replica_inheritance(self, replica_full_name: str, inheritance: bool | None) -> None:
"""Toggle the inheritance behaviour for a replica."""
config = self._ensure_processor(replica_full_name)
config.inheritance = inheritance
[docs]
def set_processor_new_only(self, processor_full_name: str, new_only: bool | None) -> None:
"""Explicitly set ``__new_only__`` for a processor or replica."""
config = self._ensure_processor(processor_full_name)
config.new_only = new_only
[docs]
def set_filter_config(self, processor_full_name: str, model_name: str, config: dict[str, Any]) -> None:
"""Replace the configuration for a given filter model."""
# Convert dict config back to FilterConfig objects
# We can reuse the logic from _parse_filter_section mostly, but we have a dict here, not TOML Table.
proc = self._ensure_processor(processor_full_name)
filters: list[FilterConfig] = []
model_filter = ModelFilterConfig(name=model_name, model=model_name)
data = config.copy() # Shallow copy to avoid mutation
if '__enable__' in data:
model_filter.enabled = bool(data.pop('__enable__'))
if '__logic__' in data:
model_filter.logic_str_original = data.pop('__logic__')
if model_filter.logic_str_original is not None:
try:
model_filter.logic_ast = ExprParser(model_filter.logic_str_original).parse()
except (mafw.db.db_filter.ParseError, ValueError):
model_filter.logic_ast = None
model_filter.logic_dirty = True
if '__conditional__' in data:
conditionals = data.pop('__conditional__')
if isinstance(conditionals, list):
for cond_data in conditionals:
filters.append(self._parse_conditional_config(model_name, cond_data))
for field_name, field_value in data.items():
if isinstance(field_value, dict) and not ('op' in field_value and 'value' in field_value):
filters.append(self._parse_field_filter_config(model_name, field_name, field_value))
else:
model_filter.conditions[field_name] = self._parse_condition(field_value)
filters.insert(0, model_filter)
proc.filters[model_name] = filters
    def _set_logic_expression(
        self,
        config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig,
        logic_text: str | None,
        *,
        dirty: bool = False,
    ) -> None:
        """Store a ``__logic__`` expression on *config*, parsing it eagerly.

        ``logic_str_original`` always receives the raw text. ``logic_ast``
        holds the parsed expression (``None`` for absent/blank/invalid input)
        and ``logic_is_valid`` records whether parsing succeeded. *dirty*
        marks expressions coming from in-memory edits rather than from a file.
        """
        config.logic_str_original = logic_text
        if logic_text is None:
            # No expression at all: valid by definition.
            config.logic_ast = None
            config.logic_is_valid = True
            config.logic_dirty = dirty
            return
        text = str(logic_text)
        if not text.strip():
            # A blank expression is treated like no expression.
            config.logic_ast = None
            config.logic_is_valid = True
            config.logic_dirty = dirty
            return
        try:
            config.logic_ast = ExprParser(text).parse()
            config.logic_is_valid = True
        except (mafw.db.db_filter.ParseError, ValueError):
            # NOTE(review): the raw text is kept alongside the invalid flag,
            # presumably so the broken expression can be surfaced to the user —
            # confirm against the serializer/validation modules.
            config.logic_ast = None
            config.logic_is_valid = False
        config.logic_dirty = dirty
[docs]
def set_filter_field(self, processor_full_name: str, model_name: str, field: str, value: Any) -> None:
"""Set or update a single field within a filter model."""
proc = self._ensure_processor(processor_full_name)
filter_list = proc.filters.setdefault(model_name, [ModelFilterConfig(name=model_name, model=model_name)])
# Find the ModelFilterConfig (it should be there)
model_config = next((f for f in filter_list if isinstance(f, ModelFilterConfig)), None)
if model_config is None:
model_config = ModelFilterConfig(name=model_name, model=model_name)
filter_list.insert(0, model_config)
model_config.conditions[field] = self._parse_condition(value)
[docs]
def remove_filter(self, processor_full_name: str, model_name: str) -> None:
"""Remove a filter model definition."""
proc = self._ensure_processor(processor_full_name)
proc.filters.pop(model_name, None)
[docs]
def set_processor_filters(
self, processor_full_name: str, filters: dict[str, list[FilterConfig]], logic: str | None
) -> None:
"""Update the filters and logic for a given processor."""
proc = self._ensure_processor(processor_full_name)
proc.filters = filters
self.set_filter_logic(processor_full_name, logic)
[docs]
def set_filter_logic(self, processor_full_name: str, logic: str | None) -> None:
"""Set the global ``__logic__`` string for the processor filters."""
proc = self._ensure_processor(processor_full_name)
self._set_logic_expression(proc, logic, dirty=True)
[docs]
def set_filter_conditionals(
self, processor_full_name: str, model_name: str, conditionals: list[dict[str, Any]] | None
) -> None:
"""Assign ``__conditional__`` blocks to a filter model."""
proc = self._ensure_processor(processor_full_name)
filter_list = proc.filters.setdefault(model_name, [ModelFilterConfig(name=model_name, model=model_name)])
# Remove existing conditionals
proc.filters[model_name] = [f for f in filter_list if not isinstance(f, ConditionalFilterConfig)]
if conditionals:
for cond_data in conditionals:
proc.filters[model_name].append(self._parse_conditional_config(model_name, cond_data))
[docs]
def set_analysis_name(self, name: str | None) -> None:
"""Set ``analysis_name``."""
self.globals.analysis_name = name
[docs]
def set_analysis_description(self, description: str | None) -> None:
"""Set ``analysis_description``."""
self.globals.analysis_description = description
[docs]
def set_new_only(self, value: bool | None) -> None:
"""Set the top-level ``new_only`` flag."""
self.globals.new_only = value
[docs]
def set_create_standard_tables(self, value: bool | None) -> None:
"""Set ``create_standard_tables``."""
self.globals.create_standard_tables = value
[docs]
def set_db_url(self, url: str | None) -> None:
"""Override the database URL."""
self.db_config.url = url
if url is None:
self.db_config.attributes.pop('URL', None)
else:
self.db_config.attributes['URL'] = url
[docs]
def set_db_pragmas(self, pragmas: dict[str, Any]) -> None:
"""Set database pragmas."""
self.db_config.pragmas = dict(pragmas)
[docs]
def set_default(self) -> None:
"""Initialize globals, database, and UI defaults for a fresh builder."""
self.globals.analysis_name = 'analysis-name'
self.globals.analysis_description = 'analysis-description'
self.globals.new_only = True
self.globals.create_standard_tables = True
self.set_db_url('sqlite:///:memory:')
self.set_db_pragmas(dict(cast(dict[str, Any], default_conf['sqlite']['pragmas'])))
self.ui_config.interface = 'rich'
self.enable_db_configuration()
[docs]
def set_db_attribute(self, key: str, value: Any | None) -> None:
"""Store a generic key/value pair inside DBConfiguration."""
if value is None:
self.db_config.attributes.pop(key, None)
return
self.db_config.attributes[key] = value
[docs]
def enable_db_configuration(self) -> None:
"""Ensure the DBConfiguration section will be serialized."""
self.db_config.enabled = True
[docs]
def disable_db_configuration(self) -> None:
"""Prevent the DBConfiguration section from being emitted."""
self.db_config.enabled = False
[docs]
def is_db_configuration_enabled(self) -> bool:
"""Return whether the DBConfiguration section should be present."""
return self.db_config.enabled
[docs]
def set_ui_interface(self, interface: str) -> None:
"""Pick the interface used by ``UserInterface``."""
self.ui_config.interface = interface
[docs]
def add_group(self, name: str, processors: Iterable[str], description: str | None = None) -> None:
"""Register a processor group."""
self.groups[name] = GroupConfig(name=name, processors=[str(p) for p in processors], description=description)
[docs]
def remove_group(self, name: str) -> None:
"""Delete a group by name."""
self.groups.pop(name, None)
[docs]
def list_processors(self) -> list[str]:
"""Return every processor section name currently configured."""
return list(self.processors.keys())
[docs]
def list_groups(self) -> list[str]:
"""Return every group section name currently configured."""
return list(self.groups.keys())
    @property
    def extra_globals(self) -> dict[str, Any]:
        """Return extra top-level globals preserved from the steering file.

        The mapping is the live internal dictionary, not a copy.
        """
        return self._extra_globals

    @property
    def document(self) -> tomlkit.TOMLDocument | None:
        """Return the parsed TOML document this builder originated from.

        ``None`` when the builder was created programmatically.
        """
        return self._document
[docs]
def validate(
self, validation_level: ValidationLevel = ValidationLevel.SEMANTIC
) -> list['mafw.mafw_errors.ValidationIssue']:
"""Run steering validation at the requested level and report every issue."""
from .validation import validate as _validate # Avoid circular imports
return _validate(self, validation_level)
[docs]
def to_config_dict(self) -> dict[str, Any]:
"""Return a plain dictionary representing the steering configuration."""
if self._document is not None:
return self._document.value
return self.to_document().value
[docs]
def get_processor_config(self, full_name: str) -> ProcessorConfig:
"""Return the stored configuration for a processor or replica."""
return self.processors[full_name]
[docs]
def get_group(self, name: str) -> GroupConfig:
"""Return the stored configuration for a group section."""
return self.groups[name]
[docs]
def to_document(self, *, validation_level: ValidationLevel | None = None) -> tomlkit.TOMLDocument:
"""Serialize the builder state into a TOML document."""
from .serializer import serialize
return serialize(self, validation_level=validation_level)
[docs]
def write(self, path: Path | str, *, validation_level: ValidationLevel | None = None) -> None:
"""Dump the builder to disk."""
if isinstance(path, str):
path = Path(path)
doc = self.to_document(validation_level=validation_level)
with path.open('w', encoding='utf-8') as handle:
tomlkit.dump(doc, handle)
def _ensure_processor(self, name: str) -> ProcessorConfig:
if name not in self.processors:
self.processors[name] = ProcessorConfig(name=name)
return self.processors[name]