Coverage for src / mafw / db / db_connection.py: 100%
96 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 09:03 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 09:03 +0000
1# Copyright 2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Database connection configuration helpers.
7:Author: Bulgheroni Antonio
8:Description: Normalize database steering configuration into peewee connection arguments.
9"""
11from __future__ import annotations
13import os
14import warnings
15from typing import Any, Mapping
17from mafw.mafw_errors import SteeringFileDeprecation
18from mafw.tools.regexp import extract_protocol
# Checked by _apply_authentication: a ``method`` value outside this set raises ValueError.
VALID_AUTH_METHODS: set[str] = {'env', 'inline', 'file'}
"""Supported authentication methods for ``DBConfiguration.authentication``."""

# Skipped by _legacy_connection_parameters when it forwards the remaining
# legacy keys as connection keyword arguments.
LEGACY_KEYS: set[str] = {'URL', 'pragmas'}
"""Keys reserved by the legacy DBConfiguration schema."""
def build_connection_parameters(config: Mapping[str, Any]) -> tuple[str, dict[str, Any]]:
    """Normalize DBConfiguration mappings into peewee connection arguments.

    The input can either be the full steering configuration dictionary or the
    nested ``DBConfiguration`` table. The returned tuple provides the URL and
    the keyword arguments for ``peewee.connect``.

    A table that contains neither an ``authentication`` nor a ``parameters``
    key is treated as the legacy schema: a :class:`SteeringFileDeprecation`
    warning is emitted and the arguments are built by
    ``_legacy_connection_parameters``.

    :param config: Steering configuration or DBConfiguration table.
    :type config: Mapping[str, Any]
    :return: Tuple with database URL and connection keyword arguments.
    :rtype: tuple[str, dict[str, Any]]
    :raises ValueError: When required fields are missing or invalid, including
        a ``DBConfiguration`` entry that cannot be read as a table.
    """
    conf = dict(config)
    if 'DBConfiguration' in conf:
        try:
            conf = dict(conf['DBConfiguration'])
        except (TypeError, ValueError) as err:
            # A scalar or otherwise malformed entry would otherwise surface as
            # an opaque TypeError/ValueError raised by dict(); report it as the
            # documented ValueError with an actionable message instead.
            raise ValueError('DBConfiguration must be a table of key/value settings.') from err

    url = conf.get('URL')
    if not url:
        raise ValueError('DBConfiguration must define a URL field.')

    url_text = str(url)
    protocol = extract_protocol(url_text) or ''
    is_new_style = 'authentication' in conf or 'parameters' in conf

    if not is_new_style:
        warnings.warn(
            'Legacy DBConfiguration schema detected. Please update to the new authentication/parameters layout.',
            SteeringFileDeprecation,
            stacklevel=2,  # attribute the warning to the caller of this function
        )
        return url_text, _legacy_connection_parameters(protocol, conf)

    connection_parameters: dict[str, Any] = {}
    _apply_backend_parameters(protocol, conf, connection_parameters)
    _apply_authentication(protocol, conf.get('authentication'), connection_parameters)

    return url_text, connection_parameters
def _legacy_connection_parameters(protocol: str, conf: Mapping[str, Any]) -> dict[str, Any]:
    """Build connection keyword arguments from a legacy DBConfiguration table.

    :param protocol: Database protocol of the configured URL.
    :param conf: Legacy DBConfiguration table.
    :return: Every entry of *conf* whose key is not in ``LEGACY_KEYS``; for
        ``sqlite`` a ``pragmas`` entry is always present as well.
    """
    # SQLite always receives ``pragmas``, defaulting to an empty mapping.
    legacy_kwargs: dict[str, Any] = {'pragmas': conf.get('pragmas', {})} if protocol == 'sqlite' else {}
    legacy_kwargs.update((name, setting) for name, setting in conf.items() if name not in LEGACY_KEYS)
    return legacy_kwargs
def _apply_backend_parameters(protocol: str, conf: Mapping[str, Any], target: dict[str, Any]) -> None:
    """Copy the backend-specific ``parameters`` section of *conf* into *target*.

    Nothing is written unless ``conf['parameters']`` is a mapping that holds a
    mapping under the *protocol* key.

    :param protocol: Database protocol used to select the backend section.
    :param conf: DBConfiguration table.
    :param target: Connection keyword arguments, updated in place.
    """
    section = conf.get('parameters')
    backend_section = section.get(protocol) if isinstance(section, Mapping) else None
    if not isinstance(backend_section, Mapping):
        return

    options = dict(backend_section)
    if protocol == 'sqlite':
        # For SQLite, ``pragmas`` is copied only when it is a mapping; any
        # other value is dropped rather than forwarded.
        pragmas = options.pop('pragmas', None)
        if isinstance(pragmas, Mapping):
            target['pragmas'] = dict(pragmas)
    target.update(options)
def _apply_authentication(protocol: str, auth: Any, target: dict[str, Any]) -> None:
    """Add the credentials described by *auth* to *target*.

    ``sqlite`` is skipped entirely. For every other protocol *auth* must be a
    mapping whose ``method`` is one of ``VALID_AUTH_METHODS``; the matching
    ``_apply_*_authentication`` helper then fills *target*.

    :param protocol: Database protocol of the configured URL.
    :param auth: The ``authentication`` table, or ``None`` when absent.
    :param target: Connection keyword arguments, updated in place.
    :raises ValueError: When *auth* is not a mapping or names an unsupported method.
    """
    if protocol == 'sqlite':
        return

    # ``None`` is not a Mapping, so a single isinstance check covers both cases.
    if not isinstance(auth, Mapping):
        raise ValueError('DBConfiguration.authentication must be defined for server-based databases.')

    method = auth.get('method')
    if method not in VALID_AUTH_METHODS:
        raise ValueError(f'Unsupported authentication method: {method!r}.')

    if method == 'env':
        _apply_env_authentication(auth, target)
    elif method == 'inline':
        warnings.warn(
            'Inline database credentials are insecure. Prefer environment-based authentication.',
            UserWarning,
            stacklevel=2,
        )
        _apply_inline_authentication(auth, target)
    elif method == 'file':
        _apply_file_authentication(protocol, auth, target)
def _apply_env_authentication(auth: Mapping[str, Any], target: dict[str, Any]) -> None:
    """Resolve credentials from environment variables named in *auth*.

    ``auth['username']`` and ``auth['password']`` hold environment variable
    names, not the credentials themselves. Entries that are missing or
    ``None`` are skipped.

    :param auth: The ``authentication`` table.
    :param target: Connection keyword arguments; receives ``user`` and/or ``password``.
    :raises ValueError: When a named environment variable is not set.
    """
    # (key in ``auth``, key written to ``target``); username is resolved first.
    for auth_key, target_key in (('username', 'user'), ('password', 'password')):
        variable = auth.get(auth_key)
        if variable is None:
            continue
        variable = str(variable)
        resolved = os.getenv(variable)
        if resolved is None:
            raise ValueError(f'Environment variable {variable!r} for DB {auth_key} is not set.')
        target[target_key] = resolved
def _apply_inline_authentication(auth: Mapping[str, Any], target: dict[str, Any]) -> None:
    """Copy credentials written directly in *auth* into *target*.

    ``auth['username']`` is stored as ``user`` and ``auth['password']`` as
    ``password``; entries that are missing or ``None`` are skipped.

    :param auth: The ``authentication`` table.
    :param target: Connection keyword arguments, updated in place.
    """
    supplied = {
        target_key: auth[auth_key]
        for auth_key, target_key in (('username', 'user'), ('password', 'password'))
        if auth.get(auth_key) is not None
    }
    target.update(supplied)
def _apply_file_authentication(protocol: str, auth: Mapping[str, Any], target: dict[str, Any]) -> None:
    """Configure password-file based authentication in *target*.

    A non-``None`` ``auth['username']`` is stored as ``user``. When
    ``auth['passfile']`` is given it is stored under ``read_default_file`` for
    ``mysql`` and under ``passfile`` for ``postgresql``.

    :param protocol: Database protocol of the configured URL.
    :param auth: The ``authentication`` table.
    :param target: Connection keyword arguments, updated in place.
    :raises ValueError: When a passfile is given for any other protocol.
    """
    username = auth.get('username')
    if username is not None:
        target['user'] = username

    passfile = auth.get('passfile')
    if passfile is None:
        # Without a passfile there is nothing protocol-specific to validate.
        return

    passfile_keyword = {'mysql': 'read_default_file', 'postgresql': 'passfile'}.get(protocol)
    if passfile_keyword is None:
        raise ValueError(f'Authentication method "file" is not supported for {protocol!r}.')
    target[passfile_keyword] = passfile