Coverage for src / mafw / db / db_model.py: 99%
150 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
1# Copyright 2025 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5The module provides functionality to MAFw to interface to a DB.
6"""
8import warnings
9from typing import TYPE_CHECKING, Any, Iterable, cast
11# peewee type annotations are missing.
12from peewee import ( # type: ignore[attr-defined]
13 SQL,
14 DatabaseProxy,
15 Field,
16 ModelBase,
17 ModelInsert,
18 Value,
19 make_snake_case,
20)
21from playhouse.shortcuts import dict_to_model, model_to_dict, update_model_from_dict
23# noinspection PyUnresolvedReferences
24from playhouse.signals import Model
26from mafw.db import trigger
27from mafw.db.db_types import PeeweeModelWithMeta
28from mafw.db.fields import FileNameField
29from mafw.db.model_register import ModelRegister
30from mafw.db.trigger import Trigger
31from mafw.mafw_errors import MAFwException, UnsupportedDatabaseError
database_proxy = DatabaseProxy()
"""This is a placeholder for the real database object that will be known only at run time"""


mafw_model_register = ModelRegister()
"""
This is the instance of the ModelRegister

.. seealso::

    :class:`.ModelRegister` for more information on how to retrieve models and :class:`.RegisteredMeta` and
    :class:`MAFwBaseModel` for the automatic registration of models
"""
class RegisteredMeta(ModelBase):
    """
    Metaclass for registering models with the MAFw model registry.

    This metaclass automatically registers model classes with the global model registry
    when they are defined, allowing for dynamic discovery and management of database models.
    It ensures that only concrete model classes (not the base classes themselves) are registered.

    The registration process uses the table name from the model's metadata or generates
    a snake_case version of the class name if no explicit table name is set.
    """

    reserved_field_names = ['__logic__', '__conditional__']

    def __new__(cls, name: str, bases: tuple[type, ...], attrs: dict[str, Any], **kwargs: Any) -> type:
        """
        Create a new model class and register it if applicable.

        This method is called during class definition to create the actual class
        and register it with the MAFw model registry if it's a concrete model class.

        The class is *not* registered when any of the following holds:

        - the keyword argument ``do_not_register`` evaluates to True;
        - the class name is one of ``Model``, ``MAFwBaseModel`` or ``StandardTable``;
        - none of the bases is a subclass of :class:`playhouse.signals.Model`.

        :param name: The name of the class being created.
        :type name: str
        :param bases: The base classes of the class being created.
        :type bases: tuple
        :param attrs: The attributes of the class being created.
        :type attrs: dict
        :param kwargs: Other keyword arguments passed in the class definition (e.g. ``do_not_register=True``).
            They are stored in ``_meta.extra_args`` and are not forwarded to the peewee metaclass.
        :type kwargs: Any
        :return: The newly created class.
        :rtype: type
        """
        # store here any extra argument that is passed to the class definition.
        extra_args = kwargs

        # Look only at the Meta class declared directly on this model: a table_name found here
        # is an explicit user choice that must not be overwritten below.
        meta = attrs.get('Meta', None)
        custom_table_name = meta.__dict__.get('table_name', None) if meta else None

        # This super call is actually creating the Model class according to peewee ModelBase (that is a metaclass).
        # The model class will have an attribute (_meta) created by the ModelBase using the inner class Meta as a
        # template. The extra keyword arguments are deliberately not forwarded.
        model = cast(RegisteredMeta, super().__new__(cls, name, bases, attrs))  # type: ignore[no-untyped-call]

        if TYPE_CHECKING:
            assert hasattr(model, '_meta')

        # for consistency, add the extra_args to the meta instance
        model._meta.extra_args = extra_args

        # check for the use of reserved field names.
        field_names = model._meta.sorted_field_names
        if any(reserved_name in field_names for reserved_name in RegisteredMeta.reserved_field_names):
            warnings.warn(
                f'Model {model.__name__} is using a reserved field name. Filter configuration might not '
                f'work properly. Reserved names are {RegisteredMeta.reserved_field_names}.',
                # point the warning at the class definition instead of this metaclass
                stacklevel=2,
            )

        # The additional attributes (like suffix, prefix, ...) that we have added in the MAFwBaseModel Meta class
        # are available on _meta. There is one problem: the _meta.table_name is generated with our
        # make_prefixed_suffixed_name before the additional attributes are actually transferred to the instance
        # of the meta class. For this reason we need to regenerate the table_name now, when the whole Meta class is
        # available. We do so only if the user didn't specify a custom table name.
        if custom_table_name is None:
            if model._meta.table_function is None:
                table_name = make_snake_case(model.__name__)
            else:
                table_name = model._meta.table_function(model)
            model._meta.table_name = table_name

        # Do we need to register the Model?
        # By default we want to register (so skip_registration = False)
        skip_registration = extra_args.get('do_not_register', False)
        if skip_registration:
            # no need to register, so just skip the registration.
            return model

        # Register only concrete model classes that are not base classes
        # Check that we're not processing base classes themselves
        is_base_class = name in ['Model', 'MAFwBaseModel', 'StandardTable']

        # Check that we have valid bases and that at least one inherits from Model
        has_valid_bases = bases and any(issubclass(base, Model) for base in bases)

        if not is_base_class and has_valid_bases:
            table_name = model._meta.table_name
            mafw_model_register.register_model(table_name, model)

            if hasattr(model._meta, 'suffix'):
                mafw_model_register.register_suffix(model._meta.suffix)

            if hasattr(model._meta, 'prefix'):
                mafw_model_register.register_prefix(model._meta.prefix)

        return model
class MAFwBaseModelDoesNotExist(MAFwException):
    """Exception raised when the base model class does not exist."""
def make_prefixed_suffixed_name(model_class: RegisteredMeta) -> str:
    """
    Build the table name of a model class, decorated with its optional prefix and suffix.

    The name is the concatenation of the prefix, the snake_case version of the model class
    name and the suffix. A prefix or suffix that is missing from the model's metadata, or set
    to None, is treated as an empty string.

    The three parts are separated by underscores. For example:

    - A model class named "UserAccount" with prefix="app", suffix="data"
      gets the table name "app_user_account_data"

    - A model class named "Product" with prefix="ecommerce", suffix="_latest"
      gets the table name "ecommerce_product_latest"

    .. note::

        The separating underscore (_) is added to a non-empty prefix or suffix only when it is not already there.

    :param model_class: The model class for which to generate the table name.
    :type model_class: RegisteredMeta
    :return: The constructed table name including prefix and suffix if applicable.
    :rtype: str
    """
    if TYPE_CHECKING:
        assert hasattr(model_class, '_meta')

    meta = model_class._meta

    # a missing attribute and an explicit None are both mapped to the empty string
    prefix = getattr(meta, 'prefix', None)
    if prefix is None:
        prefix = ''

    suffix = getattr(meta, 'suffix', None)
    if suffix is None:
        suffix = ''

    # add the separating underscore only to non-empty parts that lack it
    if prefix != '' and not prefix.endswith('_'):
        prefix = prefix + '_'

    if suffix != '' and not suffix.startswith('_'):
        suffix = '_' + suffix

    base_name = make_snake_case(model_class.__name__)
    return f'{prefix}{base_name}{suffix}'
class MAFwBaseModel(Model, metaclass=RegisteredMeta):
    """The base model for the MAFw library.

    Every model class (table) that the user wants to interface must inherit from this base.

    This class extends peewee's Model with several additional features:

    1. Automatic model registration: Models are automatically registered with the MAFw model registry
       during class definition, enabling dynamic discovery and management of database models.

    2. Trigger support: The class supports defining database triggers through the :meth:`.triggers` method,
       which are automatically created when the table is created. File removal triggers can also be automatically
       generated using the `file_trigger_auto_create` boolean flag in the :ref:`meta class <auto_triggers>`. See also
       :meth:`.file_removal_triggers`.

    3. Standard upsert operations: Provides :meth:`.std_upsert` and :meth:`.std_upsert_many` methods for
       performing upsert operations that work with SQLite and PostgreSQL.

    4. Dictionary conversion utilities: Includes :meth:`.to_dict`, :meth:`.from_dict`, and :meth:`.update_from_dict`
       methods for easy serialization and deserialization of model instances.

    5. Customizable table naming: Supports table name prefixes and suffixes through the Meta class
       with `prefix` and `suffix` attributes. See :func:`.make_prefixed_suffixed_name`.

    6. Automatic table creation control: The `automatic_creation` Meta attribute controls whether
       tables are automatically created when the application starts.

    .. note::

        The automatic model registration can be disabled for one single model class using the keyword argument
        `do_not_register` passed to the :class:`.RegisteredMeta` meta-class. For example:

        .. code-block:: python

            class AutoRegisterModel(MAFwBaseModel):
                pass


            class NoRegisterModel(MAFwBaseModel, do_not_register=True):
                pass

        the first class will be automatically registered, while the second one will not. This is particularly useful if
        the user wants to define a base model class for the whole project without having it in the register where
        only concrete Model implementations are stored.

    """

    @classmethod
    def get_fields_by_type(cls, field_type: type[Field]) -> dict[str, Field]:
        """
        Return a dict {field_name: field_object} for all fields of the given type.

        The selection is done with :func:`isinstance`, so fields whose type is a subclass of ``field_type``
        are included as well.

        .. versionadded:: v2.0.0

        :param field_type: Field type
        :type field_type: peewee.Field
        :return: A dict {field_name: field_object} for all fields of the given type.
        :rtype: dict[str, peewee.Field]
        """
        if TYPE_CHECKING:
            assert hasattr(cls, '_meta')

        return {name: field for name, field in cls._meta.fields.items() if isinstance(field, field_type)}

    @classmethod
    def file_removal_triggers(cls) -> list[Trigger]:
        """
        Generate a list of triggers for automatic file removal when records are deleted.

        This method creates database triggers that automatically handle file cleanup when
        records containing :class:`~mafw.db.fields.FileNameField` fields are removed from
        the database table. The triggers insert the filenames and checksums into the
        :class:`~mafw.db.std_tables.OrphanFile` table for later processing.

        The triggers are only created if the model has at least one field of type
        :class:`~mafw.db.fields.FileNameField`. If no such fields exist, an empty list
        is returned.

        :class:`.FileNameListField` is a subclass of :class:`.FileNameField` and is treated in the same
        way.

        .. versionadded:: v2.0.0

        .. note::

            This functionality requires the ``file_trigger_auto_create`` attribute in the
            model's Meta class to be set to ``True`` for automatic trigger creation.

        :return: A list containing the trigger object for file removal, or an empty list
            if no :class:`~mafw.db.fields.FileNameField` fields are found.
        :rtype: list[:class:`~mafw.db.trigger.Trigger`]
        """
        # imported here and not at module level
        from mafw.db.std_tables import OrphanFile, TriggerStatus

        # it includes also FileNameListField that is a subclass of FileNameField
        file_fields = cls.get_fields_by_type(FileNameField)

        if not file_fields:
            return []

        if TYPE_CHECKING:
            assert hasattr(cls, '_meta')

        new_trigger = Trigger(
            trigger_name=cls._meta.table_name + '_delete_files',
            trigger_type=(trigger.TriggerWhen.Before, trigger.TriggerAction.Delete),
            source_table=cls,
            safe=True,
            for_each_row=True,
        )
        # the trigger body is executed only when the DELETE_FILES row of TriggerStatus has status equal to 1
        sub_query = TriggerStatus.select(TriggerStatus.status).where(TriggerStatus.trigger_type == 'DELETE_FILES')  # type: ignore[no-untyped-call]
        trigger_condition = Value(1) == sub_query
        new_trigger.add_when(trigger_condition)

        data = []
        for field_name, field in file_fields.items():
            # the checksum is not really used in the OrphanFile.
            # in versions before 2.0.0, the checksum field in OrphanFile was not nullable,
            # so it should contain something. In order to be backward compatible, in case of a missing checksum field
            # we will just use the filename
            # note that the automatic filename to checksum conversion will not work in this case because the trigger
            # lives in the database and not in the application.
            checksum_name = cast(FileNameField, field).checksum_field or field_name
            data.append({'filenames': SQL(f'OLD.{field_name}'), 'checksum': SQL(f'OLD.{checksum_name}')})
        insert_query = OrphanFile.insert_many(data)
        new_trigger.add_sql(insert_query)

        return [new_trigger]

    @classmethod
    def triggers(cls) -> list[Trigger]:
        """
        Returns an iterable of :class:`~mafw.db.trigger.Trigger` objects to create upon table creation.

        The user must overload this returning all the triggers that must be created along with this class.
        The base implementation returns an empty list.
        """
        return []

    # noinspection PyUnresolvedReferences
    @classmethod
    def create_table(cls, safe: bool = True, **options: Any) -> None:
        """
        Create the table in the underlying DB and all the related triggers as well.

        If the creation of a trigger fails, then the whole table is dropped, and the original exception is re-raised.
        Triggers raising :class:`~mafw.mafw_errors.UnsupportedDatabaseError` on creation are skipped with a warning.

        .. warning::

            Trigger creation has been extensively tested with :link:`SQLite`, but not with the other database
            implementation. Please report any malfunction.

        :param safe: Flag to add an IF NOT EXISTS to the creation statement. Defaults to True.
        :type safe: bool, Optional
        :param options: Additional options passed to the super method.
        """
        super().create_table(safe, **options)

        # this is just used to make mypy happy.
        meta_cls = cast(PeeweeModelWithMeta, cls)

        # Get the database instance, it is used for trigger creation
        db = meta_cls._meta.database

        # Work on a copy: the object returned by the user's triggers() must not be modified by the extend
        # below, and it is allowed to be any iterable, not only a list.
        table_triggers = list(cls.triggers())

        if meta_cls._meta.file_trigger_auto_create:
            table_triggers.extend(cls.file_removal_triggers())

        if not table_triggers:
            return

        # Create triggers with appropriate error handling
        try:
            for table_trigger in table_triggers:
                table_trigger.set_database(db)
                try:
                    db.execute_sql(table_trigger.create())
                except UnsupportedDatabaseError as e:
                    warnings.warn(f'Skipping unsupported trigger {table_trigger.trigger_name}: {str(e)}')
        except BaseException:
            # If an error occurs, drop the table and any created triggers.
            # BaseException is caught on purpose so that the cleanup is done on interruptions as well;
            # the exception is always re-raised.
            db.drop_tables([cls], safe=safe)
            for table_trigger in table_triggers:
                try:
                    db.execute_sql(table_trigger.drop(True))
                except Exception:
                    pass  # Ignore errors when dropping triggers during cleanup
            raise

    # noinspection PyProtectedMember
    @classmethod
    def _std_upsert_targets(cls) -> tuple[list[Field], list[Field]]:
        """
        Return the conflict target and the fields to preserve for a standard upsert.

        The conflict target is made of the primary key field(s); all the other fields of the model are preserved,
        that is to say they are updated with the value that would have been inserted.

        :return: A tuple with the list of conflict target fields and the list of fields to preserve.
        :rtype: tuple[list[peewee.Field], list[peewee.Field]]
        """
        # this is used just to make mypy happy.
        # cls and meta_cls are exactly the same thing
        meta_cls = cast(PeeweeModelWithMeta, cls)
        meta = meta_cls._meta

        if meta.composite_key:
            conflict_target = [meta.fields[n] for n in meta.primary_key.field_names]
        else:
            conflict_target = [meta.primary_key]

        conflict_target_names = {f.name for f in conflict_target}
        preserve = [f for n, f in meta.fields.items() if n not in conflict_target_names]
        return conflict_target, preserve

    # noinspection PyProtectedMember
    @classmethod
    def std_upsert(cls, __data: dict[str, Any] | None = None, **mapping: Any) -> ModelInsert:
        """
        Perform a so-called standard upsert.

        An upsert statement is not part of the standard SQL and different databases have different ways to implement it.
        This method will work for modern versions of :link:`sqlite` and :link:`postgreSQL`.
        Here is a `detailed explanation for SQLite <https://www.sqlite.org/lang_upsert.html>`_.

        An upsert is a statement in which we try to insert some data in a table where there are some constraints.
        If one constraint is failing, then instead of inserting a new row, we will try to update the existing row
        causing the constraint violation.

        A standard upsert, in the naming convention of MAFw, is setting the conflict cause to the primary key with all
        other fields being updated. In other words, the database will try to insert the data provided in the table, but
        if the primary key already exists, then all other fields will be updated.

        This method is equivalent to the following:

        .. code-block:: python

            class Sample(MAFwBaseModel):
                sample_id = AutoField(
                    primary_key=True,
                    help_text='The sample id primary key',
                )
                sample_name = TextField(help_text='The sample name')


            (
                Sample.insert(sample_id=1, sample_name='my_sample')
                .on_conflict(
                    preserve=[Sample.sample_name]
                )  # use the value we would have inserted
                .execute()
            )

        :param __data: A dictionary containing the key/value pair for the insert. The key is the column name.
            Defaults to None
        :type __data: dict, Optional
        :param mapping: Keyword arguments representing the value to be inserted.
        :return: The insert query with the conflict resolution clause. The query is not executed.
        :rtype: peewee.ModelInsert
        """
        conflict_target, preserve = cls._std_upsert_targets()
        return cast(
            ModelInsert, cls.insert(__data, **mapping).on_conflict(conflict_target=conflict_target, preserve=preserve)
        )

    # noinspection PyProtectedMember
    @classmethod
    def std_upsert_many(cls, rows: Iterable[Any], fields: list[str] | None = None) -> ModelInsert:
        """
        Perform a standard upsert with many rows.

        .. seealso::

            Read the :meth:`std_upsert` documentation for an explanation of this method.

        :param rows: A list with the rows to be inserted. Each item can be a dictionary or a tuple of values. If a
            tuple is provided, then the `fields` must be provided.
        :type rows: Iterable
        :param fields: A list of field names. Defaults to None.
        :type fields: list[str], Optional
        :return: The insert query with the conflict resolution clause. The query is not executed.
        :rtype: peewee.ModelInsert
        """
        conflict_target, preserve = cls._std_upsert_targets()
        return cast(
            ModelInsert,
            cls.insert_many(rows, fields).on_conflict(
                conflict_target=conflict_target,
                preserve=preserve,
            ),
        )

    def to_dict(
        self,
        recurse: bool = True,
        backrefs: bool = False,
        only: list[str] | None = None,
        exclude: list[str] | None = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Convert model instance to dictionary with optional parameters

        See full documentation directly on the `peewee documentation
        <https://docs.peewee-orm.com/en/latest/peewee/playhouse.html#model_to_dict>`__.

        :param recurse: If to recurse through foreign keys. Default to True.
        :type recurse: bool, Optional
        :param backrefs: If to include backrefs. Default to False.
        :type backrefs: bool, Optional
        :param only: A list of fields to be included. Defaults to None.
        :type only: list[str], Optional
        :param exclude: A list of fields to be excluded. Defaults to None.
        :type exclude: list[str], Optional
        :param kwargs: Other keyword arguments to be passed to peewee `playhouse shortcut <https://docs.peewee-orm.com/en/latest/peewee/playhouse.html#model_to_dict>`__.
        :return: A dictionary containing the key/value of the model.
        :rtype: dict[str, Any]
        """
        # the playhouse module of peewee is not typed.
        return model_to_dict(  # type: ignore[no-any-return]
            self,
            recurse=recurse,
            backrefs=backrefs,  # type: ignore[no-untyped-call]
            only=only,
            exclude=exclude,
            **kwargs,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any], ignore_unknown: bool = False) -> 'MAFwBaseModel':
        """
        Create a new model instance from dictionary

        :param data: The dictionary containing the key/value pairs of the model.
        :type data: dict[str, Any]
        :param ignore_unknown: If unknown dictionary keys should be ignored.
        :type ignore_unknown: bool
        :return: A new model instance.
        :rtype: MAFwBaseModel
        """
        # the playhouse module of peewee is not typed.
        return dict_to_model(cls, data, ignore_unknown=ignore_unknown)  # type: ignore[no-untyped-call,no-any-return]

    def update_from_dict(self, data: dict[str, Any], ignore_unknown: bool = False) -> 'MAFwBaseModel':
        """
        Update current model instance from dictionary

        The model instance is returned for daisy-chaining.

        :param data: The dictionary containing the key/value pairs of the model.
        :type data: dict[str, Any]
        :param ignore_unknown: If unknown dictionary keys should be ignored.
        :type ignore_unknown: bool
        :return: The updated model instance itself.
        :rtype: MAFwBaseModel
        """
        update_model_from_dict(self, data, ignore_unknown=ignore_unknown)  # type: ignore[no-untyped-call]
        return self

    class Meta:
        """The metadata container for the Model class"""

        database = database_proxy
        """The reference database. A proxy is used as a placeholder that will be automatically replaced by the real
        instance of the database at runtime."""

        legacy_table_names = False
        """
        Set the default table name as the snake case of the Model camel case name.

        So for example, a model named ThisIsMyTable will correspond to a database table named this_is_my_table.
        """

        suffix = ''
        """
        Set the value to append to the table name.
        """

        prefix = ''
        """
        Set the value to prepend to the table name.
        """

        table_function = make_prefixed_suffixed_name
        """
        Set the table naming function.
        """

        automatic_creation = True
        """
        Whether the table linked to the model should be created automatically
        """

        file_trigger_auto_create = False
        """
        Whether to automatically create triggers to delete files once a row with a FilenameField is removed
        """