Source code for mafw.steering.validation

#  Copyright 2026 European Union
#  Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
#  SPDX-License-Identifier: EUPL-1.2
"""Structural validation helpers for steering metadata."""

from __future__ import annotations

import logging
from typing import Iterable

import mafw.mafw_errors
from mafw.mafw_errors import (
    CyclicGroupIssue,
    DuplicateReplicaIssue,
    InvalidFilterConditionIssue,
    InvalidFilterLogicIssue,
    MissingProcessorsToRunIssue,
    MissingUserInterfaceSectionIssue,
    ProcessorsToRunNotListIssue,
    UnknownGroupMemberIssue,
    UnknownProcessorsToRunEntryIssue,
)
from mafw.tools.regexp import parse_processor_name

from .builder import SteeringBuilder, ValidationLevel
from .models import (
    Condition,
    ConditionalFilterConfig,
    FieldFilterConfig,
    GroupConfig,
    ModelFilterConfig,
    ProcessorConfig,
)

_log = logging.getLogger(__name__)


def validate(
    builder: SteeringBuilder,
    validation_level: ValidationLevel = ValidationLevel.SEMANTIC,
) -> list['mafw.mafw_errors.ValidationIssue']:
    """Run steering validation and return every issue found.

    Every check appends to one shared list, so the order of the returned
    issues follows the order in which the checks are executed below. No check
    short-circuits the others: all of them run on every call.

    :param builder: The steering builder whose globals, document, processors
        and groups are inspected.
    :param validation_level: The requested validation level. Only
        ``ValidationLevel.FULL`` is treated specially: it is not implemented,
        so a warning is logged and the same checks as for any other level run.
    :return: The collected issues; an empty list when no check reported one.
    """
    full_requested = validation_level == ValidationLevel.FULL
    if full_requested:
        _log.warning('FULL validation level is not implemented yet; running SEMANTIC checks only')

    found: list['mafw.mafw_errors.ValidationIssue'] = []

    # Top-level layout of the steering document.
    _validate_global_sections(builder, found)
    _validate_processors_to_run(builder, found)

    # Group membership, replica naming and group nesting.
    _assert_group_members(builder, found)
    # NOTE(review): the names come from the keys of ``builder.processors``; if
    # that is a plain dict its keys are already unique, so confirm this check
    # can actually report something at this call site.
    _assert_unique_replicas(builder.processors.keys(), found)
    _detect_group_cycles(builder.groups, found)

    # Filter configuration attached to each processor.
    _validate_filter_conditions(builder, found)
    _validate_logic_expressions(builder, found)

    return found
def _validate_global_sections(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Check the top-level layout of the steering data.

    Reports a missing/empty ``processors_to_run`` in the builder globals and,
    when a raw document is available, a ``processors_to_run`` entry that is not
    a list and a missing ``UserInterface`` section.

    :param builder: The steering builder being validated.
    :param issues: Output list; detected issues are appended in place.
    """
    if not builder.globals.processors_to_run:
        issues.append(MissingProcessorsToRunIssue())

    document = builder.document
    if document is None:
        # Without a raw document there is nothing else to inspect here.
        return

    processors = document.get('processors_to_run')
    if processors is not None and not isinstance(processors, list):
        issues.append(ProcessorsToRunNotListIssue())
    if 'UserInterface' not in document:
        issues.append(MissingUserInterfaceSectionIssue())


def _known_section_names(builder: SteeringBuilder) -> set[str]:
    """Return the names of all processor and group sections known to *builder*."""
    return set(builder.processors) | set(builder.groups)


def _is_known_section(entry: str, known_section_names: set[str]) -> bool:
    """Tell whether *entry* refers to a known section.

    An entry is known when it matches a section name verbatim or when the
    first element returned by :func:`parse_processor_name` (used here as the
    base name) matches one.

    :param entry: The name to look up.
    :param known_section_names: The set of accepted section names.
    :return: ``True`` if the entry or its base name is in the set.
    """
    if entry in known_section_names:
        return True
    base_name, _ = parse_processor_name(entry)
    return base_name in known_section_names


def _validate_processors_to_run(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Report every global ``processors_to_run`` entry that matches no known section.

    :param builder: The steering builder being validated.
    :param issues: Output list; one issue is appended per unknown entry.
    """
    known_section_names = _known_section_names(builder)
    for entry in builder.globals.processors_to_run:
        if not _is_known_section(entry, known_section_names):
            issues.append(UnknownProcessorsToRunEntryIssue(entry))


def _assert_group_members(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Report every group member that matches no known processor or group section.

    :param builder: The steering builder being validated.
    :param issues: Output list; one issue is appended per unknown member.
    """
    known_section_names = _known_section_names(builder)
    for group in builder.groups.values():
        for member in group.processors:
            if not _is_known_section(member, known_section_names):
                issues.append(UnknownGroupMemberIssue(group.name, member))


def _assert_unique_replicas(names: Iterable[str], issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Report replica identifiers that occur more than once for the same base name.

    Names are split on the first ``#`` into base and replica parts; names
    without ``#`` are ignored. An issue is appended each time a
    ``(base, replica)`` pair is seen again.

    :param names: The section names to inspect.
    :param issues: Output list; detected issues are appended in place.
    """
    # NOTE(review): a pair can only repeat if *names* yields the same full
    # name more than once; an iterable of unique names never triggers this.
    pairs: dict[str, set[str]] = {}
    for name in names:
        if '#' not in name:
            continue
        base, replica = name.split('#', 1)
        rep_set = pairs.setdefault(base, set())
        if replica in rep_set:
            issues.append(DuplicateReplicaIssue(base, replica))
        rep_set.add(replica)


def _detect_group_cycles(groups: dict[str, GroupConfig], issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Report groups that are reachable from themselves through their members.

    Performs a recursive depth-first walk over the group graph. Members that
    are not keys of *groups* are treated as leaves.

    :param groups: Mapping of group name to its configuration.
    :param issues: Output list; a :class:`CyclicGroupIssue` is appended for the
        node at which a cycle is closed.
    """
    visiting: set[str] = set()  # groups on the current recursion path
    visited: set[str] = set()  # groups whose members were fully explored

    def visit(node: str) -> None:
        # NOTE(review): members are looked up verbatim, so a replica-qualified
        # reference (e.g. 'Group#1') is not resolved to 'Group' here, unlike
        # the membership checks - confirm this is intended.
        if node in visited or node not in groups:
            return
        if node in visiting:
            # Reached a group that is still being explored: cycle closed.
            issues.append(CyclicGroupIssue(node))
            return
        visiting.add(node)
        for child in groups[node].processors:
            visit(child)
        visiting.remove(node)
        visited.add(node)

    for name in groups:
        visit(name)


def _validate_filter_conditions(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Check the conditions of every filter entry attached to every processor.

    Model and field filters have each named condition checked individually;
    conditional filters have their IF/THEN/ELSE clauses checked.

    :param builder: The steering builder being validated.
    :param issues: Output list; detected issues are appended in place.
    """
    for processor in builder.processors.values():
        for filter_list in processor.filters.values():
            for filter_entry in filter_list:
                # The order of these type tests is kept as in the original.
                if isinstance(filter_entry, ModelFilterConfig):
                    for condition_name, condition in filter_entry.conditions.items():
                        _report_condition(
                            processor.name,
                            filter_entry.model,
                            condition_name,
                            None,  # model-level conditions carry no field name
                            condition,
                            issues,
                        )
                elif isinstance(filter_entry, FieldFilterConfig):
                    for condition_name, condition in filter_entry.conditions.items():
                        _report_condition(
                            processor.name,
                            filter_entry.model,
                            condition_name,
                            filter_entry.field_name,
                            condition,
                            issues,
                        )
                elif isinstance(filter_entry, ConditionalFilterConfig):
                    _report_conditional(processor.name, filter_entry, issues)


def _validate_logic_expressions(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Check the logic expression of every processor and of its model/field filters.

    :param builder: The steering builder being validated.
    :param issues: Output list; detected issues are appended in place.
    """
    for processor in builder.processors.values():
        _report_logic_issue(processor.name, processor, issues)
        for filter_list in processor.filters.values():
            for entry in filter_list:
                # Other entry types are not checked for logic expressions.
                if isinstance(entry, (ModelFilterConfig, FieldFilterConfig)):
                    _report_logic_issue(processor.name, entry, issues)


def _report_logic_issue(
    processor_name: str,
    config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig,
    issues: list['mafw.mafw_errors.ValidationIssue'],
) -> None:
    """Append an :class:`InvalidFilterLogicIssue` if *config* has a non-blank, invalid logic text.

    :param processor_name: Name of the processor owning *config*.
    :param config: The processor or filter configuration to inspect.
    :param issues: Output list; at most one issue is appended.
    """
    text = _logic_text(config)
    if text is None:
        # No logic expression was supplied, so there is nothing to validate.
        return
    if config.logic_is_valid:
        return
    issues.append(InvalidFilterLogicIssue(processor_name, _logic_target_name(config), detail=text))


def _logic_target_name(config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig) -> str:
    """Return the human-readable label used in issues for the owner of a logic expression."""
    if isinstance(config, ProcessorConfig):
        return 'processor filters'
    if isinstance(config, ModelFilterConfig):
        return f"model '{config.model}'"
    return f"field '{config.model}.{config.field_name}'"


def _logic_text(config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig) -> str | None:
    """Return the logic expression text of *config*, or ``None`` if absent or blank.

    ``logic_buffer`` takes precedence; ``logic_str_original`` is used only when
    the buffer is ``None``. The value is converted with :class:`str` and
    returned unstripped.
    """
    candidate = config.logic_buffer if config.logic_buffer is not None else config.logic_str_original
    if candidate is None:
        return None
    text = str(candidate)
    if not text.strip():
        return None
    return text


def _report_condition(
    processor_name: str,
    model_name: str,
    condition_name: str,
    field_name: str | None,
    condition: Condition | None,
    issues: list['mafw.mafw_errors.ValidationIssue'],
) -> None:
    """Append an :class:`InvalidFilterConditionIssue` when *condition* is missing or not valid.

    :param processor_name: Name of the processor owning the filter.
    :param model_name: Model the filter applies to.
    :param condition_name: Name of the condition within the filter.
    :param field_name: Field the condition applies to, or ``None``.
    :param condition: The condition to check; ``None`` counts as invalid.
    :param issues: Output list; at most one issue is appended.
    """
    if condition is None or not condition.is_valid:
        issues.append(
            InvalidFilterConditionIssue(
                processor_name,
                model_name,
                condition_name,
                field_name=field_name,
                reason='missing or invalid value',
            )
        )


def _report_conditional(
    processor_name: str,
    config: ConditionalFilterConfig,
    issues: list['mafw.mafw_errors.ValidationIssue'],
) -> None:
    """Check the IF, THEN and ELSE clauses of a conditional filter.

    The IF and THEN clauses are mandatory: each is reported when missing or
    invalid. The ELSE clause is optional and is reported only when present and
    invalid. Each clause falls back to the model name when its field is falsy.

    :param processor_name: Name of the processor owning the filter.
    :param config: The conditional filter configuration to inspect.
    :param issues: Output list; up to three issues are appended.
    """
    if config.condition is None or not config.condition.is_valid:
        issues.append(
            InvalidFilterConditionIssue(
                processor_name,
                config.model,
                f'{config.name}:if',
                field_name=config.condition_field or config.model,
                reason='missing or invalid IF clause',
            )
        )
    if config.then_clause is None or not config.then_clause.is_valid:
        issues.append(
            InvalidFilterConditionIssue(
                processor_name,
                config.model,
                f'{config.name}:then',
                field_name=config.then_field or config.model,
                reason='missing or invalid THEN clause',
            )
        )
    if config.else_clause is not None and not config.else_clause.is_valid:
        issues.append(
            InvalidFilterConditionIssue(
                processor_name,
                config.model,
                f'{config.name}:else',
                field_name=config.else_field or config.model,
                reason='missing or invalid ELSE clause',
            )
        )