Coverage for src / mafw / processor_library / db_init.py: 100%
171 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-28 13:34 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-28 13:34 +0000
1# Copyright 2025–2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Database initialisation processor module.
7This module contains the following processors:
9:class:`TableCreator`
10 processor which handles the creation of database tables based on registered models. It provides functionality to
11 create tables automatically while respecting existing tables and offering options for forced recreation.
13:class:`TriggerRefresher`
14 processor to safely update the trigger definitions. It removes all existing triggers and regenerates them
15 according to the new definition. Particularly useful when debugging triggers, it can also be left at the
16 beginning of all analysis pipelines since it does not cause any loss of data.
18:class:`SQLScriptRunner`
19 processor to execute SQL scripts from files against the database. It reads SQL files, removes block comments,
20 splits the content into individual statements, and executes them within a transaction.
22.. versionadded:: v2.0.0
23"""
25import logging
26import re
27from pathlib import Path
28from typing import TYPE_CHECKING, Any, Collection, Optional, cast
30import peewee
31from rich.prompt import Confirm, Prompt
33from mafw.db.db_model import mafw_model_register
34from mafw.db.db_types import PeeweeModelWithMeta
35from mafw.db.std_tables import StandardTable
36from mafw.db.trigger import MySQLDialect, PostgreSQLDialect, SQLiteDialect, TriggerDialect
37from mafw.decorators import database_required, single_loop
38from mafw.enumerators import LoopingStatus, ProcessorExitStatus
39from mafw.mafw_errors import InvalidConfigurationError, UnsupportedDatabaseError
40from mafw.processor import ActiveParameter, Processor
# Module-level logger, named after this module so that records can be filtered per module.
log = logging.getLogger(__name__)

# Matches one SQL block comment (/* ... */). The quantifier is non-greedy so that two separate comments are
# not merged into a single match, and DOTALL lets a comment span several lines.
block_comment_re = re.compile(r'/\*.*?\*/', flags=re.DOTALL)
def _standard_table_models() -> set[type[StandardTable]]:
    """
    Collect the standard-table models known to the model register.

    :return: The registered standard-table models as a set, or an empty set when the register does not hand
        back a list.
    :rtype: set[type[StandardTable]]
    """
    registered = mafw_model_register.get_standard_tables()
    # anything that is not a plain list is treated as "no standard tables available"
    return set(registered) if isinstance(registered, list) else set()
@database_required
@single_loop
class TableCreator(Processor):
    """
    Processor to create all tables in the database.

    This processor can be included in all pipelines in order to create all tables in the database. Its functionality
    is based on the fact that all :class:`.MAFwBaseModel` subclasses are automatically included in a global register
    (:data:`.mafw_model_register`).

    This processor will perform the following:

    #. Get a list of all tables already existing in the database.
    #. Prune from the list of models the ones whose table already exists in the database.
    #. Create the remaining tables.

    This overall behaviour can be modified via the following parameters:

    * *force_recreate* (bool, default = False): Use with extreme care. When set to True, all tables in the
      database and in the model register will be first dropped and then recreated. It is almost equivalent to a
      re-initialization of the whole DB with all the data being lost.

    * *soft_recreate* (bool, default = True): When set to True, all tables whose model is in the mafw model
      register will be recreated with the safe flag. It means that there won't be any table drop. If a table is
      already existing, nothing will happen. If a new trigger is added to the table, this will be created. When
      set to False, only tables whose model is in the register and that are not existing will be created.

    * *apply_only_to_prefix* (list[str], default = []): This parameter restricts the creation to the tables
      whose name starts with one of the provided prefixes.

    .. versionadded:: v2.0.0
    """

    force_recreate = ActiveParameter(
        name='force_recreate', default=False, help_doc='First drop and then create the tables. LOSS OF ALL DATA!!!'
    )
    """
    Force recreate (bool, default = False).

    Use with extreme care. When set to True, all tables in the database and in the model register will be first
    dropped and then recreated. It is almost equivalent to a re-initialization of the whole DB with all the data
    being lost.
    """

    soft_recreate = ActiveParameter(
        name='soft_recreate', default=True, help_doc='Safe recreate tables without dropping. No data loss'
    )
    """
    Soft recreate (bool, default = True).

    When set to True, all tables whose model is in the mafw model register will be recreated with the safe flag. It
    means that there won't be any table drop. If a table is already existing, nothing will happen. If a new trigger
    is added to the table, this will be created. When set to False, only tables whose model is in the register and
    that are not existing will be created.
    """

    apply_only_to_prefix = ActiveParameter[list[str]](
        name='apply_only_to_prefix',
        default=[],
        help_doc='Create only tables whose name start with the provided prefixes.',
    )
    """
    Apply only to tables starting with prefix (list[str], default = []).

    This parameter restricts the creation to the tables whose name starts with one of the provided prefixes.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Constructor.

        All positional and keyword arguments are forwarded unchanged to the parent class.
        """
        super().__init__(*args, **kwargs)
        self.existing_table_names: list[str] = []
        """The list of all existing tables in the database."""

    def validate_configuration(self) -> None:
        """
        Configuration validation.

        :attr:`force_recreate` and :attr:`soft_recreate` cannot both be True.

        :raises InvalidConfigurationError: if both recreate types are True.
        """
        if self.force_recreate and self.soft_recreate:
            raise InvalidConfigurationError(
                'Both force_recreate and soft_recreate set to True. Incompatible configuration'
            )

    def process(self) -> None:
        """
        Execute the table creation process.

        This method performs the following steps:

        #. Identify all models that have automatic creation enabled.
        #. Discard the standard tables when their creation is disabled.
        #. Filter models based on the apply_only_to_prefix parameter if specified.
        #. Handle forced recreation if requested, including user confirmation.
        #. Handle soft recreation if requested, letting all tables with a known model be recreated.
        #. Create the required tables.
        #. Initialise standard tables after a forced recreation if needed.

        If the user cancels the forced recreation, the processor exit status is set to
        :attr:`.ProcessorExitStatus.Aborted` and nothing is dropped or created.
        """
        # get all models with the autocreation flag
        autocreation_models = [
            model
            for _, model in mafw_model_register.items()
            if model._meta.automatic_creation  # type: ignore[attr-defined]
        ]

        # filter out standard tables if the user decided not to create them
        standard_table_models = _standard_table_models()
        if not self.create_standard_tables:
            autocreation_models = [model for model in autocreation_models if model not in standard_table_models]

        # get the table name from the model class
        autocreation_table_names = [cast(PeeweeModelWithMeta, model)._meta.table_name for model in autocreation_models]

        if self.apply_only_to_prefix:
            # keep only the tables whose name starts with one of the given prefixes.
            # str.startswith accepts a tuple of alternatives, so no inner loop is needed.
            prefixes = tuple(cast(list[str], self.apply_only_to_prefix))
            autocreation_table_names = [name for name in autocreation_table_names if name.startswith(prefixes)]

        # the models selected by the filters above; computed once and shared by the drop and create steps
        selected_names = set(autocreation_table_names)
        selected_models = [
            model for model in autocreation_models if cast(PeeweeModelWithMeta, model)._meta.table_name in selected_names
        ]

        # in the case of force_recreation, we need to have user confirmation
        if self.force_recreate:
            log.warning(f'Forcing recreation of {len(autocreation_table_names)} tables in the database.')
            log.warning('All data in these tables will be lost.')

            with self._user_interface.enter_interactive_mode():
                question = 'Are you really sure?'
                if self._user_interface.name == 'rich':
                    question = '[red][bold]' + question + '[/red][/bold]'
                confirmation = self._user_interface.prompt_question(
                    question=question, prompt_type=Confirm, default=False, show_default=True, case_sensitive=True
                )

            if not confirmation:
                # the user changed their mind: leave the database untouched
                self.processor_exit_status = ProcessorExitStatus.Aborted
                return

            log.info(f'Removing {len(autocreation_table_names)} tables from the database.')
            self.database.drop_tables(selected_models)  # type: ignore[arg-type]

        if self.soft_recreate:
            # recreate all selected tables, relying on the safe flag for the ones already existing
            models = selected_models
        else:
            # create only the selected tables that are not yet existing
            self.existing_table_names = self.database.get_tables()
            models = [
                model
                for model in selected_models
                if cast(PeeweeModelWithMeta, model)._meta.table_name not in self.existing_table_names
            ]
        self.database.create_tables(models)  # type: ignore[arg-type]

        if self.force_recreate and self.create_standard_tables:
            # in the case of a recreation, do the initialisation of all dropped standard tables.
            for model in models:
                if model in standard_table_models:
                    model.init()

        n = len(models)
        if n > 0:
            plu = '' if n == 1 else 's'
            soft = '(soft) ' if self.soft_recreate else ''
            log.info(f'Successfully {soft}created {n} table{plu}.')
@database_required
class TriggerRefresher(Processor):
    """
    Processor to recreate all triggers.

    Triggers are database objects, and even though they could be created, dropped and modified at any moment,
    within the MAFw execution cycle they are normally created along with the table they are targeting.

    When the table is created, all its triggers are created as well, but, unless differently specified, with the
    safe flag on, which means that they are created only if they do not exist yet.

    This might be particularly annoying when modifying an existing trigger, because you need to manually drop the
    trigger to let the table creation mechanism create the newer version.

    The goal of this processor is to drop all existing triggers and then recreate the corresponding tables, so as
    to end up with an updated version of the triggers.

    The processor relies on the fact that all subclasses of :class:`.MAFwBaseModel` are automatically inserted in
    the :data:`.mafw_model_register`, so that the model class can be retrieved from the table name.

    Before removing any trigger, the processor builds the list of all affected tables and checks whether all of
    them are in the :data:`.mafw_model_register`. If so, it proceeds without asking any further confirmation.
    Otherwise, it asks the user to decide what to do:

    - Remove only the triggers whose tables are in the register and are thus recreated afterward.
    - Remove all triggers; in this case, some of them will not be recreated.
    - Abort the processor.

    Trigger manipulations (drop and creation) are not directly implemented in :link:`peewee` and are an extension
    provided by MAFw. In order to be compatible with the three main databases (:link:`sqlite`, :link:`mysql` and
    :link:`postgresql`), the SQL generation is obtained via the :class:`.TriggerDialect` interface.

    .. seealso::

        The :class:`.Trigger` class and also the :ref:`trigger chapter <triggers>` for a deeper explanation on
        triggers.

        The :class:`.ModelRegister` class, the :data:`.mafw_model_register` and the :ref:`related chapter
        <auto_registration>` on the automatic registration mechanism.

        The :class:`.TriggerDialect` and its subclasses, for a database independent way to generate SQL statements
        related to triggers.

    .. versionadded:: v2.0.0
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Constructor.

        All positional and keyword arguments are forwarded unchanged to the parent class.
        """
        super().__init__(*args, **kwargs)
        # resolved in start(); None until then
        self.dialect: Optional[TriggerDialect] = None
        # names of the tables that finish() will recreate
        self.tables_to_be_rebuilt: set[str] = set()

    def get_dialect(self) -> TriggerDialect:
        """
        Get the SQL trigger dialect matching the type of the current database.

        An already resolved dialect is returned as is. Without a database, a SQLite dialect is returned.

        :return: The SQL trigger dialect
        :rtype: :class:`.TriggerDialect`
        :raises UnsupportedDatabaseError: if there is no dialect for the current DB.
        """
        if self.dialect is not None:
            return self.dialect

        if self._database is None:
            # no database at hand: default to the SQLite dialect
            return SQLiteDialect()

        # unwrap the proxy, if any, to look at the actual database object
        actual_db = self._database.obj if isinstance(self._database, peewee.DatabaseProxy) else self._database

        # the pairs are tested in this exact order
        supported: tuple[tuple[type, type[TriggerDialect]], ...] = (
            (peewee.SqliteDatabase, SQLiteDialect),
            (peewee.MySQLDatabase, MySQLDialect),
            (peewee.PostgresqlDatabase, PostgreSQLDialect),
        )
        for database_class, dialect_class in supported:
            if isinstance(actual_db, database_class):
                return dialect_class()

        raise UnsupportedDatabaseError(f'Unsupported database type: {type(actual_db)}')

    def start(self) -> None:
        """Run the parent start and resolve the trigger dialect for the current database."""
        super().start()
        self.dialect = self.get_dialect()

    def get_items(self) -> Collection[Any]:
        """
        Retrieve the database triggers to be dropped, asking the user what to do when needed.

        All currently defined triggers are fetched from the database. If some of the tables they belong to are not
        registered in :data:`.mafw_model_register`, the user is prompted for a course of action:

        1. **Remove All Triggers (A):** all triggers are returned for removal, but only the tables known to the
           register are marked for rebuilding.
        2. **Remove Only Rebuildable Triggers (O):** only the triggers of tables known to the register are
           returned.
        3. **Quit (Q):** nothing is returned and the processor is aborted.

        When every affected table is known, all triggers are returned without any prompt.

        :return: The triggers to be processed, in the form of (trigger_name, table_name) tuples
        :rtype: List[Tuple[str, str]]
        """
        if TYPE_CHECKING:
            assert self.dialect is not None

        triggers: list[tuple[str, str]] = self.database.execute_sql(self.dialect.select_all_trigger_sql()).fetchall()  # type: ignore[no-untyped-call]

        # split the tables owning a trigger into the ones that can be recreated and the ones that cannot
        affected_tables = {row[1] for row in triggers}
        known_tables = mafw_model_register.get_table_names()
        rebuildable_tables = {table for table in affected_tables if table in known_tables}
        not_rebuildable_tables = affected_tables - rebuildable_tables

        if not_rebuildable_tables:
            log.warning(f'There are some tables ({len(not_rebuildable_tables)}) that cannot be rebuild')

            class TriggerPrompt(Prompt):
                response_type = str
                validate_error_message = '[prompt.invalid]Please enter A, O or Q'
                choices: list[str] = ['A', 'O', 'Q']

            with self._user_interface.enter_interactive_mode():
                question = 'Remove all triggers (A), remove only rebuildable triggers (O), quit (Q)'
                if self._user_interface.name == 'rich':
                    question = '[red][bold]' + question + '[/red][/bold]'

                answer = self._user_interface.prompt_question(
                    question=question,
                    prompt_type=TriggerPrompt,
                    default='O',
                    show_default=True,
                    case_sensitive=False,
                    show_answer=True,
                )

            if answer == 'Q':
                # nothing to drop, nothing to rebuild
                triggers = []
                affected_tables = set()
                self.processor_exit_status = ProcessorExitStatus.Aborted
                self.looping_status = LoopingStatus.Abort
            else:
                if answer == 'O':
                    # drop only the triggers that will be recreated afterward
                    triggers = [row for row in triggers if row[1] in rebuildable_tables]
                # for both 'O' and 'A', only the rebuildable tables can be recreated
                affected_tables = rebuildable_tables

        self.tables_to_be_rebuilt = affected_tables
        return triggers

    def process(self) -> None:
        """Drop the current trigger from its table."""
        if TYPE_CHECKING:
            assert self.dialect is not None

        trigger_name = self.item[0]
        table_name = self.item[1]
        drop_sql = self.dialect.drop_trigger_sql(trigger_name, safe=True, table_name=table_name)
        self.database.execute_sql(drop_sql)  # type: ignore[no-untyped-call]

    def finish(self) -> None:
        """
        Recreate the tables from which triggers were dropped.

        The recreation is skipped if the user aborted the process; the parent finish is executed in any case.
        """
        if self.looping_status != LoopingStatus.Abort:
            log.info(f'Recreating {self.n_item} triggers on {len(self.tables_to_be_rebuilt)} tables...')
            # the register gives back the model class corresponding to each table name
            models = [mafw_model_register.get_model(table_name) for table_name in self.tables_to_be_rebuilt]

            self.database.create_tables(models)  # type: ignore[arg-type]
        super().finish()

    def format_progress_message(self) -> None:
        """Set the progress message to mention the trigger being dropped and its table."""
        trigger_name = self.item[0]
        table_name = self.item[1]
        self.progress_message = f'Dropping trigger {trigger_name} from table {table_name}'
@database_required
class SQLScriptRunner(Processor):
    """
    Processor to execute SQL scripts from files against the database.

    This processor reads SQL files, removes multi-line block comments, splits the content into individual
    statements, and executes them within a transaction. All statements of one file are wrapped in a single atomic
    transaction.

    The processor accepts a list of SQL files through the :attr:`sql_files` parameter. Each file is validated
    to ensure it exists and is a regular file before processing. Block comments (`/* ... */`) are removed
    from the SQL content before statement parsing.

    .. note::

        The parsing is purely textual: the content is split at every ``;`` character, so a statement containing a
        literal semicolon (for example inside a string or a compound body) is broken in several pieces. Line
        comments (``-- ...``) are not removed.

    .. versionadded:: v2.0.0
    """

    sql_files = ActiveParameter[list[Path]]('sql_files', default=[], help_doc='A list of SQL files to be processed')
    """List of SQL files to be processed"""

    def validate_configuration(self) -> None:
        """
        Validate the configuration of SQL script runner.

        Converts every entry of :attr:`sql_files` to a :class:`~pathlib.Path` and ensures that it is an existing
        regular file.

        :raises InvalidConfigurationError: if any of the specified files does not exist or is not a regular file.
        """
        if TYPE_CHECKING:
            # we need to convince mypy that the sql_files is not an ActiveParameter but
            # the content of the ActiveParameter
            assert isinstance(self.sql_files, list)

        self.sql_files = [Path(file) for file in self.sql_files]
        for file in self.sql_files:
            # is_file() is already False for a missing path, so a separate exists() check is not needed
            if not file.is_file():
                raise InvalidConfigurationError(f'There are issues with SQL file "{file.resolve()}". Please verify.')

    def get_items(self) -> Collection[Any]:
        """
        Get the collection of SQL files to be processed.

        :return: A collection of SQL file paths to be processed
        :rtype: Collection[Any]
        """
        if TYPE_CHECKING:
            # we need to convince mypy that the sql_files is not an ActiveParameter but
            # the content of the ActiveParameter
            assert isinstance(self.sql_files, list)

        return self.sql_files

    def process(self) -> None:
        """
        Process a single SQL file by reading, parsing, and executing its statements.

        Reads the SQL file content, removes multi-line block comments, splits the content
        into individual SQL statements, and executes them within a transaction.

        If no statements are found in the file, a warning is logged and nothing is executed. If an error occurs
        during execution, it is logged and the exception is re-raised, leaving the atomic block through the
        exception.

        :raises Exception: If an error occurs during SQL statement execution.
        """
        # accept both Path and plain string items
        sql_path = Path(self.item)

        # NOTE(review): no explicit encoding is given, so the platform default applies when decoding the file
        with open(sql_path, 'rt') as sql_file:
            sql_content = sql_file.read()

        # remove the multi-line block comments (/* ... */)
        sql_content = block_comment_re.sub('', sql_content)

        # split at each semicolon, drop the empty fragments and put the terminator back
        fragments = (fragment.strip() for fragment in sql_content.split(';'))
        statements = [f'{fragment};' for fragment in fragments if fragment]

        if not statements:
            log.warning(f'No SQL statements found to execute in {sql_path.name}.')
            # nothing to run: do not open an empty transaction
            return

        log.debug(f'Found {len(statements)} statements to execute.')

        try:
            # use an atomic transaction to wrap the execution of all statements.
            with self.database.atomic():
                for statement in statements:
                    self.database.execute_sql(statement)  # type: ignore[no-untyped-call]

        except Exception as e:
            log.critical(f'An error occurred while executing the SQL script {sql_path.name}.')
            log.critical('Rolling back the database to preserve integrity.')
            log.critical(f'Error details: {e}')
            raise

    def format_progress_message(self) -> None:
        """Set the progress message to mention the SQL file being processed."""
        self.progress_message = f'Processing SQL file {Path(self.item).name}'