Coverage for src / mafw / processor_library / db_init.py: 100%

158 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-09 09:08 +0000

1# Copyright 2025 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Database initialisation processor module. 

6 

7This module contains the following processors: 

8 

9:class:`TableCreator` 

10 processor which handles the creation of database tables based on registered models. It provides functionality to 

11 create tables automatically while respecting existing tables and offering options for forced recreation. 

12 

13:class:`TriggerRefresher` 

14 processor to safely update the trigger definitions. It removes all existing triggers and regenerates them 

15 according to the new definition. Particularly useful when debugging triggers, it can also be left at the 

16 beginning of all analysis pipelines since it does not cause any loss of data. 

17 

18:class:`SQLScriptRunner` 

19 processor to execute SQL scripts from files against the database. It reads SQL files, removes block comments, 

20 splits the content into individual statements, and executes them within a transaction. 

21 

22.. versionadded:: v2.0.0 

23""" 

24 

25import logging 

26import re 

27from pathlib import Path 

28from typing import TYPE_CHECKING, Any, Collection, Optional 

29 

30import peewee 

31from rich.prompt import Confirm, Prompt 

32 

33from mafw.db.db_model import mafw_model_register 

34from mafw.db.std_tables import StandardTable 

35from mafw.db.trigger import MySQLDialect, PostgreSQLDialect, SQLiteDialect, TriggerDialect 

36from mafw.decorators import database_required, single_loop 

37from mafw.enumerators import LoopingStatus, ProcessorExitStatus 

38from mafw.mafw_errors import InvalidConfigurationError, UnsupportedDatabaseError 

39from mafw.processor import ActiveParameter, Processor 

40 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Matches SQL block comments (/* ... */). The quantifier is non-greedy so that every comment is matched on its own,
# and re.DOTALL lets a single comment span several lines.
block_comment_re = re.compile(r'/\*.*?\*/', re.DOTALL)

44 

45 

@database_required
@single_loop
class TableCreator(Processor):
    """
    Processor to create all tables in the database.

    This processor can be included in all pipelines in order to create all tables in the database. Its functionality
    is based on the fact that all :class:`.MAFwBaseModel` subclasses are automatically included in a global register
    (:data:`.mafw_model_register`).

    This processor will perform the following:

    #. Get a list of all tables already existing in the database.
    #. Prune from the list of models the ones whose table already exists in the database.
    #. Create the remaining tables.

    This overall behaviour can be modified via the following parameters:

    * *force_recreate* (bool, default = False): Use with extreme care. When set to True, all tables in the
      database and in the model register will be first dropped and then recreated. It is almost equivalent to a
      re-initialization of the whole DB with all the data being lost.

    * *soft_recreate* (bool, default = True): When set to True, all tables whose model is in the mafw model
      register will be recreated with the safe flag. It means that there won't be any table drop. If a table is
      already existing, nothing will happen. If a new trigger is added to the table, this will be created. When
      set to False, only tables whose model is in the register and that are not existing will be created.

    * *apply_only_to_prefix* (list[str], default = []): This parameter allows to create only the tables that do
      not already exist and whose name starts with one of the provided prefixes.

    .. versionadded:: v2.0.0
    """

    force_recreate = ActiveParameter(
        name='force_recreate', default=False, help_doc='First drop and then create the tables. LOSS OF ALL DATA!!!'
    )
    """
    Force recreate (bool, default = False).

    Use with extreme care. When set to True, all tables in the database and in the model register will be first
    dropped and then recreated. It is almost equivalent to a re-initialization of the whole DB with all the data
    being lost.
    """

    soft_recreate = ActiveParameter(
        name='soft_recreate', default=True, help_doc='Safe recreate tables without dropping. No data loss'
    )
    """
    Soft recreate (bool, default = True).

    When set to True, all tables whose model is in the mafw model register will be recreated with the safe flag. It
    means that there won't be any table drop. If a table is already existing, nothing will happen. If a new trigger
    is added to the table, this will be created. When set to False, only tables whose model is in the register and
    that are not existing will be created.
    """

    apply_only_to_prefix = ActiveParameter[list[str]](
        name='apply_only_to_prefix',
        default=[],
        help_doc='Create only tables whose name start with the provided prefixes.',
    )
    """
    Apply only to tables starting with prefix (list[str], default = []).

    This parameter allows to create only the tables that do not already exist and whose name starts with one of the
    provided prefixes.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Constructor.

        All positional and keyword arguments are forwarded unchanged to the parent class constructor.
        """
        super().__init__(*args, **kwargs)
        self.existing_table_names: list[str] = []
        """The list of all existing tables in the database."""

    def validate_configuration(self) -> None:
        """
        Validate the configuration.

        :attr:`force_recreate` and :attr:`soft_recreate` cannot both be True at the same time.

        :raises InvalidConfigurationError: if both recreate types are True.
        """
        if self.force_recreate and self.soft_recreate:
            raise InvalidConfigurationError(
                'Both force_recreate and soft_recreate set to True. Incompatible configuration'
            )

    def process(self) -> None:
        """
        Execute the table creation process.

        This method performs the following steps:

        #. Identify all models that have automatic creation enabled.
        #. Filter models based on the apply_only_to_prefix parameter if specified.
        #. Handle forced recreation if requested, including user confirmation.
        #. Handle soft recreation if requested, letting all tables with a known model be recreated.
        #. Create the required tables.
        #. Initialise standard tables after recreation if needed.

        If the user cancels the creation, the processor exit status is set to
        :attr:`.ProcessorExitStatus.Aborted` so that the whole processor list is blocked.
        """
        if TYPE_CHECKING:
            # we need to convince mypy that apply_only_to_prefix is not an ActiveParameter but
            # the content of the ActiveParameter
            assert isinstance(self.apply_only_to_prefix, list)

        # str.startswith accepts a tuple of alternatives, so all prefixes are tested with a single call.
        prefixes = tuple(self.apply_only_to_prefix or ())

        # Candidate models, keyed by table name: automatic creation enabled and, when prefixes are given,
        # a name starting with one of them. Computed once and reused by all the following steps.
        candidates = {
            name: model
            for name, model in mafw_model_register.items()
            if model._meta.automatic_creation  # type: ignore[attr-defined]
            and (not prefixes or name.startswith(prefixes))
        }
        candidate_models = list(candidates.values())

        # in the case of force_recreation, we need to have user confirmation
        if self.force_recreate:
            log.warning(f'Forcing recreation of {len(candidate_models)} tables in the database.')
            log.warning('All data in these tables will be lost.')

            # the interactive mode is only held for the time needed to ask the question
            with self._user_interface.enter_interactive_mode():
                question = 'Are you really sure?'
                if self._user_interface.name == 'rich':
                    question = '[red][bold]' + question + '[/red][/bold]'
                confirmation = self._user_interface.prompt_question(
                    question=question, prompt_type=Confirm, default=False, show_default=True, case_sensitive=True
                )

            if not confirmation:
                self.processor_exit_status = ProcessorExitStatus.Aborted
                return

            log.info(f'Removing {len(candidate_models)} tables from the database.')
            self.database.drop_tables(candidate_models)  # type: ignore[arg-type]

        if self.soft_recreate:
            # recreate all tables in the mafw register
            models = candidate_models
        else:
            # recreate all tables in the mafw register and that are not yet existing
            self.existing_table_names = self.database.get_tables()
            existing = set(self.existing_table_names)
            models = [model for name, model in candidates.items() if name not in existing]

        self.database.create_tables(models)  # type: ignore[arg-type]

        if self.force_recreate:
            # in the case of a recreation, do the initialisation of all dropped standard tables.
            for model in models:
                # The register holds model classes, hence the subclass test. An instance test against
                # type(StandardTable) would compare with the metaclass and could not single out standard tables.
                if isinstance(model, type) and issubclass(model, StandardTable):
                    model.init()

        n = len(models)
        if n > 0:
            plu = '' if n == 1 else 's'
            log.info(f'Successfully create {n} table{plu}.')

211 

212 

@database_required
class TriggerRefresher(Processor):
    """
    Processor to recreate all triggers.

    Triggers are database objects, and even though they could be created, dropped and modified at any moment,
    within the MAFw execution cycle they are normally created along with the table they are targeting.

    When the table is created, all its triggers are created as well, but, unless differently specified, with the
    safe flag on, meaning that they are created only if they do not exist yet.

    This might be particularly annoying when modifying an existing trigger, because the trigger must be dropped
    manually to let the table creation mechanism create the newer version.

    The goal of this processor is to drop all existing triggers and then recreate the corresponding tables, so as
    to have an updated version of the triggers.

    The processor relies on the fact that all subclasses of :class:`.MAFwBaseModel` are automatically inserted in
    the :data:`.mafw_model_register`, so that the model class can be retrieved from the table name.

    Before removing any trigger, the processor builds a list with all the affected tables and checks if all of
    them are in the :data:`.mafw_model_register`. If so, it proceeds without asking any further confirmation.
    Otherwise, if some affected tables are not in the register, it asks the user to decide what to do:

    - Remove only the triggers whose tables are in the register and thus recreated afterward.
    - Remove all triggers; in this case, some of them will not be recreated.
    - Abort the processor.

    Trigger manipulations (drop and creation) are not directly implemented in :link:`peewee` and are an extension
    provided by MAFw. In order to be compatible with the three main databases (:link:`sqlite`, :link:`mysql` and
    :link:`postgresql`), the SQL generation is obtained via the :class:`.TriggerDialect` interface.

    .. seealso::

        The :class:`.Trigger` class and also the :ref:`trigger chapter <triggers>` for a deeper explanation on
        triggers.

        The :class:`.ModelRegister` class, the :data:`.mafw_model_register` and the :ref:`related chapter
        <auto_registration>` on the automatic registration mechanism.

        The :class:`.TriggerDialect` and its subclasses, for a database independent way to generate SQL statement
        related to triggers.

    .. versionadded:: v2.0.0
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Constructor.

        All positional and keyword arguments are forwarded unchanged to the parent class constructor.
        """
        super().__init__(*args, **kwargs)
        # SQL dialect in use; assigned in start()
        self.dialect: Optional[TriggerDialect] = None
        # names of the tables to be recreated in finish(); assigned in get_items()
        self.tables_to_be_rebuilt: set[str] = set()

    def get_dialect(self) -> TriggerDialect:
        """
        Get the valid SQL dialect based on the type of Database

        A dialect already stored on the instance is returned as it is. When no database is set, the SQLite
        dialect is used as default.

        :return: The SQL trigger dialect
        :rtype: :class:`.TriggerDialect`
        :raises: :class:`.UnsupportedDatabaseError` if there is no dialect for the current DB.
        """
        if self.dialect is not None:
            return self.dialect

        database = self._database
        if database is None:
            # Default to SQLite dialect
            return SQLiteDialect()

        if isinstance(database, peewee.DatabaseProxy):
            # Get the actual database from the proxy
            database = database.obj

        # database classes and their dialects, tested in this order
        supported_dialects = (
            (peewee.SqliteDatabase, SQLiteDialect),
            (peewee.MySQLDatabase, MySQLDialect),
            (peewee.PostgresqlDatabase, PostgreSQLDialect),
        )
        for database_class, dialect_class in supported_dialects:
            if isinstance(database, database_class):
                return dialect_class()

        raise UnsupportedDatabaseError(f'Unsupported database type: {type(database)}')

    def start(self) -> None:
        """Run the parent start and then store the dialect to be used for the whole execution."""
        super().start()
        self.dialect = self.get_dialect()

    def get_items(self) -> Collection[Any]:
        """
        Retrieve the database triggers to be dropped, asking the user what to do when needed.

        All currently defined triggers are fetched from the database. If some of the tables they belong to are
        not known (i.e., not registered in :data:`.mafw_model_register`), the user is prompted for a course of
        action:

        1. **Remove All Triggers (A):** all triggers are returned for removal, but only 'rebuildable' tables are
           marked for rebuilding.
        2. **Remove Only Rebuildable Triggers (O):** only triggers of 'rebuildable' tables are returned.
        3. **Quit (Q):** nothing is returned and the processor is flagged as aborted.

        The set of tables to be rebuilt is stored in :attr:`tables_to_be_rebuilt` for the next stage.

        :return: A collection of database triggers to be processed, in the form of tuples trigger_name, table_name
        :rtype: List[Tuple[str, str]]
        """
        if TYPE_CHECKING:
            assert self.dialect is not None

        cursor = self.database.execute_sql(self.dialect.select_all_trigger_sql())
        triggers: list[tuple[str, str]] = cursor.fetchall()

        # the second element of each row is the name of the table the trigger belongs to
        affected_tables = {row[1] for row in triggers}
        known_tables = mafw_model_register.get_table_names()
        rebuildable_tables = {table for table in affected_tables if table in known_tables}
        not_rebuildable_tables = affected_tables - rebuildable_tables

        if not_rebuildable_tables:
            log.warning(f'There are some tables ({len(not_rebuildable_tables)}) that cannot be rebuild')
            with self._user_interface.enter_interactive_mode():
                question = 'Remove all triggers (A), remove only rebuildable triggers (O), quit (Q)'
                if self._user_interface.name == 'rich':
                    question = f'[red][bold]{question}[/red][/bold]'

                class TriggerPrompt(Prompt):
                    response_type = str
                    validate_error_message = '[prompt.invalid]Please enter A, O or Q'
                    choices: list[str] = ['A', 'O', 'Q']

                answer = self._user_interface.prompt_question(
                    question=question,
                    prompt_type=TriggerPrompt,
                    default='O',
                    show_default=True,
                    case_sensitive=False,
                    show_answer=True,
                )

            if answer == 'Q':
                # nothing to drop, nothing to rebuild, and the processor is flagged as aborted
                triggers = []
                affected_tables = set()
                self.processor_exit_status = ProcessorExitStatus.Aborted
                self.looping_status = LoopingStatus.Abort
            elif answer == 'O':
                # drop only the triggers that will be recreated along with their table
                triggers = [row for row in triggers if row[1] in rebuildable_tables]
                affected_tables = rebuildable_tables
            else:
                # equivalent to 'A': remove all triggers, but rebuild only the rebuildable tables
                affected_tables = rebuildable_tables

        self.tables_to_be_rebuilt = affected_tables
        return triggers

    def process(self) -> None:
        """Delete the current trigger from its table"""
        if TYPE_CHECKING:
            assert self.dialect is not None

        trigger_name = self.item[0]
        table_name = self.item[1]
        drop_sql = self.dialect.drop_trigger_sql(trigger_name, safe=True, table_name=table_name)
        self.database.execute_sql(drop_sql)

    def finish(self) -> None:
        """
        Recreate the tables from which triggers were dropped.

        This is only done if the user did not abort the process.
        """
        if self.looping_status != LoopingStatus.Abort:
            log.info(f'Recreating {self.n_item} triggers on {len(self.tables_to_be_rebuilt)} tables...')
            models = list(map(mafw_model_register.get_model, self.tables_to_be_rebuilt))
            self.database.create_tables(models)  # type: ignore[arg-type]
        super().finish()

    def format_progress_message(self) -> None:
        """Set the progress message with the name of the current trigger and of its table."""
        trigger_name = self.item[0]
        table_name = self.item[1]
        self.progress_message = f'Dropping trigger {trigger_name} from table {table_name}'

392 

393 

@database_required
class SQLScriptRunner(Processor):
    """
    Processor to execute SQL scripts from files against the database.

    This processor reads SQL files, removes multi-line block comments, splits the content into individual
    statements, and executes them within a transaction. All the statements of one file are wrapped in a single
    atomic transaction.

    The processor accepts a list of SQL files through the :attr:`sql_files` parameter. Each file is validated
    to ensure it exists and is a regular file before processing. Block comments (`/* ... */`) are removed
    from the SQL content before statement parsing.

    .. note::

        Comment removal and statement splitting are purely textual: the content is split at every ``;``
        character, without any SQL parsing.

    .. versionadded:: v2.0.0
    """

    sql_files = ActiveParameter[list[Path]]('sql_files', default=[], help_doc='A list of SQL files to be processed')
    """List of SQL files to be processed"""

    def validate_configuration(self) -> None:
        """
        Validate the configuration of SQL script runner.

        Every entry of :attr:`sql_files` is converted to a :class:`~pathlib.Path` and then checked to be an
        existing regular file.

        :raises InvalidConfigurationError: if any of the specified files does not exist or is not a regular file.
        """
        if TYPE_CHECKING:
            # we need to convince mypy that the sql_files is not an ActiveParameter but
            # the content of the ActiveParameter
            assert isinstance(self.sql_files, list)

        self.sql_files = [Path(entry) for entry in self.sql_files]
        for sql_path in self.sql_files:
            if not (sql_path.exists() and sql_path.is_file()):
                raise InvalidConfigurationError(
                    f'There are issues with SQL file "{sql_path.resolve()}". Please verify.'
                )

    def get_items(self) -> Collection[Any]:
        """
        Get the collection of SQL files to be processed.

        :return: A collection of SQL file paths to be processed
        :rtype: Collection[Any]
        """
        if TYPE_CHECKING:
            # we need to convince mypy that the sql_files is not an ActiveParameter but
            # the content of the ActiveParameter
            assert isinstance(self.sql_files, list)

        return self.sql_files

    def process(self) -> None:
        """
        Process a single SQL file by reading, parsing, and executing its statements.

        The file content is read, the multi-line block comments are removed, and what is left is split at every
        semicolon into individual statements, which are then executed within one atomic transaction.

        If no statements are found in the file, a warning is logged. If an error occurs during execution, it is
        logged as critical and the exception is re-raised.

        :raises Exception: If an error occurs during SQL statement execution.
        """
        with open(self.item, 'rt') as script:
            raw_sql = script.read()

        # remove the multi-line block comments (/* ... */)
        uncommented_sql = block_comment_re.sub('', raw_sql)

        # split at the semicolons, drop the empty fragments and put the terminator back
        fragments = (piece.strip() for piece in uncommented_sql.split(';'))
        statements = [fragment + ';' for fragment in fragments if fragment]

        if not statements:
            log.warning(f'No SQL statements found to execute in {self.item.name}.')

        log.debug(f'Found {len(statements)} statements to execute.')

        try:
            # use an atomic transaction to wrap the execution of all statements.
            with self.database.atomic():  # type: ignore[no-untyped-call]
                for statement in statements:
                    self.database.execute_sql(statement)
        except Exception as e:
            log.critical(f'An error occurred while executing the SQL script {self.item.name}.')
            log.critical('Rolling back the database to preserve integrity.')
            log.critical(f'Error details: {e}')
            raise

    def format_progress_message(self) -> None:
        """Set the progress message with the name of the SQL file being processed."""
        self.progress_message = f'Processing SQL file {self.item.name}'