Coverage for src / mafw / processor.py: 99%
578 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
1# Copyright 2025 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Module implements the basic Processor class, the ProcessorList and all helper classes to achieve the core
6functionality of the MAFw.
7"""
9from __future__ import annotations
11import contextlib
12import inspect
13import logging
14import warnings
15from collections.abc import Callable, Iterator
16from copy import copy, deepcopy
17from functools import wraps
18from itertools import count
19from typing import TYPE_CHECKING, Any, Collection, Generic, Iterable, Self, SupportsIndex, TypeVar, Union, cast
21import peewee
22from peewee import Database
24# noinspection PyUnresolvedReferences
25from playhouse.db_url import connect
27import mafw.db.db_filter
28from mafw.active import Active
29from mafw.db.db_model import MAFwBaseModel, database_proxy, mafw_model_register
30from mafw.enumerators import LoopingStatus, LoopType, ProcessorExitStatus, ProcessorStatus
31from mafw.mafw_errors import (
32 AbortProcessorException,
33 MissingDatabase,
34 MissingOverloadedMethod,
35 MissingSuperCall,
36 ProcessorParameterError,
37)
38from mafw.timer import Timer, pretty_format_duration
39from mafw.tools.generics import deep_update
40from mafw.tools.regexp import extract_protocol
41from mafw.ui.abstract_user_interface import UserInterfaceBase
42from mafw.ui.console_user_interface import ConsoleInterface
44log = logging.getLogger(__name__)
46ParameterType = TypeVar('ParameterType')
47"""Generic variable type for the :class:`ActiveParameter` and :class:`PassiveParameter`."""
50def validate_database_conf(database_conf: dict[str, Any] | None = None) -> dict[str, Any] | None:
51 """
52 Validates the database configuration.
54 :param database_conf: The input database configuration. Defaults to None.
55 :type database_conf: dict, Optional
56 :return: Either the validated database configuration or None if it is invalid.
57 :rtype: dict, None
58 """
59 if database_conf is None:
60 return None
62 # dict is mutable, if I change it inside the function, I change it also outside.
63 conf = database_conf.copy()
65 if 'DBConfiguration' in conf:
66 # the database_conf is the steering file. Extract the DBConfiguration table
67 conf = conf['DBConfiguration']
68 required_fields = ['URL']
69 if all([field in conf for field in required_fields]):
70 return database_conf
71 else:
72 return None
class PassiveParameter(Generic[ParameterType]):
    """
    A helper class holding the value and the metadata of a processor parameter.

    This is the private side of a processor parameter: the :class:`ActiveParameter` descriptor relies on it to keep
    the actual value, the documentation and the set/optional flags.

    Whenever a new :class:`.ActiveParameter` is added to a class, an instance of PassiveParameter is stored in the
    processor parameter :attr:`register <.Processor._processor_parameters>`.

    .. seealso::

        An explanation on how processor parameters work and should be used is given in :ref:`Understanding processor
        parameters <parameters>`

    .. versionchanged:: v2.0.0

        User should only use :class:`ActiveParameter` and never manually instantiate :class:`PassiveParameter`.
    """

    def __init__(
        self, name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = ''
    ):
        """
        Constructor parameters:

        :param name: The name of the parameter. It must be a valid python identifier.
        :type name: str
        :param value: The set value of the parameter. If None, the default value is used. Defaults to None.
        :type value: ParameterType, Optional
        :param default: The default value of the parameter, used when ``value`` is not provided. Defaults to None.
        :type default: ParameterType, Optional
        :param help_doc: A brief explanation of the parameter.
        :type help_doc: str, Optional
        :raises ProcessorParameterError: if both ``value`` and ``default`` are None or if ``name`` is not a valid
            identifier.
        """
        if not name.isidentifier():
            raise ProcessorParameterError(f'{name} is not a valid python identifier.')

        self.name = name

        if value is None and default is None:
            raise ProcessorParameterError('Processor parameter cannot have both value and default value set to None')

        # an explicit value wins over the default; a parameter living on its default is flagged as optional
        explicitly_set = value is not None
        self._value: ParameterType = cast(ParameterType, value if explicitly_set else default)
        self._is_set = explicitly_set
        self._is_optional = not explicitly_set

        self.doc = help_doc

    def __rich_repr__(self) -> Iterator[Any]:
        # the optional third element of each tuple is the default used by rich to skip the field
        yield 'name', self.name
        yield 'value', self.value, None
        yield 'help_doc', self.doc, ''

    @property
    def is_set(self) -> bool:
        """
        Tells whether the value was explicitly set.

        For an optional parameter, it allows to distinguish the default value from a value provided by the user.
        """
        return self._is_set

    @property
    def value(self) -> ParameterType:
        """
        Gets the parameter value.

        :return: The parameter value.
        :rtype: ParameterType
        """
        return self._value

    @value.setter
    def value(self, value: ParameterType) -> None:
        """
        Sets the parameter value and flags the parameter as set.

        :param value: The value to be set.
        :type value: ParameterType
        """
        self._value = value
        self._is_set = True

    @property
    def is_optional(self) -> bool:
        """
        Tells whether the parameter is optional.

        :return: True if the parameter is optional
        :rtype: bool
        """
        return self._is_optional

    def __repr__(self) -> str:
        shown_attributes = ('name', 'value', 'doc')
        rendered = ', '.join(f'{attr}={getattr(self, attr)!r}' for attr in shown_attributes)
        return f'{self.__class__.__name__}({rendered})'
183F = TypeVar('F', bound=Callable[..., Any])
184"""Type variable for generic callable with any return value."""
def ensure_parameter_registration(func: F) -> F:
    """
    Decorator guaranteeing that the processor parameters are registered before `func` is executed.

    The decorated callable must receive a :class:`Processor` instance as first positional argument, as it happens for
    every bound method call.

    :param func: The method to be decorated.
    :return: The decorated method.
    :raises ProcessorParameterError: at call time, if the first positional argument is missing or is not a Processor.
    """
    misuse_message = (
        'Attempt to apply the ensure_parameter_registration to something different to a Processor subclass.'
    )

    @wraps(func)
    def wrapper(*args: Processor, **kwargs: Any) -> F:
        # the first positional argument must be the processor instance (self)
        if not args or not isinstance(args[0], Processor):
            raise ProcessorParameterError(misuse_message)

        processor = args[0]
        if processor._parameter_registered is False:
            processor._register_parameters()
        return cast(F, func(*args, **kwargs))

    return cast(F, wrapper)
class ActiveParameter(Generic[ParameterType]):
    r"""
    The public interface to the processor parameter.

    Processor parameters are the way to customise the behaviour of a :class:`Processor`. Their values can be set
    either via a configuration file or directly when creating the class.

    To benefit from this facility, add an ActiveParameter instance to the body of the Processor subclass:

    .. code-block::

        class MyProcessor(Processor):

            # this is the input folder
            input_folder = ActiveParameter('input_folder', Path(r'C:\data'), help_doc='Where to look for input files')

            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)

                # change the input folder to something else
                self.input_folder = Path(r'D:\data')

                # get the value of the parameter
                print(self.input_folder)

    The ActiveParameter is a `descriptor <https://docs.python.org/3/glossary.html#term-descriptor>`_: reading or
    writing the attribute on a processor instance is redirected to the parameter register of that instance.

    In simple words, a processor parameter is made by two objects: a public interface, giving the user easy access to
    the value of the parameter, and a private interface where all other information (default, documentation...) is
    stored as well.

    The user does not have to take care of this. The private interface is created when the processor registers its
    parameters and it is stored in the parameter register of the processor instance.

    To access the private interface, the user can use the :meth:`Processor.get_parameter` method using the parameter
    name as a key.

    Almost any name can be assigned to an ActiveParameter. Only a few names are invalid because they are used for
    other purposes. The list of reserved names is available :attr:`here <reserved_names>`. Should the user
    inadvertently use a reserved name, a :exc:`.ProcessorParameterError` is raised.

    .. seealso::

        The private counterpart in the :class:`PassiveParameter`.

        An explanation on how processor parameters work and should be used is given in :ref:`Understanding processor
        parameters <parameters>`

        The list of :attr:`reserved names <reserved_names>`.
    """

    reserved_names: list[str] = ['__logic__', '__filter__', '__new_only__', '__inheritance__']
    """A list of names that cannot be used as processor parameter names.

    - `__logic__`
    - `__filter__`
    - `__new_only__`
    - `__inheritance__`
    """

    def __init__(
        self, name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = ''
    ):
        """
        Constructor parameters:

        :param name: The name of the parameter.
        :type name: str
        :param value: The initial value of the parameter. Defaults to None.
        :type value: ParameterType, Optional
        :param default: The default value of the parameter, to be used when ``value`` is not set. Defaults to None.
        :type default: ParameterType, Optional
        :param help_doc: An explanatory text describing the parameter.
        :type help_doc: str, Optional
        :raises ProcessorParameterError: If ``name`` is one of the :attr:`reserved_names`.
        """
        self._value = value
        self._default = default
        self._help_doc = help_doc
        self._external_name = self._validate_name(name)

    def _validate_name(self, proposed_name: str) -> str:
        """
        Check that the proposed parameter name is not a reserved one.

        Names listed in :attr:`reserved_names` cannot be used as processor parameter names.

        :param proposed_name: The name to be validated for use as a processor parameter.
        :type proposed_name: str
        :return: The proposed name, unchanged, when it is allowed.
        :rtype: str
        :raises ProcessorParameterError: If the proposed name is in the list of reserved names.
        """
        if proposed_name in self.reserved_names:
            raise ProcessorParameterError(f'Attempt to use a forbidden name ({proposed_name})')
        return proposed_name

    def __set_name__(self, owner: type[Processor], name: str) -> None:
        # invoked by python when the descriptor is bound to a class attribute
        self._owner = owner
        self.public_name = name
        self.private_name = f'param_{name}'

    def __get__(self, obj: Processor, obj_type: type[Processor]) -> ActiveParameter[ParameterType] | ParameterType:
        # access from the class (no instance) returns the descriptor itself
        if obj is None:
            return self

        # access from an instance returns the value stored in the instance-level passive parameter
        return obj._processor_parameters[self._external_name].value

    def __set__(self, obj: Processor, value: ParameterType) -> None:
        # the value is stored in the instance-level passive parameter
        obj._processor_parameters[self._external_name].value = value
class ProcessorMeta(type):
    """
    A metaclass implementing the post-init hook.

    Every class using this metaclass must provide a ``__post_init__`` method: it is invoked without arguments right
    after the standard construction (``__new__`` and ``__init__``) of each new instance.
    """

    def __call__(cls, *args: Any, **kwargs: Any) -> 'ProcessorMeta':
        # standard instance creation first, then the post-init hook on the fully initialised object
        instance = type.__call__(cls, *args, **kwargs)
        instance.__post_init__()
        return cast(ProcessorMeta, instance)
337# noinspection PyProtectedMember
338class Processor(metaclass=ProcessorMeta):
339 """
340 The basic processor.
342 A very comprehensive description of what a Processor does and how it works is available at :ref:`doc_processor`.
343 """
345 processor_status = Active(ProcessorStatus.Unknown)
346 """Processor execution status"""
348 looping_status = Active(LoopingStatus.Continue)
349 """Looping modifier"""
351 progress_message: str = f'{__qualname__} is working'
352 """Message displayed to show the progress.
354 It can be customized with information about the current item in the loop by overloading the
355 :meth:`format_progress_message`."""
357 _ids = count(0)
358 """A counter for all processor instances"""
360 new_defaults: dict[str, Any] = {}
361 """
362 A dictionary containing defaults value for the parameters to be overridden
364 .. versionadded:: v2.0.0
365 """
367 new_only_flag = 'new_only'
369 def __init__(
370 self,
371 name: str | None = None,
372 description: str | None = None,
373 config: dict[str, Any] | None = None,
374 looper: LoopType | str = LoopType.ForLoop,
375 user_interface: UserInterfaceBase | None = None,
376 timer: Timer | None = None,
377 timer_params: dict[str, Any] | None = None,
378 database: Database | None = None,
379 database_conf: dict[str, Any] | None = None,
380 remove_orphan_files: bool = True,
381 replica_id: str | None = None,
382 create_standard_tables: bool = True,
383 *args: Any,
384 **kwargs: Any,
385 ) -> None:
386 """
387 Constructor parameters
389 :param name: The name of the processor. If None is provided, the class name is used instead. Defaults to None.
390 :type name: str, Optional
391 :param description: A short description of the processor task. Defaults to the processor name.
392 :type description: str, Optional
393 :param config: A configuration dictionary for this processor. Defaults to None.
394 :type config: dict, Optional
395 :param looper: Enumerator to define the looping type. Defaults to LoopType.ForLoop
396 :type looper: LoopType, Optional
397 :param user_interface: A user interface instance to be used by the processor to interact with the user.
398 :type user_interface: UserInterfaceBase, Optional
399 :param timer: A timer object to measure process duration.
400 :type timer: Timer, Optional
401 :param timer_params: Parameters for the timer object.
402 :type timer_params: dict, Optional
403 :param database: A database instance. Defaults to None.
404 :type database: Database, Optional
405 :param database_conf: Configuration for the database. Default to None.
406 :type database_conf: dict, Optional
407 :param remove_orphan_files: Boolean flag to remove files on disc without a reference to the database.
408 See :ref:`std_tables` and :meth:`~mafw.processor.Processor._remove_orphan_files`. Defaults to True
409 :type remove_orphan_files: bool, Optional
410 :param replica_id: The replica identifier for the current processor.
411 :type replica_id: str, Optional
412 :param create_standard_tables: Boolean flag to create std tables on disk. Defaults to True
413 :type create_standard_tables: bool, Optional
414 :param kwargs: Keyword arguments that can be used to set processor parameters.
415 """
417 self.name = name or self.__class__.__name__
418 """The name of the processor."""
420 self.unique_id = next(self._ids)
421 """A unique identifier representing how many instances of Processor has been created."""
423 self.replica_id = replica_id
424 """
425 The replica identifier specified in the constructor
427 .. versionadded:: v2.0.0
428 """
430 self.description = description or self.name
431 """A short description of the processor task."""
433 self.item: Any = None
434 """The current item of the loop."""
436 self.processor_exit_status = ProcessorExitStatus.Successful
437 """Processor exit status"""
439 self.loop_type: LoopType = LoopType(looper)
440 """
441 The loop type.
443 The value of this parameter can also be changed by the :func:`~mafw.decorators.execution_workflow` decorator
444 factory.
446 See :class:`~mafw.enumerators.LoopType` for more details.
447 """
449 self.create_standard_tables = create_standard_tables
450 """The boolean flag to proceed or skip with standard table creation and initialisation"""
452 # private attributes
453 self._config: dict[str, Any] = {} # deepcopy(config) if config is not None else {}
454 """
455 A dictionary containing the processor configuration object.
457 This dictionary is populated with configuration parameter (always type 2) during the
458 :meth:`._load_parameter_configuration` method.
460 The original value of the configuration dictionary that is passed to the constructor is stored in
461 :attr:`._orig_config`.
463 .. versionchanged:: v2.0.0
464 Now it is an empty dictionary until the :meth:`._load_parameter_configuration` is called.
466 """
468 self._orig_config = deepcopy(config) if config is not None else {}
469 """
470 A copy of the original configuration dictionary.
472 .. versionadded:: v2.0.0
473 """
475 self._processor_parameters: dict[str, PassiveParameter[ParameterType]] = {} # type: ignore
476 """
477 A dictionary to store all the processor parameter instances.
479 The name of the parameter is used as a key, while for the value an instance of the
480 :class:`.PassiveParameter` is used.
481 """
482 # wait for answer from SO
483 self._parameter_registered = False
484 """A boolean flag to confirm successful parameter registration."""
485 self._kwargs = kwargs
487 # loops attributes
488 self._i_item: int = -1
489 self._n_item: int | None = -1
490 self._process_durations: list[float] = []
492 # resource stack
493 self._resource_stack: contextlib.ExitStack
494 self._resource_acquisition: bool = True
496 # processor timer
497 self.timer: Timer | None = timer
498 self._timer_parameters: dict[str, Any] = timer_params or {}
500 # user interface
501 if user_interface is None:
502 self._user_interface: UserInterfaceBase = ConsoleInterface()
503 else:
504 self._user_interface = user_interface
506 # database stuff
507 self._database: peewee.Database | None = database
508 self._database_conf: dict[str, Any] | None = validate_database_conf(database_conf)
509 self.filter_register: mafw.db.db_filter.ProcessorFilter = mafw.db.db_filter.ProcessorFilter()
510 """The DB filter register of the Processor."""
511 self.remove_orphan_files: bool = remove_orphan_files
512 """The flag to remove or protect the orphan files. Defaults to True"""
514 # sub-classing stuff
515 # todo:
516 # should we make it a class attribute instead of an instance attribute?
517 self._methods_to_be_checked_for_super = [('start', Processor), ('finish', Processor)]
518 """
519 List of methods to be checked for super inclusion.
521 It is a list of tuple, with the first element the name of the method to be checked and the second the base
522 class to the be compared.
523 """
524 self.initialise_parameters()
526 def initialise_parameters(self) -> None:
527 """
528 Initialises processor parameters by registering them and applying various configuration sources.
530 This method orchestrates the parameter initialisation process by performing the following steps in order:
532 #. Registers processor parameters defined as :class:`ActiveParameter` instances
533 #. Overrides default parameter values with any configured overrides
534 #. Loads parameter configuration from the processor's configuration dictionary
535 #. Applies keyword arguments as parameter overrides
537 The method ensures that all processor parameters are properly configured before the processor
538 execution begins. It is automatically called during processor initialisation and should not
539 typically be called directly by users.
541 .. seealso::
542 :meth:`_register_parameters`, :meth:`_override_defaults`,
543 :meth:`_load_parameter_configuration`, :meth:`_overrule_kws_parameters`
545 .. versionadded:: v2.0.0
546 """
547 self._register_parameters()
548 self._override_defaults()
549 self._load_parameter_configuration()
550 self._overrule_kws_parameters()
552 def __post_init__(self) -> None:
553 """
554 Performs post-initialisation tasks for the processor.
556 This method is automatically called after the processor initialisation is complete.
557 It performs validation checks on overloaded methods and sets the initial processor status.
559 .. seealso::
560 :meth:`validate_configuration`, :meth:`_check_method_overload`, :meth:`_check_method_super`,
561 :attr:`~mafw.processor.Processor.processor_status`
563 .. versionchanged:: v2.0.0
564 Moved the parameter initialisation to :meth:`initialise_parameters` and now executed as last step of the
565 init method.
567 Added the validate configuration check. This method should silently check that configuration provided
568 with the processor parameters is valid. If not, a :exc:`.ProcessorParameterError` is raised.
569 """
570 self.validate_configuration()
571 self._check_method_overload()
572 self._check_method_super()
573 self.processor_status = ProcessorStatus.Init
575 def _register_parameters(self) -> None:
576 """
577 Register processor parameters defined as ActiveParameter instances in the class.
579 This private method scans the class definition for any :class:`.ActiveParameter` instances and creates
580 corresponding :class:`.PassiveParameter` instances to store the actual parameter values and metadata.
581 It ensures that all processor parameters are properly initialised and available for configuration
582 through the processor's configuration system.
584 The method checks for duplicate parameter names and raises a :exc:`.ProcessorParameterError` if duplicates
585 are detected. It also sets the internal flag :attr:`._parameter_registered` to True once registration is
586 complete.
588 .. note::
589 This method is automatically called during processor initialisation and should not be called directly
590 by users.
592 .. seealso::
593 :class:`.Processor`, :meth:`.Processor._override_defaults`,
594 :meth:`.Processor._load_parameter_configuration`, :meth:`.Processor._overrule_kws_parameters`
596 .. versionchanged:: v2.0.0
597 Only :class:`ActiveParameter` are not registered. The use of :class:`PassiveParameter` is only meant to
598 store the value and metadata of the active counter part.
599 """
600 if self._parameter_registered:
601 return
603 for name in dir(self.__class__):
604 attr = getattr(self.__class__, name)
606 if isinstance(attr, ActiveParameter):
607 ext_name = attr._external_name
608 if ext_name in self._processor_parameters:
609 raise ProcessorParameterError(f'Duplicated parameter name ({ext_name}.')
610 self._processor_parameters[ext_name] = PassiveParameter(
611 ext_name, attr._value, attr._default, attr._help_doc
612 )
614 self._parameter_registered = True
616 def _override_defaults(self) -> None:
617 """
618 Override default parameter values with values from :attr:`new_defaults`.
620 This private method iterates through the :attr:`new_defaults` dictionary and updates
621 the corresponding processor parameters with new values. Only parameters that exist
622 in both :attr:`new_defaults` and :attr:`_processor_parameters` are updated.
624 .. versionadded:: v2.0.0
625 """
626 for key, value in self.new_defaults.items():
627 if key in self._processor_parameters:
628 self._processor_parameters[key].value = value
630 def _reset_parameters(self) -> None:
631 """
632 Reset processor parameters to their initial state.
634 This method clears all currently registered processor parameters and triggers
635 a fresh registration process. It's useful when parameter configurations need
636 to be reinitialized or when parameters have been modified and need to be reset.
638 .. seealso::
639 :meth:`_register_parameters`, :meth:`_register_parameters`
640 """
641 self._processor_parameters = {}
642 self._parameter_registered = False
643 self._register_parameters()
645 @ensure_parameter_registration
646 def _load_parameter_configuration(self) -> None:
647 """
648 Load processor parameter configuration from the internal configuration dictionary.
650 This method processes the processor's configuration dictionary to set parameter values.
651 It handles two configuration formats:
653 1. Nested format: ``{'ProcessorName': {'param1': value1, ...}}``
654 2. Flat format: ``{'param1': value1, ...}``
656 The method also handles filter configurations by collecting filter table names
657 and deferring their initialisation until after the global filter has been processed.
659 .. versionchanged:: v2.0.0
660 For option 1 combining configuration from name and name_replica
662 :raises ProcessorParameterError: If a parameter in the configuration is not registered.
664 .. seealso::
665 :meth:`mafw.db.db_filter.ModelFilter.from_conf`
666 """
667 original_config = copy(self._orig_config)
668 flt_list = []
670 # by default the flag new_only is set to true
671 # unless the user specify differently in the general section of the steering file
672 self.filter_register.new_only = original_config.get(self.new_only_flag, True)
674 # we need to check if the configuration object is of type 1 or type 2
675 if any([name for name in [self.name, self.replica_name] if name in original_config]):
676 # one of the two names (the base or the replica) must be present in case of option 1
677 # we start from the base name. If not there, then take an empty dict
678 option1_config_base = original_config.get(self.name, {})
679 if self.name != self.replica_name:
680 # if there is the replica name, then update the base configuration with the replica value
681 # we get the replica configuration
682 option1_config_replica = original_config.get(self.replica_name, {})
684 # let's check if the user wants to have inheritance default
685 # by default is True
686 inheritance = option1_config_replica.get('__inheritance__', True)
687 if inheritance:
688 # we update the base with the replica without changing the base
689 option1_config_update = deep_update(option1_config_base, option1_config_replica, copy_first=True)
690 else:
691 # we do not use the base with the replica specific, we pass the replica as the updated
692 option1_config_update = option1_config_replica
694 # we modify the type 1 original so that the table for the replica has the updated configuration
695 # this is used for the filter configuration at the end.
696 original_config[self.replica_name] = option1_config_update
697 else:
698 # there is not replica, so the update is equal to the base.
699 option1_config_update = option1_config_base
701 self._config = option1_config_update
702 else:
703 # for type 2 we are already good to go
704 self._config = original_config
706 for key, value in self._config.items():
707 if key in self._processor_parameters:
708 type_: ParameterType = type(self.get_parameter(key).value) # type: ignore # wait for answer from SO
709 self.set_parameter_value(key, type_(value)) # type: ignore # no idea how to fix it, may be linked with above
710 elif key == '__filter__':
711 # we got a filter table!
712 # it should contain one table for each model
713 # we add all the names to a list for deferred initialisation
714 flt_table = self._config[key]
715 flt_list.extend([f'{self.replica_name}.__filter__.{model}' for model in flt_table])
716 elif key == '__logic__':
717 # we got a filter logic string
718 # we store it in the filter register directly
719 self.filter_register._logic = self._config[key]
720 elif key == '__new_only__':
721 # we got a new only boolean, we store it in the filter register
722 self.filter_register.new_only = self._config[key]
724 # only now, after the configuration file has been totally read, we can do the real filter initialisation.
725 # This is to be sure that if there were a GlobalFilter table, this has been read.
726 # The global filter region will be used as a starting point for the construction of a new filter (default
727 # parameter in the from_conf class method).
728 for flt_name in flt_list:
729 model_name = flt_name.split('.')[-1]
730 self.filter_register[model_name] = mafw.db.db_filter.ModelFilter.from_conf(flt_name, original_config)
732 @ensure_parameter_registration
733 def _overrule_kws_parameters(self) -> None:
734 """
735 Override processor parameters with values from keyword arguments.
737 This method applies parameter values passed as keyword arguments during processor
738 initialisation. It ensures that the parameter types match the expected types
739 before setting the values.
741 .. seealso::
742 :meth:`_register_parameters`, :meth:`_load_parameter_configuration`,
743 :meth:`set_parameter_value`
744 """
745 for key, value in self._kwargs.items():
746 if key in self._processor_parameters:
747 type_: ParameterType = type(self.get_parameter(key).value) # type: ignore # wait for answer from SO
748 self.set_parameter_value(key, type_(value)) # type: ignore # no idea how to fix it, may be linked with above
750 def validate_configuration(self) -> None:
751 """
752 Validate the configuration provided via the processor parameters.
754 Method to be implemented by subclasses if a configuration validation is needed.
756 The method should silently check for the proper configuration, if this is not obtained,
757 then the :exc:`.InvalidConfigurationError` must be raised.
759 .. versionadded:: v2.0.0
760 """
761 pass
763 def _check_method_overload(self) -> None:
764 """
765 Check if the user overloaded the required methods.
767 Depending on the loop type, the user must overload different methods.
768 This method is doing the check and if the required methods are not overloaded a warning is emitted.
769 """
770 methods_dict: dict[LoopType, list[str]] = {
771 LoopType.WhileLoop: ['while_condition'],
772 LoopType.ForLoop: ['get_items'],
773 }
774 required_methods: list[str] = methods_dict.get(self.loop_type, [])
775 for method in required_methods:
776 if getattr(type(self), method) == getattr(Processor, method):
777 warnings.warn(
778 MissingOverloadedMethod(
779 '%s was not overloaded. The process execution workflow might not work.' % method
780 )
781 )
def _check_method_super(self) -> None:
    """
    Check if some specific methods are calling their super.

    For some specific methods (for example: start and finish), the user should always call their super method.
    This method verifies that the user implementation of these methods is including a super call, otherwise a
    MissingSuperCall warning is emitted to inform the user about the problem and possible misbehaviour of the
    processor.

    The list of methods to be verified is stored in a private class attribute
    :attr:`~._methods_to_be_checked_for_super` as a list of tuples, made by the name of the methods to be
    verified and the base class for comparison. The base class is required because Processor subclasses may be
    extending this list with methods that are not present in the base Processor. See, for example, the
    :meth:`~.GenericPlotter.patch_data_frame` that is required to have a super call, but it is not present in the
    base Processor.

    The verification is a textual inspection of the method source code: lines that are entirely a comment are
    discarded and the remaining text is searched for ``super().<method>``. If the source code cannot be retrieved
    (for example for classes defined interactively or generated dynamically), the check for that method is skipped
    instead of propagating the error, because this is only an advisory check.
    """
    for method, base in self._methods_to_be_checked_for_super:
        method_object = getattr(type(self), method)

        # first check if the user overloaded the method. If not, there is nothing to verify.
        if method_object == getattr(base, method):
            continue

        try:
            # this is the overloaded method source code.
            method_source_code = inspect.getsource(method_object)

            # if the method is the start method, then it might be that the user decorated the class with the
            # @database_required decorator, so the function object looks different, but the source is the same.
            # In that case there is no need to check for the super call.
            if method == 'start' and method_source_code == inspect.getsource(getattr(base, method)):
                continue
        except (OSError, TypeError):
            # inspect.getsource raises OSError when the source is not available and TypeError for built-in
            # objects. An advisory check must not prevent the processor from working.
            log.debug('Source code of %s not available. Skipping the super call check.', method)
            continue

        # we drop all the lines that are comments, because the user may have commented out the super call, and
        # we rebuild the whole source code without indentation.
        stripped_lines = (line.strip() for line in method_source_code.split('\n'))
        method_source_code = '\n'.join(line for line in stripped_lines if not line.startswith('#'))

        # check if the super call is in the source. if not then emit a warning
        if f'super().{method}' not in method_source_code:
            warnings.warn(
                MissingSuperCall(
                    'The overloaded %s is not invoking its super method. The processor might not work.' % method
                )
            )
@ensure_parameter_registration
def dump_parameter_configuration(self, option: int = 1) -> dict[str, Any]:
    """
    Dumps the processor parameter values in a dictionary.

    Two dictionary layouts are available and they are selected via `option`.

    .. code-block:: python

        # option 1
        conf_dict1 = {
            'Processor': {'param1': 5, 'input_table': 'my_table'}
        }

        # option 2
        conf_dict2 = {'param1': 5, 'input_table': 'my_table'}

    With option 1, the key of the outer dictionary is the replica aware name (:meth:`.replica_name`).
    Any other value than 1 or 2 triggers a warning message and the layout of option 2 is returned.

    .. versionchanged:: v2.0.0
        With option 1, using :meth:`.replica_name` instead of :attr:`~.Processor.name` as key of the configuration
        dictionary.

    :param option: Select the dictionary style. Defaults to 1.
    :type option: int, Optional
    :return: A parameter configuration dictionary.
    :rtype: dict
    """
    # flat mapping between the parameter name and its current value
    parameter_values = {name: parameter.value for name, parameter in self._processor_parameters.items()}

    if option == 1:
        return {self.replica_name: parameter_values}

    if option != 2:
        log.warning('Unknown option %s. Using option 2' % option)
    return parameter_values
@ensure_parameter_registration
def get_parameter(self, name: str) -> PassiveParameter[ParameterType]:
    """
    Retrieves a registered processor parameter by its name.

    :param name: The name of the parameter.
    :type name: str
    :return: The processor parameter
    :rtype: PassiveParameter
    :raises ProcessorParameterError: If a parameter with `name` is not registered.
    """
    if name not in self._processor_parameters:
        raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}')
    return self._processor_parameters[name]
@ensure_parameter_registration
def get_parameters(self) -> dict[str, PassiveParameter[ParameterType]]:
    """
    Returns the full dictionary of registered parameters for this processor.

    The returned object is the internal parameter dictionary itself, not a copy. It is useful, for example,
    when dumping the parameter specification in a configuration file.

    :return: The dictionary with the registered parameters.
    :rtype: dict[str, PassiveParameter[ParameterType]]
    """
    registered_parameters = self._processor_parameters
    return registered_parameters
@ensure_parameter_registration
def delete_parameter(self, name: str) -> None:
    """
    Removes a processor parameter from the parameter dictionary.

    :param name: The name of the parameter to be deleted.
    :type name: str
    :raises ProcessorParameterError: If a parameter with `name` is not registered.
    """
    if name not in self._processor_parameters:
        raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}')
    del self._processor_parameters[name]
@ensure_parameter_registration
def set_parameter_value(self, name: str, value: ParameterType) -> None:
    """
    Assigns a new value to a registered processor parameter.

    :param name: The name of the parameter whose value has to be set.
    :type name: str
    :param value: The value to be assigned to the parameter.
    :type value: ParameterType
    :raises ProcessorParameterError: If a parameter with `name` is not registered.
    """
    if name not in self._processor_parameters:
        raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}')
    self._processor_parameters[name].value = value
def get_filter(self, model_name: str) -> mafw.db.db_filter.ModelFilter:
    """
    Returns a registered :class:`~mafw.db.db_filter.ModelFilter` via the model name.

    The filter is looked up in the processor filter register using `model_name` as key.

    :param model_name: The model name for which the filter will be returned.
    :type model_name: str
    :return: The registered filter
    :rtype: mafw.db.db_filter.ModelFilter
    :raises KeyError: If a filter with the given name is not found.
    """
    registered_filter = self.filter_register[model_name]
    return registered_filter
def on_processor_status_change(self, old_status: ProcessorStatus, new_status: ProcessorStatus) -> None:
    """
    Callback invoked when the processor status is changed.

    The change is forwarded to the user interface together with the processor name.

    :param old_status: The old processor status.
    :type old_status: ProcessorStatus
    :param new_status: The new processor status.
    :type new_status: ProcessorStatus
    """
    interface = self._user_interface
    interface.change_of_processor_status(self.name, old_status, new_status)
957 def on_looping_status_set(self, status: LoopingStatus) -> None:
958 """
959 Call back invoked when the looping status is set.
961 The user can overload this method according to the needs.
963 :param status: The set looping status.
964 :type status: LoopingStatus
965 """
966 if status == LoopingStatus.Skip:
967 log.warning('Skipping item %s' % self.i_item)
968 elif status == LoopingStatus.Abort:
969 log.error('Looping has been aborted')
970 elif status == LoopingStatus.Quit:
971 log.warning('Looping has been quit')
def format_progress_message(self) -> None:
    """Customizes the progress message with information about the current item.

    The base implementation does nothing. The user can overload this method in order to modify the message being
    displayed during the process loop with information about the current item.

    The current value, its position in the looping cycle and the total number of items are available via
    :attr:`.Processor.item`, :obj:`.Processor.i_item` and :obj:`.Processor.n_item`.
    """
    return None
@property
def i_item(self) -> int:
    """The enumeration of the current item being processed."""
    current_index = self._i_item
    return current_index

@i_item.setter
def i_item(self, value: int) -> None:
    # plain assignment to the private storage, no validation is performed
    self._i_item = value
@property
def n_item(self) -> int | None:
    """The total number of items to be processed or None for an undefined loop"""
    total_items = self._n_item
    return total_items

@n_item.setter
def n_item(self, value: int | None) -> None:
    # plain assignment to the private storage, None is an accepted value
    self._n_item = value
@property
def unique_name(self) -> str:
    """Returns the unique name for the processor, built joining its name and its unique id with an underscore."""
    return '{}_{}'.format(self.name, self.unique_id)
@property
def replica_name(self) -> str:
    """
    Returns the replica aware name of the processor.

    When the replica_id is None, the pure processor name is returned. Otherwise, the name and the replica_id are
    concatenated using the '#' symbol as separator.

    .. versionadded:: v2.0.0

    :return: The replica aware name of the processor.
    :rtype: str
    """
    return self.name if self.replica_id is None else self.name + '#' + self.replica_id
@property
def local_resource_acquisition(self) -> bool:
    """
    Checks if resources should be acquired locally.

    A processor executed in stand-alone mode is responsible to acquire and release its own external resources.
    When it is executed from a ProcessorList, it is a good practice to share and distribute resources among the
    whole processor list: in this case, resources should not be acquired locally by the single processor, but
    from the parent execution context.

    :return: True if resources are to be acquired locally by the processor. False, otherwise.
    :rtype: bool
    """
    acquire_locally = self._resource_acquisition
    return acquire_locally

@local_resource_acquisition.setter
def local_resource_acquisition(self, flag: bool) -> None:
    # plain assignment to the private flag
    self._resource_acquisition = flag
@property
def database(self) -> peewee.Database:
    """
    Returns the database instance

    :return: A database object.
    :raises MissingDatabase: If the database connection has not been established.
    """
    if self._database is not None:
        return self._database
    raise MissingDatabase('Database connection not initialized')
def execute(self) -> None:
    """Execute the processor tasks.

    This method works as a dispatcher: the call is forwarded to the execution implementation matching the
    processor :attr:`~mafw.processor.Processor.loop_type`.
    """
    implementations: dict[LoopType, Callable[[], None]] = {
        LoopType.SingleLoop: self._execute_single,
        LoopType.ForLoop: self._execute_for_loop,
        LoopType.WhileLoop: self._execute_while_loop,
    }
    selected_implementation = implementations[self.loop_type]
    selected_implementation()
def _execute_single(self) -> None:
    """Execute the processor in single mode.

    **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
    appropriate implementation depending on the processor LoopType.

    In single mode, :meth:`process` is invoked exactly once between :meth:`start` and :meth:`finish`.
    """
    with contextlib.ExitStack() as self._resource_stack:
        # preparation: resources first, then the user start-up code
        self.acquire_resources()
        self.start()

        # execution: flag the processor as running and do the job once
        self.processor_status = ProcessorStatus.Run
        self.process()

        # conclusion, still inside the resource stack context
        self.finish()
def _execute_for_loop(self) -> None:
    """Executes the processor within a for loop.

    **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
    appropriate implementation depending on the processor LoopType.

    The execution goes through the following steps, all of them inside the resource stack context:

    #. Resources are acquired and :meth:`start` is called.
    #. The items are retrieved with :meth:`get_items` and their number is stored in :attr:`n_item`.
    #. The processor status is set to Run and a task is created in the user interface.
    #. For each item, the looping status is reset to Continue, the progress message is displayed and
       :meth:`process` is executed inside a timer whose duration is stored for the final statistics.
       Depending on the looping status after :meth:`process`, :meth:`accept_item` or :meth:`skip_item` is
       called; any other looping status interrupts the loop.
    #. The task is marked as completed and :meth:`finish` is called.
    """

    with contextlib.ExitStack() as self._resource_stack:
        self.acquire_resources()
        self.start()

        # get the input item list and filter it
        item_list = self.get_items()

        # get the total number of items.
        self.n_item = len(item_list)

        # turn the processor status to run
        self.processor_status = ProcessorStatus.Run

        # create a new task in the progress bar interface
        self._user_interface.create_task(self.replica_name, self.description, completed=0, total=self.n_item)

        # start the looping. The loop variables are the processor attributes themselves, so that the current
        # index and item are available in all the methods called within the loop.
        for self.i_item, self.item in enumerate(item_list):
            # set the looping status to Continue. The user may want to change it in the process.
            self.looping_status = LoopingStatus.Continue

            # send a message to the user interface
            self.format_progress_message()
            self._user_interface.display_progress_message(self.progress_message, self.i_item, self.n_item, 0.1)

            # wrap the execution in a timer to measure how long it took for statistical reasons.
            with Timer(suppress_message=True) as timer:
                self.process()
            self._process_durations.append(timer.duration)

            # modify the loop depending on the looping status
            if self.looping_status == LoopingStatus.Continue:
                self.accept_item()
            elif self.looping_status == LoopingStatus.Skip:
                self.skip_item()
            else:  # equiv to if self.looping_status in [LoopingStatus.Abort, LoopingStatus.Quit]:
                # leaving the loop here skips the progress bar increment for the current item
                break

            # update the progress bar
            self._user_interface.update_task(self.replica_name, increment=1)
        # the task is set to completed also when the loop was interrupted
        self._user_interface.update_task(self.replica_name, completed=self.n_item, total=self.n_item)

        self.finish()
def _execute_while_loop(self) -> None:
    """Executes the processor within a while loop.

    **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
    appropriate implementation depending on the processor LoopType.

    The loop continues as long as :meth:`while_condition` returns True. At each iteration the looping status is
    reset to Continue, the progress message is displayed and :meth:`process` is executed inside a timer whose
    duration is stored for the final statistics. Depending on the looping status after :meth:`process`,
    :meth:`accept_item` or :meth:`skip_item` is called; any other looping status interrupts the loop.

    If :attr:`n_item` is still None when the loop is over, it is set to the value of :attr:`i_item`.
    """
    # it is a while loop, so a priori we don't know how many iterations we will have, nevertheless, we
    # can have a progress bar with 'total' set to None, so that it goes in the so-called indeterminate
    # progress. See https://rich.readthedocs.io/en/stable/progress.html#indeterminate-progress
    # we initialise n_item outside the loop, because it is possible that the user has a way to define n_item
    # and he can do it within the loop.
    self.n_item = None
    with contextlib.ExitStack() as self._resource_stack:
        self.acquire_resources()
        self.start()

        # turn the processor status to run
        self.processor_status = ProcessorStatus.Run

        self._user_interface.create_task(self.replica_name, self.description, completed=0, total=self.n_item)

        # we are ready to start the looping. For statistics, we can count the iterations.
        # NOTE(review): i_item is initialised here, but it is not modified anywhere else in this method.
        # Presumably the user code (process / while_condition) is expected to advance it - TODO confirm.
        self.i_item = 0
        while self.while_condition():
            # set the looping status to Continue. The user may want to change it in the process method.
            self.looping_status = LoopingStatus.Continue

            # send a message to the user interface
            self.format_progress_message()
            self._user_interface.display_progress_message(
                self.progress_message, self.i_item, self.n_item, frequency=0.1
            )

            # wrap the execution in a timer to measure how long it took for statistical reasons.
            with Timer(suppress_message=True) as timer:
                self.process()
            self._process_durations.append(timer.duration)

            # modify the loop depending on the looping status
            if self.looping_status == LoopingStatus.Continue:
                self.accept_item()
            elif self.looping_status == LoopingStatus.Skip:
                self.skip_item()
            else:  # equiv to if self.looping_status in [LoopingStatus.Abort, LoopingStatus.Quit]:
                # leaving the loop here skips the progress bar update for the current iteration
                break

            # update the progress bar. if self.n_item is still None, then the progress bar will show indeterminate
            # progress.
            self._user_interface.update_task(self.replica_name, self.i_item + 1, 1, self.n_item)

        # now that the loop is finished, we know how many elements we processed
        if self.n_item is None:
            self.n_item = self.i_item
        self._user_interface.update_task(self.replica_name, completed=self.n_item, total=self.n_item)

        self.finish()
def acquire_resources(self) -> None:
    """
    Acquires resources and add them to the resource stack.

    The whole body of the :meth:`execute` method is within a context structure. The idea is that if any part of
    the code inside should throw an exception breaking the execution, we want to be sure that all stateful
    resources are properly closed.

    Since the number of resources may vary, the variable number of nested `with` statements has been replaced by
    an `ExitStack <https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack>`_. Resources,
    like open files, timers, db connections, need to be added to the resource stacks in this method.

    In the case a processor is being executed within a :class:`~mafw.processor.ProcessorList`, then some resources
    might be shared, and for this reason they are not added to the stack. This selection can be done via the
    :attr:`local_resource_acquisition` flag. This is normally True, meaning that the processor will handle its
    resources independently, but when the processor is executed from a :class:`~mafw.processor.ProcessorList`,
    this flag is automatically turned to False.

    Concerning the database, a new connection is established only if the processor has no database instance but
    it has a database configuration. The configuration can be either a dictionary containing a `DBConfiguration`
    section or the content of that section itself. If a database instance is already available, it is used as it
    is and it is not added to the resource stack: who created it has also to close it.

    If the user wants to add additional resources, he has to overload this method calling the super to preserve
    the original resources. If he wants to have shared resources among different processors executed from inside
    a processor list, he has to overload the :class:`~mafw.processor.ProcessorList` class as well.

    :raises peewee.OperationalError: If the connection to the database cannot be established.
    """
    # Both the timer and the user interface will be added to the processor resource stack only if the processor is
    # set to acquire its own resources.
    # The timer and the user interface have in-built enter and exit method.
    if self._resource_acquisition:
        self.timer = self._resource_stack.enter_context(Timer(**self._timer_parameters))
        self._resource_stack.enter_context(self._user_interface)

    # For the database it is a bit different.
    # If we already got a database, very likely we are inside a processor list: the connection and the
    # initialisation are already done and we must not put the database in the exit stack.
    # If we have neither a database nor a configuration, we cannot do anything.
    if self._database is not None or self._database_conf is None:
        return

    # no db, but we got a configuration: we can make a db.
    # The configuration may be the whole steering dictionary (type1) or the DBConfiguration section only (type2).
    if 'DBConfiguration' in self._database_conf:
        conf = self._database_conf['DBConfiguration']  # type1
    else:
        conf = self._database_conf  # type2

    # guess the database type from the URL
    protocol = extract_protocol(conf.get('URL'))

    # build the connection parameter
    # in case of sqlite, we add the pragmas group as well
    connection_parameters = {}
    if protocol == 'sqlite':
        connection_parameters['pragmas'] = conf.get('pragmas', {})
    for key, value in conf.items():
        if key not in ['URL', 'pragmas']:
            connection_parameters[key] = value

    self._database = connect(conf.get('URL'), **connection_parameters)  # type: ignore  # peewee is not returning a DB

    # The database has an enter method, but it is to generate transaction.
    # We add the database.close to the resource stack via the callback method.
    self._resource_stack.callback(self._database.close)
    try:
        self._database.connect()
    except peewee.OperationalError:
        # the URL must be read from the resolved configuration section, otherwise it is None for type1
        log.critical('Unable to connect to %s', conf.get('URL'))
        raise

    database_proxy.initialize(self._database)
    if self.create_standard_tables:
        standard_tables = mafw_model_register.get_standard_tables()
        self.database.create_tables(standard_tables)
        for table in standard_tables:
            table.init()
def start(self) -> None:
    """
    Start method.

    The base implementation sets the processor status to Start and removes the orphan files.

    The user can overload this method, including all steps that should be performed at the beginning of the
    operation. An overloaded implementation should include a call to the super method.
    """
    # flag the processor as started before doing any clean-up
    self.processor_status = ProcessorStatus.Start
    self._remove_orphan_files()
def get_items(self) -> Collection[Any]:
    """
    Returns the item collections for the processor loop.

    The base implementation returns an empty list, so this method must be overloaded for the processor to work.
    Generally, this is getting a list of rows from the database, or a list of files from the disk to be processed.

    :return: A collection of items for the loop
    :rtype: Collection[Any]
    """
    return list()
def while_condition(self) -> bool:
    """
    Return the while condition

    The base implementation always returns False.

    :return: True if the while loop has to continue, false otherwise.
    :rtype: bool
    """
    keep_looping = False
    return keep_looping
def process(self) -> None:
    """
    Processes the current item.

    This is the core of the Processor, where the user has to define the calculations required. The base
    implementation does nothing.
    """
    return None
def accept_item(self) -> None:
    """
    Does post process actions on a successfully processed item.

    Within the :meth:`process`, the user left the looping status to Continue, so it means that everything looks
    good and this is the right place to perform database updates or file savings. The base implementation does
    nothing.

    .. seealso::
        Have a look at :meth:`skip_item` for what to do in case something went wrong.
    """
    return None
def skip_item(self) -> None:
    """
    Does post process actions on a *NOT* successfully processed item.

    Within the :meth:`process`, the user set the looping status to Skip, so it means that something went wrong
    and here corrective actions can be taken if needed. The base implementation does nothing.

    .. seealso::
        Have a look at :meth:`accept_item` for what to do in case everything was OK.
    """
    return None
def finish(self) -> None:
    """
    Concludes the execution.

    The base implementation sets the processor status to Finish, marks the exit status as Aborted if the looping
    status is Abort and finally prints the process statistics.

    The user can reimplement this method if there are some conclusive tasks that must be achieved.
    Always include a call to super().
    """
    self.processor_status = ProcessorStatus.Finish

    # an aborted loop is propagated to the exit status of the processor
    aborted = self.looping_status == LoopingStatus.Abort
    if aborted:
        self.processor_exit_status = ProcessorExitStatus.Aborted

    self.print_process_statistics()
def print_process_statistics(self) -> None:
    """
    Print the process statistics.

    A utility method to log the number of processed items along with the fastest, the slowest, the average and
    the total time required to process them. This is particularly useful when the looping processor is part of a
    ProcessorList.

    Nothing is logged if no process duration was recorded.
    """
    durations = self._process_durations
    if not len(durations):
        return

    log.info('[cyan] Processed %s items.' % len(durations))

    fastest = pretty_format_duration(min(durations), n_digits=3)
    log.info('[cyan] Fastest item process duration: %s ' % fastest)

    slowest = pretty_format_duration(max(durations), n_digits=3)
    log.info('[cyan] Slowest item process duration: %s ' % slowest)

    average = pretty_format_duration((sum(durations) / len(durations)), n_digits=3)
    log.info('[cyan] Average item process duration: %s ' % average)

    total = pretty_format_duration(sum(durations), n_digits=3)
    log.info('[cyan] Total process duration: %s' % total)
def _remove_orphan_files(self) -> None:
    """
    Remove orphan files.

    If a connection to the database is available, then the OrphanFile standard table is queried for all its entries,
    and all the files are then removed. Files that are already missing from the disk are silently ignored. Once the
    files are removed, the entries of the OrphanFile table are deleted as well.

    If the OrphanFile model is not found in the model register, a warning is logged and nothing else is done.

    The user can turn off this behaviour by switching the :attr:`~mafw.processor.Processor.remove_orphan_files` to False.

    """
    if self._database is None or self.remove_orphan_files is False:
        # no database connection or no wish to remove orphan files, it does not make sense to continue
        return

    try:
        OrphanFile = cast(MAFwBaseModel, mafw_model_register.get_model('OrphanFile'))
    except KeyError:
        log.warning('OrphanFile table not found in DB. Please verify database integrity')
        return

    # this assertion is meant for the static type checker only, it is never executed at runtime.
    if TYPE_CHECKING:
        assert hasattr(OrphanFile, '_meta')

    orphan_files = OrphanFile.select().execute()  # type: ignore[no-untyped-call]
    if len(orphan_files) != 0:
        # the number in the message is the total number of files, not the number of rows in the table
        msg = f'[yellow]Pruning orphan files ({sum(len(f.filenames) for f in orphan_files)})...'
        log.info(msg)
        for orphan in orphan_files:
            # filenames is a list of files:
            for f in orphan.filenames:
                f.unlink(missing_ok=True)

        # the files are gone, so the table rows can be removed as well
        OrphanFile.delete().execute()  # type: ignore[no-untyped-call]
1405class ProcessorList(list[Union['Processor', 'ProcessorList']]):
1406 """
1407 A list like collection of processors.
1409 ProcessorList is a subclass of list containing only Processor subclasses or other ProcessorList.
1411 An attempt to add an element that is not a Processor or a ProcessorList will raise a TypeError.
1413 Along with an iterable of processors, a new processor list can be built using the following parameters.
1414 """
def __init__(
    self,
    *args: Processor | ProcessorList,
    name: str | None = None,
    description: str | None = None,
    timer: Timer | None = None,
    timer_params: dict[str, Any] | None = None,
    user_interface: UserInterfaceBase | None = None,
    database: Database | None = None,
    database_conf: dict[str, Any] | None = None,
    create_standard_tables: bool = True,
):
    """
    Constructor parameters:

    :param name: The name of the processor list. Defaults to the class name.
    :type name: str, Optional
    :param description: An optional short description. Defaults to the name of the list.
    :type description: str, Optional
    :param timer: The timer object. If None is provided, a new one will be created. Defaults to None.
    :type timer: Timer, Optional
    :param timer_params: A dictionary of parameter to build the timer object. Defaults to None.
    :type timer_params: dict, Optional
    :param user_interface: A user interface. If None is provided, a console interface is used. Defaults to None
    :type user_interface: UserInterfaceBase, Optional
    :param database: A database instance. Defaults to None.
    :type database: Database, Optional
    :param database_conf: Configuration for the database. Default to None.
    :type database_conf: dict, Optional
    :param create_standard_tables: Whether or not to create the standard tables. Defaults to True.
    :type create_standard_tables: bool, Optional
    """

    # the positional arguments are passed as a single tuple, because this is what validate_items is expecting.
    validated_items = self.validate_items(args)
    super().__init__(validated_items)

    # identification of the list
    self._name = name if name else self.__class__.__name__
    self.description = description if description else self._name

    # timing and user interface
    self.timer = timer
    self.timer_params = timer_params if timer_params else {}
    self._user_interface = user_interface if user_interface else ConsoleInterface()

    # execution bookkeeping
    self._resource_stack: contextlib.ExitStack
    self._processor_exit_status: ProcessorExitStatus = ProcessorExitStatus.Successful
    self.nested_list = False
    """
    Boolean flag to identify that this list is actually inside another list.

    Similarly to the local resource flag for the :class:`.Processor`, this flag prevent the user interface to be
    added to the resource stack.
    """

    # database stuff
    self._database: peewee.Database | None = database
    self._database_conf: dict[str, Any] | None = validate_database_conf(database_conf)
    self.create_standard_tables = create_standard_tables
    """The boolean flag to proceed or skip with standard table creation and initialisation"""
def __setitem__(  # type: ignore[override]
    self,
    __index: SupportsIndex,
    __object: Processor | ProcessorList,
) -> None:
    # the item is validated before being stored, so that only processors or processor lists are accepted
    validated = self.validate_item(__object)
    super().__setitem__(__index, validated)
def insert(self, __index: SupportsIndex, __object: Processor | ProcessorList) -> None:
    """Validates the new processor and adds it at the specified index."""
    validated = self.validate_item(__object)
    super().insert(__index, validated)
def append(self, __object: Processor | ProcessorList) -> None:
    """Validates the new processor and appends it at the end of the list."""
    validated = self.validate_item(__object)
    super().append(validated)
def extend(self, __iterable: Iterable[Processor | ProcessorList]) -> None:
    """
    Extends the processor list with a list of processors.

    If the iterable is an instance of the same class of this list, its items are added as they are. Otherwise,
    each item is validated before being added.
    """
    if isinstance(__iterable, type(self)):
        new_items = __iterable
    else:
        new_items = [self.validate_item(item) for item in __iterable]
    super().extend(new_items)
@staticmethod
def validate_item(item: Processor | ProcessorList) -> Processor | ProcessorList:
    """
    Validates the item being added.

    A Processor gets its local resource acquisition switched off. A ProcessorList gets its timer parameters
    replaced in order to suppress the timer message and it is flagged as a nested list.

    :param item: The item to be validated.
    :return: The very same item, after the adjustments described above.
    :raises TypeError: If the item is neither a Processor nor a ProcessorList.
    """
    if isinstance(item, Processor):
        item.local_resource_acquisition = False
    elif isinstance(item, ProcessorList):
        item.timer_params = {'suppress_message': True}
        item.nested_list = True
    else:
        raise TypeError(f'Expected Processor or ProcessorList, got {type(item).__name__}')
    return item
1509 @staticmethod
1510 def validate_items(items: tuple[Processor | ProcessorList, ...] = ()) -> tuple[Processor | ProcessorList, ...]:
1511 """Validates a tuple of items being added."""
1512 if not items:
1513 return tuple()
1514 return tuple([ProcessorList.validate_item(item) for item in items if item is not None])
@property
def name(self) -> str:
    """
    The name of the processor list

    :return: The name of the processor list
    :rtype: str
    """
    list_name = self._name
    return list_name

@name.setter
def name(self, name: str) -> None:
    # plain assignment to the private storage
    self._name = name
@property
def processor_exit_status(self) -> ProcessorExitStatus:
    """
    The processor exit status.

    It refers to the whole processor list execution.
    """
    exit_status = self._processor_exit_status
    return exit_status

@processor_exit_status.setter
def processor_exit_status(self, status: ProcessorExitStatus) -> None:
    # plain assignment to the private storage
    self._processor_exit_status = status
@property
def database(self) -> peewee.Database:
    """
    Returns the database instance

    :return: A database instance
    :raises MissingDatabase: if a database connection is missing.
    """
    if self._database is not None:
        return self._database
    raise MissingDatabase('Database connection not initialized')
def execute(self) -> ProcessorExitStatus:
    """
    Execute the list of processors.

    Similarly to the :class:`Processor`, ProcessorList can be executed. In simple words, the execute
    method of each item in the list is called exactly in the same sequence as they were added. Before the
    execution, the resources of the list are distributed to the item.

    :return: The exit status of the last executed item.
    :raises AbortProcessorException: If the exit status of one of the items is Aborted.
    """
    with contextlib.ExitStack() as self._resource_stack:
        self.acquire_resources()
        self._user_interface.create_task(self.name, self.description, completed=0, increment=0, total=len(self))

        for item in self:
            # the message depends on the item being a single processor or a whole list
            if isinstance(item, Processor):
                log.info('[bold]Executing [red]%s[/red] processor[/bold]' % item.replica_name)
            else:
                log.info('[bold]Executing [blue]%s[/blue] processor list[/bold]' % item.name)

            self.distribute_resources(item)
            item.execute()
            self._user_interface.update_task(self.name, increment=1)  # i+1, 1, len(self))

            # the exit status of the list follows the one of the last executed item
            self._processor_exit_status = item.processor_exit_status
            if self._processor_exit_status == ProcessorExitStatus.Aborted:
                msg = 'Processor %s caused the processor list to abort' % item.name
                log.error(msg)
                raise AbortProcessorException(msg)

        self._user_interface.update_task(self.name, completed=len(self), total=len(self))
    return self._processor_exit_status
def acquire_resources(self) -> None:
    """
    Acquires external resources.

    The strategy is similar to the one for the processor: resources that are already available (not None) are
    used as they are, otherwise they are created and added to the resource stack.

    - A timer is created only if the list has none.
    - The user interface is added to the resource stack only if this is not a nested list, otherwise the user
      interface context (progress for rich) would be stopped at the end of the nested list.
    - A database connection is established only if the list has no database instance but it has a database
      configuration. The configuration can be either a dictionary containing a `DBConfiguration` section or the
      content of that section itself.

    :raises peewee.OperationalError: If the connection to the database cannot be established.
    """
    if self.timer is None:
        self.timer = self._resource_stack.enter_context(Timer(**self.timer_params))

    # The user interface is very likely already initialised by the runner.
    # But if this is a nested list, then we must not push the user interface in the stack.
    if not self.nested_list:
        self._resource_stack.enter_context(self._user_interface)

    # If we already got a database, the connection and the initialisation are already done.
    # If we have neither a database nor a configuration, we cannot do anything.
    if self._database is not None or self._database_conf is None:
        return

    # no db, but we got a configuration: we can make a db.
    # The configuration may be the whole steering dictionary (type1) or the DBConfiguration section only (type2).
    if 'DBConfiguration' in self._database_conf:
        conf = self._database_conf['DBConfiguration']  # type1
    else:
        conf = self._database_conf  # type2

    # guess the database type from the URL
    protocol = extract_protocol(conf.get('URL'))

    # build the connection parameter
    # in case of sqlite, we add the pragmas group as well
    connection_parameters = {}
    if protocol == 'sqlite':
        connection_parameters['pragmas'] = conf.get('pragmas', {})
    for key, value in conf.items():
        if key not in ['URL', 'pragmas']:
            connection_parameters[key] = value

    self._database = connect(conf.get('URL'), **connection_parameters)  # type: ignore  # peewee is not returning a DB
    try:
        self._database.connect()
    except peewee.OperationalError:
        # the URL must be read from the resolved configuration section, otherwise it is None for type1
        log.critical('Unable to connect to %s', conf.get('URL'))
        raise
    else:
        # the connection is closed by the resource stack only if it was successfully opened
        self._resource_stack.callback(self._database.close)

    database_proxy.initialize(self._database)
    if self.create_standard_tables:
        standard_tables = mafw_model_register.get_standard_tables()
        self.database.create_tables(standard_tables)
        for table in standard_tables:
            table.init()
1636 def distribute_resources(self, processor: Processor | Self) -> None:
1637 """Distributes the external resources to the items in the list."""
1638 processor.timer = self.timer
1639 processor._user_interface = self._user_interface
1640 processor._database = self._database