# Copyright 2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""Serialize steering metadata into TOML documents via tomlkit."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
import tomlkit
from tomlkit import aot, document, nl, table
from tomlkit.items import AoT
from mafw.db.db_filter import ast_to_string
from mafw.enumerators import LogicalOp
from .models import (
Condition,
ConditionalFilterConfig,
DBConfiguration,
FieldFilterConfig,
GlobalSettings,
GroupConfig,
ModelFilterConfig,
ProcessorConfig,
UIConfiguration,
)
if TYPE_CHECKING:
from mafw.steering.builder import SteeringBuilder, ValidationLevel
[docs]
def serialize(
builder: SteeringBuilder,
*,
validation_level: ValidationLevel | None = None,
) -> tomlkit.TOMLDocument:
"""Convert builder metadata into a TOML document matching the required layout."""
if validation_level is not None:
issues = builder.validate(validation_level)
if issues:
raise issues[0]
doc = document()
_add_globals(doc, builder.globals, builder.extra_globals)
_add_db(doc, builder.db_config)
_add_processors(doc, builder)
_add_groups(doc, builder.groups)
_add_user_interface(doc, builder.ui_config)
return doc
def _add_globals(doc: tomlkit.TOMLDocument, globals_: GlobalSettings, extra_globals: dict[str, Any]) -> None:
doc['processors_to_run'] = globals_.processors_to_run
if globals_.analysis_name is not None:
doc['analysis_name'] = globals_.analysis_name
if globals_.analysis_description is not None:
doc['analysis_description'] = globals_.analysis_description
if globals_.new_only is not None:
doc['new_only'] = globals_.new_only
for key, value in extra_globals.items():
doc[key] = value
if globals_.create_standard_tables is not None:
doc['create_standard_tables'] = globals_.create_standard_tables
doc.add(nl())
def _add_db(doc: tomlkit.TOMLDocument, db_config: DBConfiguration) -> None:
if not db_config.enabled:
return
db_table = table()
for key, value in db_config.attributes.items():
db_table[key] = value
if db_config.pragmas:
pragmas_table = table()
for key, value in db_config.pragmas.items():
pragmas_table[key] = value
db_table['pragmas'] = pragmas_table
doc['DBConfiguration'] = db_table
doc.add(nl())
def _add_processors(doc: tomlkit.TOMLDocument, builder: 'SteeringBuilder') -> None:
base_names = [name for name in builder.processors if '#' not in name]
replica_names = [name for name in builder.processors if '#' in name]
for name in base_names:
_add_processor_table(doc, builder.processors[name])
for name in replica_names:
_add_processor_table(doc, builder.processors[name])
[docs]
def _normalize_logical_op(value: str | LogicalOp) -> LogicalOp | None:
"""Return the LogicalOp enum for the provided value when possible."""
if isinstance(value, LogicalOp):
return value
if not isinstance(value, str):
return None
token = value
if token.startswith('LogicalOp.'):
token = token.split('LogicalOp.', 1)[-1]
try:
return LogicalOp[token]
except KeyError:
return None
try:
return LogicalOp(token)
except ValueError:
return None
[docs]
def _serialize_condition(condition: Condition) -> Any:
"""Convert Condition object to TOML-friendly value (implicit or dict)."""
op = str(condition.operator)
val = condition.value
op_enum = _normalize_logical_op(condition.operator)
if op_enum in (LogicalOp.IS_NULL, LogicalOp.IS_NOT_NULL):
val = True
# If explicitly marked implicit, try to return raw value if possible
if condition.is_implicit:
return val
# Explicit format
return {'op': op, 'value': val}
def _add_processor_table(doc: tomlkit.TOMLDocument, config: ProcessorConfig) -> None:
proc_table = table()
for key, param_config in config.parameters.items():
proc_table[key] = param_config.value
if config.new_only is not None:
proc_table['__new_only__'] = config.new_only
if config.inheritance is not None:
proc_table['__inheritance__'] = config.inheritance
filter_root = table()
filter_added = False
processor_logic = None
if config.logic_dirty and config.logic_ast:
processor_logic = ast_to_string(config.logic_ast)
elif config.logic_str_original:
processor_logic = config.logic_str_original
if processor_logic is not None:
proc_table['__logic__'] = processor_logic
if config.has_filter_root:
filter_root['__logic__'] = processor_logic
filter_added = True
for model_name, filter_list in config.filters.items():
filter_table = table()
model_added = False
# 1. Model Filter (base conditions & logic)
model_config = next((f for f in filter_list if isinstance(f, ModelFilterConfig)), None)
if model_config:
model_added = True
if not model_config.enabled:
filter_table['__enable__'] = False
model_logic = None
if model_config.logic_dirty and model_config.logic_ast:
model_logic = ast_to_string(model_config.logic_ast)
elif model_config.logic_str_original:
model_logic = model_config.logic_str_original
if model_logic:
filter_table['__logic__'] = model_logic
for field_name, condition in model_config.conditions.items():
filter_table[field_name] = _serialize_condition(condition)
# 2. Field Filters (sub-tables)
for field_filter in filter_list:
if isinstance(field_filter, FieldFilterConfig):
model_added = True
sub_table = table()
if not field_filter.enabled:
sub_table['__enable__'] = False
field_logic = None
if field_filter.logic_dirty and field_filter.logic_ast:
field_logic = ast_to_string(field_filter.logic_ast)
elif field_filter.logic_str_original:
field_logic = field_filter.logic_str_original
if field_logic:
sub_table['__logic__'] = field_logic
for k, v in field_filter.conditions.items():
sub_table[k] = _serialize_condition(v)
filter_table[field_filter.field_name] = sub_table
# 3. Conditionals
conditionals = [f for f in filter_list if isinstance(f, ConditionalFilterConfig)]
if conditionals:
model_added = True
filter_table['__conditional__'] = _build_conditional_array_from_objects(conditionals)
if model_added:
filter_root[model_name] = filter_table
filter_added = True
if filter_added:
proc_table['__filter__'] = filter_root
doc[config.name] = proc_table
doc.add(nl())
def _build_conditional_array_from_objects(conditionals: list[ConditionalFilterConfig]) -> AoT:
tbl_array = aot()
for cond in conditionals:
tbl = table()
if not cond.enabled:
tbl['__enable__'] = False
if not cond.auto_named:
tbl['name'] = cond.name
if cond.condition:
tbl['condition_field'] = cond.condition_field
tbl['condition_op'] = str(cond.condition.operator)
tbl['condition_value'] = cond.condition.value
if cond.then_clause:
tbl['then_field'] = cond.then_field
tbl['then_op'] = str(cond.then_clause.operator)
tbl['then_value'] = cond.then_clause.value
if cond.else_clause:
tbl['else_field'] = cond.else_field
tbl['else_op'] = str(cond.else_clause.operator)
tbl['else_value'] = cond.else_clause.value
tbl_array.append(tbl)
return tbl_array
def _build_conditional_array(conditionals: list[dict[str, Any]]) -> AoT:
# Deprecated but kept for compatibility if needed, though unused in new logic
tbl_array = aot()
for entry in conditionals:
tbl = table()
for key, value in entry.items():
tbl[key] = value
tbl_array.append(tbl)
return tbl_array
def _add_groups(doc: tomlkit.TOMLDocument, groups: dict[str, GroupConfig]) -> None:
for group in groups.values():
tbl = table()
tbl['processors_to_run'] = group.processors
if group.description is not None:
tbl['description'] = group.description
for attr_key, attr_value in group.attributes.items():
tbl[attr_key] = attr_value
doc[group.name] = tbl
doc.add(nl())
def _add_user_interface(doc: tomlkit.TOMLDocument, ui_config: UIConfiguration) -> None:
ui_table = table()
ui_table['interface'] = ui_config.interface
doc['UserInterface'] = ui_table