Coverage for src / mafw / tools / toml_tools.py: 96%

146 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-09 09:08 +0000

1# Copyright 2025 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5The module provides tools to read / write / modify specific TOML files. 

6""" 

7 

8import datetime 

9import logging 

10from pathlib import Path, PosixPath, WindowsPath 

11from typing import Any, cast 

12 

13import tomlkit 

14from tomlkit import TOMLDocument, boolean, comment, document, item, nl, table 

15from tomlkit.exceptions import ConvertError 

16from tomlkit.items import Item, String, StringType 

17from tomlkit.toml_file import TOMLFile 

18 

19import mafw.mafw_errors 

20from mafw.__about__ import __version__ as version 

21from mafw.db.db_configurations import default_conf 

22from mafw.lazy_import import LazyImportProcessor, ProcessorClassProtocol 

23from mafw.mafw_errors import InvalidSteeringFile 

24from mafw.processor import PassiveParameter, Processor 

25from mafw.tools.regexp import parse_processor_name 

26 

27log = logging.getLogger(__name__) 

28 

29 

class PathItem(String):
    """TOML string item whose unwrapped Python value is a :class:`pathlib.Path`."""

    def unwrap(self) -> Path:  # type: ignore[override]  # the parent class returns a str
        """
        Return the wrapped string converted to a path.

        :return: The path built from the string held by the item.
        :rtype: Path
        """
        raw_text = super().unwrap()
        return Path(raw_text)

35 

36 

def path_encoder(obj: Any) -> Item:
    """
    Encode concrete path objects as :class:`PathItem`.

    POSIX paths are written as single-line basic strings, Windows paths as single-line literal strings
    (presumably so that backslashes are kept verbatim). In both cases the text is passed unescaped.

    :param obj: The object to be encoded.
    :type obj: Any
    :return: The TOML item representing the path.
    :rtype: Item
    :raises ConvertError: if `obj` is neither a PosixPath nor a WindowsPath.
    """
    string_type_by_flavour = (
        (PosixPath, StringType.SLB),
        (WindowsPath, StringType.SLL),
    )
    for path_class, string_type in string_type_by_flavour:
        if isinstance(obj, path_class):
            return PathItem.from_raw(str(obj), type_=string_type, escape=False)

    # not a path: signal that this encoder cannot handle the object
    raise ConvertError


# make tomlkit aware of the custom encoder as soon as the module is imported
tomlkit.register_encoder(path_encoder)

48 

49 

def generate_steering_file(
    output_file: Path | str,
    processors: list[ProcessorClassProtocol] | ProcessorClassProtocol,
    database_conf: dict[str, Any] | None = None,
    db_engine: str = 'sqlite',
) -> None:
    """
    Generates a steering file.

    The document is assembled in this order: general header, database configuration, processor parameters and
    user interface section. It is then written to `output_file` using UTF-8 encoding.

    :param output_file: The output filename where the steering file will be saved.
    :type output_file: Path | str
    :param processors: The processors list for which the steering file will be generated.
    :type processors: list[type[Processor] | Processor], type[Processor], Processor
    :param database_conf: The database configuration dictionary
    :type database_conf: dict, Optional
    :param db_engine: A string representing the DB engine to be used. Possible values are: *sqlite*, *postgresql*
        and *mysql*.
    :type db_engine: str
    :raises TypeError: if `processors` contains items that are not processor classes or instances.
    :raises UnknownDBEngine: if a default database configuration is needed and `db_engine` is not implemented.
    """
    output_file = Path(output_file)

    doc = _new_toml_doc()
    doc = _add_db_configuration(database_conf, db_engine=db_engine, doc=doc)
    doc = _add_processor_parameters_to_toml_doc(processors, doc)
    doc = _add_user_interface_configuration(doc)

    # A TOML file must be UTF-8 encoded: do not rely on the platform default encoding.
    with open(output_file, 'w', encoding='utf-8') as fp:
        tomlkit.dump(doc, fp)

79 

80 

def _new_toml_doc() -> TOMLDocument:
    """
    Create a new TOML document containing the general section of a steering file.

    The document starts with a comment carrying the generation time, followed by a commented-out
    ``processors_to_run`` list and by the general analysis settings.

    :return: The newly created document.
    :rtype: TOMLDocument
    """
    toml_doc = document()

    # header with the generation timestamp
    toml_doc.add(comment(f'MAFw steering file generated on {datetime.datetime.now()}'))
    toml_doc.add(nl())

    # hint for the user on how to select the processors to be executed
    for hint in (
        'uncomment the line below and insert the processors you want to run from the available processor list',
        'processors_to_run = []',
    ):
        toml_doc.add(comment(hint))
    toml_doc.add(nl())

    # general analysis settings, in the order they appear in the file
    toml_doc.add(comment('customise the name of the analysis'))
    general_settings = (
        ('analysis_name', String.from_raw('mafw analysis', StringType.SLB)),
        ('analysis_description', String.from_raw('Summing up numbers', StringType.MLB)),
        ('new_only', boolean('true')),
        ('mafw_version', String.from_raw(version, StringType.SLB)),
        ('create_standard_tables', boolean('true')),
    )
    for key, toml_value in general_settings:
        toml_doc.add(key, toml_value)

    return toml_doc

97 

98 

def _default_db_configuration(db_engine: str) -> dict[str, Any]:
    """
    Return the default database configuration for the given engine.

    :param db_engine: The database engine name used as key in the default configurations.
    :type db_engine: str
    :return: The default configuration for `db_engine`.
    :rtype: dict
    :raises UnknownDBEngine: if no default configuration is available for `db_engine`.
    """
    if db_engine not in default_conf:
        log.critical('The provided db_engine (%s) is not yet implemented', db_engine)
        raise mafw.mafw_errors.UnknownDBEngine(f'DB engine ({db_engine}) not implemented')
    return default_conf[db_engine]


def _add_db_configuration(
    database_conf: dict[str, Any] | None = None, db_engine: str = 'sqlite', doc: TOMLDocument | None = None
) -> TOMLDocument:
    """Add the DB configuration to the TOML document

    The expected structure of the database_conf dictionary is one of these two:

    .. code-block:: python

        option1 = {
            'DBConfiguration': {
                'URL': 'sqlite:///:memory:',
                'pragmas': {
                    'journal_mode': 'wal',
                    'cache_size': -64000,
                    'foreign_keys': 1,
                    'synchronous': 0,
                },
            }
        }

        option2 = {
            'URL': 'sqlite:///:memory:',
            'pragmas': {
                'journal_mode': 'wal',
                'cache_size': -64000,
                'foreign_keys': 1,
                'synchronous': 0,
            },
        }

    We will always convert the option1 in option2.

    :param database_conf: A dictionary with the database configuration. See comments above. If None, then the default
        is used.
    :type database_conf: dict
    :param db_engine: The database engine. It is used to retrieve the default configuration when no database
        configuration is provided or when the provided one is invalid (no URL). Defaults to sqlite.
    :type db_engine: str, Optional
    :param doc: The TOML document to add the DB configuration. If None, one will be created.
    :type doc: TOMLDocument, Optional
    :return: The modified document.
    :rtype: TOMLDocument
    :raises UnknownDBEngine: if the default configuration is needed and the db_engine is not yet implemented.
    """
    if doc is None:
        doc = _new_toml_doc()

    if database_conf is None:
        database_conf = _default_db_configuration(db_engine)

    if 'DBConfiguration' in database_conf:
        # it should be option 1: the URL is required inside the nested section.
        nested_conf = database_conf['DBConfiguration']
        is_conf_valid = 'URL' in nested_conf
        if is_conf_valid:
            database_conf = cast(dict[str, Any], nested_conf)
    else:
        # option 2: the URL is required at the top level.
        is_conf_valid = 'URL' in database_conf

    if not is_conf_valid:
        log.error('The provided database configuration is invalid. Adding default configuration')
        database_conf = _default_db_configuration(db_engine)

    # explanatory comments attached to well-known keys
    key_comments = {
        'URL': 'Change the protocol depending on the DB type. Update this file to the path of your DB.',
        'pragmas': 'Leave these default values, unless you know what you are doing!',
    }

    db_table = table()
    for key, value in database_conf.items():
        db_table[key] = value
        if key in key_comments:
            db_table[key].comment(key_comments[key])

    doc.add('DBConfiguration', db_table)
    doc.add(nl())

    return doc

189 

190 

def _add_processor_parameters_to_toml_doc(
    processors: list[ProcessorClassProtocol] | ProcessorClassProtocol, doc: TOMLDocument | None = None
) -> TOMLDocument:
    """
    Add the list of available processors and one table per processor to the TOML document.

    Each processor table is commented with the first non-empty line of the processor documentation and contains
    the processor parameters in the form name = value, with the parameter documentation as comment.

    :param processors: One or more processor classes or instances.
    :type processors: list[type[Processor] | Processor], type[Processor], Processor
    :param doc: The TOML document to be extended. If None, one will be created.
    :type doc: TOMLDocument, Optional
    :return: The modified document.
    :rtype: TOMLDocument
    :raises TypeError: if `processors` contains items that are not accepted by :func:`processor_validator`.
    """
    processor_list = processors if isinstance(processors, list) else [processors]

    if not processor_validator(processor_list):
        raise TypeError('Only processor instances and classes can be accepted')

    if doc is None:
        doc = _new_toml_doc()

    def _listed_name(candidate: Any) -> Any:
        # lazy processors expose the plugin name, instances their name, classes the class name
        if isinstance(candidate, LazyImportProcessor):
            return candidate.plugin_name
        if isinstance(candidate, Processor):
            return candidate.name
        return candidate.__name__

    # add an array with all available processors
    doc.add('available_processors', item([_listed_name(candidate) for candidate in processor_list]))
    doc.add(nl())

    for entry in processor_list:
        # anything that is not already a Processor instance is called to obtain one
        instance = entry if isinstance(entry, Processor) else entry()

        section = table()

        # use the first non-empty line of the documentation as table comment
        stripped_lines = (raw_line.strip() for raw_line in (instance.__doc__ or '').splitlines())
        summary = next((text for text in stripped_lines if text), None)
        if summary is not None:
            section.comment(summary)

        # add all parameters to the table, including the help_doc as a comment
        param: PassiveParameter[Any]
        for name, param in instance.get_parameters().items():
            section[name] = param.value
            if param.doc:
                section.value.item(name).comment(param.doc)

        # add the table to the doc and a new line before going to the next item.
        doc.add(instance.name, section)
        doc.add(nl())

    return doc

246 

247 

248def processor_validator(processors: list[ProcessorClassProtocol]) -> bool: 

249 """ 

250 Validates that all items in the list are valid processor instances or classes. 

251 

252 :param processors: The list of items to be validated. 

253 :type processors: list[type[Processor] | Processor] 

254 :return: True if all items are valid. 

255 :rtype: bool 

256 """ 

257 return all([isinstance(p, (Processor, type(Processor), LazyImportProcessor)) for p in processors]) 

258 

259 

def dump_processor_parameters_to_toml(
    processors: list[ProcessorClassProtocol] | ProcessorClassProtocol, output_file: Path | str
) -> None:
    """
    Dumps a toml file with processor parameters.

    This helper function can be used when the parameters of one or many processors have to be dumped to a TOML file.
    For each Processor in the `processors` a table in the TOML file will be added with their parameters in the shape of
    parameter name = value.

    It must be noted that `processors` can be:

        - a list of processor classes (list[type[Processor]])
        - a list of processor instances (list[Processor])
        - one single processor class (type[Processor])
        - one single processor instance (Processor)

    What value of the parameters will be dumped?
    --------------------------------------------

    Good question, have a look at this :ref:`explanation <parameter_dump>`.

    :param processors: One or more processors for which the parameters should be dumped.
    :type processors: list[type[Processor] | Processor] | type[Processor] | Processor
    :param output_file: The name of the output file for the dump.
    :type output_file: Path | str
    :raise KeyAlreadyPresent: if an attempt to add twice, the same processor is made.
    :raise TypeError: if the list contains items different from Processor classes and instances.
    """
    doc = _add_processor_parameters_to_toml_doc(processors)

    # A TOML file must be UTF-8 encoded: do not rely on the platform default encoding.
    with open(output_file, 'w', encoding='utf-8') as fp:
        tomlkit.dump(doc, fp)

294 

295 

def _add_user_interface_configuration(doc: TOMLDocument | None = None) -> TOMLDocument:
    """
    Add the ``UserInterface`` table to the TOML document.

    The table contains the ``interface`` key set to ``rich``.

    :param doc: The TOML document to be extended. If None, one will be created.
    :type doc: TOMLDocument, Optional
    :return: The modified document.
    :rtype: TOMLDocument
    """
    target_doc = _new_toml_doc() if doc is None else doc

    interface_section = table()
    interface_section.comment('Specify UI options')
    interface_section['interface'] = 'rich'
    interface_section['interface'].comment('Default "rich", backup "console"')

    target_doc.add('UserInterface', interface_section)
    return target_doc

307 

308 

def load_steering_file(steering_file: Path | str, validate: bool = True) -> dict[str, Any]:
    """
    Load a steering file for the execution framework.

    When `validate` is True, the file must contain the ``processors_to_run`` and ``UserInterface`` fields and,
    for each processor listed in ``processors_to_run``, a section named either after the replica name or after
    the base processor name.

    .. versionchanged:: v2.0.0
        Introduce support for replica names along with base names in file validation

    :param steering_file: The path to the steering file.
    :type steering_file: Path, str
    :param validate: A flag to validate the content. Defaults to True.
    :type validate: bool, Optional
    :return: The configuration dictionary.
    :rtype: dict
    :raise FileNotFoundError: if steering_file does not exist (presumably raised while opening the file).
    :raise InvalidSteeringFile: if `validate` is True and a required field or processor section is missing.
    """
    doc = TOMLFile(steering_file).read()

    # build the plain dictionary only once; it is used for all membership tests and returned at the end
    configuration = doc.value

    if validate:
        for field in ('processors_to_run', 'UserInterface'):
            if field not in configuration:
                log.error('Missing section %s in %s', field, steering_file)
                raise InvalidSteeringFile(f'Missing {field} in {steering_file}')

        for processor in doc['processors_to_run']:  # type: ignore[union-attr]
            # processors_to_run is a list of replica aware processor names.
            # the steering file must contain one configuration section for either the
            # base processor or the replica.
            base_name, _ = parse_processor_name(processor)
            if processor not in configuration and base_name not in configuration:
                log.error('Missing section %s in %s', processor, steering_file)
                raise InvalidSteeringFile(f'Missing {processor} in {steering_file}')

    return configuration