Coverage for src / mafw / runner.py: 100%
80 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
1# Copyright 2025–2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Provides a container to run configurable and modular analytical tasks.
6"""
8import logging
9from pathlib import Path
10from typing import TYPE_CHECKING, Any
12from mafw import mafw_errors
13from mafw.enumerators import ProcessorExitStatus
14from mafw.mafw_errors import UnknownProcessorGroup
15from mafw.plugin_manager import LoadedPlugins, MAFwPluginManager, get_plugin_manager
16from mafw.processor import ProcessorList
17from mafw.tools import toml_tools
18from mafw.tools.regexp import parse_processor_name
19from mafw.ui.abstract_user_interface import UserInterfaceBase
21log = logging.getLogger(__name__)
24class MAFwApplication:
25 """
26 The MAFw Application.
28 This class takes care of reading a steering file and from the information retrieved construct a
29 :class:`~mafw.processor.ProcessorList` from the processor listed there and execute it.
31 It is very practical because any combination of processors can be run without having to write dedicated scripts
32 but simply modifying a steering file.
34 The application will search for processors not only among the ones available in the MAFw library, but also in all
35 other packages exposing processors via the :ref:`plugin mechanism <plugins>`.
37 All parameters in the constructor are optional.
39 An instance can be created also without the `steering_file`, but such an instance cannot be executed. The steering
40 file can be provided in a later stage via the :meth:`init` method or directly to the :meth:`run` method.
42 The user interface can be either provided directly in the constructor, or it will be taken from the steering
43 file. In the worst case, the fallback :class:`~mafw.ui.console_user_interface.ConsoleInterface` will be used.
45 The plugin manager, if not provided, the global plugin manager will be retrieved from the
46 :func:`~mafw.plugin_manager.get_plugin_manager`.
48 A simple example is provided here below:
50 .. code-block:: python
51 :name: MAFwApplication_run
52 :caption: Creation and execution of a MAFwApplication
54 import logging
55 from pathlib import Path
57 from mafw.runner import MAFwApplication
59 log = logging.getLogger(__name__)
61 # put here your steering file
62 steering_file = Path('path_to_my_steering_file.toml')
64 try:
65 # create the app
66 app = MAFwApplication(steering_file)
68 # run it!
69 app.run()
71 except Exception as e:
72 log.error('An error occurred!')
73 log.exception(e)
74 """
76 __default_ui__: str = 'rich'
78 def __init__(
79 self,
80 steering_file: Path | str | None = None,
81 user_interface: UserInterfaceBase | type[UserInterfaceBase] | str | None = None,
82 plugin_manager: MAFwPluginManager | None = None,
83 ):
84 """
85 Constructor parameters:
87 :param steering_file: The path to the steering file.
88 :type steering_file: Path | str, Optional
89 :param user_interface: The user interface to be used by the application.
90 :type user_interface: UserInterfaceBase | type[UserInterfaceBase] | str, Optional
91 :param plugin_manager: The plugin manager.
92 :type plugin_manager: PluginManager, Optional
93 """
94 #: the name of the application instance
95 self.name = self.__class__.__name__
96 self._configuration_dict: dict[str, Any] = {}
98 #: the plugin manager of the application instance
99 self.plugin_manager = plugin_manager or get_plugin_manager()
101 #: the exit status of the application
102 self.exit_status = ProcessorExitStatus.Successful
104 # preload all plugins in one go. load_plugins will split the task on different threads
105 # making the operation still fluid.
106 self.plugins = self.plugin_manager.load_plugins({'ui', 'db_modules', 'processors'})
108 self.user_interface: UserInterfaceBase | None
109 if user_interface is not None:
110 if isinstance(user_interface, UserInterfaceBase):
111 self.user_interface = user_interface
112 else: # if isinstance(user_interface, str):
113 if TYPE_CHECKING:
114 assert isinstance(user_interface, str)
115 self.user_interface = self.get_user_interface(user_interface)
116 else:
117 self.user_interface = None
119 if steering_file is None:
120 self._initialized = False
121 self.steering_file = None
122 else:
123 self.steering_file = steering_file if isinstance(steering_file, Path) else Path(steering_file)
124 self.init(self.steering_file)
126 def get_user_interface(self, user_interface: str) -> UserInterfaceBase:
127 """
128 Retrieves the user interface from the loaded plugins.
130 User interfaces are exposed via the plugin manager.
132 If the requested `user_interface` is not available, then the fallback console interface is used.
134 :param user_interface: The name of the user interface to be used. Normally rich or console.
135 :type user_interface: str
136 """
137 if user_interface in self.plugins.ui_dict:
138 ui_type = self.plugins.ui_dict[user_interface]
139 else:
140 log.warning('User interface %s is not available. Using console.' % user_interface)
141 ui_type = self.plugins.ui_dict['console']
142 return ui_type()
144 def run(self, steering_file: Path | str | None = None) -> ProcessorExitStatus:
145 """
146 Runs the application.
148 This method builds the :class:`~mafw.processor.ProcessorList` with the processors listed in the steering file
149 and launches its execution.
151 A steering file can be provided at this stage if it was not done before.
153 .. versionchanged:: v2.0.0
154 Refactor to accept replica names in the processor to run list.
156 :param steering_file: The steering file. Defaults to None.
157 :type steering_file: Path | str, Optional
158 :raises RunnerNotInitialized: if the application has not been initialized. Very likely a steering file was
159 never provided.
160 :raises UnknownProcessor: if a processor listed in the steering file is not available in the plugin library.
161 """
162 if steering_file is None and not self._initialized:
163 log.error('%s is not initialized. Have you provided a steering file?' % self.name)
164 raise mafw_errors.RunnerNotInitialized()
166 if steering_file is not None and steering_file != self.steering_file:
167 self.init(steering_file)
169 description = self._configuration_dict.get('analysis_description', '')
170 processors_to_run = self._configuration_dict.get('processors_to_run', [])
172 # build the possibly nested processor list
173 processor_list = self._expand_processors_to_run(processors_to_run, self._configuration_dict, self.plugins)
174 processor_list.name = self.name
175 processor_list.description = description
176 return processor_list.execute()
178 def _expand_processors_to_run(
179 self, items: list[str], config: dict[str, Any], plugins: LoadedPlugins, seen: set[str] | None = None
180 ) -> ProcessorList:
181 """
182 Constructs a :class:`.ProcessorList` from a list of processor names or groups.
184 This method recursively expands a list of processor names or group definitions into a `ProcessorList` object,
185 which can be executed. It handles three cases: direct processor class matches, group definitions in the configuration,
186 and unknown entries.
188 .. versionadded:: v2.0.0
190 :param items: A list of processor names or group identifiers to be expanded.
191 :type items: list[str]
192 :param config: The configuration dictionary containing processor and group definitions.
193 :type config: dict[str, Any]
194 :param plugins: The loaded plugins containing available processors and groups.
195 :type plugins: LoadedPlugins
196 :param seen: A set of group names that have already been processed to detect cyclic definitions.
197 :type seen: set[str] | None
198 :return: A `ProcessorList` containing the expanded processors ready for execution.
199 :rtype: ProcessorList
200 :raises UnknownProcessorGroup: If a group is defined without processors or if a cyclic group definition is detected.
201 :raises UnknownProcessor: If a processor name is not recognized in the plugin library.
202 """
204 seen = seen or set()
206 pl = ProcessorList(
207 name=f'{self.name}_group',
208 description=None,
209 user_interface=self.user_interface,
210 database_conf=config.get('DBConfiguration', None),
211 create_standard_tables=config.get('create_standard_tables', True),
212 )
214 for entry in items:
215 proc_name, replica = parse_processor_name(entry)
217 # Case 1: entry matches a real processor class
218 if proc_name in plugins.processor_dict:
219 pl.append(plugins.processor_dict[proc_name](config=config, replica_id=replica))
220 continue
222 # Case 2: entry is a group defined in the TOML
223 if entry in config and isinstance(config[entry], dict):
224 group = config[entry]
225 if 'processors_to_run' not in group:
226 raise UnknownProcessorGroup(f"Group '{entry}' exists but has no processors_to_run field.")
228 if entry in seen:
229 raise UnknownProcessorGroup(f"Cyclic group definition detected at '{entry}'.")
231 seen.add(entry)
232 sub_list = self._expand_processors_to_run(group['processors_to_run'], config, plugins, seen)
233 sub_list.name = entry
234 sub_list.description = group.get('description', entry)
235 pl.append(sub_list)
236 continue
238 # Case 3: completely unknown
239 raise mafw_errors.UnknownProcessor(entry)
241 return pl
243 def init(self, steering_file: Path | str) -> None:
244 """
245 Initializes the application.
247 This method is normally automatically invoked by the class constructor.
248 It can be called in a later moment to force the parsing of the provided steering file.
250 :param steering_file: The path to the steering file.
251 :type steering_file: Path | str
252 """
253 if isinstance(steering_file, str):
254 steering_file = Path(steering_file)
256 self.steering_file = steering_file
257 self._configuration_dict = toml_tools.load_steering_file(steering_file)
259 if self.user_interface is None:
260 self.user_interface = self.get_user_interface(self._configuration_dict['UserInterface']['interface'])
262 self._initialized = True
263 self.name = self._configuration_dict.get('analysis_name', 'mafw analysis')