Coverage for src / mafw / steering / serializer.py: 99%

186 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 16:10 +0000

1# Copyright 2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4"""Serialize steering metadata into TOML documents via tomlkit.""" 

5 

6from __future__ import annotations 

7 

8from typing import TYPE_CHECKING, Any 

9 

10import tomlkit 

11from tomlkit import aot, document, nl, table 

12from tomlkit.items import AoT 

13 

14from mafw.db.db_filter import ast_to_string 

15from mafw.enumerators import LogicalOp 

16 

17from .models import ( 

18 Condition, 

19 ConditionalFilterConfig, 

20 DBConfiguration, 

21 FieldFilterConfig, 

22 GlobalSettings, 

23 GroupConfig, 

24 ModelFilterConfig, 

25 ProcessorConfig, 

26 UIConfiguration, 

27) 

28 

29if TYPE_CHECKING: 

30 from mafw.steering.builder import SteeringBuilder, ValidationLevel 

31 

32 

def serialize(
    builder: SteeringBuilder,
    *,
    validation_level: ValidationLevel | None = None,
) -> tomlkit.TOMLDocument:
    """Render the metadata held by ``builder`` as a TOML document.

    :param builder: Object providing ``globals``, ``extra_globals``, ``db_config``,
        ``processors``, ``groups`` and ``ui_config``.
    :param validation_level: When not ``None``, ``builder.validate`` is called with it
        before anything is serialized.
    :return: A new document filled, in order, with the global keys, the database
        section, the processor tables, the group tables and the user-interface table.
    :raises: The first item returned by ``builder.validate`` when that list is non-empty.
    """
    # Validation is opt-in: without a level the builder content is written as-is.
    issues = builder.validate(validation_level) if validation_level is not None else None
    if issues:
        # Only the first reported issue is surfaced to the caller.
        raise issues[0]

    toml_doc = document()
    # The call order below fixes the order of the sections in the output.
    _add_globals(toml_doc, builder.globals, builder.extra_globals)
    _add_db(toml_doc, builder.db_config)
    _add_processors(toml_doc, builder)
    _add_groups(toml_doc, builder.groups)
    _add_user_interface(toml_doc, builder.ui_config)
    return toml_doc

51 

52 

def _add_globals(doc: tomlkit.TOMLDocument, globals_: GlobalSettings, extra_globals: dict[str, Any]) -> None:
    """Write the top-level (global) keys into ``doc`` followed by a blank line.

    ``processors_to_run`` is always written. The optional settings are written only
    when they are not ``None``. Every entry of ``extra_globals`` is copied verbatim and
    placed before ``create_standard_tables``.
    """
    doc['processors_to_run'] = globals_.processors_to_run

    # Optional settings emitted ahead of the user-supplied extras, in this order.
    for setting in ('analysis_name', 'analysis_description', 'new_only'):
        current = getattr(globals_, setting)
        if current is not None:
            doc[setting] = current

    for extra_key, extra_value in extra_globals.items():
        doc[extra_key] = extra_value

    standard_tables = globals_.create_standard_tables
    if standard_tables is not None:
        doc['create_standard_tables'] = standard_tables

    # Blank line separating the globals from whatever is added next.
    doc.add(nl())

66 

67 

def _add_db(doc: tomlkit.TOMLDocument, db_config: DBConfiguration) -> None:
    """Append the ``DBConfiguration`` table to ``doc`` when the database is enabled.

    Nothing is written (not even the trailing blank line) when ``db_config.enabled``
    is falsy. A nested ``pragmas`` table is added only when ``db_config.pragmas`` is
    non-empty.
    """
    if db_config.enabled:
        section = table()
        for name, setting in db_config.attributes.items():
            section[name] = setting

        if db_config.pragmas:
            pragma_section = table()
            for pragma, pragma_value in db_config.pragmas.items():
                pragma_section[pragma] = pragma_value
            section['pragmas'] = pragma_section

        doc['DBConfiguration'] = section
        doc.add(nl())

81 

82 

def _add_processors(doc: tomlkit.TOMLDocument, builder: 'SteeringBuilder') -> None:
    """Append one table per processor, names without ``#`` first, names with ``#`` after.

    Within each of the two groups the iteration order of ``builder.processors`` is kept.
    """
    # ``sorted`` is stable and False sorts before True, so keying on the presence of
    # '#' yields the base names first and the replica names second, each in their
    # original relative order.
    ordered_names = sorted(builder.processors, key=lambda proc_name: '#' in proc_name)
    for proc_name in ordered_names:
        _add_processor_table(doc, builder.processors[proc_name])

90 

91 

def _normalize_logical_op(value: str | LogicalOp) -> LogicalOp | None:
    """Map ``value`` onto a ``LogicalOp`` member, or return ``None`` when it cannot be mapped.

    * A ``LogicalOp`` instance is returned unchanged.
    * A string starting with ``'LogicalOp.'`` is resolved by member *name* using the
      text after that prefix.
    * Any other string is resolved by member *value*.
    * Non-string, non-``LogicalOp`` input yields ``None``.
    """
    if isinstance(value, LogicalOp):
        return value
    if not isinstance(value, str):
        return None

    prefix = 'LogicalOp.'
    if value.startswith(prefix):
        # Name lookup; an unknown name gives None instead of raising KeyError.
        member_name = value[len(prefix):]
        return LogicalOp.__members__.get(member_name)

    # No prefix: treat the text as the enum value.
    try:
        return LogicalOp(value)
    except ValueError:
        return None

110 

111 

def _serialize_condition(condition: Condition) -> Any:
    """Turn a ``Condition`` into the value stored in the TOML filter table.

    Implicit conditions are written as the bare value; all others as a
    ``{'op': ..., 'value': ...}`` mapping. For the ``IS_NULL`` / ``IS_NOT_NULL``
    operators the stored value is always ``True``, whatever ``condition.value`` holds.
    """
    null_operators = (LogicalOp.IS_NULL, LogicalOp.IS_NOT_NULL)
    if _normalize_logical_op(condition.operator) in null_operators:
        payload = True
    else:
        payload = condition.value

    if condition.is_implicit:
        # Implicit form: the operator is not written out.
        return payload

    # Explicit form: the operator is stored as its ``str()`` representation.
    return {'op': str(condition.operator), 'value': payload}

127 

128 

def _resolve_logic(config: Any) -> Any:
    """Return the logic expression to write for ``config``, or ``None`` when there is none.

    ``config`` is any object exposing ``logic_dirty``, ``logic_ast`` and
    ``logic_str_original``. When the logic is flagged dirty and an AST is present the
    AST is rendered with ``ast_to_string``; otherwise a non-empty original string is
    returned unchanged.
    """
    if config.logic_dirty and config.logic_ast:
        return ast_to_string(config.logic_ast)
    if config.logic_str_original:
        return config.logic_str_original
    return None


def _fill_filter_table(target: Any, filter_config: Any) -> None:
    """Write the entries shared by model-level and field-level filters into ``target``.

    In order: ``__enable__ = False`` (only when the filter is disabled), ``__logic__``
    (only when the resolved logic is truthy), then one key per entry of
    ``filter_config.conditions``.
    """
    if not filter_config.enabled:
        target['__enable__'] = False
    logic = _resolve_logic(filter_config)
    if logic:
        target['__logic__'] = logic
    for field_name, condition in filter_config.conditions.items():
        target[field_name] = _serialize_condition(condition)


def _build_model_filter_table(filter_list: Any) -> tuple[Any, bool]:
    """Build the filter table for one model from its list of filter configurations.

    :return: ``(filter_table, has_content)``; ``has_content`` is ``True`` when at least
        one model filter, field filter or conditional filter was found in the list.
    """
    filter_table = table()
    has_content = False

    # 1. Model filter (base conditions and logic): only the first one is used.
    model_config = next((f for f in filter_list if isinstance(f, ModelFilterConfig)), None)
    if model_config:
        has_content = True
        _fill_filter_table(filter_table, model_config)

    # 2. Field filters: one sub-table per filter, keyed by its field name.
    for field_filter in filter_list:
        if isinstance(field_filter, FieldFilterConfig):
            has_content = True
            sub_table = table()
            _fill_filter_table(sub_table, field_filter)
            filter_table[field_filter.field_name] = sub_table

    # 3. Conditional filters: gathered in a single array of tables.
    conditionals = [f for f in filter_list if isinstance(f, ConditionalFilterConfig)]
    if conditionals:
        has_content = True
        filter_table['__conditional__'] = _build_conditional_array_from_objects(conditionals)

    return filter_table, has_content


def _add_processor_table(doc: tomlkit.TOMLDocument, config: ProcessorConfig) -> None:
    """Append the table of one processor to ``doc``, followed by a blank line.

    The table is stored under ``config.name`` and contains, in order: the parameter
    values, ``__new_only__`` and ``__inheritance__`` when they are not ``None``,
    ``__logic__`` when a processor-level logic expression exists, and ``__filter__``
    when at least one filter entry was produced.
    """
    proc_table = table()
    for key, param_config in config.parameters.items():
        proc_table[key] = param_config.value
    if config.new_only is not None:
        proc_table['__new_only__'] = config.new_only
    if config.inheritance is not None:
        proc_table['__inheritance__'] = config.inheritance

    filter_root = table()
    filter_added = False

    # NOTE(review): the processor level tests ``is not None`` while model and field
    # levels test truthiness; the difference is kept exactly as it was.
    processor_logic = _resolve_logic(config)
    if processor_logic is not None:
        proc_table['__logic__'] = processor_logic
        if config.has_filter_root:
            # The same expression is repeated under ``__filter__``.
            filter_root['__logic__'] = processor_logic
            filter_added = True

    for model_name, filter_list in config.filters.items():
        filter_table, model_added = _build_model_filter_table(filter_list)
        if model_added:
            filter_root[model_name] = filter_table
            filter_added = True

    if filter_added:
        proc_table['__filter__'] = filter_root
    doc[config.name] = proc_table
    doc.add(nl())

206 

207 

def _build_conditional_array_from_objects(conditionals: list[ConditionalFilterConfig]) -> AoT:
    """Convert conditional filter objects into a TOML array of tables.

    Each table may hold ``__enable__ = False`` (disabled filters only), ``name``
    (filters that are not auto-named) and, for every truthy clause, the
    ``<prefix>_field`` / ``<prefix>_op`` / ``<prefix>_value`` triple, with the
    operator stored as its ``str()`` representation.
    """
    # (attribute holding the clause, field key, operator key, value key).
    # The field key is also the name of the attribute holding the field.
    clause_layout = (
        ('condition', 'condition_field', 'condition_op', 'condition_value'),
        ('then_clause', 'then_field', 'then_op', 'then_value'),
        ('else_clause', 'else_field', 'else_op', 'else_value'),
    )

    result = aot()
    for conditional in conditionals:
        entry = table()
        if not conditional.enabled:
            entry['__enable__'] = False
        if not conditional.auto_named:
            entry['name'] = conditional.name

        for clause_attr, field_key, op_key, value_key in clause_layout:
            clause = getattr(conditional, clause_attr)
            if not clause:
                continue
            entry[field_key] = getattr(conditional, field_key)
            entry[op_key] = str(clause.operator)
            entry[value_key] = clause.value

        result.append(entry)
    return result

234 

235 

def _build_conditional_array(conditionals: list[dict[str, Any]]) -> AoT:
    """Convert plain dictionaries into a TOML array of tables, one table per dictionary.

    Deprecated: kept only for compatibility; nothing in this module calls it.
    """
    result = aot()
    for mapping in conditionals:
        row = table()
        for name in mapping:
            row[name] = mapping[name]
        result.append(row)
    return result

245 

246 

def _add_groups(doc: tomlkit.TOMLDocument, groups: dict[str, GroupConfig]) -> None:
    """Append one table per group to ``doc``, each followed by a blank line.

    Every table is stored under ``group.name`` and holds ``processors_to_run``, then
    ``description`` when it is not ``None``, then every entry of ``group.attributes``.
    The keys of ``groups`` are not used.
    """
    for group_config in groups.values():
        section = table()
        section['processors_to_run'] = group_config.processors

        description = group_config.description
        if description is not None:
            section['description'] = description

        for name, setting in group_config.attributes.items():
            section[name] = setting

        doc[group_config.name] = section
        doc.add(nl())

257 

258 

def _add_user_interface(doc: tomlkit.TOMLDocument, ui_config: UIConfiguration) -> None:
    """Append the ``UserInterface`` table, whose only key is ``interface``, to ``doc``."""
    section = table()
    section['interface'] = ui_config.interface
    doc['UserInterface'] = section