Coverage for src / mafw / plugin_manager.py: 100%
108 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
1# Copyright 2025 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Plugin management system for MAFw framework.
7This module provides the core functionality for loading and managing plugins within the MAFw framework.
8It supports loading various types of plugins including processors, standard tables, database models, and
9user interfaces.
11The plugin manager uses the pluggy library to handle plugin discovery and registration
12through entry points and hooks.
14When plugins are loaded using the :meth:`.MAFwPluginManager.load_plugins` function, the job is divided into multiple
15threads to improve performance.
17Key features:
18 - Asynchronous plugin loading with progress indication
19 - Support for both internal and external plugins
20 - Type-safe plugin handling with proper data structures
21 - Logging integration for monitoring plugin loading processes
22 - Global plugin manager singleton for consistent access
24The module defines several key components:
25 - :class:`.LoadedPlugins`: Data container for loaded plugins
26 - :class:`.MAFwPluginManager`: Main plugin manager class
27 - :func:`.get_plugin_manager`: Factory function for accessing the plugin manager
29Plugin types supported:
30 - Processors (`processors`): Classes that implement data processing logic
31 - Database Modules (`db_modules`): Model modules for database interaction
32 - User Interfaces (`ui`): UI implementations for different interfaces
34.. versionchanged:: v2.0.0
35 Complete refactoring of the plugin manager system.
37"""
39import importlib
40import itertools
41import logging
42import time
43from concurrent.futures import Future, ThreadPoolExecutor
44from dataclasses import dataclass, field
45from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Literal, Tuple, cast
47import pluggy
49from mafw import hookspecs, plugins
50from mafw.lazy_import import ProcessorClassProtocol, UserInterfaceClassProtocol
52if TYPE_CHECKING:
53 pass
55log = logging.getLogger(__name__)
58@dataclass
59class LoadedPlugins:
60 """
61 Container class for storing loaded plugins of various types.
63 This dataclass holds collections of different plugin types that have been loaded
64 by the :class:`MAFwPluginManager`. It provides organized storage for processors,
65 database modules, and user interfaces.
67 .. versionadded:: v2.0.0
68 """
70 processor_list: List[ProcessorClassProtocol] = field(default_factory=list)
71 # List[Type['Processor'] | 'LazyPlugin'] = field(default_factory=list)
72 """List of loaded processor classes."""
74 processor_dict: Dict[str, ProcessorClassProtocol] = field(default_factory=dict)
75 # Dict[str, Type['Processor'] | 'LazyPlugin'] = field(default_factory=dict)
76 """Dictionary mapping processor names to their classes."""
78 db_model_modules: List[str] = field(default_factory=list)
79 """List of database model module names."""
81 ui_list: List[UserInterfaceClassProtocol] = field(default_factory=list)
82 # List[Type['UserInterfaceBase'] | 'LazyPlugin'] = field(default_factory=list)
83 """List of loaded user interface classes."""
85 ui_dict: Dict[str, UserInterfaceClassProtocol] = field(default_factory=dict)
86 # Dict[str, Type['UserInterfaceBase'] | 'LazyPlugin'] = field(default_factory=dict)
87 """Dictionary mapping user interface names to their classes."""
90PluginTypes = Literal['processors', 'db_modules', 'ui']
91"""Type alias for accepted types of plugins."""
94def _as_processor_result(obj: object) -> tuple[list[ProcessorClassProtocol], dict[str, ProcessorClassProtocol]]:
95 """
96 Cast an object to the expected processor result type.
98 This helper function is used to convert the raw result from plugin loading
99 operations into the expected tuple format for processors.
101 .. versionadded:: v2.0.0
103 :param obj: The object to cast
104 :type obj: object
105 :return: A tuple containing a list of processor classes and a dictionary mapping
106 processor names to their classes
107 :rtype: tuple[list[ProcessorClassProtocol], dict[str, ProcessorClassProtocol]]
108 """
109 return cast(tuple[list[ProcessorClassProtocol], dict[str, ProcessorClassProtocol]], obj)
112def _as_ui_result(obj: object) -> tuple[list[UserInterfaceClassProtocol], dict[str, UserInterfaceClassProtocol]]:
113 """
114 Cast an object to the expected UI result type.
116 This helper function is used to convert the raw result from plugin loading
117 operations into the expected tuple format for user interfaces.
119 .. versionadded:: v2.0.0
121 :param obj: The object to cast
122 :type obj: object
123 :return: A tuple containing a list of UI classes and a dictionary mapping
124 UI names to their classes
125 :rtype: tuple[list[UserInterfaceClassProtocol], dict[str, UserInterfaceClassProtocol]]
126 """
127 return cast(tuple[list[UserInterfaceClassProtocol], dict[str, UserInterfaceClassProtocol]], obj)
130def _as_db_module_result(obj: object) -> list[str]:
131 """
132 Cast an object to the expected database module result type.
134 This helper function is used to convert the raw result from plugin loading
135 operations into the expected list format for database modules.
137 .. versionadded:: v2.0.0
139 :param obj: The object to cast
140 :type obj: object
141 :return: A list of database module names
142 :rtype: list[str]
143 """
144 return cast(list[str], obj)
147class MAFwPluginManager(pluggy.PluginManager):
148 """
149 The MAFw plugin manager.
151 The MAFwPluginManager class manages the loading and registration of plugins within the MAFw framework.
152 It supports asynchronous loading of various plugin types, including processors, database modules,
153 and user interfaces, using a thread pool executor for improved performance.
155 The class provides methods to load each type of plugin and handles delayed status messages if loading takes
156 longer than expected.
158 .. versionadded:: v2.0.0
159 """
161 max_loading_delay = 1 # sec
162 """
163 Loading delay before displaying a message.
165 If the loading of the external plugins is taking more than this value, a message is displayed to inform the user.
166 """
168 def __init__(self, project_name: str = 'mafw'):
169 super().__init__(project_name)
170 self._executor = ThreadPoolExecutor(max_workers=4)
172 def load_db_models_plugins(self) -> list[str]:
173 """
174 Load database model modules from the plugin manager.
176 This method retrieves all database model modules registered through the plugin manager's
177 :meth:`~mafw.plugins.register_db_model_modules` hook and imports them.
179 :returns: List of database model module names
180 :rtype: list[str]
181 """
182 log.debug('Starting database model plugins...')
183 db_model_module_list = list(itertools.chain(*self.hook.register_db_model_modules()))
184 for module in db_model_module_list:
185 importlib.import_module(module)
186 log.debug('Finished database model plugins')
187 return db_model_module_list
189 def load_processor_plugins(self) -> Tuple[List[ProcessorClassProtocol], Dict[str, ProcessorClassProtocol]]:
190 """
191 Load available processor plugins from the plugin manager.
193 This method retrieves all processor plugins registered through the plugin manager's
194 :meth:`~mafw.plugins.register_processors` hook.
195 :meth:`~mafw.plugins.register_processors` hook.
197 :returns: A tuple containing:
198 - List of available processor classes
199 - Dictionary mapping processor names to their classes
200 :rtype: tuple[list[type[Processor]], dict[str, type[Processor]]]
201 """
202 log.debug('Starting processor plugins...')
203 lst = list(itertools.chain(*self.hook.register_processors()))
204 dct = {}
205 for p in lst:
206 if hasattr(p, 'plugin_name'): # LazyPlugin case
207 key = p.plugin_name
208 else:
209 key = p.__name__
210 dct[key] = p
211 log.debug('Finished processor plugins')
212 return lst, dct
214 def load_user_interface_plugins(
215 self,
216 ) -> Tuple[List[UserInterfaceClassProtocol], Dict[str, UserInterfaceClassProtocol]]:
217 """
218 Load available user interface plugins from the plugin manager.
220 This method retrieves all user interface plugins registered through the plugin manager's
221 :meth:`~mafw.plugins.register_user_interfaces` hook.
223 :returns: A tuple containing:
224 - List of available user interface classes
225 - Dictionary mapping user interface names to their classes
226 :rtype: tuple[list[type[UserInterfaceBase]], dict[str, type[UserInterfaceBase]]]
227 """
228 log.debug('Start loading user interface plugins...')
229 lst = list(itertools.chain(*self.hook.register_user_interfaces()))
230 dct = {ui.name: ui for ui in lst}
231 log.debug('Finished loading user interface plugins')
232 return lst, dct
234 def _delayed_status_message(self, futures: List[Future[Any]]) -> None:
235 """
236 Display a warning message if plugin loading takes longer than expected.
238 This method is called after a delay to check if all plugin loading operations
239 have completed. If not, it logs a warning message to inform the user that
240 plugin loading is taking longer than expected.
242 :param futures: List of futures representing ongoing plugin loading operations
243 :type futures: list[concurrent.futures.Future]
244 """
245 time.sleep(self.max_loading_delay)
246 if not all(f.done() for f in futures):
247 log.warning('Plugin loading is taking longer than expected, please be patient.')
249 def load_plugins(self, plugins_to_load: Iterable[PluginTypes]) -> LoadedPlugins:
250 """
251 Load plugins of specified types in multiple threads.
253 This method loads plugins of the specified types using a thread pool executor
254 for improved performance. It handles different plugin types including processors,
255 standard tables, database modules, and user interfaces.
257 :param plugins_to_load: Iterable of plugin types to load
258 :type plugins_to_load: Iterable[:obj:`PluginTypes`]
259 :return: Container with loaded plugins of all requested types
260 :rtype: :obj:`LoadedPlugins`
261 """
262 plugins_to_load = list(dict.fromkeys(plugins_to_load))
263 if not plugins_to_load:
264 return LoadedPlugins()
266 lut = {
267 'processors': self.load_processor_plugins,
268 'db_modules': self.load_db_models_plugins,
269 'ui': self.load_user_interface_plugins,
270 }
272 # drop invalid plugin types
273 plugins_to_load = [p for p in plugins_to_load if p in lut]
274 if not plugins_to_load:
275 return LoadedPlugins()
277 log.debug(f'Status message will appear if loading takes > {self.max_loading_delay}s')
279 # Submit tasks to the executor
280 futures: list[Future[Any]] = []
281 for plugin_type in plugins_to_load:
282 fut = self._executor.submit(lut[plugin_type])
283 futures.append(fut)
285 # Start delayed status thread
286 status_thread = self._executor.submit(self._delayed_status_message, futures)
288 # Wait for completion
289 results = []
290 try:
291 for fut in futures:
292 results.append(fut.result()) # will re-raise exceptions
293 finally:
294 # When all tasks finished, no need for the warning message
295 if not status_thread.done():
296 status_thread.cancel()
298 # Assemble output
299 plugins_ = LoadedPlugins()
300 idx = 0
302 for plugin_type in plugins_to_load:
303 result = results[idx]
305 if plugin_type == 'processors':
306 plugins_.processor_list, plugins_.processor_dict = _as_processor_result(result)
308 elif plugin_type == 'ui':
309 plugins_.ui_list, plugins_.ui_dict = _as_ui_result(result)
311 else: # if plugin_type == 'db_modules':
312 plugins_.db_model_modules = _as_db_module_result(result)
314 idx += 1
316 return plugins_
319global_mafw_plugin_manager: dict[str, 'MAFwPluginManager'] = {}
320"""The global mafw plugin manager dictionary."""
323def get_plugin_manager(force_recreate: bool = False) -> 'MAFwPluginManager':
324 """
325 Create a new or return an existing plugin manager for a given project
327 :param force_recreate: Flag to force the creation of a new plugin manager. Defaults to False
328 :type force_recreate: bool, Optional
329 :return: The plugin manager
330 :rtype: pluggy.PluginManager
331 """
332 if 'mafw' in global_mafw_plugin_manager and force_recreate:
333 del global_mafw_plugin_manager['mafw']
335 if 'mafw' not in global_mafw_plugin_manager:
336 pm = MAFwPluginManager('mafw')
337 pm.add_hookspecs(hookspecs)
338 pm.load_setuptools_entrypoints('mafw')
339 pm.register(plugins)
340 global_mafw_plugin_manager['mafw'] = pm
342 return global_mafw_plugin_manager['mafw']