Coverage for src / mafw / runner.py: 100%
81 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
1# Copyright 2025 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Provides a container to run configurable and modular analytical tasks.
6"""
8import logging
9from pathlib import Path
10from typing import TYPE_CHECKING, Any
12from mafw import mafw_errors
13from mafw.enumerators import ProcessorExitStatus
14from mafw.mafw_errors import UnknownProcessorGroup
15from mafw.plugin_manager import LoadedPlugins, MAFwPluginManager, get_plugin_manager
16from mafw.processor import ProcessorList
17from mafw.tools import toml_tools
18from mafw.tools.regexp import parse_processor_name
19from mafw.ui.abstract_user_interface import UserInterfaceBase
21log = logging.getLogger(__name__)
class MAFwApplication:
    """
    The MAFw Application.

    This class takes care of reading a steering file and from the information retrieved construct a
    :class:`~mafw.processor.ProcessorList` from the processor listed there and execute it.

    It is very practical because any combination of processors can be run without having to write dedicated scripts
    but simply modifying a steering file.

    The application will search for processors not only among the ones available in the MAFw library, but also in all
    other packages exposing processors via the :ref:`plugin mechanism <plugins>`.

    All parameters in the constructor are optional.

    An instance can be created also without the `steering_file`, but such an instance cannot be executed. The steering
    file can be provided in a later stage via the :meth:`init` method or directly to the :meth:`run` method.

    The user interface can be either provided directly in the constructor (as an instance, as a class or by name),
    or it will be taken from the steering file. If the steering file does not name one, :attr:`__default_ui__` is
    requested. In the worst case, the fallback :class:`~mafw.ui.console_user_interface.ConsoleInterface` will be used.

    If the plugin manager is not provided, the global plugin manager will be retrieved from the
    :func:`~mafw.plugin_manager.get_plugin_manager`.

    A simple example is provided here below:

    .. code-block:: python
        :name: MAFwApplication_run
        :caption: Creation and execution of a MAFwApplication

        import logging
        from pathlib import Path

        from mafw.runner import MAFwApplication

        log = logging.getLogger(__name__)

        # put here your steering file
        steering_file = Path('path_to_my_steering_file.toml')

        try:
            # create the app
            app = MAFwApplication(steering_file)

            # run it!
            app.run()

        except Exception as e:
            log.error('An error occurred!')
            log.exception(e)
    """

    #: name of the user interface requested when neither the constructor nor the steering file specify one
    __default_ui__: str = 'rich'

    def __init__(
        self,
        steering_file: Path | str | None = None,
        user_interface: UserInterfaceBase | type[UserInterfaceBase] | str | None = None,
        plugin_manager: MAFwPluginManager | None = None,
    ):
        """
        Constructor parameters:

        :param steering_file: The path to the steering file.
        :type steering_file: Path | str, Optional
        :param user_interface: The user interface to be used by the application. It can be a ready-to-use instance,
            a user interface class (instantiated without arguments) or the name under which the interface is
            exposed by the plugin manager.
        :type user_interface: UserInterfaceBase | type[UserInterfaceBase] | str, Optional
        :param plugin_manager: The plugin manager.
        :type plugin_manager: PluginManager, Optional
        """
        #: the name of the application instance
        self.name = self.__class__.__name__
        self._configuration_dict: dict[str, Any] = {}

        #: the plugin manager of the application instance
        self.plugin_manager = plugin_manager or get_plugin_manager()

        #: the exit status of the application
        self.exit_status = ProcessorExitStatus.Successful

        #: the user interface; None until resolved, either here or from the steering file in :meth:`init`
        self.user_interface: UserInterfaceBase | None = self._resolve_user_interface(user_interface)

        # Start from a well-defined "not initialized" state, so that these attributes exist even if the
        # parsing of the steering file below raises.
        self._initialized = False
        #: the steering file currently loaded, None if no steering file was loaded yet
        self.steering_file: Path | None = None

        if steering_file is not None:
            self.init(steering_file)

    def _resolve_user_interface(
        self, user_interface: UserInterfaceBase | type[UserInterfaceBase] | str | None
    ) -> UserInterfaceBase | None:
        """
        Converts the constructor `user_interface` argument into a user interface instance.

        :param user_interface: An instance, a class, a plugin name or None.
        :type user_interface: UserInterfaceBase | type[UserInterfaceBase] | str, Optional
        :return: The user interface instance, or None if `user_interface` is None.
        :rtype: UserInterfaceBase | None
        """
        if user_interface is None:
            return None

        if isinstance(user_interface, type):
            # A class was provided: instantiate it without arguments, exactly as get_user_interface does
            # with the classes retrieved from the plugin manager.
            return user_interface()

        if isinstance(user_interface, UserInterfaceBase):
            return user_interface

        # Anything else is treated as the name of an interface exposed by the plugin manager.
        if TYPE_CHECKING:
            assert isinstance(user_interface, str)
        return self.get_user_interface(user_interface)

    def get_user_interface(self, user_interface: str) -> UserInterfaceBase:
        """
        Retrieves the user interface from the plugin managers.

        User interfaces are exposed via the plugin manager.

        If the requested `user_interface` is not available, then the fallback console interface is used.

        :param user_interface: The name of the user interface to be used. Normally rich or console.
        :type user_interface: str
        :return: A new instance of the selected user interface.
        :rtype: UserInterfaceBase
        """
        plugins = self.plugin_manager.load_plugins({'ui'})
        if user_interface in plugins.ui_dict:
            ui_type = plugins.ui_dict[user_interface]
        else:
            log.warning('User interface %s is not available. Using console.', user_interface)
            ui_type = plugins.ui_dict['console']
        return ui_type()

    def run(self, steering_file: Path | str | None = None) -> ProcessorExitStatus:
        """
        Runs the application.

        This method builds the :class:`~mafw.processor.ProcessorList` with the processors listed in the steering file
        and launches its execution.

        A steering file can be provided at this stage if it was not done before. It is (re)parsed only when the
        application is not yet initialized or when it differs from the one currently loaded.

        .. versionchanged:: v2.0.0
            Refactor to accept replica names in the processor to run list.

        :param steering_file: The steering file. Defaults to None.
        :type steering_file: Path | str, Optional
        :return: The value returned by the execution of the processor list.
        :rtype: ProcessorExitStatus
        :raises RunnerNotInitialized: if the application has not been initialized. Very likely a steering file was
            never provided.
        :raises UnknownProcessor: if a processor listed in the steering file is not available in the plugin library.
        """
        if steering_file is None:
            if not self._initialized:
                log.error('%s is not initialized. Have you provided a steering file?', self.name)
                raise mafw_errors.RunnerNotInitialized()
        else:
            # Normalize before comparing: a str never compares equal to a Path, even for the same file.
            requested_file = steering_file if isinstance(steering_file, Path) else Path(steering_file)
            if not self._initialized or requested_file != self.steering_file:
                self.init(requested_file)

        plugins = self.plugin_manager.load_plugins({'processors', 'db_modules'})

        description = self._configuration_dict.get('analysis_description', None)
        processors_to_run = self._configuration_dict.get('processors_to_run', [])

        # build the possibly nested processor list
        processor_list = self._expand_processors_to_run(processors_to_run, self._configuration_dict, plugins)
        processor_list.name = self.name
        processor_list.description = description
        return processor_list.execute()

    def _expand_processors_to_run(
        self, items: list[str], config: dict[str, Any], plugins: LoadedPlugins, seen: set[str] | None = None
    ) -> ProcessorList:
        """
        Constructs a :class:`.ProcessorList` from a list of processor names or groups.

        This method recursively expands a list of processor names or group definitions into a `ProcessorList` object,
        which can be executed. It handles three cases: direct processor class matches, group definitions in the
        configuration, and unknown entries.

        The same group may appear more than once in the expanded list; only a group that contains itself, directly
        or through other groups, is reported as cyclic.

        .. versionadded:: v2.0.0

        :param items: A list of processor names or group identifiers to be expanded.
        :type items: list[str]
        :param config: The configuration dictionary containing processor and group definitions.
        :type config: dict[str, Any]
        :param plugins: The loaded plugins containing available processors and groups.
        :type plugins: LoadedPlugins
        :param seen: The names of the groups currently being expanded, i.e. the groups enclosing `items`. It is used
            to detect cyclic definitions and it is not modified.
        :type seen: set[str] | None
        :return: A `ProcessorList` containing the expanded processors ready for execution.
        :rtype: ProcessorList
        :raises UnknownProcessorGroup: If a group is defined without processors or if a cyclic group definition
            is detected.
        :raises UnknownProcessor: If a processor name is not recognized in the plugin library.
        """
        enclosing_groups: set[str] = set() if seen is None else seen

        pl = ProcessorList(
            name=f'{self.name}_group',
            description=None,
            user_interface=self.user_interface,
            database_conf=config.get('DBConfiguration', None),
            create_standard_tables=config.get('create_standard_tables', True),
        )

        for entry in items:
            proc_name, replica = parse_processor_name(entry)

            # Case 1: entry matches a real processor class
            if proc_name in plugins.processor_dict:
                pl.append(plugins.processor_dict[proc_name](config=config, replica_id=replica))
                continue

            # Case 2: entry is a group defined in the TOML
            if entry in config and isinstance(config[entry], dict):
                group = config[entry]
                if 'processors_to_run' not in group:
                    raise UnknownProcessorGroup(f"Group '{entry}' exists but has no processors_to_run field.")

                if entry in enclosing_groups:
                    raise UnknownProcessorGroup(f"Cyclic group definition detected at '{entry}'.")

                # Hand a *new* set down to the recursion: the entry is an ancestor only for its own members,
                # so sibling entries (including a repetition of the same group) must not see it.
                sub_list = self._expand_processors_to_run(
                    group['processors_to_run'], config, plugins, enclosing_groups | {entry}
                )
                sub_list.name = entry
                sub_list.description = group.get('description', entry)
                pl.append(sub_list)
                continue

            # Case 3: completely unknown
            raise mafw_errors.UnknownProcessor(entry)

        return pl

    def init(self, steering_file: Path | str) -> None:
        """
        Initializes the application.

        This method is normally automatically invoked by the class constructor.
        It can be called in a later moment to force the parsing of the provided steering file.

        If no user interface was selected so far, the one named in the ``UserInterface.interface`` field of the
        steering file is retrieved; when that field is missing, :attr:`__default_ui__` is requested instead.

        :param steering_file: The path to the steering file.
        :type steering_file: Path | str
        """
        if not isinstance(steering_file, Path):
            steering_file = Path(steering_file)

        # Load first and store afterwards: if the parsing raises, the application keeps its previous state.
        configuration = toml_tools.load_steering_file(steering_file)
        self.steering_file = steering_file
        self._configuration_dict = configuration

        if self.user_interface is None:
            ui_section = configuration.get('UserInterface')
            if isinstance(ui_section, dict):
                ui_name = ui_section.get('interface', self.__default_ui__)
            else:
                ui_name = self.__default_ui__
            self.user_interface = self.get_user_interface(ui_name)

        self._initialized = True
        self.name = configuration.get('analysis_name', 'mafw analysis')