# Source code for mafw.steering_gui.views.database_editor
# Copyright 2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""Expose the database editor UI for steering file configuration.
:Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
:Description: Provide a widget to edit database backend, transport, authentication,
and backend parameters without touching the builder.
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Mapping, cast
from urllib.parse import urlparse
from PySide6.QtCore import Signal
from PySide6.QtGui import QStandardItem, QStandardItemModel
from PySide6.QtWidgets import (
QComboBox,
QFileDialog,
QFormLayout,
QGridLayout,
QGroupBox,
QHBoxLayout,
QHeaderView,
QLabel,
QLineEdit,
QPushButton,
QSpinBox,
QStackedWidget,
QTableView,
QToolButton,
QVBoxLayout,
QWidget,
)
from mafw.db.db_configurations import DEFAULT_SQLITE_PRAGMAS, db_scheme
from mafw.steering_gui.utils import block_signals
from mafw.tools.regexp import extract_protocol
DEFAULT_BACKEND = 'sqlite'
"""Backend preselected by the database editor when nothing else is configured."""

DEFAULT_URLS: dict[str, str] = dict(db_scheme.items())
"""URL prefix of every known backend, keyed by backend identifier."""

DEFAULT_PRAGMAS: dict[str, Any] = dict(cast(Mapping[str, Any], DEFAULT_SQLITE_PRAGMAS))
"""Private copy of the default SQLite pragmas, used to seed the parameter table."""

DEFAULT_SERVER_PORTS: dict[str, int] = dict(mysql=3306, postgresql=5432)
"""Service port proposed for each server backend."""

DEFAULT_URL_PLACEHOLDERS: dict[str, str] = dict(
    mysql='mysql://localhost:3306/mydb',
    postgresql='postgresql://localhost:5432/mydb',
    sqlite='sqlite:////path/to/db',
)
"""Example URL shown as placeholder text for each backend."""
[docs]
class TransportWidget(QWidget):
    """Widget exposing the database transport layer (URL).

    Two pages share a stacked layout inside the ``Transport`` group box: a server
    page (host, port, database name) and a SQLite page (file path plus the
    *Memory*, *Open* and *Create* shortcuts). On both pages the URL line edit is
    read-only and is rebuilt from the other fields.

    ``url_changed`` is emitted with the rebuilt URL after a user edit;
    ``validity_changed`` is emitted whenever the validity of the visible page flips.
    """

    url_changed = Signal(str)
    validity_changed = Signal(bool)

    def __init__(self, parent: QWidget | None = None) -> None:
        """Build both pages and display the one matching the default backend.

        :param parent: Optional parent widget.
        """
        super().__init__(parent)
        self._backend = DEFAULT_BACKEND
        self._url_valid = False
        self._group_box = QGroupBox('Transport')
        self._stack = QStackedWidget()
        self._server_widget = QWidget()
        self._sqlite_widget = QWidget()
        self._build_server_widget()
        self._build_sqlite_widget()
        self._stack.addWidget(self._server_widget)
        self._stack.addWidget(self._sqlite_widget)
        layout = QVBoxLayout(self)
        layout.addWidget(self._group_box)
        group_layout = QVBoxLayout()
        group_layout.addWidget(self._stack)
        self._group_box.setLayout(group_layout)
        self.set_backend(DEFAULT_BACKEND)

    @property
    def url_edit(self) -> QLineEdit:
        """Expose the active URL edit widget for external tests."""
        if self._stack.currentWidget() is self._sqlite_widget:
            return self._sqlite_url_edit
        return self._server_url_edit

    @property
    def sqlite_open_button(self) -> QToolButton:
        """Expose the SQLite open button for tests."""
        return self._sqlite_open_button

    @property
    def sqlite_memory_button(self) -> QToolButton:
        """Expose the SQLite memory button for tests."""
        return self._sqlite_memory_button

    def set_backend(self, backend: str) -> None:
        """Switch the transport view according to the backend.

        The default service port is applied only when the backend actually
        changes. Callers re-apply the same backend after loading a URL or when
        toggling the editor; resetting the port on each of those calls would
        overwrite a custom port and rewrite the URL.

        :param backend: Backend identifier; ``'sqlite'`` selects the file page,
            anything else the server page.
        """
        backend_changed = backend != self._backend
        self._backend = backend
        is_sqlite = backend == 'sqlite'
        self._stack.setCurrentWidget(self._sqlite_widget if is_sqlite else self._server_widget)
        self._sqlite_widget.setEnabled(is_sqlite)
        self._server_widget.setEnabled(not is_sqlite)
        self._server_url_edit.setPlaceholderText(DEFAULT_URL_PLACEHOLDERS.get(backend, ''))
        self._sqlite_url_edit.setPlaceholderText(DEFAULT_URL_PLACEHOLDERS.get('sqlite', ''))
        if backend_changed:
            self._update_server_defaults()
        self._update_validity()

    def set_url(self, url: str) -> None:
        """Update the URL field without emitting signals.

        Both URL edits receive the text and the detail fields (host, port,
        database or SQLite path) are synchronised from it.

        :param url: URL to display; an empty string clears the detail fields.
        """
        self._show_url(url)
        self._sync_from_url(url)

    def get_url(self) -> str:
        """Return the current URL string, stripped of surrounding whitespace."""
        return self.url_edit.text().strip()

    def is_valid(self) -> bool:
        """Return whether the visible page holds enough data to form a URL."""
        return self._url_valid

    def _build_server_widget(self) -> None:
        """Create the server page: read-only URL plus host, port and database inputs."""
        self._server_url_edit = QLineEdit()
        self._server_url_edit.setReadOnly(True)
        self._server_url_edit.setToolTip('This field is read-only, change the fields below to update it.')
        self._server_host_edit = QLineEdit()
        self._server_port_spin = QSpinBox()
        self._server_port_spin.setRange(1, 65535)
        self._server_db_edit = QLineEdit()
        grid = QGridLayout()
        grid.addWidget(QLabel('URL'), 0, 0)
        grid.addWidget(self._server_url_edit, 0, 1, 1, 3)
        grid.addWidget(QLabel('Host'), 1, 0)
        grid.addWidget(self._server_host_edit, 1, 1)
        grid.addWidget(QLabel('Port'), 1, 2)
        grid.addWidget(self._server_port_spin, 1, 3)
        grid.addWidget(QLabel('Database'), 2, 0)
        grid.addWidget(self._server_db_edit, 2, 1, 1, 3)
        self._server_widget.setLayout(grid)
        self._server_host_edit.textChanged.connect(self._on_host_fields_changed)
        self._server_port_spin.valueChanged.connect(self._on_host_fields_changed)
        self._server_db_edit.textChanged.connect(self._on_host_fields_changed)

    def _build_sqlite_widget(self) -> None:
        """Create the SQLite page: read-only URL, path input and shortcut buttons."""
        self._sqlite_url_edit = QLineEdit()
        self._sqlite_url_edit.setReadOnly(True)
        self._sqlite_url_edit.setToolTip('This field is read-only, change the path below to update it')
        self._sqlite_path_edit = QLineEdit()
        self._sqlite_memory_button = QToolButton()
        self._sqlite_memory_button.setText('Memory')
        self._sqlite_open_button = QToolButton()
        self._sqlite_open_button.setText('Open')
        self._sqlite_create_button = QToolButton()
        self._sqlite_create_button.setText('Create')
        grid = QGridLayout()
        grid.addWidget(QLabel('URL'), 0, 0)
        grid.addWidget(self._sqlite_url_edit, 0, 1, 1, 3)
        grid.addWidget(QLabel('Path'), 1, 0)
        grid.addWidget(self._sqlite_path_edit, 1, 1, 1, 3)
        grid.addWidget(self._sqlite_memory_button, 2, 0)
        grid.addWidget(self._sqlite_open_button, 2, 1)
        grid.addWidget(self._sqlite_create_button, 2, 2)
        self._sqlite_widget.setLayout(grid)
        self._sqlite_path_edit.textChanged.connect(self._on_path_changed)
        self._sqlite_memory_button.clicked.connect(self._on_memory_clicked)
        self._sqlite_open_button.clicked.connect(self._on_open_clicked)
        self._sqlite_create_button.clicked.connect(self._on_create_clicked)

    def _show_url(self, url: str) -> None:
        """Write ``url`` into both read-only URL edits with their signals blocked."""
        with block_signals(self._server_url_edit, self._sqlite_url_edit):
            self._server_url_edit.setText(url)
            self._sqlite_url_edit.setText(url)

    def _update_server_defaults(self) -> None:
        """Set the port spin box to the current backend's default port, if it has one."""
        default_port = DEFAULT_SERVER_PORTS.get(self._backend)
        if default_port is not None:
            self._server_port_spin.setValue(default_port)

    def _on_host_fields_changed(self) -> None:
        """Rebuild the server URL after host, port or database was edited."""
        host = self._server_host_edit.text().strip()
        port = self._server_port_spin.value()
        database = self._server_db_edit.text().strip()
        # An incomplete triple yields an empty URL rather than a partial one.
        url = f'{self._backend}://{host}:{port}/{database}' if host and database and port > 0 else ''
        self._show_url(url)
        self._update_validity()
        self.url_changed.emit(url)

    def _on_path_changed(self) -> None:
        """Rebuild the SQLite URL after the path was edited."""
        path = self._sqlite_path_edit.text().strip()
        url = f'sqlite:///{path}' if path else ''
        self._show_url(url)
        self._update_validity()
        self.url_changed.emit(url)

    def _on_memory_clicked(self) -> None:
        """Select the in-memory SQLite database."""
        url = 'sqlite:///:memory:'
        self._show_url(url)
        # The path edit must stay silent, otherwise clearing it would trigger
        # _on_path_changed and replace the in-memory URL with an empty one.
        with block_signals(self._sqlite_path_edit):
            self._sqlite_path_edit.clear()
            self._sqlite_path_edit.setPlaceholderText('in memory DB')
        self._update_validity()
        self.url_changed.emit(url)

    def _on_open_clicked(self) -> None:
        """Let the user pick an existing SQLite file."""
        path, _ = QFileDialog.getOpenFileName(
            self,
            'Select SQLite database file',
            filter='SQLite Database (*.db);;All Files (*.*)',
        )
        self._apply_selected_path(path)

    def _on_create_clicked(self) -> None:
        """Let the user choose the location of a new SQLite file."""
        path, _ = QFileDialog.getSaveFileName(
            self,
            'Create SQLite database file',
            filter='SQLite Database (*.db);;All Files (*.*)',
        )
        self._apply_selected_path(path)

    def _apply_selected_path(self, path: str) -> None:
        """Store a path returned by a file dialog and refresh the URL.

        :param path: Path returned by the dialog; empty when the dialog was cancelled.
        """
        if not path:
            return
        sqlite_path = Path(path).resolve()
        with block_signals(self._sqlite_path_edit):
            self._sqlite_path_edit.setText(sqlite_path.as_posix())
        # Signals were blocked above, so refresh the URL exactly once here.
        self._on_path_changed()

    def _sync_from_url(self, url: str) -> None:
        """Fill the detail fields from ``url`` without triggering their slots.

        :param url: URL to decompose; if it cannot be parsed all detail fields are reset.
        """
        parsed = self._parse_url(url)
        if parsed is None:
            with block_signals(
                self._server_host_edit, self._server_port_spin, self._server_db_edit, self._sqlite_path_edit
            ):
                self._server_host_edit.clear()
                self._server_port_spin.setValue(DEFAULT_SERVER_PORTS.get(self._backend, 0))
                self._server_db_edit.clear()
                self._sqlite_path_edit.clear()
            self._update_validity()
            return
        protocol = parsed.get('protocol') or ''
        if protocol == 'sqlite':
            database = parsed.get('database') or ''
            with block_signals(self._sqlite_path_edit):
                if url == 'sqlite:///:memory:':
                    self._sqlite_path_edit.clear()
                    self._sqlite_path_edit.setPlaceholderText('in memory DB')
                else:
                    self._sqlite_path_edit.setPlaceholderText('')
                    self._sqlite_path_edit.setText(str(database))
        else:
            host = parsed.get('host') or parsed.get('hostname') or ''
            port = parsed.get('port') or 0
            database = parsed.get('database') or ''
            with block_signals(self._server_host_edit, self._server_port_spin, self._server_db_edit):
                if host:
                    self._server_host_edit.setText(str(host))
                if port:
                    self._server_port_spin.setValue(int(port))
                else:
                    # No explicit port in the URL: show the backend default
                    # instead of a stale value from a previous URL.
                    self._update_server_defaults()
                if database:
                    self._server_db_edit.setText(str(database))
        self._update_validity()

    def _parse_url(self, url: str) -> dict[str, Any] | None:
        """Split ``url`` into its components.

        ``playhouse.db_url.parse`` is used when importable, otherwise a
        ``urllib``-based fallback. In both cases the result carries a
        ``'protocol'`` entry holding the URL scheme, which
        :meth:`_sync_from_url` needs to choose between the SQLite and the
        server page.

        :param url: URL to parse.
        :return: Mapping with at least ``'protocol'`` and ``'database'``, or
            ``None`` when ``url`` is empty or cannot be parsed.
        """
        if not url:
            return None
        try:
            from playhouse.db_url import parse
        except ImportError:
            return self._parse_url_fallback(url)
        try:
            payload = dict(parse(url))  # type: ignore[no-untyped-call]
            # The parser result is made of connection keyword arguments; make
            # sure the scheme is available to the caller as well.
            payload.setdefault('protocol', urlparse(url).scheme)
        except Exception:
            # Deliberate best effort: any malformed URL is reported as unparsable.
            return None
        return payload

    def _parse_url_fallback(self, url: str) -> dict[str, Any] | None:
        """Parse ``url`` with :func:`urllib.parse.urlparse` only.

        :param url: Non-empty URL to parse.
        :return: Mapping with ``'protocol'`` and ``'database'`` (plus ``'host'``
            and ``'port'`` for server URLs), or ``None`` if the URL has no
            scheme or an invalid port.
        """
        try:
            parsed = urlparse(url)
            if not parsed.scheme:
                return None
            # Drop only the single separator slash so that absolute paths
            # (``sqlite:////abs/db`` -> ``/abs/db``) stay absolute.
            payload: dict[str, Any] = {'protocol': parsed.scheme, 'database': parsed.path[1:]}
            if parsed.scheme != 'sqlite':
                payload['host'] = parsed.hostname or ''
                # Accessing ``port`` raises ValueError for non-numeric ports.
                payload['port'] = parsed.port or 0
        except ValueError:
            return None
        return payload

    def _update_validity(self) -> None:
        """Recompute the validity of the visible page and emit on change."""
        url = self.get_url()
        if self._stack.currentWidget() is self._server_widget:
            host = bool(self._server_host_edit.text().strip())
            database = bool(self._server_db_edit.text().strip())
            valid = host and database and self._server_port_spin.value() > 0
        else:
            path = bool(self._sqlite_path_edit.text().strip())
            valid = path or url == 'sqlite:///:memory:'
        if self._url_valid == valid:
            return
        self._url_valid = valid
        self.validity_changed.emit(valid)
[docs]
class AuthenticationWidget(QWidget):
    """Form collecting the credentials used to reach a server database.

    Three methods are offered: inline credentials, environment variable names
    and a password file. The whole group is disabled for the ``sqlite``
    backend. ``authentication_changed`` carries the mapping returned by
    :meth:`get_data`.
    """

    authentication_changed = Signal(dict)

    def __init__(self, parent: QWidget | None = None) -> None:
        """Create the inputs, lay them out in a form and connect their signals.

        :param parent: Optional parent widget.
        """
        super().__init__(parent)
        self._backend = DEFAULT_BACKEND

        self._group_box = QGroupBox('Authentication')
        self._method_combo = QComboBox()
        for label, key in (
            ('Inline', 'inline'),
            ('Environment variables', 'env'),
            ('Password file', 'file'),
        ):
            self._method_combo.addItem(label, key)
        self._username_edit = QLineEdit()
        self._password_edit = QLineEdit()
        self._password_edit.setEchoMode(QLineEdit.EchoMode.Password)
        self._file_edit = QLineEdit()
        self._file_browse = QToolButton()
        self._file_browse.setText('Browse')

        outer = QVBoxLayout(self)
        outer.addWidget(self._group_box)
        form = QFormLayout()
        form.addRow('Method', self._method_combo)
        form.addRow('Username', self._username_edit)
        form.addRow('Password', self._password_edit)
        passfile_row = QHBoxLayout()
        passfile_row.addWidget(self._file_edit)
        passfile_row.addWidget(self._file_browse)
        form.addRow('Password file', passfile_row)
        self._group_box.setLayout(form)

        self._method_combo.currentIndexChanged.connect(self._update_method_state)
        for edit in (self._username_edit, self._password_edit, self._file_edit):
            edit.textChanged.connect(self._emit_changed)
        self._file_browse.clicked.connect(self._on_browse_clicked)
        self._update_method_state()

    @property
    def method_combo(self) -> QComboBox:
        """Method selector, exposed for tests."""
        return self._method_combo

    @property
    def username_edit(self) -> QLineEdit:
        """Username input, exposed for tests."""
        return self._username_edit

    @property
    def password_edit(self) -> QLineEdit:
        """Password input, exposed for tests."""
        return self._password_edit

    @property
    def file_browse_button(self) -> QToolButton:
        """Password-file browse button, exposed for tests."""
        return self._file_browse

    @property
    def file_edit(self) -> QLineEdit:
        """Password-file input, exposed for tests."""
        return self._file_edit

    def set_backend(self, backend: str) -> None:
        """Record the backend and enable the group for anything but ``sqlite``.

        :param backend: Backend identifier.
        """
        self._backend = backend
        self._group_box.setEnabled(backend != 'sqlite')
        self._update_method_state()

    def set_data(self, auth: Mapping[str, Any] | None) -> None:
        """Load the inputs from ``auth``.

        Missing or empty entries fall back to the ``inline`` method and empty
        texts. An unknown method leaves the selector untouched.

        :param auth: Mapping with optional ``method``, ``username``,
            ``password`` and ``passfile`` entries, or ``None``.
        """
        source: Mapping[str, Any] = auth if auth else {}
        method = str(source.get('method') or 'inline')
        texts = (
            (self._username_edit, str(source.get('username') or '')),
            (self._password_edit, str(source.get('password') or '')),
            (self._file_edit, str(source.get('passfile') or '')),
        )
        with block_signals(self._method_combo, self._username_edit, self._password_edit, self._file_edit):
            position = self._method_combo.findData(method)
            if position >= 0:
                self._method_combo.setCurrentIndex(position)
            for edit, text in texts:
                edit.setText(text)
        self._update_method_state()

    def get_data(self) -> dict[str, Any]:
        """Return the authentication mapping for the selected method.

        :return: Empty mapping for ``sqlite``; otherwise ``method`` plus the
            non-empty fields relevant to that method.
        """
        if self._backend == 'sqlite':
            return {}
        method = self._method_combo.currentData()
        if method in ('inline', 'env'):
            sources = (('username', self._username_edit), ('password', self._password_edit))
        elif method == 'file':
            sources = (('username', self._username_edit), ('passfile', self._file_edit))
        else:
            sources = ()
        payload: dict[str, Any] = {'method': method}
        for key, edit in sources:
            text = _normalize_text(edit.text())
            if text is not None:
                payload[key] = text
        return payload

    def has_required_fields(self) -> bool:
        """Tell whether every field required by the selected method is filled.

        :return: Always ``True`` for ``sqlite``; ``False`` for an unknown method.
        """
        if self._backend == 'sqlite':
            return True
        method = self._method_combo.currentData()
        if method in ('inline', 'env'):
            secret_edit = self._password_edit
        elif method == 'file':
            secret_edit = self._file_edit
        else:
            return False
        has_user = bool(_normalize_text(self._username_edit.text()))
        return has_user and bool(_normalize_text(secret_edit.text()))

    def _update_method_state(self) -> None:
        """Enable the inputs relevant to the method, refresh placeholders and notify."""
        method = self._method_combo.currentData()
        uses_server = self._backend != 'sqlite'
        uses_password = method in ('inline', 'env')
        uses_file = method == 'file'
        self._username_edit.setEnabled(uses_server)
        self._password_edit.setEnabled(uses_server and uses_password)
        self._file_edit.setEnabled(uses_server and uses_file)
        self._file_browse.setEnabled(uses_server and uses_file)
        hints = {
            'inline': ('insert username', 'insert password'),
            'env': ('variable pointing to username', 'variable pointing to password'),
            'file': ('insert username', ''),
        }.get(method)
        if hints is not None:
            username_hint, password_hint = hints
            self._username_edit.setPlaceholderText(username_hint)
            self._password_edit.setPlaceholderText(password_hint)
            if uses_file:
                self._file_edit.setPlaceholderText('')
        self._emit_changed()

    def _on_browse_clicked(self) -> None:
        """Ask for a password file and store the chosen path."""
        chosen, _ = QFileDialog.getOpenFileName(self, 'Select password file')
        if chosen:
            self._file_edit.setText(str(Path(chosen)))

    def _emit_changed(self) -> None:
        """Broadcast the current authentication mapping."""
        self.authentication_changed.emit(self.get_data())
[docs]
class ParametersWidget(QWidget):
    """Widget that manages backend-specific parameters.

    A two-column table (name, value) shows the parameters of the current
    backend; the tables of the other backends are kept in memory so that
    switching back restores them. For ``sqlite`` the table holds the pragmas.
    ``parameters_changed`` carries the mapping returned by :meth:`get_data`.
    """

    parameters_changed = Signal(dict)

    def __init__(self, parent: QWidget | None = None) -> None:
        """Build the table, the add/remove buttons and load the default backend.

        :param parent: Optional parent widget.
        """
        super().__init__(parent)
        self._backend = DEFAULT_BACKEND
        self._backend_tables: dict[str, dict[str, Any]] = {'sqlite': dict(DEFAULT_PRAGMAS)}
        self._group_box = QGroupBox('Parameters')
        self._model = QStandardItemModel(0, 2, self)
        self._model.setHorizontalHeaderLabels(['Name', 'Value'])
        self._table = QTableView()
        self._table.setModel(self._model)
        self._table.horizontalHeader().setStretchLastSection(True)
        self._table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Stretch)
        self._table.setSelectionBehavior(QTableView.SelectionBehavior.SelectRows)
        self._table.setSelectionMode(QTableView.SelectionMode.ExtendedSelection)
        self._table.verticalHeader().setVisible(False)
        self._add_button = QToolButton()
        self._add_button.setText('Add')
        self._remove_button = QToolButton()
        self._remove_button.setText('Remove')
        layout = QVBoxLayout(self)
        layout.addWidget(self._group_box)
        group_layout = QVBoxLayout()
        group_layout.addWidget(self._table)
        button_row = QHBoxLayout()
        button_row.addWidget(self._add_button)
        button_row.addWidget(self._remove_button)
        button_row.addStretch()
        group_layout.addLayout(button_row)
        self._group_box.setLayout(group_layout)
        self._model.itemChanged.connect(self._emit_changed)
        self._model.rowsInserted.connect(self._emit_changed)
        self._model.rowsRemoved.connect(self._emit_changed)
        self._add_button.clicked.connect(self._on_add)
        self._remove_button.clicked.connect(self._on_remove)
        self.set_backend(DEFAULT_BACKEND)

    @property
    def table(self) -> QTableView:
        """Expose the parameter table for tests."""
        return self._table

    def set_backend(self, backend: str) -> None:
        """Switch the visible parameter table to the given backend.

        The rows currently shown are first saved under the previous backend, so
        that user edits survive a round trip between backends.

        :param backend: Backend identifier whose table must be displayed.
        """
        if self._model.rowCount():
            self._store_current_table()
        self._backend = backend
        if backend not in self._backend_tables:
            self._backend_tables[backend] = self._default_table(backend)
        self._load_table(self._backend_tables[backend])

    def set_data(self, parameters: Mapping[str, Mapping[str, Any]] | None, backend: str) -> None:
        """Populate the widget with the provided backend parameter mapping.

        Every stored table is replaced. The rows shown before the call are
        discarded, not saved: saving them would overwrite the mapping that was
        just loaded for that backend.

        :param parameters: Mapping of backend identifier to its parameters. For
            ``sqlite`` the pragmas are read from the ``pragmas`` entry, falling
            back to the defaults when that entry is not a mapping.
        :param backend: Backend whose table must be displayed afterwards.
        """
        self._backend_tables.clear()
        if parameters:
            for key, value in parameters.items():
                if not isinstance(value, Mapping):
                    self._backend_tables[key] = {}
                    continue
                if key == 'sqlite':
                    pragmas = value.get('pragmas')
                    if isinstance(pragmas, Mapping):
                        self._backend_tables[key] = dict(pragmas)
                    else:
                        self._backend_tables[key] = dict(DEFAULT_PRAGMAS)
                else:
                    self._backend_tables[key] = dict(value)
        # Switch without going through set_backend(): its "store what is shown"
        # step would write the stale rows over the freshly loaded data.
        self._backend = backend
        if backend not in self._backend_tables:
            self._backend_tables[backend] = self._default_table(backend)
        self._load_table(self._backend_tables[backend])

    def get_data(self) -> dict[str, dict[str, Any]]:
        """Return the full backend parameter mapping.

        The visible table is saved first. SQLite values are wrapped in a
        ``pragmas`` entry; other backends are included only when not empty.

        :return: Mapping of backend identifier to its parameters.
        """
        self._store_current_table()
        result: dict[str, dict[str, Any]] = {}
        for backend, table_data in self._backend_tables.items():
            if backend == 'sqlite':
                result[backend] = {'pragmas': dict(table_data)}
            elif table_data:
                result[backend] = dict(table_data)
        return result

    def current_table_values(self) -> dict[str, Any]:
        """Return a copy of the current backend table mapping."""
        self._store_current_table()
        return dict(self._backend_tables.get(self._backend, {}))

    @staticmethod
    def _default_table(backend: str) -> dict[str, Any]:
        """Return the initial table of a backend: default pragmas for SQLite, else empty."""
        return dict(DEFAULT_PRAGMAS) if backend == 'sqlite' else {}

    def _store_current_table(self) -> None:
        """Save the visible rows under the current backend."""
        self._backend_tables[self._backend] = self._table_to_dict()

    def _load_table(self, values: Mapping[str, Any]) -> None:
        """Replace the visible rows with ``values``.

        Model signals stay enabled so that the view follows the changes. Each
        change therefore reaches :meth:`_emit_changed`, which re-stores the
        table, hence the snapshot taken before touching the model.

        :param values: Name/value pairs to display.
        """
        rows = list(values.items())
        self._model.removeRows(0, self._model.rowCount())
        for key, value in rows:
            key_item = QStandardItem(str(key))
            value_item = QStandardItem(str(value))
            key_item.setEditable(True)
            value_item.setEditable(True)
            self._model.appendRow([key_item, value_item])

    def _table_to_dict(self) -> dict[str, Any]:
        """Convert the visible rows to a mapping, skipping rows without a name.

        :return: Mapping of stripped names to values converted by
            ``_normalize_table_value``.
        """
        result: dict[str, Any] = {}
        for row in range(self._model.rowCount()):
            key_item = self._model.item(row, 0)
            value_item = self._model.item(row, 1)
            if key_item is None:
                continue
            key_text = key_item.text().strip()
            if not key_text:
                continue
            value_text = value_item.text().strip() if value_item else ''
            result[key_text] = _normalize_table_value(value_text)
        return result

    def _on_add(self) -> None:
        """Append an empty row for a new parameter."""
        row = self._model.rowCount()
        self._model.insertRow(row, [QStandardItem(''), QStandardItem('')])
        self._emit_changed()

    def _on_remove(self) -> None:
        """Remove the selected rows, or the last row when nothing is selected."""
        selection = self._table.selectionModel().selectedRows()
        # Delete from the bottom up so that the remaining indexes stay valid.
        rows = sorted({index.row() for index in selection}, reverse=True)
        if not rows and self._model.rowCount():
            rows = [self._model.rowCount() - 1]
        for row in rows:
            self._model.removeRow(row)
        if rows:
            self._emit_changed()

    def _emit_changed(self, *args: Any) -> None:
        """Broadcast the full parameter mapping; signal arguments are ignored."""
        self.parameters_changed.emit(self.get_data())
[docs]
class DatabaseEditor(QWidget):
    """Top-level editor combining backend choice, transport, authentication and parameters.

    A checkable group box enables or disables the whole database section.
    ``configuration_changed`` and ``test_connection_requested`` both carry the
    mapping returned by :meth:`get_data`.
    """

    configuration_changed = Signal(dict)
    test_connection_requested = Signal(dict)

    def __init__(self, parent: QWidget | None = None) -> None:
        """Create the child widgets, lay them out and select the default backend.

        :param parent: Optional parent widget.
        """
        super().__init__(parent)
        self._current_backend = DEFAULT_BACKEND

        self.backend_combo = QComboBox()
        self.backend_combo.addItems(list(DEFAULT_URLS.keys()))
        self.backend_combo.setCurrentText(DEFAULT_BACKEND)
        self.transport_widget = TransportWidget()
        self.authentication_widget = AuthenticationWidget()
        self.parameters_widget = ParametersWidget()
        self.test_connection_button = QPushButton('Test connection')

        self._setup_layout()
        self._wire_signals()
        self._apply_backend(DEFAULT_BACKEND)

    def set_data(self, config: dict[str, Any] | None) -> None:
        """Load the editor from ``config`` while the child widgets are silenced.

        :param config: Raw database configuration, or ``None`` for the defaults.
        """
        cfg = self._normalize_config(config)
        selected = cfg['backend']
        with block_signals(
            self.backend_combo,
            self._group_box,
            self.transport_widget,
            self.authentication_widget,
            self.parameters_widget,
            self.test_connection_button,
        ):
            self.backend_combo.setCurrentText(selected)
            self._group_box.setChecked(cfg['enabled'])
            self.transport_widget.set_backend(selected)
            self.transport_widget.set_url(cfg['url'])
            self.authentication_widget.set_backend(selected)
            self.authentication_widget.set_data(cfg['authentication'])
            self.parameters_widget.set_data(cfg['parameters'], selected)
            self._apply_backend(selected)
        self._update_test_connection_state()

    def get_data(self) -> dict[str, Any]:
        """Collect the configuration currently shown by the editor.

        :return: Mapping with ``backend``, ``url``, ``authentication``,
            ``parameters``, ``pragmas`` and ``enabled`` entries.
        """
        all_parameters = self.parameters_widget.get_data()
        snapshot: dict[str, Any] = {}
        snapshot['backend'] = self._current_backend
        snapshot['url'] = _normalize_text(self.transport_widget.get_url())
        snapshot['authentication'] = self.authentication_widget.get_data()
        snapshot['parameters'] = all_parameters
        snapshot['pragmas'] = all_parameters.get('sqlite', {}).get('pragmas')
        snapshot['enabled'] = self._group_box.isChecked()
        return snapshot

    def is_enabled(self) -> bool:
        """Tell whether the database section is switched on."""
        return self._group_box.isChecked()

    def _setup_layout(self) -> None:
        """Arrange the child widgets inside the checkable group box."""
        outer = QVBoxLayout(self)
        # The check box of this group switches the whole editor on and off.
        self._group_box = QGroupBox('Database configuration')
        self._group_box.setCheckable(True)
        self._group_box.setChecked(True)
        content = QVBoxLayout()
        self._group_box.setLayout(content)
        outer.addWidget(self._group_box)

        backend_form = QFormLayout()
        backend_form.addRow('Backend:', self.backend_combo)
        content.addLayout(backend_form)
        content.addWidget(self.transport_widget)

        # Authentication on the left, parameters on the right.
        side_by_side = QHBoxLayout()
        for panel in (self.authentication_widget, self.parameters_widget):
            side_by_side.addWidget(panel)
        content.addLayout(side_by_side)
        content.addWidget(self.test_connection_button)

    def _wire_signals(self) -> None:
        """Connect the child widget signals to the editor slots."""
        connections = (
            (self.backend_combo.currentTextChanged, self._on_backend_changed),
            (self.transport_widget.url_changed, self._emit_configuration_changed),
            (self.transport_widget.validity_changed, self._update_test_connection_state),
            (self.authentication_widget.authentication_changed, self._emit_configuration_changed),
            (self.parameters_widget.parameters_changed, self._emit_configuration_changed),
            (self.test_connection_button.clicked, self._on_test_connection_clicked),
            (self._group_box.toggled, self._on_groupbox_toggled),
        )
        for signal, slot in connections:
            signal.connect(slot)

    def _normalize_config(self, config: dict[str, Any] | None) -> dict[str, Any]:
        """
        Bring a raw configuration into the shape used by the editor.

        The backend comes from the ``backend`` entry when it names a known backend, otherwise it is detected from
        the URL. Legacy top-level ``user``/``read_default_file`` entries are turned into an authentication mapping
        when no explicit one is given, and SQLite pragmas are mirrored into ``parameters['sqlite']['pragmas']``.

        :param config: Raw configuration mapping. If None, defaults are applied.
        :return: A dictionary with ``backend``, ``url``, ``enabled``, ``authentication``, ``parameters`` and
            ``pragmas`` entries.
        """
        normalized: dict[str, Any] = {
            'backend': DEFAULT_BACKEND,
            'url': '',
            'enabled': True,
        }
        authentication: dict[str, Any] = {}
        parameters: dict[str, dict[str, Any]] = {}
        pragmas = dict(DEFAULT_PRAGMAS)
        user = read_default_file = None

        if config is not None:
            url_value = config.get('url') or config.get('URL') or ''
            normalized['url'] = url_value
            explicit_backend = self._normalize_backend(config.get('backend'))
            normalized['backend'] = explicit_backend or self._detect_backend_from_url(url_value)
            normalized['enabled'] = bool(config.get('enabled', True))

            raw_auth = config.get('authentication')
            if isinstance(raw_auth, Mapping):
                authentication = dict(raw_auth)

            raw_parameters = config.get('parameters')
            if isinstance(raw_parameters, Mapping):
                parameters = {
                    name: dict(entry) for name, entry in raw_parameters.items() if isinstance(entry, Mapping)
                }

            raw_pragmas = config.get('pragmas')
            if isinstance(raw_pragmas, Mapping):
                pragmas = dict(raw_pragmas)
            # Pragmas nested under the sqlite parameters win over the top-level ones.
            if parameters.get('sqlite', {}).get('pragmas'):
                pragmas = dict(parameters['sqlite']['pragmas'])

            user = config.get('user')
            read_default_file = config.get('read_default_file')

        if not authentication:
            if read_default_file:
                authentication = {'method': 'file', 'username': user, 'passfile': read_default_file}
            elif user:
                authentication = {'method': 'inline', 'username': user}

        if pragmas:
            sqlite_entry = parameters.get('sqlite')
            if sqlite_entry is None:
                parameters['sqlite'] = {'pragmas': dict(pragmas)}
            elif 'pragmas' not in sqlite_entry:
                parameters['sqlite'] = {**sqlite_entry, 'pragmas': dict(pragmas)}

        normalized['authentication'] = authentication
        normalized['parameters'] = parameters
        normalized['pragmas'] = pragmas
        return normalized

    def _normalize_backend(self, value: str | None) -> str | None:
        """Return the lower-cased, stripped backend name if it is known, else ``None``."""
        if not value:
            return None
        cleaned = value.lower().strip()
        if cleaned in DEFAULT_URLS:
            return cleaned
        return None

    def _on_backend_changed(self, backend: str) -> None:
        """React to a new selection in the backend combo box."""
        self._apply_backend(self._normalize_backend(backend) or DEFAULT_BACKEND)
        self._emit_configuration_changed()

    def _on_test_connection_clicked(self) -> None:
        """Forward the current configuration to whoever tests the connection."""
        self.test_connection_requested.emit(self.get_data())

    def _apply_backend(self, backend: str) -> None:
        """Propagate the backend and the enabled state to every child widget.

        A URL belonging to another backend is cleared silently.

        :param backend: Requested backend; unknown values fall back to the default.
        """
        backend = self._normalize_backend(backend) or DEFAULT_BACKEND
        self._current_backend = backend
        is_active = self._group_box.isChecked()
        self.backend_combo.setEnabled(is_active)

        # Inspect the URL before the transport page is switched.
        previous_url = self.transport_widget.get_url()
        url_backend = self._detect_backend_from_url(previous_url)
        self.transport_widget.set_backend(backend)
        if previous_url and url_backend != backend:
            with block_signals(self.transport_widget):
                self.transport_widget.set_url('')

        self.authentication_widget.set_backend(backend)
        self.parameters_widget.set_backend(backend)
        for child in (self.transport_widget, self.authentication_widget, self.parameters_widget):
            child.setEnabled(is_active)
        self._update_test_connection_state()

    def _on_groupbox_toggled(self, enabled: bool) -> None:
        """Refresh the child widgets after the group box was (un)checked."""
        self._apply_backend(self._current_backend)
        self._emit_configuration_changed()

    def _detect_backend_from_url(self, url: str | None) -> str:
        """
        Guess the backend a URL belongs to.

        The lower-cased URL is first compared with the known URL prefixes, then its protocol is extracted and looked
        up among the known backends. Without a match the current backend, or the default one, is returned.

        :param url: The URL string to be analysed for backend detection.
        :return: A string representing the detected backend type.
        """
        fallback = self._current_backend or DEFAULT_BACKEND
        if not url:
            return fallback
        candidate = url.strip().lower()
        by_prefix = next((name for name, prefix in DEFAULT_URLS.items() if candidate.startswith(prefix)), None)
        if by_prefix is not None:
            return by_prefix
        protocol = extract_protocol(candidate)
        return protocol if protocol in DEFAULT_URLS else fallback

    def _update_test_connection_state(self) -> None:
        """Enable the test button only for an active editor with valid URL and credentials."""
        ready = (
            self._group_box.isChecked()
            and self.transport_widget.is_valid()
            and self.authentication_widget.has_required_fields()
        )
        self.test_connection_button.setEnabled(ready)

    def _emit_configuration_changed(self, *args: Any) -> None:
        """Refresh the test button and broadcast the configuration; signal arguments are ignored."""
        self._update_test_connection_state()
        self.configuration_changed.emit(self.get_data())
def _normalize_text(value: str) -> str | None:
text = value.strip()
return text or None
def _normalize_table_value(raw_value: str) -> Any:
if not raw_value:
return raw_value
normalized = raw_value.strip()
if not normalized:
return ''
if normalized.lower() in {'true', 'false'}:
return normalized.lower() == 'true'
for converter in (int, float):
try:
return converter(normalized)
except ValueError:
continue
return normalized