Coverage for src / mafw / processor_library / db_init.py: 100%

171 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-06-28 13:34 +0000

1# Copyright 2025–2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Database initialisation processor module. 

6 

7This module contains the following processors: 

8 

9:class:`TableCreator` 

10 processor which handles the creation of database tables based on registered models. It provides functionality to 

11 create tables automatically while respecting existing tables and offering options for forced recreation. 

12 

13:class:`TriggerRefresher` 

14 processor to safely update the trigger definitions. It removes all existing triggers and regenerates them 

15 according to the new definition. Particularly useful when debugging triggers, it can also be left at the 

16 beginning of all analysis pipelines since it does not cause any loss of data. 

17 

18:class:`SQLScriptRunner` 

19 processor to execute SQL scripts from files against the database. It reads SQL files, removes block comments, 

20 splits the content into individual statements, and executes them within a transaction. 

21 

22.. versionadded:: v2.0.0 

23""" 

24 

25import logging 

26import re 

27from pathlib import Path 

28from typing import TYPE_CHECKING, Any, Collection, Optional, cast 

29 

30import peewee 

31from rich.prompt import Confirm, Prompt 

32 

33from mafw.db.db_model import mafw_model_register 

34from mafw.db.db_types import PeeweeModelWithMeta 

35from mafw.db.std_tables import StandardTable 

36from mafw.db.trigger import MySQLDialect, PostgreSQLDialect, SQLiteDialect, TriggerDialect 

37from mafw.decorators import database_required, single_loop 

38from mafw.enumerators import LoopingStatus, ProcessorExitStatus 

39from mafw.mafw_errors import InvalidConfigurationError, UnsupportedDatabaseError 

40from mafw.processor import ActiveParameter, Processor 

41 

# Module logger, named after this module.
log: logging.Logger = logging.getLogger(__name__)

# Pattern matching SQL block comments (/* ... */). re.DOTALL lets a comment span several lines and the
# lazy quantifier stops each match at the first closing */.
block_comment_re: re.Pattern[str] = re.compile(pattern=r'/\*.*?\*/', flags=re.DOTALL)

45 

46 

def _standard_table_models() -> set[type[StandardTable]]:
    """
    Collect the standard-table models known to the model register.

    :return: The registered standard-table models as a set. An empty set is returned whenever the register
        does not hand back a list.
    """
    registered = mafw_model_register.get_standard_tables()
    # anything that is not a list is treated as "no standard tables available"
    return set(registered) if isinstance(registered, list) else set()

53 

54 

@database_required
@single_loop
class TableCreator(Processor):
    """
    Processor to create all tables in the database.

    This processor can be included in all pipelines in order to create all tables in the database. Its functionality
    is based on the fact that all :class:`.MAFwBaseModel` subclasses are automatically included in a global register
    (:data:`.mafw_model_register`).

    This processor will perform the following:

    #. Get a list of all tables already existing in the database.
    #. Prune from the list of models those whose tables already exist in the database.
    #. Create the remaining tables.


    This overall behaviour can be modified via the following parameters:

    * *force_recreate* (bool, default = False): Use with extreme care. When set to True, all the selected tables
      (registered models with automatic creation, optionally filtered by prefix) are first dropped and then
      recreated. It is almost equivalent to a re-initialization of the whole DB with all the data being lost.
      It requires *soft_recreate* to be set to False.

    * *soft_recreate* (bool, default = True): When set to true, all tables whose model is in the mafw model
      register will be recreated with the safe flag. It means that there won't be any table drop. If a table is
      already existing, nothing will happen. If a new trigger is added to the table this will be created. When
      set to False, only tables whose model is in the register and that are not existing will be created.

    * *apply_only_to_prefix* (list[str], default = []): This parameter allows to create only the tables whose
      name starts with one of the provided prefixes.

    .. versionadded:: v2.0.0

    """

    force_recreate = ActiveParameter(
        name='force_recreate', default=False, help_doc='First drop and then create the tables. LOSS OF ALL DATA!!!'
    )
    """
    Force recreate (bool, default = False).

    Use with extreme care. When set to True, all selected tables in the model register will be first
    dropped and then recreated. It is almost equivalent to a re-initialization of the whole DB with all the data
    being lost.
    """

    soft_recreate = ActiveParameter(
        name='soft_recreate', default=True, help_doc='Safe recreate tables without dropping. No data loss'
    )
    """
    Soft recreate (bool default = True).

    When set to true, all tables whose model is in the mafw model register will be recreated with the safe flag. It
    means that there won't be any table drop. If a table is already existing, nothing will happen. If a new trigger
    is added to the table, this will be created. When set to False, only tables whose model is in the register and
    that are not existing will be created.
    """

    apply_only_to_prefix = ActiveParameter[list[str]](
        name='apply_only_to_prefix',
        default=[],
        help_doc='Create only tables whose name start with the provided prefixes.',
    )
    """
    Apply only to tables starting with prefix (list[str], default = []).

    This parameter allows to create only the tables whose name starts with one of the provided prefixes.
    A single prefix given as a plain string is also accepted.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.existing_table_names: list[str] = []
        """The list of all existing tables in the database."""

    def validate_configuration(self) -> None:
        """
        Configuration validation

        :attr:`force_recreate` and :attr:`soft_recreate` cannot be both valid.

        :raises InvalidConfigurationError: if both recreate types are True.
        """
        if self.force_recreate and self.soft_recreate:
            raise InvalidConfigurationError(
                'Both force_recreate and soft_recreate set to True. Incompatible configuration'
            )

    @staticmethod
    def _table_name(model: Any) -> str:
        """Return the database table name of a registered model class."""
        return cast(PeeweeModelWithMeta, model)._meta.table_name

    def _filter_by_prefix(self, models: list[Any]) -> list[Any]:
        """Keep only the models whose table name starts with one of :attr:`apply_only_to_prefix` (all if empty)."""
        prefixes: Any = self.apply_only_to_prefix
        if not prefixes:
            return list(models)
        if isinstance(prefixes, str):
            # a single prefix given as a plain string must not be iterated character by character
            prefixes = [prefixes]
        prefix_tuple = tuple(prefixes)
        return [model for model in models if self._table_name(model).startswith(prefix_tuple)]

    def process(self) -> None:
        """
        Execute the table creation process.

        This method performs the following steps:

        #. Identify all models that have automatic creation enabled.
        #. Filter models based on the apply_only_to_prefix parameter if specified.
        #. Handle forced recreation if requested, including user confirmation.
        #. Handle soft recreation if requested, letting all tables with a known model be recreated.
        #. Create the required tables.
        #. Initialise standard tables after recreation if needed.

        If user cancel the creation, the processor exit status is set to :attr:`.ProcessorExitStatus.Aborted` so that
        the whole processor list is blocked.
        """
        # get all models with the automatic creation flag
        autocreation_models = [
            model
            for _, model in mafw_model_register.items()
            if model._meta.automatic_creation  # type: ignore[attr-defined]
        ]

        # filter out standard tables if the user decided not to create them
        standard_table_models = _standard_table_models()
        if not self.create_standard_tables:
            autocreation_models = [model for model in autocreation_models if model not in standard_table_models]

        # keep only the tables whose name starts with one of the requested prefixes (no filter if none given)
        selected_models = self._filter_by_prefix(autocreation_models)

        # in the case of force_recreation, we need to have user confirmation
        if self.force_recreate:
            log.warning(f'Forcing recreation of {len(selected_models)} tables in the database.')
            log.warning('All data in these tables will be lost.')

            with self._user_interface.enter_interactive_mode():
                question = 'Are you really sure?'
                if self._user_interface.name == 'rich':
                    question = '[red][bold]' + question + '[/red][/bold]'
                confirmation = self._user_interface.prompt_question(
                    question=question, prompt_type=Confirm, default=False, show_default=True, case_sensitive=True
                )

            if not confirmation:
                self.processor_exit_status = ProcessorExitStatus.Aborted
                return

            log.info(f'Removing {len(selected_models)} tables from the database.')
            self.database.drop_tables(selected_models)  # type: ignore[arg-type]

        if self.soft_recreate:
            # (safe) create all selected tables in the mafw register; existing ones are left untouched
            models = selected_models
        else:
            # create only the selected tables that are not yet existing
            self.existing_table_names = self.database.get_tables()
            existing = set(self.existing_table_names)
            models = [model for model in selected_models if self._table_name(model) not in existing]
        self.database.create_tables(models)  # type: ignore[arg-type]

        if self.force_recreate and self.create_standard_tables:
            # in the case of a recreation, do the initialisation of all dropped standard tables.
            for model in models:
                if model in standard_table_models:
                    model.init()

        n = len(models)
        if n > 0:
            plu = '' if n == 1 else 's'
            soft = '(soft) ' if self.soft_recreate else ''
            log.info(f'Successfully {soft}created {n} table{plu}.')

242 

243 

@database_required
class TriggerRefresher(Processor):
    """
    Processor to recreate all triggers.

    Triggers are database objects, and even though they could be created, dropped and modified at any moment,
    within the MAFw execution cycle they are normally created along with the table they are targeting.

    When the table is created, all its triggers are created as well but, unless differently specified, with the safe
    flag on, meaning that they are created only if they do not already exist.

    This might be particularly annoying when modifying an existing trigger, because you need to manually drop the
    trigger to let the table creation mechanism create the newer version.

    The goal of this processor is to drop all existing triggers and then recreate the corresponding tables so to have
    an updated version of the triggers.

    The processor is relying on the fact that all subclasses of :class:`.MAFwBaseModel`
    are automatically inserted in the :data:`.mafw_model_register` so that the model class can be retrieved from the
    table name.

    Before removing any trigger, the processor will build a list with all the affected tables and check if all of
    them are in the :data:`.mafw_model_register`, if so, it will proceed without asking any further confirmation.
    Otherwise, if some affected tables are not in the register, then it will ask the user to decide what to do:

    - Remove only the triggers whose tables are in the register and thus recreated afterward.
    - Remove all triggers, in this case, some of them will not be recreated.
    - Abort the processor.

    Trigger manipulations (drop and creation) are not directly implemented in :link:`peewee` and are an extension
    provided by MAFw. In order to be compatible with the three main databases (:link:`sqlite`, :link:`mysql` and
    :link:`postgresql`), the SQL generation is obtained via the :class:`.TriggerDialect` interface.

    .. seealso::

        The :class:`.Trigger` class and also the :ref:`trigger chapter <triggers>` for a deeper explanation on triggers.

        The :class:`.ModelRegister` class, the :data:`.mafw_model_register` and the :ref:`related chapter
        <auto_registration>` on the automatic registration mechanism.

        The :class:`.TriggerDialect` and its subclasses, for a database independent way to generate SQL statement
        related to triggers.

    .. versionadded:: v2.0.0
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.dialect: Optional[TriggerDialect] = None
        self.tables_to_be_rebuilt: set[str] = set()

    def get_dialect(self) -> TriggerDialect:
        """
        Get the valid SQL dialect based on the type of Database

        :return: The SQL trigger dialect
        :rtype: :class:`.TriggerDialect`
        :raises: :class:`.UnsupportedDatabaseError` if there is no dialect for the current DB.
        """
        if self.dialect is not None:
            return self.dialect

        if self._database is None:
            # Default to SQLite dialect
            return SQLiteDialect()

        db = self._database
        if isinstance(db, peewee.DatabaseProxy):
            db = db.obj  # Get the actual database from the proxy

        dialect: TriggerDialect
        if isinstance(db, peewee.SqliteDatabase):
            dialect = SQLiteDialect()
        elif isinstance(db, peewee.MySQLDatabase):
            dialect = MySQLDialect()
        elif isinstance(db, peewee.PostgresqlDatabase):
            dialect = PostgreSQLDialect()
        else:
            raise UnsupportedDatabaseError(f'Unsupported database type: {type(db)}')

        return dialect

    def start(self) -> None:
        super().start()
        self.dialect = self.get_dialect()

    def get_items(self) -> Collection[Any]:
        """
        Retrieves a list of database triggers and interacts with the user to determine which ones to process.

        This method fetches all currently defined database triggers. If any tables
        associated with these triggers are not known (i.e., not registered in
        :data:`.mafw_model_register`), it enters an interactive mode to prompt the user for
        a course of action (the answer is case-insensitive):

        1. **Remove All Triggers (A):** Processes all triggers for subsequent removal,
           but only marks 'rebuildable' tables for rebuilding.
        2. **Remove Only Rebuildable Triggers (O):** Processes only triggers associated
           with 'rebuildable' tables.
        3. **Quit (Q):** Aborts the entire process.

        If no unknown tables are found, or the user chooses to process rebuildable tables,
        the list of triggers and the set of tables to be rebuilt are prepared for the next stage.

        :return: A collection of database triggers to be processed, in the form of (trigger_name, table_name) tuples
        :rtype: List[Tuple[str, str]]
        """
        if TYPE_CHECKING:
            assert self.dialect is not None

        triggers: list[tuple[str, str]] = self.database.execute_sql(self.dialect.select_all_trigger_sql()).fetchall()  # type: ignore[no-untyped-call]

        affected_tables = {table_name for _, table_name in triggers}
        known_tables = mafw_model_register.get_table_names()
        rebuildable_tables = {t for t in affected_tables if t in known_tables}
        not_rebuildable_tables = affected_tables - rebuildable_tables

        if len(not_rebuildable_tables) > 0:
            log.warning(f'There are some tables ({len(not_rebuildable_tables)}) that cannot be rebuild')
            with self._user_interface.enter_interactive_mode():
                question = 'Remove all triggers (A), remove only rebuildable triggers (O), quit (Q)'
                if self._user_interface.name == 'rich':
                    question = '[red][bold]' + question + '[/red][/bold]'

                class TriggerPrompt(Prompt):
                    response_type = str
                    validate_error_message = '[prompt.invalid]Please enter A, O or Q'
                    choices: list[str] = ['A', 'O', 'Q']

                answer = self._user_interface.prompt_question(
                    question=question,
                    prompt_type=TriggerPrompt,
                    default='O',
                    show_default=True,
                    case_sensitive=False,
                    show_answer=True,
                )

            # the prompt is case-insensitive: normalise the answer so that e.g. 'q' does not
            # silently fall through to the destructive "remove all" branch
            choice = str(answer).strip().upper()

            if choice == 'Q':
                triggers = []
                affected_tables = set()
                self.processor_exit_status = ProcessorExitStatus.Aborted
                self.looping_status = LoopingStatus.Abort

            elif choice == 'O':
                triggers = [r for r in triggers if r[1] in rebuildable_tables]
                affected_tables = rebuildable_tables

            else:  # equivalent to 'A'
                # remove all triggers
                # but rebuilds only rebuildable_tables
                affected_tables = rebuildable_tables

        self.tables_to_be_rebuilt = affected_tables
        return triggers

    def process(self) -> None:
        """Delete the current trigger from its table"""
        if TYPE_CHECKING:
            assert self.dialect is not None

        self.database.execute_sql(self.dialect.drop_trigger_sql(self.item[0], safe=True, table_name=self.item[1]))  # type: ignore[no-untyped-call]

    def finish(self) -> None:
        """
        Recreate the tables from which triggers were dropped.

        This is only done if the user did not abort the process.
        """
        if self.looping_status != LoopingStatus.Abort:
            log.info(f'Recreating {self.n_item} triggers on {len(self.tables_to_be_rebuilt)} tables...')
            models = [mafw_model_register.get_model(table_name) for table_name in self.tables_to_be_rebuilt]

            self.database.create_tables(models)  # type: ignore[arg-type]
        super().finish()

    def format_progress_message(self) -> None:
        self.progress_message = f'Dropping trigger {self.item[0]} from table {self.item[1]}'

423 

424 

@database_required
class SQLScriptRunner(Processor):
    """
    Processor to execute SQL scripts from files against the database.

    This processor reads SQL files, removes multi-line block comments, splits the content into individual
    statements, and executes them within a transaction. It is designed to handle SQL script execution
    in a safe manner by wrapping all statements in a single atomic transaction.

    The processor accepts a list of SQL files through the :attr:`sql_files` parameter. Each file is validated
    to ensure it exists and is a regular file before processing. Block comments (`/* ... */`) are removed
    from the SQL content before statement parsing. Files are read as UTF-8 (an optional BOM is ignored).

    .. note::

        Statements are split on every ``;`` character and block comments are removed with a plain pattern
        match. Semicolons or ``/* ... */`` sequences appearing inside string literals or compound statements
        (e.g. trigger bodies) are therefore not supported.

    .. versionadded:: v2.0.0
    """

    sql_files = ActiveParameter[list[Path]]('sql_files', default=[], help_doc='A list of SQL files to be processed')
    """List of SQL files to be processed"""

    def validate_configuration(self) -> None:
        """
        Validate the configuration of SQL script runner.

        Ensures that all specified SQL files exist and are regular files.

        :raises InvalidConfigurationError: if any of the specified files does not exist or is not a regular file.
        """
        if TYPE_CHECKING:
            # we need to convince mypy that the sql_files is not an ActiveParameter but
            # the content of the ActiveParameter
            assert isinstance(self.sql_files, list)

        self.sql_files = [Path(file) for file in self.sql_files]
        for file in self.sql_files:
            # is_file() is False both for missing paths and for non-regular files (e.g. directories)
            if not file.is_file():
                raise InvalidConfigurationError(f'There are issues with SQL file "{file.resolve()}". Please verify.')

    def get_items(self) -> Collection[Any]:
        """
        Get the collection of SQL files to be processed.

        :return: A collection of SQL file paths to be processed
        :rtype: Collection[Any]
        """
        if TYPE_CHECKING:
            # we need to convince mypy that the sql_files is not an ActiveParameter but
            # the content of the ActiveParameter
            assert isinstance(self.sql_files, list)

        return self.sql_files

    def process(self) -> None:
        """
        Process a single SQL file by reading, parsing, and executing its statements.

        Reads the SQL file content (UTF-8, optional BOM ignored), removes multi-line block comments, splits the
        content into individual SQL statements, and executes them within a transaction.

        If no statements are found in the file, a warning is logged and nothing is executed. If an error occurs
        during execution, the transaction is rolled back and the exception is re-raised.

        :raises Exception: If an error occurs during SQL statement execution.
        """
        # explicit encoding: do not depend on the platform locale. 'utf-8-sig' also drops a leading BOM
        # that would otherwise be glued to the first statement.
        with open(self.item, 'rt', encoding='utf-8-sig') as sql_file:
            sql_content = sql_file.read()

        # remove the multi-line block comments (/* ... */)
        sql_content = block_comment_re.sub('', sql_content)

        statements = [s.strip() + ';' for s in sql_content.split(';') if s.strip()]

        if not statements:
            log.warning(f'No SQL statements found to execute in {self.item.name}.')
            # nothing to run: avoid opening an empty transaction
            return

        log.debug(f'Found {len(statements)} statements to execute.')

        try:
            # use an atomic transaction to wrap the execution of all statements.
            with self.database.atomic():
                for statement in statements:
                    self.database.execute_sql(statement)  # type: ignore[no-untyped-call]

        except Exception as e:
            # leaving the atomic() block with an exception has already rolled the transaction back
            log.critical(f'An error occurred while executing the SQL script {self.item.name}.')
            log.critical('Rolling back the database to preserve integrity.')
            log.critical(f'Error details: {e}')
            raise

    def format_progress_message(self) -> None:
        self.progress_message = f'Processing SQL file {self.item.name}'