Coverage for src / mafw / processor_library / db_init.py: 100%

161 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 16:10 +0000

1# Copyright 2025–2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Database initialisation processor module. 

6 

7This module contains the following processors: 

8 

9:class:`TableCreator` 

10 processor which handles the creation of database tables based on registered models. It provides functionality to 

11 create tables automatically while respecting existing tables and offering options for forced recreation. 

12 

13:class:`TriggerRefresher` 

14 processor to safely update the trigger definitions. It removes all existing triggers and regenerates them 

15 according to the new definition. Particularly useful when debugging triggers, it can also be left at the 

16 beginning of all analysis pipelines since it does not cause any loss of data. 

17 

18:class:`SQLScriptRunner` 

19 processor to execute SQL scripts from files against the database. It reads SQL files, removes block comments, 

20 splits the content into individual statements, and executes them within a transaction. 

21 

22.. versionadded:: v2.0.0 

23""" 

24 

25import logging 

26import re 

27from pathlib import Path 

28from typing import TYPE_CHECKING, Any, Collection, Optional 

29 

30import peewee 

31from rich.prompt import Confirm, Prompt 

32 

33from mafw.db.db_model import mafw_model_register 

34from mafw.db.std_tables import StandardTable 

35from mafw.db.trigger import MySQLDialect, PostgreSQLDialect, SQLiteDialect, TriggerDialect 

36from mafw.decorators import database_required, single_loop 

37from mafw.enumerators import LoopingStatus, ProcessorExitStatus 

38from mafw.mafw_errors import InvalidConfigurationError, UnsupportedDatabaseError 

39from mafw.processor import ActiveParameter, Processor 

40 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Matches a SQL block comment (/* ... */). The match is non-greedy, so two separate comments are not merged into
# one, and DOTALL lets a single comment span several lines.
block_comment_re = re.compile(r'/\*.*?\*/', flags=re.DOTALL)

44 

45 

@database_required
@single_loop
class TableCreator(Processor):
    """
    Processor to create all tables in the database.

    This processor can be included in all pipelines in order to create all tables in the database. Its functionality
    is based on the fact that all :class:`.MAFwBaseModel` subclasses are automatically included in a global register
    (:data:`.mafw_model_register`).

    This processor will perform the following:

    #. Get a list of all tables already existing in the database.
    #. Prune from the list of models the ones whose table already exists in the database.
    #. Create the remaining tables.


    This overall behaviour can be modified via the following parameters:

    * *force_recreate* (bool, default = False): Use with extreme care. When set to True, all tables in the
      database and in the model register will be first dropped and then recreated. It is almost equivalent to a
      re-initialization of the whole DB with all the data being lost.

    * *soft_recreate* (bool, default = True): When set to true, all tables whose model is in the mafw model
      register will be recreated with the safe flag. It means that there won't be any table drop. If a table is
      already existing, nothing will happen. If a new trigger is added to the table this will be created. When
      set to False, only tables whose model is in the register and that are not existing will be created.

    * *apply_only_to_prefix* (list[str], default = []): This parameter allows to create only the tables that do
      not already exist and whose name starts with one of the provided prefixes.

    .. versionadded:: v2.0.0

    """

    force_recreate = ActiveParameter(
        name='force_recreate', default=False, help_doc='First drop and then create the tables. LOSS OF ALL DATA!!!'
    )
    """
    Force recreate (bool, default = False).

    Use with extreme care. When set to True, all tables in the database and in the model register will be first
    dropped and then recreated. It is almost equivalent to a re-initialization of the whole DB with all the data
    being lost.
    """

    soft_recreate = ActiveParameter(
        name='soft_recreate', default=True, help_doc='Safe recreate tables without dropping. No data loss'
    )
    """
    Soft recreate (bool default = True).

    When set to true, all tables whose model is in the mafw model register will be recreated with the safe flag. It
    means that there won't be any table drop. If a table is already existing, nothing will happen. If a new trigger
    is added to the table, this will be created. When set to False, only tables whose model is in the register and
    that are not existing will be created.
    """

    apply_only_to_prefix = ActiveParameter[list[str]](
        name='apply_only_to_prefix',
        default=[],
        help_doc='Create only tables whose name start with the provided prefixes.',
    )
    """
    Apply only to tables starting with prefix (list[str], default = []).

    This parameter allows to create only the tables that do not already exist and whose name starts with one of the
    provided prefixes.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Constructor.

        All positional and keyword arguments are forwarded unchanged to the parent class constructor.
        """
        super().__init__(*args, **kwargs)
        self.existing_table_names: list[str] = []
        """The list of all existing tables in the database."""

    def validate_configuration(self) -> None:
        """
        Configuration validation

        :attr:`force_recreate` and :attr:`soft_recreate` cannot both be True.

        :raises InvalidConfigurationError: if both recreate types are True.
        """
        if self.force_recreate and self.soft_recreate:
            raise InvalidConfigurationError(
                'Both force_recreate and soft_recreate set to True. Incompatible configuration'
            )

    def process(self) -> None:
        """
        Execute the table creation process.

        This method performs the following steps:

        #. Identify all models that have automatic creation enabled.
        #. Filter models based on the apply_only_to_prefix parameter if specified.
        #. Handle forced recreation if requested, including user confirmation.
        #. Handle soft recreation if requested, letting all tables with a known model be recreated.
        #. Create the required tables.
        #. Initialise standard tables after recreation if needed.

        If the user cancels the creation, the processor exit status is set to
        :attr:`.ProcessorExitStatus.Aborted` so that the whole processor list is blocked.
        """
        # get all tables with autocreation flag
        autocreation_table_names = [
            name
            for name, model in mafw_model_register.items()
            if model._meta.automatic_creation  # type: ignore[attr-defined]
        ]

        if self.apply_only_to_prefix:
            if TYPE_CHECKING:
                # we need to convince mypy that apply_only_to_prefix is not an ActiveParameter but
                # the content of the ActiveParameter
                assert isinstance(self.apply_only_to_prefix, list)

            # keep only the tables whose name starts with one of the given prefixes;
            # str.startswith accepts a tuple of alternatives, so no explicit inner loop is needed.
            prefixes = tuple(self.apply_only_to_prefix)
            autocreation_table_names = [name for name in autocreation_table_names if name.startswith(prefixes)]

        # all the models selected by the autocreation flag and the optional prefix filter
        selected_names = set(autocreation_table_names)
        selected_models = [model for name, model in mafw_model_register.items() if name in selected_names]

        # in the case of force_recreation, we need to have user confirmation
        if self.force_recreate:
            log.warning(f'Forcing recreation of {len(autocreation_table_names)} tables in the database.')
            log.warning('All data in these tables will be lost.')

            with self._user_interface.enter_interactive_mode():
                question = 'Are you really sure?'
                if self._user_interface.name == 'rich':
                    question = '[red][bold]' + question + '[/red][/bold]'
                confirmation = self._user_interface.prompt_question(
                    question=question, prompt_type=Confirm, default=False, show_default=True, case_sensitive=True
                )

            if not confirmation:
                # nothing has been touched yet: flag the abort and leave
                self.processor_exit_status = ProcessorExitStatus.Aborted
                return

            log.info(f'Removing {len(autocreation_table_names)} tables from the database.')
            self.database.drop_tables(selected_models)  # type: ignore[arg-type]

        if self.soft_recreate:
            # recreate all tables in the mafw register
            models = selected_models
        else:
            # recreate all tables in the mafw register and that are not yet existing
            self.existing_table_names = self.database.get_tables()
            existing_names = set(self.existing_table_names)
            models = [
                model
                for name, model in mafw_model_register.items()
                if name in selected_names and name not in existing_names
            ]
        self.database.create_tables(models)  # type: ignore[arg-type]

        if self.force_recreate:
            # in the case of a recreation, do the initialisation of all dropped standard tables.
            for model in models:
                # The register holds model *classes*, so the right question is whether the class derives from
                # StandardTable. Testing isinstance(model, type(StandardTable)) would instead compare against the
                # metaclass and accept every model class sharing it, standard table or not.
                if isinstance(model, type) and issubclass(model, StandardTable):
                    model.init()

        n = len(models)
        if n > 0:
            plu = '' if n == 1 else 's'
            soft = '(soft) ' if self.soft_recreate else ''
            log.info(f'Successfully {soft}created {n} table{plu}.')

216 

217 

@database_required
class TriggerRefresher(Processor):
    """
    Processor to recreate all triggers.

    Triggers are database objects, and even though they could be created, dropped and modified at any moment,
    within the MAFw execution cycle they are normally created along with the table they are targeting.

    When the table is created, also all its triggers are created,
    but unless differently specified, with the safe flag on, that means that they are created if they do not exist.

    This might be particularly annoying when modifying an existing trigger, because you need to manually drop the
    trigger to let the table creation mechanism create the newer version.

    The goal of this processor is to drop all existing triggers and then recreate the corresponding tables so as to
    have an updated version of the triggers.

    The processor is relying on the fact that all subclasses of :class:`.MAFwBaseModel`
    are automatically inserted in the :data:`.mafw_model_register` so that the model class can be retrieved from the
    table name.

    Before removing any trigger, the processor will build a list with all the affected tables and check if all of
    them are in the :data:`.mafw_model_register`, if so, it will proceed without asking any further confirmation.
    Otherwise, if some affected tables are not in the register, then it will ask the user to decide what to do:

        - Remove only the triggers whose tables are in the register and thus recreated afterward.
        - Remove all triggers, in this case, some of them will not be recreated.
        - Abort the processor.

    Trigger manipulations (drop and creation) are not directly implemented in :link:`peewee` and are an extension
    provided by MAFw. In order to be compatible with the three main databases (:link:`sqlite`, :link:`mysql` and
    :link:`postgresql`), the SQL generation is obtained via the :class:`.TriggerDialect` interface.

    .. seealso::

        The :class:`.Trigger` class and also the :ref:`trigger chapter <triggers>` for a deeper explanation on triggers.

        The :class:`.ModelRegister` class, the :data:`.mafw_model_register` and the :ref:`related chapter
        <auto_registration>` on the automatic registration mechanism.

        The :class:`.TriggerDialect` and its subclasses, for a database independent way to generate SQL statement
        related to triggers.

    .. versionadded:: v2.0.0
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Constructor.

        All positional and keyword arguments are forwarded unchanged to the parent class constructor.
        """
        super().__init__(*args, **kwargs)
        # the SQL dialect; it stays None until start() resolves it
        self.dialect: Optional[TriggerDialect] = None
        # names of the tables that finish() will recreate
        self.tables_to_be_rebuilt: set[str] = set()

    def get_dialect(self) -> TriggerDialect:
        """
        Get the valid SQL dialect based on the type of Database

        If a dialect was already stored in :attr:`dialect`, it is returned as is. If no database is attached to the
        processor, the SQLite dialect is returned.

        :return: The SQL trigger dialect
        :rtype: :class:`.TriggerDialect`
        :raises: :class:`.UnsupportedDatabaseError` if there is no dialect for the current DB.
        """
        if self.dialect is not None:
            return self.dialect

        if self._database is None:
            # Default to SQLite dialect
            return SQLiteDialect()

        db = self._database
        if isinstance(db, peewee.DatabaseProxy):
            db = db.obj  # Get the actual database from the proxy

        if isinstance(db, peewee.SqliteDatabase):
            return SQLiteDialect()
        if isinstance(db, peewee.MySQLDatabase):
            return MySQLDialect()
        if isinstance(db, peewee.PostgresqlDatabase):
            return PostgreSQLDialect()

        raise UnsupportedDatabaseError(f'Unsupported database type: {type(db)}')

    def start(self) -> None:
        """Run the parent start and then resolve the SQL dialect used by the other methods."""
        super().start()
        self.dialect = self.get_dialect()

    def get_items(self) -> Collection[Any]:
        """
        Retrieves a list of database triggers and interacts with the user to determine which ones to process.

        This method fetches all currently defined database triggers. If any tables
        associated with these triggers are not known (i.e., not registered in
        :data:`.mafw_model_register`), it enters an interactive mode to prompt the user for
        a course of action:

        1.  **Remove All Triggers (A):** Processes all triggers for subsequent removal,
            but only marks 'rebuildable' tables for rebuilding.
        2.  **Remove Only Rebuildable Triggers (O):** Processes only triggers associated
            with 'rebuildable' tables.
        3.  **Quit (Q):** Aborts the entire process.

        If no unknown tables are found, or the user chooses to process rebuildable tables,
        the list of triggers and the set of tables to be rebuilt are prepared for the next stage.

        :return: A collection of database triggers to be processed, in the form of (trigger_name, table_name) tuples
        :rtype: List[Tuple[str, str]]
        """
        if TYPE_CHECKING:
            assert self.dialect is not None

        s: list[tuple[str, str]] = self.database.execute_sql(self.dialect.select_all_trigger_sql()).fetchall()  # type: ignore[no-untyped-call]

        # the second column of every row is the name of the table the trigger is attached to
        affected_tables = {r[1] for r in s}
        known_tables = mafw_model_register.get_table_names()
        rebuildable_tables = {t for t in affected_tables if t in known_tables}
        not_rebuildable_tables = affected_tables - rebuildable_tables

        if not_rebuildable_tables:
            log.warning(f'There are some tables ({len(not_rebuildable_tables)}) that cannot be rebuild')
            with self._user_interface.enter_interactive_mode():
                question = 'Remove all triggers (A), remove only rebuildable triggers (O), quit (Q)'
                if self._user_interface.name == 'rich':
                    question = '[red][bold]' + question + '[/red][/bold]'

                class TriggerPrompt(Prompt):
                    response_type = str
                    validate_error_message = '[prompt.invalid]Please enter A, O or Q'
                    choices: list[str] = ['A', 'O', 'Q']

                answer = self._user_interface.prompt_question(
                    question=question,
                    prompt_type=TriggerPrompt,
                    default='O',
                    show_default=True,
                    case_sensitive=False,
                    show_answer=True,
                )

            # The question is asked case-insensitively, while the comparisons below use upper-case letters only.
            # Normalise the reply so that a lower-case 'q' or 'o' is never mistaken for the destructive
            # "remove all" fall-through branch.
            # NOTE(review): whether a given user interface hands back the reply already upper-cased is not visible
            # from here.
            answer = str(answer).strip().upper()

            if answer == 'Q':
                s = []
                affected_tables = set()
                self.processor_exit_status = ProcessorExitStatus.Aborted
                self.looping_status = LoopingStatus.Abort

            elif answer == 'O':
                s = [r for r in s if r[1] in rebuildable_tables]
                affected_tables = rebuildable_tables

            else:  # equivalent to 'A'
                # remove all triggers
                # but rebuilds only rebuildable_tables
                affected_tables = rebuildable_tables

        self.tables_to_be_rebuilt = affected_tables
        return s

    def process(self) -> None:
        """Delete the current trigger from its table"""
        if TYPE_CHECKING:
            assert self.dialect is not None

        # self.item is one (trigger_name, table_name) row returned by get_items
        self.database.execute_sql(self.dialect.drop_trigger_sql(self.item[0], safe=True, table_name=self.item[1]))  # type: ignore[no-untyped-call]

    def finish(self) -> None:
        """
        Recreate the tables from which triggers were dropped.

        This is only done if the user did not abort the process.
        """
        if self.looping_status != LoopingStatus.Abort:
            log.info(f'Recreating {self.n_item} triggers on {len(self.tables_to_be_rebuilt)} tables...')
            models = [mafw_model_register.get_model(table_name) for table_name in self.tables_to_be_rebuilt]

            self.database.create_tables(models)  # type: ignore[arg-type]
        super().finish()

    def format_progress_message(self) -> None:
        """Set the progress message to name the trigger being dropped and its table."""
        self.progress_message = f'Dropping trigger {self.item[0]} from table {self.item[1]}'

397 

398 

@database_required
class SQLScriptRunner(Processor):
    """
    Processor to execute SQL scripts from files against the database.

    This processor reads SQL files, removes multi-line block comments, splits the content into individual
    statements, and executes them within a transaction. It is designed to handle SQL script execution
    in a safe manner by wrapping all statements of a file in a single atomic transaction.

    The processor accepts a list of SQL files through the :attr:`sql_files` parameter. Each file is validated
    to ensure it exists and is a regular file before processing. Block comments (`/* ... */`) are removed
    from the SQL content before statement parsing.

    .. versionadded:: v2.0.0
    """

    sql_files = ActiveParameter[list[Path]]('sql_files', default=[], help_doc='A list of SQL files to be processed')
    """List of SQL files to be processed"""

    def validate_configuration(self) -> None:
        """
        Validate the configuration of SQL script runner.

        Ensures that all specified SQL files exist and are regular files. As a side effect, every entry of
        :attr:`sql_files` is converted to a :class:`~pathlib.Path`.

        :raises InvalidConfigurationError: if any of the specified files does not exist or is not a regular file.
        """
        if TYPE_CHECKING:
            # we need to convince mypy that the sql_files is not an ActiveParameter but
            # the content of the ActiveParameter
            assert isinstance(self.sql_files, list)

        self.sql_files = [Path(file) for file in self.sql_files]
        for file in self.sql_files:
            # is_file() is already False for a missing path, so a separate exists() check is not needed
            if not file.is_file():
                raise InvalidConfigurationError(f'There are issues with SQL file "{file.resolve()}". Please verify.')

    def get_items(self) -> Collection[Any]:
        """
        Get the collection of SQL files to be processed.

        :return: A collection of SQL file paths to be processed
        :rtype: Collection[Any]
        """
        if TYPE_CHECKING:
            # we need to convince mypy that the sql_files is not an ActiveParameter but
            # the content of the ActiveParameter
            assert isinstance(self.sql_files, list)

        return self.sql_files

    def process(self) -> None:
        """
        Process a single SQL file by reading, parsing, and executing its statements.

        Reads the SQL file content, removes multi-line block comments, splits the content
        into individual SQL statements, and executes them within a transaction.

        The file is decoded as UTF-8 (a leading byte-order mark, if any, is discarded), independently of the
        platform locale.

        If no statements are found in the file, a warning is logged and nothing is executed. If an error occurs
        during execution, the transaction is rolled back and the exception is re-raised.

        .. note::

            Statements are separated on every ``;``. A semicolon appearing inside a string literal, a ``--`` line
            comment or a compound body (for example a trigger ``BEGIN ... END`` block) is therefore treated as a
            statement boundary as well.

        :raises Exception: If an error occurs during SQL statement execution.
        """
        # An explicit encoding keeps the result identical on every platform; 'utf-8-sig' reads plain UTF-8 and
        # additionally drops the BOM that some editors put at the beginning of the file.
        with open(self.item, 'rt', encoding='utf-8-sig') as sql_file:
            sql_content = sql_file.read()

        # remove the multi-line block comments (/* ... */)
        sql_content = block_comment_re.sub('', sql_content)

        # split on the statement terminator, dropping the empty fragments and restoring the terminator
        fragments = (fragment.strip() for fragment in sql_content.split(';'))
        statements = [fragment + ';' for fragment in fragments if fragment]

        if not statements:
            log.warning(f'No SQL statements found to execute in {self.item.name}.')
            # nothing to run: do not open an empty transaction
            return

        log.debug(f'Found {len(statements)} statements to execute.')

        try:
            # use an atomic transaction to wrap the execution of all statements.
            with self.database.atomic():
                for statement in statements:
                    self.database.execute_sql(statement)  # type: ignore[no-untyped-call]

        except Exception as e:
            log.critical(f'An error occurred while executing the SQL script {self.item.name}.')
            log.critical('Rolling back the database to preserve integrity.')
            log.critical(f'Error details: {e}')
            raise

    def format_progress_message(self) -> None:
        """Set the progress message to name the SQL file being processed."""
        self.progress_message = f'Processing SQL file {self.item.name}'