# Source code for mafw.db.db_connection

#  Copyright 2026 European Union
#  Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
#  SPDX-License-Identifier: EUPL-1.2
"""
Database connection configuration helpers.

:Author: Bulgheroni Antonio
:Description: Normalize database steering configuration into peewee connection arguments.
"""

from __future__ import annotations

import os
import warnings
from typing import Any, Mapping

from mafw.mafw_errors import SteeringFileDeprecation
from mafw.tools.regexp import extract_protocol

# Checked by ``_apply_authentication``: any other ``method`` value is rejected with a ``ValueError``.
VALID_AUTH_METHODS: set[str] = {'env', 'inline', 'file'}
"""Supported authentication methods for ``DBConfiguration.authentication``."""

# Skipped by ``_legacy_connection_parameters`` when it copies legacy keys into the connection arguments.
LEGACY_KEYS: set[str] = {'URL', 'pragmas'}
"""Keys reserved by the legacy DBConfiguration schema."""


def build_connection_parameters(config: Mapping[str, Any]) -> tuple[str, dict[str, Any]]:
    """
    Normalize DBConfiguration mappings into peewee connection arguments.

    The input can either be the full steering configuration dictionary or the nested
    ``DBConfiguration`` table. The returned tuple provides the URL and the keyword
    arguments for ``peewee.connect``.

    A table that contains neither an ``authentication`` nor a ``parameters`` key is handled
    as the legacy schema: a :class:`SteeringFileDeprecation` warning is emitted and the
    arguments are produced by ``_legacy_connection_parameters``.

    :param config: Steering configuration or DBConfiguration table.
    :type config: Mapping[str, Any]
    :return: Tuple with database URL and connection keyword arguments.
    :rtype: tuple[str, dict[str, Any]]
    :raises ValueError: When required fields are missing or invalid.
    """
    conf = dict(config)
    if 'DBConfiguration' in conf:
        # A malformed section (e.g. ``None`` or a scalar) must surface as the documented
        # ValueError rather than as a bare TypeError coming from ``dict()``.
        try:
            conf = dict(conf['DBConfiguration'])
        except (TypeError, ValueError) as err:
            raise ValueError('DBConfiguration must be a table of key/value pairs.') from err

    url = conf.get('URL')
    if not url:
        raise ValueError('DBConfiguration must define a URL field.')
    url = str(url)

    # An unrecognised URL yields an empty protocol, which no backend-specific branch matches.
    protocol = extract_protocol(url) or ''

    is_new_style = 'authentication' in conf or 'parameters' in conf
    if not is_new_style:
        # stacklevel=2 attributes the warning to the caller of this function.
        warnings.warn(
            'Legacy DBConfiguration schema detected. Please update to the new authentication/parameters layout.',
            SteeringFileDeprecation,
            stacklevel=2,
        )
        return url, _legacy_connection_parameters(protocol, conf)

    connection_parameters: dict[str, Any] = {}
    _apply_backend_parameters(protocol, conf, connection_parameters)
    _apply_authentication(protocol, conf.get('authentication'), connection_parameters)
    return url, connection_parameters
def _legacy_connection_parameters(protocol: str, conf: Mapping[str, Any]) -> dict[str, Any]:
    """
    Build the connection arguments from a legacy-schema DBConfiguration table.

    :param protocol: Database protocol extracted from the URL.
    :param conf: The DBConfiguration table.
    :return: Every key of *conf* not listed in ``LEGACY_KEYS``, plus ``pragmas`` for sqlite.
    """
    connection_parameters: dict[str, Any] = {}
    if protocol == 'sqlite':
        # Only sqlite receives pragmas; for any other protocol the key is dropped.
        connection_parameters['pragmas'] = conf.get('pragmas', {})
    connection_parameters.update((key, value) for key, value in conf.items() if key not in LEGACY_KEYS)
    return connection_parameters


def _apply_backend_parameters(protocol: str, conf: Mapping[str, Any], target: dict[str, Any]) -> None:
    """
    Copy the ``parameters.<protocol>`` table of *conf* into *target*.

    Nothing is done when ``parameters`` or its ``<protocol>`` entry is missing or is not a mapping.

    :param protocol: Database protocol extracted from the URL.
    :param conf: The DBConfiguration table.
    :param target: Connection keyword arguments, updated in place.
    """
    parameters = conf.get('parameters')
    if not isinstance(parameters, Mapping):
        return
    backend_params = parameters.get(protocol)
    if not isinstance(backend_params, Mapping):
        return

    backend_dict = dict(backend_params)
    if protocol == 'sqlite':
        # ``pragmas`` is forwarded only when it is a mapping; any other value is discarded.
        pragmas = backend_dict.pop('pragmas', None)
        if isinstance(pragmas, Mapping):
            target['pragmas'] = dict(pragmas)
    target.update(backend_dict)


def _apply_authentication(protocol: str, auth: Any, target: dict[str, Any]) -> None:
    """
    Add the credentials described by the ``authentication`` table to *target*.

    sqlite needs no credentials, so the call is a no-op for that protocol.

    :param protocol: Database protocol extracted from the URL.
    :param auth: The ``DBConfiguration.authentication`` table (or ``None`` when absent).
    :param target: Connection keyword arguments, updated in place.
    :raises ValueError: When the table is missing or its ``method`` is not supported.
    """
    if protocol == 'sqlite':
        return
    if not isinstance(auth, Mapping):
        raise ValueError('DBConfiguration.authentication must be defined for server-based databases.')

    method = auth.get('method')
    # The isinstance guard keeps an unhashable value (e.g. a list) from raising TypeError
    # in the set-membership test instead of the intended ValueError.
    if not isinstance(method, str) or method not in VALID_AUTH_METHODS:
        raise ValueError(f'Unsupported authentication method: {method!r}.')

    if method == 'env':
        _apply_env_authentication(auth, target)
    elif method == 'inline':
        # stacklevel=3: when reached through ``build_connection_parameters`` the warning is
        # attributed to that function's caller, like the legacy-schema deprecation warning.
        warnings.warn(
            'Inline database credentials are insecure. Prefer environment-based authentication.',
            UserWarning,
            stacklevel=3,
        )
        _apply_inline_authentication(auth, target)
    else:  # method == 'file'
        _apply_file_authentication(protocol, auth, target)


def _read_required_env(env_name: str, purpose: str) -> str:
    """
    Return the value of environment variable *env_name*.

    :param env_name: Name of the environment variable to read.
    :param purpose: Credential the variable holds; only used in the error message.
    :raises ValueError: When the variable is not set.
    """
    value = os.getenv(env_name)
    if value is None:
        raise ValueError(f'Environment variable {env_name!r} for DB {purpose} is not set.')
    return value


def _apply_env_authentication(auth: Mapping[str, Any], target: dict[str, Any]) -> None:
    """
    Resolve credentials from the environment variables named in *auth*.

    ``auth['username']`` and ``auth['password']`` hold the *names* of the variables. Both are
    resolved before *target* is touched, so a missing variable leaves *target* unchanged.

    :param auth: The ``DBConfiguration.authentication`` table.
    :param target: Connection keyword arguments, updated in place.
    :raises ValueError: When a referenced environment variable is not set.
    """
    resolved: dict[str, Any] = {}
    for field, keyword in (('username', 'user'), ('password', 'password')):
        env_name = auth.get(field)
        if env_name is not None:
            resolved[keyword] = _read_required_env(str(env_name), field)
    target.update(resolved)


def _apply_inline_authentication(auth: Mapping[str, Any], target: dict[str, Any]) -> None:
    """
    Copy the credentials written directly in *auth* into *target*.

    :param auth: The ``DBConfiguration.authentication`` table.
    :param target: Connection keyword arguments, updated in place.
    """
    username = auth.get('username')
    if username is not None:
        target['user'] = username
    password = auth.get('password')
    if password is not None:
        target['password'] = password


def _apply_file_authentication(protocol: str, auth: Mapping[str, Any], target: dict[str, Any]) -> None:
    """
    Configure password-file based authentication.

    The ``passfile`` entry is forwarded as ``read_default_file`` for mysql and as ``passfile``
    for postgresql. The protocol is validated before *target* is modified, so a rejected
    configuration leaves *target* unchanged.

    :param protocol: Database protocol extracted from the URL.
    :param auth: The ``DBConfiguration.authentication`` table.
    :param target: Connection keyword arguments, updated in place.
    :raises ValueError: When a ``passfile`` is given for a protocol other than mysql or postgresql.
    """
    passfile = auth.get('passfile')
    passfile_keyword = {'mysql': 'read_default_file', 'postgresql': 'passfile'}.get(protocol)
    if passfile is not None and passfile_keyword is None:
        raise ValueError(f'Authentication method "file" is not supported for {protocol!r}.')

    username = auth.get('username')
    if username is not None:
        target['user'] = username
    if passfile is not None:
        target[passfile_keyword] = passfile