Coverage for src / mafw / steering / serializer.py: 98%

205 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 09:03 +0000

1# Copyright 2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4"""Serialize steering metadata into TOML documents via tomlkit.""" 

5 

6from __future__ import annotations 

7 

8from typing import TYPE_CHECKING, Any 

9 

10import tomlkit 

11from tomlkit import aot, document, nl, table 

12from tomlkit.items import AoT 

13 

14from mafw.db.db_filter import ast_to_string 

15from mafw.enumerators import LogicalOp 

16 

17from .models import ( 

18 Condition, 

19 ConditionalFilterConfig, 

20 DBConfiguration, 

21 FieldFilterConfig, 

22 GlobalSettings, 

23 GroupConfig, 

24 ModelFilterConfig, 

25 ProcessorConfig, 

26 UIConfiguration, 

27) 

28 

29if TYPE_CHECKING: 

30 from mafw.steering.builder import SteeringBuilder, ValidationLevel 

31 

32 

def serialize(
    builder: SteeringBuilder,
    *,
    validation_level: ValidationLevel | None = None,
) -> tomlkit.TOMLDocument:
    """Build the TOML document describing the steering metadata held by ``builder``.

    :param builder: Provides the globals, database, processor, group and UI settings.
    :param validation_level: When not ``None``, ``builder.validate`` is called with
        this level before anything is serialized.
    :return: A freshly created document filled by the section helpers, in the
        order globals, database, processors, groups, user interface.
    :raises: The first item returned by ``builder.validate`` when that result is
        non-empty.
    """
    if validation_level is not None:
        problems = builder.validate(validation_level)
        if problems:
            # Only the first reported issue is surfaced to the caller.
            raise problems[0]

    toml_doc = document()
    _add_globals(toml_doc, builder.globals, builder.extra_globals)
    _add_db(toml_doc, builder.db_config)
    _add_processors(toml_doc, builder)
    _add_groups(toml_doc, builder.groups)
    _add_user_interface(toml_doc, builder.ui_config)
    return toml_doc

51 

52 

def _add_globals(doc: tomlkit.TOMLDocument, globals_: GlobalSettings, extra_globals: dict[str, Any]) -> None:
    """Write the top-level (global) keys of the steering document.

    ``processors_to_run`` is always written. The optional settings are written
    only when they are not ``None``. Entries of ``extra_globals`` are emitted
    after ``new_only`` and before ``create_standard_tables``. A blank line
    closes the section.
    """
    doc['processors_to_run'] = globals_.processors_to_run

    # Optional settings that precede the extra globals, in output order.
    for setting_name in ('analysis_name', 'analysis_description', 'new_only'):
        setting = getattr(globals_, setting_name)
        if setting is not None:
            doc[setting_name] = setting

    for extra_key, extra_value in extra_globals.items():
        doc[extra_key] = extra_value

    standard_tables = globals_.create_standard_tables
    if standard_tables is not None:
        doc['create_standard_tables'] = standard_tables

    doc.add(nl())

66 

67 

def _add_db(doc: tomlkit.TOMLDocument, db_config: DBConfiguration) -> None:
    """Write the ``DBConfiguration`` table, or nothing when the database is disabled.

    The table holds the plain attributes first, then an ``authentication``
    sub-table and a ``parameters`` sub-table (one table per backend, with a
    nested ``pragmas`` table when the backend supplies a dict under that key).
    The top-level ``pragmas`` table is written only when neither authentication
    nor parameters are present. A blank line follows the section.
    """
    if not db_config.enabled:
        return

    def as_table(mapping):
        # Copy a mapping, entry by entry, into a new TOML table.
        section = table()
        for name, setting in mapping.items():
            section[name] = setting
        return section

    db_table = as_table(db_config.attributes)

    has_auth = bool(db_config.authentication)
    has_params = bool(db_config.parameters)

    if has_auth:
        db_table['authentication'] = as_table(db_config.authentication)

    if has_params:
        params_table = table()
        for backend, backend_params in db_config.parameters.items():
            backend_table = table()
            for name, setting in backend_params.items():
                if name == 'pragmas' and isinstance(setting, dict):
                    backend_table['pragmas'] = as_table(setting)
                else:
                    backend_table[name] = setting
            params_table[backend] = backend_table
        db_table['parameters'] = params_table

    # NOTE(review): top-level pragmas are skipped whenever authentication or
    # parameters exist; presumably the two layouts are mutually exclusive - confirm.
    if not (has_auth or has_params) and db_config.pragmas:
        db_table['pragmas'] = as_table(db_config.pragmas)

    doc['DBConfiguration'] = db_table
    doc.add(nl())

101 

102 

def _add_processors(doc: tomlkit.TOMLDocument, builder: 'SteeringBuilder') -> None:
    """Emit one table per processor: names without ``#`` first, then names containing ``#``."""
    # sorted() is stable, so a boolean key moves the '#' names to the end while
    # keeping the builder's own ordering inside each of the two groups.
    ordered_names = sorted(builder.processors, key=lambda name: '#' in name)
    for processor_name in ordered_names:
        _add_processor_table(doc, builder.processors[processor_name])

110 

111 

def _normalize_logical_op(value: str | LogicalOp) -> LogicalOp | None:
    """Map ``value`` onto a ``LogicalOp`` member, or return ``None`` when it cannot be mapped.

    Enum members are returned unchanged. A string of the form
    ``'LogicalOp.<NAME>'`` is resolved by member name; any other string is
    resolved by member value. Non-string, non-enum inputs yield ``None``.
    """
    if isinstance(value, LogicalOp):
        return value
    if not isinstance(value, str):
        return None

    qualified_prefix = 'LogicalOp.'
    if value.startswith(qualified_prefix):
        # Text such as 'LogicalOp.IS_NULL' carries the member *name*.
        member_name = value[len(qualified_prefix):]
        try:
            return LogicalOp[member_name]
        except KeyError:
            return None

    # Anything else is treated as the member *value*.
    try:
        return LogicalOp(value)
    except ValueError:
        return None

130 

131 

def _serialize_condition(condition: Condition) -> Any:
    """Turn a ``Condition`` into the value stored in the TOML filter table.

    Implicit conditions collapse to their bare value; explicit ones become an
    ``{'op': ..., 'value': ...}`` mapping. For the null-check operators the
    value is always written as ``True``.
    """
    null_checks = (LogicalOp.IS_NULL, LogicalOp.IS_NOT_NULL)
    is_null_check = _normalize_logical_op(condition.operator) in null_checks
    payload = True if is_null_check else condition.value

    if not condition.is_implicit:
        # Explicit format keeps the operator next to the value.
        return {'op': str(condition.operator), 'value': payload}
    return payload

147 

148 

def _resolve_logic(logic_owner: Any) -> str | None:
    """Return the logic expression to serialize for a processor or filter config.

    A modified expression (``logic_dirty`` set and ``logic_ast`` available) is
    rendered with ``ast_to_string``; otherwise the original string is reused
    when it is non-empty. ``None`` means there is nothing to write.
    """
    if logic_owner.logic_dirty and logic_owner.logic_ast:
        return ast_to_string(logic_owner.logic_ast)
    if logic_owner.logic_str_original:
        return logic_owner.logic_str_original
    return None


def _build_field_filter_table(field_filter: FieldFilterConfig) -> tomlkit.items.Table:
    """Build the sub-table of one field filter: enable flag, logic, then conditions."""
    sub_table = table()
    if not field_filter.enabled:
        sub_table['__enable__'] = False
    field_logic = _resolve_logic(field_filter)
    if field_logic:
        sub_table['__logic__'] = field_logic
    for condition_name, condition in field_filter.conditions.items():
        sub_table[condition_name] = _serialize_condition(condition)
    return sub_table


def _build_model_filter_table(filter_list: Any) -> tomlkit.items.Table | None:
    """Build the filter table of one model, or return ``None`` when nothing applies.

    Keys are written in a fixed order: the model filter's own entries, one
    sub-table per field filter, then the ``__conditional__`` array.
    """
    filter_table = table()
    populated = False

    # 1. Model filter (base conditions and logic); only the first one is used.
    model_config = next((f for f in filter_list if isinstance(f, ModelFilterConfig)), None)
    if model_config:
        populated = True
        if not model_config.enabled:
            filter_table['__enable__'] = False
        model_logic = _resolve_logic(model_config)
        if model_logic:
            filter_table['__logic__'] = model_logic
        for field_name, condition in model_config.conditions.items():
            filter_table[field_name] = _serialize_condition(condition)

    # 2. Field filters (sub-tables).
    for field_filter in filter_list:
        if isinstance(field_filter, FieldFilterConfig):
            populated = True
            filter_table[field_filter.field_name] = _build_field_filter_table(field_filter)

    # 3. Conditionals.
    conditionals = [f for f in filter_list if isinstance(f, ConditionalFilterConfig)]
    if conditionals:
        populated = True
        filter_table['__conditional__'] = _build_conditional_array_from_objects(conditionals)

    return filter_table if populated else None


def _add_processor_table(doc: tomlkit.TOMLDocument, config: ProcessorConfig) -> None:
    """Write the table of a single processor, followed by a blank line.

    The table contains the parameter values, the optional ``__new_only__``,
    ``__inheritance__`` and ``__logic__`` keys, and a ``__filter__`` table when
    at least one model filter, or a filter-root logic expression, exists.

    :param doc: Document receiving the table under the key ``config.name``.
    :param config: Processor configuration to serialize.
    """
    proc_table = table()
    for key, param_config in config.parameters.items():
        proc_table[key] = param_config.value
    if config.new_only is not None:
        proc_table['__new_only__'] = config.new_only
    if config.inheritance is not None:
        proc_table['__inheritance__'] = config.inheritance

    filter_root = table()
    filter_added = False

    processor_logic = _resolve_logic(config)
    if processor_logic is not None:
        proc_table['__logic__'] = processor_logic
        if config.has_filter_root:
            # The same expression is repeated inside the __filter__ table.
            filter_root['__logic__'] = processor_logic
            filter_added = True

    for model_name, filter_list in config.filters.items():
        model_table = _build_model_filter_table(filter_list)
        if model_table is not None:
            filter_root[model_name] = model_table
            filter_added = True

    if filter_added:
        proc_table['__filter__'] = filter_root
    doc[config.name] = proc_table
    doc.add(nl())

226 

227 

def _build_conditional_array_from_objects(conditionals: list[ConditionalFilterConfig]) -> AoT:
    """Build the ``__conditional__`` array of tables from conditional filter objects.

    Each entry gets ``__enable__ = false`` when disabled, a ``name`` unless the
    name was auto-generated, and a ``<prefix>_field`` / ``<prefix>_op`` /
    ``<prefix>_value`` triple for every clause that is set, with the prefixes
    ``condition``, ``then`` and ``else`` in that order.

    :param conditionals: Conditional filters of a single model.
    :return: One table per conditional, in input order.
    """
    null_checks = (LogicalOp.IS_NULL, LogicalOp.IS_NOT_NULL)
    tbl_array = aot()
    for cond in conditionals:
        tbl = table()
        if not cond.enabled:
            tbl['__enable__'] = False
        if not cond.auto_named:
            tbl['name'] = cond.name

        clauses = (
            ('condition', cond.condition),
            ('then', cond.then_clause),
            ('else', cond.else_clause),
        )
        for prefix, clause in clauses:
            if not clause:
                continue
            clause_value = clause.value
            # TOML has no null and tomlkit refuses to encode None. A null-check
            # operator needs no operand, so write True as _serialize_condition
            # does. Values that are not None are left untouched.
            if clause_value is None and _normalize_logical_op(clause.operator) in null_checks:
                clause_value = True
            # The field attribute is read only for clauses that are present.
            tbl[f'{prefix}_field'] = getattr(cond, f'{prefix}_field')
            tbl[f'{prefix}_op'] = str(clause.operator)
            tbl[f'{prefix}_value'] = clause_value

        tbl_array.append(tbl)
    return tbl_array

254 

255 

def _build_conditional_array(conditionals: list[dict[str, Any]]) -> AoT:
    """Build an array of tables from plain dictionaries, one table per dictionary.

    Deprecated: kept for compatibility only; the object-based builder is the
    one used by the current serialization path.
    """
    result = aot()
    for raw_entry in conditionals:
        entry_table = table()
        for entry_key in raw_entry:
            entry_table[entry_key] = raw_entry[entry_key]
        result.append(entry_table)
    return result

265 

266 

def _add_groups(doc: tomlkit.TOMLDocument, groups: dict[str, GroupConfig]) -> None:
    """Write one table per group, keyed by the group's name, each followed by a blank line.

    Every table starts with ``processors_to_run``, then the optional
    ``description``, then the group's additional attributes.
    """
    for group_cfg in groups.values():
        group_table = table()
        group_table['processors_to_run'] = group_cfg.processors

        description = group_cfg.description
        if description is not None:
            group_table['description'] = description

        for attribute_name, attribute_value in group_cfg.attributes.items():
            group_table[attribute_name] = attribute_value

        doc[group_cfg.name] = group_table
        doc.add(nl())

277 

278 

def _add_user_interface(doc: tomlkit.TOMLDocument, ui_config: UIConfiguration) -> None:
    """Write the ``UserInterface`` table, which holds the single ``interface`` key."""
    interface_section = table()
    interface_section['interface'] = ui_config.interface
    doc['UserInterface'] = interface_section