Source code for mafw.steering.serializer

#  Copyright 2026 European Union
#  Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
#  SPDX-License-Identifier: EUPL-1.2
"""Serialize steering metadata into TOML documents via tomlkit."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

import tomlkit
from tomlkit import aot, document, nl, table
from tomlkit.items import AoT

from mafw.db.db_filter import ast_to_string
from mafw.enumerators import LogicalOp

from .models import (
    Condition,
    ConditionalFilterConfig,
    DBConfiguration,
    FieldFilterConfig,
    GlobalSettings,
    GroupConfig,
    ModelFilterConfig,
    ProcessorConfig,
    UIConfiguration,
)

if TYPE_CHECKING:
    from mafw.steering.builder import SteeringBuilder, ValidationLevel


def serialize(
    builder: SteeringBuilder,
    *,
    validation_level: ValidationLevel | None = None,
) -> tomlkit.TOMLDocument:
    """Build the TOML document describing the metadata held by ``builder``.

    :param builder: Source of the globals, DB configuration, processors, groups and UI configuration.
    :param validation_level: When not ``None``, ``builder.validate`` is called with this level before
        anything is serialized.
    :return: A new tomlkit document with the sections added in this order: globals, database
        configuration, processors, groups, user interface.
    :raises: The first item returned by ``builder.validate`` when that call reports any issue.
    """
    problems = None if validation_level is None else builder.validate(validation_level)
    if problems:
        # Only the first reported issue is raised; the others are not surfaced here.
        raise problems[0]

    toml_doc = document()
    _add_globals(toml_doc, builder.globals, builder.extra_globals)
    _add_db(toml_doc, builder.db_config)
    _add_processors(toml_doc, builder)
    _add_groups(toml_doc, builder.groups)
    _add_user_interface(toml_doc, builder.ui_config)
    return toml_doc


def _add_globals(doc: tomlkit.TOMLDocument, globals_: GlobalSettings, extra_globals: dict[str, Any]) -> None:
    """Write the top-level keys of the steering file into ``doc``.

    ``processors_to_run`` is always written. ``analysis_name``, ``analysis_description``, ``new_only``
    and ``create_standard_tables`` are written only when they are not ``None``. The entries of
    ``extra_globals`` are copied as they are, after ``new_only`` and before ``create_standard_tables``.
    A newline item is appended at the end.
    """
    doc['processors_to_run'] = globals_.processors_to_run

    # Optional settings that share their attribute name with their TOML key.
    for setting_name in ('analysis_name', 'analysis_description', 'new_only'):
        setting = getattr(globals_, setting_name)
        if setting is not None:
            doc[setting_name] = setting

    for extra_key, extra_value in extra_globals.items():
        doc[extra_key] = extra_value

    standard_tables = globals_.create_standard_tables
    if standard_tables is not None:
        doc['create_standard_tables'] = standard_tables

    doc.add(nl())


def _add_db(doc: tomlkit.TOMLDocument, db_config: DBConfiguration) -> None:
    """Add the ``DBConfiguration`` table to ``doc`` when the database configuration is enabled.

    Nothing is written if ``db_config.enabled`` is falsy. Otherwise every entry of
    ``db_config.attributes`` becomes a key of the table, a ``pragmas`` sub-table is added when
    ``db_config.pragmas`` is non-empty, and a newline item follows the table.
    """
    if not db_config.enabled:
        return

    section = table()
    for attr_name, attr_value in db_config.attributes.items():
        section[attr_name] = attr_value

    if db_config.pragmas:
        pragmas_section = table()
        for pragma_name, pragma_value in db_config.pragmas.items():
            pragmas_section[pragma_name] = pragma_value
        section['pragmas'] = pragmas_section

    doc['DBConfiguration'] = section
    doc.add(nl())


def _add_processors(doc: tomlkit.TOMLDocument, builder: 'SteeringBuilder') -> None:
    """Add one table per processor, names without ``'#'`` first and names containing ``'#'`` after.

    Within each of the two groups the iteration order of ``builder.processors`` is kept.
    """
    # sorted() is stable: keys evaluating to False (no '#') come first, True ('#' present) last,
    # and the relative order inside each group is unchanged.
    ordered_names = sorted(builder.processors, key=lambda candidate: '#' in candidate)
    for processor_name in ordered_names:
        _add_processor_table(doc, builder.processors[processor_name])


def _normalize_logical_op(value: str | LogicalOp) -> LogicalOp | None:
    """Map ``value`` onto a ``LogicalOp`` member, returning ``None`` when no member matches.

    * A ``LogicalOp`` instance is returned unchanged.
    * A string starting with ``'LogicalOp.'`` is looked up by member *name*, using the text that
      follows the prefix.
    * Any other string is looked up by member *value*.
    * Any non-string, non-``LogicalOp`` input yields ``None``.
    """
    if isinstance(value, LogicalOp):
        return value
    if not isinstance(value, str):
        return None

    prefix = 'LogicalOp.'
    if value.startswith(prefix):
        # Name-based lookup; a missing name gives None instead of raising KeyError.
        return LogicalOp.__members__.get(value[len(prefix):])

    try:
        return LogicalOp(value)
    except ValueError:
        return None


def _serialize_condition(condition: Condition) -> Any:
    """Turn a ``Condition`` into the value stored in the TOML filter table.

    The payload is ``condition.value``, replaced by ``True`` when the operator normalizes to
    ``LogicalOp.IS_NULL`` or ``LogicalOp.IS_NOT_NULL``. Implicit conditions return the bare payload;
    all others return a ``{'op': ..., 'value': ...}`` mapping with the operator rendered via ``str``.
    """
    payload = condition.value
    null_checks = (LogicalOp.IS_NULL, LogicalOp.IS_NOT_NULL)
    if _normalize_logical_op(condition.operator) in null_checks:
        payload = True

    if condition.is_implicit:
        # Implicit form: the raw value alone stands for the condition.
        return payload

    # Explicit form: operator and value are both spelled out.
    return {'op': str(condition.operator), 'value': payload}


def _current_logic(source: Any) -> Any:
    """Return the logic expression to serialize for ``source``, or ``None`` when there is none.

    When ``source.logic_dirty`` and ``source.logic_ast`` are both truthy the expression is produced by
    ``ast_to_string``; otherwise ``source.logic_str_original`` is used if it is truthy.
    """
    if source.logic_dirty and source.logic_ast:
        return ast_to_string(source.logic_ast)
    if source.logic_str_original:
        return source.logic_str_original
    return None


def _fill_filter_table(target: Any, filter_config: ModelFilterConfig | FieldFilterConfig) -> None:
    """Write the enable flag, logic string and conditions of ``filter_config`` into ``target``."""
    if not filter_config.enabled:
        # The flag is only written when the filter is disabled.
        target['__enable__'] = False

    logic_expr = _current_logic(filter_config)
    if logic_expr:
        target['__logic__'] = logic_expr

    for condition_key, condition in filter_config.conditions.items():
        target[condition_key] = _serialize_condition(condition)


def _add_processor_table(doc: tomlkit.TOMLDocument, config: ProcessorConfig) -> None:
    """Add the table of a single processor (parameters, flags and filters) to ``doc``.

    The table is stored under ``config.name`` and followed by a newline item. A ``__filter__``
    sub-table is attached only when it received content: either the processor logic (when
    ``config.has_filter_root`` is truthy) or at least one model entry.
    """
    section = table()
    for param_name, param in config.parameters.items():
        section[param_name] = param.value
    if config.new_only is not None:
        section['__new_only__'] = config.new_only
    if config.inheritance is not None:
        section['__inheritance__'] = config.inheritance

    filters_section = table()
    has_filter_content = False

    processor_logic = _current_logic(config)
    if processor_logic is not None:
        section['__logic__'] = processor_logic
        if config.has_filter_root:
            filters_section['__logic__'] = processor_logic
            has_filter_content = True

    for model_name, entries in config.filters.items():
        model_section = table()

        # 1. Model filter (base conditions & logic): only the first one found is used.
        model_filter = next((entry for entry in entries if isinstance(entry, ModelFilterConfig)), None)
        has_model_filter = bool(model_filter)
        if has_model_filter:
            _fill_filter_table(model_section, model_filter)

        # 2. Field filters (sub-tables), one per FieldFilterConfig.
        field_filters = [entry for entry in entries if isinstance(entry, FieldFilterConfig)]
        for field_filter in field_filters:
            field_section = table()
            _fill_filter_table(field_section, field_filter)
            model_section[field_filter.field_name] = field_section

        # 3. Conditionals, stored as an array of tables.
        conditionals = [entry for entry in entries if isinstance(entry, ConditionalFilterConfig)]
        if conditionals:
            model_section['__conditional__'] = _build_conditional_array_from_objects(conditionals)

        # Models that produced nothing are left out of the filter table.
        if has_model_filter or field_filters or conditionals:
            filters_section[model_name] = model_section
            has_filter_content = True

    if has_filter_content:
        section['__filter__'] = filters_section

    doc[config.name] = section
    doc.add(nl())


def _build_conditional_array_from_objects(conditionals: list[ConditionalFilterConfig]) -> AoT:
    """Convert conditional filter objects into an array of tables.

    Each table gets ``__enable__ = False`` when the conditional is disabled, ``name`` when it is not
    auto-named, and a ``<prefix>_field`` / ``<prefix>_op`` / ``<prefix>_value`` triplet for each of
    the condition, then and else clauses that is truthy.
    """
    # (key prefix, attribute holding the clause); the field attribute is '<prefix>_field'.
    clause_slots = (
        ('condition', 'condition'),
        ('then', 'then_clause'),
        ('else', 'else_clause'),
    )

    rows = aot()
    for conditional in conditionals:
        row = table()
        if not conditional.enabled:
            row['__enable__'] = False
        if not conditional.auto_named:
            row['name'] = conditional.name

        for prefix, clause_attr in clause_slots:
            clause = getattr(conditional, clause_attr)
            if not clause:
                continue
            row[f'{prefix}_field'] = getattr(conditional, f'{prefix}_field')
            row[f'{prefix}_op'] = str(clause.operator)
            row[f'{prefix}_value'] = clause.value

        rows.append(row)
    return rows


def _build_conditional_array(conditionals: list[dict[str, Any]]) -> AoT:
    """Convert plain dictionaries into an array of tables, copying every key/value pair.

    Deprecated but kept for compatibility if needed, though unused in new logic.
    """
    rows = aot()
    for mapping in conditionals:
        row = table()
        for entry_key, entry_value in mapping.items():
            row[entry_key] = entry_value
        rows.append(row)
    return rows


def _add_groups(doc: tomlkit.TOMLDocument, groups: dict[str, GroupConfig]) -> None:
    """Add one table per group to ``doc``, each stored under ``group.name`` and followed by a newline.

    Every table holds ``processors_to_run``, ``description`` when it is not ``None``, and all entries
    of ``group.attributes``.
    """
    for group in groups.values():
        group_section = table()
        group_section['processors_to_run'] = group.processors
        if group.description is not None:
            group_section['description'] = group.description
        for attribute_name, attribute_value in group.attributes.items():
            group_section[attribute_name] = attribute_value
        doc[group.name] = group_section
        doc.add(nl())


def _add_user_interface(doc: tomlkit.TOMLDocument, ui_config: UIConfiguration) -> None:
    """Add the ``UserInterface`` table, holding the single ``interface`` key, to ``doc``."""
    interface_section = table()
    interface_section['interface'] = ui_config.interface
    doc['UserInterface'] = interface_section