Coverage for src / mafw / db / db_model.py: 99%

150 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-09 09:08 +0000

1# Copyright 2025 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5The module provides functionality to MAFw to interface to a DB. 

6""" 

7 

8import warnings 

9from typing import TYPE_CHECKING, Any, Iterable, cast 

10 

11# peewee type annotations are missing. 

12from peewee import ( # type: ignore[attr-defined] 

13 SQL, 

14 DatabaseProxy, 

15 Field, 

16 ModelBase, 

17 ModelInsert, 

18 Value, 

19 make_snake_case, 

20) 

21from playhouse.shortcuts import dict_to_model, model_to_dict, update_model_from_dict 

22 

23# noinspection PyUnresolvedReferences 

24from playhouse.signals import Model 

25 

26from mafw.db import trigger 

27from mafw.db.db_types import PeeweeModelWithMeta 

28from mafw.db.fields import FileNameField 

29from mafw.db.model_register import ModelRegister 

30from mafw.db.trigger import Trigger 

31from mafw.mafw_errors import MAFwException, UnsupportedDatabaseError 

32 

33database_proxy = DatabaseProxy() 

34"""This is a placeholder for the real database object that will be known only at run time""" 

35 

36 

37mafw_model_register = ModelRegister() 

38""" 

39This is the instance of the ModelRegister 

40 

41 .. seealso:: 

42  

43 :class:`.ModelRegister` for more information on how to retrieve models and :class:`.RegisteredMeta` and :class:`MAFwBaseModel` for the automatic registration of models` 

44 

45""" 

46 

47 

48class RegisteredMeta(ModelBase): 

49 """ 

50 Metaclass for registering models with the MAFw model registry. 

51 

52 This metaclass automatically registers model classes with the global model registry 

53 when they are defined, allowing for dynamic discovery and management of database models. 

54 It ensures that only concrete model classes (not the base classes themselves) are registered. 

55 

56 The registration process uses the table name from the model's metadata or generates 

57 a snake_case version of the class name if no explicit table name is set. 

58 """ 

59 

60 reserved_field_names = ['__logic__', '__conditional__'] 

61 

62 def __new__(cls, name: str, bases: tuple[type, ...], attrs: dict[str, Any], **kwargs: dict[str, Any]) -> type: 

63 """ 

64 Create a new model class and register it if applicable. 

65 

66 This method is called during class definition to create the actual class 

67 and register it with the MAFw model registry if it's a concrete model class. 

68 

69 :param name: The name of the class being created. 

70 :type name: str 

71 :param bases: The base classes of the class being created. 

72 :type bases: tuple 

73 :param attrs: The attributes of the class being created. 

74 :type attrs: dict 

75 :param kwargs: Other keyword attributes passed to the class. 

76 :type kwargs: dict[str, Any] 

77 :return: The newly created class. 

78 :rtype: type 

79 """ 

80 # store here any extra argument that is passed to the class defition. 

81 extra_args = kwargs 

82 

83 meta = attrs.get('Meta', None) 

84 if meta: 

85 custom_table_name = meta.__dict__.get('table_name', None) 

86 else: 

87 custom_table_name = None 

88 model = cast(RegisteredMeta, super().__new__(cls, name, bases, attrs)) # type: ignore[no-untyped-call] 

89 

90 if TYPE_CHECKING: 

91 assert hasattr(model, '_meta') 

92 

93 # for consistency, add the extra_args to the meta instance 

94 model._meta.extra_args = extra_args 

95 

96 # check for the use of reserved field names. 

97 field_names = model._meta.sorted_field_names 

98 if any([reserved_name in field_names for reserved_name in RegisteredMeta.reserved_field_names]): 

99 warnings.warn( 

100 f'Model {model.__name__} is using a reserved field name. Filter configuration might not ' 

101 f'work properly. Reserved names are f{RegisteredMeta.reserved_field_names}.' 

102 ) 

103 

104 # this super call is actually creating the Model class according to peewee ModelBase (that is a metaclass) 

105 # the model class will have an attribute (_meta) created by the ModelBase using the inner class Meta as a template. 

106 # The additional attributes (like suffix, prefix, ...) that we have added in the MAFwBaseModel Meta class will be 

107 # available. There is one problem, that the _meta.table_name is generated with our make_prefixed_suffixed_table_name 

108 # but before that the additional attributes are actually transferred to the instance of the meta class. 

109 # for this reason we need to regenerate the table_name now, when the whole Meta class is available. 

110 # We do so only if the user didn't specify a custom table name 

111 if custom_table_name is None: 

112 if model._meta.table_function is None: 

113 table_name = make_snake_case(model.__name__) 

114 else: 

115 table_name = model._meta.table_function(model) 

116 model._meta.table_name = table_name 

117 

118 # Do we need to register the Model? 

119 # By default we want to register (so skip_registration = False) 

120 skip_registration = extra_args.get('do_not_register', False) 

121 if skip_registration: 

122 # no need to register, so just skip the registration. 

123 return model 

124 

125 # Register only concrete model classes that are not base classes 

126 # Check that we're not processing base classes themselves 

127 is_base_class = name in ['Model', 'MAFwBaseModel', 'StandardTable'] 

128 

129 # Check that we have valid bases and that at least one inherits from Model 

130 has_valid_bases = bases and any(issubclass(base, Model) for base in bases) 

131 

132 if not is_base_class and has_valid_bases: 

133 table_name = model._meta.table_name 

134 mafw_model_register.register_model(table_name, model) 

135 

136 if hasattr(model._meta, 'suffix'): 136 ↛ 139line 136 didn't jump to line 139 because the condition on line 136 was always true

137 mafw_model_register.register_suffix(model._meta.suffix) 

138 

139 if hasattr(model._meta, 'prefix'): 139 ↛ 142line 139 didn't jump to line 142 because the condition on line 139 was always true

140 mafw_model_register.register_prefix(model._meta.prefix) 

141 

142 return model 

143 

144 

145class MAFwBaseModelDoesNotExist(MAFwException): 

146 """Raised when the base model class is not existing.""" 

147 

148 

149def make_prefixed_suffixed_name(model_class: RegisteredMeta) -> str: 

150 """ 

151 Generate a table name with optional prefix and suffix for a given model class. 

152 

153 This function constructs a table name by combining the prefix, the snake_case 

154 version of the model class name, and the suffix. If either prefix or suffix 

155 are not defined in the model's metadata, empty strings are used instead. 

156 

157 The prefix, table name, and suffix are joined using underscores. For example: 

158 

159 - If a model class is named "UserAccount" with prefix="app", suffix="data", 

160 the resulting table name will be "app_user_account_data" 

161 

162 - If a model class is named "Product" with prefix="ecommerce", suffix="_latest", 

163 the resulting table name will be "ecommerce_product_latest" 

164 

165 .. note:: 

166 

167 Underscores (_) will be automatically added to prefix and suffix if not already present. 

168 

169 :param model_class: The model class for which to generate the table name. 

170 :type model_class: RegisteredMeta 

171 :return: The constructed table name including prefix and suffix if applicable. 

172 :rtype: str 

173 """ 

174 if TYPE_CHECKING: 

175 assert hasattr(model_class, '_meta') 

176 

177 if hasattr(model_class._meta, 'suffix') and model_class._meta.suffix is not None: 

178 suffix = model_class._meta.suffix 

179 else: 

180 suffix = '' 

181 

182 if hasattr(model_class._meta, 'prefix') and model_class._meta.prefix is not None: 

183 prefix = model_class._meta.prefix 

184 else: 

185 prefix = '' 

186 

187 if not suffix.startswith('_') and suffix != '': 

188 suffix = '_' + suffix 

189 

190 if not prefix.endswith('_') and prefix != '': 

191 prefix = prefix + '_' 

192 

193 return f'{prefix}{make_snake_case(model_class.__name__)}{suffix}' 

194 

195 

196class MAFwBaseModel(Model, metaclass=RegisteredMeta): 

197 """The base model for the MAFw library. 

198 

199 Every model class (table) that the user wants to interface must inherit from this base. 

200 

201 This class extends peewee's Model with several additional features: 

202 

203 1. Automatic model registration: Models are automatically registered with the MAFw model registry 

204 during class definition, enabling dynamic discovery and management of database models. 

205 

206 2. Trigger support: The class supports defining database triggers through the :meth:`.triggers` method, 

207 which are automatically created when the table is created. File removal triggers can also be automatically 

208 generated using the `file_trigger_auto_create` boolean flag in the :ref:`meta class <auto_triggers>`. See also 

209 :meth:`.file_removal_triggers`. 

210 

211 3. Standard upsert operations: Provides :meth:`.std_upsert` and :meth:`.std_upsert_many` methods for 

212 performing upsert operations that work with SQLite and PostgreSQL. 

213 

214 4. Dictionary conversion utilities: Includes :meth:`.to_dict`, :meth:`.from_dict`, and :meth:`.update_from_dict` 

215 methods for easy serialization and deserialization of model instances. 

216 

217 5. Customizable table naming: Supports table name prefixes and suffixes through the Meta class 

218 with `prefix` and `suffix` attributes. See :func:`.make_prefixed_suffixed_name`. 

219 

220 6. Automatic table creation control: The `automatic_creation` Meta attribute controls whether 

221 tables are automatically created when the application starts. 

222 

223 .. note:: 

224 

225 The automatic model registration can be disabled for one single model class using the keyword argument 

226 `do_not_register` passed to the :class:`.RegisteredMeta` meta-class. For example: 

227 

228 .. code-block:: python 

229 

230 class AutoRegisterModel(MAFwBaseModel): 

231 pass 

232 

233 

234 class NoRegisterModel(MAFwBaseModel, do_not_register=True): 

235 pass 

236 

237 the first class will be automatically registered, while the second one will not. This is particularly useful if 

238 the user wants to define a base model class for the whole project without having it in the register where 

239 only concrete Model implementations are stored. 

240 

241 """ 

242 

243 @classmethod 

244 def get_fields_by_type(cls, field_type: type[Field]) -> dict[str, Field]: 

245 """ 

246 Return a dict {field_name: field_object} for all fields of the given type. 

247 

248 .. versionadded:: v2.0.0 

249 

250 :param field_type: Field type 

251 :type field_type: peewee.Field 

252 :return: A dict {field_name: field_object} for all fields of the given type. 

253 :rtype: dict[str, peewee.Field] 

254 """ 

255 if TYPE_CHECKING: 

256 assert hasattr(cls, '_meta') 

257 

258 return {name: field for name, field in cls._meta.fields.items() if isinstance(field, field_type)} 

259 

260 @classmethod 

261 def file_removal_triggers(cls) -> list[Trigger]: 

262 """ 

263 Generate a list of triggers for automatic file removal when records are deleted. 

264 

265 This method creates database triggers that automatically handle file cleanup when 

266 records containing :class:`~mafw.db.fields.FileNameField` fields are removed from 

267 the database table. The triggers insert the filenames and checksums into the 

268 :class:`~mafw.db.std_tables.OrphanFile` table for later processing. 

269 

270 The triggers are only created if the model has at least one field of type 

271 :class:`~mafw.db.fields.FileNameField`. If no such fields exist, an empty list 

272 is returned. 

273 

274 :class:`.FileNameListField` is a subclass of :class:`.FileNameField` and is treated in the same 

275 way. 

276 

277 .. versionadded:: v2.0.0 

278 

279 .. note:: 

280 

281 This functionality requires the ``file_trigger_auto_create`` attribute in the 

282 model's Meta class to be set to ``True`` for automatic trigger creation. 

283 

284 :return: A list containing the trigger object for file removal, or an empty list 

285 if no :class:`~mafw.db.fields.FileNameField` fields are found. 

286 :rtype: list[:class:`~mafw.db.trigger.Trigger`] 

287 """ 

288 from mafw.db.std_tables import OrphanFile, TriggerStatus 

289 

290 # it includes also FileNameListField that is a subclass of FileNameField 

291 file_fields = cls.get_fields_by_type(FileNameField) 

292 

293 if len(file_fields) == 0: 

294 return [] 

295 

296 if TYPE_CHECKING: 

297 assert hasattr(cls, '_meta') 

298 

299 new_trigger = Trigger( 

300 trigger_name=cls._meta.table_name + '_delete_files', 

301 trigger_type=(trigger.TriggerWhen.Before, trigger.TriggerAction.Delete), 

302 source_table=cls, 

303 safe=True, 

304 for_each_row=True, 

305 ) 

306 sub_query = TriggerStatus.select(TriggerStatus.status).where(TriggerStatus.trigger_type == 'DELETE_FILES') # type: ignore[no-untyped-call] 

307 trigger_condition = Value(1) == sub_query 

308 new_trigger.add_when(trigger_condition) 

309 data = [] 

310 for f in file_fields: 

311 c = cast(FileNameField, file_fields[f]).checksum_field or f 

312 # the checksum is not really used in the OrphanFile. 

313 # in versions before 2.0.0, the checksum field in OrphanFile was not nullable, 

314 # so it should contain something. In order to be backward compatible, in case of a missing checksum field 

315 # we will just use the filename 

316 # note that the automatic filename to checksum conversion will not work in this case because the trigger 

317 # lives in the database and not in the application. 

318 data.append({'filenames': SQL(f'OLD.{f}'), 'checksum': SQL(f'OLD.{c}')}) 

319 insert_query = OrphanFile.insert_many(data) 

320 new_trigger.add_sql(insert_query) 

321 

322 return [new_trigger] 

323 

324 @classmethod 

325 def triggers(cls) -> list[Trigger]: 

326 """ 

327 Returns an iterable of :class:`~mafw.db.trigger.Trigger` objects to create upon table creation. 

328 

329 The user must overload this returning all the triggers that must be created along with this class. 

330 """ 

331 return [] 

332 

333 # noinspection PyUnresolvedReferences 

334 @classmethod 

335 def create_table(cls, safe: bool = True, **options: Any) -> None: 

336 """ 

337 Create the table in the underlying DB and all the related trigger as well. 

338 

339 If the creation of a trigger fails, then the whole table dropped, and the original exception is re-raised. 

340 

341 .. warning:: 

342 

343 Trigger creation has been extensively tested with :link:`SQLite`, but not with the other database implementation. 

344 Please report any malfunction. 

345 

346 :param safe: Flag to add an IF NOT EXISTS to the creation statement. Defaults to True. 

347 :type safe: bool, Optional 

348 :param options: Additional options passed to the super method. 

349 """ 

350 super().create_table(safe, **options) 

351 

352 # this is just use to make mypy happy. 

353 meta_cls = cast(PeeweeModelWithMeta, cls) 

354 

355 # Get the database instance, it is used for trigger creation 

356 db = meta_cls._meta.database 

357 

358 triggers_list = cls.triggers() 

359 

360 if meta_cls._meta.file_trigger_auto_create: 

361 triggers_list.extend(cls.file_removal_triggers()) 

362 

363 if len(triggers_list): 

364 # Create tables with appropriate error handling 

365 try: 

366 for trigger in triggers_list: 

367 trigger.set_database(db) 

368 try: 

369 db.execute_sql(trigger.create()) 

370 except UnsupportedDatabaseError as e: 

371 warnings.warn(f'Skipping unsupported trigger {trigger.trigger_name}: {str(e)}') 

372 except Exception: 

373 raise 

374 except: 

375 # If an error occurs, drop the table and any created triggers 

376 meta_cls._meta.database.drop_tables([cls], safe=safe) 

377 for trigger in triggers_list: 

378 try: 

379 db.execute_sql(trigger.drop(True)) 

380 except Exception: 

381 pass # Ignore errors when dropping triggers during cleanup 

382 raise 

383 

384 # noinspection PyProtectedMember 

385 @classmethod 

386 def std_upsert(cls, __data: dict[str, Any] | None = None, **mapping: Any) -> ModelInsert: 

387 """ 

388 Perform a so-called standard upsert. 

389 

390 An upsert statement is not part of the standard SQL and different databases have different ways to implement it. 

391 This method will work for modern versions of :link:`sqlite` and :link:`postgreSQL`. 

392 Here is a `detailed explanation for SQLite <https://www.sqlite.org/lang_upsert.html>`_. 

393 

394 An upsert is a statement in which we try to insert some data in a table where there are some constraints. 

395 If one constraint is failing, then instead of inserting a new row, we will try to update the existing row 

396 causing the constraint violation. 

397 

398 A standard upsert, in the naming convention of MAFw, is setting the conflict cause to the primary key with all 

399 other fields being updated. In other words, the database will try to insert the data provided in the table, but 

400 if the primary key already exists, then all other fields will be updated. 

401 

402 This method is equivalent to the following: 

403 

404 .. code-block:: python 

405 

406 class Sample(MAFwBaseModel): 

407 sample_id = AutoField( 

408 primary_key=True, 

409 help_text='The sample id primary key', 

410 ) 

411 sample_name = TextField(help_text='The sample name') 

412 

413 

414 ( 

415 Sample.insert(sample_id=1, sample_name='my_sample') 

416 .on_conflict( 

417 preserve=[Sample.sample_name] 

418 ) # use the value we would have inserted 

419 .execute() 

420 ) 

421 

422 :param __data: A dictionary containing the key/value pair for the insert. The key is the column name. 

423 Defaults to None 

424 :type __data: dict, Optional 

425 :param mapping: Keyword arguments representing the value to be inserted. 

426 """ 

427 # this is used just to make mypy happy. 

428 # cls and meta_cls are exactly the same thing 

429 meta_cls = cast(PeeweeModelWithMeta, cls) 

430 

431 if meta_cls._meta.composite_key: 

432 conflict_target = [meta_cls._meta.fields[n] for n in meta_cls._meta.primary_key.field_names] 

433 else: 

434 conflict_target = [meta_cls._meta.primary_key] 

435 

436 conflict_target_names = [f.name for f in conflict_target] 

437 preserve = [f for n, f in meta_cls._meta.fields.items() if n not in conflict_target_names] 

438 return cast( 

439 ModelInsert, cls.insert(__data, **mapping).on_conflict(conflict_target=conflict_target, preserve=preserve) 

440 ) 

441 

442 # noinspection PyProtectedMember 

443 @classmethod 

444 def std_upsert_many(cls, rows: Iterable[Any], fields: list[str] | None = None) -> ModelInsert: 

445 """ 

446 Perform a standard upsert with many rows. 

447 

448 .. seealso:: 

449 

450 Read the :meth:`std_upsert` documentation for an explanation of this method. 

451 

452 :param rows: A list with the rows to be inserted. Each item can be a dictionary or a tuple of values. If a 

453 tuple is provided, then the `fields` must be provided. 

454 :type rows: Iterable 

455 :param fields: A list of field names. Defaults to None. 

456 :type fields: list[str], Optional 

457 """ 

458 # this is used just to make mypy happy. 

459 # cls and meta_cls are exactly the same thing 

460 meta_cls = cast(PeeweeModelWithMeta, cls) 

461 

462 if meta_cls._meta.composite_key: 

463 conflict_target = [meta_cls._meta.fields[n] for n in meta_cls._meta.primary_key.field_names] 

464 else: 

465 conflict_target = [meta_cls._meta.primary_key] 

466 

467 conflict_target_names = [f.name for f in conflict_target] 

468 preserve = [f for n, f in meta_cls._meta.fields.items() if n not in conflict_target_names] 

469 return cast( 

470 ModelInsert, 

471 ( 

472 cls.insert_many(rows, fields).on_conflict( 

473 conflict_target=conflict_target, 

474 preserve=preserve, 

475 ) 

476 ), 

477 ) 

478 

479 def to_dict( 

480 self, 

481 recurse: bool = True, 

482 backrefs: bool = False, 

483 only: list[str] | None = None, 

484 exclude: list[str] | None = None, 

485 **kwargs: Any, 

486 ) -> dict[str, Any]: 

487 """ 

488 Convert model instance to dictionary with optional parameters 

489 

490 See full documentation directly on the `peewee documentation 

491 <https://docs.peewee-orm.com/en/latest/peewee/playhouse.html#dict_to_model>`__. 

492 

493 :param recurse: If to recurse through foreign keys. Default to True. 

494 :type recurse: bool, Optional 

495 :param backrefs: If to include backrefs. Default to False. 

496 :type backrefs: bool, Optional 

497 :param only: A list of fields to be included. Defaults to None. 

498 :type only: list[str], Optional 

499 :param exclude: A list of fields to be excluded. Defaults to None. 

500 :type exclude: list[str], Optional 

501 :param kwargs: Other keyword arguments to be passed to peewee `playhouse shortcut <https://docs.peewee-orm.com/en/latest/peewee/playhouse.html#dict_to_model>`__. 

502 :return: A dictionary containing the key/value of the model. 

503 :rtype: dict[str, Any] 

504 """ 

505 # the playhouse module of peewee is not typed. 

506 return model_to_dict( # type: ignore[no-any-return] 

507 self, 

508 recurse=recurse, 

509 backrefs=backrefs, # type: ignore[no-untyped-call] 

510 only=only, 

511 exclude=exclude, 

512 **kwargs, 

513 ) 

514 

515 @classmethod 

516 def from_dict(cls, data: dict[str, Any], ignore_unknown: bool = False) -> 'MAFwBaseModel': 

517 """ 

518 Create a new model instance from dictionary 

519 

520 :param data: The dictionary containing the key/value pairs of the model. 

521 :type data: dict[str, Any] 

522 :param ignore_unknown: If unknown dictionary keys should be ignored. 

523 :type ignore_unknown: bool 

524 :return: A new model instance. 

525 :rtype: MAFwBaseModel 

526 """ 

527 # the playhouse module of peewee is not typed. 

528 return dict_to_model(cls, data, ignore_unknown=ignore_unknown) # type: ignore[no-untyped-call,no-any-return] 

529 

530 def update_from_dict(self, data: dict[str, Any], ignore_unknown: bool = False) -> 'MAFwBaseModel': 

531 """ 

532 Update current model instance from dictionary 

533 

534 The model instance is returned for daisy-chaining. 

535 

536 :param data: The dictionary containing the key/value pairs of the model. 

537 :type data: dict[str, Any] 

538 :param ignore_unknown: If unknown dictionary keys should be ignored. 

539 :type ignore_unknown: bool 

540 """ 

541 update_model_from_dict(self, data, ignore_unknown=ignore_unknown) # type: ignore[no-untyped-call] 

542 return self 

543 

544 class Meta: 

545 """The metadata container for the Model class""" 

546 

547 database = database_proxy 

548 """The reference database. A proxy is used as a placeholder that will be automatically replaced by the real  

549 instance of the database at runtime.""" 

550 

551 legacy_table_names = False 

552 """ 

553 Set the default table name as the snake case of the Model camel case name. 

554  

555 So for example, a model named ThisIsMyTable will corresponds to a database table named this_is_my_table. 

556 """ 

557 

558 suffix = '' 

559 """ 

560 Set the value to append to the table name.  

561 """ 

562 

563 prefix = '' 

564 """ 

565 Set the value to prepend to the table name.  

566 """ 

567 

568 table_function = make_prefixed_suffixed_name 

569 """ 

570 Set the table naming function. 

571 """ 

572 

573 automatic_creation = True 

574 """ 

575 Whether the table linked to the model should be created automatically 

576 """ 

577 

578 file_trigger_auto_create = False 

579 """ 

580 Whether to automatically create triggers to delete files once a row with a FilenameField is removed 

581 """