Coverage for src / mafw / runner.py: 100%

81 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-09 09:08 +0000

1# Copyright 2025 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Provides a container to run configurable and modular analytical tasks. 

6""" 

7 

8import logging 

9from pathlib import Path 

10from typing import TYPE_CHECKING, Any 

11 

12from mafw import mafw_errors 

13from mafw.enumerators import ProcessorExitStatus 

14from mafw.mafw_errors import UnknownProcessorGroup 

15from mafw.plugin_manager import LoadedPlugins, MAFwPluginManager, get_plugin_manager 

16from mafw.processor import ProcessorList 

17from mafw.tools import toml_tools 

18from mafw.tools.regexp import parse_processor_name 

19from mafw.ui.abstract_user_interface import UserInterfaceBase 

20 

# Module-level logger, named after this module so that applications can
# configure the output of the runner independently from other components.
log: logging.Logger = logging.getLogger(__name__)

22 

23 

class MAFwApplication:
    """
    The MAFw Application.

    This class takes care of reading a steering file and, from the information retrieved, constructs a
    :class:`~mafw.processor.ProcessorList` from the processors listed there and executes it.

    It is very practical because any combination of processors can be run without having to write dedicated scripts
    but simply modifying a steering file.

    The application will search for processors not only among the ones available in the MAFw library, but also in all
    other packages exposing processors via the :ref:`plugin mechanism <plugins>`.

    All parameters in the constructor are optional.

    An instance can be created also without the `steering_file`, but such an instance cannot be executed. The steering
    file can be provided in a later stage via the :meth:`init` method or directly to the :meth:`run` method.

    The user interface can be either provided directly in the constructor (as an instance, as a class or by name),
    or it will be taken from the steering file. If the steering file does not name one, the interface named by
    ``__default_ui__`` is requested. In the worst case, the fallback
    :class:`~mafw.ui.console_user_interface.ConsoleInterface` will be used.

    If the plugin manager is not provided, the global plugin manager is retrieved via
    :func:`~mafw.plugin_manager.get_plugin_manager`.

    A simple example is provided here below:

    .. code-block:: python
        :name: MAFwApplication_run
        :caption: Creation and execution of a MAFwApplication

        import logging
        from pathlib import Path

        from mafw.runner import MAFwApplication

        log = logging.getLogger(__name__)

        # put here your steering file
        steering_file = Path('path_to_my_steering_file.toml')

        try:
            # create the app
            app = MAFwApplication(steering_file)

            # run it!
            app.run()

        except Exception as e:
            log.error('An error occurred!')
            log.exception(e)
    """

    #: name of the user interface requested when neither the constructor nor the steering file specifies one
    __default_ui__: str = 'rich'

    def __init__(
        self,
        steering_file: Path | str | None = None,
        user_interface: UserInterfaceBase | type[UserInterfaceBase] | str | None = None,
        plugin_manager: MAFwPluginManager | None = None,
    ):
        """
        Constructor parameters:

        :param steering_file: The path to the steering file. When provided, the application is initialized
            immediately via :meth:`init`.
        :type steering_file: Path | str, Optional
        :param user_interface: The user interface to be used by the application. It can be a ready-to-use instance,
            a user interface class (instantiated without arguments) or the name under which the interface is exposed
            by the plugin manager.
        :type user_interface: UserInterfaceBase | type[UserInterfaceBase] | str, Optional
        :param plugin_manager: The plugin manager.
        :type plugin_manager: PluginManager, Optional
        """
        #: the name of the application instance
        self.name = self.__class__.__name__
        self._configuration_dict: dict[str, Any] = {}

        #: the plugin manager of the application instance
        self.plugin_manager = plugin_manager or get_plugin_manager()

        #: the exit status of the application
        self.exit_status = ProcessorExitStatus.Successful

        self.user_interface: UserInterfaceBase | None
        if user_interface is None:
            # resolved later by init(), once the steering file is known
            self.user_interface = None
        elif isinstance(user_interface, UserInterfaceBase):
            self.user_interface = user_interface
        elif isinstance(user_interface, type):
            # A class was provided: instantiate it directly. Looking it up by name in the plugin
            # registry can never match and would silently fall back to the console interface.
            self.user_interface = user_interface()
        else:
            if TYPE_CHECKING:
                assert isinstance(user_interface, str)
            self.user_interface = self.get_user_interface(user_interface)

        #: the steering file currently loaded, or None if the application is not initialized
        self.steering_file: Path | None = None
        self._initialized = False
        if steering_file is not None:
            self.init(steering_file)

    def get_user_interface(self, user_interface: str) -> UserInterfaceBase:
        """
        Retrieves the user interface from the plugin managers.

        User interfaces are exposed via the plugin manager.

        If the requested `user_interface` is not available, then the fallback console interface is used.

        :param user_interface: The name of the user interface to be used. Normally rich or console.
        :type user_interface: str
        :return: A new instance of the selected user interface.
        :rtype: UserInterfaceBase
        """
        plugins = self.plugin_manager.load_plugins({'ui'})
        if user_interface in plugins.ui_dict:
            ui_type = plugins.ui_dict[user_interface]
        else:
            # lazy logging arguments: the message is only formatted if the record is emitted
            log.warning('User interface %s is not available. Using console.', user_interface)
            ui_type = plugins.ui_dict['console']
        return ui_type()

    def run(self, steering_file: Path | str | None = None) -> ProcessorExitStatus:
        """
        Runs the application.

        This method builds the :class:`~mafw.processor.ProcessorList` with the processors listed in the steering file
        and launches its execution.

        A steering file can be provided at this stage if it was not done before.

        .. versionchanged:: v2.0.0
            Refactor to accept replica names in the processor to run list.

        :param steering_file: The steering file. Defaults to None.
        :type steering_file: Path | str, Optional
        :return: The exit status returned by the execution of the processor list.
        :rtype: ProcessorExitStatus
        :raises RunnerNotInitialized: if the application has not been initialized. Very likely a steering file was
            never provided.
        :raises UnknownProcessor: if a processor listed in the steering file is not available in the plugin library.
        """
        if steering_file is None:
            if not self._initialized:
                log.error('%s is not initialized. Have you provided a steering file?', self.name)
                raise mafw_errors.RunnerNotInitialized()
        else:
            # Normalise before comparing: a str never compares equal to the stored Path, so the
            # same file given as a string would otherwise always look like a new steering file.
            if not isinstance(steering_file, Path):
                steering_file = Path(steering_file)
            if steering_file != self.steering_file:
                self.init(steering_file)

        plugins = self.plugin_manager.load_plugins({'processors', 'db_modules'})

        description = self._configuration_dict.get('analysis_description', None)
        processors_to_run = self._configuration_dict.get('processors_to_run', [])

        # build the possibly nested processor list
        processor_list = self._expand_processors_to_run(processors_to_run, self._configuration_dict, plugins)
        processor_list.name = self.name
        processor_list.description = description
        return processor_list.execute()

    def _expand_processors_to_run(
        self, items: list[str], config: dict[str, Any], plugins: LoadedPlugins, seen: set[str] | None = None
    ) -> ProcessorList:
        """
        Constructs a :class:`.ProcessorList` from a list of processor names or groups.

        This method recursively expands a list of processor names or group definitions into a `ProcessorList` object,
        which can be executed. It handles three cases: direct processor class matches, group definitions in the
        configuration, and unknown entries.

        .. versionadded:: v2.0.0

        :param items: A list of processor names or group identifiers to be expanded.
        :type items: list[str]
        :param config: The configuration dictionary containing processor and group definitions.
        :type config: dict[str, Any]
        :param plugins: The loaded plugins containing available processors and groups.
        :type plugins: LoadedPlugins
        :param seen: The names of the groups currently being expanded, i.e. the ancestors of `items` on the current
            expansion path. It is used to detect cyclic definitions and it is never modified by this method.
        :type seen: set[str] | None
        :return: A `ProcessorList` containing the expanded processors ready for execution.
        :rtype: ProcessorList
        :raises UnknownProcessorGroup: If a group is defined without processors or if a cyclic group definition is
            detected.
        :raises UnknownProcessor: If a processor name is not recognized in the plugin library.
        """
        ancestors: set[str] = set() if seen is None else seen

        pl = ProcessorList(
            name=f'{self.name}_group',
            description=None,
            user_interface=self.user_interface,
            database_conf=config.get('DBConfiguration', None),
            create_standard_tables=config.get('create_standard_tables', True),
        )

        for entry in items:
            proc_name, replica = parse_processor_name(entry)

            # Case 1: entry matches a real processor class
            if proc_name in plugins.processor_dict:
                pl.append(plugins.processor_dict[proc_name](config=config, replica_id=replica))
                continue

            # Case 2: entry is a group defined in the TOML
            if entry in config and isinstance(config[entry], dict):
                group = config[entry]
                if 'processors_to_run' not in group:
                    raise UnknownProcessorGroup(f"Group '{entry}' exists but has no processors_to_run field.")

                if entry in ancestors:
                    raise UnknownProcessorGroup(f"Cyclic group definition detected at '{entry}'.")

                # Pass a *new* set holding only the current expansion path. Sharing one mutable set
                # across siblings would flag a group that is simply reused in another branch
                # (e.g. ['A', 'A'], or 'A' next to a group that contains 'A') as a cycle.
                sub_list = self._expand_processors_to_run(
                    group['processors_to_run'], config, plugins, ancestors | {entry}
                )
                sub_list.name = entry
                sub_list.description = group.get('description', entry)
                pl.append(sub_list)
                continue

            # Case 3: completely unknown
            raise mafw_errors.UnknownProcessor(entry)

        return pl

    def init(self, steering_file: Path | str) -> None:
        """
        Initializes the application.

        This method is normally automatically invoked by the class constructor.
        It can be called in a later moment to force the parsing of the provided steering file.

        The state of the application is updated only after the steering file has been loaded and the user interface
        has been resolved, so a failure leaves a previously loaded configuration untouched.

        If no user interface was selected so far, the one named in the ``UserInterface.interface`` field of the
        steering file is used; when that field is missing, the interface named by ``__default_ui__`` is requested.

        :param steering_file: The path to the steering file.
        :type steering_file: Path | str
        """
        if not isinstance(steering_file, Path):
            steering_file = Path(steering_file)

        configuration = toml_tools.load_steering_file(steering_file)

        user_interface = self.user_interface
        if user_interface is None:
            # fall back to the class default instead of failing with a bare KeyError
            ui_name = configuration.get('UserInterface', {}).get('interface', self.__default_ui__)
            user_interface = self.get_user_interface(ui_name)

        # Commit everything at once: nothing above this line modifies the instance, so a failed
        # (re)initialization cannot leave steering_file pointing to a file that was never loaded.
        self.steering_file = steering_file
        self._configuration_dict = configuration
        self.user_interface = user_interface
        self._initialized = True
        self.name = self._configuration_dict.get('analysis_name', 'mafw analysis')