mafw.processor_library.db_init

Database initialisation processor module.

This module contains the following processors:

TableCreator

processor which handles the creation of database tables based on registered models. It provides functionality to create tables automatically while respecting existing tables and offering options for forced recreation.

TriggerRefresher

processor to safely update the trigger definitions. It removes all existing triggers and regenerates them according to the new definition. Particularly useful when debugging triggers, it can also be left at the beginning of all analysis pipelines since it does not cause any loss of data.

SQLScriptRunner

processor to execute SQL scripts from files against the database. It reads SQL files, removes block comments, splits the content into individual statements, and executes them within a transaction.

Added in version v2.0.0.

Classes

SQLScriptRunner(*args, **kwargs)

Processor to execute SQL scripts from files against the database.

TableCreator(*args, **kwargs)

Processor to create all tables in the database.

TriggerRefresher(*args, **kwargs)

Processor to recreate all triggers.

class mafw.processor_library.db_init.SQLScriptRunner(*args: Any, **kwargs: Any)[source]

Bases: Processor

Processor to execute SQL scripts from files against the database.

This processor reads SQL files, removes multi-line block comments, splits the content into individual statements, and executes them within a transaction. It is designed to handle SQL script execution in a safe manner by wrapping all statements in a single atomic transaction.

The processor accepts a list of SQL files through the sql_files parameter. Each file is validated to ensure it exists and is a regular file before processing. Block comments (/* … */) are removed from the SQL content before statement parsing.
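
The comment removal and statement splitting described above can be sketched with the standard library alone. This is a minimal illustration, not the MAFw implementation: the helper names `strip_block_comments` and `split_statements` are hypothetical, and the naive split on `;` assumes that no semicolon appears inside a string literal or a trigger body.

```python
import re

# Hypothetical helpers for illustration only; they are not part of the MAFw API.
_BLOCK_COMMENT = re.compile(r"/\*.*?\*/", re.DOTALL)


def strip_block_comments(sql: str) -> str:
    """Remove /* ... */ comments, including those spanning several lines."""
    return _BLOCK_COMMENT.sub("", sql)


def split_statements(sql: str) -> list[str]:
    """Split on ';' and drop empty fragments (naive: ignores ';' inside literals)."""
    return [part.strip() for part in sql.split(";") if part.strip()]


script = """
/* create the
   sample table */
CREATE TABLE sample (id INTEGER PRIMARY KEY, name TEXT);
INSERT INTO sample (name) VALUES ('a'); /* first row */
"""

statements = split_statements(strip_block_comments(script))
for statement in statements:
    print(statement)
```

The non-greedy pattern combined with `re.DOTALL` lets a single comment span several lines without swallowing the SQL between two separate comments.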

Added in version v2.0.0.

Processor parameters

  • sql_files: A list of SQL files to be processed (default: [])

Constructor parameters

Parameters:
  • name (str, Optional) – The name of the processor. If None is provided, the class name is used instead. Defaults to None.

  • description (str, Optional) – A short description of the processor task. Defaults to the processor name.

  • config (dict, Optional) – A configuration dictionary for this processor. Defaults to None.

  • looper (LoopType, Optional) – Enumerator to define the looping type. Defaults to LoopType.ForLoop

  • user_interface (UserInterfaceBase, Optional) – A user interface instance to be used by the processor to interact with the user.

  • timer (Timer, Optional) – A timer object to measure process duration.

  • timer_params (dict, Optional) – Parameters for the timer object.

  • database (Database, Optional) – A database instance. Defaults to None.

  • database_conf (dict, Optional) – Configuration for the database. Defaults to None.

  • remove_orphan_files (bool, Optional) – Boolean flag to remove files on disc without a reference to the database. See Standard tables and _remove_orphan_files(). Defaults to True

  • replica_id (str, Optional) – The replica identifier for the current processor.

  • create_standard_tables (bool, Optional) – Boolean flag to create the standard tables on disk. Defaults to True

  • kwargs – Keyword arguments that can be used to set processor parameters.

format_progress_message() None[source]

Customizes the progress message with information about the current item.

The user can overload this method in order to modify the message being displayed during the process loop with information about the current item.

The user can access the current value, its position in the looping cycle and the total number of items using Processor.item, Processor.i_item and Processor.n_item.

get_items() Collection[Any][source]

Get the collection of SQL files to be processed.

Returns:

A collection of SQL file paths to be processed

Return type:

Collection[Any]

process() None[source]

Process a single SQL file by reading, parsing, and executing its statements.

Reads the SQL file content, removes multi-line block comments, splits the content into individual SQL statements, and executes them within a transaction.

If no statements are found in the file, a warning is logged. If an error occurs during execution, the transaction is rolled back and the exception is re-raised.

Raises:

Exception – If an error occurs during SQL statement execution.
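
The all-or-nothing behaviour described above can be illustrated with the standard library sqlite3 module. This is a sketch under stated assumptions rather than the processor's actual code: `run_statements` is a hypothetical helper, and the real processor works through the configured database instance instead of a raw sqlite3 connection.

```python
import sqlite3


def run_statements(conn: sqlite3.Connection, statements: list[str]) -> None:
    """Execute all statements in one transaction; undo everything on failure."""
    try:
        conn.execute("BEGIN")
        for statement in statements:
            conn.execute(statement)
        conn.execute("COMMIT")
    except Exception:
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise  # re-raise so the caller sees the original error


# isolation_level=None turns off sqlite3's implicit transaction handling,
# so the explicit BEGIN/COMMIT/ROLLBACK above are the only ones in play.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE sample (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")

good = [
    "INSERT INTO sample (name) VALUES ('a')",
    "INSERT INTO sample (name) VALUES ('b')",
]
bad = [
    "INSERT INTO sample (name) VALUES ('c')",
    "INSERT INTO missing_table VALUES (1)",  # fails: the table does not exist
]

run_statements(conn, good)
try:
    run_statements(conn, bad)
except sqlite3.OperationalError:
    pass  # the failed script leaves no partial changes behind

row_count = conn.execute("SELECT COUNT(*) FROM sample").fetchone()[0]
print(row_count)
```

Only the two rows from the successful script remain: the first insert of the failing script is rolled back together with the statement that raised.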

start() None

Start method.

The user can overload this method, including all steps that should be performed at the beginning of the operation.

If the user decides to overload it, it should include a call to the super method.

validate_configuration() None[source]

Validate the configuration of SQL script runner.

Ensures that all specified SQL files exist and are regular files.

Raises:

InvalidConfigurationError – if any of the specified files does not exist or is not a regular file.
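
A check of this kind can be sketched with pathlib. The exception class below is a local stand-in defined only to keep the example self-contained, and `validate_sql_files` is a hypothetical helper, not the method itself.

```python
import tempfile
from pathlib import Path


class InvalidConfigurationError(Exception):
    """Local stand-in for the MAFw exception of the same name."""


def validate_sql_files(sql_files) -> None:
    """Raise if any entry is missing or is not a regular file (e.g. a directory)."""
    for entry in sql_files:
        path = Path(entry)
        # Path.is_file() is False both for missing paths and for directories.
        if not path.is_file():
            raise InvalidConfigurationError(
                f"{path} does not exist or is not a regular file"
            )


with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "init.sql"
    script.write_text("CREATE TABLE sample (id INTEGER PRIMARY KEY);")

    validate_sql_files([script])  # passes silently

    try:
        validate_sql_files([script, tmp])  # tmp is a directory
        rejected = False
    except InvalidConfigurationError:
        rejected = True

print(rejected)
```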

_config: dict[str, Any]

A dictionary containing the processor configuration object.

This dictionary is populated with the configuration parameters (always type 2) by the _load_parameter_configuration() method.

The original value of the configuration dictionary that is passed to the constructor is stored in _orig_config.

Changed in version v2.0.0: Now it is an empty dictionary until the _load_parameter_configuration() is called.

_processor_parameters: dict[str, PassiveParameter[ParameterType]]

A dictionary to store all the processor parameter instances.

The name of the parameter is used as a key, while for the value an instance of the PassiveParameter is used.

filter_register: mafw.db.db_filter.ProcessorFilter

The DB filter register of the Processor.

item: Any

The current item of the loop.

loop_type: LoopType

The loop type.

The value of this parameter can also be changed by the execution_workflow() decorator factory.

See LoopType for more details.

remove_orphan_files: bool

The flag to remove or protect the orphan files. Defaults to True

sql_files

List of SQL files to be processed

class mafw.processor_library.db_init.TableCreator(*args: Any, **kwargs: Any)[source]

Bases: Processor

Processor to create all tables in the database.

This processor can be included in all pipelines in order to create all tables in the database. Its functionality is based on the fact that all MAFwBaseModel subclasses are automatically included in a global register (mafw_model_register).

This processor will perform the following:

  1. Get a list of all tables already existing in the database.

  2. Prune from the list of models those whose tables already exist in the database.

  3. Create the remaining tables.
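
The three steps above can be sketched against an in-memory SQLite database. The dictionary `model_register` below is a hypothetical stand-in that maps table names to DDL statements; the real mafw_model_register holds model classes, and table creation goes through the ORM rather than raw SQL.

```python
import sqlite3

# Hypothetical stand-in for the model register: table name -> DDL statement.
model_register = {
    "sample": "CREATE TABLE sample (id INTEGER PRIMARY KEY)",
    "run": "CREATE TABLE run (id INTEGER PRIMARY KEY)",
}

conn = sqlite3.connect(":memory:")
conn.execute(model_register["sample"])  # 'sample' is already in the database


def table_names(connection: sqlite3.Connection) -> set[str]:
    """Return the names of all tables currently in the SQLite database."""
    rows = connection.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    return {row[0] for row in rows}


# 1. Get the tables that already exist.
existing = table_names(conn)

# 2. Prune the models whose table is already there.
missing = {name: ddl for name, ddl in model_register.items() if name not in existing}

# 3. Create the remaining tables.
for ddl in missing.values():
    conn.execute(ddl)

print(sorted(missing))
print(sorted(table_names(conn)))
```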

This overall behaviour can be modified via the following parameters:

  • force_recreate (bool, default = False): Use with extreme care. When set to True, all tables in the database and in the model register will be first dropped and then recreated. It is almost equivalent to a re-initialization of the whole DB with all the data being lost.

  • soft_recreate (bool, default = True): When set to True, all tables whose model is in the mafw model register are recreated with the safe flag, so no table is dropped. If a table already exists, nothing happens to it; if a new trigger has been added to the table, that trigger is created. When set to False, only tables whose model is in the register and that do not yet exist are created.

  • apply_only_to_prefix (list[str], default = []): Restricts creation to the tables that do not already exist and whose names start with one of the provided prefixes.
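
Prefix filtering of this kind can be sketched in a few lines. The function name is hypothetical, and treating an empty prefix list as "no filtering" is an assumption based on the empty default, not a statement about the actual implementation.

```python
def filter_by_prefix(table_names: list[str], prefixes: list[str]) -> list[str]:
    """Keep the names starting with any prefix; an empty list keeps everything."""
    if not prefixes:
        # Assumption: the default [] means that no prefix filter is applied.
        return list(table_names)
    # str.startswith accepts a tuple and matches if any element is a prefix.
    return [name for name in table_names if name.startswith(tuple(prefixes))]


tables = ["raw_image", "raw_meta", "fit_result", "plot_summary"]

print(filter_by_prefix(tables, ["raw_"]))
print(filter_by_prefix(tables, ["fit_", "plot_"]))
print(filter_by_prefix(tables, []))
```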

Added in version v2.0.0.

Processor parameters

  • apply_only_to_prefix: Create only tables whose names start with the provided prefixes. (default: [])

  • force_recreate: First drop and then create the tables. LOSS OF ALL DATA!!! (default: False)

  • soft_recreate: Safe recreate tables without dropping. No data loss (default: True)

Constructor parameters

Parameters:
  • name (str, Optional) – The name of the processor. If None is provided, the class name is used instead. Defaults to None.

  • description (str, Optional) – A short description of the processor task. Defaults to the processor name.

  • config (dict, Optional) – A configuration dictionary for this processor. Defaults to None.

  • looper (LoopType, Optional) – Enumerator to define the looping type. Defaults to LoopType.ForLoop

  • user_interface (UserInterfaceBase, Optional) – A user interface instance to be used by the processor to interact with the user.

  • timer (Timer, Optional) – A timer object to measure process duration.

  • timer_params (dict, Optional) – Parameters for the timer object.

  • database (Database, Optional) – A database instance. Defaults to None.

  • database_conf (dict, Optional) – Configuration for the database. Defaults to None.

  • remove_orphan_files (bool, Optional) – Boolean flag to remove files on disc without a reference to the database. See Standard tables and _remove_orphan_files(). Defaults to True

  • replica_id (str, Optional) – The replica identifier for the current processor.

  • create_standard_tables (bool, Optional) – Boolean flag to create the standard tables on disk. Defaults to True

  • kwargs – Keyword arguments that can be used to set processor parameters.

process() None[source]

Execute the table creation process.

This method performs the following steps:

  1. Identify all models that have automatic creation enabled.

  2. Filter models based on the apply_only_to_prefix parameter if specified.

  3. Handle forced recreation if requested, including user confirmation.

  4. Handle soft recreation if requested, letting all tables with a known model be recreated.

  5. Create the required tables.

  6. Initialise standard tables after recreation if needed.

If the user cancels the creation, the processor exit status is set to ProcessorExitStatus.Aborted so that the whole processor list is blocked.

start() None

Start method.

The user can overload this method, including all steps that should be performed at the beginning of the operation.

If the user decides to overload it, it should include a call to the super method.

validate_configuration() None[source]

Configuration validation

force_recreate and soft_recreate cannot both be True.

Raises:

InvalidConfigurationError – if both recreate types are True.
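
The mutual exclusion can be sketched as follows. The exception class is a local stand-in and `validate_recreate_flags` is a hypothetical helper. The keyword defaults mirror the documented parameter defaults.

```python
class InvalidConfigurationError(Exception):
    """Local stand-in for the MAFw exception of the same name."""


def validate_recreate_flags(force_recreate: bool = False, soft_recreate: bool = True) -> None:
    """Reject the one combination in which both recreate modes are requested."""
    if force_recreate and soft_recreate:
        raise InvalidConfigurationError(
            "force_recreate and soft_recreate cannot both be True"
        )


validate_recreate_flags()  # documented defaults: accepted
validate_recreate_flags(force_recreate=True, soft_recreate=False)  # accepted

try:
    validate_recreate_flags(force_recreate=True)  # soft_recreate keeps its default
    conflict_detected = False
except InvalidConfigurationError:
    conflict_detected = True

print(conflict_detected)
```

Given the documented defaults, this suggests that enabling force_recreate also requires setting soft_recreate to False explicitly.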

_config: dict[str, Any]

A dictionary containing the processor configuration object.

This dictionary is populated with the configuration parameters (always type 2) by the _load_parameter_configuration() method.

The original value of the configuration dictionary that is passed to the constructor is stored in _orig_config.

Changed in version v2.0.0: Now it is an empty dictionary until the _load_parameter_configuration() is called.

_processor_parameters: dict[str, PassiveParameter[ParameterType]]

A dictionary to store all the processor parameter instances.

The name of the parameter is used as a key, while for the value an instance of the PassiveParameter is used.

apply_only_to_prefix

Apply only to tables starting with prefix (list[str], default = []).

This parameter restricts creation to the tables that do not already exist and whose names start with one of the provided prefixes.

existing_table_names: list[str]

The list of all existing tables in the database.

filter_register: mafw.db.db_filter.ProcessorFilter

The DB filter register of the Processor.

force_recreate

Force recreate (bool, default = False).

Use with extreme care. When set to True, all tables in the database and in the model register will be first dropped and then recreated. It is almost equivalent to a re-initialization of the whole DB with all the data being lost.

item: Any

The current item of the loop.

loop_type: LoopType

The loop type.

The value of this parameter can also be changed by the execution_workflow() decorator factory.

See LoopType for more details.

remove_orphan_files: bool

The flag to remove or protect the orphan files. Defaults to True

soft_recreate

Soft recreate (bool, default = True).

When set to True, all tables whose model is in the mafw model register are recreated with the safe flag, so no table is dropped. If a table already exists, nothing happens to it; if a new trigger has been added to the table, that trigger is created. When set to False, only tables whose model is in the register and that do not yet exist are created.

class mafw.processor_library.db_init.TriggerRefresher(*args: Any, **kwargs: Any)[source]

Bases: Processor

Processor to recreate all triggers.

Triggers are database objects, and even though they could be created, dropped and modified at any moment, within the MAFw execution cycle they are normally created along with the table they are targeting.

When a table is created, all its triggers are created as well. Unless specified otherwise, this happens with the safe flag on, meaning that a trigger is created only if it does not already exist.

This can be particularly annoying when modifying an existing trigger, because you need to drop the trigger manually before the table creation mechanism can create the newer version.

The goal of this processor is to drop all existing triggers and then recreate the corresponding tables, so as to obtain an updated version of the triggers.
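
The effect of a safe creation on an existing trigger can be reproduced with plain SQLite. The sketch below is independent of MAFw and uses made-up table and trigger names; it only shows why a trigger has to be dropped before a modified definition can take effect.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sample (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("CREATE TABLE log (message TEXT)")

OLD_TRIGGER = """
CREATE TRIGGER IF NOT EXISTS sample_after_insert AFTER INSERT ON sample
BEGIN
    INSERT INTO log (message) VALUES ('old');
END
"""
NEW_TRIGGER = OLD_TRIGGER.replace("'old'", "'new'")

conn.execute(OLD_TRIGGER)
# Safe creation: a trigger with this name exists, so the new body is ignored.
conn.execute(NEW_TRIGGER)
conn.execute("INSERT INTO sample (name) VALUES ('a')")

# Dropping the trigger first lets the updated definition take effect.
conn.execute("DROP TRIGGER sample_after_insert")
conn.execute(NEW_TRIGGER)
conn.execute("INSERT INTO sample (name) VALUES ('b')")

messages = [row[0] for row in conn.execute("SELECT message FROM log ORDER BY rowid")]
print(messages)
```

The first insert is still logged by the old trigger body, and only the insert that follows the drop and recreation uses the new one.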

The processor relies on the fact that all subclasses of MAFwBaseModel are automatically inserted in the mafw_model_register, so that the model class can be retrieved from the table name.

Before removing any trigger, the processor builds a list of all the affected tables and checks whether all of them are in the mafw_model_register. If so, it proceeds without asking for further confirmation. Otherwise, if some affected tables are not in the register, it asks the user to decide what to do:

  • Remove only the triggers whose tables are in the register and can thus be recreated afterward.

  • Remove all triggers. In this case, some of them will not be recreated.

  • Abort the processor.

Trigger manipulations (drop and creation) are not directly implemented in peewee and are an extension provided by MAFw. In order to be compatible with the three main databases (sqlite, mysql and postgresql), the SQL generation is obtained via the TriggerDialect interface.

See also

The Trigger class and also the trigger chapter for a deeper explanation on triggers.

The ModelRegister class, the mafw_model_register and the related chapter on the automatic registration mechanism.

The TriggerDialect and its subclasses, for a database-independent way to generate SQL statements related to triggers.

Added in version v2.0.0.

Constructor parameters

Parameters:
  • name (str, Optional) – The name of the processor. If None is provided, the class name is used instead. Defaults to None.

  • description (str, Optional) – A short description of the processor task. Defaults to the processor name.

  • config (dict, Optional) – A configuration dictionary for this processor. Defaults to None.

  • looper (LoopType, Optional) – Enumerator to define the looping type. Defaults to LoopType.ForLoop

  • user_interface (UserInterfaceBase, Optional) – A user interface instance to be used by the processor to interact with the user.

  • timer (Timer, Optional) – A timer object to measure process duration.

  • timer_params (dict, Optional) – Parameters for the timer object.

  • database (Database, Optional) – A database instance. Defaults to None.

  • database_conf (dict, Optional) – Configuration for the database. Defaults to None.

  • remove_orphan_files (bool, Optional) – Boolean flag to remove files on disc without a reference to the database. See Standard tables and _remove_orphan_files(). Defaults to True

  • replica_id (str, Optional) – The replica identifier for the current processor.

  • create_standard_tables (bool, Optional) – Boolean flag to create the standard tables on disk. Defaults to True

  • kwargs – Keyword arguments that can be used to set processor parameters.

finish() None[source]

Recreate the tables from which triggers were dropped.

This is only done if the user did not abort the process.

format_progress_message() None[source]

Customizes the progress message with information about the current item.

The user can overload this method in order to modify the message being displayed during the process loop with information about the current item.

The user can access the current value, its position in the looping cycle and the total number of items using Processor.item, Processor.i_item and Processor.n_item.

get_dialect() TriggerDialect[source]

Get the valid SQL dialect based on the type of database.

Returns:

The SQL trigger dialect

Return type:

TriggerDialect

Raises:

UnsupportedDatabaseError – if there is no dialect for the current DB.

get_items() Collection[Any][source]

Retrieves a list of database triggers and interacts with the user to determine which ones to process.

This method fetches all currently defined database triggers. If any tables associated with these triggers are not known (i.e., not registered in mafw_model_register), it enters an interactive mode to prompt the user for a course of action:

  1. Remove All Triggers (A): Processes all triggers for subsequent removal, but only marks ‘rebuildable’ tables for rebuilding.

  2. Remove Only Rebuildable Triggers (O): Processes only triggers associated with ‘rebuildable’ tables.

  3. Quit (Q): Aborts the entire process.

If no unknown tables are found, or the user chooses to process rebuildable tables, the list of triggers and the set of tables to be rebuilt are prepared for the next stage.
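
The selection logic described above can be sketched as a pure function. Everything here is illustrative: `select_triggers` is a hypothetical name, the `choice` argument stands in for the interactive prompt, and returning empty collections on quit is an assumption about how an abort could be represented.

```python
def select_triggers(
    triggers: list[tuple[str, str]],
    known_tables: set[str],
    choice: str = "Q",
) -> tuple[list[tuple[str, str]], set[str]]:
    """Return (triggers to drop, tables to rebuild) for (trigger, table) pairs."""
    affected = {table for _, table in triggers}
    rebuildable = affected & known_tables
    unknown = affected - known_tables

    if not unknown or choice == "A":
        # Every trigger is dropped, but only known tables can be rebuilt.
        return list(triggers), rebuildable
    if choice == "O":
        kept = [(name, table) for name, table in triggers if table in rebuildable]
        return kept, rebuildable
    # "Q" (or anything else): abort without touching any trigger.
    return [], set()


triggers = [("sample_after_insert", "sample"), ("legacy_after_update", "legacy")]
known = {"sample"}  # 'legacy' has no registered model

print(select_triggers(triggers, known, "A"))
print(select_triggers(triggers, known, "O"))
print(select_triggers(triggers, known, "Q"))
```

With choice "A" the trigger on the unregistered table is dropped as well, yet that table is absent from the rebuild set, which is why such a trigger would not be recreated.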

Returns:

A collection of database triggers to be processed, in the form of (trigger_name, table_name) tuples

Return type:

List[Tuple[str, str]]

process() None[source]

Delete the current trigger from its table.

start() None[source]

Start method.

The user can overload this method, including all steps that should be performed at the beginning of the operation.

If the user decides to overload it, it should include a call to the super method.

_config: dict[str, Any]

A dictionary containing the processor configuration object.

This dictionary is populated with the configuration parameters (always type 2) by the _load_parameter_configuration() method.

The original value of the configuration dictionary that is passed to the constructor is stored in _orig_config.

Changed in version v2.0.0: Now it is an empty dictionary until the _load_parameter_configuration() is called.

_processor_parameters: dict[str, PassiveParameter[ParameterType]]

A dictionary to store all the processor parameter instances.

The name of the parameter is used as a key, while for the value an instance of the PassiveParameter is used.

filter_register: mafw.db.db_filter.ProcessorFilter

The DB filter register of the Processor.

item: Any

The current item of the loop.

loop_type: LoopType

The loop type.

The value of this parameter can also be changed by the execution_workflow() decorator factory.

See LoopType for more details.

remove_orphan_files: bool

The flag to remove or protect the orphan files. Defaults to True