Coverage for src / mafw / runner.py: 100%

80 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 16:10 +0000

1# Copyright 2025–2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Provides a container to run configurable and modular analytical tasks. 

6""" 

7 

8import logging 

9from pathlib import Path 

10from typing import TYPE_CHECKING, Any 

11 

12from mafw import mafw_errors 

13from mafw.enumerators import ProcessorExitStatus 

14from mafw.mafw_errors import UnknownProcessorGroup 

15from mafw.plugin_manager import LoadedPlugins, MAFwPluginManager, get_plugin_manager 

16from mafw.processor import ProcessorList 

17from mafw.tools import toml_tools 

18from mafw.tools.regexp import parse_processor_name 

19from mafw.ui.abstract_user_interface import UserInterfaceBase 

20 

21log = logging.getLogger(__name__) 

22 

23 

24class MAFwApplication: 

25 """ 

26 The MAFw Application. 

27 

28 This class takes care of reading a steering file and from the information retrieved construct a 

29 :class:`~mafw.processor.ProcessorList` from the processor listed there and execute it. 

30 

31 It is very practical because any combination of processors can be run without having to write dedicated scripts 

32 but simply modifying a steering file. 

33 

34 The application will search for processors not only among the ones available in the MAFw library, but also in all 

35 other packages exposing processors via the :ref:`plugin mechanism <plugins>`. 

36 

37 All parameters in the constructor are optional. 

38 

39 An instance can be created also without the `steering_file`, but such an instance cannot be executed. The steering 

40 file can be provided in a later stage via the :meth:`init` method or directly to the :meth:`run` method. 

41 

42 The user interface can be either provided directly in the constructor, or it will be taken from the steering 

43 file. In the worst case, the fallback :class:`~mafw.ui.console_user_interface.ConsoleInterface` will be used. 

44 

45 The plugin manager, if not provided, the global plugin manager will be retrieved from the 

46 :func:`~mafw.plugin_manager.get_plugin_manager`. 

47 

48 A simple example is provided here below: 

49 

50 .. code-block:: python 

51 :name: MAFwApplication_run 

52 :caption: Creation and execution of a MAFwApplication 

53 

54 import logging 

55 from pathlib import Path 

56 

57 from mafw.runner import MAFwApplication 

58 

59 log = logging.getLogger(__name__) 

60 

61 # put here your steering file 

62 steering_file = Path('path_to_my_steering_file.toml') 

63 

64 try: 

65 # create the app 

66 app = MAFwApplication(steering_file) 

67 

68 # run it! 

69 app.run() 

70 

71 except Exception as e: 

72 log.error('An error occurred!') 

73 log.exception(e) 

74 """ 

75 

76 __default_ui__: str = 'rich' 

77 

78 def __init__( 

79 self, 

80 steering_file: Path | str | None = None, 

81 user_interface: UserInterfaceBase | type[UserInterfaceBase] | str | None = None, 

82 plugin_manager: MAFwPluginManager | None = None, 

83 ): 

84 """ 

85 Constructor parameters: 

86 

87 :param steering_file: The path to the steering file. 

88 :type steering_file: Path | str, Optional 

89 :param user_interface: The user interface to be used by the application. 

90 :type user_interface: UserInterfaceBase | type[UserInterfaceBase] | str, Optional 

91 :param plugin_manager: The plugin manager. 

92 :type plugin_manager: PluginManager, Optional 

93 """ 

94 #: the name of the application instance 

95 self.name = self.__class__.__name__ 

96 self._configuration_dict: dict[str, Any] = {} 

97 

98 #: the plugin manager of the application instance 

99 self.plugin_manager = plugin_manager or get_plugin_manager() 

100 

101 #: the exit status of the application 

102 self.exit_status = ProcessorExitStatus.Successful 

103 

104 # preload all plugins in one go. load_plugins will split the task on different threads 

105 # making the operation still fluid. 

106 self.plugins = self.plugin_manager.load_plugins({'ui', 'db_modules', 'processors'}) 

107 

108 self.user_interface: UserInterfaceBase | None 

109 if user_interface is not None: 

110 if isinstance(user_interface, UserInterfaceBase): 

111 self.user_interface = user_interface 

112 else: # if isinstance(user_interface, str): 

113 if TYPE_CHECKING: 

114 assert isinstance(user_interface, str) 

115 self.user_interface = self.get_user_interface(user_interface) 

116 else: 

117 self.user_interface = None 

118 

119 if steering_file is None: 

120 self._initialized = False 

121 self.steering_file = None 

122 else: 

123 self.steering_file = steering_file if isinstance(steering_file, Path) else Path(steering_file) 

124 self.init(self.steering_file) 

125 

126 def get_user_interface(self, user_interface: str) -> UserInterfaceBase: 

127 """ 

128 Retrieves the user interface from the loaded plugins. 

129 

130 User interfaces are exposed via the plugin manager. 

131 

132 If the requested `user_interface` is not available, then the fallback console interface is used. 

133 

134 :param user_interface: The name of the user interface to be used. Normally rich or console. 

135 :type user_interface: str 

136 """ 

137 if user_interface in self.plugins.ui_dict: 

138 ui_type = self.plugins.ui_dict[user_interface] 

139 else: 

140 log.warning('User interface %s is not available. Using console.' % user_interface) 

141 ui_type = self.plugins.ui_dict['console'] 

142 return ui_type() 

143 

144 def run(self, steering_file: Path | str | None = None) -> ProcessorExitStatus: 

145 """ 

146 Runs the application. 

147 

148 This method builds the :class:`~mafw.processor.ProcessorList` with the processors listed in the steering file 

149 and launches its execution. 

150 

151 A steering file can be provided at this stage if it was not done before. 

152 

153 .. versionchanged:: v2.0.0 

154 Refactor to accept replica names in the processor to run list. 

155 

156 :param steering_file: The steering file. Defaults to None. 

157 :type steering_file: Path | str, Optional 

158 :raises RunnerNotInitialized: if the application has not been initialized. Very likely a steering file was 

159 never provided. 

160 :raises UnknownProcessor: if a processor listed in the steering file is not available in the plugin library. 

161 """ 

162 if steering_file is None and not self._initialized: 

163 log.error('%s is not initialized. Have you provided a steering file?' % self.name) 

164 raise mafw_errors.RunnerNotInitialized() 

165 

166 if steering_file is not None and steering_file != self.steering_file: 

167 self.init(steering_file) 

168 

169 description = self._configuration_dict.get('analysis_description', '') 

170 processors_to_run = self._configuration_dict.get('processors_to_run', []) 

171 

172 # build the possibly nested processor list 

173 processor_list = self._expand_processors_to_run(processors_to_run, self._configuration_dict, self.plugins) 

174 processor_list.name = self.name 

175 processor_list.description = description 

176 return processor_list.execute() 

177 

178 def _expand_processors_to_run( 

179 self, items: list[str], config: dict[str, Any], plugins: LoadedPlugins, seen: set[str] | None = None 

180 ) -> ProcessorList: 

181 """ 

182 Constructs a :class:`.ProcessorList` from a list of processor names or groups. 

183 

184 This method recursively expands a list of processor names or group definitions into a `ProcessorList` object, 

185 which can be executed. It handles three cases: direct processor class matches, group definitions in the configuration, 

186 and unknown entries. 

187 

188 .. versionadded:: v2.0.0 

189 

190 :param items: A list of processor names or group identifiers to be expanded. 

191 :type items: list[str] 

192 :param config: The configuration dictionary containing processor and group definitions. 

193 :type config: dict[str, Any] 

194 :param plugins: The loaded plugins containing available processors and groups. 

195 :type plugins: LoadedPlugins 

196 :param seen: A set of group names that have already been processed to detect cyclic definitions. 

197 :type seen: set[str] | None 

198 :return: A `ProcessorList` containing the expanded processors ready for execution. 

199 :rtype: ProcessorList 

200 :raises UnknownProcessorGroup: If a group is defined without processors or if a cyclic group definition is detected. 

201 :raises UnknownProcessor: If a processor name is not recognized in the plugin library. 

202 """ 

203 

204 seen = seen or set() 

205 

206 pl = ProcessorList( 

207 name=f'{self.name}_group', 

208 description=None, 

209 user_interface=self.user_interface, 

210 database_conf=config.get('DBConfiguration', None), 

211 create_standard_tables=config.get('create_standard_tables', True), 

212 ) 

213 

214 for entry in items: 

215 proc_name, replica = parse_processor_name(entry) 

216 

217 # Case 1: entry matches a real processor class 

218 if proc_name in plugins.processor_dict: 

219 pl.append(plugins.processor_dict[proc_name](config=config, replica_id=replica)) 

220 continue 

221 

222 # Case 2: entry is a group defined in the TOML 

223 if entry in config and isinstance(config[entry], dict): 

224 group = config[entry] 

225 if 'processors_to_run' not in group: 

226 raise UnknownProcessorGroup(f"Group '{entry}' exists but has no processors_to_run field.") 

227 

228 if entry in seen: 

229 raise UnknownProcessorGroup(f"Cyclic group definition detected at '{entry}'.") 

230 

231 seen.add(entry) 

232 sub_list = self._expand_processors_to_run(group['processors_to_run'], config, plugins, seen) 

233 sub_list.name = entry 

234 sub_list.description = group.get('description', entry) 

235 pl.append(sub_list) 

236 continue 

237 

238 # Case 3: completely unknown 

239 raise mafw_errors.UnknownProcessor(entry) 

240 

241 return pl 

242 

243 def init(self, steering_file: Path | str) -> None: 

244 """ 

245 Initializes the application. 

246 

247 This method is normally automatically invoked by the class constructor. 

248 It can be called in a later moment to force the parsing of the provided steering file. 

249 

250 :param steering_file: The path to the steering file. 

251 :type steering_file: Path | str 

252 """ 

253 if isinstance(steering_file, str): 

254 steering_file = Path(steering_file) 

255 

256 self.steering_file = steering_file 

257 self._configuration_dict = toml_tools.load_steering_file(steering_file) 

258 

259 if self.user_interface is None: 

260 self.user_interface = self.get_user_interface(self._configuration_dict['UserInterface']['interface']) 

261 

262 self._initialized = True 

263 self.name = self._configuration_dict.get('analysis_name', 'mafw analysis')