Source code for mafw.steering_gui.views.database_editor

#  Copyright 2026 European Union
#  Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
#  SPDX-License-Identifier: EUPL-1.2
"""Expose the database editor UI for steering file configuration.

:Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
:Description: Provide a widget to edit database backend, transport, authentication,
              and backend parameters without touching the builder.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any, Mapping, cast
from urllib.parse import urlparse

from PySide6.QtCore import Signal
from PySide6.QtGui import QStandardItem, QStandardItemModel
from PySide6.QtWidgets import (
    QComboBox,
    QFileDialog,
    QFormLayout,
    QGridLayout,
    QGroupBox,
    QHBoxLayout,
    QHeaderView,
    QLabel,
    QLineEdit,
    QPushButton,
    QSpinBox,
    QStackedWidget,
    QTableView,
    QToolButton,
    QVBoxLayout,
    QWidget,
)

from mafw.db.db_configurations import DEFAULT_SQLITE_PRAGMAS, db_scheme
from mafw.steering_gui.utils import block_signals
from mafw.tools.regexp import extract_protocol

DEFAULT_BACKEND = 'sqlite'
"""Default backend selection for the database editor."""

DEFAULT_URLS: dict[str, str] = {backend: url for backend, url in db_scheme.items()}
"""Mapping of backend identifiers to URL prefixes."""

DEFAULT_PRAGMAS: dict[str, Any] = dict(cast(Mapping[str, Any], DEFAULT_SQLITE_PRAGMAS))
"""Default SQLite pragma values used for parameter seeding."""

DEFAULT_SERVER_PORTS: dict[str, int] = {'mysql': 3306, 'postgresql': 5432}
"""Default service ports used by server backends."""

DEFAULT_URL_PLACEHOLDERS: dict[str, str] = {
    'mysql': 'mysql://localhost:3306/mydb',
    'postgresql': 'postgresql://localhost:5432/mydb',
    'sqlite': 'sqlite:////path/to/db',
}
"""Placeholder URL examples for each backend."""


[docs] class TransportWidget(QWidget): """Widget exposing the database transport layer (URL).""" url_changed = Signal(str) validity_changed = Signal(bool) def __init__(self, parent: QWidget | None = None) -> None: super().__init__(parent) self._backend = DEFAULT_BACKEND self._url_valid = False self._group_box = QGroupBox('Transport') self._stack = QStackedWidget() self._server_widget = QWidget() self._sqlite_widget = QWidget() self._build_server_widget() self._build_sqlite_widget() self._stack.addWidget(self._server_widget) self._stack.addWidget(self._sqlite_widget) layout = QVBoxLayout(self) layout.addWidget(self._group_box) group_layout = QVBoxLayout() group_layout.addWidget(self._stack) self._group_box.setLayout(group_layout) self.set_backend(DEFAULT_BACKEND) @property def url_edit(self) -> QLineEdit: """Expose the active URL edit widget for external tests.""" if self._stack.currentWidget() is self._sqlite_widget: return self._sqlite_url_edit return self._server_url_edit @property def sqlite_open_button(self) -> QToolButton: """Expose the SQLite open button for tests.""" return self._sqlite_open_button @property def sqlite_memory_button(self) -> QToolButton: """Expose the SQLite memory button for tests.""" return self._sqlite_memory_button
[docs] def set_backend(self, backend: str) -> None: """Switch the transport view according to the backend.""" self._backend = backend is_sqlite = backend == 'sqlite' self._stack.setCurrentWidget(self._sqlite_widget if is_sqlite else self._server_widget) self._sqlite_widget.setEnabled(is_sqlite) self._server_widget.setEnabled(not is_sqlite) self._server_url_edit.setPlaceholderText(DEFAULT_URL_PLACEHOLDERS.get(backend, '')) self._sqlite_url_edit.setPlaceholderText(DEFAULT_URL_PLACEHOLDERS.get('sqlite', '')) self._update_server_defaults() self._update_validity()
[docs] def set_url(self, url: str) -> None: """Update the URL field without emitting signals.""" widgets = (self._server_url_edit, self._sqlite_url_edit) with block_signals(*widgets): self._server_url_edit.setText(url) self._sqlite_url_edit.setText(url) self._sync_from_url(url)
[docs] def get_url(self) -> str: """Return the current URL string.""" return self.url_edit.text().strip()
[docs] def is_valid(self) -> bool: """Return whether the URL can be parsed.""" return self._url_valid
def _build_server_widget(self) -> None: self._server_url_edit = QLineEdit() self._server_url_edit.setReadOnly(True) self._server_url_edit.setToolTip('This field is read-only, change the fields below to update it.') self._server_host_edit = QLineEdit() self._server_port_spin = QSpinBox() self._server_port_spin.setRange(1, 65535) self._server_db_edit = QLineEdit() grid = QGridLayout() grid.addWidget(QLabel('URL'), 0, 0) grid.addWidget(self._server_url_edit, 0, 1, 1, 3) grid.addWidget(QLabel('Host'), 1, 0) grid.addWidget(self._server_host_edit, 1, 1) grid.addWidget(QLabel('Port'), 1, 2) grid.addWidget(self._server_port_spin, 1, 3) grid.addWidget(QLabel('Database'), 2, 0) grid.addWidget(self._server_db_edit, 2, 1, 1, 3) self._server_widget.setLayout(grid) self._server_host_edit.textChanged.connect(self._on_host_fields_changed) self._server_port_spin.valueChanged.connect(self._on_host_fields_changed) self._server_db_edit.textChanged.connect(self._on_host_fields_changed) def _build_sqlite_widget(self) -> None: self._sqlite_url_edit = QLineEdit() self._sqlite_url_edit.setReadOnly(True) self._sqlite_url_edit.setToolTip('This field is read-only, change the path below to update it') self._sqlite_path_edit = QLineEdit() self._sqlite_memory_button = QToolButton() self._sqlite_memory_button.setText('Memory') self._sqlite_open_button = QToolButton() self._sqlite_open_button.setText('Open') self._sqlite_create_button = QToolButton() self._sqlite_create_button.setText('Create') grid = QGridLayout() grid.addWidget(QLabel('URL'), 0, 0) grid.addWidget(self._sqlite_url_edit, 0, 1, 1, 3) grid.addWidget(QLabel('Path'), 1, 0) grid.addWidget(self._sqlite_path_edit, 1, 1, 1, 3) grid.addWidget(self._sqlite_memory_button, 2, 0) grid.addWidget(self._sqlite_open_button, 2, 1) grid.addWidget(self._sqlite_create_button, 2, 2) self._sqlite_widget.setLayout(grid) self._sqlite_path_edit.textChanged.connect(self._on_path_changed) self._sqlite_memory_button.clicked.connect(self._on_memory_clicked) self._sqlite_open_button.clicked.connect(self._on_open_clicked) self._sqlite_create_button.clicked.connect(self._on_create_clicked) def _update_server_defaults(self) -> None: default_port = DEFAULT_SERVER_PORTS.get(self._backend) if default_port is not None: self._server_port_spin.setValue(default_port) def _on_host_fields_changed(self) -> None: host = self._server_host_edit.text().strip() port = self._server_port_spin.value() database = self._server_db_edit.text().strip() if host and database and port > 0: url = f'{self._backend}://{host}:{port}/{database}' else: url = '' with block_signals(self._server_url_edit, self._sqlite_url_edit): self._server_url_edit.setText(url) self._sqlite_url_edit.setText(url) self._update_validity() self.url_changed.emit(url) def _on_path_changed(self) -> None: path = self._sqlite_path_edit.text().strip() if path: url = f'sqlite:///{path}' else: url = '' with block_signals(self._sqlite_url_edit, self._server_url_edit): self._sqlite_url_edit.setText(url) self._server_url_edit.setText(url) self._update_validity() self.url_changed.emit(url) def _on_memory_clicked(self) -> None: url = 'sqlite:///:memory:' with block_signals(self._sqlite_url_edit, self._server_url_edit, self._sqlite_path_edit): self._sqlite_url_edit.setText(url) self._server_url_edit.setText(url) self._sqlite_path_edit.clear() self._sqlite_path_edit.setPlaceholderText('in memory DB') self._update_validity() self.url_changed.emit(url) def _on_open_clicked(self) -> None: path, _ = QFileDialog.getOpenFileName( self, 'Select SQLite database file', filter='SQLite Database (*.db);;All Files (*.*)', ) if not path: return sqlite_path = Path(path).resolve() with block_signals(self._sqlite_path_edit): self._sqlite_path_edit.setText(sqlite_path.as_posix()) self._on_path_changed() def _on_create_clicked(self) -> None: path, _ = QFileDialog.getSaveFileName( self, 'Create SQLite database file', filter='SQLite Database (*.db);;All Files (*.*)', ) if not path: return sqlite_path = Path(path).resolve() with block_signals(self._sqlite_path_edit): self._sqlite_path_edit.setText(sqlite_path.as_posix()) self._on_path_changed() def _sync_from_url(self, url: str) -> None: parsed = self._parse_url(url) if parsed is None: with block_signals( self._server_host_edit, self._server_port_spin, self._server_db_edit, self._sqlite_path_edit ): self._server_host_edit.clear() self._server_port_spin.setValue(DEFAULT_SERVER_PORTS.get(self._backend, 0)) self._server_db_edit.clear() self._sqlite_path_edit.clear() self._update_validity() return protocol = parsed.get('protocol') or '' if protocol == 'sqlite': database = parsed.get('database') or '' with block_signals(self._sqlite_path_edit): if url == 'sqlite:///:memory:': self._sqlite_path_edit.clear() self._sqlite_path_edit.setPlaceholderText('in memory DB') else: self._sqlite_path_edit.setPlaceholderText('') self._sqlite_path_edit.setText(str(database)) else: host = parsed.get('host') or parsed.get('hostname') or '' port = parsed.get('port') or 0 database = parsed.get('database') or '' with block_signals(self._server_host_edit, self._server_port_spin, self._server_db_edit): if host: self._server_host_edit.setText(str(host)) if port: self._server_port_spin.setValue(int(port)) if database: self._server_db_edit.setText(str(database)) self._update_validity() def _parse_url(self, url: str) -> dict[str, Any] | None: if not url: return None try: from playhouse.db_url import parse except ImportError: parsed = urlparse(url) if not parsed.scheme: return None payload: dict[str, Any] = {'protocol': parsed.scheme} if parsed.scheme == 'sqlite': if url == 'sqlite:///:memory:': payload['database'] = ':memory:' else: payload['database'] = parsed.path.lstrip('/') if parsed.path else '' else: payload['host'] = parsed.hostname or '' payload['port'] = parsed.port or 0 payload['database'] = parsed.path.lstrip('/') if parsed.path else '' return payload try: return dict(parse(url)) # type: ignore[no-untyped-call] except Exception: return None def _update_validity(self) -> None: url = self.get_url() if self._stack.currentWidget() is self._server_widget: host = bool(self._server_host_edit.text().strip()) database = bool(self._server_db_edit.text().strip()) valid = host and database and self._server_port_spin.value() > 0 else: path = bool(self._sqlite_path_edit.text().strip()) valid = path or url == 'sqlite:///:memory:' if self._url_valid == valid: return self._url_valid = valid self.validity_changed.emit(valid)
[docs] class AuthenticationWidget(QWidget): """Widget exposing authentication configuration for server databases.""" authentication_changed = Signal(dict) def __init__(self, parent: QWidget | None = None) -> None: super().__init__(parent) self._backend = DEFAULT_BACKEND self._group_box = QGroupBox('Authentication') self._method_combo = QComboBox() self._method_combo.addItem('Inline', 'inline') self._method_combo.addItem('Environment variables', 'env') self._method_combo.addItem('Password file', 'file') self._username_edit = QLineEdit() self._password_edit = QLineEdit() self._password_edit.setEchoMode(QLineEdit.EchoMode.Password) self._file_edit = QLineEdit() self._file_browse = QToolButton() self._file_browse.setText('Browse') layout = QVBoxLayout(self) layout.addWidget(self._group_box) group_layout = QFormLayout() group_layout.addRow('Method', self._method_combo) group_layout.addRow('Username', self._username_edit) group_layout.addRow('Password', self._password_edit) file_row = QHBoxLayout() file_row.addWidget(self._file_edit) file_row.addWidget(self._file_browse) group_layout.addRow('Password file', file_row) self._group_box.setLayout(group_layout) self._method_combo.currentIndexChanged.connect(self._update_method_state) self._username_edit.textChanged.connect(self._emit_changed) self._password_edit.textChanged.connect(self._emit_changed) self._file_edit.textChanged.connect(self._emit_changed) self._file_browse.clicked.connect(self._on_browse_clicked) self._update_method_state() @property def method_combo(self) -> QComboBox: """Expose method combo for tests.""" return self._method_combo @property def username_edit(self) -> QLineEdit: """Expose username edit for tests.""" return self._username_edit @property def password_edit(self) -> QLineEdit: """Expose password edit for tests.""" return self._password_edit @property def file_browse_button(self) -> QToolButton: """Expose browse button for tests.""" return self._file_browse @property def file_edit(self) -> QLineEdit: """Expose password file edit for tests.""" return self._file_edit
[docs] def set_backend(self, backend: str) -> None: """Enable or disable auth inputs based on backend.""" self._backend = backend enabled = backend != 'sqlite' self._group_box.setEnabled(enabled) self._update_method_state()
[docs] def set_data(self, auth: Mapping[str, Any] | None) -> None: """Populate widget values from the auth mapping.""" method = 'inline' username = '' password = '' passfile = '' if auth: method = str(auth.get('method') or method) username = str(auth.get('username') or '') password = str(auth.get('password') or '') passfile = str(auth.get('passfile') or '') widgets = (self._method_combo, self._username_edit, self._password_edit, self._file_edit) with block_signals(*widgets): index = self._method_combo.findData(method) if index >= 0: self._method_combo.setCurrentIndex(index) self._username_edit.setText(username) self._password_edit.setText(password) self._file_edit.setText(passfile) self._update_method_state()
[docs] def get_data(self) -> dict[str, Any]: """Return the authentication mapping for the current method.""" if self._backend == 'sqlite': return {} method = self._method_combo.currentData() username = _normalize_text(self._username_edit.text()) password = _normalize_text(self._password_edit.text()) passfile = _normalize_text(self._file_edit.text()) payload: dict[str, Any] = {'method': method} if method in {'inline', 'env'}: if username is not None: payload['username'] = username if password is not None: payload['password'] = password elif method == 'file': if username is not None: payload['username'] = username if passfile is not None: payload['passfile'] = passfile return payload
[docs] def has_required_fields(self) -> bool: """Return whether required fields for the selected method are populated.""" if self._backend == 'sqlite': return True method = self._method_combo.currentData() if method in {'inline', 'env'}: return bool(_normalize_text(self._username_edit.text())) and bool( _normalize_text(self._password_edit.text()) ) if method == 'file': return bool(_normalize_text(self._username_edit.text())) and bool(_normalize_text(self._file_edit.text())) return False
def _update_method_state(self) -> None: method = self._method_combo.currentData() is_inline = method == 'inline' is_env = method == 'env' is_file = method == 'file' self._username_edit.setEnabled(self._backend != 'sqlite') self._password_edit.setEnabled(self._backend != 'sqlite' and (is_inline or is_env)) self._file_edit.setEnabled(self._backend != 'sqlite' and is_file) self._file_browse.setEnabled(self._backend != 'sqlite' and is_file) if is_inline: self._username_edit.setPlaceholderText('insert username') self._password_edit.setPlaceholderText('insert password') elif is_env: self._username_edit.setPlaceholderText('variable pointing to username') self._password_edit.setPlaceholderText('variable pointing to password') elif is_file: self._username_edit.setPlaceholderText('insert username') self._password_edit.setPlaceholderText('') self._file_edit.setPlaceholderText('') self._emit_changed() def _on_browse_clicked(self) -> None: path, _ = QFileDialog.getOpenFileName(self, 'Select password file') if not path: return self._file_edit.setText(str(Path(path))) def _emit_changed(self) -> None: self.authentication_changed.emit(self.get_data())
[docs] class ParametersWidget(QWidget): """Widget that manages backend-specific parameters.""" parameters_changed = Signal(dict) def __init__(self, parent: QWidget | None = None) -> None: super().__init__(parent) self._backend = DEFAULT_BACKEND self._backend_tables: dict[str, dict[str, Any]] = {'sqlite': dict(DEFAULT_PRAGMAS)} self._group_box = QGroupBox('Parameters') self._model = QStandardItemModel(0, 2, self) self._model.setHorizontalHeaderLabels(['Name', 'Value']) self._table = QTableView() self._table.setModel(self._model) self._table.horizontalHeader().setStretchLastSection(True) self._table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Stretch) self._table.setSelectionBehavior(QTableView.SelectionBehavior.SelectRows) self._table.setSelectionMode(QTableView.SelectionMode.ExtendedSelection) self._table.verticalHeader().setVisible(False) self._add_button = QToolButton() self._add_button.setText('Add') self._remove_button = QToolButton() self._remove_button.setText('Remove') layout = QVBoxLayout(self) layout.addWidget(self._group_box) group_layout = QVBoxLayout() group_layout.addWidget(self._table) button_row = QHBoxLayout() button_row.addWidget(self._add_button) button_row.addWidget(self._remove_button) button_row.addStretch() group_layout.addLayout(button_row) self._group_box.setLayout(group_layout) self._model.itemChanged.connect(self._emit_changed) self._model.rowsInserted.connect(self._emit_changed) self._model.rowsRemoved.connect(self._emit_changed) self._add_button.clicked.connect(self._on_add) self._remove_button.clicked.connect(self._on_remove) self.set_backend(DEFAULT_BACKEND) @property def table(self) -> QTableView: """Expose the parameter table for tests.""" return self._table
[docs] def set_backend(self, backend: str) -> None: """Switch the visible parameter table to the given backend.""" if self._model.rowCount(): self._store_current_table() self._backend = backend if backend not in self._backend_tables: if backend == 'sqlite': self._backend_tables[backend] = dict(DEFAULT_PRAGMAS) else: self._backend_tables[backend] = {} self._load_table(self._backend_tables[backend])
[docs] def set_data(self, parameters: Mapping[str, Mapping[str, Any]] | None, backend: str) -> None: """Populate the widget with the provided backend parameter mapping.""" self._backend_tables.clear() if parameters: for key, value in parameters.items(): if not isinstance(value, Mapping): self._backend_tables[key] = {} continue if key == 'sqlite': pragmas = value.get('pragmas') if isinstance(pragmas, Mapping): self._backend_tables[key] = dict(pragmas) else: self._backend_tables[key] = dict(DEFAULT_PRAGMAS) else: self._backend_tables[key] = dict(value) self.set_backend(backend)
[docs] def get_data(self) -> dict[str, dict[str, Any]]: """Return the full backend parameter mapping.""" self._store_current_table() result: dict[str, dict[str, Any]] = {} for backend, table_data in self._backend_tables.items(): if backend == 'sqlite': result[backend] = {'pragmas': dict(table_data)} elif table_data: result[backend] = dict(table_data) return result
[docs] def current_table_values(self) -> dict[str, Any]: """Return the current backend table mapping.""" self._store_current_table() return dict(self._backend_tables.get(self._backend, {}))
def _store_current_table(self) -> None: self._backend_tables[self._backend] = self._table_to_dict() def _load_table(self, values: Mapping[str, Any]) -> None: # with block_signals(self._model): self._model.removeRows(0, self._model.rowCount()) for key, value in values.items(): key_item = QStandardItem(str(key)) value_item = QStandardItem(str(value)) key_item.setEditable(True) value_item.setEditable(True) self._model.appendRow([key_item, value_item]) def _table_to_dict(self) -> dict[str, Any]: result: dict[str, Any] = {} for row in range(self._model.rowCount()): key_item = self._model.item(row, 0) value_item = self._model.item(row, 1) if key_item is None: continue key_text = key_item.text().strip() if not key_text: continue value_text = value_item.text().strip() if value_item else '' result[key_text] = _normalize_table_value(value_text) return result def _on_add(self) -> None: row = self._model.rowCount() self._model.insertRow(row, [QStandardItem(''), QStandardItem('')]) self._emit_changed() def _on_remove(self) -> None: selection = self._table.selectionModel().selectedRows() rows = sorted({index.row() for index in selection}, reverse=True) if not rows and self._model.rowCount(): rows = [self._model.rowCount() - 1] for row in rows: self._model.removeRow(row) if rows: self._emit_changed() def _emit_changed(self, *args: Any) -> None: self.parameters_changed.emit(self.get_data())
[docs] class DatabaseEditor(QWidget): """Widget that exposes database backend selection and layered configuration.""" configuration_changed = Signal(dict) test_connection_requested = Signal(dict) def __init__(self, parent: QWidget | None = None) -> None: super().__init__(parent) self._current_backend = DEFAULT_BACKEND self.backend_combo = QComboBox() self.backend_combo.addItems(list(DEFAULT_URLS.keys())) self.backend_combo.setCurrentText(DEFAULT_BACKEND) self.transport_widget = TransportWidget() self.authentication_widget = AuthenticationWidget() self.parameters_widget = ParametersWidget() self.test_connection_button = QPushButton('Test connection') self._setup_layout() self._wire_signals() self._apply_backend(DEFAULT_BACKEND)
[docs] def set_data(self, config: dict[str, Any] | None) -> None: """Populate the UI from a configuration dictionary without emitting signals.""" normalized = self._normalize_config(config) backend = normalized['backend'] widgets = ( self.backend_combo, self._group_box, self.transport_widget, self.authentication_widget, self.parameters_widget, self.test_connection_button, ) with block_signals(*widgets): self.backend_combo.setCurrentText(backend) self._group_box.setChecked(normalized['enabled']) self.transport_widget.set_backend(backend) self.transport_widget.set_url(normalized['url']) self.authentication_widget.set_backend(backend) self.authentication_widget.set_data(normalized['authentication']) self.parameters_widget.set_data(normalized['parameters'], backend) self._apply_backend(backend) self._update_test_connection_state()
[docs] def get_data(self) -> dict[str, Any]: """Return a normalized view of the current database configuration.""" parameters = self.parameters_widget.get_data() pragmas = parameters.get('sqlite', {}).get('pragmas') return { 'backend': self._current_backend, 'url': _normalize_text(self.transport_widget.get_url()), 'authentication': self.authentication_widget.get_data(), 'parameters': parameters, 'pragmas': pragmas, 'enabled': self._group_box.isChecked(), }
[docs] def is_enabled(self) -> bool: """Return whether the database configuration group is active.""" return self._group_box.isChecked()
def _setup_layout(self) -> None: main_layout = QVBoxLayout(self) # the group box is the enable / disable check box for the whole # database editor. self._group_box = QGroupBox('Database configuration') self._group_box.setCheckable(True) self._group_box.setChecked(True) group_layout = QVBoxLayout() self._group_box.setLayout(group_layout) main_layout.addWidget(self._group_box) backend_layout = QFormLayout() backend_layout.addRow('Backend:', self.backend_combo) group_layout.addLayout(backend_layout) group_layout.addWidget(self.transport_widget) # Authentication and Parameters side-by-side h_layout = QHBoxLayout() h_layout.addWidget(self.authentication_widget) h_layout.addWidget(self.parameters_widget) group_layout.addLayout(h_layout) group_layout.addWidget(self.test_connection_button) def _wire_signals(self) -> None: self.backend_combo.currentTextChanged.connect(self._on_backend_changed) self.transport_widget.url_changed.connect(self._emit_configuration_changed) self.transport_widget.validity_changed.connect(self._update_test_connection_state) self.authentication_widget.authentication_changed.connect(self._emit_configuration_changed) self.parameters_widget.parameters_changed.connect(self._emit_configuration_changed) self.test_connection_button.clicked.connect(self._on_test_connection_clicked) self._group_box.toggled.connect(self._on_groupbox_toggled)
[docs] def _normalize_config(self, config: dict[str, Any] | None) -> dict[str, Any]: """ Normalize the configuration dictionary to ensure consistent backend, URL, authentication, and parameters. This method processes the input configuration dictionary to extract and normalize values for backend type, URL, authentication details, and backend-specific parameters, ensuring defaults are applied where necessary. :param config: A dictionary containing configuration details such as backend, URL, authentication, and parameters. If None, defaults are applied. :return: A dictionary with normalized configuration values including backend, URL, enabled status, authentication, parameters, and pragmas. """ backend = DEFAULT_BACKEND url_value = '' enabled = True authentication: dict[str, Any] = {} parameters: dict[str, dict[str, Any]] = {} pragmas = dict(DEFAULT_PRAGMAS) user = None read_default_file = None if config is not None: url_value = config.get('url') or config.get('URL') or '' backend_value = config.get('backend') backend = self._normalize_backend(backend_value) or self._detect_backend_from_url(url_value) enabled = bool(config.get('enabled', True)) auth_value = config.get('authentication') if isinstance(auth_value, Mapping): authentication = dict(auth_value) parameters_value = config.get('parameters') if isinstance(parameters_value, Mapping): parameters = {key: dict(value) for key, value in parameters_value.items() if isinstance(value, Mapping)} pragma_value = config.get('pragmas') if isinstance(pragma_value, Mapping): pragmas = dict(pragma_value) if parameters.get('sqlite', {}).get('pragmas'): pragmas = dict(parameters['sqlite']['pragmas']) user = config.get('user') read_default_file = config.get('read_default_file') if not authentication: if read_default_file: authentication = {'method': 'file', 'username': user, 'passfile': read_default_file} elif user: authentication = {'method': 'inline', 'username': user} if 'sqlite' not in parameters and pragmas: parameters['sqlite'] = {'pragmas': dict(pragmas)} elif 'sqlite' in parameters and 'pragmas' not in parameters['sqlite'] and pragmas: sqlite_params = dict(parameters['sqlite']) sqlite_params['pragmas'] = dict(pragmas) parameters['sqlite'] = sqlite_params return { 'backend': backend, 'url': url_value, 'enabled': enabled, 'authentication': authentication, 'parameters': parameters, 'pragmas': pragmas, }
def _normalize_backend(self, value: str | None) -> str | None: if not value: return None value = value.lower().strip() return value if value in DEFAULT_URLS else None def _on_backend_changed(self, backend: str) -> None: normalized = self._normalize_backend(backend) or DEFAULT_BACKEND self._apply_backend(normalized) self._emit_configuration_changed() def _on_test_connection_clicked(self) -> None: self.test_connection_requested.emit(self.get_data()) def _apply_backend(self, backend: str) -> None: backend = self._normalize_backend(backend) or DEFAULT_BACKEND self._current_backend = backend active = self._group_box.isChecked() self.backend_combo.setEnabled(active) current_url = self.transport_widget.get_url() detected = self._detect_backend_from_url(current_url) self.transport_widget.set_backend(backend) if current_url and detected != backend: with block_signals(self.transport_widget): self.transport_widget.set_url('') self.authentication_widget.set_backend(backend) self.parameters_widget.set_backend(backend) self.transport_widget.setEnabled(active) self.authentication_widget.setEnabled(active) self.parameters_widget.setEnabled(active) self._update_test_connection_state() def _on_groupbox_toggled(self, enabled: bool) -> None: self._apply_backend(self._current_backend) self._emit_configuration_changed()
[docs] def _detect_backend_from_url(self, url: str | None) -> str: """ Determine the backend type from the given URL. This method analyses the provided URL to identify the corresponding backend type by matching URL prefixes or extracting the protocol. If no match is found, it defaults to the current backend or the default backend. :param url: The URL string to be analysed for backend detection. :return: A string representing the detected backend type. """ if not url: return self._current_backend or DEFAULT_BACKEND candidate = url.strip().lower() for backend, prefix in DEFAULT_URLS.items(): if candidate.startswith(prefix): return backend protocol = extract_protocol(candidate) if protocol in DEFAULT_URLS: return protocol return self._current_backend or DEFAULT_BACKEND
def _update_test_connection_state(self) -> None: if not self._group_box.isChecked(): self.test_connection_button.setEnabled(False) return url_valid = self.transport_widget.is_valid() auth_valid = self.authentication_widget.has_required_fields() self.test_connection_button.setEnabled(url_valid and auth_valid) def _emit_configuration_changed(self, *args: Any) -> None: self._update_test_connection_state() self.configuration_changed.emit(self.get_data())
def _normalize_text(value: str) -> str | None: text = value.strip() return text or None def _normalize_table_value(raw_value: str) -> Any: if not raw_value: return raw_value normalized = raw_value.strip() if not normalized: return '' if normalized.lower() in {'true', 'false'}: return normalized.lower() == 'true' for converter in (int, float): try: return converter(normalized) except ValueError: continue return normalized