Coverage for src / mafw / processor_library / db_init.py: 100%
161 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
1# Copyright 2025–2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Database initialisation processor module.
7This module contains the following processors:
9:class:`TableCreator`
10 processor which handles the creation of database tables based on registered models. It provides functionality to
11 create tables automatically while respecting existing tables and offering options for forced recreation.
13:class:`TriggerRefresher`
14 processor to safely update the trigger definitions. It removes all existing triggers and regenerates them
15 according to the new definition. Particularly useful when debugging triggers, it can also be left at the
16 beginning of all analysis pipelines since it does not cause any loss of data.
18:class:`SQLScriptRunner`
19 processor to execute SQL scripts from files against the database. It reads SQL files, removes block comments,
20 splits the content into individual statements, and executes them within a transaction.
22.. versionadded:: v2.0.0
23"""
25import logging
26import re
27from pathlib import Path
28from typing import TYPE_CHECKING, Any, Collection, Optional
30import peewee
31from rich.prompt import Confirm, Prompt
33from mafw.db.db_model import mafw_model_register
34from mafw.db.std_tables import StandardTable
35from mafw.db.trigger import MySQLDialect, PostgreSQLDialect, SQLiteDialect, TriggerDialect
36from mafw.decorators import database_required, single_loop
37from mafw.enumerators import LoopingStatus, ProcessorExitStatus
38from mafw.mafw_errors import InvalidConfigurationError, UnsupportedDatabaseError
39from mafw.processor import ActiveParameter, Processor
# Logger of this module, named after the module import path.
log = logging.getLogger(__name__)

# Matches one SQL block comment (/* ... */). The match is non-greedy, so two separate comments are never merged
# into one, and DOTALL lets a single comment span several lines.
block_comment_re = re.compile(r'/\*.*?\*/', flags=re.DOTALL)
@database_required
@single_loop
class TableCreator(Processor):
    """
    Processor creating the database tables of the registered models.

    The processor can be added to any pipeline. It relies on the global :data:`.mafw_model_register`, in which all
    :class:`.MAFwBaseModel` subclasses are automatically included, and it only considers the models whose meta
    option ``automatic_creation`` is set.

    Depending on the configuration, the processor will:

        #. Select the registered models to work on, optionally restricted by table name prefix.
        #. Optionally drop the selected tables (after explicit user confirmation).
        #. Create the selected tables, either all of them or only the ones missing from the database.

    The behaviour is controlled by the following parameters:

        * *force_recreate* (bool, default = False): Use with extreme care. When set to True, the selected tables are
          first dropped and then recreated. It is almost equivalent to a re-initialisation of the whole DB with all
          the data being lost.

        * *soft_recreate* (bool, default = True): When set to True, all selected tables are recreated with the safe
          flag, so no table is dropped. If a table already exists, nothing happens to it. If a new trigger was added
          to the table, it will be created. When set to False, only the selected tables that do not yet exist in
          the database are created.

        * *apply_only_to_prefix* (list[str], default = []): When not empty, only the tables whose name starts with
          one of the provided prefixes are selected.

    .. versionadded:: v2.0.0

    """

    force_recreate = ActiveParameter(
        name='force_recreate', default=False, help_doc='First drop and then create the tables. LOSS OF ALL DATA!!!'
    )
    """
    Force recreate (bool, default = False).

    Use with extreme care. When set to True, the selected tables are first dropped and then recreated, so all the
    data they contain is lost. The user is asked for confirmation before anything is dropped.
    """

    soft_recreate = ActiveParameter(
        name='soft_recreate', default=True, help_doc='Safe recreate tables without dropping. No data loss'
    )
    """
    Soft recreate (bool, default = True).

    When set to True, all selected tables are recreated with the safe flag: no table is dropped and an already
    existing table is left untouched. If a new trigger was added to the table, it will be created. When set to
    False, only the selected tables that do not yet exist in the database are created.
    """

    apply_only_to_prefix = ActiveParameter[list[str]](
        name='apply_only_to_prefix',
        default=[],
        help_doc='Create only tables whose name start with the provided prefixes.',
    )
    """
    Apply only to tables starting with prefix (list[str], default = []).

    When not empty, the processor only handles the tables whose name starts with one of the provided prefixes.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Constructor.

        All positional and keyword arguments are forwarded to the parent class.
        """
        super().__init__(*args, **kwargs)
        self.existing_table_names: list[str] = []
        """Names of the tables found in the database. Filled by :meth:`process` when *soft_recreate* is False."""

    def validate_configuration(self) -> None:
        """
        Validate the processor configuration.

        :attr:`force_recreate` and :attr:`soft_recreate` are mutually exclusive.

        :raises InvalidConfigurationError: if both recreate flags are True.
        """
        if not (self.force_recreate and self.soft_recreate):
            return

        raise InvalidConfigurationError(
            'Both force_recreate and soft_recreate set to True. Incompatible configuration'
        )

    def process(self) -> None:
        """
        Create the tables.

        The method goes through the following steps:

            #. Collect the names of the registered models having automatic creation enabled.
            #. If :attr:`apply_only_to_prefix` is not empty, keep only the names starting with one of the prefixes.
            #. If :attr:`force_recreate` is set, ask the user for confirmation and drop the selected tables.
            #. Create either all selected tables (:attr:`soft_recreate`) or only the ones not yet in the database.
            #. After a forced recreation, call ``init`` on the recreated standard tables.
            #. Log how many tables were handled.

        If the user does not confirm the forced recreation, nothing is dropped or created and the processor exit
        status is set to :attr:`.ProcessorExitStatus.Aborted`.
        """
        # candidates are the registered tables having the automatic creation flag
        selected_names = [
            table_name
            for table_name, registered_model in mafw_model_register.items()
            if registered_model._meta.automatic_creation  # type: ignore[attr-defined]
        ]

        if self.apply_only_to_prefix:
            # keep only the tables whose name starts with at least one of the given prefixes
            selected_names = [
                table_name
                for table_name in selected_names
                if any(table_name.startswith(prefix) for prefix in self.apply_only_to_prefix)  # type: ignore[union-attr]
            ]

        if self.force_recreate:
            # dropping tables is destructive: never do it without an explicit confirmation
            log.warning(f'Forcing recreation of {len(selected_names)} tables in the database.')
            log.warning('All data in these tables will be lost.')

            with self._user_interface.enter_interactive_mode():
                question = 'Are you really sure?'
                if self._user_interface.name == 'rich':
                    question = '[red][bold]' + question + '[/red][/bold]'
                confirmation = self._user_interface.prompt_question(
                    question=question, prompt_type=Confirm, default=False, show_default=True, case_sensitive=True
                )

            if not confirmation:
                self.processor_exit_status = ProcessorExitStatus.Aborted
                return

            log.info(f'Removing {len(selected_names)} tables from the database.')
            models = [
                registered_model
                for table_name, registered_model in mafw_model_register.items()
                if table_name in selected_names
            ]
            self.database.drop_tables(models)  # type: ignore[arg-type]

        if not self.soft_recreate:
            # create only the selected tables that are missing from the database
            self.existing_table_names = self.database.get_tables()
            models = [
                registered_model
                for table_name, registered_model in mafw_model_register.items()
                if table_name in selected_names and table_name not in self.existing_table_names
            ]
        else:
            # (re)create all selected tables
            models = [
                registered_model
                for table_name, registered_model in mafw_model_register.items()
                if table_name in selected_names
            ]
        self.database.create_tables(models)  # type: ignore[arg-type]

        if self.force_recreate:
            # the dropped standard tables have to be initialised again
            # NOTE(review): type(StandardTable) is the metaclass of StandardTable, so this check is true for every
            # class built by that metaclass. Confirm this is the intended test for "is a standard table".
            for model in models:
                if isinstance(model, type(StandardTable)):
                    model.init()

        created = len(models)
        if created > 0:
            plural = '' if created == 1 else 's'
            soft = '(soft) ' if self.soft_recreate else ''
            log.info(f'Successfully {soft}created {created} table{plural}.')
@database_required
class TriggerRefresher(Processor):
    """
    Processor to drop and recreate all triggers.

    Triggers are database objects that can be created, dropped and modified at any moment, but within the MAFw
    execution cycle they are normally created along with the table they are targeting.

    When a table is created, all its triggers are created as well but, unless differently specified, with the safe
    flag on, meaning that they are created only if they do not exist yet. This is annoying when an existing trigger
    is modified, because the old trigger has to be dropped manually before the table creation mechanism can create
    the newer version.

    This processor drops the existing triggers and then recreates the corresponding tables, so that the triggers
    are regenerated from their current definition.

    The processor relies on the fact that all subclasses of :class:`.MAFwBaseModel` are automatically inserted in
    the :data:`.mafw_model_register`, so that the model class can be retrieved from the table name.

    Before dropping any trigger, the processor collects all the affected tables. If all of them are in the
    :data:`.mafw_model_register`, it proceeds without asking for confirmation. Otherwise, the user has to choose
    among:

        - Remove only the triggers whose tables are in the register, and that are thus recreated afterward.
        - Remove all triggers; in this case, some of them will not be recreated.
        - Abort the processor.

    Trigger manipulations (drop and creation) are not directly implemented in :link:`peewee` and are an extension
    provided by MAFw. In order to be compatible with the three main databases (:link:`sqlite`, :link:`mysql` and
    :link:`postgresql`), the SQL generation is obtained via the :class:`.TriggerDialect` interface.

    .. seealso::

        The :class:`.Trigger` class and also the :ref:`trigger chapter <triggers>` for a deeper explanation on
        triggers.

        The :class:`.ModelRegister` class, the :data:`.mafw_model_register` and the :ref:`related chapter
        <auto_registration>` on the automatic registration mechanism.

        The :class:`.TriggerDialect` and its subclasses, for a database independent way to generate SQL statements
        related to triggers.

    .. versionadded:: v2.0.0
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Constructor.

        All positional and keyword arguments are forwarded to the parent class.
        """
        super().__init__(*args, **kwargs)
        # SQL dialect used to generate the trigger statements; assigned in start()
        self.dialect: Optional[TriggerDialect] = None
        # names of the tables to be recreated in finish(); assigned in get_items()
        self.tables_to_be_rebuilt: set[str] = set()

    def get_dialect(self) -> TriggerDialect:
        """
        Get the SQL trigger dialect matching the type of the database.

        If a dialect was already assigned to the processor, it is returned as it is. If the processor has no
        database, the SQLite dialect is returned.

        :return: The SQL trigger dialect
        :rtype: :class:`.TriggerDialect`
        :raises UnsupportedDatabaseError: if there is no dialect for the current DB.
        """
        if self.dialect is not None:
            return self.dialect

        if self._database is None:
            # no database available: fall back on the SQLite dialect
            return SQLiteDialect()

        # when working with a proxy, the type check must be done on the wrapped database
        db = self._database.obj if isinstance(self._database, peewee.DatabaseProxy) else self._database

        if isinstance(db, peewee.SqliteDatabase):
            return SQLiteDialect()
        if isinstance(db, peewee.MySQLDatabase):
            return MySQLDialect()
        if isinstance(db, peewee.PostgresqlDatabase):
            return PostgreSQLDialect()

        raise UnsupportedDatabaseError(f'Unsupported database type: {type(db)}')

    def start(self) -> None:
        """Run the parent start and select the SQL dialect to be used for the whole execution."""
        super().start()
        self.dialect = self.get_dialect()

    def get_items(self) -> Collection[Any]:
        """
        Retrieve the triggers to be dropped, asking the user what to do when needed.

        All the triggers currently defined in the database are fetched. If some of them belong to tables that are
        not in the :data:`.mafw_model_register`, and that consequently cannot be rebuilt, the user is asked to
        choose among:

            1. **Remove all triggers (A):** all triggers are returned for removal, but only the rebuildable tables
               are marked for rebuilding.
            2. **Remove only rebuildable triggers (O):** only the triggers of rebuildable tables are returned.
            3. **Quit (Q):** nothing is returned, the exit status is set to
               :attr:`.ProcessorExitStatus.Aborted` and the looping status to :attr:`.LoopingStatus.Abort`.

        The set of tables to be rebuilt is stored in :attr:`tables_to_be_rebuilt`.

        :return: The triggers to be dropped, in the form of tuples (trigger_name, table_name)
        :rtype: List[Tuple[str, str]]
        """
        if TYPE_CHECKING:
            assert self.dialect is not None

        triggers: list[tuple[str, str]] = self.database.execute_sql(self.dialect.select_all_trigger_sql()).fetchall()  # type: ignore[no-untyped-call]

        # the second element of each row is the name of the table the trigger belongs to
        affected_tables = {row[1] for row in triggers}
        known_tables = mafw_model_register.get_table_names()
        rebuildable_tables = {table_name for table_name in affected_tables if table_name in known_tables}
        orphan_tables = affected_tables - rebuildable_tables

        if orphan_tables:
            log.warning(f'There are some tables ({len(orphan_tables)}) that cannot be rebuild')
            with self._user_interface.enter_interactive_mode():
                question = 'Remove all triggers (A), remove only rebuildable triggers (O), quit (Q)'
                if self._user_interface.name == 'rich':
                    question = '[red][bold]' + question + '[/red][/bold]'

                class TriggerPrompt(Prompt):
                    response_type = str
                    validate_error_message = '[prompt.invalid]Please enter A, O or Q'
                    choices: list[str] = ['A', 'O', 'Q']

                answer = self._user_interface.prompt_question(
                    question=question,
                    prompt_type=TriggerPrompt,
                    default='O',
                    show_default=True,
                    case_sensitive=False,
                    show_answer=True,
                )

            if answer == 'Q':
                # abort: nothing to drop and nothing to rebuild
                self.processor_exit_status = ProcessorExitStatus.Aborted
                self.looping_status = LoopingStatus.Abort
                triggers = []
                affected_tables = set()
            else:
                if answer == 'O':
                    # drop only the triggers that can be recreated
                    triggers = [row for row in triggers if row[1] in rebuildable_tables]
                # any other answer is handled as 'A': all triggers are dropped.
                # In both cases only the rebuildable tables can be rebuilt.
                affected_tables = rebuildable_tables

        self.tables_to_be_rebuilt = affected_tables
        return triggers

    def process(self) -> None:
        """Drop the current trigger from its table."""
        if TYPE_CHECKING:
            assert self.dialect is not None

        trigger_name = self.item[0]
        table_name = self.item[1]
        self.database.execute_sql(self.dialect.drop_trigger_sql(trigger_name, safe=True, table_name=table_name))  # type: ignore[no-untyped-call]

    def finish(self) -> None:
        """
        Recreate the tables from which triggers were dropped.

        The tables are recreated only if the looping status is not :attr:`.LoopingStatus.Abort`. The parent
        finish is executed in any case.
        """
        aborted = self.looping_status == LoopingStatus.Abort
        if not aborted:
            log.info(f'Recreating {self.n_item} triggers on {len(self.tables_to_be_rebuilt)} tables...')
            models = [mafw_model_register.get_model(table_name) for table_name in self.tables_to_be_rebuilt]

            self.database.create_tables(models)  # type: ignore[arg-type]
        super().finish()

    def format_progress_message(self) -> None:
        """Update the progress message with the names of the current trigger and of its table."""
        trigger_name = self.item[0]
        table_name = self.item[1]
        self.progress_message = f'Dropping trigger {trigger_name} from table {table_name}'
@database_required
class SQLScriptRunner(Processor):
    """
    Processor to execute SQL scripts from files against the database.

    This processor reads SQL files, removes multi-line block comments, splits the content into individual
    statements, and executes them within a transaction. All the statements of one file are wrapped in a single
    atomic transaction.

    The processor accepts a list of SQL files through the :attr:`sql_files` parameter. Each file is validated
    to ensure it exists and is a regular file before processing. Block comments (`/* ... */`) are removed
    from the SQL content before statement parsing. The files are read as UTF-8 text.

    .. note::

        The statements are obtained by splitting the file content at every semicolon. No SQL parsing is performed,
        so a semicolon appearing inside a string literal, a line comment or the body of a compound statement is
        also handled as a statement separator.

    .. versionadded:: v2.0.0
    """

    sql_files = ActiveParameter[list[Path]]('sql_files', default=[], help_doc='A list of SQL files to be processed')
    """List of SQL files to be processed"""

    def validate_configuration(self) -> None:
        """
        Validate the configuration of SQL script runner.

        All entries of :attr:`sql_files` are converted to :class:`~pathlib.Path` and then checked to exist and
        to be regular files.

        :raises InvalidConfigurationError: if any of the specified files does not exist or is not a regular file.
        """
        if TYPE_CHECKING:
            # we need to convince mypy that the sql_files is not an ActiveParameter but
            # the content of the ActiveParameter
            assert isinstance(self.sql_files, list)

        self.sql_files = [Path(file) for file in self.sql_files]
        for file in self.sql_files:
            if not file.exists() or not file.is_file():
                raise InvalidConfigurationError(f'There are issues with SQL file "{file.resolve()}". Please verify.')

    def get_items(self) -> Collection[Any]:
        """
        Get the collection of SQL files to be processed.

        :return: A collection of SQL file paths to be processed
        :rtype: Collection[Any]
        """
        if TYPE_CHECKING:
            # we need to convince mypy that the sql_files is not an ActiveParameter but
            # the content of the ActiveParameter
            assert isinstance(self.sql_files, list)

        return self.sql_files

    def process(self) -> None:
        """
        Process a single SQL file by reading, parsing, and executing its statements.

        Reads the SQL file content as UTF-8 text, removes multi-line block comments, splits the content
        into individual SQL statements, and executes them within a transaction.

        If no statements are found in the file, a warning is logged and nothing is executed. If an error occurs
        during execution, the transaction is rolled back and the exception is re-raised.

        :raises Exception: If an error occurs during SQL statement execution.
        """
        # the encoding is explicit, otherwise the same script would be decoded differently depending on the
        # locale of the machine running the processor.
        with open(self.item, 'rt', encoding='utf-8') as sql_file:
            sql_content = sql_file.read()

        # remove the multi-line block comments (/* ... */)
        sql_content = block_comment_re.sub('', sql_content)

        # every non-empty chunk between two semicolons is a statement; the terminator removed by the split is
        # appended again.
        statements = [s.strip() + ';' for s in sql_content.split(';') if s.strip()]

        if not statements:
            log.warning(f'No SQL statements found to execute in {self.item.name}.')
            # nothing to execute: no need to open a transaction
            return

        log.debug(f'Found {len(statements)} statements to execute.')

        try:
            # use an atomic transaction to wrap the execution of all statements.
            with self.database.atomic():
                for statement in statements:
                    self.database.execute_sql(statement)  # type: ignore[no-untyped-call]

        except Exception as e:
            log.critical(f'An error occurred while executing the SQL script {self.item.name}.')
            log.critical('Rolling back the database to preserve integrity.')
            log.critical(f'Error details: {e}')
            raise

    def format_progress_message(self) -> None:
        """Update the progress message with the name of the SQL file being processed."""
        self.progress_message = f'Processing SQL file {self.item.name}'