# Copyright 2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""
Database connection configuration helpers.
:Author: Bulgheroni Antonio
:Description: Normalize database steering configuration into peewee connection arguments.
"""
from __future__ import annotations
import os
import warnings
from typing import Any, Mapping
from mafw.mafw_errors import SteeringFileDeprecation
from mafw.tools.regexp import extract_protocol
VALID_AUTH_METHODS: set[str] = {'env', 'inline', 'file'}
"""Supported authentication methods for ``DBConfiguration.authentication``."""
LEGACY_KEYS: set[str] = {'URL', 'pragmas'}
"""Keys reserved by the legacy DBConfiguration schema."""
[docs]
def build_connection_parameters(config: Mapping[str, Any]) -> tuple[str, dict[str, Any]]:
"""Normalize DBConfiguration mappings into peewee connection arguments.
The input can either be the full steering configuration dictionary or the
nested ``DBConfiguration`` table. The returned tuple provides the URL and
the keyword arguments for ``peewee.connect``.
:param config: Steering configuration or DBConfiguration table.
:type config: Mapping[str, Any]
:return: Tuple with database URL and connection keyword arguments.
:rtype: tuple[str, dict[str, Any]]
:raises ValueError: When required fields are missing or invalid.
"""
conf = dict(config)
if 'DBConfiguration' in conf:
conf = dict(conf['DBConfiguration'])
url = conf.get('URL')
if not url:
raise ValueError('DBConfiguration must define a URL field.')
protocol = extract_protocol(str(url)) or ''
is_new_style = 'authentication' in conf or 'parameters' in conf
if not is_new_style:
warnings.warn(
'Legacy DBConfiguration schema detected. Please update to the new authentication/parameters layout.',
SteeringFileDeprecation,
stacklevel=2,
)
return str(url), _legacy_connection_parameters(protocol, conf)
connection_parameters: dict[str, Any] = {}
_apply_backend_parameters(protocol, conf, connection_parameters)
_apply_authentication(protocol, conf.get('authentication'), connection_parameters)
return str(url), connection_parameters
def _legacy_connection_parameters(protocol: str, conf: Mapping[str, Any]) -> dict[str, Any]:
connection_parameters: dict[str, Any] = {}
if protocol == 'sqlite':
connection_parameters['pragmas'] = conf.get('pragmas', {})
for key, value in conf.items():
if key in LEGACY_KEYS:
continue
connection_parameters[key] = value
return connection_parameters
def _apply_backend_parameters(protocol: str, conf: Mapping[str, Any], target: dict[str, Any]) -> None:
parameters = conf.get('parameters')
if not isinstance(parameters, Mapping):
return
backend_params = parameters.get(protocol)
if not isinstance(backend_params, Mapping):
return
backend_dict = dict(backend_params)
if protocol == 'sqlite':
pragmas = backend_dict.pop('pragmas', None)
if isinstance(pragmas, Mapping):
target['pragmas'] = dict(pragmas)
for key, value in backend_dict.items():
target[key] = value
def _apply_authentication(protocol: str, auth: Any, target: dict[str, Any]) -> None:
if protocol == 'sqlite':
return
if auth is None or not isinstance(auth, Mapping):
raise ValueError('DBConfiguration.authentication must be defined for server-based databases.')
method = auth.get('method')
if method not in VALID_AUTH_METHODS:
raise ValueError(f'Unsupported authentication method: {method!r}.')
if method == 'env':
_apply_env_authentication(auth, target)
return
if method == 'inline':
warnings.warn(
'Inline database credentials are insecure. Prefer environment-based authentication.',
UserWarning,
stacklevel=2,
)
_apply_inline_authentication(auth, target)
return
if method == 'file':
_apply_file_authentication(protocol, auth, target)
return
def _apply_env_authentication(auth: Mapping[str, Any], target: dict[str, Any]) -> None:
if 'username' in auth and auth['username'] is not None:
env_name = str(auth['username'])
value = os.getenv(env_name)
if value is None:
raise ValueError(f'Environment variable {env_name!r} for DB username is not set.')
target['user'] = value
if 'password' in auth and auth['password'] is not None:
env_name = str(auth['password'])
value = os.getenv(env_name)
if value is None:
raise ValueError(f'Environment variable {env_name!r} for DB password is not set.')
target['password'] = value
def _apply_inline_authentication(auth: Mapping[str, Any], target: dict[str, Any]) -> None:
if 'username' in auth and auth['username'] is not None:
target['user'] = auth['username']
if 'password' in auth and auth['password'] is not None:
target['password'] = auth['password']
def _apply_file_authentication(protocol: str, auth: Mapping[str, Any], target: dict[str, Any]) -> None:
if 'username' in auth and auth['username'] is not None:
target['user'] = auth['username']
passfile = auth.get('passfile')
if passfile is None:
return
if protocol == 'mysql':
target['read_default_file'] = passfile
elif protocol == 'postgresql':
target['passfile'] = passfile
else:
raise ValueError(f'Authentication method "file" is not supported for {protocol!r}.')