Coverage for src / mafw / tools / db_tools.py: 100%
27 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
1# Copyright 2025 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Database Tools
7This module provides utility functions for working with database models using the Peewee ORM.
8It offers helper functions for creating key-value mappings, retrieving primary key information,
9and combining fields for composite keys or display purposes.
10"""
12from typing import TYPE_CHECKING, Any, cast
14from peewee import Alias, CompositeKey, Field, Function, fn
16from mafw.db.db_model import MAFwBaseModel
def make_kv(model: MAFwBaseModel | type[MAFwBaseModel], key: Field, value: Field) -> dict[Any, Any]:
    """
    Build a lookup table out of two columns of a database model.

    The model is queried for the ``key`` and ``value`` fields only. Each returned row
    contributes one dictionary entry, whose key is the row attribute named after the
    ``key`` field and whose value is the row attribute named after the ``value`` field.
    If several rows carry the same key, the entry of the last row returned is kept.

    :param model: The database model or model class to query.
    :type model: MAFwBaseModel | type[MAFwBaseModel]
    :param key: The field providing the dictionary keys.
    :type key: peewee.Field
    :param value: The field providing the dictionary values.
    :type value: peewee.Field
    :return: A dictionary mapping key field values to value field values.
    :rtype: dict[Any, Any]
    :raises AttributeError: If the model parameter has no ``select`` attribute.
    """
    if hasattr(model, 'select'):
        return {
            getattr(row, key.name): getattr(row, value.name)
            for row in model.select(key, value)  # type: ignore[no-untyped-call]
        }

    # Anything that cannot be queried is rejected with an explicit message
    raise AttributeError(f"Model {model} does not have a 'select' method")
def get_pk(model: MAFwBaseModel | type[MAFwBaseModel]) -> list[Field]:
    """
    Return the fields that make up the primary key of a database model.

    The primary key is read from the model metadata. When it is a
    :class:`peewee.CompositeKey`, each of its field names is looked up among the
    model fields and the resulting field objects are returned. Otherwise the
    primary key object itself is returned as the only element of the list.

    :param model: The database model or model class to examine.
    :type model: MAFwBaseModel | type[MAFwBaseModel]
    :return: A list of field objects representing the primary key fields.
    :rtype: list[peewee.Field]
    """
    if TYPE_CHECKING:
        # Only seen by static type checkers: tells them that ``_meta`` exists
        assert hasattr(model, '_meta')

    meta = model._meta
    primary_key = meta.primary_key

    # Simple primary key: the key object is the field itself
    if not isinstance(primary_key, CompositeKey):
        return [primary_key]

    # Composite primary key: resolve every constituent field by name
    return [meta.fields[name] for name in primary_key.field_names]
def combine_fields(fields: list[Field], join_str: str = ' x ') -> Function:
    """
    Build an SQL CONCAT expression joining several database fields.

    The fields are passed to CONCAT in their original order with ``join_str``
    inserted between each pair of consecutive fields. A single field is passed
    to CONCAT on its own, and an empty list produces a CONCAT call without
    arguments.

    :param fields: List of field objects to be combined.
    :type fields: list[peewee.Field]
    :param join_str: String to use as separator between fields. Defaults to ' x '.
    :type join_str: str
    :return: A SQL CONCAT function expression combining the fields.
    :rtype: peewee.Function
    """
    concat_args: list[Any] = []
    for field in fields:
        # The separator goes in front of every field except the first one
        if concat_args:
            concat_args.append(join_str)
        concat_args.append(field)

    return cast(Function, fn.CONCAT(*concat_args))
def combine_pk(
    model: MAFwBaseModel | type[MAFwBaseModel], alias_name: str = 'combo_pk', join_str: str = ' x '
) -> Alias:
    """
    Expose the primary key of a database model as a single aliased expression.

    The primary key fields are obtained with :func:`get_pk`. When exactly one
    field is found, that field is aliased directly. In every other case the
    fields are first merged with :func:`combine_fields`, using ``join_str`` as
    separator, and the resulting expression is aliased.

    :param model: The database model or model class to examine for primary key fields.
    :type model: MAFwBaseModel | type[MAFwBaseModel]
    :param alias_name: The alias name to apply to the resulting expression. Defaults to 'combo_pk'.
    :type alias_name: str
    :param join_str: String to use as separator between primary key fields when combining.
        Defaults to ' x '.
    :type join_str: str
    :return: An aliased expression representing the combined primary key.
    :rtype: peewee.Alias
    """
    pk_fields = get_pk(model)

    if len(pk_fields) != 1:
        combined = combine_fields(pk_fields, join_str)
        return cast(Alias, combined.alias(alias_name))  # type: ignore[no-untyped-call]

    # Exactly one field: no concatenation needed, alias it as is
    (only_field,) = pk_fields
    return cast(Alias, only_field.alias(alias_name))  # type: ignore[no-untyped-call]