# Copyright 2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""Editable steering configuration builder independent from the execution engine.
:Author: Bulgheroni Antonio
:Description: Provides helpers to construct steering metadata without running processors.
"""
from __future__ import annotations
from collections.abc import Iterable
from enum import Enum
from pathlib import Path
from typing import Any, Mapping, cast
import tomlkit
from tomlkit.items import Array, Item, Table
from tomlkit.toml_file import TOMLFile
import mafw.mafw_errors
from mafw.db.db_configurations import DEFAULT_SQLITE_PRAGMAS
from mafw.db.db_filter import ExprParser
from .models import (
Condition,
ConditionalFilterConfig,
DBConfiguration,
FieldFilterConfig,
FilterConfig,
GlobalSettings,
GroupConfig,
ModelFilterConfig,
ParameterConfig,
ParameterSchemaStatus,
ParameterSource,
ProcessorConfig,
ProcessorRef,
UIConfiguration,
)
class ValidationLevel(Enum):
    """Validation tiers that can be requested from the steering builder."""

    # Semantic-level checks only.
    SEMANTIC = 'semantic'
    # Full validation — presumably a superset of SEMANTIC; confirm in the
    # companion ``validation`` module.
    FULL = 'full'
class SteeringBuilder:
    """Editable domain model for building MAFw steering files.

    The builder holds an in-memory representation of every section of a
    steering file (globals, processors, groups, database and UI settings)
    and can be serialized back to TOML via :meth:`to_document`/:meth:`write`
    without running any processor.
    """

    def __init__(self) -> None:
        # Top-level settings (analysis name/description, flags, run list).
        self.globals = GlobalSettings()
        # Per-processor (and per-replica) sections, keyed by full name.
        self.processors: dict[str, ProcessorConfig] = {}
        # Named processor groups, keyed by group name.
        self.groups: dict[str, GroupConfig] = {}
        # Content of the [DBConfiguration] section.
        self.db_config = DBConfiguration()
        # Content of the [UserInterface] section.
        self.ui_config = UIConfiguration()
        # Unrecognized top-level keys preserved from a parsed steering file.
        self._extra_globals: dict[str, Any] = {}
        # Original TOML document when built via from_toml/from_toml_text.
        self._document: tomlkit.TOMLDocument | None = None
        self.set_default()
[docs]
@classmethod
def from_toml(cls, path: Path | str) -> 'SteeringBuilder':
"""Create a builder from an existing steering file while keeping TOML metadata."""
if isinstance(path, str):
path = Path(path)
doc = TOMLFile(path).read()
builder = cls()
builder._document = doc
builder._load_from_document(doc)
return builder
[docs]
@classmethod
def from_toml_text(cls, text: str) -> 'SteeringBuilder':
"""Create a builder from TOML text while keeping TOML metadata."""
doc = tomlkit.parse(text)
builder = cls()
builder._document = doc
builder._load_from_document(doc)
return builder
    def _load_from_document(self, doc: tomlkit.TOMLDocument) -> None:
        """Populate the builder state from a parsed steering document.

        Every top-level key is dispatched to the matching sub-parser; keys
        that are not recognized are preserved verbatim in
        ``self._extra_globals`` so round-tripping does not lose data.
        """
        self._extra_globals.clear()
        # We assume that there is no DBConfiguration section until we meet one.
        self.disable_db_configuration()
        for key, value in doc.items():
            if key == 'processors_to_run':
                self.globals.processors_to_run = self._ensure_str_list(value)
                continue
            if key in ('analysis_name', 'analysis_description', 'new_only', 'create_standard_tables'):
                # Well-known scalar globals map 1:1 onto GlobalSettings fields.
                setattr(self.globals, key, self._toml_to_python(value))
                continue
            if key == 'DBConfiguration':
                # We got a configuration section for the DB; enabling it is
                # done inside _parse_db_config.
                self._parse_db_config(value)
                continue
            if key == 'UserInterface':
                self._parse_ui_config(value)
                continue
            if isinstance(value, Table):
                # A key containing '__filter__' as a dotted component marks an
                # out-of-line filter section such as [Proc.__filter__.Model].
                parts = key.split('.')
                if '__filter__' in parts:
                    self._parse_filter_section(parts, value)
                    continue
                # A table carrying 'processors_to_run' is a group section...
                if 'processors_to_run' in value:
                    self._parse_group(key, value)
                    continue
                # ...any other table is a processor configuration section.
                self._parse_processor(key, value)
                continue
            # Unknown scalar or array: keep it for faithful serialization.
            self._extra_globals[key] = self._toml_to_python(value)
def _ensure_str_list(self, value: Any) -> list[str]:
python_value = self._toml_to_python(value)
if python_value is None:
return []
return [str(item) for item in python_value]
def _toml_to_python(self, value: Any) -> Any:
if isinstance(value, Table):
return {k: self._toml_to_python(v) for k, v in value.items()}
if isinstance(value, Array):
return [self._toml_to_python(item) for item in value]
if isinstance(value, Item):
return value.unwrap()
return value
    def _parse_db_config(self, table: Table) -> None:
        """Load the ``[DBConfiguration]`` section into :attr:`db_config`.

        All stored DB settings are reset first (parsing replaces, never
        merges).  The well-known sub-tables ``pragmas``, ``authentication``
        and ``parameters`` are dispatched specially; every other key is kept
        as a generic attribute, with ``URL`` mirrored into
        ``db_config.url`` for convenient access.
        """
        self.enable_db_configuration()
        # Start from a clean slate.
        self.db_config.attributes.clear()
        self.db_config.pragmas.clear()
        self.db_config.authentication.clear()
        self.db_config.parameters.clear()
        self.db_config.url = None
        for key, value in table.items():
            if key == 'pragmas' and isinstance(value, Table):
                self.db_config.pragmas = self._toml_to_python(value)
                continue
            if key == 'authentication' and isinstance(value, Table):
                self.db_config.authentication = self._toml_to_python(value)
                continue
            if key == 'parameters' and isinstance(value, Table):
                self.db_config.parameters = self._toml_to_python(value)
                # We keep pragmas in a separate dictionary of the db configuration.
                # Not sure it is really needed, but handy.
                pragmas = self.db_config.parameters.get('sqlite', {}).get('pragmas')
                if isinstance(pragmas, dict):
                    self.db_config.pragmas = dict(pragmas)
                continue
            python_value = self._toml_to_python(value)
            self.db_config.attributes[key] = python_value
            if key == 'URL':
                # Mirror the URL attribute into the dedicated field.
                self.db_config.url = python_value
def _parse_ui_config(self, table: Table) -> None:
interface = table.get('interface')
if interface is not None:
self.ui_config.interface = self._toml_to_python(interface)
def _parse_group(self, name: str, table: Table) -> None:
processors = self._ensure_str_list(table.get('processors_to_run'))
description = self._toml_to_python(table.get('description'))
attributes: dict[str, Any] = {}
for attr_key, attr_value in table.items():
if attr_key in {'processors_to_run', 'description'}:
continue
attributes[attr_key] = self._toml_to_python(attr_value)
self.groups[name] = GroupConfig(
name=name,
processors=processors,
description=description,
attributes=attributes,
)
    def _parse_processor(self, name: str, table: Table) -> None:
        """Load a processor section table into a :class:`ProcessorConfig`.

        Dunder keys (``__filter__``, ``__logic__``, ``__new_only__``,
        ``__inheritance__``) configure the processor itself; every other key
        becomes a parameter with source ``CONFIG``.
        """
        config = ProcessorConfig(name=name)
        for key, value in table.items():
            if key == '__filter__' and isinstance(value, Table):
                logic_str = value.get('__logic__')
                if logic_str is not None:
                    self._set_logic_expression(config, self._toml_to_python(logic_str))
                config.filters = self._load_filters(value)
                # Remember whether __filter__ held non-table entries directly
                # (e.g. __logic__); presumably consumed by the serializer —
                # confirm there.
                non_table_entries = any(not isinstance(item, Table) for item in value.values())
                config.has_filter_root = non_table_entries
                continue
            if key == '__logic__':
                # Logic expression given at the processor level, outside __filter__.
                self._set_logic_expression(config, self._toml_to_python(value))
                continue
            if key == '__new_only__':
                config.new_only = bool(self._toml_to_python(value))
                continue
            if key == '__inheritance__':
                config.inheritance = bool(self._toml_to_python(value))
                continue
            # Any remaining key is a plain parameter value from the file.
            config.parameters[key] = ParameterConfig(
                name=key,
                value=self._toml_to_python(value),
                source=ParameterSource.CONFIG,
                status=ParameterSchemaStatus.OK,
            )
        self.processors[name] = config
def _parse_condition(self, value: Any) -> Condition:
if isinstance(value, dict) and 'op' in value and 'value' in value:
return Condition(operator=value['op'], value=value['value'], is_implicit=False)
if isinstance(value, list):
return Condition(operator='IN', value=value, is_implicit=True)
if isinstance(value, str):
return Condition(operator='GLOB', value=value, is_implicit=True)
return Condition(operator='==', value=value, is_implicit=True)
    def _load_filters(self, table: Table) -> dict[str, list[FilterConfig]]:
        """Translate a ``__filter__`` table into per-model filter lists.

        Each sub-table describes one model: plain entries become conditions on
        a :class:`ModelFilterConfig` (always stored first in the list), nested
        sub-tables become :class:`FieldFilterConfig` entries, and
        ``__conditional__`` arrays become :class:`ConditionalFilterConfig`
        entries.
        """
        result: dict[str, list[FilterConfig]] = {}
        for model_name, model_table in table.items():
            if model_name == '__logic__':
                # Section-wide logic is handled by the caller.
                continue
            if not isinstance(model_table, Table):
                continue
            # Convert TOML table to a python dict first to simplify processing.
            model_data = self._toml_to_python(model_table)
            filters: list[FilterConfig] = []
            # 1. Model Filter (base conditions)
            model_filter = ModelFilterConfig(name=model_name, model=model_name)
            if '__enable__' in model_data:
                model_filter.enabled = bool(model_data.pop('__enable__'))
            # Extract logic for the model
            if '__logic__' in model_data:
                self._set_logic_expression(model_filter, model_data.pop('__logic__'))
            # Extract conditionals
            if '__conditional__' in model_data:
                conditionals = model_data.pop('__conditional__')
                if isinstance(conditionals, list):
                    for cond_data in conditionals:
                        filters.append(self._parse_conditional_config(model_name, cond_data))
            # Process remaining fields
            for field_name, field_value in model_data.items():
                if isinstance(field_value, dict) and not ('op' in field_value and 'value' in field_value):
                    # It's a Field Filter (sub-table logic)
                    filters.append(self._parse_field_filter_config(model_name, field_name, field_value))
                else:
                    # It's a simple condition for the Model Filter
                    model_filter.conditions[field_name] = self._parse_condition(field_value)
            filters.insert(0, model_filter)
            result[model_name] = filters
        return result
def _parse_conditional_config(self, model_name: str, data: dict[str, Any]) -> ConditionalFilterConfig:
config = ConditionalFilterConfig(
name=data.get('name', ''),
model=model_name,
auto_named='name' not in data,
condition_field=data.get('condition_field', ''),
then_field=data.get('then_field', ''),
else_field=data.get('else_field'),
)
if '__enable__' in data:
config.enabled = bool(data.get('__enable__'))
# Only create Condition objects if sufficient data is present
# Or blindly create them but assume they will be serialized only if present?
# The test provided incomplete data, which means we should be robust.
if 'condition_op' in data or 'condition_value' in data:
config.condition = Condition(data.get('condition_op', '=='), data.get('condition_value'), is_implicit=False)
if 'then_op' in data or 'then_value' in data:
config.then_clause = Condition(data.get('then_op', '=='), data.get('then_value'), is_implicit=False)
if 'else_op' in data or 'else_value' in data:
config.else_clause = Condition(data.get('else_op', '=='), data.get('else_value'), is_implicit=False)
return config
def _parse_field_filter_config(self, model_name: str, field_name: str, data: dict[str, Any]) -> FieldFilterConfig:
config = FieldFilterConfig(name=field_name, model=model_name, field_name=field_name)
if '__enable__' in data:
config.enabled = bool(data.pop('__enable__'))
if '__logic__' in data:
self._set_logic_expression(config, data.pop('__logic__'))
for key, value in data.items():
config.conditions[key] = self._parse_condition(value)
return config
    def _parse_filter_section(self, parts: list[str], table: Table) -> None:
        """Load an out-of-line filter section addressed by a dotted key.

        ``parts`` is the dotted key split into components, e.g.
        ``['Proc', '__filter__']`` or ``['Proc', '__filter__', 'Model']``.
        """
        processor_name = parts[0]
        config = self._ensure_processor(processor_name)
        if parts[-1] == '__filter__':
            # [Processor.__filter__]: the table holds every model sub-table.
            logic_str = table.get('__logic__')
            if logic_str is not None:
                self._set_logic_expression(config, self._toml_to_python(logic_str))
            # Replace the filter list for every model found in this table;
            # models not mentioned here are left untouched.
            loaded_filters = self._load_filters(table)
            for model_name, filter_list in loaded_filters.items():
                # _load_filters builds a fresh list, so we replace outright.
                config.filters[model_name] = filter_list
            config.has_filter_root = True
            return
        if len(parts) < 3:
            return
        # [Processor.__filter__.Model]: the table is the complete definition
        # for a single model, parsed the same way _load_filters would.
        model_name = parts[-1]
        # Convert to plain python once, then rebuild the model's filter list
        # from scratch (ModelFilterConfig first, then field/conditional filters).
        model_data = self._toml_to_python(table)
        filters: list[FilterConfig] = []
        model_filter = ModelFilterConfig(name=model_name, model=model_name)
        if '__enable__' in model_data:
            model_filter.enabled = bool(model_data.pop('__enable__'))
        if '__logic__' in model_data:
            self._set_logic_expression(model_filter, model_data.pop('__logic__'))
        if '__conditional__' in model_data:
            conditionals = model_data.pop('__conditional__')
            if isinstance(conditionals, list):
                for cond_data in conditionals:
                    filters.append(self._parse_conditional_config(model_name, cond_data))
        for field_name, field_value in model_data.items():
            if isinstance(field_value, dict) and not ('op' in field_value and 'value' in field_value):
                filters.append(self._parse_field_filter_config(model_name, field_name, field_value))
            else:
                model_filter.conditions[field_name] = self._parse_condition(field_value)
        filters.insert(0, model_filter)
        config.filters[model_name] = filters
    def add_processor(self, base_name: str, replica: str | None = None) -> None:
        """Add a processor reference to the global processors_to_run list.

        Adding an already-listed processor is a no-op.  When a replica is
        requested, configuration sections for both the base processor and the
        replica are created as well (mirroring :meth:`add_replica`).
        """
        ref = ProcessorRef(base_name=base_name, replica=replica)
        target = ref.full_name
        if target in self.globals.processors_to_run:
            return
        self.globals.processors_to_run.append(target)
        if replica:
            self._ensure_processor(base_name)
            self._ensure_processor(target)
[docs]
def remove_processor(self, full_name: str) -> None:
"""Remove a processor or replica from the run list."""
if full_name in self.globals.processors_to_run:
self.globals.processors_to_run.remove(full_name)
[docs]
def set_processors_to_run(self, processors: Iterable[str]) -> None:
"""Overwrite the processors_to_run list."""
self.globals.processors_to_run = [str(p) for p in processors]
[docs]
def set_parameter(self, processor_full_name: str, key: str, value: Any) -> None:
"""Set a processor parameter."""
config = self._ensure_processor(processor_full_name)
if key in config.parameters:
config.parameters[key].value = value
config.parameters[key].source = ParameterSource.CONFIG
config.parameters[key].active_override = True
else:
config.parameters[key] = ParameterConfig(
name=key, value=value, source=ParameterSource.CONFIG, status=ParameterSchemaStatus.OK
)
config.parameters[key].active_override = True
[docs]
def remove_parameter(self, processor_full_name: str, key: str) -> None:
"""Remove a processor parameter override if present."""
config = self._ensure_processor(processor_full_name)
config.parameters.pop(key, None)
[docs]
def clear_parameters(self, processor_full_name: str) -> None:
"""Clear every parameter override for a processor."""
config = self._ensure_processor(processor_full_name)
config.parameters.clear()
[docs]
def add_replica(self, base_name: str, replica: str) -> None:
"""Create a replica entry without touching the base configuration."""
self._ensure_processor(base_name)
self._ensure_processor(f'{base_name}#{replica}')
[docs]
def set_replica_inheritance(self, replica_full_name: str, inheritance: bool | None) -> None:
"""Toggle the inheritance behaviour for a replica."""
config = self._ensure_processor(replica_full_name)
config.inheritance = inheritance
[docs]
def set_processor_new_only(self, processor_full_name: str, new_only: bool | None) -> None:
"""Explicitly set ``__new_only__`` for a processor or replica."""
config = self._ensure_processor(processor_full_name)
config.new_only = new_only
[docs]
def set_filter_config(self, processor_full_name: str, model_name: str, config: dict[str, Any]) -> None:
"""Replace the configuration for a given filter model."""
# Convert dict config back to FilterConfig objects
# We can reuse the logic from _parse_filter_section mostly, but we have a dict here, not TOML Table.
proc = self._ensure_processor(processor_full_name)
filters: list[FilterConfig] = []
model_filter = ModelFilterConfig(name=model_name, model=model_name)
data = config.copy() # Shallow copy to avoid mutation
if '__enable__' in data:
model_filter.enabled = bool(data.pop('__enable__'))
if '__logic__' in data:
model_filter.logic_str_original = data.pop('__logic__')
if model_filter.logic_str_original is not None:
try:
model_filter.logic_ast = ExprParser(model_filter.logic_str_original).parse()
except (mafw.db.db_filter.ParseError, ValueError):
model_filter.logic_ast = None
model_filter.logic_dirty = True
if '__conditional__' in data:
conditionals = data.pop('__conditional__')
if isinstance(conditionals, list):
for cond_data in conditionals:
filters.append(self._parse_conditional_config(model_name, cond_data))
for field_name, field_value in data.items():
if isinstance(field_value, dict) and not ('op' in field_value and 'value' in field_value):
filters.append(self._parse_field_filter_config(model_name, field_name, field_value))
else:
model_filter.conditions[field_name] = self._parse_condition(field_value)
filters.insert(0, model_filter)
proc.filters[model_name] = filters
def _set_logic_expression(
self,
config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig,
logic_text: str | None,
*,
dirty: bool = False,
) -> None:
config.logic_str_original = logic_text
if logic_text is None:
config.logic_ast = None
config.logic_is_valid = True
config.logic_dirty = dirty
return
text = str(logic_text)
if not text.strip():
config.logic_ast = None
config.logic_is_valid = True
config.logic_dirty = dirty
return
try:
config.logic_ast = ExprParser(text).parse()
config.logic_is_valid = True
except (mafw.db.db_filter.ParseError, ValueError):
config.logic_ast = None
config.logic_is_valid = False
config.logic_dirty = dirty
[docs]
def set_filter_field(self, processor_full_name: str, model_name: str, field: str, value: Any) -> None:
"""Set or update a single field within a filter model."""
proc = self._ensure_processor(processor_full_name)
filter_list = proc.filters.setdefault(model_name, [ModelFilterConfig(name=model_name, model=model_name)])
# Find the ModelFilterConfig (it should be there)
model_config = next((f for f in filter_list if isinstance(f, ModelFilterConfig)), None)
if model_config is None:
model_config = ModelFilterConfig(name=model_name, model=model_name)
filter_list.insert(0, model_config)
model_config.conditions[field] = self._parse_condition(value)
[docs]
def remove_filter(self, processor_full_name: str, model_name: str) -> None:
"""Remove a filter model definition."""
proc = self._ensure_processor(processor_full_name)
proc.filters.pop(model_name, None)
[docs]
def set_processor_filters(
self, processor_full_name: str, filters: dict[str, list[FilterConfig]], logic: str | None
) -> None:
"""Update the filters and logic for a given processor."""
proc = self._ensure_processor(processor_full_name)
proc.filters = filters
self.set_filter_logic(processor_full_name, logic)
[docs]
def set_filter_logic(self, processor_full_name: str, logic: str | None) -> None:
"""Set the global ``__logic__`` string for the processor filters."""
proc = self._ensure_processor(processor_full_name)
self._set_logic_expression(proc, logic, dirty=True)
[docs]
def set_filter_conditionals(
self, processor_full_name: str, model_name: str, conditionals: list[dict[str, Any]] | None
) -> None:
"""Assign ``__conditional__`` blocks to a filter model."""
proc = self._ensure_processor(processor_full_name)
filter_list = proc.filters.setdefault(model_name, [ModelFilterConfig(name=model_name, model=model_name)])
# Remove existing conditionals
proc.filters[model_name] = [f for f in filter_list if not isinstance(f, ConditionalFilterConfig)]
if conditionals:
for cond_data in conditionals:
proc.filters[model_name].append(self._parse_conditional_config(model_name, cond_data))
[docs]
def set_analysis_name(self, name: str | None) -> None:
"""Set ``analysis_name``."""
self.globals.analysis_name = name
[docs]
def set_analysis_description(self, description: str | None) -> None:
"""Set ``analysis_description``."""
self.globals.analysis_description = description
[docs]
def set_new_only(self, value: bool | None) -> None:
"""Set the top-level ``new_only`` flag."""
self.globals.new_only = value
[docs]
def set_create_standard_tables(self, value: bool | None) -> None:
"""Set ``create_standard_tables``."""
self.globals.create_standard_tables = value
[docs]
def set_db_url(self, url: str | None) -> None:
"""Override the database URL."""
self.db_config.url = url
if url is None:
self.db_config.attributes.pop('URL', None)
else:
self.db_config.attributes['URL'] = url
[docs]
def set_db_pragmas(self, pragmas: dict[str, Any]) -> None:
"""Set database pragmas."""
self.db_config.pragmas = dict(pragmas)
[docs]
def set_db_authentication(self, authentication: Mapping[str, Any] | None) -> None:
"""Set the database authentication mapping."""
self.db_config.authentication = dict(authentication or {})
[docs]
def set_db_parameters(self, parameters: Mapping[str, Mapping[str, Any]] | None) -> None:
"""Set the backend-specific database parameters mapping."""
normalized = {key: dict(value) for key, value in (parameters or {}).items()}
self.db_config.parameters = normalized
sqlite_pragmas = normalized.get('sqlite', {}).get('pragmas')
if isinstance(sqlite_pragmas, Mapping):
self.db_config.pragmas = dict(sqlite_pragmas)
else:
self.db_config.pragmas = {}
[docs]
def set_default(self) -> None:
"""Initialize globals, database, and UI defaults for a fresh builder."""
self.globals.analysis_name = 'analysis-name'
self.globals.analysis_description = 'analysis-description'
self.globals.new_only = True
self.globals.create_standard_tables = True
self.set_db_url('sqlite:///:memory:')
self.set_db_pragmas(dict(cast(dict[str, Any], DEFAULT_SQLITE_PRAGMAS)))
self.ui_config.interface = 'rich'
self.enable_db_configuration()
[docs]
def set_db_attribute(self, key: str, value: Any | None) -> None:
"""Store a generic key/value pair inside DBConfiguration."""
if value is None:
self.db_config.attributes.pop(key, None)
return
self.db_config.attributes[key] = value
[docs]
def enable_db_configuration(self) -> None:
"""Ensure the DBConfiguration section will be serialized."""
self.db_config.enabled = True
[docs]
def disable_db_configuration(self) -> None:
"""Prevent the DBConfiguration section from being emitted."""
self.db_config.enabled = False
[docs]
def is_db_configuration_enabled(self) -> bool:
"""Return whether the DBConfiguration section should be present."""
return self.db_config.enabled
[docs]
def set_ui_interface(self, interface: str) -> None:
"""Pick the interface used by ``UserInterface``."""
self.ui_config.interface = interface
[docs]
def add_group(self, name: str, processors: Iterable[str], description: str | None = None) -> None:
"""Register a processor group."""
self.groups[name] = GroupConfig(name=name, processors=[str(p) for p in processors], description=description)
[docs]
def remove_group(self, name: str) -> None:
"""Delete a group by name."""
self.groups.pop(name, None)
[docs]
def list_processors(self) -> list[str]:
"""Return every processor section name currently configured."""
return list(self.processors.keys())
[docs]
def list_groups(self) -> list[str]:
"""Return every group section name currently configured."""
return list(self.groups.keys())
    @property
    def extra_globals(self) -> dict[str, Any]:
        """Return extra top-level globals preserved from the steering file.

        This is the live internal dictionary, not a copy.
        """
        return self._extra_globals

    @property
    def document(self) -> tomlkit.TOMLDocument | None:
        """Return the parsed TOML document this builder originated from.

        ``None`` when the builder was not created via :meth:`from_toml` or
        :meth:`from_toml_text`.
        """
        return self._document
[docs]
def validate(
self, validation_level: ValidationLevel = ValidationLevel.SEMANTIC
) -> list['mafw.mafw_errors.ValidationIssue']:
"""Run steering validation at the requested level and report every issue."""
from .validation import validate as _validate # Avoid circular imports
return _validate(self, validation_level)
    def to_config_dict(self) -> dict[str, Any]:
        """Return a plain dictionary representing the steering configuration.

        NOTE(review): when the builder was created from an existing document,
        the *original* parsed document is returned, so edits made through the
        builder afterwards are not reflected here — confirm this is intended.
        """
        if self._document is not None:
            return self._document.value
        return self.to_document().value
    def get_processor_config(self, full_name: str) -> ProcessorConfig:
        """Return the stored configuration for a processor or replica.

        Raises ``KeyError`` when no section exists for *full_name*.
        """
        return self.processors[full_name]

    def get_group(self, name: str) -> GroupConfig:
        """Return the stored configuration for a group section.

        Raises ``KeyError`` when no group named *name* exists.
        """
        return self.groups[name]
[docs]
def to_document(self, *, validation_level: ValidationLevel | None = None) -> tomlkit.TOMLDocument:
"""Serialize the builder state into a TOML document."""
from .serializer import serialize
return serialize(self, validation_level=validation_level)
[docs]
def write(self, path: Path | str, *, validation_level: ValidationLevel | None = None) -> None:
"""Dump the builder to disk."""
if isinstance(path, str):
path = Path(path)
doc = self.to_document(validation_level=validation_level)
with path.open('w', encoding='utf-8') as handle:
tomlkit.dump(doc, handle)
def _ensure_processor(self, name: str) -> ProcessorConfig:
if name not in self.processors:
self.processors[name] = ProcessorConfig(name=name)
return self.processors[name]