Coverage for src / mafw / steering / validation.py: 99%

130 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 16:10 +0000

1# Copyright 2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4"""Structural validation helpers for steering metadata.""" 

5 

6from __future__ import annotations 

7 

8import logging 

9from typing import Iterable 

10 

11import mafw.mafw_errors 

12from mafw.mafw_errors import ( 

13 CyclicGroupIssue, 

14 DuplicateReplicaIssue, 

15 InvalidFilterConditionIssue, 

16 InvalidFilterLogicIssue, 

17 MissingProcessorsToRunIssue, 

18 MissingUserInterfaceSectionIssue, 

19 ProcessorsToRunNotListIssue, 

20 UnknownGroupMemberIssue, 

21 UnknownProcessorsToRunEntryIssue, 

22) 

23from mafw.tools.regexp import parse_processor_name 

24 

25from .builder import SteeringBuilder, ValidationLevel 

26from .models import ( 

27 Condition, 

28 ConditionalFilterConfig, 

29 FieldFilterConfig, 

30 GroupConfig, 

31 ModelFilterConfig, 

32 ProcessorConfig, 

33) 

34 

35_log = logging.getLogger(__name__) 

36 

37 

def validate(
    builder: SteeringBuilder, validation_level: ValidationLevel = ValidationLevel.SEMANTIC
) -> list['mafw.mafw_errors.ValidationIssue']:
    """Run the steering checks on ``builder`` and collect every issue they report.

    All checks append to one shared list, so the result is ordered by check:
    global sections, ``processors_to_run`` entries, group members, replica
    uniqueness, group cycles, filter conditions and finally logic expressions.

    :param builder: Steering builder whose globals, processors, groups and filters are inspected.
    :param validation_level: Requested validation depth. Asking for ``FULL`` only logs a
        warning; the set of checks executed does not depend on this value.
    :return: The issues found, empty when no check reported anything.
    """
    full_requested = validation_level == ValidationLevel.FULL
    if full_requested:
        _log.warning('FULL validation level is not implemented yet; running SEMANTIC checks only')

    collected: list['mafw.mafw_errors.ValidationIssue'] = []

    # Document-level checks first.
    _validate_global_sections(builder, collected)
    _validate_processors_to_run(builder, collected)

    # Cross references between processors and groups.
    _assert_group_members(builder, collected)
    _assert_unique_replicas(builder.processors.keys(), collected)
    _detect_group_cycles(builder.groups, collected)

    # Filter contents.
    _validate_filter_conditions(builder, collected)
    _validate_logic_expressions(builder, collected)

    return collected

56 

57 

def _validate_global_sections(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Check the global part of the steering description.

    Reports a missing ``processors_to_run`` when the value held by the builder
    globals is falsy. When the builder carries a document, it additionally
    reports a ``processors_to_run`` entry that is present but not a list, and a
    missing ``UserInterface`` section.

    :param builder: Steering builder providing ``globals`` and the optional ``document``.
    :param issues: List extended in place with the issues found.
    """
    if not builder.globals.processors_to_run:
        issues.append(MissingProcessorsToRunIssue())

    raw_document = builder.document
    if raw_document is None:
        # Without the underlying document there is nothing else to inspect.
        return

    declared = raw_document.get('processors_to_run')
    # An absent entry is not a type problem; only a present non-list value is.
    acceptable_type = declared is None or isinstance(declared, list)
    if not acceptable_type:
        issues.append(ProcessorsToRunNotListIssue())

    if 'UserInterface' not in raw_document:
        issues.append(MissingUserInterfaceSectionIssue())

69 

70 

def _validate_processors_to_run(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Report ``processors_to_run`` entries that match no processor or group section.

    An entry is accepted when it is itself a key of ``builder.processors`` or
    ``builder.groups``, or when the first element returned by
    ``parse_processor_name`` for it is such a key.

    :param builder: Steering builder providing ``processors``, ``groups`` and ``globals``.
    :param issues: List extended in place with one issue per unknown entry.
    """
    declared_sections = {*builder.processors, *builder.groups}

    for candidate in builder.globals.processors_to_run:
        if candidate in declared_sections:
            continue
        # No literal match: retry with the base name extracted from the entry.
        stem, _ = parse_processor_name(candidate)
        if stem not in declared_sections:
            issues.append(UnknownProcessorsToRunEntryIssue(candidate))

83 

84 

def _assert_group_members(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Report group members that match no processor or group section.

    A member is accepted when it is itself a key of ``builder.processors`` or
    ``builder.groups``, or when the first element returned by
    ``parse_processor_name`` for it is such a key.

    :param builder: Steering builder providing ``processors`` and ``groups``.
    :param issues: List extended in place with one issue per unknown member,
        each carrying the name of the owning group.
    """
    sections = set(builder.processors).union(builder.groups)

    def _resolves(member_name: str) -> bool:
        """Return True when the member, or its parsed base name, is a known section."""
        if member_name in sections:
            return True
        stem, _ = parse_processor_name(member_name)
        return stem in sections

    issues.extend(
        UnknownGroupMemberIssue(owner.name, member_name)
        for owner in builder.groups.values()
        for member_name in owner.processors
        if not _resolves(member_name)
    )

98 

99 

def _assert_unique_replicas(names: Iterable[str], issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Report replica identifiers that occur more than once for the same base name.

    Each name is split at its first ``#`` into a base and a replica part; names
    without ``#`` are ignored. Every repeated occurrence of a ``(base, replica)``
    pair after the first one adds an issue.

    :param names: Section names to inspect.
    :param issues: List extended in place with one issue per repeated replica.
    """
    seen_by_base: dict[str, set[str]] = {}
    for full_name in names:
        base, separator, replica = full_name.partition('#')
        if not separator:
            # Plain name, not a replica.
            continue
        already_seen = seen_by_base.setdefault(base, set())
        if replica in already_seen:
            issues.append(DuplicateReplicaIssue(base, replica))
        else:
            already_seen.add(replica)

110 

111 

def _detect_group_cycles(groups: dict[str, GroupConfig], issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Report groups that are reached again while they are still being expanded.

    Performs a depth-first walk over the ``processors`` of every group. Members
    that are not keys of ``groups`` are skipped, and a group that has been fully
    expanded once is never walked again.

    :param groups: Mapping from group name to its configuration.
    :param issues: List extended in place with one issue per re-entered group.
    """
    # NOTE(review): members are looked up verbatim in ``groups``; a member written
    # with a replica suffix would not be followed here. Confirm whether groups can
    # be referenced that way.
    in_progress: set[str] = set()
    completed: set[str] = set()

    def _walk(name: str) -> None:
        if name not in groups or name in completed:
            return
        if name in in_progress:
            # Reached a group that is still on the current expansion path.
            issues.append(CyclicGroupIssue(name))
            return
        in_progress.add(name)
        for member in groups[name].processors:
            _walk(member)
        in_progress.remove(name)
        completed.add(name)

    for group_name in groups:
        _walk(group_name)

130 

131 

def _validate_filter_conditions(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Check every condition of every filter attached to every processor.

    Model and field filters have each of their named conditions checked; field
    filters additionally pass their ``field_name`` along. Conditional filters
    are handed to ``_report_conditional``. Entries of any other type are skipped.

    :param builder: Steering builder providing the ``processors`` mapping.
    :param issues: List extended in place with the issues found.
    """
    for proc in builder.processors.values():
        for entries in proc.filters.values():
            for item in entries:
                if isinstance(item, ModelFilterConfig):
                    # Model-level conditions are reported without a field name.
                    field_scoped = False
                elif isinstance(item, FieldFilterConfig):
                    field_scoped = True
                elif isinstance(item, ConditionalFilterConfig):
                    _report_conditional(proc.name, item, issues)
                    continue
                else:
                    continue

                for cond_name, cond in item.conditions.items():
                    _report_condition(
                        proc.name,
                        item.model,
                        cond_name,
                        item.field_name if field_scoped else None,
                        cond,
                        issues,
                    )

158 

159 

def _validate_logic_expressions(builder: SteeringBuilder, issues: list['mafw.mafw_errors.ValidationIssue']) -> None:
    """Check the logic expression of each processor and of its model and field filters.

    The processor itself is checked first, followed by every model or field
    filter found in its ``filters`` mapping. Other filter types are not checked.

    :param builder: Steering builder providing the ``processors`` mapping.
    :param issues: List extended in place with the issues found.
    """
    with_logic = (ModelFilterConfig, FieldFilterConfig)
    for proc in builder.processors.values():
        _report_logic_issue(proc.name, proc, issues)
        for entries in proc.filters.values():
            for item in entries:
                if isinstance(item, with_logic):
                    _report_logic_issue(proc.name, item, issues)

169 

170 

def _report_logic_issue(
    processor_name: str,
    config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig,
    issues: list['mafw.mafw_errors.ValidationIssue'],
) -> None:
    """Add an issue when ``config`` has a logic expression that is flagged as invalid.

    Nothing is reported when ``_logic_text`` finds no expression text or when
    ``config.logic_is_valid`` is truthy.

    :param processor_name: Name of the processor that owns ``config``.
    :param config: Processor or filter configuration whose logic is inspected.
    :param issues: List extended in place with the issue, if any.
    """
    expression = _logic_text(config)
    # ``logic_is_valid`` is only consulted when there is expression text to judge.
    if expression is not None and not config.logic_is_valid:
        issues.append(InvalidFilterLogicIssue(processor_name, _logic_target_name(config), detail=expression))

182 

183 

def _logic_target_name(config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig) -> str:
    """Return the label used in logic issues to describe ``config``.

    :param config: Processor, model filter or field filter configuration.
    :return: ``'processor filters'`` for a processor, the quoted model name for a
        model filter, and the quoted ``model.field_name`` pair for anything else.
    """
    if isinstance(config, ProcessorConfig):
        label = 'processor filters'
    elif isinstance(config, ModelFilterConfig):
        label = f"model '{config.model}'"
    else:
        label = f"field '{config.model}.{config.field_name}'"
    return label

190 

191 

def _logic_text(config: ProcessorConfig | ModelFilterConfig | FieldFilterConfig) -> str | None:
    """Return the logic expression text of ``config``, or None when there is none.

    ``config.logic_buffer`` takes precedence; ``config.logic_str_original`` is
    only used when the buffer is None. A value that is blank after conversion
    to ``str`` counts as no expression.

    :param config: Configuration exposing ``logic_buffer`` and ``logic_str_original``.
    :return: The expression as a string, unstripped, or None.
    """
    raw = config.logic_buffer
    if raw is None:
        raw = config.logic_str_original
    if raw is None:
        return None

    rendered = str(raw)
    return rendered if rendered.strip() else None

200 

201 

def _report_condition(
    processor_name: str,
    model_name: str,
    condition_name: str,
    field_name: str | None,
    condition: Condition | None,
    issues: list['mafw.mafw_errors.ValidationIssue'],
) -> None:
    """Add an issue when ``condition`` is missing or not valid.

    :param processor_name: Name of the processor owning the filter.
    :param model_name: Model the filter applies to.
    :param condition_name: Name of the condition inside the filter.
    :param field_name: Field the condition belongs to, or None when the caller has none.
    :param condition: Condition to check; None is reported like an invalid one.
    :param issues: List extended in place with the issue, if any.
    """
    if condition is not None and condition.is_valid:
        return

    problem = InvalidFilterConditionIssue(
        processor_name,
        model_name,
        condition_name,
        field_name=field_name,
        reason='missing or invalid value',
    )
    issues.append(problem)

220 

221 

def _report_conditional(
    processor_name: str,
    config: ConditionalFilterConfig,
    issues: list['mafw.mafw_errors.ValidationIssue'],
) -> None:
    """Check the IF, THEN and ELSE clauses of a conditional filter.

    The IF and THEN clauses are reported when they are None or not valid. The
    ELSE clause is only reported when it is present and not valid. Each issue
    names the condition ``<filter name>:<clause>`` and uses the field of that
    clause, falling back to the model name when the field is falsy.

    :param processor_name: Name of the processor owning the filter.
    :param config: Conditional filter configuration to check.
    :param issues: List extended in place with up to three issues.
    """

    def _flag(clause: str, clause_field: str | None, reason: str) -> None:
        """Append the issue for one clause of the conditional filter."""
        issues.append(
            InvalidFilterConditionIssue(
                processor_name,
                config.model,
                f'{config.name}:{clause}',
                field_name=clause_field or config.model,
                reason=reason,
            )
        )

    if_clause = config.condition
    if if_clause is None or not if_clause.is_valid:
        _flag('if', config.condition_field, 'missing or invalid IF clause')

    then_clause = config.then_clause
    if then_clause is None or not then_clause.is_valid:
        _flag('then', config.then_field, 'missing or invalid THEN clause')

    # ELSE may be absent, so only a present but invalid clause is reported.
    else_clause = config.else_clause
    if else_clause is not None and not else_clause.is_valid:
        _flag('else', config.else_field, 'missing or invalid ELSE clause')