# Copyright 2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""Structural validation helpers for steering metadata."""
from __future__ import annotations
import logging
from typing import Iterable
import mafw.mafw_errors
from mafw.mafw_errors import (
CyclicGroupIssue,
DuplicateReplicaIssue,
InvalidFilterConditionIssue,
InvalidFilterLogicIssue,
MissingProcessorsToRunIssue,
MissingUserInterfaceSectionIssue,
ProcessorsToRunNotListIssue,
UnknownGroupMemberIssue,
UnknownProcessorsToRunEntryIssue,
)
from mafw.tools.regexp import parse_processor_name
from .builder import SteeringBuilder, ValidationLevel
from .models import (
Condition,
ConditionalFilterConfig,
FieldFilterConfig,
GroupConfig,
ModelFilterConfig,
ProcessorConfig,
)
_log = logging.getLogger(__name__)
[docs]
def validate(
    builder: SteeringBuilder, validation_level: ValidationLevel = ValidationLevel.SEMANTIC
) -> list['mafw.mafw_errors.ValidationIssue']:
    """Collect every validation issue found in the steering metadata held by *builder*.

    All checks append to one shared list, so the returned issues follow the order in
    which the checks run: global sections, ``processors_to_run`` entries, group members,
    replica uniqueness, group cycles, filter conditions and finally logic expressions.

    :param builder: Steering builder whose globals, document, processors and groups are inspected.
    :param validation_level: Requested validation depth. ``ValidationLevel.FULL`` only
        emits a warning; the same set of checks runs whatever the level is.
    :return: The issues found, in check order; empty when no check reported anything.
    """
    full_requested = validation_level == ValidationLevel.FULL
    if full_requested:
        _log.warning('FULL validation level is not implemented yet; running SEMANTIC checks only')

    found: list['mafw.mafw_errors.ValidationIssue'] = []

    # Document-level and cross-reference checks.
    _validate_global_sections(builder, found)
    _validate_processors_to_run(builder, found)
    _assert_group_members(builder, found)

    # NOTE(review): the names passed here are mapping keys, which are presumably already
    # unique, so this check may never report anything from this call site -- confirm.
    _assert_unique_replicas(builder.processors.keys(), found)
    _detect_group_cycles(builder.groups, found)

    # Filter-level checks.
    _validate_filter_conditions(builder, found)
    _validate_logic_expressions(builder, found)
    return found
def _validate_global_sections(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
if not builder.globals.processors_to_run:
issues.append(MissingProcessorsToRunIssue())
document = builder.document
if document is not None:
processors = document.get('processors_to_run')
if processors is not None and not isinstance(processors, list):
issues.append(ProcessorsToRunNotListIssue())
if 'UserInterface' not in document:
issues.append(MissingUserInterfaceSectionIssue())
def _validate_processors_to_run(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
known_section_names = set(builder.processors) | set(builder.groups)
def _is_known(entry: str) -> bool:
if entry in known_section_names:
return True
base_name, _ = parse_processor_name(entry)
return base_name in known_section_names
for entry in builder.globals.processors_to_run:
if not _is_known(entry):
issues.append(UnknownProcessorsToRunEntryIssue(entry))
def _assert_group_members(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
known_section_names = set(builder.processors) | set(builder.groups)
def _is_known(entry: str) -> bool:
if entry in known_section_names:
return True
base_name, _ = parse_processor_name(entry)
return base_name in known_section_names
for group in builder.groups.values():
for member in group.processors:
if not _is_known(member):
issues.append(UnknownGroupMemberIssue(group.name, member))
def _assert_unique_replicas(names: Iterable[str], issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
pairs: dict[str, set[str]] = {}
for name in names:
if '#' not in name:
continue
base, replica = name.split('#', 1)
rep_set = pairs.setdefault(base, set())
if replica in rep_set:
issues.append(DuplicateReplicaIssue(base, replica))
rep_set.add(replica)
def _detect_group_cycles(groups: dict[str, GroupConfig], issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
visiting: set[str] = set()
visited: set[str] = set()
def visit(node: str) -> None:
if node in visited or node not in groups:
return
if node in visiting:
issues.append(CyclicGroupIssue(node))
return
visiting.add(node)
for child in groups[node].processors:
visit(child)
visiting.remove(node)
visited.add(node)
for name in groups:
visit(name)
def _validate_filter_conditions(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
for processor in builder.processors.values():
for filter_list in processor.filters.values():
for filter_entry in filter_list:
if isinstance(filter_entry, ModelFilterConfig):
for condition_name, condition in filter_entry.conditions.items():
_report_condition(
processor.name,
filter_entry.model,
condition_name,
None,
condition,
issues,
)
elif isinstance(filter_entry, FieldFilterConfig):
for condition_name, condition in filter_entry.conditions.items():
_report_condition(
processor.name,
filter_entry.model,
condition_name,
filter_entry.field_name,
condition,
issues,
)
elif isinstance(filter_entry, ConditionalFilterConfig):
_report_conditional(processor.name, filter_entry, issues)
def _validate_logic_expressions(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Check the logic expression of every processor and of its model and field filters.

    The processor configuration itself is checked first, then each of its
    :class:`ModelFilterConfig` and :class:`FieldFilterConfig` entries in iteration
    order. Filter entries of any other type are skipped.

    :param builder: Steering builder providing ``processors``.
    :param issues: List that receives the issues; modified in place.
    """
    checkable = (ModelFilterConfig, FieldFilterConfig)

    for processor in builder.processors.values():
        owner = processor.name
        _report_logic_issue(owner, processor, issues)
        for entries in processor.filters.values():
            for candidate in entries:
                if isinstance(candidate, checkable):
                    _report_logic_issue(owner, candidate, issues)
def _report_logic_issue(
    processor_name: str,
    config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig,
    issues: list['mafw.mafw_errors.ValidationIssue'],
) -> None:
    """Append an :class:`InvalidFilterLogicIssue` when *config* holds an invalid logic expression.

    Nothing is reported when ``_logic_text`` finds no expression text, or when
    ``config.logic_is_valid`` is true.

    :param processor_name: Name of the processor that owns *config*.
    :param config: Processor or filter configuration to inspect.
    :param issues: List that receives the issue; modified in place.
    """
    expression = _logic_text(config)
    # The validity flag is only consulted when there is an expression to judge.
    if expression is not None and not config.logic_is_valid:
        target = _logic_target_name(config)
        issues.append(InvalidFilterLogicIssue(processor_name, target, detail=expression))
def _logic_target_name(config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig) -> str:
    """Return a readable label naming what a logic expression belongs to.

    :param config: Processor or filter configuration.
    :return: ``'processor filters'`` for a processor configuration, ``"model '<model>'"``
        for a model filter, and ``"field '<model>.<field_name>'"`` for anything else.
    """
    if isinstance(config, ProcessorConfig):
        label = 'processor filters'
    elif isinstance(config, ModelFilterConfig):
        label = f"model '{config.model}'"
    else:
        # Whatever remains is labelled as a field filter.
        label = f"field '{config.model}.{config.field_name}'"
    return label
def _logic_text(config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig) -> str | None:
candidate = config.logic_buffer if config.logic_buffer is not None else config.logic_str_original
if candidate is None:
return None
text = str(candidate)
if not text.strip():
return None
return text
def _report_condition(
processor_name: str,
model_name: str,
condition_name: str,
field_name: str | None,
condition: Condition | None,
issues: list['mafw.mafw_errors.ValidationIssue'],
) -> None:
if condition is None or not condition.is_valid:
issues.append(
InvalidFilterConditionIssue(
processor_name,
model_name,
condition_name,
field_name=field_name,
reason='missing or invalid value',
)
)
def _report_conditional(
processor_name: str,
config: ConditionalFilterConfig,
issues: list['mafw.mafw_errors.ValidationIssue'],
) -> None:
if config.condition is None or not config.condition.is_valid:
issues.append(
InvalidFilterConditionIssue(
processor_name,
config.model,
f'{config.name}:if',
field_name=config.condition_field or config.model,
reason='missing or invalid IF clause',
)
)
if config.then_clause is None or not config.then_clause.is_valid:
issues.append(
InvalidFilterConditionIssue(
processor_name,
config.model,
f'{config.name}:then',
field_name=config.then_field or config.model,
reason='missing or invalid THEN clause',
)
)
if config.else_clause is not None and not config.else_clause.is_valid:
issues.append(
InvalidFilterConditionIssue(
processor_name,
config.model,
f'{config.name}:else',
field_name=config.else_field or config.model,
reason='missing or invalid ELSE clause',
)
)