Coverage for src / mafw / db / db_connection.py: 100%

96 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 09:03 +0000

1# Copyright 2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Database connection configuration helpers. 

6 

7:Author: Bulgheroni Antonio 

8:Description: Normalize database steering configuration into peewee connection arguments. 

9""" 

10 

11from __future__ import annotations 

12 

13import os 

14import warnings 

15from typing import Any, Mapping 

16 

17from mafw.mafw_errors import SteeringFileDeprecation 

18from mafw.tools.regexp import extract_protocol 

19 

20VALID_AUTH_METHODS: set[str] = {'env', 'inline', 'file'} 

21"""Supported authentication methods for ``DBConfiguration.authentication``.""" 

22 

23LEGACY_KEYS: set[str] = {'URL', 'pragmas'} 

24"""Keys reserved by the legacy DBConfiguration schema.""" 

25 

26 

27def build_connection_parameters(config: Mapping[str, Any]) -> tuple[str, dict[str, Any]]: 

28 """Normalize DBConfiguration mappings into peewee connection arguments. 

29 

30 The input can either be the full steering configuration dictionary or the 

31 nested ``DBConfiguration`` table. The returned tuple provides the URL and 

32 the keyword arguments for ``peewee.connect``. 

33 

34 :param config: Steering configuration or DBConfiguration table. 

35 :type config: Mapping[str, Any] 

36 :return: Tuple with database URL and connection keyword arguments. 

37 :rtype: tuple[str, dict[str, Any]] 

38 :raises ValueError: When required fields are missing or invalid. 

39 """ 

40 

41 conf = dict(config) 

42 if 'DBConfiguration' in conf: 

43 conf = dict(conf['DBConfiguration']) 

44 

45 url = conf.get('URL') 

46 if not url: 

47 raise ValueError('DBConfiguration must define a URL field.') 

48 

49 protocol = extract_protocol(str(url)) or '' 

50 is_new_style = 'authentication' in conf or 'parameters' in conf 

51 

52 if not is_new_style: 

53 warnings.warn( 

54 'Legacy DBConfiguration schema detected. Please update to the new authentication/parameters layout.', 

55 SteeringFileDeprecation, 

56 stacklevel=2, 

57 ) 

58 return str(url), _legacy_connection_parameters(protocol, conf) 

59 

60 connection_parameters: dict[str, Any] = {} 

61 _apply_backend_parameters(protocol, conf, connection_parameters) 

62 _apply_authentication(protocol, conf.get('authentication'), connection_parameters) 

63 

64 return str(url), connection_parameters 

65 

66 

67def _legacy_connection_parameters(protocol: str, conf: Mapping[str, Any]) -> dict[str, Any]: 

68 connection_parameters: dict[str, Any] = {} 

69 if protocol == 'sqlite': 

70 connection_parameters['pragmas'] = conf.get('pragmas', {}) 

71 for key, value in conf.items(): 

72 if key in LEGACY_KEYS: 

73 continue 

74 connection_parameters[key] = value 

75 return connection_parameters 

76 

77 

78def _apply_backend_parameters(protocol: str, conf: Mapping[str, Any], target: dict[str, Any]) -> None: 

79 parameters = conf.get('parameters') 

80 if not isinstance(parameters, Mapping): 

81 return 

82 

83 backend_params = parameters.get(protocol) 

84 if not isinstance(backend_params, Mapping): 

85 return 

86 

87 backend_dict = dict(backend_params) 

88 if protocol == 'sqlite': 

89 pragmas = backend_dict.pop('pragmas', None) 

90 if isinstance(pragmas, Mapping): 

91 target['pragmas'] = dict(pragmas) 

92 for key, value in backend_dict.items(): 

93 target[key] = value 

94 

95 

96def _apply_authentication(protocol: str, auth: Any, target: dict[str, Any]) -> None: 

97 if protocol == 'sqlite': 

98 return 

99 

100 if auth is None or not isinstance(auth, Mapping): 

101 raise ValueError('DBConfiguration.authentication must be defined for server-based databases.') 

102 

103 method = auth.get('method') 

104 if method not in VALID_AUTH_METHODS: 

105 raise ValueError(f'Unsupported authentication method: {method!r}.') 

106 

107 if method == 'env': 

108 _apply_env_authentication(auth, target) 

109 return 

110 

111 if method == 'inline': 

112 warnings.warn( 

113 'Inline database credentials are insecure. Prefer environment-based authentication.', 

114 UserWarning, 

115 stacklevel=2, 

116 ) 

117 _apply_inline_authentication(auth, target) 

118 return 

119 

120 if method == 'file': 

121 _apply_file_authentication(protocol, auth, target) 

122 return 

123 

124 

125def _apply_env_authentication(auth: Mapping[str, Any], target: dict[str, Any]) -> None: 

126 if 'username' in auth and auth['username'] is not None: 

127 env_name = str(auth['username']) 

128 value = os.getenv(env_name) 

129 if value is None: 

130 raise ValueError(f'Environment variable {env_name!r} for DB username is not set.') 

131 target['user'] = value 

132 if 'password' in auth and auth['password'] is not None: 

133 env_name = str(auth['password']) 

134 value = os.getenv(env_name) 

135 if value is None: 

136 raise ValueError(f'Environment variable {env_name!r} for DB password is not set.') 

137 target['password'] = value 

138 

139 

140def _apply_inline_authentication(auth: Mapping[str, Any], target: dict[str, Any]) -> None: 

141 if 'username' in auth and auth['username'] is not None: 

142 target['user'] = auth['username'] 

143 if 'password' in auth and auth['password'] is not None: 

144 target['password'] = auth['password'] 

145 

146 

147def _apply_file_authentication(protocol: str, auth: Mapping[str, Any], target: dict[str, Any]) -> None: 

148 if 'username' in auth and auth['username'] is not None: 

149 target['user'] = auth['username'] 

150 

151 passfile = auth.get('passfile') 

152 if passfile is None: 

153 return 

154 

155 if protocol == 'mysql': 

156 target['read_default_file'] = passfile 

157 elif protocol == 'postgresql': 

158 target['passfile'] = passfile 

159 else: 

160 raise ValueError(f'Authentication method "file" is not supported for {protocol!r}.')