Source code for mafw.runner

#  Copyright 2025 European Union
#  Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
#  SPDX-License-Identifier: EUPL-1.2
"""
Provides a container to run configurable and modular analytical tasks.
"""

import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any

from mafw import mafw_errors
from mafw.enumerators import ProcessorExitStatus
from mafw.mafw_errors import UnknownProcessorGroup
from mafw.plugin_manager import LoadedPlugins, MAFwPluginManager, get_plugin_manager
from mafw.processor import ProcessorList
from mafw.tools import toml_tools
from mafw.tools.regexp import parse_processor_name
from mafw.ui.abstract_user_interface import UserInterfaceBase

log = logging.getLogger(__name__)


[docs] class MAFwApplication: """ The MAFw Application. This class takes care of reading a steering file and from the information retrieved construct a :class:`~mafw.processor.ProcessorList` from the processor listed there and execute it. It is very practical because any combination of processors can be run without having to write dedicated scripts but simply modifying a steering file. The application will search for processors not only among the ones available in the MAFw library, but also in all other packages exposing processors via the :ref:`plugin mechanism <plugins>`. All parameters in the constructor are optional. An instance can be created also without the `steering_file`, but such an instance cannot be executed. The steering file can be provided in a later stage via the :meth:`init` method or directly to the :meth:`run` method. The user interface can be either provided directly in the constructor, or it will be taken from the steering file. In the worst case, the fallback :class:`~mafw.ui.console_user_interface.ConsoleInterface` will be used. The plugin manager, if not provided, the global plugin manager will be retrieved from the :func:`~mafw.plugin_manager.get_plugin_manager`. A simple example is provided here below: .. code-block:: python :name: MAFwApplication_run :caption: Creation and execution of a MAFwApplication import logging from pathlib import Path from mafw.runner import MAFwApplication log = logging.getLogger(__name__) # put here your steering file steering_file = Path('path_to_my_steering_file.toml') try: # create the app app = MAFwApplication(steering_file) # run it! app.run() except Exception as e: log.error('An error occurred!') log.exception(e) """ __default_ui__: str = 'rich' def __init__( self, steering_file: Path | str | None = None, user_interface: UserInterfaceBase | type[UserInterfaceBase] | str | None = None, plugin_manager: MAFwPluginManager | None = None, ): """ Constructor parameters: :param steering_file: The path to the steering file. :type steering_file: Path | str, Optional :param user_interface: The user interface to be used by the application. :type user_interface: UserInterfaceBase | type[UserInterfaceBase] | str, Optional :param plugin_manager: The plugin manager. :type plugin_manager: PluginManager, Optional """ #: the name of the application instance self.name = self.__class__.__name__ self._configuration_dict: dict[str, Any] = {} #: the plugin manager of the application instance self.plugin_manager = plugin_manager or get_plugin_manager() #: the exit status of the application self.exit_status = ProcessorExitStatus.Successful self.user_interface: UserInterfaceBase | None if user_interface is not None: if isinstance(user_interface, UserInterfaceBase): self.user_interface = user_interface else: # if isinstance(user_interface, str): if TYPE_CHECKING: assert isinstance(user_interface, str) self.user_interface = self.get_user_interface(user_interface) else: self.user_interface = None if steering_file is None: self._initialized = False self.steering_file = None else: self.steering_file = steering_file if isinstance(steering_file, Path) else Path(steering_file) self.init(self.steering_file)
[docs] def get_user_interface(self, user_interface: str) -> UserInterfaceBase: """ Retrieves the user interface from the plugin managers. User interfaces are exposed via the plugin manager. If the requested `user_interface` is not available, then the fallback console interface is used. :param user_interface: The name of the user interface to be used. Normally rich or console. :type user_interface: str """ plugins = self.plugin_manager.load_plugins({'ui'}) if user_interface in plugins.ui_dict: ui_type = plugins.ui_dict[user_interface] else: log.warning('User interface %s is not available. Using console.' % user_interface) ui_type = plugins.ui_dict['console'] return ui_type()
[docs] def run(self, steering_file: Path | str | None = None) -> ProcessorExitStatus: """ Runs the application. This method builds the :class:`~mafw.processor.ProcessorList` with the processors listed in the steering file and launches its execution. A steering file can be provided at this stage if it was not done before. .. versionchanged:: v2.0.0 Refactor to accept replica names in the processor to run list. :param steering_file: The steering file. Defaults to None. :type steering_file: Path | str, Optional :raises RunnerNotInitialized: if the application has not been initialized. Very likely a steering file was never provided. :raises UnknownProcessor: if a processor listed in the steering file is not available in the plugin library. """ if steering_file is None and not self._initialized: log.error('%s is not initialized. Have you provided a steering file?' % self.name) raise mafw_errors.RunnerNotInitialized() if steering_file is not None and steering_file != self.steering_file: self.init(steering_file) plugins = self.plugin_manager.load_plugins({'processors', 'db_modules'}) description = self._configuration_dict.get('analysis_description', None) processors_to_run = self._configuration_dict.get('processors_to_run', []) # build the possibly nested processor list processor_list = self._expand_processors_to_run(processors_to_run, self._configuration_dict, plugins) processor_list.name = self.name processor_list.description = description return processor_list.execute()
[docs] def _expand_processors_to_run( self, items: list[str], config: dict[str, Any], plugins: LoadedPlugins, seen: set[str] | None = None ) -> ProcessorList: """ Constructs a :class:`.ProcessorList` from a list of processor names or groups. This method recursively expands a list of processor names or group definitions into a `ProcessorList` object, which can be executed. It handles three cases: direct processor class matches, group definitions in the configuration, and unknown entries. .. versionadded:: v2.0.0 :param items: A list of processor names or group identifiers to be expanded. :type items: list[str] :param config: The configuration dictionary containing processor and group definitions. :type config: dict[str, Any] :param plugins: The loaded plugins containing available processors and groups. :type plugins: LoadedPlugins :param seen: A set of group names that have already been processed to detect cyclic definitions. :type seen: set[str] | None :return: A `ProcessorList` containing the expanded processors ready for execution. :rtype: ProcessorList :raises UnknownProcessorGroup: If a group is defined without processors or if a cyclic group definition is detected. :raises UnknownProcessor: If a processor name is not recognized in the plugin library. """ seen = seen or set() pl = ProcessorList( name=f'{self.name}_group', description=None, user_interface=self.user_interface, database_conf=config.get('DBConfiguration', None), create_standard_tables=config.get('create_standard_tables', True), ) for entry in items: proc_name, replica = parse_processor_name(entry) # Case 1: entry matches a real processor class if proc_name in plugins.processor_dict: pl.append(plugins.processor_dict[proc_name](config=config, replica_id=replica)) continue # Case 2: entry is a group defined in the TOML if entry in config and isinstance(config[entry], dict): group = config[entry] if 'processors_to_run' not in group: raise UnknownProcessorGroup(f"Group '{entry}' exists but has no processors_to_run field.") if entry in seen: raise UnknownProcessorGroup(f"Cyclic group definition detected at '{entry}'.") seen.add(entry) sub_list = self._expand_processors_to_run(group['processors_to_run'], config, plugins, seen) sub_list.name = entry sub_list.description = group.get('description', entry) pl.append(sub_list) continue # Case 3: completely unknown raise mafw_errors.UnknownProcessor(entry) return pl
[docs] def init(self, steering_file: Path | str) -> None: """ Initializes the application. This method is normally automatically invoked by the class constructor. It can be called in a later moment to force the parsing of the provided steering file. :param steering_file: The path to the steering file. :type steering_file: Path | str """ if isinstance(steering_file, str): steering_file = Path(steering_file) self.steering_file = steering_file self._configuration_dict = toml_tools.load_steering_file(steering_file) if self.user_interface is None: self.user_interface = self.get_user_interface(self._configuration_dict['UserInterface']['interface']) self._initialized = True self.name = self._configuration_dict.get('analysis_name', 'mafw analysis')