# Copyright 2025 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""
The module provides functionality to MAFw to interface to a DB.
"""
import warnings
from typing import TYPE_CHECKING, Any, Iterable, cast
# peewee type annotations are missing.
from peewee import ( # type: ignore[attr-defined]
SQL,
DatabaseProxy,
Field,
ModelBase,
ModelInsert,
Value,
make_snake_case,
)
from playhouse.shortcuts import dict_to_model, model_to_dict, update_model_from_dict
# noinspection PyUnresolvedReferences
from playhouse.signals import Model
from mafw.db import trigger
from mafw.db.db_types import PeeweeModelWithMeta
from mafw.db.fields import FileNameField
from mafw.db.model_register import ModelRegister
from mafw.db.trigger import Trigger
from mafw.mafw_errors import MAFwException, UnsupportedDatabaseError
database_proxy = DatabaseProxy()
"""This is a placeholder for the real database object that will be known only at run time"""
mafw_model_register = ModelRegister()
"""
This is the instance of the ModelRegister
.. seealso::
:class:`.ModelRegister` for more information on how to retrieve models and :class:`.RegisteredMeta` and :class:`MAFwBaseModel` for the automatic registration of models.
"""
[docs]
class MAFwBaseModelDoesNotExist(MAFwException):
    """Raised when the base model class does not exist."""
[docs]
def make_prefixed_suffixed_name(model_class: RegisteredMeta) -> str:
    """
    Build the table name of a model class, decorated with its optional prefix and suffix.

    The name is the concatenation of the prefix, the snake_case version of the
    model class name and the suffix. The prefix and the suffix are read from the
    model metadata (``model_class._meta``); an attribute that is missing or set
    to ``None`` is treated as an empty string.

    Examples:

    - A model class named "UserAccount" with prefix="app" and suffix="data"
      gives the table name "app_user_account_data".
    - A model class named "Product" with prefix="ecommerce" and suffix="_latest"
      gives the table name "ecommerce_product_latest".

    .. note::

        A separating underscore (_) is added after a non-empty prefix and before a
        non-empty suffix when it is not already there.

    :param model_class: The model class for which to generate the table name.
    :type model_class: RegisteredMeta
    :return: The constructed table name including prefix and suffix if applicable.
    :rtype: str
    """
    if TYPE_CHECKING:
        assert hasattr(model_class, '_meta')

    meta = model_class._meta

    # A missing attribute and an explicit None both mean "no decoration".
    prefix = getattr(meta, 'prefix', None)
    if prefix is None:
        prefix = ''
    suffix = getattr(meta, 'suffix', None)
    if suffix is None:
        suffix = ''

    # Add the separating underscore only to non-empty parts that lack it.
    if prefix != '' and not prefix.endswith('_'):
        prefix = f'{prefix}_'
    if suffix != '' and not suffix.startswith('_'):
        suffix = f'_{suffix}'

    return ''.join((prefix, make_snake_case(model_class.__name__), suffix))
[docs]
class MAFwBaseModel(Model, metaclass=RegisteredMeta):
"""The base model for the MAFw library.
Every model class (table) that the user wants to interface must inherit from this base.
This class extends peewee's Model with several additional features:
1. Automatic model registration: Models are automatically registered with the MAFw model registry
during class definition, enabling dynamic discovery and management of database models.
2. Trigger support: The class supports defining database triggers through the :meth:`.triggers` method,
which are automatically created when the table is created. File removal triggers can also be automatically
generated using the `file_trigger_auto_create` boolean flag in the :ref:`meta class <auto_triggers>`. See also
:meth:`.file_removal_triggers`.
3. Standard upsert operations: Provides :meth:`.std_upsert` and :meth:`.std_upsert_many` methods for
performing upsert operations that work with SQLite and PostgreSQL.
4. Dictionary conversion utilities: Includes :meth:`.to_dict`, :meth:`.from_dict`, and :meth:`.update_from_dict`
methods for easy serialization and deserialization of model instances.
5. Customizable table naming: Supports table name prefixes and suffixes through the Meta class
with `prefix` and `suffix` attributes. See :func:`.make_prefixed_suffixed_name`.
6. Automatic table creation control: The `automatic_creation` Meta attribute controls whether
tables are automatically created when the application starts.
.. note::
The automatic model registration can be disabled for one single model class using the keyword argument
`do_not_register` passed to the :class:`.RegisteredMeta` meta-class. For example:
.. code-block:: python
class AutoRegisterModel(MAFwBaseModel):
pass
class NoRegisterModel(MAFwBaseModel, do_not_register=True):
pass
the first class will be automatically registered, while the second one will not. This is particularly useful if
the user wants to define a base model class for the whole project without having it in the register where
only concrete Model implementations are stored.
"""
[docs]
@classmethod
def get_fields_by_type(cls, field_type: type[Field]) -> dict[str, Field]:
    """
    Collect the model fields that are instances of a given field type.

    The check is done with :func:`isinstance`, so fields whose class is a subclass of
    `field_type` are included as well.

    .. versionadded:: v2.0.0

    :param field_type: Field type
    :type field_type: peewee.Field
    :return: A dict {field_name: field_object} for all fields of the given type.
    :rtype: dict[str, peewee.Field]
    """
    if TYPE_CHECKING:
        assert hasattr(cls, '_meta')

    matching: dict[str, Field] = {}
    for field_name, field_obj in cls._meta.fields.items():
        if isinstance(field_obj, field_type):
            matching[field_name] = field_obj
    return matching
[docs]
@classmethod
def file_removal_triggers(cls) -> list[Trigger]:
    """
    Build the trigger that records the files of deleted rows for later removal.

    When the model has at least one :class:`~mafw.db.fields.FileNameField`, a single
    ``BEFORE DELETE`` row-level trigger named ``<table_name>_delete_files`` is prepared.
    For each file field, the trigger inserts one row into the
    :class:`~mafw.db.std_tables.OrphanFile` table carrying the filename and the checksum
    of the row being deleted. The trigger body is guarded by a condition on the
    :class:`~mafw.db.std_tables.TriggerStatus` table: it runs only when the status of the
    ``DELETE_FILES`` trigger type is 1.

    If the model has no such field, an empty list is returned.

    :class:`.FileNameListField` is a subclass of :class:`.FileNameField` and is treated in the same
    way.

    .. versionadded:: v2.0.0

    .. note::

        This functionality requires the ``file_trigger_auto_create`` attribute in the
        model's Meta class to be set to ``True`` for automatic trigger creation.

    :return: A list containing the trigger object for file removal, or an empty list
        if no :class:`~mafw.db.fields.FileNameField` fields are found.
    :rtype: list[:class:`~mafw.db.trigger.Trigger`]
    """
    from mafw.db.std_tables import OrphanFile, TriggerStatus

    # FileNameListField instances are picked up too, being a FileNameField subclass.
    file_fields = cls.get_fields_by_type(FileNameField)
    if not file_fields:
        return []

    if TYPE_CHECKING:
        assert hasattr(cls, '_meta')

    removal_trigger = Trigger(
        trigger_name=f'{cls._meta.table_name}_delete_files',
        trigger_type=(trigger.TriggerWhen.Before, trigger.TriggerAction.Delete),
        source_table=cls,
        safe=True,
        for_each_row=True,
    )

    # Fire only when the DELETE_FILES trigger type is enabled in the TriggerStatus table.
    status_query = TriggerStatus.select(TriggerStatus.status).where(TriggerStatus.trigger_type == 'DELETE_FILES')  # type: ignore[no-untyped-call]
    removal_trigger.add_when(Value(1) == status_query)

    # The checksum is not really used in OrphanFile, but before v2.0.0 its checksum column
    # was not nullable. For backward compatibility a file field without a checksum field
    # stores the filename in its place. The automatic filename-to-checksum conversion cannot
    # help here because the trigger lives in the database and not in the application.
    # NOTE(review): OLD.<name> is built from the model field names; presumably these match
    # the column names - confirm for fields declared with a custom column name.
    orphan_rows = [
        {
            'filenames': SQL(f'OLD.{field_name}'),
            'checksum': SQL(f'OLD.{cast(FileNameField, field_obj).checksum_field or field_name}'),
        }
        for field_name, field_obj in file_fields.items()
    ]
    removal_trigger.add_sql(OrphanFile.insert_many(orphan_rows))

    return [removal_trigger]
[docs]
@classmethod
def triggers(cls) -> list[Trigger]:
    """
    Return the :class:`~mafw.db.trigger.Trigger` objects to create upon table creation.

    The base implementation returns an empty list. The user must overload this method
    returning all the triggers that must be created along with this class.

    :return: The triggers to be created together with the table.
    :rtype: list[:class:`~mafw.db.trigger.Trigger`]
    """
    return []
# noinspection PyUnresolvedReferences
[docs]
@classmethod
def create_table(cls, safe: bool = True, **options: Any) -> None:
    """
    Create the table in the underlying DB and all the related triggers as well.

    The table is created first through the parent implementation. Then the triggers
    returned by :meth:`triggers`, plus the ones returned by :meth:`file_removal_triggers`
    when ``file_trigger_auto_create`` is set in the Meta class, are created one by one.

    A trigger whose creation raises :class:`~mafw.mafw_errors.UnsupportedDatabaseError` is
    skipped and a warning is issued. If the creation of a trigger fails for any other
    reason, then the whole table is dropped, a best-effort drop of all the triggers is
    attempted and the original exception is re-raised.

    .. warning::

        Trigger creation has been extensively tested with :link:`SQLite`, but not with the other database implementation.
        Please report any malfunction.

    :param safe: Flag to add an IF NOT EXISTS to the creation statement. Defaults to True.
    :type safe: bool, Optional
    :param options: Additional options passed to the super method.
    """
    super().create_table(safe, **options)

    # this is just used to make mypy happy: cls and meta_cls are the same object.
    meta_cls = cast(PeeweeModelWithMeta, cls)

    # Get the database instance, it is used for trigger creation
    db = meta_cls._meta.database

    # Work on a private copy: the object returned by triggers() belongs to the model
    # (it may be shared between calls, or be any iterable) and must not be modified here.
    pending_triggers = list(cls.triggers())
    if meta_cls._meta.file_trigger_auto_create:
        pending_triggers.extend(cls.file_removal_triggers())

    if not pending_triggers:
        return

    try:
        for table_trigger in pending_triggers:
            table_trigger.set_database(db)
            try:
                db.execute_sql(table_trigger.create())
            except UnsupportedDatabaseError as e:
                warnings.warn(f'Skipping unsupported trigger {table_trigger.trigger_name}: {str(e)}')
    except BaseException:
        # Any failure, interruptions included, rolls back the table and the triggers
        # before the original exception is propagated.
        # NOTE(review): with safe=True the table may have existed before this call and
        # it is dropped all the same - confirm this is intended.
        db.drop_tables([cls], safe=safe)
        for table_trigger in pending_triggers:
            try:
                db.execute_sql(table_trigger.drop(True))
            except Exception:
                pass  # Ignore errors when dropping triggers during cleanup
        raise
# noinspection PyProtectedMember
[docs]
@classmethod
def std_upsert(cls, __data: dict[str, Any] | None = None, **mapping: Any) -> ModelInsert:
    """
    Prepare a so-called standard upsert.

    An upsert statement is not part of the standard SQL and different databases have different ways to implement it.
    This method will work for modern versions of :link:`sqlite` and :link:`postgreSQL`.
    Here is a `detailed explanation for SQLite <https://www.sqlite.org/lang_upsert.html>`_.

    An upsert is an insert into a table with constraints: if one constraint is violated, then instead of
    inserting a new row, the existing row causing the violation is updated.

    A standard upsert, in the naming convention of MAFw, uses the primary key (all its fields, for a composite
    key) as conflict target and updates every other field with the value that would have been inserted.

    This method is equivalent to the following:

    .. code-block:: python

        class Sample(MAFwBaseModel):
            sample_id = AutoField(
                primary_key=True,
                help_text='The sample id primary key',
            )
            sample_name = TextField(help_text='The sample name')

        (
            Sample.insert(sample_id=1, sample_name='my_sample')
            .on_conflict(
                conflict_target=[Sample.sample_id],
                preserve=[Sample.sample_name],
            )  # use the value we would have inserted
            .execute()
        )

    :param __data: A dictionary containing the key/value pair for the insert. The key is the column name.
        Defaults to None
    :type __data: dict, Optional
    :param mapping: Keyword arguments representing the value to be inserted.
    :return: The insert query with the conflict resolution attached. The query is returned, not executed.
    :rtype: ModelInsert
    """
    # the cast is used just to make mypy happy: the metadata is the one of cls itself.
    model_meta = cast(PeeweeModelWithMeta, cls)._meta

    # The conflict target is made of the primary key field(s).
    if model_meta.composite_key:
        key_fields = [model_meta.fields[name] for name in model_meta.primary_key.field_names]
    else:
        key_fields = [model_meta.primary_key]
    key_names = {key_field.name for key_field in key_fields}

    # Every field that is not part of the key takes the newly provided value.
    non_key_fields = [field_obj for name, field_obj in model_meta.fields.items() if name not in key_names]

    upsert_query = cls.insert(__data, **mapping).on_conflict(conflict_target=key_fields, preserve=non_key_fields)
    return cast(ModelInsert, upsert_query)
# noinspection PyProtectedMember
[docs]
@classmethod
def std_upsert_many(cls, rows: Iterable[Any], fields: list[str] | None = None) -> ModelInsert:
    """
    Prepare a standard upsert for many rows at once.

    The conflict target is the primary key (all its fields, for a composite key) and every
    other field of the model is updated with the value that would have been inserted.

    .. seealso::

        Read the :meth:`std_upsert` documentation for an explanation of this method.

    :param rows: A list with the rows to be inserted. Each item can be a dictionary or a tuple of values. If a
        tuple is provided, then the `fields` must be provided.
    :type rows: Iterable
    :param fields: A list of field names. Defaults to None.
    :type fields: list[str], Optional
    :return: The bulk insert query with the conflict resolution attached. The query is returned, not executed.
    :rtype: ModelInsert
    """
    # the cast is used just to make mypy happy: the metadata is the one of cls itself.
    model_meta = cast(PeeweeModelWithMeta, cls)._meta

    if model_meta.composite_key:
        key_fields = [model_meta.fields[name] for name in model_meta.primary_key.field_names]
    else:
        key_fields = [model_meta.primary_key]

    key_names = [key_field.name for key_field in key_fields]
    updatable_fields = [field_obj for name, field_obj in model_meta.fields.items() if name not in key_names]

    bulk_insert = cls.insert_many(rows, fields)
    return cast(ModelInsert, bulk_insert.on_conflict(conflict_target=key_fields, preserve=updatable_fields))
[docs]
def to_dict(
    self,
    recurse: bool = True,
    backrefs: bool = False,
    only: list[str] | None = None,
    exclude: list[str] | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """
    Convert the model instance to a dictionary.

    This is a thin wrapper around the peewee playhouse shortcut ``model_to_dict``: all the
    arguments are forwarded to it unchanged. See the full documentation directly on the
    `peewee documentation <https://docs.peewee-orm.com/en/latest/peewee/playhouse.html#model_to_dict>`__.

    :param recurse: If to recurse through foreign keys. Default to True.
    :type recurse: bool, Optional
    :param backrefs: If to include backrefs. Default to False.
    :type backrefs: bool, Optional
    :param only: A list of fields to be included. Defaults to None.
    :type only: list[str], Optional
    :param exclude: A list of fields to be excluded. Defaults to None.
    :type exclude: list[str], Optional
    :param kwargs: Other keyword arguments to be passed to peewee `playhouse shortcut <https://docs.peewee-orm.com/en/latest/peewee/playhouse.html#model_to_dict>`__.
    :return: A dictionary containing the key/value of the model.
    :rtype: dict[str, Any]
    """
    conversion_options: dict[str, Any] = dict(
        recurse=recurse,
        backrefs=backrefs,
        only=only,
        exclude=exclude,
        **kwargs,
    )
    # the playhouse module of peewee is not typed.
    return model_to_dict(self, **conversion_options)  # type: ignore[no-any-return,no-untyped-call]
[docs]
@classmethod
def from_dict(cls, data: dict[str, Any], ignore_unknown: bool = False) -> 'MAFwBaseModel':
    """
    Build a new model instance out of a dictionary.

    The conversion is delegated to the peewee playhouse shortcut ``dict_to_model``.

    :param data: The dictionary containing the key/value pairs of the model.
    :type data: dict[str, Any]
    :param ignore_unknown: If unknown dictionary keys should be ignored.
    :type ignore_unknown: bool
    :return: A new model instance.
    :rtype: MAFwBaseModel
    """
    # the playhouse module of peewee is not typed.
    new_instance = dict_to_model(cls, data, ignore_unknown=ignore_unknown)  # type: ignore[no-untyped-call]
    return cast('MAFwBaseModel', new_instance)
[docs]
def update_from_dict(self, data: dict[str, Any], ignore_unknown: bool = False) -> 'MAFwBaseModel':
    """
    Update the current model instance from a dictionary.

    The update is delegated to the peewee playhouse shortcut ``update_model_from_dict``.
    The model instance is returned for daisy-chaining.

    :param data: The dictionary containing the key/value pairs of the model.
    :type data: dict[str, Any]
    :param ignore_unknown: If unknown dictionary keys should be ignored.
    :type ignore_unknown: bool
    :return: The model instance itself.
    :rtype: MAFwBaseModel
    """
    # the playhouse module of peewee is not typed.
    update_model_from_dict(self, data, ignore_unknown=ignore_unknown)  # type: ignore[no-untyped-call]
    return self
class Meta:
    """
    The metadata container for the Model class.

    Besides the database and the table naming options, it holds the MAFw specific flags
    ``automatic_creation`` and ``file_trigger_auto_create``.
    """

    database = database_proxy
    """The reference database. A proxy is used as a placeholder that will be automatically replaced by the real
    instance of the database at runtime."""

    legacy_table_names = False
    """
    Set the default table name as the snake case of the Model camel case name.
    So for example, a model named ThisIsMyTable will corresponds to a database table named this_is_my_table.
    """

    # suffix and prefix are read by make_prefixed_suffixed_name, which adds the separating
    # underscore when it is missing.
    suffix = ''
    """
    Set the value to append to the table name.
    """

    prefix = ''
    """
    Set the value to prepend to the table name.
    """

    # make_prefixed_suffixed_name builds the name as prefix + snake_case class name + suffix.
    table_function = make_prefixed_suffixed_name
    """
    Set the table naming function.
    """

    automatic_creation = True
    """
    Whether the table linked to the model should be created automatically
    """

    # Read by create_table: when true, the triggers returned by file_removal_triggers()
    # are created along with the ones returned by triggers().
    file_trigger_auto_create = False
    """
    Whether to automatically create triggers to delete files once a row with a FilenameField is removed
    """