Source code for mafw.steering_gui.main_window

#  Copyright 2026 European Union
#  Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
#  SPDX-License-Identifier: EUPL-1.2
"""Main window orchestrator for the steering GUI layout.

:Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
:Description: Build the menu, splitter, tree, stacked editors, and status bar required by the GUI.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any, Callable, Dict, cast

from PySide6.QtCore import QByteArray, QSettings, Qt, QThreadPool
from PySide6.QtGui import QAction, QCloseEvent, QIcon, QKeySequence
from PySide6.QtWidgets import (
    QDialog,
    QFileDialog,
    QMainWindow,
    QMessageBox,
    QProgressDialog,
    QSplitter,
    QStackedWidget,
    QWidget,
)

from mafw.db.db_filter import ast_to_string
from mafw.mafw_errors import MAFwException, ValidationIssue
from mafw.plugin_manager import LoadedPlugins
from mafw.steering.builder import ValidationLevel
from mafw.steering.models import ProcessorConfig
from mafw.steering_gui.actions import ActionSpec, create_action
from mafw.steering_gui.controllers.steering_controller import SteeringController
from mafw.steering_gui.dialogs.about_dialog import AboutDialog
from mafw.steering_gui.dialogs.add_processor_dialog import AddProcessorDialog
from mafw.steering_gui.dialogs.steering_text_editor import SteeringTextEditor
from mafw.steering_gui.dialogs.validation_issues_dialog import ValidationIssuesDialog
from mafw.steering_gui.models import PipelineItem, SteeringTreeModel
from mafw.steering_gui.models.pipeline import ProcessorPipeline
from mafw.steering_gui.threading import Worker
from mafw.steering_gui.views import (
    DatabaseEditor,
    GlobalsEditor,
    GroupEditor,
    MetadataEditor,
    PipelineEditor,
    ProcessorParameterEditor,
    SteeringTreeView,
    UIEditor,
)
from mafw.steering_gui.views.filter_manager import FilterManager
from mafw.tools.regexp import parse_processor_name

# NOTE(review): not referenced in the visible part of this module; presumably
# consumed by the Save As handler defined further down -- confirm.
_DEFAULT_SAVE_NAME = 'mysteerfile.toml'
"""Default suggestion when the user invokes Save As."""


class MainWindow(QMainWindow):
    """Main window laying out the steering GUI around an optional controller."""

    def __init__(self, controller: SteeringController | None = None, parent: QWidget | None = None) -> None:
        """Initialise the window state and assemble its widgets.

        :param controller: Steering controller backing the editors; the window can be built without one.
        :param parent: Optional parent widget forwarded to :class:`QMainWindow`.
        """
        super().__init__(parent)
        self._controller = controller
        self._settings = QSettings('mafw', 'steering-gui')

        # Registries filled while the window is being assembled.
        self._actions: Dict[str, QAction] = {}
        self._section_widgets: Dict[str, QWidget] = {}

        # Editors and the tree model are created later; they start unset.
        self._metadata_editor: MetadataEditor | None = None
        self._globals_editor: GlobalsEditor | None = None
        self._database_editor: DatabaseEditor | None = None
        self._ui_editor: UIEditor | None = None
        self._pipeline_editor: PipelineEditor | None = None
        self._processor_parameter_editor: ProcessorParameterEditor | None = None
        self._filter_manager: FilterManager | None = None
        self._group_editor: GroupEditor | None = None
        self._tree_model: SteeringTreeModel | None = None

        # Bookkeeping about what is currently selected in the pipeline tree.
        self._current_pipeline_item: PipelineItem | None = None
        self._current_pipeline_parent: str | None = None
        self._current_group_name: str | None = None

        # Handles on background work and its progress dialog.
        self._connection_test_worker: Worker[bool] | None = None
        self._plugin_load_worker: Worker[LoadedPlugins] | None = None
        self._pipeline_dialog: QProgressDialog | None = None

        # Assembly order matters: menus need the actions, the central widget wires the controller.
        self._create_actions()
        self._create_menus()
        self._create_status_bar()
        self._create_central_widget()
        self._restore_window_geometry()
        self._refresh_title_from_controller()

        # The window icon is optional: it is only applied when the bundled logo file is present.
        logo_file = Path(__file__).parent.joinpath('resources', 'mafw-logo.png')
        if logo_file.exists():
            self.setWindowIcon(QIcon(str(logo_file)))
def _create_actions(self) -> None:
    """Build every window action and register it under its display text."""
    # Undo and Redo were removed from v2.0.0 and will be implemented at a later stage (see #112).
    action_specs = (
        ActionSpec(
            'New',
            triggered=self._new_file,
            shortcut=QKeySequence.StandardKey.New,
            status_tip='Create a new steering file',
        ),
        ActionSpec(
            'Open',
            triggered=self._open_file,
            shortcut=QKeySequence.StandardKey.Open,
            status_tip='Open an existing steering file',
        ),
        ActionSpec(
            'Save',
            triggered=self._save_file,
            shortcut=QKeySequence.StandardKey.Save,
            status_tip='Save the current steering file',
        ),
        ActionSpec(
            'Save As',
            triggered=self._save_file_as,
            shortcut=QKeySequence.StandardKey.SaveAs,
            status_tip='Save the steering file under a new name',
        ),
        ActionSpec(
            'Exit',
            triggered=self._exit_app,
            shortcut=QKeySequence.StandardKey.Quit,
            status_tip='Exit the steering GUI',
        ),
        ActionSpec(
            'About MAFw',
            triggered=self._show_about,
            shortcut=QKeySequence.StandardKey.HelpContents,
            status_tip='Show information about the MAFw steering GUI',
        ),
    )
    for action_spec in action_specs:
        # The spec text doubles as the lookup key used when the menus are composed.
        self._actions[action_spec.text] = create_action(self, action_spec)
def _create_menus(self) -> None:
    """Populate the menu bar with the File and Help menus from the stored actions."""
    # Undo and Redo were removed from v2.0.0 (see #112), hence there is no Edit menu for now.
    file_menu = self.menuBar().addMenu('&File')
    for label in ('New', 'Open', 'Save', 'Save As'):
        file_menu.addAction(self._actions[label])
    file_menu.addSeparator()
    file_menu.addAction(self._actions['Exit'])

    help_menu = self.menuBar().addMenu('&Help')
    help_menu.addAction(self._actions['About MAFw'])
def _create_status_bar(self) -> None:
    """Show the initial ready message in the status bar."""
    status_bar = self.statusBar()
    status_bar.showMessage('Ready to compose steering files.')
def _create_central_widget(self) -> None:
    """Assemble the horizontal splitter holding the tree view and the stack of editors."""
    splitter = QSplitter()
    splitter.setOrientation(Qt.Orientation.Horizontal)

    # Left pane: the steering tree and its signals.
    tree = SteeringTreeView()
    self._tree_view = tree
    tree.section_selected.connect(self._display_section)
    tree.pipeline_selection_changed.connect(self._on_pipeline_selection_changed)
    tree.add_processor_requested.connect(self._on_add_processor_requested)
    tree.add_group_requested.connect(self._on_add_group_requested)
    tree.pipeline_items_delete_requested.connect(self._on_pipeline_remove_requested)
    splitter.addWidget(tree)

    # Right pane: one editor per tree section, keyed by the section name.
    stack = QStackedWidget()
    self._section_widgets.clear()

    self._metadata_editor = MetadataEditor()
    self._section_widgets['root'] = self._metadata_editor

    self._globals_editor = GlobalsEditor()
    self._section_widgets['globals'] = self._globals_editor

    self._database_editor = DatabaseEditor()
    self._section_widgets['database'] = self._database_editor

    pipeline = PipelineEditor()
    for signal, slot in (
        (pipeline.add_processor_requested, self._on_add_processor_requested),
        (pipeline.add_group_requested, self._on_add_group_requested),
        (pipeline.edit_requested, self._on_pipeline_edit_requested),
        (pipeline.remove_requested, self._on_pipeline_remove_requested),
        (pipeline.move_requested, self._on_pipeline_move_requested),
    ):
        signal.connect(slot)
    self._pipeline_editor = pipeline
    self._section_widgets['processors'] = pipeline

    self._ui_editor = UIEditor()
    self._section_widgets['ui'] = self._ui_editor

    # Section editors go into the stack first, in registration order.
    for section_widget in self._section_widgets.values():
        stack.addWidget(section_widget)

    # Processor and group editors live in the stack but are not tree sections.
    self._processor_parameter_editor = ProcessorParameterEditor()
    self._group_editor = GroupEditor()
    stack.addWidget(self._processor_parameter_editor)
    stack.addWidget(self._group_editor)

    splitter.addWidget(stack)
    self._editor_stack = stack
    splitter.setStretchFactor(1, 1)
    self.setCentralWidget(splitter)

    # With all widgets in place, hook up the controller and start the initial loading.
    self._wire_controller_signals()
    self._populate_from_model()
    self._start_plugin_loading()
def _display_section(self, section: str) -> None:
    """Bring the editor registered for ``section`` to the front of the editor stack.

    Unknown section names fall back to the widget registered under ``'root'``.
    """
    fallback = self._section_widgets['root']
    target = self._section_widgets.get(section, fallback)
    if section == 'root':
        # The root page shows summary counters, so refresh them before displaying it.
        self._refresh_metadata_summary()
    self._editor_stack.setCurrentWidget(target)
    self.statusBar().showMessage(f'Selected "{section}" editor.')
def _populate_from_model(self) -> None:
    """Start a background pipeline build and show a progress dialog while it runs.

    Nothing happens when either the controller or the metadata editor is missing.
    """
    controller = self._controller
    if controller is None or self._metadata_editor is None:
        return

    loaded_path = controller.current_file()
    if loaded_path is not None:
        self.statusBar().showMessage(f'Loading "{loaded_path.name}"...')

    self._pipeline_dialog = QProgressDialog('Building pipeline...', 'Cancel', 0, 100, self)
    dialog = self._pipeline_dialog
    dialog.setWindowTitle('Please Wait')
    dialog.setMinimumDuration(100)
    dialog.setValue(0)

    # The build runs on the global thread pool; its callbacks update the window and the dialog.
    build_job = Worker(controller.build_pipeline)
    build_job.connect(
        finished=self._on_pipeline_ready,
        error=self._on_pipeline_error,
        progress=dialog.setValue,
    )
    QThreadPool.globalInstance().start(build_job)
def _on_pipeline_ready(self, pipeline: ProcessorPipeline) -> None:
    """Update the tree and editors with the built pipeline.

    Connected as the ``finished`` callback of the worker started by ``_populate_from_model``.

    :param pipeline: The pipeline handed over to the tree model.
    """
    # NOTE(review): the statement nesting was reconstructed from a flattened listing;
    # confirm that ``refresh()`` belongs to the ``else`` branch and that
    # ``select_section('processors')`` runs for both branches.
    # Dismiss the progress dialog opened when the build was started.
    if self._pipeline_dialog is not None:
        self._pipeline_dialog.close()
        self._pipeline_dialog = None
    if self._controller is None:
        return
    if self._tree_model is None:
        # First build: create the model and attach it to the tree view.
        self._tree_model = SteeringTreeModel(self._controller, pipeline, self)
        self._tree_view.set_model(self._tree_model)
    else:
        # Later builds reuse the existing model and only swap the pipeline.
        self._tree_model.set_pipeline(pipeline)
        self._tree_view.refresh()
    self._tree_view.select_section('processors')
    # Both list-style editors display the same tree model.
    if self._pipeline_editor is not None:
        self._pipeline_editor.set_model(self._tree_model)
    if self._group_editor is not None:
        self._group_editor.set_model(self._tree_model)
    self._refresh_metadata_summary()
    if self._metadata_editor is not None:
        self._metadata_editor.set_validation_level(self._controller.validation_level())
    self._refresh_editors()
    current_file = self._controller.current_file()
    if current_file is not None:
        self.statusBar().showMessage(f'Loaded "{current_file.name}".')
def _on_pipeline_error(self, exc: Exception) -> None:
    """Close the progress dialog and tell the user that the pipeline build failed.

    :param exc: The exception whose text is shown in the message box.
    """
    dialog = self._pipeline_dialog
    if dialog is not None:
        dialog.close()
        self._pipeline_dialog = None
    QMessageBox.critical(self, 'Pipeline Build Failed', str(exc))
    self.statusBar().showMessage('Failed to build pipeline.')
def _refresh_editors(self) -> None:
    """Refresh the globals, database and UI editors, in that order."""
    for refresh in (
        self._refresh_globals_editor,
        self._refresh_database_editor,
        self._refresh_ui_editor,
    ):
        refresh()
def _refresh_globals_editor(self) -> None:
    """Load the controller globals into the globals editor, when both exist."""
    controller, editor = self._controller, self._globals_editor
    if controller is None or editor is None:
        return
    # The globals mapping is expanded into keyword arguments of ``set_data``.
    global_values = controller.get_globals()
    editor.set_data(**global_values)
def _refresh_database_editor(self) -> None:
    """Collect the database settings from the controller and hand them to the database editor."""
    controller, editor = self._controller, self._database_editor
    if controller is None or editor is None:
        return
    db_attributes = controller.get_db_attributes()
    # 'user' and 'read_default_file' are optional attributes; missing ones are passed as None.
    settings = dict(
        url=controller.get_db_url(),
        pragmas=controller.get_db_pragmas(),
        user=db_attributes.get('user'),
        read_default_file=db_attributes.get('read_default_file'),
        enabled=controller.is_db_configuration_enabled(),
    )
    editor.set_data(settings)
def _refresh_ui_editor(self) -> None:
    """Synchronise the UI editor with the configured interface and the loaded UI plugins."""
    controller, editor = self._controller, self._ui_editor
    if controller is None or editor is None:
        return
    plugins = controller.plugins
    # ``None`` is forwarded as-is while no plugin information is available.
    available = None if plugins is None else [ui.name for ui in plugins.ui_list]
    editor.set_current_interface(controller.get_ui_interface())
    editor.set_available_interfaces(available)
def _wire_controller_signals(self) -> None:
    """Connect controller notifications and editor signals to their handlers.

    Nothing is wired when the controller or the globals editor is missing.
    """
    if self._controller is None or self._globals_editor is None:
        return
    controller = self._controller
    globals_editor = self._globals_editor

    controller.dirty_state_signal.connect(self._on_dirty_changed)
    # Apply the current dirty state right away instead of waiting for the first notification.
    self._on_dirty_changed(controller.dirty)

    def forward_analysis_name(text):
        return controller.set_analysis_name(self._normalize_text(text))

    def forward_analysis_description(text):
        return controller.set_analysis_description(self._normalize_text(text))

    globals_editor.analysis_name_changed.connect(forward_analysis_name)
    globals_editor.description_changed.connect(forward_analysis_description)
    globals_editor.new_only_changed.connect(controller.set_new_only)
    globals_editor.create_standard_tables_changed.connect(controller.set_create_standard_tables)

    metadata_editor = self._metadata_editor
    if metadata_editor is not None:
        metadata_editor.validation_level_changed.connect(self._on_validation_level_changed)
        metadata_editor.manual_edit_requested.connect(self._open_steering_text_editor)
        metadata_editor.set_validation_level(controller.validation_level())

    database_editor = self._database_editor
    if database_editor is not None:
        database_editor.configuration_changed.connect(self._on_database_configuration_changed)
        database_editor.test_connection_requested.connect(self._on_test_connection_requested)

    if self._ui_editor is not None:
        self._ui_editor.interface_changed.connect(self._on_ui_interface_changed)

    parameter_editor = self._processor_parameter_editor
    if parameter_editor is not None:
        for signal, slot in (
            (parameter_editor.replica_name_changed, self._on_replica_name_changed),
            (parameter_editor.new_only_changed, self._on_processor_new_only_changed),
            (parameter_editor.inheritance_changed, self._on_processor_inheritance_changed),
            (parameter_editor.parameter_value_changed, self._on_processor_parameter_changed),
            (parameter_editor.parameter_active_changed, self._on_processor_parameter_active_changed),
            (parameter_editor.reset_defaults_requested, self._on_processor_reset_defaults),
            (parameter_editor.remove_requested, self._on_processor_remove_requested),
            (parameter_editor.edit_filters_requested, self._on_edit_filters_requested),
        ):
            signal.connect(slot)

    group_editor = self._group_editor
    if group_editor is not None:
        for signal, slot in (
            (group_editor.name_changed, self._on_group_name_changed),
            (group_editor.description_changed, self._on_group_description_changed),
            (group_editor.add_processor_requested, self._on_group_add_processor_requested),
            (group_editor.add_group_requested, self._on_group_add_group_requested),
            (group_editor.edit_requested, self._on_pipeline_edit_requested),
            (group_editor.remove_requested, self._on_group_remove_requested),
            (group_editor.move_requested, self._on_group_move_requested),
        ):
            signal.connect(slot)
def _refresh_metadata_summary(self) -> None:
    """Push the analysis name and the configured/available counters to the metadata editor."""
    controller, editor = self._controller, self._metadata_editor
    if controller is None or editor is None:
        return
    # Number of entries currently listed to run.
    configured = len(controller.get_processors_to_run())
    analysis_name = controller.get_analysis_name()
    processor_total = controller.available_processor_count()
    db_module_total = controller.available_db_module_count()
    ui_total = controller.available_ui_count()
    editor.update_summary(analysis_name, configured, processor_total, db_module_total, ui_total)
def _refresh_pipeline_after_change(self, select_name: str | None = None) -> None:
    """Rebuild the pipeline from the controller and refresh every widget showing it.

    :param select_name: Pipeline item to select afterwards; when empty or ``None`` the
        ``'processors'`` section is selected instead.
    """
    controller, model = self._controller, self._tree_model
    if controller is None or model is None:
        return

    model.set_pipeline(controller.build_pipeline())
    self._tree_view.refresh()

    if not select_name:
        self._tree_view.select_section('processors')
    else:
        self._tree_view.select_pipeline_item(select_name)

    for editor in (self._pipeline_editor, self._group_editor):
        if editor is not None:
            editor.refresh()
    self._refresh_metadata_summary()
def _on_add_processor_requested(self) -> None:
    """Let the user pick processors in a dialog and add them to the pipeline with defaults."""
    controller = self._controller
    if controller is None:
        self.statusBar().showMessage('No controller configured for adding processors.')
        return

    available = controller.available_processor_names()
    if not available:
        self.statusBar().showMessage('No processors available to add.')
        return

    # A parentless dialog can be moved around freely by the user.
    picker = AddProcessorDialog(available, None)
    accepted = picker.exec() == QDialog.DialogCode.Accepted
    if not accepted:
        return

    chosen = picker.selected_names()
    if chosen:
        controller.add_processors_with_defaults(chosen)
        self._refresh_pipeline_after_change()
def _on_add_group_requested(self) -> None:
    """Add a new group at the top level and select it once the pipeline is refreshed."""
    created = self._create_group(parent_group=None)
    if created is not None:
        self._refresh_pipeline_after_change(select_name=created)
def _on_group_add_processor_requested(self) -> None:
    """Let the user pick processors in a dialog and add them to the group being edited."""
    controller = self._controller
    if controller is None:
        self.statusBar().showMessage('No controller configured for adding processors.')
        return

    target_group = self._current_group_name
    if target_group is None:
        self.statusBar().showMessage('No group selected for adding processors.')
        return

    available = controller.available_processor_names()
    if not available:
        self.statusBar().showMessage('No processors available to add.')
        return

    # A parentless dialog can be moved around freely by the user.
    picker = AddProcessorDialog(available, None)
    accepted = picker.exec() == QDialog.DialogCode.Accepted
    if not accepted:
        return

    chosen = picker.selected_names()
    if chosen:
        controller.add_processors_to_group_with_defaults(target_group, chosen)
        self._refresh_pipeline_after_change(select_name=target_group)
def _on_group_add_group_requested(self) -> None:
    """Add a new group inside the group being edited and select it afterwards."""
    enclosing_group = self._current_group_name
    if enclosing_group is None:
        self.statusBar().showMessage('No group selected for adding nested groups.')
        return
    created = self._create_group(parent_group=enclosing_group)
    if created is not None:
        self._refresh_pipeline_after_change(select_name=created)
def _on_group_remove_requested(self, items: list[PipelineItem]) -> None:
    """Drop the given items from the group being edited, then refresh the pipeline.

    :param items: Pipeline items to remove; an empty list is a no-op.
    """
    controller, group_name = self._controller, self._current_group_name
    if controller is None or group_name is None:
        return
    # Resolve all names up front, before the first removal touches the controller.
    entry_names = [entry.name() for entry in items]
    if len(entry_names) == 0:
        return
    for entry_name in entry_names:
        controller.remove_pipeline_entry(entry_name, self._current_group_name)
    self._refresh_pipeline_after_change(select_name=self._current_group_name)
def _on_group_move_requested(self, direction: str, items: list[PipelineItem]) -> None:
    """Reorder items inside the currently edited group.

    Every selected entry is shifted by one slot in the requested direction. A selected
    entry stays where it is only when it already sits at the boundary of the list or
    when the slot it would move into is held by another selected entry that could not
    move. A contiguous selection therefore travels as one block.

    :param direction: ``'up'`` or ``'down'``; any other value leaves the group untouched.
    :param items: Pipeline items to move. Names that are not part of the group's
        processor list are ignored.
    """
    if self._controller is None or self._current_group_name is None:
        return
    group = self._controller.get_group_snapshot(self._current_group_name)
    order = list(group.processors)
    names = [item.name() for item in items]
    if not names:
        return
    # De-duplicate so that an entry listed twice is not moved twice.
    positions = sorted({order.index(name) for name in names if name in order})
    if not positions:
        return

    if direction == 'up':
        step, edge = -1, 0
    elif direction == 'down':
        step, edge = 1, len(order) - 1
        # Moving down must start from the entry closest to the end of the list.
        positions.reverse()
    else:
        return

    # Slots finally occupied by the selected entries processed so far. Checking against
    # these (rather than against the original positions) is what keeps a contiguous
    # selection together: once the leading entry has moved, the slot it left is free.
    settled: set[int] = set()
    for pos in positions:
        target = pos + step
        if pos == edge or target in settled:
            settled.add(pos)
            continue
        order[target], order[pos] = order[pos], order[target]
        settled.add(target)

    self._controller.set_group_processors(self._current_group_name, order)
    self._refresh_pipeline_after_change(select_name=self._current_group_name)
def _on_group_name_changed(self, new_name: str) -> None:
    """Apply a new name to the group being edited.

    Empty names and names rejected by the controller are reported in a warning box and
    the pipeline is refreshed with the old name still selected.

    :param new_name: Requested name; surrounding whitespace is stripped before use.
    """
    controller, old_name = self._controller, self._current_group_name
    if controller is None or old_name is None:
        return

    candidate = new_name.strip()
    if candidate == '':
        QMessageBox.warning(self, 'Rename group failed', 'Group name cannot be empty.')
        self._refresh_pipeline_after_change(select_name=old_name)
        return
    if candidate == old_name:
        # Nothing to rename.
        return

    try:
        controller.rename_group(old_name, candidate)
    except MAFwException as exc:
        QMessageBox.warning(self, 'Rename group failed', str(exc))
        self._refresh_pipeline_after_change(select_name=self._current_group_name)
    else:
        self._current_group_name = candidate
        self._refresh_pipeline_after_change(select_name=candidate)
def _on_group_description_changed(self, description: str) -> None:
    """Store a new description for the group being edited.

    :param description: Raw text from the editor; it is passed through ``_normalize_text`` first.
    """
    controller, group_name = self._controller, self._current_group_name
    if controller is None or group_name is None:
        return
    cleaned = self._normalize_text(description)
    try:
        controller.set_group_description(self._current_group_name, cleaned)
    except MAFwException as exc:
        # A rejected description is only reported to the user.
        QMessageBox.warning(self, 'Update group description failed', str(exc))
def _create_group(self, parent_group: str | None) -> str | None:
    """Create an empty group with a generated name and attach it to its parent.

    :param parent_group: Name of the enclosing group, or ``None`` to add the new group
        to the run list instead.
    :return: The name of the new group, or ``None`` when there is no controller or the
        controller rejected the operation.
    """
    controller = self._controller
    if controller is None:
        self.statusBar().showMessage('No controller configured for adding groups.')
        return None

    new_name = self._generate_group_name()
    try:
        controller.add_group(new_name, [], None)
        if parent_group is not None:
            controller.add_group_entry_to_group(parent_group, new_name)
        else:
            controller.add_group_to_run_list(new_name)
    except MAFwException as exc:
        QMessageBox.warning(self, 'Add group failed', str(exc))
        return None
    return new_name
def _generate_group_name(self) -> str:
    """Return the first ``Group<N>`` name (N starting at 1) not used by a group or a processor.

    Without a controller the result is always ``'Group1'``.
    """
    if self._controller is None:
        return 'Group1'
    taken = set(self._controller.list_groups())
    taken |= set(self._controller.list_processors())
    index = 1
    while f'Group{index}' in taken:
        index += 1
    return f'Group{index}'
def _on_pipeline_selection_changed(self, items: list[PipelineItem]) -> None:
    """React to a new tree selection by showing the editor for its first item.

    :param items: Currently selected pipeline items; an empty list clears the remembered selection.
    """
    if items:
        # Only the first selected item drives the editor.
        self._select_pipeline_item_editor(items[0])
        return
    self._current_pipeline_item = None
    self._current_pipeline_parent = None
    self._current_group_name = None
def _on_pipeline_edit_requested(self, item: PipelineItem) -> None:
    """Select ``item`` in the tree and open the editor matching its kind.

    :param item: Pipeline item the user asked to edit.
    """
    tree = self._tree_view
    if tree is not None:
        tree.select_pipeline_item(item.name())
    self._select_pipeline_item_editor(item)
def _select_pipeline_item_editor(self, item: PipelineItem) -> None:
    """Show the group editor or the processor editor for ``item`` and record the selection.

    :param item: Pipeline item to edit; ``item.is_group()`` decides which editor is used.
    """
    if self._editor_stack is None:
        return

    if item.is_group():
        group_editor = self._group_editor
        if group_editor is None:
            return
        # A group is selected: forget any processor selection.
        self._current_pipeline_item = None
        self._current_pipeline_parent = None
        self._current_group_name = item.name()
        group_editor.set_group_item(item)
        self._editor_stack.setCurrentWidget(group_editor)
        return

    parameter_editor = self._processor_parameter_editor
    if parameter_editor is None:
        return
    self._current_group_name = None
    self._current_pipeline_item = item
    # Remember the enclosing group, if any, so that removal targets the right list.
    if item.parent and item.parent.is_group():
        self._current_pipeline_parent = item.parent.name()
    else:
        self._current_pipeline_parent = None
    parameter_editor.set_processor_data(cast(ProcessorConfig, item.config))
    self._editor_stack.setCurrentWidget(parameter_editor)
def _on_pipeline_remove_requested(self, items: list[PipelineItem]) -> None:
    """Ask the controller to remove the given pipeline entries, then refresh the pipeline.

    :param items: Pipeline items to remove; an empty list is a no-op.
    """
    controller = self._controller
    if controller is None:
        return
    entry_names = [entry.name() for entry in items]
    if entry_names:
        controller.remove_pipeline_entries(entry_names)
        self._refresh_pipeline_after_change()
def _on_pipeline_move_requested(self, direction: str, items: list[PipelineItem]) -> None:
    """Reorder the selected items within the top-level pipeline list.

    Every selected entry is shifted by one slot in the requested direction. A selected
    entry stays where it is only when it already sits at the boundary of the list or
    when the slot it would move into is held by another selected entry that could not
    move. A contiguous selection therefore travels as one block.

    :param direction: ``'up'`` or ``'down'``; any other value leaves the list untouched.
    :param items: Pipeline items to move. Names that are not part of the controller's
        run list are ignored.
    """
    if self._controller is None:
        return
    names = [item.name() for item in items]
    if not names:
        return
    # Work on a copy so the controller only sees the new order through its setter.
    order = list(self._controller.get_processors_to_run())
    # De-duplicate so that an entry listed twice is not moved twice.
    positions = sorted({order.index(name) for name in names if name in order})
    if not positions:
        return

    if direction == 'up':
        step, edge = -1, 0
    elif direction == 'down':
        step, edge = 1, len(order) - 1
        # Moving down must start from the entry closest to the end of the list.
        positions.reverse()
    else:
        return

    # Slots finally occupied by the selected entries processed so far. Checking against
    # these (rather than against the original positions) is what keeps a contiguous
    # selection together: once the leading entry has moved, the slot it left is free.
    settled: set[int] = set()
    for pos in positions:
        target = pos + step
        if pos == edge or target in settled:
            settled.add(pos)
            continue
        order[target], order[pos] = order[pos], order[target]
        settled.add(target)

    self._controller.set_processors_to_run(order)
    self._refresh_pipeline_after_change()
def _on_processor_parameter_changed(self, key: str, value: Any) -> None:
    """Forward an edited parameter of the current processor to the controller."""
    controller = self._controller
    if controller is None:
        return
    current = self._current_pipeline_item
    if current is None:
        return
    controller.set_processor_parameter(current.name(), key, value)

def _on_processor_parameter_active_changed(self, key: str, active: bool, value: Any) -> None:
    """Set the parameter when it is activated, remove it when deactivated."""
    controller = self._controller
    if controller is None:
        return
    current = self._current_pipeline_item
    if current is None:
        return
    processor_name = current.name()
    if not active:
        controller.remove_processor_parameter(processor_name, key)
        return
    controller.set_processor_parameter(processor_name, key, value)

def _on_processor_new_only_changed(self, value: bool | None) -> None:
    """Forward the new-only setting of the current processor to the controller."""
    controller = self._controller
    if controller is None:
        return
    current = self._current_pipeline_item
    if current is None:
        return
    controller.set_processor_new_only(current.name(), value)

def _on_processor_inheritance_changed(self, value: bool | None) -> None:
    """Forward the inheritance setting, but only for names that carry a replica part."""
    controller = self._controller
    if controller is None:
        return
    current = self._current_pipeline_item
    if current is None:
        return
    processor_name = current.name()
    _base, replica_id = parse_processor_name(processor_name)
    if replica_id is not None:
        controller.set_replica_inheritance(processor_name, value)

def _on_processor_reset_defaults(self) -> None:
    """Reset the current processor's parameters and reselect it after the refresh."""
    controller = self._controller
    if controller is None:
        return
    current = self._current_pipeline_item
    if current is None:
        return
    processor_name = current.name()
    controller.reset_processor_parameters(processor_name)
    self._refresh_pipeline_after_change(select_name=processor_name)

def _on_processor_remove_requested(self) -> None:
    """Remove the current processor entry and clear the current-item bookkeeping."""
    controller = self._controller
    if controller is None:
        return
    current = self._current_pipeline_item
    if current is None:
        return
    controller.remove_pipeline_entry(current.name(), self._current_pipeline_parent)
    self._current_pipeline_item = self._current_pipeline_parent = None
    self._refresh_pipeline_after_change()

def _on_replica_name_changed(self, replica: str) -> None:
    """Rename the current entry to ``base#replica``, or to ``base`` when replica is blank.

    A warning box is shown and nothing is refreshed if the controller raises
    :class:`MAFwException`.
    """
    controller = self._controller
    if controller is None:
        return
    current = self._current_pipeline_item
    if current is None:
        return
    old_name = current.name()
    base, _replica = parse_processor_name(old_name)
    suffix = replica.strip()
    renamed = f'{base}#{suffix}' if suffix else base
    try:
        controller.rename_processor_entry(old_name, renamed, self._current_pipeline_parent)
    except MAFwException as exc:
        QMessageBox.warning(self, 'Rename replica failed', str(exc))
    else:
        self._refresh_pipeline_after_change(select_name=renamed)

def _on_edit_filters_requested(self) -> None:
    """Open a fresh filter manager for the current processor inside the editor stack."""
    current = self._current_pipeline_item
    if current is None:
        return
    stack = self._editor_stack
    if stack is None:
        return
    # ``PipelineItem.config`` is loosely typed; only real processor configs carry filters.
    config = current.config
    if not isinstance(config, ProcessorConfig):
        return

    # Always rebuild the manager so it starts from a clean state bound to this config.
    stale = self._filter_manager
    if stale is not None:
        stack.removeWidget(stale)
        stale.deleteLater()

    manager = FilterManager(config)
    self._filter_manager = manager
    manager.finished.connect(self._on_filter_manager_finished)
    manager.filter_changed.connect(self._on_filter_changed)
    stack.addWidget(manager)
    stack.setCurrentWidget(manager)
    self.statusBar().showMessage(f'Editing filters for "{config.name}".')

def _on_filter_changed(self) -> None:
    """Push the filter manager's filters and logic expression to the controller."""
    controller = self._controller
    if controller is None:
        return
    manager = self._filter_manager
    if manager is None:
        return
    config = manager._config
    # Re-serialise the logic only when it was edited and an AST exists;
    # otherwise hand back the original string.
    use_ast = config.logic_dirty and config.logic_ast
    logic: str | None = ast_to_string(config.logic_ast) if use_ast else config.logic_str_original
    controller.set_processor_filters(config.name, config.filters, logic)

def _on_filter_manager_finished(self) -> None:
    """Return to the processor parameter editor and dispose of the filter manager."""
    parameter_editor = self._processor_parameter_editor
    if parameter_editor is None:
        return
    stack = self._editor_stack
    if stack is None:
        return
    stack.setCurrentWidget(parameter_editor)
    manager = self._filter_manager
    if manager:
        stack.removeWidget(manager)
        manager.deleteLater()
        self._filter_manager = None
    self.statusBar().showMessage('Returned to processor parameters.')
def _start_plugin_loading(self) -> None:
    """Load plugins on the global thread pool unless already loaded or loading."""
    controller = self._controller
    # Skip when there is no controller, a load is in flight, or plugins are present.
    if controller is None or self._plugin_load_worker is not None or controller.plugins is not None:
        return

    self.statusBar().showMessage('Loading available plugins...')
    loader = Worker(controller.load_plugins)
    loader.connect(
        finished=self._on_plugins_loaded,
        error=self._on_plugin_load_failure,
    )
    # Holding the worker marks a load as in progress until a callback clears it.
    self._plugin_load_worker = loader
    pool = QThreadPool.globalInstance()
    pool.start(loader)
[docs] def _on_plugins_loaded(self, _: object) -> None: """Handle successful plugin loading and refresh the UI.""" self._plugin_load_worker = None self.statusBar().showMessage('Plugin loading complete.') self._refresh_metadata_summary() self._refresh_ui_editor()
def _on_plugin_load_failure(self, exc: Exception) -> None:
    """Clear the pending worker, show the error, and refresh dependent views.

    :param exc: Exception raised by the plugin loading task.
    """
    self._plugin_load_worker = None
    status = self.statusBar()
    status.showMessage('Failed to load plugins.')
    details = str(exc)
    QMessageBox.critical(self, 'Plugin loading failed', details)
    # Refresh even after a failure, exactly as on success.
    self._refresh_metadata_summary()
    self._refresh_ui_editor()
[docs] def _on_dirty_changed(self, dirty: bool) -> None: """Update the window title when the controller dirty state changes.""" self._refresh_title_from_controller(dirty)
[docs] def _on_validation_level_changed(self, level: ValidationLevel | None) -> None: """Tell the controller about the user-selected validation level.""" if self._controller is None: return self._controller.set_validation_level(level)
[docs] def _on_database_configuration_changed(self, data: dict[str, Any]) -> None: """Commit every database widget change back to the controller.""" if self._controller is None: return # I do not understand how 'enabled' could be not found in the data enabled = data.get('enabled', True) currently_enabled = self._controller.is_db_configuration_enabled() # this is to avoid useless changes. if enabled and not currently_enabled: self._controller.enable_db_configuration() elif not enabled and currently_enabled: self._controller.disable_db_configuration() self._controller.set_db_url(data.get('url')) pragmas = data.get('pragmas') or {} self._controller.set_db_pragmas(pragmas) for attribute in ('user', 'read_default_file'): value = data.get(attribute) if isinstance(value, str): value = value.strip() or None self._controller.set_db_attribute(attribute, value)
[docs] def _on_ui_interface_changed(self, interface: str) -> None: """Tell the controller to use the selected UI interface.""" if self._controller is None: return self._controller.set_ui_interface(interface)
[docs] def _on_test_connection_requested(self, config: dict[str, Any]) -> None: """Start an asynchronous database connection test and update the UI.""" if self._controller is None: self.statusBar().showMessage('No controller configured for testing.') return if self._database_editor is None: self.statusBar().showMessage('Database editor is not ready.') return if self._connection_test_worker is not None: self.statusBar().showMessage('Database connection test already running.') return self.statusBar().showMessage('Testing DB connection...') task = self._controller.build_connection_test_task(config) self._start_connection_test_worker(task)
def _start_connection_test_worker(self, task: Callable[[], bool]) -> None:
    """Wrap ``task`` in a worker, wire its callbacks, and run it on the global pool.

    :param task: Zero-argument callable that performs the connection test.
    """
    runner = Worker(task)
    runner.connect(
        finished=self._on_connection_test_success,
        error=self._on_connection_test_failure,
    )
    # Holding the worker marks a test as running; the button stays disabled meanwhile.
    self._connection_test_worker = runner
    self._set_test_button_enabled(False)
    pool = QThreadPool.globalInstance()
    pool.start(runner)
def _on_connection_test_success(self, _: object) -> None:
    """Announce a successful connection test and restore the idle UI state.

    :param _: Worker result; unused.
    """
    # NOTE(review): the task's return value is ignored here. Confirm that the
    # task signals failure by raising rather than by returning False.
    status = self.statusBar()
    status.showMessage('Database connection successful.')
    self._finalize_connection_test()
    QMessageBox.information(
        self,
        'Connection successful',
        'The database connection test succeeded.',
    )
def _on_connection_test_failure(self, exc: Exception) -> None:
    """Announce a failed connection test and restore the idle UI state.

    :param exc: Exception raised by the connection test task; its text is shown.
    """
    status = self.statusBar()
    status.showMessage('Database connection failed.')
    self._finalize_connection_test()
    reason = str(exc)
    QMessageBox.critical(self, 'Connection failed', reason)
[docs] def _finalize_connection_test(self, result: object | None = None) -> None: """Reset worker references and re-enable the test button.""" self._connection_test_worker = None self._set_test_button_enabled(True)
[docs] def _set_test_button_enabled(self, enabled: bool) -> None: """Toggle the database editor test button when the feature is active.""" if self._database_editor is not None: self._database_editor.test_connection_button.setEnabled( enabled and self._database_editor.is_enabled(), )
[docs] @staticmethod def _normalize_text(value: str) -> str | None: """Treat empty or whitespace-only strings as unset.""" normalized = value.strip() return normalized if normalized else None
[docs] def _refresh_title_from_controller(self, dirty: bool | None = None) -> None: """Compose the window title with the associated file and dirty marker.""" title = 'MAFw Steering GUI' if self._controller is not None: file = self._controller.current_file() else: file = None if file is not None: title += f' — {file.name}' else: title += ' — Untitled' if dirty is None: dirty = self._controller.dirty if self._controller is not None else False if dirty: title += ' *' self.setWindowTitle(title)
[docs] def _new_file(self) -> None: """Reset the UI state in preparation for a new steering file.""" if self._controller is None: self.statusBar().showMessage('No controller configured for New.') return self._controller.reset() self._populate_from_model() self._refresh_title_from_controller() self.statusBar().showMessage('Prepared new steering file (layout only).')
def _open_file(self) -> None:
    """Ask the user for a TOML steering file and load it; do nothing on cancel."""
    chosen, _selected_filter = QFileDialog.getOpenFileName(
        self,
        'Open Steering File',
        '',
        'TOML Files (*.toml)',
    )
    if chosen:
        self._load_file(Path(chosen))
        return
    # An empty path means the dialog was dismissed.
    self.statusBar().showMessage('Open cancelled.')
[docs] def _load_file(self, path: Path) -> None: """Load the steering file through the controller and refresh the UI.""" if self._controller is None: self.statusBar().showMessage('No controller configured for loading files.') return try: self._controller.load(path) except MAFwException as exc: QMessageBox.critical(self, 'Failed to load steering file', str(exc)) self.statusBar().showMessage('Failed to load steering file.') return self._populate_from_model() self._refresh_title_from_controller()
[docs] def _save_file(self) -> None: """Action handler that keeps the QAction signature while delegating to DoSave.""" self._perform_save()
[docs] def _perform_save(self) -> bool: """Persist the current builder to its tracked path (or prompt Save As).""" # Guard: ensure a controller exists to handle the save operation if self._controller is None: self.statusBar().showMessage('No controller configured for Save.') return False # Branch: no file currently tracked, so delegate to Save As to obtain a path if self._controller.current_file() is None: return self._perform_save_as() # Retrieve the user-selected validation level; if set, validate the steering file level = self._controller.validation_level() issues: list[ValidationIssue] = [] if level is not None: issues = self._controller.validate(level) # Branch: validation produced issues that require user attention skip_validation = False if issues: # Show a dialog listing all issues; user can accept (proceed) or reject (cancel) dialog = ValidationIssuesDialog(issues, self) if dialog.exec() != QDialog.DialogCode.Accepted: return False # User chose to proceed despite issues; skip re-validation during save skip_validation = True # Attempt the actual save, trapping any MAFw-level exceptions try: self._controller.save(skip_validation=skip_validation) except MAFwException as exc: # Branch: save failed (e.g., I/O error, permission issue, schema violation) QMessageBox.critical(self, 'Failed to save steering file', str(exc)) self.statusBar().showMessage('Failed to save steering file.') return False # Branch: save succeeded; extract the filename for the status message file = self._controller.current_file() name = file.name if file is not None else 'Untitled' self._refresh_title_from_controller() self.statusBar().showMessage(f'Saved "{name}".') return True
[docs] def _save_file_as(self) -> None: """Action handler for Save As that satisfies the QAction signature.""" self._perform_save_as()
[docs] def _perform_save_as(self) -> bool: """Prompt the user for a location and persist the current builder there.""" # Guard: ensure a controller exists to handle the save operation if self._controller is None: self.statusBar().showMessage('No controller configured for Save As.') return False # Determine default path for the save dialog: use current file path if exists, otherwise use default name current = self._controller.current_file() default_path = str(current) if current is not None else _DEFAULT_SAVE_NAME # Branch: user cancelled the file dialog; abort the save operation path, _ = QFileDialog.getSaveFileName(self, 'Save Steering File As', default_path, 'TOML Files (*.toml)') if not path: self.statusBar().showMessage('Save As cancelled.') return False # Retrieve the user-selected validation level; if set, validate the steering file level = self._controller.validation_level() issues: list[ValidationIssue] = [] if level is not None: issues = self._controller.validate(level) # Branch: validation produced issues that require user attention skip_validation = False if issues: # Show a dialog listing all issues; user can accept (proceed) or reject (cancel) dialog = ValidationIssuesDialog(issues, self) if dialog.exec() != QDialog.DialogCode.Accepted: return False # User chose to proceed despite issues; skip re-validation during save skip_validation = True # Attempt the save-as operation, trapping any MAFw-level exceptions try: self._controller.save_as(Path(path), skip_validation=skip_validation) except MAFwException as exc: # Branch: save failed (e.g., I/O error, permission issue, schema violation) QMessageBox.critical(self, 'Failed to save steering file', str(exc)) self.statusBar().showMessage('Failed to save steering file.') return False self._refresh_title_from_controller() self.statusBar().showMessage(f'Saved "{Path(path).name}".') return True
[docs] def _exit_app(self) -> None: """Close the window after ensuring dirty state obeys the user.""" self.close()
[docs] def _undo(self) -> None: """Placeholder undo action until controller wiring occurs.""" self.statusBar().showMessage('Undo not yet available in this prototype.')
[docs] def _redo(self) -> None: """Placeholder redo action until controller wiring occurs.""" self.statusBar().showMessage('Redo not yet available in this prototype.')
def _show_about(self) -> None:
    """Run the About dialog with this window as its constructor argument."""
    about = AboutDialog(self)
    about.exec()
[docs] def _open_steering_text_editor(self) -> None: """Open the manual TOML editor for advanced steering edits.""" if self._controller is None: self.statusBar().showMessage('No controller configured for manual editing.') return # the parent of the steering dialog = SteeringTextEditor(self._controller, None) dialog.builder_applied.connect(self._populate_from_model) dialog.exec()
def closeEvent(self, event: QCloseEvent) -> None:
    """Veto the close when unsaved changes are not resolved; otherwise save geometry.

    :param event: Close event; ignored when the user does not confirm the discard.
    """
    controller = self._controller
    has_unsaved = controller.dirty if controller is not None else False
    if has_unsaved and not self._confirm_discard():
        # Unsaved changes were not resolved, so keep the window open.
        event.ignore()
        return
    super().closeEvent(event)
    self._save_window_geometry()
def _confirm_discard(self) -> bool:
    """Ask whether to save, discard, or cancel before closing a dirty document.

    :return: ``True`` when closing may proceed: the user chose Discard, or
        chose Save and the save succeeded leaving the controller clean.
        ``False`` otherwise.
    """
    buttons = QMessageBox.StandardButton

    prompt = QMessageBox(self)
    prompt.setWindowTitle('Unsaved Steering File')
    prompt.setText('Save your changes before closing?')
    prompt.setStandardButtons(buttons.Save | buttons.Discard | buttons.Cancel)
    prompt.setDefaultButton(buttons.Save)
    response = prompt.exec()

    if response == buttons.Save:
        if not self._perform_save():
            return False
        # Closing is allowed only if the save really cleared the dirty flag.
        still_dirty = self._controller and self._controller.dirty
        return not still_dirty
    # Any answer other than Discard keeps the window open.
    return bool(response == buttons.Discard)
def _restore_window_geometry(self) -> None:
    """Apply the geometry stored under the ``'geometry'`` settings key, if usable."""
    stored = self._settings.value('geometry')
    # Anything that is not a QByteArray is ignored.
    if not isinstance(stored, QByteArray):
        return
    self.restoreGeometry(stored)
[docs] def _save_window_geometry(self) -> None: """Persist the current geometry before exit.""" self._settings.setValue('geometry', self.saveGeometry())