Coverage for src / mafw / plugin_manager.py: 100%

108 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-09 09:08 +0000

1# Copyright 2025 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Plugin management system for MAFw framework. 

6 

7This module provides the core functionality for loading and managing plugins within the MAFw framework. 

8It supports loading various types of plugins including processors, database model modules, and 

9user interfaces. 

10 

11The plugin manager uses the pluggy library to handle plugin discovery and registration 

12through entry points and hooks. 

13 

14When plugins are loaded using the :meth:`.MAFwPluginManager.load_plugins` function, the job is divided into multiple 

15threads to improve performance. 

16 

17Key features: 

18 - Asynchronous plugin loading with progress indication 

19 - Support for both internal and external plugins 

20 - Type-safe plugin handling with proper data structures 

21 - Logging integration for monitoring plugin loading processes 

22 - Global plugin manager singleton for consistent access 

23 

24The module defines several key components: 

25 - :class:`.LoadedPlugins`: Data container for loaded plugins 

26 - :class:`.MAFwPluginManager`: Main plugin manager class 

27 - :func:`.get_plugin_manager`: Factory function for accessing the plugin manager 

28 

29Plugin types supported: 

30 - Processors (`processors`): Classes that implement data processing logic 

31 - Database Modules (`db_modules`): Model modules for database interaction 

32 - User Interfaces (`ui`): UI implementations for different interfaces 

33 

34.. versionchanged:: v2.0.0 

35 Complete refactoring of the plugin manager system. 

36 

37""" 

38 

39import importlib 

40import itertools 

41import logging 

42import time 

43from concurrent.futures import Future, ThreadPoolExecutor 

44from dataclasses import dataclass, field 

45from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Literal, Tuple, cast 

46 

47import pluggy 

48 

49from mafw import hookspecs, plugins 

50from mafw.lazy_import import ProcessorClassProtocol, UserInterfaceClassProtocol 

51 

52if TYPE_CHECKING: 

53 pass 

54 

55log = logging.getLogger(__name__) 

56 

57 

@dataclass
class LoadedPlugins:
    """
    Result container returned by :meth:`MAFwPluginManager.load_plugins`.

    A single instance groups everything that was collected for the requested plugin types. Processors and
    user interfaces are each available both as a list and as a name-keyed dictionary, whereas database models
    are tracked by module name only. Every field defaults to its own new, empty container.

    .. versionadded:: v2.0.0
    """

    # Entries are either Processor subclasses or LazyPlugin proxies.
    processor_list: List[ProcessorClassProtocol] = field(default_factory=list)
    """Processor classes that have been loaded."""

    # Same objects as in processor_list, addressable by name.
    processor_dict: Dict[str, ProcessorClassProtocol] = field(default_factory=dict)
    """Lookup table from processor name to processor class."""

    db_model_modules: List[str] = field(default_factory=list)
    """Names of the database model modules."""

    # Entries are either UserInterfaceBase subclasses or LazyPlugin proxies.
    ui_list: List[UserInterfaceClassProtocol] = field(default_factory=list)
    """User interface classes that have been loaded."""

    # Same objects as in ui_list, addressable by name.
    ui_dict: Dict[str, UserInterfaceClassProtocol] = field(default_factory=dict)
    """Lookup table from user interface name to user interface class."""

88 

89 

PluginTypes = Literal[
    'processors',
    'db_modules',
    'ui',
]
"""Closed set of plugin type identifiers that can be requested from the plugin manager."""

92 

93 

def _as_processor_result(obj: object) -> tuple[list[ProcessorClassProtocol], dict[str, ProcessorClassProtocol]]:
    """
    Narrow an untyped plugin-loading result to the processor result type.

    The value is handed back unchanged: :func:`typing.cast` only informs the static type checker and performs
    neither validation nor conversion at runtime.

    .. versionadded:: v2.0.0

    :param obj: The raw result to be re-typed
    :type obj: object
    :return: The very same object, typed as the pair (list of processor classes, dictionary mapping
        processor names to their classes)
    :rtype: tuple[list[ProcessorClassProtocol], dict[str, ProcessorClassProtocol]]
    """
    processor_result = cast(tuple[list[ProcessorClassProtocol], dict[str, ProcessorClassProtocol]], obj)
    return processor_result

110 

111 

def _as_ui_result(obj: object) -> tuple[list[UserInterfaceClassProtocol], dict[str, UserInterfaceClassProtocol]]:
    """
    Narrow an untyped plugin-loading result to the user interface result type.

    The value is handed back unchanged: :func:`typing.cast` only informs the static type checker and performs
    neither validation nor conversion at runtime.

    .. versionadded:: v2.0.0

    :param obj: The raw result to be re-typed
    :type obj: object
    :return: The very same object, typed as the pair (list of UI classes, dictionary mapping UI names to
        their classes)
    :rtype: tuple[list[UserInterfaceClassProtocol], dict[str, UserInterfaceClassProtocol]]
    """
    ui_result = cast(tuple[list[UserInterfaceClassProtocol], dict[str, UserInterfaceClassProtocol]], obj)
    return ui_result

128 

129 

130def _as_db_module_result(obj: object) -> list[str]: 

131 """ 

132 Cast an object to the expected database module result type. 

133 

134 This helper function is used to convert the raw result from plugin loading 

135 operations into the expected list format for database modules. 

136 

137 .. versionadded:: v2.0.0 

138 

139 :param obj: The object to cast 

140 :type obj: object 

141 :return: A list of database module names 

142 :rtype: list[str] 

143 """ 

144 return cast(list[str], obj) 

145 

146 

class MAFwPluginManager(pluggy.PluginManager):
    """
    The MAFw plugin manager.

    The MAFwPluginManager class manages the loading and registration of plugins within the MAFw framework.
    It supports asynchronous loading of various plugin types, including processors, database modules,
    and user interfaces, using a thread pool executor for improved performance.

    The class provides methods to load each type of plugin and handles delayed status messages if loading takes
    longer than expected.

    .. versionadded:: v2.0.0
    """

    max_loading_delay = 1  # sec
    """
    Loading delay before displaying a message.

    If the loading of the external plugins is taking more than this value, a message is displayed to inform the user.
    """

    def __init__(self, project_name: str = 'mafw'):
        """
        Create the plugin manager together with its private thread pool.

        :param project_name: Name forwarded to :class:`pluggy.PluginManager`. Defaults to 'mafw'
        :type project_name: str, Optional
        """
        super().__init__(project_name)
        # Four workers: one per plugin type (three at most) plus one for the delayed status task.
        self._executor = ThreadPoolExecutor(max_workers=4)

    def load_db_models_plugins(self) -> list[str]:
        """
        Load database model modules from the plugin manager.

        This method retrieves all database model modules registered through the plugin manager's
        :meth:`~mafw.plugins.register_db_model_modules` hook and imports them.

        :returns: List of database model module names
        :rtype: list[str]
        """
        log.debug('Starting database model plugins...')
        # Every hook implementation returns its own iterable of module names: flatten them into one list.
        db_model_module_list = list(itertools.chain.from_iterable(self.hook.register_db_model_modules()))
        for module in db_model_module_list:
            importlib.import_module(module)
        log.debug('Finished database model plugins')
        return db_model_module_list

    def load_processor_plugins(self) -> Tuple[List[ProcessorClassProtocol], Dict[str, ProcessorClassProtocol]]:
        """
        Load available processor plugins from the plugin manager.

        This method retrieves all processor plugins registered through the plugin manager's
        :meth:`~mafw.plugins.register_processors` hook.

        The dictionary key is the ``plugin_name`` attribute when the plugin object has one (LazyPlugin case),
        otherwise the ``__name__`` of the class. If two plugins share the same key, the last one wins.

        :returns: A tuple containing:
            - List of available processor classes
            - Dictionary mapping processor names to their classes
        :rtype: tuple[list[ProcessorClassProtocol], dict[str, ProcessorClassProtocol]]
        """
        log.debug('Starting processor plugins...')
        lst = list(itertools.chain.from_iterable(self.hook.register_processors()))
        dct = {(p.plugin_name if hasattr(p, 'plugin_name') else p.__name__): p for p in lst}
        log.debug('Finished processor plugins')
        return lst, dct

    def load_user_interface_plugins(
        self,
    ) -> Tuple[List[UserInterfaceClassProtocol], Dict[str, UserInterfaceClassProtocol]]:
        """
        Load available user interface plugins from the plugin manager.

        This method retrieves all user interface plugins registered through the plugin manager's
        :meth:`~mafw.plugins.register_user_interfaces` hook. The dictionary is keyed by the ``name``
        attribute of each user interface.

        :returns: A tuple containing:
            - List of available user interface classes
            - Dictionary mapping user interface names to their classes
        :rtype: tuple[list[UserInterfaceClassProtocol], dict[str, UserInterfaceClassProtocol]]
        """
        log.debug('Start loading user interface plugins...')
        lst = list(itertools.chain.from_iterable(self.hook.register_user_interfaces()))
        dct = {ui.name: ui for ui in lst}
        log.debug('Finished loading user interface plugins')
        return lst, dct

    def _delayed_status_message(self, futures: List[Future[Any]]) -> None:
        """
        Display a warning message if plugin loading takes longer than expected.

        The method blocks until either all the given futures have completed or
        :attr:`max_loading_delay` seconds have elapsed, whichever comes first. If some futures are still
        pending at that point, a warning is logged to inform the user.

        Waiting on the futures (instead of sleeping unconditionally) releases the worker thread as soon as
        the loading is over. A task that is already running cannot be cancelled, so an unconditional sleep
        would keep the worker busy for the whole delay and could postpone the interpreter shutdown.

        :param futures: List of futures representing ongoing plugin loading operations
        :type futures: list[concurrent.futures.Future]
        """
        # Imported locally because the module-level import only provides Future and ThreadPoolExecutor.
        from concurrent.futures import wait

        _, still_running = wait(futures, timeout=self.max_loading_delay)
        if still_running:
            log.warning('Plugin loading is taking longer than expected, please be patient.')

    def load_plugins(self, plugins_to_load: Iterable[PluginTypes]) -> LoadedPlugins:
        """
        Load plugins of specified types in multiple threads.

        This method loads plugins of the specified types using a thread pool executor
        for improved performance. It handles processors, database modules and user interfaces.

        Duplicated plugin types are loaded only once and unknown plugin types are silently ignored.
        Any exception raised while loading a plugin type is re-raised to the caller.

        :param plugins_to_load: Iterable of plugin types to load
        :type plugins_to_load: Iterable[:obj:`PluginTypes`]
        :return: Container with loaded plugins of all requested types
        :rtype: :obj:`LoadedPlugins`
        """
        lut = {
            'processors': self.load_processor_plugins,
            'db_modules': self.load_db_models_plugins,
            'ui': self.load_user_interface_plugins,
        }

        # Remove duplicates (keeping the first occurrence order) and drop invalid plugin types.
        requested = [p for p in dict.fromkeys(plugins_to_load) if p in lut]
        if not requested:
            return LoadedPlugins()

        log.debug(f'Status message will appear if loading takes > {self.max_loading_delay}s')

        # Submit one loading task per plugin type to the executor.
        futures: list[Future[Any]] = [self._executor.submit(lut[plugin_type]) for plugin_type in requested]

        # Start the delayed status task; it watches the loading futures.
        status_future = self._executor.submit(self._delayed_status_message, futures)

        # Wait for completion.
        try:
            results = [fut.result() for fut in futures]  # will re-raise exceptions
        finally:
            # Only effective while the status task is still queued; cancel() is a no-op otherwise.
            status_future.cancel()

        # Assemble output: results are in the same order as the requested plugin types.
        plugins_ = LoadedPlugins()
        for plugin_type, result in zip(requested, results):
            if plugin_type == 'processors':
                plugins_.processor_list, plugins_.processor_dict = _as_processor_result(result)
            elif plugin_type == 'ui':
                plugins_.ui_list, plugins_.ui_dict = _as_ui_result(result)
            else:  # plugin_type == 'db_modules'
                plugins_.db_model_modules = _as_db_module_result(result)

        return plugins_

317 

318 

global_mafw_plugin_manager: dict[str, 'MAFwPluginManager'] = {}
"""Module-level cache holding the shared plugin manager under the ``'mafw'`` key."""


def get_plugin_manager(force_recreate: bool = False) -> 'MAFwPluginManager':
    """
    Return the shared MAFw plugin manager, building it on first use.

    A newly built manager gets the MAFw hook specifications, the plugins advertised through the ``mafw``
    setuptools entry point group and the internal :mod:`mafw.plugins` module, and is then stored in
    :data:`global_mafw_plugin_manager` for subsequent calls.

    :param force_recreate: Flag to force the creation of a new plugin manager. Defaults to False
    :type force_recreate: bool, Optional
    :return: The plugin manager
    :rtype: MAFwPluginManager
    """
    if force_recreate:
        # Forget the cached instance (if any) so that a new one is built below.
        global_mafw_plugin_manager.pop('mafw', None)

    if 'mafw' in global_mafw_plugin_manager:
        return global_mafw_plugin_manager['mafw']

    manager = MAFwPluginManager('mafw')
    manager.add_hookspecs(hookspecs)
    manager.load_setuptools_entrypoints('mafw')
    manager.register(plugins)
    global_mafw_plugin_manager['mafw'] = manager
    return manager