# Copyright 2025 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""
Provides a container to run configurable and modular analytical tasks.
"""
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any
from mafw import mafw_errors
from mafw.enumerators import ProcessorExitStatus
from mafw.mafw_errors import UnknownProcessorGroup
from mafw.plugin_manager import LoadedPlugins, MAFwPluginManager, get_plugin_manager
from mafw.processor import ProcessorList
from mafw.tools import toml_tools
from mafw.tools.regexp import parse_processor_name
from mafw.ui.abstract_user_interface import UserInterfaceBase
log = logging.getLogger(__name__)
[docs]
class MAFwApplication:
    """
    The MAFw Application.

    This class takes care of reading a steering file and, from the information retrieved, constructs a
    :class:`~mafw.processor.ProcessorList` from the processors listed there and executes it.

    It is very practical because any combination of processors can be run without having to write dedicated scripts
    but simply modifying a steering file.

    The application will search for processors not only among the ones available in the MAFw library, but also in all
    other packages exposing processors via the :ref:`plugin mechanism <plugins>`.

    All parameters in the constructor are optional.

    An instance can be created also without the `steering_file`, but such an instance cannot be executed. The steering
    file can be provided in a later stage via the :meth:`init` method or directly to the :meth:`run` method.

    The user interface can be either provided directly in the constructor (as an instance, as a class or by name),
    or it will be taken from the steering file. If the steering file does not name an interface, the one identified
    by ``__default_ui__`` is requested. In the worst case, the fallback
    :class:`~mafw.ui.console_user_interface.ConsoleInterface` will be used.

    If the plugin manager is not provided, the global plugin manager will be retrieved from
    :func:`~mafw.plugin_manager.get_plugin_manager`.

    A simple example is provided here below:

    .. code-block:: python
        :name: MAFwApplication_run
        :caption: Creation and execution of a MAFwApplication

        import logging
        from pathlib import Path

        from mafw.runner import MAFwApplication

        log = logging.getLogger(__name__)

        # put here your steering file
        steering_file = Path('path_to_my_steering_file.toml')

        try:
            # create the app
            app = MAFwApplication(steering_file)

            # run it!
            app.run()

        except Exception as e:
            log.error('An error occurred!')
            log.exception(e)
    """

    #: name of the user interface requested when the steering file does not specify one
    __default_ui__: str = 'rich'

    def __init__(
        self,
        steering_file: Path | str | None = None,
        user_interface: UserInterfaceBase | type[UserInterfaceBase] | str | None = None,
        plugin_manager: MAFwPluginManager | None = None,
    ):
        """
        Constructor parameters:

        :param steering_file: The path to the steering file.
        :type steering_file: Path | str, Optional
        :param user_interface: The user interface to be used by the application. It can be a ready-made instance,
            a user interface class (instantiated without arguments) or the name under which the interface is
            exposed by the plugin manager.
        :type user_interface: UserInterfaceBase | type[UserInterfaceBase] | str, Optional
        :param plugin_manager: The plugin manager.
        :type plugin_manager: MAFwPluginManager, Optional
        """
        #: the name of the application instance
        self.name = self.__class__.__name__
        self._configuration_dict: dict[str, Any] = {}

        #: the plugin manager of the application instance
        self.plugin_manager = plugin_manager or get_plugin_manager()

        #: the exit status of the application
        self.exit_status = ProcessorExitStatus.Successful

        #: the user interface; None until it is resolved here or from the steering file in :meth:`init`
        self.user_interface: UserInterfaceBase | None
        if user_interface is None or isinstance(user_interface, UserInterfaceBase):
            self.user_interface = user_interface
        elif isinstance(user_interface, type):
            # a user interface class was provided: build it the same way get_user_interface does
            self.user_interface = user_interface()
        else:
            if TYPE_CHECKING:
                assert isinstance(user_interface, str)
            self.user_interface = self.get_user_interface(user_interface)

        # always start from a well-defined, not initialized state; init() flips it on success
        self._initialized = False
        #: the steering file currently loaded, or None if none was provided yet
        self.steering_file: Path | None = None
        if steering_file is not None:
            self.init(steering_file)

    def get_user_interface(self, user_interface: str) -> UserInterfaceBase:
        """
        Retrieves the user interface from the plugin managers.

        User interfaces are exposed via the plugin manager.

        If the requested `user_interface` is not available, then the fallback console interface is used.

        :param user_interface: The name of the user interface to be used. Normally rich or console.
        :type user_interface: str
        :return: A new instance of the selected user interface.
        :rtype: UserInterfaceBase
        """
        plugins = self.plugin_manager.load_plugins({'ui'})
        if user_interface in plugins.ui_dict:
            ui_type = plugins.ui_dict[user_interface]
        else:
            log.warning('User interface %s is not available. Using console.', user_interface)
            ui_type = plugins.ui_dict['console']
        return ui_type()

    def run(self, steering_file: Path | str | None = None) -> ProcessorExitStatus:
        """
        Runs the application.

        This method builds the :class:`~mafw.processor.ProcessorList` with the processors listed in the steering file
        and launches its execution.

        A steering file can be provided at this stage if it was not done before. If the provided file is the one
        already loaded, it is not parsed again.

        .. versionchanged:: v2.0.0
            Refactor to accept replica names in the processor to run list.

        :param steering_file: The steering file. Defaults to None.
        :type steering_file: Path | str, Optional
        :return: The value returned by the execution of the processor list.
        :rtype: ProcessorExitStatus
        :raises RunnerNotInitialized: if the application has not been initialized. Very likely a steering file was
            never provided.
        :raises UnknownProcessor: if a processor listed in the steering file is not available in the plugin library.
        :raises UnknownProcessorGroup: if a group has no processors_to_run field or is nested inside itself.
        """
        if steering_file is None and not self._initialized:
            log.error('%s is not initialized. Have you provided a steering file?', self.name)
            raise mafw_errors.RunnerNotInitialized()

        if steering_file is not None:
            # normalise before comparing: a str never compares equal to the stored Path
            steering_file = Path(steering_file)
            if steering_file != self.steering_file:
                self.init(steering_file)

        plugins = self.plugin_manager.load_plugins({'processors', 'db_modules'})

        description = self._configuration_dict.get('analysis_description', None)
        processors_to_run = self._configuration_dict.get('processors_to_run', [])

        # build the possibly nested processor list
        processor_list = self._expand_processors_to_run(processors_to_run, self._configuration_dict, plugins)
        processor_list.name = self.name
        processor_list.description = description

        return processor_list.execute()

    def _expand_processors_to_run(
        self, items: list[str], config: dict[str, Any], plugins: LoadedPlugins, seen: set[str] | None = None
    ) -> ProcessorList:
        """
        Constructs a :class:`.ProcessorList` from a list of processor names or groups.

        This method recursively expands a list of processor names or group definitions into a `ProcessorList` object,
        which can be executed. It handles three cases: direct processor class matches, group definitions in the
        configuration, and unknown entries.

        The same group may be referenced more than once; only a group that ends up nested inside itself is
        reported as cyclic.

        .. versionadded:: v2.0.0

        :param items: A list of processor names or group identifiers to be expanded.
        :type items: list[str]
        :param config: The configuration dictionary containing processor and group definitions.
        :type config: dict[str, Any]
        :param plugins: The loaded plugins containing available processors and groups.
        :type plugins: LoadedPlugins
        :param seen: The names of the groups currently being expanded (the ancestors of `items`), used to detect
            cyclic definitions. The provided set is not modified.
        :type seen: set[str] | None
        :return: A `ProcessorList` containing the expanded processors ready for execution.
        :rtype: ProcessorList
        :raises UnknownProcessorGroup: If a group is defined without processors or if a cyclic group definition
            is detected.
        :raises UnknownProcessor: If a processor name is not recognized in the plugin library.
        """
        if seen is None:
            seen = set()

        pl = ProcessorList(
            name=f'{self.name}_group',
            description=None,
            user_interface=self.user_interface,
            database_conf=config.get('DBConfiguration', None),
            create_standard_tables=config.get('create_standard_tables', True),
        )

        for entry in items:
            proc_name, replica = parse_processor_name(entry)

            # Case 1: entry matches a real processor class
            if proc_name in plugins.processor_dict:
                pl.append(plugins.processor_dict[proc_name](config=config, replica_id=replica))
                continue

            # Case 2: entry is a group defined in the TOML
            group = config.get(entry)
            if isinstance(group, dict):
                if 'processors_to_run' not in group:
                    raise UnknownProcessorGroup(f"Group '{entry}' exists but has no processors_to_run field.")
                if entry in seen:
                    raise UnknownProcessorGroup(f"Cyclic group definition detected at '{entry}'.")

                # hand a *copy* extended with this group to the nested expansion, so that sibling entries
                # referencing the same group are not mistaken for a cycle
                sub_list = self._expand_processors_to_run(group['processors_to_run'], config, plugins, seen | {entry})
                sub_list.name = entry
                sub_list.description = group.get('description', entry)
                pl.append(sub_list)
                continue

            # Case 3: completely unknown
            raise mafw_errors.UnknownProcessor(entry)

        return pl

    def init(self, steering_file: Path | str) -> None:
        """
        Initializes the application.

        This method is normally automatically invoked by the class constructor.
        It can be called in a later moment to force the parsing of the provided steering file.

        If no user interface was selected yet, the one named in the ``UserInterface.interface`` field of the
        steering file is requested; when that field is missing, ``__default_ui__`` is requested instead.

        :param steering_file: The path to the steering file.
        :type steering_file: Path | str
        """
        steering_file = Path(steering_file)

        self.steering_file = steering_file
        self._configuration_dict = toml_tools.load_steering_file(steering_file)

        if self.user_interface is None:
            # tolerate a steering file without the UserInterface section or without its interface key
            ui_section = self._configuration_dict.get('UserInterface') or {}
            self.user_interface = self.get_user_interface(ui_section.get('interface', self.__default_ui__))

        self._initialized = True
        self.name = self._configuration_dict.get('analysis_name', 'mafw analysis')