Coverage for src / mafw / tools / toml_tools.py: 99%

212 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 09:03 +0000

1# Copyright 2025–2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Tools for reading, writing, and validating MAFw TOML steering files. 

6 

7:Author: Bulgheroni Antonio 

8:Description: Utilities to generate and load TOML steering files and related helpers. 

9""" 

10 

11import datetime 

12import logging 

13import os 

14import re 

15from pathlib import Path, PosixPath, WindowsPath 

16from typing import Any, Mapping, cast 

17 

18import tomlkit 

19from tomlkit import TOMLDocument, boolean, comment, document, item, nl, table 

20from tomlkit.exceptions import ConvertError 

21from tomlkit.items import Item, String, StringType 

22from tomlkit.toml_file import TOMLFile 

23 

24import mafw.mafw_errors 

25from mafw.__about__ import __version__ as version 

26from mafw.db.db_configurations import default_conf 

27from mafw.lazy_import import LazyImportProcessor, ProcessorClassProtocol 

28from mafw.processor import Processor 

29from mafw.steering.builder import SteeringBuilder, ValidationLevel 

30 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


# Matches ${NAME}, ${NAME:-default} and ${NAME:?error-message}; the named groups
# are consumed by _resolve_match().  NOTE(review): nested braces inside the
# default/error text are not supported ([^}]* stops at the first '}').
ENV_PATTERN = re.compile(
    r"""
    \$\{                              # opening ${
    (?P<name>[A-Za-z_][A-Za-z0-9_]*)  # variable name
    (?:
        (?P<op>:-|:\?)                # operator (:- or :?)
        (?P<value>[^}]*)              # default or error message
    )?
    \}                                # closing }
    """,
    re.VERBOSE,
)
"""Regex matching supported environment variable expansion patterns."""

# Escaped occurrences (\${) are swapped for this sentinel before expansion and
# turned back into a literal '${' afterwards, so they survive substitution.
ENV_ESCAPE_SENTINEL = '__MAFW_ENV_ESCAPE__{'
"""Sentinel used to preserve escaped variable patterns."""

# Upper bound on repeated substitution rounds; guards against circular
# definitions such as A -> ${B}, B -> ${A}.
MAX_ENV_RESOLUTION_PASSES = 10
"""Maximum number of expansion passes applied to a single string."""

53 

class PathItem(String):
    """TOML string item that unwraps to a :class:`pathlib.Path` instead of a plain ``str``."""

    def unwrap(self) -> Path:  # type: ignore[override] # do not know how to do it
        # Delegate to String.unwrap() for the raw text, then wrap it in a Path.
        return Path(super().unwrap())

59 

60 

def path_encoder(obj: Any) -> Item:
    """Encode a :class:`pathlib.Path` as a :class:`PathItem`.

    Windows paths are emitted as single-line literal strings so their
    backslashes need no escaping; POSIX paths use a plain basic string.

    :raises ConvertError: if *obj* is not a concrete Path instance.
    """
    if isinstance(obj, WindowsPath):
        return PathItem.from_raw(str(obj), type_=StringType.SLL, escape=False)
    if isinstance(obj, PosixPath):
        return PathItem.from_raw(str(obj), type_=StringType.SLB, escape=False)
    raise ConvertError

69 

70 

# Register the encoder globally so tomlkit can serialise pathlib.Path values
# anywhere in a document.
tomlkit.register_encoder(path_encoder)

72 

73 

def generate_steering_file(
    output_file: Path | str,
    processors: list[ProcessorClassProtocol] | ProcessorClassProtocol,
    database_conf: dict[str, Any] | None = None,
    db_engine: str = 'sqlite',
) -> None:
    """
    Generate a steering file.

    The document is assembled in three steps: the database configuration,
    one table per processor with its parameters, and the user interface options.

    :param output_file: The output filename where the steering file will be saved.
    :type output_file: Path | str
    :param processors: The processors list for which the steering file will be generated.
    :type processors: list[type[Processor] | Processor], type[Processor], Processor
    :param database_conf: The database configuration dictionary.
    :type database_conf: dict, Optional
    :param db_engine: A string representing the DB engine to be used. Possible values are: *sqlite*, *postgresql*
        and *mysql*.
    :type db_engine: str
    """
    if isinstance(output_file, str):
        output_file = Path(output_file)

    doc = _new_toml_doc()
    doc = _add_db_configuration(database_conf, db_engine=db_engine, doc=doc)
    doc = _add_processor_parameters_to_toml_doc(processors, doc)
    doc = _add_user_interface_configuration(doc)

    # TOML files are UTF-8 by specification; do not rely on the platform default encoding.
    with open(output_file, 'w', encoding='utf-8') as fp:
        tomlkit.dump(doc, fp)

103 

104 

def _new_toml_doc() -> TOMLDocument:
    """Build the skeleton steering document: header comments plus the global analysis keys."""
    doc = document()
    doc.add(comment(f'MAFw steering file generated on {datetime.datetime.now()}'))
    doc.add(nl())
    doc.add(
        comment('uncomment the line below and insert the processors you want to run from the available processor list')
    )
    doc.add(comment('processors_to_run = []'))
    doc.add(nl())
    doc.add(comment('customise the name of the analysis'))
    # Global keys every steering file carries; order is preserved in the output.
    for key, toml_value in (
        ('analysis_name', String.from_raw('mafw analysis', StringType.SLB)),
        ('analysis_description', String.from_raw('Summing up numbers', StringType.MLB)),
        ('new_only', boolean('true')),
        ('mafw_version', String.from_raw(version, StringType.SLB)),
        ('create_standard_tables', boolean('true')),
    ):
        doc.add(key, toml_value)
    return doc

121 

122 

def _add_db_configuration(
    database_conf: dict[str, Any] | None = None, db_engine: str = 'sqlite', doc: TOMLDocument | None = None
) -> TOMLDocument:
    """Add the DB configuration to the TOML document.

    The expected structure of the database_conf dictionary is one of these:

    .. code-block:: python

        option1 = {
            'DBConfiguration': {
                'URL': 'sqlite:///:memory:',
                'parameters': {
                    'sqlite': {
                        'pragmas': {
                            'journal_mode': 'wal',
                            'cache_size': -64000,
                            'foreign_keys': 1,
                            'synchronous': 0,
                        },
                    },
                },
            }
        }

        option2 = {
            'URL': 'sqlite:///:memory:',
            'authentication': {
                'method': 'env',
                'username': 'POSTGRES_USER',
                'password': 'POSTGRES_PASS',
            },
            'parameters': {
                'postgresql': {
                    'sslmode': 'require',
                },
            },
        }

    :param database_conf: A dictionary with the database configuration. See comments above. If None, then the default
        is used.
    :type database_conf: dict
    :param db_engine: The database engine. It is used only in case the provided database configuration is invalid to
        retrieve the default configuration. Defaults to sqlite.
    :type db_engine: str, Optional
    :param doc: The TOML document to add the DB configuration. If None, one will be created.
    :type doc: TOMLDocument, Optional
    :return: The modified document.
    :rtype: TOMLDocument
    :raises UnknownDBEngine: if the `database_conf` is invalid and the db_engine is not yet implemented.
    """
    if doc is None:
        doc = _new_toml_doc()

    if database_conf is None:
        database_conf = _default_db_configuration(db_engine)

    # A configuration is usable only if it provides a URL, either at the top
    # level (option 2) or inside a 'DBConfiguration' wrapper (option 1).
    is_conf_valid = True
    if 'DBConfiguration' in database_conf:
        if 'URL' not in database_conf['DBConfiguration']:
            is_conf_valid = False
        else:
            # Unwrap option 1 so both options share the same shape below.
            database_conf = cast(dict[str, Any], database_conf['DBConfiguration'])
    elif 'URL' not in database_conf:
        is_conf_valid = False

    if not is_conf_valid:
        log.error('The provided database configuration is invalid. Adding default configuration')
        database_conf = _default_db_configuration(db_engine)

    db_table = table()
    for key, value in database_conf.items():
        if key == 'authentication' and isinstance(value, dict):
            db_table['authentication'] = _make_authentication_table(value)
            continue
        if key == 'parameters' and isinstance(value, dict):
            db_table['parameters'] = _make_parameters_table(value)
            continue
        db_table[key] = value
        if key == 'URL':
            db_table[key].comment(
                'Change the protocol depending on the DB type. Update this file to the path of your DB.'
            )
        if key == 'pragmas':
            db_table[key].comment('Leave these default values, unless you know what you are doing!')

    doc.add('DBConfiguration', db_table)
    doc.add(nl())

    return doc


def _default_db_configuration(db_engine: str) -> dict[str, Any]:
    """Return the default configuration for *db_engine*.

    :raises UnknownDBEngine: if no default configuration exists for *db_engine*.
    """
    if db_engine not in default_conf:
        log.critical('The provided db_engine (%s) is not yet implemented', db_engine)
        # Bug fix: the message f-string was missing the closing parenthesis.
        raise mafw.mafw_errors.UnknownDBEngine(f'DB engine ({db_engine}) not implemented')
    return default_conf[db_engine]


def _make_authentication_table(auth_conf: dict[str, Any]) -> Any:
    """Build the [DBConfiguration.authentication] sub-table."""
    auth_table = table()
    auth_table.comment('Select auth method; see documentation placeholder link: DOC_LINK_PLACEHOLDER')
    for auth_key, auth_value in auth_conf.items():
        auth_table[auth_key] = auth_value
    return auth_table


def _make_parameters_table(params_conf: dict[str, Any]) -> Any:
    """Build the [DBConfiguration.parameters] sub-table, one nested table per backend."""
    params_table = table()
    for backend, params in params_conf.items():
        backend_table = table()
        if isinstance(params, dict):
            for param_key, param_value in params.items():
                if param_key == 'pragmas' and isinstance(param_value, dict):
                    pragmas_table = table()
                    pragmas_table.comment('Leave these default values, unless you know what you are doing!')
                    for pragma_key, pragma_value in param_value.items():
                        pragmas_table[pragma_key] = pragma_value
                    backend_table['pragmas'] = pragmas_table
                else:
                    backend_table[param_key] = param_value
        params_table[backend] = backend_table
    return params_table

243 

244 

def _add_processor_parameters_to_toml_doc(
    processors: list[ProcessorClassProtocol] | ProcessorClassProtocol, doc: TOMLDocument | None = None
) -> TOMLDocument:
    """Append an ``available_processors`` array and one parameter table per processor to *doc*.

    :raises TypeError: if any item is neither a Processor instance/class nor a LazyImportProcessor.
    """
    if not isinstance(processors, list):
        processors = [processors]

    if not processor_validator(processors):
        raise TypeError('Only processor instances and classes can be accepted')

    if doc is None:
        doc = _new_toml_doc()

    # Array listing every processor by its user-facing name.
    names: list[str] = []
    for proc in processors:
        if isinstance(proc, LazyImportProcessor):
            names.append(proc.plugin_name)
        elif isinstance(proc, Processor):
            names.append(proc.name)
        else:
            names.append(proc.__name__)
    doc.add('available_processors', item(names))
    doc.add(nl())

    for proc in processors:
        # Resolve the concrete class, the TOML section name and the docstring
        # for each of the three accepted input flavours.
        if isinstance(proc, LazyImportProcessor):
            proc_cls = proc._load()
            section_name, doc_text = proc_cls.__name__, proc_cls.__doc__
        elif isinstance(proc, Processor):
            proc_cls = proc.__class__
            section_name, doc_text = proc.name, proc.__doc__
        else:
            proc_cls = cast(type[Processor], proc)
            section_name, doc_text = proc_cls.__name__, proc_cls.__doc__

        proc_table = table()

        if doc_text:
            # Use the first non-blank docstring line as the table's comment.
            first_line = next((stripped for stripped in (ln.strip() for ln in doc_text.splitlines()) if stripped), None)
            if first_line is not None:
                proc_table.comment(first_line)

        for schema in proc_cls.parameter_schema():
            proc_table[schema.name] = schema.default
            if schema.help:
                # starting from tomlkit 0.15, item can return an union type between Item and OutOfOrderTableProxy
                cast(Item, proc_table.value.item(schema.name)).comment(schema.help)

        doc.add(section_name, proc_table)
        doc.add(nl())

    return doc

304 

305 

def processor_validator(processors: list[ProcessorClassProtocol]) -> bool:
    """
    Validates that all items in the list are valid processor instances or classes.

    :param processors: The list of items to be validated.
    :type processors: list[type[Processor] | Processor]
    :return: True if all items are valid.
    :rtype: bool
    """
    # Generator instead of a materialized list lets all() short-circuit on the
    # first invalid item.
    # NOTE(review): isinstance(p, type(Processor)) accepts *any* class built with
    # the same metaclass as Processor, not only Processor subclasses — confirm
    # whether issubclass(p, Processor) was intended.
    return all(isinstance(p, (Processor, type(Processor), LazyImportProcessor)) for p in processors)

316 

317 

def dump_processor_parameters_to_toml(
    processors: list[ProcessorClassProtocol] | ProcessorClassProtocol, output_file: Path | str
) -> None:
    """
    Dumps a toml file with processor parameters.

    This helper function can be used when the parameters of one or many processors have to be dumped to a TOML file.
    For each Processor in the `processors` a table in the TOML file will be added with their parameters in the shape
    of parameter name = value.

    It must be noted that `processors` can be:

    - a list of processor classes (list[type[Processor]])
    - a list of processor instances (list[Processor])
    - one single processor class (type[Processor])
    - one single processor instance (Processor)

    What value of the parameters will be dumped?
    --------------------------------------------

    Good question, have a look at this :ref:`explanation <parameter_dump>`.

    :param processors: One or more processors for which the parameters should be dumped.
    :type processors: list[type[Processor] | Processor] | type[Processor] | Processor
    :param output_file: The name of the output file for the dump.
    :type output_file: Path | str
    :raise KeyAlreadyPresent: if an attempt is made to add the same processor twice.
    :raise TypeError: if the list contains items different from Processor classes and instances.
    """
    doc = _add_processor_parameters_to_toml_doc(processors)

    # TOML files are UTF-8 by specification; do not rely on the platform default encoding.
    with open(output_file, 'w', encoding='utf-8') as fp:
        tomlkit.dump(doc, fp)

352 

353 

def _add_user_interface_configuration(doc: TOMLDocument | None = None) -> TOMLDocument:
    """Append the ``UserInterface`` table (UI backend selection) to *doc*, creating *doc* if needed."""
    if doc is None:
        doc = _new_toml_doc()

    interface_table = table()
    interface_table.comment('Specify UI options')
    interface_table['interface'] = 'rich'
    interface_table['interface'].comment('Default "rich", backup "console"')
    doc.add('UserInterface', interface_table)

    return doc

365 

366 

def load_steering_file_legacy(steering_file: Path | str) -> dict[str, Any]:
    """
    Load a steering file without any semantic validation.

    :param steering_file: The path to the steering file.
    :type steering_file: Path, str
    :return: The parsed steering dictionary.
    :rtype: dict
    """
    # Path() accepts both str and Path, so no explicit isinstance check is needed.
    toml_path = Path(steering_file)
    return TOMLFile(toml_path).read().value

381 

382 

def load_steering_file(
    steering_file: Path | str, validation_level: ValidationLevel | None = ValidationLevel.SEMANTIC
) -> dict[str, Any]:
    """
    Load a steering file for the execution framework.

    :param steering_file: The path to the steering file.
    :type steering_file: Path, str
    :param validation_level: Requested validation tier, or ``None`` to skip validation.
    :return: The configuration dictionary.
    :rtype: dict
    :raise mafw.mafw_errors.InvalidSteeringFile: if the validation level reports at least one issue.
    """
    builder = SteeringBuilder.from_toml(steering_file)
    if validation_level is not None:
        problems = builder.validate(validation_level)
        if problems:
            # Surface only the first reported problem.
            raise problems[0]
    return resolve_config_env(builder.to_config_dict())

402 

403 

def resolve_config_env(config: dict[str, Any], env: Mapping[str, str] | None = None) -> dict[str, Any]:
    """
    Resolve environment variables in every string value of a configuration dictionary.

    :param config: Configuration dictionary to resolve.
    :type config: dict[str, Any]
    :param env: Optional environment mapping; defaults to ``os.environ``.
    :type env: Mapping[str, str] | None
    :return: A new configuration dictionary with resolved values.
    :rtype: dict[str, Any]
    :raises ValueError: If a required variable is missing or expansion does not converge.
    """
    environment: Mapping[str, str] = os.environ if env is None else env
    return cast(dict[str, Any], _resolve_value(config, environment))

419 

420 

421def _resolve_value(value: Any, env: Mapping[str, str]) -> Any: 

422 if isinstance(value, str): 

423 return resolve_string(value, env) 

424 if isinstance(value, dict): 

425 return {key: _resolve_value(item, env) for key, item in value.items()} 

426 if isinstance(value, list): 

427 return [_resolve_value(item, env) for item in value] 

428 return value 

429 

430 

def resolve_string(value: str, env: Mapping[str, str]) -> str:
    """
    Resolve environment variables within a string value.

    Supported forms are ``${NAME}``, ``${NAME:-default}`` and ``${NAME:?message}``;
    a backslash-escaped ``\\${`` is preserved literally. Substitution is applied
    repeatedly (an expanded value may itself contain a pattern) up to
    :data:`MAX_ENV_RESOLUTION_PASSES` times.

    :param value: Input string to resolve.
    :type value: str
    :param env: Environment mapping to use for substitution.
    :type env: Mapping[str, str]
    :return: The resolved string.
    :rtype: str
    :raises ValueError: If a required variable is missing or expansion does not converge.
    """
    # Hide escaped patterns behind a sentinel so the regex never touches them.
    escaped = value.replace(r'\${', ENV_ESCAPE_SENTINEL)

    for _ in range(MAX_ENV_RESOLUTION_PASSES):
        if not ENV_PATTERN.search(escaped):
            break
        escaped = ENV_PATTERN.sub(lambda match: _resolve_match(match, env), escaped)
    else:
        # All passes used: fail only if unresolved patterns actually remain.
        # (Previously a string converging on exactly the last pass was rejected
        # even though it was fully expanded.)
        if ENV_PATTERN.search(escaped):
            raise ValueError('Environment variable expansion did not converge.')

    return escaped.replace(ENV_ESCAPE_SENTINEL, '${')

453 

454 

def _resolve_match(match: re.Match[str], env: Mapping[str, str]) -> str:
    """Return the replacement text for a single ``${...}`` occurrence.

    :raises ValueError: for an unset variable without a default, or an unset
        variable using the ``:?`` operator.
    """
    name = match.group('name')
    operator = match.group('op')
    # The value group is absent when no operator was given; normalise to ''.
    fallback = match.group('value') or ''

    if name in env:
        return env[name]
    if operator == ':-':
        # ${NAME:-default}: unset variable falls back to the default text.
        return fallback
    if operator == ':?':
        # ${NAME:?message}: unset variable is a hard error with a custom message.
        raise ValueError(fallback or f'{name} is required')
    raise ValueError(f"Environment variable '{name}' not set")