Coverage for src / mafw / steering / validation.py: 99%
130 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
1# Copyright 2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""Structural validation helpers for steering metadata."""
6from __future__ import annotations
8import logging
9from typing import Iterable
11import mafw.mafw_errors
12from mafw.mafw_errors import (
13 CyclicGroupIssue,
14 DuplicateReplicaIssue,
15 InvalidFilterConditionIssue,
16 InvalidFilterLogicIssue,
17 MissingProcessorsToRunIssue,
18 MissingUserInterfaceSectionIssue,
19 ProcessorsToRunNotListIssue,
20 UnknownGroupMemberIssue,
21 UnknownProcessorsToRunEntryIssue,
22)
23from mafw.tools.regexp import parse_processor_name
25from .builder import SteeringBuilder, ValidationLevel
26from .models import (
27 Condition,
28 ConditionalFilterConfig,
29 FieldFilterConfig,
30 GroupConfig,
31 ModelFilterConfig,
32 ProcessorConfig,
33)
35_log = logging.getLogger(__name__)
def validate(
    builder: SteeringBuilder, validation_level: ValidationLevel = ValidationLevel.SEMANTIC
) -> list['mafw.mafw_errors.ValidationIssue']:
    """Collect every validation issue detected in the steering metadata.

    :param builder: Steering builder whose content is inspected.
    :param validation_level: Requested validation depth. ``FULL`` only emits a
        warning; the same set of checks runs whatever the value is.
    :return: The issues appended by the individual checks, in the order the
        checks are executed.
    """
    if validation_level == ValidationLevel.FULL:
        _log.warning('FULL validation level is not implemented yet; running SEMANTIC checks only')

    collected: list['mafw.mafw_errors.ValidationIssue'] = []

    # Checks sharing the (builder, issues) signature that run first.
    for check in (_validate_global_sections, _validate_processors_to_run, _assert_group_members):
        check(builder, collected)

    # These two work on a narrower view of the builder.
    _assert_unique_replicas(builder.processors.keys(), collected)
    _detect_group_cycles(builder.groups, collected)

    # Filter related checks close the sequence.
    for check in (_validate_filter_conditions, _validate_logic_expressions):
        check(builder, collected)

    return collected
def _validate_global_sections(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Check the global part of the steering description.

    Appends an issue when ``builder.globals.processors_to_run`` is empty and,
    if the builder carries a document, when its ``processors_to_run`` entry is
    present but not a list or when the ``UserInterface`` key is absent.

    :param builder: Steering builder under validation.
    :param issues: List extended in place with the issues found.
    """
    if not builder.globals.processors_to_run:
        issues.append(MissingProcessorsToRunIssue())

    raw_document = builder.document
    if raw_document is None:
        # Nothing else can be checked without the document.
        return

    declared = raw_document.get('processors_to_run')
    if not (declared is None or isinstance(declared, list)):
        issues.append(ProcessorsToRunNotListIssue())

    if 'UserInterface' not in raw_document:
        issues.append(MissingUserInterfaceSectionIssue())
def _validate_processors_to_run(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Report the ``processors_to_run`` entries that match no known section.

    An entry is accepted when it is a key of ``builder.processors`` or
    ``builder.groups``, or when the first element returned by
    :func:`parse_processor_name` for that entry is one of those keys.

    :param builder: Steering builder under validation.
    :param issues: List extended in place with one issue per unknown entry.
    """
    sections = {*builder.processors, *builder.groups}

    for candidate in builder.globals.processors_to_run:
        if candidate in sections:
            continue
        # Not a literal section name: retry with the parsed base name.
        stem, _ = parse_processor_name(candidate)
        if stem in sections:
            continue
        issues.append(UnknownProcessorsToRunEntryIssue(candidate))
def _assert_group_members(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Report group members that refer to no known processor or group.

    A member is accepted when it is a key of ``builder.processors`` or
    ``builder.groups``, either literally or through the first element returned
    by :func:`parse_processor_name`.

    :param builder: Steering builder under validation.
    :param issues: List extended in place with one issue per unknown member.
    """
    sections = set(builder.processors).union(builder.groups)

    def _is_unknown(member_name: str) -> bool:
        # Literal match first, parsed base name as a fallback.
        if member_name in sections:
            return False
        stem, _ = parse_processor_name(member_name)
        return stem not in sections

    for grp in builder.groups.values():
        for member_name in grp.processors:
            if _is_unknown(member_name):
                issues.append(UnknownGroupMemberIssue(grp.name, member_name))
def _assert_unique_replicas(names: Iterable[str], issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Report replica identifiers that occur more than once for the same base name.

    Only names containing ``#`` are considered; the text before the first
    ``#`` is the base, the remainder is the replica identifier.

    :param names: Section names to inspect.
    :param issues: List extended in place with one issue per repeated
        (base, replica) pair occurrence after the first.
    """
    seen_by_base: dict[str, set[str]] = {}

    for full_name in names:
        base, marker, replica = full_name.partition('#')
        if not marker:
            # No '#' in the name: not a replica.
            continue
        known = seen_by_base.setdefault(base, set())
        if replica in known:
            issues.append(DuplicateReplicaIssue(base, replica))
        else:
            known.add(replica)
def _detect_group_cycles(groups: dict[str, GroupConfig], issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Walk the group membership graph depth first and report cycles.

    Members that are not keys of ``groups`` are ignored by the walk.

    :param groups: Mapping from group name to its configuration; the
        ``processors`` attribute of each value lists the member names.
    :param issues: List extended in place with an issue for each group that is
        reached again while it is still being explored.
    """
    in_progress: set[str] = set()
    completed: set[str] = set()

    def _walk(current: str) -> None:
        if current in completed or current not in groups:
            return
        if current in in_progress:
            # Reached a group that is still on the current path.
            issues.append(CyclicGroupIssue(current))
            return

        in_progress.add(current)
        for member in groups[current].processors:
            _walk(member)
        in_progress.discard(current)
        completed.add(current)

    for group_name in groups:
        _walk(group_name)
def _validate_filter_conditions(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Check the conditions of every filter attached to every processor.

    Model and field filters have each of their conditions handed to
    :func:`_report_condition` (field filters also pass their ``field_name``);
    conditional filters are handed to :func:`_report_conditional`.

    :param builder: Steering builder under validation.
    :param issues: List extended in place by the reporting helpers.
    """
    for proc in builder.processors.values():
        for entries in proc.filters.values():
            for entry in entries:
                # The model check comes first, exactly as in the if/elif chain it replaces.
                is_model = isinstance(entry, ModelFilterConfig)
                if is_model or isinstance(entry, FieldFilterConfig):
                    for cond_name, cond in entry.conditions.items():
                        _report_condition(
                            proc.name,
                            entry.model,
                            cond_name,
                            None if is_model else entry.field_name,
                            cond,
                            issues,
                        )
                elif isinstance(entry, ConditionalFilterConfig):
                    _report_conditional(proc.name, entry, issues)
def _validate_logic_expressions(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Check the logic expression of each processor and of its model/field filters.

    Every processor is passed to :func:`_report_logic_issue`, followed by each
    of its filter entries that is a model or a field filter.

    :param builder: Steering builder under validation.
    :param issues: List extended in place by :func:`_report_logic_issue`.
    """
    for proc in builder.processors.values():
        _report_logic_issue(proc.name, proc, issues)

        flattened = (item for entries in proc.filters.values() for item in entries)
        for item in flattened:
            # Other filter kinds are skipped.
            if isinstance(item, (ModelFilterConfig, FieldFilterConfig)):
                _report_logic_issue(proc.name, item, issues)
def _report_logic_issue(
    processor_name: str,
    config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig,
    issues: list['mafw.mafw_errors.ValidationIssue'],
) -> None:
    """Append an issue when ``config`` has a logic expression flagged as invalid.

    Nothing is reported when :func:`_logic_text` finds no expression or when
    ``config.logic_is_valid`` is true.

    :param processor_name: Name of the processor owning ``config``.
    :param config: Processor or filter configuration to inspect.
    :param issues: List extended in place with the issue, if any.
    """
    expression = _logic_text(config)
    # logic_is_valid is only consulted when there is an expression to judge.
    if expression is not None and not config.logic_is_valid:
        issues.append(InvalidFilterLogicIssue(processor_name, _logic_target_name(config), detail=expression))
def _logic_target_name(config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig) -> str:
    """Return a short label describing what the logic expression belongs to.

    :param config: Processor or filter configuration.
    :return: ``'processor filters'`` for a processor configuration, a
        ``model '...'`` label for a model filter, a ``field '...'`` label
        otherwise.
    """
    if isinstance(config, ProcessorConfig):
        label = 'processor filters'
    elif isinstance(config, ModelFilterConfig):
        label = f"model '{config.model}'"
    else:
        label = f"field '{config.model}.{config.field_name}'"
    return label
def _logic_text(config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig) -> str | None:
    """Return the logic expression attached to ``config`` as text.

    ``config.logic_buffer`` is used when it is not ``None``, otherwise
    ``config.logic_str_original``.

    :param config: Processor or filter configuration.
    :return: The expression converted with ``str``, or ``None`` when there is
        no expression or it contains only whitespace.
    """
    raw = config.logic_buffer
    if raw is None:
        raw = config.logic_str_original
    if raw is None:
        return None

    rendered = str(raw)
    return rendered if rendered.strip() else None
def _report_condition(
    processor_name: str,
    model_name: str,
    condition_name: str,
    field_name: str | None,
    condition: Condition | None,
    issues: list['mafw.mafw_errors.ValidationIssue'],
) -> None:
    """Append an issue when ``condition`` is missing or flagged as invalid.

    :param processor_name: Name of the processor owning the filter.
    :param model_name: Model the filter refers to.
    :param condition_name: Key of the condition inside the filter.
    :param field_name: Field the condition refers to, or ``None``.
    :param condition: Condition to check; ``None`` counts as invalid.
    :param issues: List extended in place with the issue, if any.
    """
    if condition is not None and condition.is_valid:
        return

    problem = InvalidFilterConditionIssue(
        processor_name,
        model_name,
        condition_name,
        field_name=field_name,
        reason='missing or invalid value',
    )
    issues.append(problem)
def _report_conditional(
    processor_name: str,
    config: ConditionalFilterConfig,
    issues: list['mafw.mafw_errors.ValidationIssue'],
) -> None:
    """Check the IF / THEN / ELSE clauses of a conditional filter.

    The IF and THEN clauses must be present and valid; the ELSE clause is only
    checked when it is present. Each reported issue carries the clause field
    or, when that is falsy, the model name as ``field_name``.

    :param processor_name: Name of the processor owning the filter.
    :param config: Conditional filter configuration to inspect.
    :param issues: List extended in place with one issue per faulty clause.
    """

    def _flag(clause_label: str, clause_field: str | None, reason: str) -> None:
        # All three clauses build the issue the same way.
        issues.append(
            InvalidFilterConditionIssue(
                processor_name,
                config.model,
                clause_label,
                field_name=clause_field or config.model,
                reason=reason,
            )
        )

    if_part = config.condition
    if if_part is None or not if_part.is_valid:
        _flag(f'{config.name}:if', config.condition_field, 'missing or invalid IF clause')

    then_part = config.then_clause
    if then_part is None or not then_part.is_valid:
        _flag(f'{config.name}:then', config.then_field, 'missing or invalid THEN clause')

    else_part = config.else_clause
    if else_part is not None and not else_part.is_valid:
        _flag(f'{config.name}:else', config.else_field, 'missing or invalid ELSE clause')