Coverage for src / mafw / tools / db_tools.py: 100%

27 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-09 09:08 +0000

1# Copyright 2025 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Database Tools 

6 

7This module provides utility functions for working with database models using the Peewee ORM. 

8It offers helper functions for creating key-value mappings, retrieving primary key information, 

9and combining fields for composite keys or display purposes. 

10""" 

11 

12from typing import TYPE_CHECKING, Any, cast 

13 

14from peewee import Alias, CompositeKey, Field, Function, fn 

15 

16from mafw.db.db_model import MAFwBaseModel 

17 

18 

def make_kv(model: MAFwBaseModel | type[MAFwBaseModel], key: Field, value: Field) -> dict[Any, Any]:
    """
    Create a key-value mapping from a database model.

    The model is queried for the two given fields only, and every returned row
    contributes one entry to the mapping: the row attribute named after the key
    field becomes the dictionary key, the row attribute named after the value
    field becomes the dictionary value. If the same key occurs in several rows,
    the value of the row returned last wins.

    :param model: The database model or model class to query.
    :type model: MAFwBaseModel | type[MAFwBaseModel]
    :param key: The field to use as dictionary keys.
    :type key: peewee.Field
    :param value: The field to use as dictionary values.
    :type value: peewee.Field
    :return: A dictionary mapping key field values to value field values.
    :rtype: dict[Any, Any]
    :raises AttributeError: If the model parameter doesn't have a ``select`` method.
    """
    # Fail early with an explicit message instead of a bare attribute error
    # raised from the middle of the query.
    if not hasattr(model, 'select'):
        raise AttributeError(f"Model {model} does not have a 'select' method")

    rows = model.select(key, value)  # type: ignore[no-untyped-call]
    return {getattr(row, key.name): getattr(row, value.name) for row in rows}

45 

46 

def get_pk(model: MAFwBaseModel | type[MAFwBaseModel]) -> list[Field]:
    """
    Retrieve the primary key fields of a database model.

    This function examines the primary key recorded in the model metadata and
    returns the field objects that constitute it. For composite primary keys,
    it returns all constituent fields in the order they are listed in the key;
    for simple primary keys, it returns a list containing just the primary key
    field. If the model has no primary key at all, an empty list is returned.

    :param model: The database model or model class to examine.
    :type model: MAFwBaseModel | type[MAFwBaseModel]
    :return: A list of field objects representing the primary key fields,
        empty when the model has no primary key.
    :rtype: list[peewee.Field]
    """
    if TYPE_CHECKING:
        assert hasattr(model, '_meta')

    primary_key = model._meta.primary_key

    # Peewee records ``False`` here for models declared with
    # ``primary_key = False``; wrapping that value in a list would hand a
    # non-field object to the caller, so report "no key fields" instead.
    if primary_key is False or primary_key is None:
        return []

    if isinstance(primary_key, CompositeKey):
        # A composite key only stores field names; resolve them to field objects.
        return [model._meta.fields[field_name] for field_name in primary_key.field_names]

    return [primary_key]

70 

71 

def combine_fields(fields: list[Field], join_str: str = ' x ') -> Function:
    """
    Build a single SQL CONCAT expression out of several database fields.

    The fields are passed to CONCAT in the given order, with the separator
    inserted between each pair of neighbouring fields. This is handy for
    rendering composite keys or display strings as one value.

    An empty list produces a CONCAT call without arguments, and a single field
    produces a CONCAT call wrapping just that field (no separator).

    :param fields: List of field objects to be combined.
    :type fields: list[peewee.Field]
    :param join_str: String to use as separator between fields. Defaults to ' x '.
    :type join_str: str
    :return: A SQL CONCAT function expression combining the fields.
    :rtype: peewee.Function
    """
    concat_args: list[Any] = []
    for position, field in enumerate(fields):
        # The separator goes between fields only, never before the first one.
        if position > 0:
            concat_args.append(join_str)
        concat_args.append(field)

    return cast(Function, fn.CONCAT(*concat_args))

97 

98 

def combine_pk(
    model: MAFwBaseModel | type[MAFwBaseModel], alias_name: str = 'combo_pk', join_str: str = ' x '
) -> Alias:
    """
    Expose the primary key of a database model as one aliased field expression.

    The primary key fields are obtained through :func:`get_pk`. When the key is
    made of exactly one field, that field itself is aliased. Otherwise the
    fields are concatenated with :func:`combine_fields`, using the given
    separator, and the resulting expression is aliased.

    :param model: The database model or model class to examine for primary key fields.
    :type model: MAFwBaseModel | type[MAFwBaseModel]
    :param alias_name: The alias name to apply to the resulting field expression. Defaults to 'combo_pk'.
    :type alias_name: str
    :param join_str: String to use as separator between primary key fields when combining. Defaults to ' x '.
    :type join_str: str
    :return: An aliased field expression representing the combined primary key.
    :rtype: peewee.Alias
    """
    key_fields = get_pk(model)

    # A lone key field is used directly; anything else goes through CONCAT.
    expression = key_fields[0] if len(key_fields) == 1 else combine_fields(key_fields, join_str)

    return cast(Alias, expression.alias(alias_name))  # type: ignore[no-untyped-call]

121 

122 return cast(Alias, combine_fields(pk_fields, join_str).alias(alias_name)) # type: ignore[no-untyped-call]