Coverage for src / mafw / processor_library / db_init.py: 100%
158 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
1# Copyright 2025 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Database initialisation processor module.
7This module contains the following processors:
9:class:`TableCreator`
10 processor which handles the creation of database tables based on registered models. It provides functionality to
11 create tables automatically while respecting existing tables and offering options for forced recreation.
13:class:`TriggerRefresher`
14 processor to safely update the trigger definitions. It removes all existing triggers and regenerates them
15 according to the new definition. Particularly useful when debugging triggers, it can also be left at the
16 beginning of all analysis pipelines since it does not cause any loss of data.
18:class:`SQLScriptRunner`
19 processor to execute SQL scripts from files against the database. It reads SQL files, removes block comments,
20 splits the content into individual statements, and executes them within a transaction.
22.. versionadded:: v2.0.0
23"""
25import logging
26import re
27from pathlib import Path
28from typing import TYPE_CHECKING, Any, Collection, Optional
30import peewee
31from rich.prompt import Confirm, Prompt
33from mafw.db.db_model import mafw_model_register
34from mafw.db.std_tables import StandardTable
35from mafw.db.trigger import MySQLDialect, PostgreSQLDialect, SQLiteDialect, TriggerDialect
36from mafw.decorators import database_required, single_loop
37from mafw.enumerators import LoopingStatus, ProcessorExitStatus
38from mafw.mafw_errors import InvalidConfigurationError, UnsupportedDatabaseError
39from mafw.processor import ActiveParameter, Processor
# Module level logger, named after the module itself.
log: logging.Logger = logging.getLogger(name=__name__)

# Matches SQL block comments (/* ... */). The non-greedy body stops at the first closing marker and
# re.S (alias of re.DOTALL) lets a single comment span several lines.
block_comment_re: 're.Pattern[str]' = re.compile(r'/\*.*?\*/', flags=re.S)
@database_required
@single_loop
class TableCreator(Processor):
    """
    Processor to create all tables in the database.

    This processor can be included in all pipelines in order to create all tables in the database. Its functionality
    is based on the fact that all :class:`.MAFwBaseModel` subclasses are automatically included in a global register
    (:data:`.mafw_model_register`).

    This processor will perform the following:

    #. Get the names of all registered models having the automatic creation flag set.
    #. If requested, keep only the names starting with one of the given prefixes.
    #. Unless a soft recreation is requested, discard the tables that already exist in the database.
    #. Create the remaining tables.

    This overall behaviour can be modified via the following parameters:

    * *force_recreate* (bool, default = False): Use with extreme care. When set to True, all the selected tables
      are first dropped (after an explicit user confirmation) and then recreated. It is almost equivalent to a
      re-initialisation of the whole DB with all the data being lost. Since it cannot be combined with
      *soft_recreate*, the latter must be explicitly set to False.

    * *soft_recreate* (bool, default = True): When set to True, all tables whose model is in the mafw model
      register will be recreated with the safe flag. It means that there won't be any table drop. If a table
      already exists, nothing will happen, but if a new trigger has been added to the table, it will be created.
      When set to False, only tables whose model is in the register and that do not exist yet will be created.

    * *apply_only_to_prefix* (list[str], default = []): When not empty, only the tables whose name starts with
      one of the provided prefixes are considered. A single string is accepted as well.

    .. versionadded:: v2.0.0

    """

    force_recreate = ActiveParameter(
        name='force_recreate', default=False, help_doc='First drop and then create the tables. LOSS OF ALL DATA!!!'
    )
    """
    Force recreate (bool, default = False).

    Use with extreme care. When set to True, all the selected tables in the model register will be first dropped
    and then recreated. It is almost equivalent to a re-initialisation of the whole DB with all the data being lost.
    """

    soft_recreate = ActiveParameter(
        name='soft_recreate', default=True, help_doc='Safe recreate tables without dropping. No data loss'
    )
    """
    Soft recreate (bool, default = True).

    When set to True, all tables whose model is in the mafw model register will be recreated with the safe flag. It
    means that there won't be any table drop. If a table already exists, nothing will happen. If a new trigger has
    been added to the table, it will be created. When set to False, only tables whose model is in the register and
    that do not exist yet will be created.
    """

    apply_only_to_prefix = ActiveParameter[list[str]](
        name='apply_only_to_prefix',
        default=[],
        help_doc='Create only tables whose name start with the provided prefixes.',
    )
    """
    Apply only to tables starting with prefix (list[str], default = []).

    When not empty, only the tables whose name starts with one of the provided prefixes are considered.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.existing_table_names: list[str] = []
        """The list of all existing tables in the database."""

    def validate_configuration(self) -> None:
        """
        Configuration validation

        :attr:`force_recreate` and :attr:`soft_recreate` cannot be both True.

        :raises InvalidConfigurationError: if both recreate types are True.
        """
        if self.force_recreate and self.soft_recreate:
            raise InvalidConfigurationError(
                'Both force_recreate and soft_recreate set to True. Incompatible configuration'
            )

    def process(self) -> None:
        """
        Execute the table creation process.

        This method performs the following steps:

        #. Identify all models that have automatic creation enabled.
        #. Filter models based on the apply_only_to_prefix parameter if specified.
        #. Handle forced recreation if requested, including user confirmation.
        #. Handle soft recreation if requested, letting all tables with a known model be recreated.
        #. Create the required tables.
        #. Initialise the :class:`.StandardTable` models after a forced recreation.

        If the user cancels the creation, the processor exit status is set to :attr:`.ProcessorExitStatus.Aborted`
        so that the whole processor list is blocked.
        """
        autocreation_table_names = self._get_autocreation_table_names()

        if self.force_recreate:
            # dropping tables means losing data: an explicit user confirmation is required
            if not self._confirm_force_recreate(len(autocreation_table_names)):
                self.processor_exit_status = ProcessorExitStatus.Aborted
                return
            log.info(f'Removing {len(autocreation_table_names)} tables from the database.')
            self.database.drop_tables(self._get_models(autocreation_table_names))  # type: ignore[arg-type]

        if self.soft_recreate:
            # recreate all the selected tables in the register, relying on the safe flag of create_tables
            models = self._get_models(autocreation_table_names)
        else:
            # create only the selected tables in the register that are not yet existing
            self.existing_table_names = self.database.get_tables()
            existing = set(self.existing_table_names)
            models = self._get_models([name for name in autocreation_table_names if name not in existing])
        self.database.create_tables(models)  # type: ignore[arg-type]

        if self.force_recreate:
            # after a recreation, initialise the dropped standard tables (and only those: other models
            # are not StandardTable subclasses)
            for model in models:
                if isinstance(model, type) and issubclass(model, StandardTable):
                    model.init()

        n_models = len(models)
        if n_models > 0:
            plural = '' if n_models == 1 else 's'
            log.info(f'Successfully created {n_models} table{plural}.')

    def _get_prefixes(self) -> tuple[str, ...]:
        """Return :attr:`apply_only_to_prefix` as a tuple of strings, wrapping a single string if needed."""
        prefixes: Any = self.apply_only_to_prefix
        if not prefixes:
            return ()
        if isinstance(prefixes, str):
            # a bare string would otherwise be iterated character by character
            return (prefixes,)
        return tuple(prefixes)

    def _get_autocreation_table_names(self) -> list[str]:
        """Return the registered names with automatic creation enabled, filtered by prefix if requested."""
        table_names = [
            name
            for name, model in mafw_model_register.items()
            if model._meta.automatic_creation  # type: ignore[attr-defined]
        ]
        prefixes = self._get_prefixes()
        if prefixes:
            # keep only the tables whose name starts with at least one of the given prefixes
            table_names = [name for name in table_names if name.startswith(prefixes)]
        return table_names

    @staticmethod
    def _get_models(table_names: Collection[str]) -> list[Any]:
        """Return the registered models whose name is in *table_names*, preserving the register order."""
        wanted = set(table_names)
        return [model for name, model in mafw_model_register.items() if name in wanted]

    def _confirm_force_recreate(self, n_tables: int) -> bool:
        """
        Warn the user about the data loss and ask for confirmation.

        :param n_tables: The number of tables that are going to be dropped.
        :return: True if the user confirmed the forced recreation, False otherwise.
        """
        log.warning(f'Forcing recreation of {n_tables} tables in the database.')
        log.warning('All data in these tables will be lost.')

        with self._user_interface.enter_interactive_mode():
            question = 'Are you really sure?'
            if self._user_interface.name == 'rich':
                question = '[red][bold]' + question + '[/red][/bold]'
            confirmation = self._user_interface.prompt_question(
                question=question, prompt_type=Confirm, default=False, show_default=True, case_sensitive=True
            )
        return bool(confirmation)
@database_required
class TriggerRefresher(Processor):
    """
    Processor to recreate all triggers.

    Triggers are database objects, and even though they could be created, dropped and modified at any moment,
    within the MAFw execution cycle they are normally created along with the table they are targeting.

    When the table is created, all its triggers are created as well,
    but unless differently specified, with the safe flag on, meaning that they are created only if they do not exist.

    This might be particularly annoying when modifying an existing trigger, because you need to manually drop the
    trigger to let the table creation mechanism create the newer version.

    The goal of this processor is to drop all existing triggers and then recreate the corresponding tables so as to
    have an updated version of the triggers.

    The processor relies on the fact that all subclasses of :class:`.MAFwBaseModel`
    are automatically inserted in the :data:`.mafw_model_register` so that the model class can be retrieved from the
    table name.

    Before removing any trigger, the processor will build a list with all the affected tables and check if all of
    them are in the :data:`.mafw_model_register`. If so, it will proceed without asking any further confirmation.
    Otherwise, if some affected tables are not in the register, it will ask the user to decide what to do:

    - Remove only the triggers whose tables are in the register and thus recreated afterward.
    - Remove all triggers; in this case, some of them will not be recreated.
    - Abort the processor.

    Any unrecognised answer is treated as an abort request.

    Trigger manipulations (drop and creation) are not directly implemented in :link:`peewee` and are an extension
    provided by MAFw. In order to be compatible with the three main databases (:link:`sqlite`, :link:`mysql` and
    :link:`postgresql`), the SQL generation is obtained via the :class:`.TriggerDialect` interface.

    .. seealso::

        The :class:`.Trigger` class and also the :ref:`trigger chapter <triggers>` for a deeper explanation on triggers.

        The :class:`.ModelRegister` class, the :data:`.mafw_model_register` and the :ref:`related chapter
        <auto_registration>` on the automatic registration mechanism.

        The :class:`.TriggerDialect` and its subclasses, for a database independent way to generate SQL statements
        related to triggers.

    .. versionadded:: v2.0.0
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # the trigger SQL dialect, resolved in start()
        self.dialect: Optional[TriggerDialect] = None
        # names of the tables to be recreated in finish() to regenerate their triggers
        self.tables_to_be_rebuilt: set[str] = set()

    def get_dialect(self) -> TriggerDialect:
        """
        Get the valid SQL dialect based on the type of Database

        :return: The SQL trigger dialect
        :rtype: :class:`.TriggerDialect`
        :raises: :class:`.UnsupportedDatabaseError` if there is no dialect for the current DB.
        """
        if self.dialect is not None:
            return self.dialect

        if self._database is None:
            # Default to SQLite dialect
            return SQLiteDialect()

        db = self._database
        if isinstance(db, peewee.DatabaseProxy):
            db = db.obj  # Get the actual database from the proxy

        dialect: TriggerDialect
        if isinstance(db, peewee.SqliteDatabase):
            dialect = SQLiteDialect()
        elif isinstance(db, peewee.MySQLDatabase):
            dialect = MySQLDialect()
        elif isinstance(db, peewee.PostgresqlDatabase):
            dialect = PostgreSQLDialect()
        else:
            raise UnsupportedDatabaseError(f'Unsupported database type: {type(db)}')

        return dialect

    def start(self) -> None:
        super().start()
        self.dialect = self.get_dialect()

    def get_items(self) -> Collection[Any]:
        """
        Retrieves a list of database triggers and interacts with the user to determine which ones to process.

        This method fetches all currently defined database triggers. If any tables
        associated with these triggers are not known (i.e., not registered in
        :data:`.mafw_model_register`), it enters an interactive mode to prompt the user for
        a course of action (the answer is case insensitive):

        1. **Remove All Triggers (A):** Processes all triggers for subsequent removal,
           but only marks 'rebuildable' tables for rebuilding.
        2. **Remove Only Rebuildable Triggers (O):** Processes only triggers associated
           with 'rebuildable' tables.
        3. **Quit (Q):** Aborts the entire process. Any unrecognised answer is treated in the same way,
           so that triggers are never removed by mistake.

        If no unknown tables are found, or the user chooses to process rebuildable tables,
        the list of triggers and the set of tables to be rebuilt are prepared for the next stage.

        :return: A collection of database triggers to be processed, in the form of tuples (trigger_name, table_name)
        :rtype: List[Tuple[str, str]]
        """
        if TYPE_CHECKING:
            assert self.dialect is not None

        triggers: list[tuple[str, str]] = self.database.execute_sql(self.dialect.select_all_trigger_sql()).fetchall()

        affected_tables = {row[1] for row in triggers}
        known_tables = set(mafw_model_register.get_table_names())
        rebuildable_tables = affected_tables & known_tables
        not_rebuildable_tables = affected_tables - rebuildable_tables

        if not_rebuildable_tables:
            log.warning(f'There are some tables ({len(not_rebuildable_tables)}) that cannot be rebuilt')
            answer = self._ask_which_triggers_to_remove()

            if answer == 'A':
                # remove all triggers, but rebuild only the rebuildable tables
                affected_tables = rebuildable_tables
            elif answer == 'O':
                triggers = [row for row in triggers if row[1] in rebuildable_tables]
                affected_tables = rebuildable_tables
            else:
                # 'Q' or anything unexpected: do not touch any trigger
                if answer != 'Q':
                    log.warning(f'Unrecognised answer "{answer}". Aborting.')
                triggers = []
                affected_tables = set()
                self.processor_exit_status = ProcessorExitStatus.Aborted
                self.looping_status = LoopingStatus.Abort

        self.tables_to_be_rebuilt = affected_tables
        return triggers

    def _ask_which_triggers_to_remove(self) -> str:
        """
        Ask the user how to proceed when some trigger tables cannot be rebuilt.

        :return: The user answer, stripped and converted to upper case.
        """
        with self._user_interface.enter_interactive_mode():
            question = 'Remove all triggers (A), remove only rebuildable triggers (O), quit (Q)'
            if self._user_interface.name == 'rich':
                question = '[red][bold]' + question + '[/red][/bold]'

            class TriggerPrompt(Prompt):
                response_type = str
                validate_error_message = '[prompt.invalid]Please enter A, O or Q'
                choices: list[str] = ['A', 'O', 'Q']

            answer = self._user_interface.prompt_question(
                question=question,
                prompt_type=TriggerPrompt,
                default='O',
                show_default=True,
                case_sensitive=False,
                show_answer=True,
            )
        # normalise, because not every user interface is guaranteed to return the canonical choice
        return str(answer).strip().upper()

    def process(self) -> None:
        """Delete the current trigger from its table"""
        if TYPE_CHECKING:
            assert self.dialect is not None

        self.database.execute_sql(self.dialect.drop_trigger_sql(self.item[0], safe=True, table_name=self.item[1]))

    def finish(self) -> None:
        """
        Recreate the tables from which triggers were dropped.

        This is only done if the user did not abort the process.
        """
        if self.looping_status != LoopingStatus.Abort:
            log.info(f'Recreating {self.n_item} triggers on {len(self.tables_to_be_rebuilt)} tables...')
            models = [mafw_model_register.get_model(table_name) for table_name in self.tables_to_be_rebuilt]

            self.database.create_tables(models)  # type: ignore[arg-type]
        super().finish()

    def format_progress_message(self) -> None:
        self.progress_message = f'Dropping trigger {self.item[0]} from table {self.item[1]}'
_sql_token_re = re.compile(
    r"""
    (?P<quoted>'[^']*'|"[^"]*"|`[^`]*`)  # quoted literal or identifier (a doubled quote yields two adjacent tokens)
    |(?P<line_comment>--[^\n]*)          # line comment, up to (excluding) the end of line
    |(?P<block_comment>/\*.*?\*/)        # block comment, possibly spanning several lines
    |(?P<semicolon>;)                    # statement separator
    """,
    re.DOTALL | re.VERBOSE,
)
"""Tokenizer used by :class:`SQLScriptRunner` to split SQL scripts into statements."""


@database_required
class SQLScriptRunner(Processor):
    """
    Processor to execute SQL scripts from files against the database.

    This processor reads SQL files, removes comments, splits the content into individual statements, and executes
    them within a transaction. It is designed to handle SQL script execution in a safe manner by wrapping all
    statements of a file in a single atomic transaction.

    The processor accepts a list of SQL files through the :attr:`sql_files` parameter. Each file is validated
    to ensure it exists and is a regular file before processing. Block comments (`/* ... */`) and line comments
    (`-- ...`) are removed, and the content is split at every semicolon that is not enclosed in quotes.

    .. note::

        Semicolons inside compound statement bodies (e.g. ``CREATE TRIGGER ... BEGIN ...; END;``) or inside
        PostgreSQL dollar-quoted strings are still treated as statement separators. MySQL ``#`` comments and
        backslash-escaped quotes are not recognised.

    .. versionadded:: v2.0.0
    """

    sql_files = ActiveParameter[list[Path]]('sql_files', default=[], help_doc='A list of SQL files to be processed')
    """List of SQL files to be processed"""

    def validate_configuration(self) -> None:
        """
        Validate the configuration of SQL script runner.

        Converts all the specified SQL files into :class:`~pathlib.Path` objects (a single path is accepted as well)
        and ensures that they exist and are regular files.

        :raises InvalidConfigurationError: if any of the specified files does not exist or is not a regular file.
        """
        if TYPE_CHECKING:
            # we need to convince mypy that the sql_files is not an ActiveParameter but
            # the content of the ActiveParameter
            assert isinstance(self.sql_files, list)

        files: Any = self.sql_files
        if isinstance(files, (str, Path)):
            # a single file given instead of a list: avoid iterating over the characters of a string
            files = [files]
        self.sql_files = [Path(file) for file in files]
        for file in self.sql_files:
            # is_file() is False also for non-existing paths
            if not file.is_file():
                raise InvalidConfigurationError(f'There are issues with SQL file "{file.resolve()}". Please verify.')

    def get_items(self) -> Collection[Any]:
        """
        Get the collection of SQL files to be processed.

        :return: A collection of SQL file paths to be processed
        :rtype: Collection[Any]
        """
        if TYPE_CHECKING:
            # we need to convince mypy that the sql_files is not an ActiveParameter but
            # the content of the ActiveParameter
            assert isinstance(self.sql_files, list)

        return self.sql_files

    @staticmethod
    def _split_sql_statements(sql_content: str) -> list[str]:
        """
        Split an SQL script into individual statements.

        Comments are removed (a block comment is replaced by a single space so that the surrounding tokens stay
        separated) and the script is split at every semicolon not enclosed in quotes.

        :param sql_content: The content of the SQL script.
        :return: The stripped, non-empty statements, each terminated by a semicolon.
        """
        statements: list[str] = []
        buffer: list[str] = []

        def flush() -> None:
            statement = ''.join(buffer).strip()
            if statement:
                statements.append(statement + ';')
            buffer.clear()

        position = 0
        for match in _sql_token_re.finditer(sql_content):
            # plain SQL text between two tokens
            buffer.append(sql_content[position : match.start()])
            position = match.end()
            kind = match.lastgroup
            if kind == 'quoted':
                buffer.append(match.group())
            elif kind == 'block_comment':
                buffer.append(' ')
            elif kind == 'semicolon':
                flush()
            # line comments are simply dropped
        buffer.append(sql_content[position:])
        flush()
        return statements

    def process(self) -> None:
        """
        Process a single SQL file by reading, parsing, and executing its statements.

        Reads the SQL file content (UTF-8 encoded), removes comments, splits the content into individual SQL
        statements, and executes them within a transaction.

        If no statements are found in the file, a warning is logged and nothing is executed. If an error occurs
        during execution, the transaction is rolled back and the exception is re-raised.

        :raises Exception: If an error occurs during SQL statement execution.
        """
        sql_content = Path(self.item).read_text(encoding='utf-8')

        statements = self._split_sql_statements(sql_content)

        if not statements:
            log.warning(f'No SQL statements found to execute in {self.item.name}.')
            return

        log.debug(f'Found {len(statements)} statements to execute.')

        try:
            # use an atomic transaction to wrap the execution of all statements.
            # NOTE: some back-ends (e.g. MySQL) implicitly commit DDL statements, which cannot be rolled back.
            with self.database.atomic():  # type: ignore[no-untyped-call]
                for statement in statements:
                    self.database.execute_sql(statement)

        except Exception as e:
            log.critical(f'An error occurred while executing the SQL script {self.item.name}.')
            log.critical('Rolling back the database to preserve integrity.')
            log.critical(f'Error details: {e}')
            raise

    def format_progress_message(self) -> None:
        self.progress_message = f'Processing SQL file {self.item.name}'