Source code for mafw.steering_gui.threading.workers

#  Copyright 2026 European Union
#  Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
#  SPDX-License-Identifier: EUPL-1.2
"""Threading helpers for future steering GUI background tasks.

:Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
:Description: Define a reusable worker QObject that can execute callables without blocking the UI.
"""

from __future__ import annotations

import inspect
from typing import Any, Callable, Generic, TypeVar

from PySide6.QtCore import QObject, QRunnable, Signal, Slot

T = TypeVar('T')
"""Type variable for the return type of the worker's callable."""


[docs] class WorkerSignals(QObject, Generic[T]): """Bundle of signals emitted by a worker.""" finished: Signal = Signal(object) error: Signal = Signal(Exception) progress: Signal = Signal(int)
[docs] class Worker(QRunnable, Generic[T]): """Run a callable in a thread pool while exposing progress, error, and completion signals.""" def __init__(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> None: super().__init__() self.signals = WorkerSignals[T]() self._fn = fn self._args = args self._kwargs = kwargs self.setAutoDelete(True)
[docs] def connect( self, *, finished: Callable[[T], None] | None = None, error: Callable[[Exception], None] | None = None, progress: Callable[[int], None] | None = None, ) -> None: """Attach callbacks to the worker signals.""" if finished is not None: self.signals.finished.connect(finished) if error is not None: self.signals.error.connect(error) if progress is not None: self.signals.progress.connect(progress)
[docs] @Slot() def run(self) -> None: """Execute the callable and propagate its result or any exception.""" try: sig = inspect.signature(self._fn) if 'progress_callback' in sig.parameters: result = self._fn( *self._args, progress_callback=self.signals.progress.emit, **self._kwargs, ) else: result = self._fn(*self._args, **self._kwargs) except Exception as exc: # pragma: no cover - GUI entrypoint; interactive testing expected self.signals.error.emit(exc) else: self.signals.finished.emit(result)