Coverage for src / mafw / processor.py: 99%
1014 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-28 13:34 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-28 13:34 +0000
1# Copyright 2025–2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Module implements the basic Processor class, the ProcessorList and all helper classes to achieve the core
6functionality of the MAFw.
7"""
9from __future__ import annotations
11import contextlib
12import inspect
13import logging
14import os
15import queue
16import threading
17import time
18import warnings
19from collections import OrderedDict
20from collections.abc import Callable, Iterator
21from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
22from copy import copy, deepcopy
23from functools import wraps
24from itertools import count
25from typing import (
26 TYPE_CHECKING,
27 Any,
28 Collection,
29 Generic,
30 Iterable,
31 Self,
32 SupportsIndex,
33 TypeVar,
34 Union,
35 cast,
36 get_args,
37 get_origin,
38 get_type_hints,
39)
41import peewee
42from peewee import Database
44# noinspection PyUnresolvedReferences
45from playhouse.db_url import connect
47import mafw.db.db_filter
48from mafw.active import Active
49from mafw.db.db_connection import build_connection_parameters
50from mafw.db.db_model import MAFwBaseModel, database_proxy, mafw_model_register
51from mafw.enumerators import LoopingStatus, LoopType, ProcessorExitStatus, ProcessorStatus
52from mafw.mafw_errors import (
53 AbortProcessorException,
54 MissingDatabase,
55 MissingOverloadedMethod,
56 MissingSuperCall,
57 ProcessorParameterError,
58)
59from mafw.models.filter_schema import FilterSchema
60from mafw.models.loop_payloads import LoopItem, LoopResult
61from mafw.models.parameter_schema import ParameterSchema
62from mafw.models.processor_schema import ProcessorSchema
63from mafw.timer import Timer, pretty_format_duration
64from mafw.tools.generics import deep_update
65from mafw.tools.parallel import is_free_threading
66from mafw.ui.abstract_user_interface import UserInterfaceBase
67from mafw.ui.console_user_interface import ConsoleInterface
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)

# Type variable carrying the value type of a processor parameter; shared by ActiveParameter and PassiveParameter.
ParameterType = TypeVar('ParameterType')
"""Generic variable type for the :class:`ActiveParameter` and :class:`PassiveParameter`."""
def validate_database_conf(database_conf: dict[str, Any] | None = None) -> dict[str, Any] | None:
    """
    Check that a database configuration contains the mandatory entries.

    Two layouts are accepted: a flat database configuration (``{'URL': ...}``) or a whole steering-file
    dictionary where the database settings live in the ``DBConfiguration`` table.

    :param database_conf: The configuration to be checked. Defaults to None.
    :type database_conf: dict, Optional
    :return: The very same dictionary that was passed in when all required fields are present, None otherwise
        (including when ``database_conf`` is None).
    :rtype: dict, None
    """
    if database_conf is None:
        return None

    # Only read from a shallow copy: the caller's dictionary is never modified.
    section = database_conf.copy()
    if 'DBConfiguration' in section:
        # Steering-file layout: the database settings are nested in their own table.
        section = section['DBConfiguration']

    required_fields = ('URL',)
    has_all_required = all(field in section for field in required_fields)
    return database_conf if has_all_required else None
class PassiveParameter(Generic[ParameterType]):
    """
    A helper class to store processor parameter value and metadata.

    This class is the private interface used by the :class:`ActiveParameter` descriptor to store its value and metadata.

    When a new :class:`.ActiveParameter` is added to a class, an instance of a PassiveParameter is added to the
    processor parameter :attr:`register <.Processor._processor_parameters>`.

    .. seealso::

        An explanation on how processor parameters work and should be used is given in :ref:`Understanding processor
        parameters <parameters>`

    .. versionchanged:: v2.0.0

        User should only use :class:`ActiveParameter` and never manually instantiate :class:`PassiveParameter`.
    """

    def __init__(
        self, name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = ''
    ):
        """
        Constructor parameters:

        :param name: The name of the parameter. It must be a valid python identifier.
        :type name: str
        :param value: The set value of the parameter. If None, then the default value will be used. When a value is
            given, ``default`` is ignored. Defaults to None.
        :type value: ParameterType, Optional
        :param default: The default value for the parameter. It is used if the :attr:`value` is not provided.
            Defaults to None.
        :type default: ParameterType, Optional
        :param help_doc: A brief explanation of the parameter.
        :type help_doc: str, Optional
        :raises ProcessorParameterError: if both `value` and `default` are not provided or if `name` is not a valid
            identifier.
        """
        if not name.isidentifier():
            raise ProcessorParameterError(f'{name} is not a valid python identifier.')

        self.name = name

        if value is not None:
            # An explicit value: the parameter is flagged as set and not optional.
            self._value: ParameterType = value
            self._is_set = True
            self._is_optional = False
        elif default is not None:
            # Only a default: the parameter is optional and not (yet) set by the user.
            self._value = default
            self._is_set = False
            self._is_optional = True
        else:
            raise ProcessorParameterError('Processor parameter cannot have both value and default value set to None')

        self.doc = help_doc

    def __rich_repr__(self) -> Iterator[Any]:
        """
        Yield the fields displayed by the ``rich`` pretty printer.

        Three-element tuples carry a default as third item: rich omits the field when its value equals that default.
        """
        yield 'name', self.name
        yield 'value', self.value, None
        yield 'help_doc', self.doc, ''

    @property
    def is_set(self) -> bool:
        """
        Property to check if the value has been set.

        It is useful for optional parameter to see if the current value is the default one, or if the user set it.
        """
        return self._is_set

    @property
    def value(self) -> ParameterType:
        """
        Gets the parameter value.

        This is either the value passed to the constructor, the default value when no value was passed, or the
        last value assigned through the setter. The getter never raises: a parameter without value and default
        cannot be constructed in the first place.

        :return: The parameter value.
        :rtype: ParameterType
        """
        return self._value

    @value.setter
    def value(self, value: ParameterType) -> None:
        """
        Sets the parameter value and flags the parameter as set.

        :param value: The value to be set.
        :type value: ParameterType
        """
        self._value = value
        self._is_set = True

    @property
    def is_optional(self) -> bool:
        """
        Property to check if the parameter is optional.

        A parameter is optional when it was constructed with a default value only.

        :return: True if the parameter is optional
        :rtype: bool
        """
        return self._is_optional

    def __repr__(self) -> str:
        attrs = ', '.join(f'{attr}={getattr(self, attr)!r}' for attr in ('name', 'value', 'doc'))
        return f'{self.__class__.__name__}({attrs})'
F = TypeVar('F', bound=Callable[..., Any])
"""Type variable for generic callable with any return value."""


def ensure_parameter_registration(func: F) -> F:
    """
    Decorator to ensure that before calling `func` the processor parameters have been registered.

    ``func`` must be a :class:`Processor` method. On every call, the first positional argument is checked to be a
    :class:`Processor` instance. If its parameters have not been registered yet,
    :meth:`Processor._register_parameters` is invoked before delegating to ``func``.

    :param func: The processor method to be wrapped.
    :type func: Callable
    :return: The wrapped method, exposing the same signature and metadata as ``func``.
    :rtype: Callable
    :raises ProcessorParameterError: when the wrapped callable is invoked without positional arguments or with a
        first positional argument that is not a :class:`Processor` instance.
    """

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # The first positional argument must be the processor instance (``self``).
        if not args or not isinstance(args[0], Processor):
            raise ProcessorParameterError(
                'Attempt to apply the ensure_parameter_registration to something different to a Processor subclass.'
            )
        processor = args[0]
        if processor._parameter_registered is False:
            processor._register_parameters()
        # The result of func is returned untouched: it is the method's return value, not a callable of type F.
        return func(*args, **kwargs)

    return cast(F, wrapper)
_ParameterRegistry = OrderedDict[str, 'ActiveParameter[Any]']
# Ordered mapping from the external parameter name to the ActiveParameter descriptor defining it.


def _copy_parent_parameter_definitions(owner: type[Processor]) -> _ParameterRegistry:
    """
    Build an ordered dictionary of ActiveParameters inherited from base classes.

    Every base registry already contains the parameters that base inherited. Blindly merging the registries in MRO
    order would therefore let a more distant ancestor overwrite an override declared by a closer base.

    To mirror Python attribute resolution, the MRO is walked from the most generic base to the most specific one:

    - a descriptor declared by the base being visited always replaces an existing entry;
    - an entry that base merely inherited only fills a name that is not known yet.

    :param owner: The class whose ancestors are inspected; ``owner`` itself is excluded.
    :return: The inherited parameter definitions keyed by external parameter name, base-most parameters first.
    """
    definitions: _ParameterRegistry = OrderedDict()
    for base in reversed(owner.__mro__[1:]):
        base_definitions = getattr(base, '_parameter_definitions', None)
        if not base_definitions:
            continue
        for name, descriptor in base_definitions.items():
            declared_by_base = getattr(descriptor, '_owner', None) is base
            if declared_by_base or name not in definitions:
                definitions[name] = descriptor
    return definitions
def _ensure_parameter_definitions(owner: type[Processor]) -> _ParameterRegistry:
    """
    Return the parameter registry owned by ``owner``, creating it on first access.

    Only the class' own namespace is inspected, so a registry inherited from a base class is never handed out as
    the registry of a subclass. When ``owner`` has no registry yet, one is seeded with the definitions inherited
    from the base classes and stored on ``owner`` as ``_parameter_definitions``.

    :param owner: The processor class whose registry is requested.
    :return: The registry stored in ``owner.__dict__``.
    """
    own_registry: _ParameterRegistry | None = vars(owner).get('_parameter_definitions')
    if own_registry is None:
        own_registry = _copy_parent_parameter_definitions(owner)
        setattr(owner, '_parameter_definitions', own_registry)
    return own_registry
def _validate_filter_schema(owner: type[Processor]) -> None:
    """
    Validate the optional :class:`~mafw.models.filter_schema.FilterSchema` declared on a Processor.

    The schema is read from the ``_filter_schema`` class attribute; nothing is checked when it is missing or None.

    :param owner: The processor class to be checked.
    :raises ProcessorParameterError: if the attribute is not a FilterSchema, if its ``root_model`` or any entry
        of ``allowed_models`` is not a MAFwBaseModel subclass, or if a model is declared more than once
        (repeating the root model included).
    """
    schema = getattr(owner, '_filter_schema', None)
    if schema is None:
        return
    if not isinstance(schema, FilterSchema):
        raise ProcessorParameterError('Processor._filter_schema must be a FilterSchema instance')

    def _is_model_class(candidate: Any) -> bool:
        # Only classes deriving from MAFwBaseModel are accepted; instances are rejected.
        return inspect.isclass(candidate) and issubclass(candidate, MAFwBaseModel)

    root_model = schema.root_model
    if not _is_model_class(root_model):
        raise ProcessorParameterError('FilterSchema.root_model must inherit from MAFwBaseModel')

    declared = {root_model}
    for model in schema.allowed_models:
        if not _is_model_class(model):
            raise ProcessorParameterError('FilterSchema.allowed_models must contain MAFwBaseModel subclasses')
        if model in declared:
            raise ProcessorParameterError(f'Model {model!r} already declared in FilterSchema')
        declared.add(model)
class ActiveParameter(Generic[ParameterType]):
    r"""
    The public interface to the processor parameter.

    The behaviour of a :class:`Processor` can be customised by using processor parameters. The value of these
    parameters can be either set via a configuration file or directly when creating the class.

    To benefit from this facility, add an ActiveParameter instance to the definition of the Processor subclass,
    like this:

    .. code-block::

        class MyProcessor(Processor):

            # this is the input folder
            input_folder = ActiveParameter('input_folder', Path(r'C:\input'), help_doc='This is where to look for input files')

            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)

                # change the input folder to something else
                self.input_folder = Path(r'D:\data')

                # get the value of the parameter
                print(self.input_folder)

    The ActiveParameter is a `descriptor <https://docs.python.org/3/glossary.html#term-descriptor>`_. This means
    that a lot of work is done behind the scenes when you create one.

    In simple words, a processor parameter is made of two objects:

    - a public interface, where the user can easily access the value of the parameter;
    - a private interface, where all other information (default, documentation...) is stored.

    The user does not have to take care of any of this. When a new ActiveParameter instance is added to the class
    as in the code snippet above, the private interface is automatically created and stays in the class instance
    until the end of the instance lifetime.

    To access the private interface, the user can use the :meth:`Processor.get_parameter` method using the parameter
    name as a key.

    The user can assign to an ActiveParameter almost any name. There are just a few invalid parameter names that are
    used for other purposes. The list of reserved names is available :attr:`here <reserved_names>`. Should the user
    inadvertently use a reserved name, a :exc:`.ProcessorParameterError` is raised.

    .. seealso::

        The private counterpart is the :class:`PassiveParameter`.

        An explanation on how processor parameters work and should be used is given in :ref:`Understanding processor
        parameters <parameters>`

        The list of :attr:`reserved names <reserved_names>`.
    """

    reserved_names: list[str] = ['__logic__', '__filter__', '__new_only__', '__inheritance__', '__enable__']
    """A list of names that cannot be used as processor parameter names.

    - `__logic__`
    - `__filter__`
    - `__new_only__`
    - `__inheritance__`
    - `__enable__`
    """

    def __init__(
        self, name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = ''
    ):
        """
        Constructor parameters:

        :param name: The name of the parameter.
        :type name: str
        :param value: The initial value of the parameter. Defaults to None.
        :type value: ParameterType, Optional
        :param default: The default value of the parameter, to be used when ``value`` is not set. Defaults to None.
        :type default: ParameterType, Optional
        :param help_doc: An explanatory text describing the parameter.
        :type help_doc: str, Optional
        :raises ProcessorParameterError: if ``name`` is one of the :attr:`reserved_names`.
        """

        self._value = value
        self._default = default
        self._help_doc = help_doc
        self._external_name = self._validate_name(name)

    def _validate_name(self, proposed_name: str) -> str:
        """
        Validate that the proposed parameter name is not in the list of forbidden names.

        This private method checks if the provided name is allowed for use as a processor parameter.
        Names that are listed in :attr:`reserved_names` cannot be used as parameter names.

        :param proposed_name: The name to be validated for use as a processor parameter.
        :type proposed_name: str
        :return: The validated name if it passes the forbidden names check.
        :rtype: str
        :raises ProcessorParameterError: If the proposed name is in the list of forbidden names.
        """
        if proposed_name not in self.reserved_names:
            return proposed_name
        raise ProcessorParameterError(f'Attempt to use a forbidden name ({proposed_name})')

    def __set_name__(self, owner: type[Processor], name: str) -> None:
        """
        Bind the descriptor to its owning class and record it in the class parameter registry.

        A second parameter with the same external name declared in the same class is not registered. Instead, a
        :exc:`.ProcessorParameterError` is stored on ``owner`` as ``_parameter_definition_error``, and the first
        declaration stays in the registry. The error is raised by :class:`ProcessorMeta` once the class is built.
        NOTE(review): presumably the error is deferred so that it is not wrapped by the class-creation machinery
        of older Python versions — confirm.

        :param owner: The class in which the descriptor has been declared.
        :param name: The attribute name used in the class body.
        """
        self.public_name = name
        self.private_name = f'param_{name}'
        self._owner = owner

        definitions = _ensure_parameter_definitions(owner)
        existing = definitions.get(self._external_name)
        if existing is not None and getattr(existing, '_owner', None) is owner:
            if not hasattr(owner, '_parameter_definition_error'):
                setattr(
                    owner,
                    '_parameter_definition_error',
                    ProcessorParameterError(f'Duplicated parameter name ({self._external_name}).'),
                )
            return
        # An entry inherited from a base class is replaced: the subclass declaration overrides it.
        definitions[self._external_name] = self

    def __get__(
        self, obj: Processor | None, obj_type: type[Processor] | None = None
    ) -> ActiveParameter[ParameterType] | ParameterType:
        """
        Return the descriptor itself on class access, or the parameter value on instance access.

        The instance value is read from the :class:`PassiveParameter` stored in the processor's
        ``_processor_parameters`` registry under the external parameter name.
        """
        if obj is None:
            return self

        # retrieve instance-level passive parameter
        param = cast(PassiveParameter[ParameterType], obj._processor_parameters[self._external_name])
        return param.value

    def __set__(self, obj: Processor, value: ParameterType) -> None:
        """Store ``value`` in the instance-level :class:`PassiveParameter`, flagging it as set."""
        param = obj._processor_parameters[self._external_name]
        param.value = value

    def to_schema(self) -> ParameterSchema:
        """
        Returns the static schema describing this parameter.

        The schema is derived solely from the descriptor metadata and does not instantiate the owning processor.
        """
        annotation = self._resolve_parameter_annotation()
        default_value = self._schema_default_value()
        help_text = self._help_doc or None
        is_list = self._is_list_annotation(annotation, default_value)
        is_dict = self._is_dict_annotation(annotation, default_value)
        return ParameterSchema(
            name=self._external_name,
            annotation=annotation,
            default=default_value,
            help=help_text,
            is_list=is_list,
            is_dict=is_dict,
        )

    def _resolve_parameter_annotation(self) -> type | None:
        """
        Resolve the value type of the parameter for the static schema.

        The type hint of the public attribute on the owning class is used when available: ``T`` is returned for an
        ``ActiveParameter[T]`` hint, while any other hint is returned as is. Without a usable hint, the type of the
        default value (or of the value, if no default is given) is used instead. A hint is unusable when there is no
        annotation, when the annotation is a bare ``ActiveParameter``, or when the hints cannot be resolved.

        :return: The resolved annotation, or None when the descriptor is not bound to a class or no type can be
            inferred.
        """
        owner = getattr(self, '_owner', None)
        if not hasattr(self, 'public_name') or owner is None:
            return None

        try:
            hints = get_type_hints(owner)
        except Exception:
            # Unresolvable forward references must not prevent the schema generation.
            hints = {}

        hint = hints.get(self.public_name)
        if hint is None or hint is ActiveParameter:
            # A bare ActiveParameter annotation carries no value type: infer it from the default/value instead.
            return self._fallback_annotation()

        if get_origin(hint) is ActiveParameter:
            args = get_args(hint)
            return cast(type | None, args[0]) if args else None
        return cast(type | None, hint)

    def _fallback_annotation(self) -> type | None:
        """Return the type of the default value (or of the value if no default is given), or None if both are None."""
        fallback = self._default if self._default is not None else self._value
        return type(fallback) if fallback is not None else None

    def _schema_default_value(self) -> Any:
        """
        Return the default (or the value, when no default is given) to be exposed in the schema.

        A deep copy is returned, so that the schema cannot alter the descriptor state. Objects that cannot be deep
        copied are returned as they are.
        """
        candidate = self._default if self._default is not None else self._value
        if candidate is None:
            return None
        try:
            return deepcopy(candidate)
        except Exception:
            return candidate

    def _is_list_annotation(self, annotation: type | None, default_value: Any) -> bool:
        """Return True if the annotation (or the default value type) is a list."""
        return self._matches_container(annotation, default_value, list)

    def _is_dict_annotation(self, annotation: type | None, default_value: Any) -> bool:
        """Return True if the annotation (or the default value type) is a dict."""
        return self._matches_container(annotation, default_value, dict)

    def _matches_container(self, annotation: type | None, default_value: Any, container: type) -> bool:
        """
        Check whether the parameter type corresponds to ``container``.

        The annotation is used when given, otherwise the type of ``default_value``. Both parametrised generics
        (``list[int]``) and plain (sub)classes of ``container`` match.
        """
        type_hint = annotation
        if type_hint is None and default_value is not None:
            type_hint = type(default_value)

        if type_hint is None:
            return False

        origin = get_origin(type_hint)
        if origin is container:
            return True

        if isinstance(type_hint, type) and issubclass(type_hint, container):
            return True

        return False
class ProcessorMeta(type):
    """Metaclass that finalizes Processor subclasses before instantiation."""

    def __init__(cls, name: str, bases: tuple[type, ...], namespace: dict[str, Any]) -> None:
        """
        Set up new Processor subclasses.

        The class is finalised in four steps:

        #. its own parameter registry is created, if no parameter declaration created it already;
        #. the optional filter schema is validated;
        #. any duplicated-parameter error recorded while the class body was processed is raised;
        #. for strict subclasses of a class named ``Processor``, the ``_apply_super_call_wrappers`` hook is called
           when it exists.

        The wrappers are installed at class-creation time, so every instance is guarded before it starts executing.

        :raises ProcessorParameterError: on duplicated parameter names or on an invalid filter schema.
        """
        super().__init__(name, bases, namespace)
        processor_cls = cast(type['Processor'], cls)
        _ensure_parameter_definitions(processor_cls)
        _validate_filter_schema(processor_cls)
        error = getattr(cls, '_parameter_definition_error', None)
        if error is not None:
            # Remove the marker before raising so it does not linger on the (discarded) class.
            delattr(cls, '_parameter_definition_error')
            raise error
        if any(base.__name__ == 'Processor' for base in cls.__mro__[1:]):
            apply_wrappers = getattr(cls, '_apply_super_call_wrappers', None)
            if apply_wrappers is not None:
                apply_wrappers()

    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        """
        Create a processor instance and run its ``__post_init__`` hook once ``__init__`` has returned.

        :return: The newly created instance of ``cls`` (not a metaclass instance).
        """
        obj = type.__call__(cls, *args, **kwargs)
        obj.__post_init__()
        return obj
525# noinspection PyProtectedMember
526class Processor(metaclass=ProcessorMeta):
527 """
528 The basic processor.
530 A very comprehensive description of what a Processor does and how it works is available at :ref:`doc_processor`.
531 """
# Execution status of the processor, built with ``Active`` (see mafw.active) starting from ProcessorStatus.Unknown.
# NOTE(review): Active presumably behaves as a descriptor; __post_init__ assigns ProcessorStatus.Init to it.
processor_status = Active(ProcessorStatus.Unknown)
"""Processor execution status"""

# NOTE: __qualname__ is evaluated once, in this class body, so subclasses that do not override this attribute
# inherit the text 'Processor is working'.
progress_message: str = f'{__qualname__} is working'
"""Message displayed to show the progress.

It can be customized with information about the current item in the loop by overloading the
:meth:`format_progress_message`."""

#: Names of the methods whose overrides must invoke their super implementation.
_methods_to_be_checked_for_super: tuple[str, ...] = ('start', 'finish')
@classmethod
def parameter_schema(cls) -> list[ParameterSchema]:
    """
    Build the static schema of the processor parameters declared on the class.

    No processor instance is created, so external tools can generate the schema without side effects.

    :return: One :class:`~mafw.models.parameter_schema.ParameterSchema` per parameter, following the order of the
        class parameter registry.
    """
    registry = getattr(cls, '_parameter_definitions', None)
    if registry is None:
        # No registry reachable through the class hierarchy yet: create the class' own one.
        registry = _ensure_parameter_definitions(cls)
    return [descriptor.to_schema() for descriptor in registry.values()]
@classmethod
def filter_schema(cls) -> FilterSchema | None:
    """
    Return the optional metadata describing the models available for filtering.

    :return: The ``_filter_schema`` class attribute, or None when the processor does not declare one.
    """
    declared_schema: FilterSchema | None = getattr(cls, '_filter_schema', None)
    return declared_schema
@classmethod
def processor_schema(cls) -> ProcessorSchema:
    """
    Return the static schema of the processor.

    It combines the parameter schema with the optional filter schema, without instantiating the processor.
    """
    parameters = cls.parameter_schema()
    filter_metadata = cls.filter_schema()
    return ProcessorSchema(parameters=parameters, filter=filter_metadata)
# Class-level counter shared by every instance; __init__ draws the next value from it as ``unique_id``.
_ids = count(0)
"""A counter for all processor instances"""

# NOTE(review): this dict is a shared class attribute; subclasses presumably rebind it rather than mutate it in
# place — confirm. _override_defaults copies these values into the matching processor parameters.
new_defaults: dict[str, Any] = {}
"""
A dictionary containing defaults value for the parameters to be overridden

.. versionadded:: v2.0.0
"""

# Key looked up in the original configuration to read the global ``new_only`` filter setting.
new_only_flag = 'new_only'
def __init__(
    self,
    name: str | None = None,
    description: str | None = None,
    config: dict[str, Any] | None = None,
    looper: LoopType | str = LoopType.ForLoop,
    user_interface: UserInterfaceBase | None = None,
    timer: Timer | None = None,
    timer_params: dict[str, Any] | None = None,
    database: Database | None = None,
    database_conf: dict[str, Any] | None = None,
    remove_orphan_files: bool = True,
    replica_id: str | None = None,
    create_standard_tables: bool = True,
    max_workers: int | None = None,
    queue_size: int | None = None,
    queue_batch_size: int | None = None,
    *args: Any,
    **kwargs: Any,
) -> None:
    """
    Constructor parameters

    :param name: The name of the processor. If None is provided, the class name is used instead. Defaults to None.
    :type name: str, Optional
    :param description: A short description of the processor task. Defaults to the processor name.
    :type description: str, Optional
    :param config: A configuration dictionary for this processor. A deep copy is stored, so the caller's
        dictionary is not affected. Defaults to None.
    :type config: dict, Optional
    :param looper: Enumerator (or its value) defining the looping type; it is converted with ``LoopType(looper)``.
        Defaults to LoopType.ForLoop
    :type looper: LoopType | str, Optional
    :param user_interface: A user interface instance to be used by the processor to interact with the user.
        When None, a :class:`~mafw.ui.console_user_interface.ConsoleInterface` is created.
    :type user_interface: UserInterfaceBase, Optional
    :param timer: A timer object to measure process duration.
    :type timer: Timer, Optional
    :param timer_params: Parameters for the timer object.
    :type timer_params: dict, Optional
    :param database: A database instance. Defaults to None.
    :type database: Database, Optional
    :param database_conf: Configuration for the database. It is checked with :func:`validate_database_conf` and
        stored as None when invalid. Defaults to None.
    :type database_conf: dict, Optional
    :param remove_orphan_files: Boolean flag to remove files on disk without a reference to the database.
        See :ref:`std_tables` and :meth:`~mafw.processor.Processor._remove_orphan_files`. Defaults to True
    :type remove_orphan_files: bool, Optional
    :param replica_id: The replica identifier for the current processor.
    :type replica_id: str, Optional
    :param create_standard_tables: Boolean flag to create std tables on disk. Defaults to True
        When a nested steering configuration is loaded, this value can be overridden by the
        global create_standard_tables entry. Flat processor configurations keep the constructor value.
    :type create_standard_tables: bool, Optional
    :param max_workers: Number of worker threads for parallel loops. When None, the value returned by
        :meth:`_compute_default_max_workers` is used.
    :type max_workers: int, Optional
    :param queue_size: Maximum size of the internal queue for the queue-based parallel loop. When None, twice the
        number of workers (at least 1) is used.
    :type queue_size: int, Optional
    :param queue_batch_size: Number of items processed per worker task in the queue-based parallel loop. None and
        values lower than 1 are replaced by 1.
    :type queue_batch_size: int, Optional
    :param args: Extra positional arguments; they are accepted but not used by this constructor.
    :param kwargs: Keyword arguments that can be used to set processor parameters.
    """

    self.name = name or self.__class__.__name__
    """The name of the processor."""

    self.unique_id = next(self._ids)
    """A unique identifier representing how many instances of Processor has been created."""

    self.replica_id = replica_id
    """
    The replica identifier specified in the constructor

    .. versionadded:: v2.0.0
    """

    self.description = description or self.name
    """A short description of the processor task."""

    self._item: Any = None
    """The current item of the loop."""

    self._looping_status: LoopingStatus = LoopingStatus.Continue
    """The looping status for the main thread."""

    self._thread_local = threading.local()
    """Thread-local storage for loop attributes in parallel execution.

    .. versionadded:: v2.1.0
    """

    self._wall_clock_start: float | None = None
    """Timestamp when the looping execution began."""

    self.processor_exit_status = ProcessorExitStatus.Successful
    """Processor exit status"""

    self.loop_type: LoopType = LoopType(looper)
    """
    The loop type.

    The value of this parameter can also be changed by the :func:`~mafw.decorators.execution_workflow` decorator
    factory.

    See :class:`~mafw.enumerators.LoopType` for more details.
    """

    self.create_standard_tables = create_standard_tables
    """The boolean flag to proceed or skip with standard table creation and initialisation"""

    self.max_workers = max_workers if max_workers is not None else self._compute_default_max_workers()
    """Maximum number of worker threads used in parallel loops."""

    # the default queue size depends on max_workers, which must therefore be set first
    computed_queue_size = max(1, self.max_workers * 2)
    self.queue_size = queue_size if queue_size is not None else computed_queue_size
    """Maximum size of the queue used by :class:`~mafw.enumerators.LoopType.ParallelForLoopWithQueue`."""

    self.queue_batch_size = max(1, queue_batch_size or 1)
    """Number of items processed per worker task in :class:`~mafw.enumerators.LoopType.ParallelForLoopWithQueue`."""

    # private attributes
    self._config: dict[str, Any] = {}
    """
    A dictionary containing the processor configuration object.

    This dictionary is populated with configuration parameter (always type 2) during the
    :meth:`._load_parameter_configuration` method.

    The original value of the configuration dictionary that is passed to the constructor is stored in
    :attr:`._orig_config`.

    .. versionchanged:: v2.0.0
        Now it is an empty dictionary until the :meth:`._load_parameter_configuration` is called.

    """

    self._orig_config = deepcopy(config) if config is not None else {}
    """
    A copy of the original configuration dictionary.

    .. versionadded:: v2.0.0
    """

    self._processor_parameters: OrderedDict[str, PassiveParameter[Any]] = OrderedDict()
    """
    A dictionary to store all the processor parameter instances.

    The name of the parameter is used as a key, while for the value an instance of the
    :class:`.PassiveParameter` is used.
    """
    self._parameter_registered = False
    """A boolean flag to confirm successful parameter registration."""
    self._kwargs = kwargs

    # loops attributes
    self._i_item: int = -1
    self._n_item: int | None = -1
    self._process_durations: list[float] = []
    self._super_call_flags: dict[str, bool] = {}
    """Tracks super-call usage for methods that require it."""

    # resource stack: only declared here, it is not assigned by this constructor
    self._resource_stack: contextlib.ExitStack
    self._resource_acquisition: bool = True

    # processor timer
    self.timer: Timer | None = timer
    self._timer_parameters: dict[str, Any] = timer_params or {}

    # user interface
    if user_interface is None:
        self._user_interface: UserInterfaceBase = ConsoleInterface()
    else:
        self._user_interface = user_interface

    # database stuff
    self._database: peewee.Database | None = database
    self._database_conf: dict[str, Any] | None = validate_database_conf(database_conf)
    self.filter_register: mafw.db.db_filter.ProcessorFilter = mafw.db.db_filter.ProcessorFilter()
    """The DB filter register of the Processor."""
    self.remove_orphan_files: bool = remove_orphan_files
    """The flag to remove or protect the orphan files. Defaults to True"""

    # kept last: parameter initialisation reads state prepared above (e.g. _orig_config and filter_register)
    self.initialise_parameters()
def initialise_parameters(self) -> None:
    """
    Set up the processor parameters from all their configuration sources.

    The following steps are executed in this exact order, each one possibly overwriting the values set by the
    previous ones:

    #. registration of the :class:`ActiveParameter` declared on the class;
    #. application of the :attr:`new_defaults` overrides;
    #. loading of the processor configuration dictionary;
    #. application of the keyword arguments given to the constructor.

    It is called automatically at the end of the processor initialisation and is not meant to be called directly
    by users.

    .. seealso::
        :meth:`_register_parameters`, :meth:`_override_defaults`,
        :meth:`_load_parameter_configuration`, :meth:`_overrule_kws_parameters`

    .. versionadded:: v2.0.0
    """
    ordered_steps = (
        '_register_parameters',
        '_override_defaults',
        '_load_parameter_configuration',
        '_overrule_kws_parameters',
    )
    for step_name in ordered_steps:
        # methods are looked up right before being called, so subclass overrides are honoured
        getattr(self, step_name)()
def __post_init__(self) -> None:
    """
    Complete the processor set-up once the constructor has returned.

    It is invoked automatically right after ``__init__`` by the :class:`ProcessorMeta` metaclass. First the
    configuration provided with the processor parameters is validated, then the overloaded methods are checked.
    Finally, the processor status is set to ``ProcessorStatus.Init``.

    .. seealso::
        :meth:`validate_configuration`, :meth:`_check_method_overload`,
        :attr:`~mafw.processor.Processor.processor_status`

    .. versionchanged:: v2.0.0
        Moved the parameter initialisation to :meth:`initialise_parameters` and now executed as last step of the
        init method.

        Added the validate configuration check. This method should silently check that configuration provided
        with the processor parameters is valid. If not, a :exc:`.ProcessorParameterError` is raised.
    """
    for check_name in ('validate_configuration', '_check_method_overload'):
        getattr(self, check_name)()
    self.processor_status = ProcessorStatus.Init
def _register_parameters(self) -> None:
    """
    Register processor parameters defined as ActiveParameter instances in the class.

    This private method reads the class-level parameter registry (``_parameter_definitions``), which holds every
    :class:`.ActiveParameter` declared on the class or inherited from its bases. For each of them, it creates the
    corresponding :class:`.PassiveParameter` instance that stores the actual parameter value and metadata for this
    processor instance.

    Calling it again after a successful registration does nothing. The new entries are committed only when all of
    them have been created, so a failure leaves :attr:`._processor_parameters` untouched. Once registration is
    complete, the internal flag :attr:`._parameter_registered` is set to True.

    .. note::
        This method is automatically called during processor initialisation and should not be called directly
        by users.

    .. seealso::
        :class:`.Processor`, :meth:`.Processor._override_defaults`,
        :meth:`.Processor._load_parameter_configuration`, :meth:`.Processor._overrule_kws_parameters`

    .. versionchanged:: v2.0.0
        Only :class:`ActiveParameter` are registered. The use of :class:`PassiveParameter` is only meant to
        store the value and metadata of the active counterpart.

    :raises ProcessorParameterError: if a parameter name is already registered, or if a parameter has neither a
        value nor a default.
    """
    if self._parameter_registered:
        return

    definitions = getattr(self.__class__, '_parameter_definitions', None)
    if definitions is None:
        definitions = _ensure_parameter_definitions(self.__class__)

    registered: OrderedDict[str, PassiveParameter[Any]] = OrderedDict()
    for descriptor in definitions.values():
        ext_name = descriptor._external_name
        if ext_name in self._processor_parameters or ext_name in registered:
            raise ProcessorParameterError(f'Duplicated parameter name ({ext_name}).')
        # A fresh PassiveParameter per instance: values set on one processor never leak into another.
        registered[ext_name] = PassiveParameter(
            ext_name, value=descriptor._value, default=descriptor._default, help_doc=descriptor._help_doc
        )

    self._processor_parameters.update(registered)
    self._parameter_registered = True
def _override_defaults(self) -> None:
    """
    Apply the class-level :attr:`new_defaults` to the registered processor parameters.

    Keys of :attr:`new_defaults` without a matching entry in :attr:`_processor_parameters` are silently ignored.
    Matching parameters receive the new value through the :class:`PassiveParameter` setter, which also flags
    them as set.

    .. versionadded:: v2.0.0
    """
    registered = self._processor_parameters
    overrides = ((key, value) for key, value in self.new_defaults.items() if key in registered)
    for key, value in overrides:
        registered[key].value = value
def _reset_parameters(self) -> None:
    """
    Discard the registered processor parameters and register them again from scratch.

    The parameter dictionary is replaced by an empty :class:`~collections.OrderedDict` and the
    registration flag is cleared, so that :meth:`_register_parameters` performs a full registration
    instead of returning early.

    .. seealso::
        :meth:`_register_parameters`
    """
    self._processor_parameters = OrderedDict()
    self._parameter_registered = False
    # with the flag cleared, the registration is not short-circuited
    self._register_parameters()
@ensure_parameter_registration
def _load_parameter_configuration(self) -> None:
    """
    Load processor parameter configuration from the internal configuration dictionary.

    This method processes the processor's configuration dictionary to set parameter values.
    It handles two configuration formats:

    1. Nested format: ``{'ProcessorName': {'param1': value1, ...}}``
    2. Flat format: ``{'param1': value1, ...}``

    The method also handles filter configurations by collecting filter table names
    and deferring their initialisation until after the global filter has been processed.

    Configured values are converted to the type of the current parameter value. When the current value
    is ``None`` there is no reference type to convert to (``NoneType(value)`` would raise), so the
    configured value is stored as it is. Keys that are neither registered parameters nor one of the
    special keys (``__filter__``, ``__logic__``, ``__new_only__``) are ignored.

    .. versionchanged:: v2.0.0
        For option 1 combining configuration from name and name_replica

    .. versionchanged:: v2.1.2
        When a nested steering configuration is loaded, the processor-level
        create_standard_tables value is overridden from the global steering-file setting.
        Flat configurations keep the constructor value untouched.

    :raises TypeError, ValueError: If a configured value cannot be converted to the type of the current
        parameter value.

    .. seealso::
        :meth:`mafw.db.db_filter.ModelFilter.from_conf`
    """
    original_config = copy(self._orig_config)
    flt_list = []

    # by default the flag new_only is set to true
    # unless the user specify differently in the general section of the steering file
    self.filter_register.new_only = original_config.get(self.new_only_flag, True)

    # option 1 (nested) is recognised by the presence of either the base or the replica name
    if any(name in original_config for name in (self.name, self.replica_name)):
        # we start from the base name. If not there, then take an empty dict
        option1_config_base = original_config.get(self.name, {})
        if self.name != self.replica_name:
            # if there is the replica name, then update the base configuration with the replica value
            option1_config_replica = original_config.get(self.replica_name, {})

            # inheritance from the base section is the default, unless switched off by the user
            inheritance = option1_config_replica.get('__inheritance__', True)
            if inheritance:
                # we update the base with the replica without changing the base
                option1_config_update = deep_update(option1_config_base, option1_config_replica, copy_first=True)
            else:
                # we do not use the base with the replica specific, we pass the replica as the updated
                option1_config_update = option1_config_replica

            # the replica section now holds the merged configuration:
            # it is used for the filter configuration at the end.
            original_config[self.replica_name] = option1_config_update
        else:
            # there is not replica, so the update is equal to the base.
            option1_config_update = option1_config_base

        self._config = option1_config_update
        if 'create_standard_tables' in original_config:
            self.create_standard_tables = bool(original_config['create_standard_tables'])
    else:
        # for type 2 we are already good to go
        self._config = original_config

    filter_config = deepcopy(original_config)

    def _sanitize_filter_config(processor_name: str) -> None:
        # Remove disabled models, disabled fields and disabled conditionals from the filter table of
        # processor_name. Only filter_config (a deep copy) is modified.
        processor_config = filter_config.get(processor_name)
        if not isinstance(processor_config, dict):
            return
        filter_table = processor_config.get('__filter__')
        if not isinstance(filter_table, dict):
            return

        sanitized_table: dict[str, Any] = {}
        for model_name, model_config in filter_table.items():
            if not isinstance(model_config, dict):
                sanitized_table[model_name] = model_config
                continue

            model_config_copy = deepcopy(model_config)
            if not bool(model_config_copy.pop('__enable__', True)):
                continue

            for field_name, field_value in list(model_config_copy.items()):
                if (
                    isinstance(field_value, dict)
                    and not ('op' in field_value and 'value' in field_value)
                    and '__enable__' in field_value
                ):
                    field_enabled = bool(field_value.pop('__enable__', True))
                    if not field_enabled:
                        model_config_copy.pop(field_name, None)

            conditionals = model_config_copy.get('__conditional__')
            if isinstance(conditionals, list):
                filtered_conditionals: list[Any] = []
                for conditional in conditionals:
                    if isinstance(conditional, dict):
                        conditional_enabled = bool(conditional.pop('__enable__', True))
                        if not conditional_enabled:
                            continue
                    filtered_conditionals.append(conditional)
                model_config_copy['__conditional__'] = filtered_conditionals

            sanitized_table[model_name] = model_config_copy

        processor_config['__filter__'] = sanitized_table

    _sanitize_filter_config(self.replica_name)

    for key, value in self._config.items():
        if key in self._processor_parameters:
            current_value = self.get_parameter(key).value
            if current_value is None:
                # NoneType cannot be used as a converter: keep the configured value unchanged
                self.set_parameter_value(key, value)
            else:
                type_: ParameterType = type(current_value)  # type: ignore # wait for answer from SO
                self.set_parameter_value(key, type_(value))  # type: ignore # no idea how to fix it, may be linked with above
        elif key == '__filter__':
            # we got a filter table!
            # it should contain one table for each model
            # we add all the names to a list for deferred initialisation
            flt_table = self._config[key]
            if isinstance(flt_table, dict):
                for model_name, model_config in flt_table.items():
                    if isinstance(model_config, dict) and not bool(model_config.get('__enable__', True)):
                        continue
                    flt_list.append(f'{self.replica_name}.__filter__.{model_name}')
        elif key == '__logic__':
            # we got a filter logic string
            # we store it in the filter register directly
            self.filter_register._logic = self._config[key]
        elif key == '__new_only__':
            # we got a new only boolean, we store it in the filter register
            self.filter_register.new_only = self._config[key]

    # only now, after the configuration file has been totally read, we can do the real filter initialisation.
    # This is to be sure that if there were a GlobalFilter table, this has been read.
    # The global filter region will be used as a starting point for the construction of a new filter (default
    # parameter in the from_conf class method).
    for flt_name in flt_list:
        model_name = flt_name.split('.')[-1]
        self.filter_register[model_name] = mafw.db.db_filter.ModelFilter.from_conf(flt_name, filter_config)
@ensure_parameter_registration
def _overrule_kws_parameters(self) -> None:
    """
    Override processor parameters with values from keyword arguments.

    This method applies the parameter values passed as keyword arguments during processor
    initialisation. Each value is converted to the type of the current parameter value before being
    set. When the current value is ``None`` there is no reference type to convert to
    (``NoneType(value)`` would raise), so the keyword value is stored unchanged. Keyword arguments
    that do not match a registered parameter are ignored.

    :raises TypeError, ValueError: If a value cannot be converted to the type of the current
        parameter value.

    .. seealso::
        :meth:`_register_parameters`, :meth:`_load_parameter_configuration`,
        :meth:`set_parameter_value`
    """
    for key, value in self._kwargs.items():
        if key not in self._processor_parameters:
            continue
        current_value = self.get_parameter(key).value
        if current_value is None:
            # NoneType cannot be used as a converter: keep the keyword value unchanged
            self.set_parameter_value(key, value)
            continue
        type_: ParameterType = type(current_value)  # type: ignore # wait for answer from SO
        self.set_parameter_value(key, type_(value))  # type: ignore # no idea how to fix it, may be linked with above
def validate_configuration(self) -> None:
    """
    Hook to validate the configuration provided via the processor parameters.

    The base implementation accepts any configuration and does nothing. Subclasses needing a validation
    should overload it, check the parameters silently and raise :exc:`.InvalidConfigurationError` when
    the configuration is not acceptable.

    .. versionadded:: v2.0.0
    """
def _check_method_overload(self) -> None:
    """
    Warn about loop hooks that the subclass is expected to overload but did not.

    The required hooks depend on :attr:`loop_type`: ``while_condition`` for a while loop and
    ``get_items`` for the serial and the parallel for loops; any other loop type needs none. For every
    required hook still resolving to the :class:`Processor` implementation, a
    :class:`~mafw.mafw_errors.MissingOverloadedMethod` warning is emitted. Execution is not interrupted.
    """
    hooks_by_loop_type: dict[LoopType, list[str]] = {
        LoopType.WhileLoop: ['while_condition'],
        LoopType.ForLoop: ['get_items'],
        LoopType.ParallelForLoop: ['get_items'],
        LoopType.ParallelForLoopWithQueue: ['get_items'],
    }
    processor_class = type(self)
    for hook in hooks_by_loop_type.get(self.loop_type, []):
        # identical attributes mean the subclass inherited the base implementation
        if getattr(processor_class, hook) != getattr(Processor, hook):
            continue
        warnings.warn(
            MissingOverloadedMethod(f'{hook} was not overloaded. The process execution workflow might not work.')
        )
@classmethod
def _apply_super_call_wrappers(cls) -> None:
    """
    Wrap the overridden methods of ``cls`` so that a missing ``super()`` call is reported at run time.

    Every name listed in ``cls._methods_to_be_checked_for_super`` is considered. It is wrapped only if
    ``cls`` defines it in its own namespace and at least one base class also provides it. The wrapper:

    1. resets the per-instance super-call flag for that method,
    2. runs the original implementation,
    3. emits a :class:`~mafw.mafw_errors.MissingSuperCall` warning if the flag was not set in the
       meantime, and
    4. returns the value produced by the original implementation.

    Wrappers are tagged with ``_mafw_super_check_wrapped``, so an implementation is never wrapped twice.
    By design, this runs right after the class is created (see :class:`.ProcessorMeta`).
    """

    def build_checked(implementation: Callable[..., Any], method_name: str) -> Callable[..., Any]:  # pragma: no cover
        # a factory binds implementation and method_name per method, avoiding late-binding closures
        @wraps(implementation)
        def checked(self: Any, *args: Any, **kwargs: Any) -> Any:
            self._reset_super_call_flag(method_name)
            outcome = implementation(self, *args, **kwargs)
            # the base implementation sets the flag back to True when it is reached
            if not self._did_call_super(method_name):
                warnings.warn(
                    MissingSuperCall(
                        'The overloaded %s is not invoking its super method. The processor might not work.'
                        % method_name
                    )
                )
            return outcome

        return checked

    for method_name in getattr(cls, '_methods_to_be_checked_for_super', ()):
        # only a method overridden here, and existing somewhere up the MRO, can call super()
        if method_name not in cls.__dict__ or not any(hasattr(base, method_name) for base in cls.__mro__[1:]):
            continue
        implementation = getattr(cls, method_name)
        if getattr(implementation, '_mafw_super_check_wrapped', False):
            continue
        guarded = build_checked(implementation, method_name)
        guarded._mafw_super_check_wrapped = True  # type: ignore[attr-defined]
        setattr(cls, method_name, guarded)
def _reset_super_call_flag(self, method: str) -> None:
    """
    Record that ``method`` has not (yet) reached its base implementation.

    :param method: Name of the checked method.
    """
    flags = self._super_call_flags
    flags[method] = False


def _mark_super_call(self, method: str) -> None:
    """
    Record that ``method`` reached its base implementation.

    :param method: Name of the checked method.
    """
    flags = self._super_call_flags
    flags[method] = True


def _did_call_super(self, method: str) -> bool:
    """
    Tell whether ``method`` reached its base implementation.

    :param method: Name of the checked method.
    :return: The recorded flag, or False if nothing was recorded for ``method``.
    """
    flags = self._super_call_flags
    return flags.get(method, False)
@ensure_parameter_registration
def dump_parameter_configuration(self, option: int = 1) -> dict[str, Any]:
    """
    Dumps the processor parameter values in a dictionary.

    The snippet below explains the meaning of `option`.

    .. code-block:: python

        # option 1
        conf_dict1 = {
            'Processor': {'param1': 5, 'input_table': 'my_table'}
        }

        # option 2
        conf_dict2 = {'param1': 5, 'input_table': 'my_table'}

    In the case of option 1, the replica aware name (:meth:`.replica_name`) will be used as a key for the
    configuration dictionary. Any option other than 1 or 2 logs a warning and falls back to option 2.

    .. versionchanged:: v2.0.0
        With option 1, using :meth:`.replica_name` instead of :attr:`~.Processor.name` as key of the configuration
        dictionary.

    :param option: Select the dictionary style. Defaults to 1.
    :type option: int, Optional
    :return: A parameter configuration dictionary.
    :rtype: dict
    """
    inner_dict = {key: parameter.value for key, parameter in self._processor_parameters.items()}

    if option == 1:
        return {self.replica_name: inner_dict}
    if option != 2:
        # lazy formatting: the message is only rendered if the record is actually emitted
        log.warning('Unknown option %s. Using option 2', option)
    return inner_dict
@ensure_parameter_registration
def get_parameter(self, name: str) -> PassiveParameter[Any]:
    """
    Return the registered processor parameter called ``name``.

    :param name: The name of the parameter.
    :type name: str
    :return: The processor parameter
    :rtype: PassiveParameter
    :raises ProcessorParameterError: If a parameter with `name` is not registered.
    """
    if name not in self._processor_parameters:
        raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}')
    return self._processor_parameters[name]
@ensure_parameter_registration
def get_parameters(self) -> dict[str, PassiveParameter[Any]]:
    """
    Return the dictionary of all the parameters registered for this processor.

    The registry itself is returned, not a copy. This is useful, for example, when dumping the parameter
    specification in a configuration file.

    :return: The dictionary with the registered parameters.
    :rtype: dict[str, PassiveParameter[ParameterType]]
    """
    registry = self._processor_parameters
    return registry
@ensure_parameter_registration
def delete_parameter(self, name: str) -> None:
    """
    Remove the processor parameter called ``name`` from the registry.

    :param name: The name of the parameter to be deleted.
    :type name: str
    :raises ProcessorParameterError: If a parameter with `name` is not registered.
    """
    if name not in self._processor_parameters:
        raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}')
    del self._processor_parameters[name]
@ensure_parameter_registration
def set_parameter_value(self, name: str, value: ParameterType) -> None:
    """
    Assign a new value to the processor parameter called ``name``.

    The value is stored as given, without any type conversion.

    :param name: The name of the parameter whose value is set.
    :type name: str
    :param value: The value to be assigned to the parameter.
    :type value: ParameterType
    :raises ProcessorParameterError: If a parameter with `name` is not registered.
    """
    if name not in self._processor_parameters:
        raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}')
    self._processor_parameters[name].value = value
def get_filter(self, model_name: str) -> mafw.db.db_filter.ModelFilter:
    """
    Return the :class:`~mafw.db.db_filter.ModelFilter` registered for a model.

    :param model_name: The model name for which the filter will be returned.
    :type model_name: str
    :return: The registered filter
    :rtype: mafw.db.db_filter.ModelFilter
    :raises KeyError: If no filter is registered for the given model name.
    """
    register = self.filter_register
    return register[model_name]
def on_processor_status_change(self, old_status: ProcessorStatus, new_status: ProcessorStatus) -> None:
    """
    Callback invoked when the processor status changes.

    The change is forwarded to the user interface together with the processor name.

    :param old_status: The old processor status.
    :type old_status: ProcessorStatus
    :param new_status: The new processor status.
    :type new_status: ProcessorStatus
    """
    interface = self._user_interface
    interface.change_of_processor_status(self.name, old_status, new_status)
def on_looping_status_set(self, status: LoopingStatus) -> None:
    """
    Call back invoked when the looping status is set.

    The default implementation only logs: a warning for ``Skip`` (with the current item index) and for
    ``Quit``, an error for ``Abort``; ``Continue`` is not logged. The user can overload this method
    according to the needs.

    :param status: The set looping status.
    :type status: LoopingStatus
    """
    # logging arguments are passed separately so the message is only formatted when emitted
    if status == LoopingStatus.Skip:
        log.warning('Skipping item %s', self.i_item)
    elif status == LoopingStatus.Abort:
        log.error('Looping has been aborted')
    elif status == LoopingStatus.Quit:
        log.warning('Looping has been quit')
def format_progress_message(self) -> None:
    """Hook to customise the progress message for the current item.

    The base implementation leaves the message untouched. Overload it to change what is displayed during
    the process loop, using :attr:`.Processor.item` (the current value), :obj:`.Processor.i_item` (its
    position in the loop) and :obj:`.Processor.n_item` (the total number of items).
    """
@contextlib.contextmanager
def _thread_loop_context(self, i_item: int, n_item: int, item: Any) -> Iterator[None]:
    """
    Publish the loop attributes of a parallel worker on :attr:`_thread_local`.

    While the context is active, ``in_worker`` (True), ``i_item``, ``n_item``, ``item`` and
    ``looping_status`` (initialised to ``LoopingStatus.Continue``) are set on :attr:`_thread_local`.
    The loop properties read them from there while ``in_worker`` is set. On exit, normal or
    exceptional, those attributes are removed again.

    :param i_item: Item index for the loop.
    :type i_item: int
    :param n_item: Total number of items in the loop.
    :type n_item: int
    :param item: Item payload.
    :type item: Any
    """
    worker_state = {
        'in_worker': True,
        'i_item': i_item,
        'n_item': n_item,
        'item': item,
        'looping_status': LoopingStatus.Continue,
    }
    for attribute, attribute_value in worker_state.items():
        setattr(self._thread_local, attribute, attribute_value)
    try:
        yield
    finally:
        for attribute in ('i_item', 'n_item', 'item', 'looping_status', 'in_worker'):
            # an attribute may already be gone: removal is best effort
            with contextlib.suppress(AttributeError):
                delattr(self._thread_local, attribute)
def _in_thread_context(self) -> bool:
    """
    Tell whether the caller runs inside a parallel worker context.

    :return: True when the ``in_worker`` flag is set (truthy) on :attr:`_thread_local`, False otherwise.
    """
    worker_flag = getattr(self._thread_local, 'in_worker', False)
    return bool(worker_flag)
@property
def item(self) -> Any:
    """The current item of the loop.

    Inside a parallel worker the thread-local value is returned when available; otherwise the
    processor-wide value is used.
    """
    if not self._in_thread_context():
        return self._item
    local_state = self._thread_local
    if hasattr(local_state, 'item'):
        return local_state.item
    return self._item


@item.setter
def item(self, value: Any) -> None:
    # inside a worker the value stays local to the thread, otherwise it is stored on the processor
    target, attribute = (self._thread_local, 'item') if self._in_thread_context() else (self, '_item')
    setattr(target, attribute, value)
@property
def i_item(self) -> int:
    """The enumeration index of the item being processed.

    Inside a parallel worker the thread-local index is returned when available; otherwise the
    processor-wide value is used.
    """
    if not self._in_thread_context():
        return self._i_item
    local_state = self._thread_local
    if hasattr(local_state, 'i_item'):
        return cast(int, local_state.i_item)
    return self._i_item


@i_item.setter
def i_item(self, value: int) -> None:
    # inside a worker the value stays local to the thread, otherwise it is stored on the processor
    target, attribute = (self._thread_local, 'i_item') if self._in_thread_context() else (self, '_i_item')
    setattr(target, attribute, value)
1363 @property
1364 def n_item(self) -> int | None:
1365 """The total number of items to be processed or None for an undefined loop"""
1366 if self._in_thread_context() and hasattr(self._thread_local, 'n_item'):
1367 return cast(int | None, self._thread_local.n_item)
1368 return self._n_item
1370 @n_item.setter
1371 def n_item(self, value: int | None) -> None:
1372 if self._in_thread_context():
1373 self._thread_local.n_item = value
1374 else:
1375 self._n_item = value
@property
def looping_status(self) -> LoopingStatus:
    """The looping status for the current thread context.

    Inside a parallel worker the thread-local status is used (``LoopingStatus.Continue`` if none is
    set); otherwise the processor-wide status. If the instance defines ``on_looping_status_get``, it is
    called with the value before it is returned.
    """
    current = (
        getattr(self._thread_local, 'looping_status', LoopingStatus.Continue)
        if self._in_thread_context()
        else self._looping_status
    )
    if hasattr(self, 'on_looping_status_get'):
        self.on_looping_status_get(current)
    return current


@looping_status.setter
def looping_status(self, value: LoopingStatus) -> None:
    # store the new value where the getter will look for it, remembering the previous one
    if self._in_thread_context():
        previous = getattr(self._thread_local, 'looping_status', LoopingStatus.Continue)
        self._thread_local.looping_status = value
    else:
        previous = self._looping_status
        self._looping_status = value

    # a real change notifies on_looping_status_change; re-setting the same value notifies on_looping_status_set
    changed = previous != value
    if changed:
        if hasattr(self, 'on_looping_status_change'):
            self.on_looping_status_change(previous, value)
    elif hasattr(self, 'on_looping_status_set'):
        self.on_looping_status_set(value)
@property
def unique_name(self) -> str:
    """The processor name and its unique identifier joined by an underscore."""
    return '{}_{}'.format(self.name, self.unique_id)
@property
def replica_name(self) -> str:
    """
    Returns the replica aware name of the processor.

    Without a :attr:`replica_id` this is simply the processor name. Otherwise the name and the replica
    id are joined by a ``'#'``, e.g. ``'MyProcessor#1'``.

    .. versionadded:: v2.0.0

    :return: The replica aware name of the processor.
    :rtype: str
    """
    if self.replica_id is not None:
        return self.name + '#' + self.replica_id
    return self.name
@property
def local_resource_acquisition(self) -> bool:
    """
    Tell whether the processor acquires its external resources by itself.

    A processor executed stand-alone is responsible for acquiring and releasing its own external
    resources. When it runs inside a ProcessorList, resources are better shared among the whole list
    and acquired by the parent execution context instead.

    :return: True if resources are to be acquired locally by the processor. False, otherwise.
    :rtype: bool
    """
    flag = self._resource_acquisition
    return flag


@local_resource_acquisition.setter
def local_resource_acquisition(self, flag: bool) -> None:
    # plain storage, no validation
    self._resource_acquisition = flag
@property
def database(self) -> peewee.Database:
    """
    The database instance used by the processor.

    :return: A database object.
    :raises MissingDatabase: If the database connection has not been established.
    """
    connection = self._database
    if connection is not None:
        return connection
    raise MissingDatabase('Database connection not initialized')
def execute(self) -> None:
    """Execute the processor tasks.

    Dispatch to the execution implementation matching :attr:`~mafw.processor.Processor.loop_type`:
    single run, for loop (serial and both parallel flavours share the same entry point) or while loop.
    An unsupported loop type raises :exc:`KeyError`.
    """
    for_loop_runner = self._execute_for_loop
    runners: dict[LoopType, Callable[[], None]] = {
        LoopType.SingleLoop: self._execute_single,
        LoopType.ForLoop: for_loop_runner,
        LoopType.ParallelForLoop: for_loop_runner,
        LoopType.ParallelForLoopWithQueue: for_loop_runner,
        LoopType.WhileLoop: self._execute_while_loop,
    }
    runner = runners[self.loop_type]
    runner()
1471 @staticmethod
1472 def _compute_default_max_workers() -> int:
1473 """
1474 Helper to compute the default number of workers for parallel execution.
1476 Returns min(32, cpu_count + 4).
1477 """
1478 return min(32, (os.cpu_count() or 1) + 4)
def _execute_single(self) -> None:
    """Execute the processor in single mode.

    Inside an :class:`~contextlib.ExitStack`, stored as :attr:`_resource_stack`, resources are acquired,
    the wall-clock start is recorded and :meth:`start` is called. The status is then switched to
    ``ProcessorStatus.Run``, and :meth:`process` and :meth:`finish` run once each.

    **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
    appropriate implementation depending on the processor LoopType.
    """
    with contextlib.ExitStack() as stack:
        self._resource_stack = stack
        self.acquire_resources()
        self._wall_clock_start = time.perf_counter()
        self.start()
        self.processor_status = ProcessorStatus.Run
        self.process()
        self.finish()
def _execute_for_loop(self) -> None:
    """Executes the processor within a for loop.

    Inside an :class:`~contextlib.ExitStack`, stored as :attr:`_resource_stack`, this method:

    1. acquires the resources, records the wall-clock start and calls :meth:`start`;
    2. retrieves the items with :meth:`get_items` and publishes their count as :attr:`n_item`;
    3. switches the status to ``ProcessorStatus.Run`` and creates the progress task;
    4. if free-threading is not available, switches a parallel :attr:`loop_type` to
       ``LoopType.ForLoop``, with a warning;
    5. runs the serial, parallel or queue-based loop implementation;
    6. completes the progress task and calls :meth:`finish`.

    **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
    appropriate implementation depending on the processor LoopType.
    """
    with contextlib.ExitStack() as stack:
        self._resource_stack = stack
        self.acquire_resources()
        # the run spans several methods, so a Timer context cannot wrap it: read a performance clock instead
        self._wall_clock_start = time.perf_counter()
        self.start()

        items = self.get_items()
        self.n_item = len(items)
        self.processor_status = ProcessorStatus.Run
        self._user_interface.create_task(self.replica_name, self.description, completed=0, total=self.n_item)

        parallel_loop_types = (LoopType.ParallelForLoop, LoopType.ParallelForLoopWithQueue)
        if self.loop_type in parallel_loop_types and not is_free_threading():
            warnings.warn(
                'Parallel for-loop requires free-threading; falling back to serial for loop.',
                stacklevel=2,
            )
            self.loop_type = LoopType.ForLoop

        loop_runners = {
            LoopType.ParallelForLoopWithQueue: self._process_parallel_for_loop_with_queue,
            LoopType.ParallelForLoop: self._process_parallel_for_loop,
        }
        # every other loop type goes through the serial implementation
        loop_runner = loop_runners.get(self.loop_type, self._process_for_loop)
        loop_runner(items)

        self._user_interface.update_task(self.replica_name, completed=self.n_item, total=self.n_item)

        self.finish()
def _build_loop_item(self) -> LoopItem:
    """
    Pack the current loop position into a LoopItem payload.

    ``n_item`` is converted to ``int``; an undefined loop length (``None``) becomes ``0``.

    :return: The LoopItem payload.
    :rtype: mafw.models.LoopItem
    """
    index = self.i_item
    total = int(self.n_item or 0)
    current = self.item
    return LoopItem(index, total, current)
def _build_loop_result(self, payload: Any, duration: float) -> LoopResult:
    """
    Pack the outcome of the current item into a LoopResult payload.

    The result carries the item index, the total number of items (``0`` for an undefined loop), the
    current looping status, the payload and the duration.

    :param payload: The optional payload returned by process.
    :type payload: Any
    :param duration: Wall-clock duration of the item processing.
    :type duration: float
    :return: The LoopResult payload.
    :rtype: mafw.models.LoopResult
    """
    index = self.i_item
    total = int(self.n_item or 0)
    status = self.looping_status
    return LoopResult(index, total, status, payload, duration)
def _payload_annotation_matches(self, annotation: Any) -> bool:
    """
    Check whether an annotation designates a LoopItem or LoopResult payload.

    The following forms are recognised:

    * the :class:`LoopItem` / :class:`LoopResult` classes themselves;
    * ``typing.Union`` / ``typing.Optional`` and PEP 604 unions (``LoopItem | None``) containing one
      of them;
    * string annotations spelling one of the forms above, possibly with a dotted module prefix (e.g.
      ``'LoopItem | None'``, ``'Optional[LoopItem]'``,
      ``'mafw.models.loop_payloads.LoopResult'``). Such annotations are produced by
      ``from __future__ import annotations`` or by quoted forward references, and
      :func:`inspect.signature` does not evaluate them.

    Containers such as ``list[LoopItem]`` are not payload annotations and are not matched.

    :param annotation: The annotation to inspect.
    :type annotation: Any
    :return: True if the annotation matches LoopItem or LoopResult.
    :rtype: bool
    """
    import types

    payload_names = ('LoopItem', 'LoopResult')

    if isinstance(annotation, str):
        # postponed (string) annotations: match the payload classes by name
        text = annotation.strip().strip('\'"')
        for wrapper in ('typing.Optional[', 'Optional[', 'typing.Union[', 'Union['):
            if text.startswith(wrapper) and text.endswith(']'):
                text = text[len(wrapper) : -1]
                break
        members = text.replace('|', ',').split(',')
        return any(member.strip().strip('\'"').rsplit('.', 1)[-1] in payload_names for member in members)

    payload_types = (LoopItem, LoopResult)
    if annotation in payload_types:
        return True
    # typing.Union covers Union[...] / Optional[...]; types.UnionType covers the PEP 604 ``X | Y`` syntax
    if get_origin(annotation) in (Union, types.UnionType):
        return any(arg in payload_types for arg in get_args(annotation))
    return False
def _call_with_optional_payload(self, func: Callable[..., Any], payload: Any) -> Any:
    """
    Invoke a processor hook, passing ``payload`` only if its signature accepts it.

    Rules, applied after dropping a leading ``self`` parameter:

    * no parameters: ``func()``;
    * any ``*args`` or ``**kwargs`` parameter: ``func(payload)``;
    * a first positional parameter named ``item``, ``loop_item``, ``result``, ``loop_result`` or
      ``payload``, or annotated with a payload type (see :meth:`_payload_annotation_matches`):
      ``func(payload)``;
    * anything else: ``func()``.

    :param func: The callable to invoke.
    :type func: Callable
    :param payload: The payload to pass if supported.
    :type payload: Any
    :return: The callable return value.
    :rtype: Any
    """
    params = list(inspect.signature(func).parameters.values())
    if params and params[0].name == 'self':
        del params[0]
    if not params:
        return func()

    variadic_kinds = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    if any(param.kind in variadic_kinds for param in params):
        return func(payload)

    leading = params[0]
    positional_kinds = (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    accepts_payload = leading.kind in positional_kinds and (
        leading.name in ('item', 'loop_item', 'result', 'loop_result', 'payload')
        or self._payload_annotation_matches(leading.annotation)
    )
    return func(payload) if accepts_payload else func()
def _process_for_loop(self, item_list: Collection[Any]) -> None:
    """
    Process the items one after the other in the calling thread.

    For every item:

    1. the index and the item are published as :attr:`i_item` and :attr:`item`, and the looping status
       is reset to ``Continue``;
    2. the progress message is refreshed;
    3. :meth:`process` is timed;
    4. :meth:`accept_item` (status ``Continue``) or :meth:`skip_item` (status ``Skip``) receives the
       :class:`LoopResult`.

    Any other status (``Abort`` / ``Quit``) leaves the loop before the progress task is advanced.

    :param item_list: The list of items to process.
    :type item_list: Collection[Any]
    """
    for position, entry in enumerate(item_list):
        self.i_item = position
        self.item = entry
        self.looping_status = LoopingStatus.Continue

        self.format_progress_message()
        self._user_interface.display_progress_message(self.progress_message, self.i_item, self.n_item, 0.1)

        loop_item = self._build_loop_item()
        with Timer(suppress_message=True) as timer:
            payload = self._call_with_optional_payload(self.process, loop_item)
        self._process_durations.append(timer.duration)

        loop_result = self._build_loop_result(payload, timer.duration)

        if self.looping_status == LoopingStatus.Continue:
            self._call_with_optional_payload(self.accept_item, loop_result)
        elif self.looping_status == LoopingStatus.Skip:
            self._call_with_optional_payload(self.skip_item, loop_result)
        else:
            # Abort or Quit: stop without advancing the progress task
            break

        self._user_interface.update_task(self.replica_name, increment=1)
def _process_parallel_for_loop(self, item_list: Collection[Any]) -> None:
    """
    Process items in parallel using a thread pool.

    Items are submitted to a :class:`~concurrent.futures.ThreadPoolExecutor`, keeping at most
    :attr:`max_workers` futures in flight. Each worker runs inside its own
    :meth:`_thread_loop_context`: it executes :meth:`process` and then, unless an abort was requested,
    the :meth:`accept_item` or :meth:`skip_item` hook. The calling thread collects the completed
    futures, records their durations and updates the progress display.

    When a worker ends with ``Abort`` or ``Quit``, no further items are submitted and the futures
    already in flight are drained. The resulting status (``Abort`` takes precedence over ``Quit``) is
    stored in :attr:`looping_status` once the pool has shut down.

    :param item_list: The list of items to process.
    :type item_list: Collection[Any]
    """
    max_workers = self.max_workers
    # abort_event tells workers and the submission loop that an abort/quit was requested;
    # abort_lock serialises the updates of abort_status coming from different workers.
    abort_event = threading.Event()
    abort_lock = threading.Lock()
    abort_status: LoopingStatus | None = None

    def _set_abort(status: LoopingStatus) -> None:
        # Record the abort request: Abort is never downgraded, Quit is only set if nothing was recorded yet.
        nonlocal abort_status
        with abort_lock:
            if abort_status == LoopingStatus.Abort:
                return
            if status == LoopingStatus.Abort:
                abort_status = LoopingStatus.Abort
            elif abort_status is None:
                abort_status = LoopingStatus.Quit
            abort_event.set()

    def _worker(i_item: int, item: Any) -> tuple[int, Any, LoopingStatus, Any, float]:
        # Runs in a pool thread: the loop attributes set here are thread-local (see _thread_loop_context).
        with self._thread_loop_context(i_item, int(self.n_item or 0), item):
            self.looping_status = LoopingStatus.Continue
            loop_item = self._build_loop_item()
            with Timer(suppress_message=True) as timer:
                payload = self._call_with_optional_payload(self.process, loop_item)
            status = self.looping_status

            if status in (LoopingStatus.Abort, LoopingStatus.Quit):
                _set_abort(status)
                return i_item, item, status, payload, timer.duration

            # another worker already requested an abort: skip the accept/skip hooks for this item
            if abort_event.is_set():
                return i_item, item, status, payload, timer.duration

            loop_result = self._build_loop_result(payload, timer.duration)
            if status == LoopingStatus.Continue:
                self._call_with_optional_payload(self.accept_item, loop_result)
            elif status == LoopingStatus.Skip:
                self._call_with_optional_payload(self.skip_item, loop_result)

            return i_item, item, status, payload, timer.duration

    pending: set[Future[tuple[int, Any, LoopingStatus, Any, float]]] = set()
    items_iter = iter(enumerate(item_list))

    def _submit_next() -> bool:
        # Submit the next item, if any; return False when the items are exhausted.
        # `executor` is resolved when called, i.e. inside the `with ThreadPoolExecutor` block below.
        try:
            idx, itm = next(items_iter)
        except StopIteration:
            return False
        fut: Future[tuple[int, Any, LoopingStatus, Any, float]] = executor.submit(_worker, idx, itm)
        pending.add(fut)
        return True

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # prime the pool with up to max_workers futures
        while len(pending) < max_workers and _submit_next():
            pass

        while pending:
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                pending.remove(fut)
                # result() re-raises here any exception raised by the worker
                i_item, item, status, _payload, duration = fut.result()
                self._process_durations.append(duration)
                # this thread is not in a worker context: item/i_item update the processor-wide values
                self.item = item
                self.i_item = i_item
                self.format_progress_message()
                self._user_interface.display_progress_message(self.progress_message, i_item, self.n_item, 0.1)
                self._user_interface.update_task(self.replica_name, increment=1)

            # after an abort/quit request, stop submitting: only the futures in flight are drained
            if abort_event.is_set():
                continue

            # refill the pool up to max_workers futures
            while len(pending) < max_workers and _submit_next():
                pass

    # the pool has shut down (all workers finished), so abort_status can be read without the lock
    if abort_status is not None:
        self.looping_status = abort_status
1719 def _process_parallel_for_loop_with_queue(self, item_list: Collection[Any]) -> None:
1720 """
1721 Process items in parallel using a producer/consumer queue.
1723 Items are processed in worker threads, while a dedicated consumer thread handles the post-processing hooks.
1725 :param item_list: The list of items to process.
1726 :type item_list: Collection[Any]
1727 """
1728 # the idea is the following, we have a single consumer thread that is constantly trying to pull
1729 # items out of a shared queue and we have a pool of producer threads that are putting items in the queue as
1730 # long as there is space into it (back-pressure).
1731 # it the queue is full, then the producer threads are set to sleep for a short interval of time and waken up
1732 # again after some times for another attempt to write the output in the queue.
1733 # when there are no more items to execute, the main thread is pushing a sentinel object into the queue
1734 # marking the end of the processing, so that the consumer thread can be gracefully terminated.
1735 max_workers = self.max_workers
1736 result_queue: queue.Queue[object] = queue.Queue(maxsize=self.queue_size)
1737 abort_event = threading.Event()
1738 abort_lock = threading.Lock()
1739 abort_status: LoopingStatus | None = None
1740 sentinel = object()
1742 def _set_abort(status: LoopingStatus) -> None:
1743 nonlocal abort_status
1744 with abort_lock:
1745 if abort_status == LoopingStatus.Abort: 1745 ↛ 1746line 1745 didn't jump to line 1746 because the condition on line 1745 was never true
1746 return
1747 if status == LoopingStatus.Abort:
1748 abort_status = LoopingStatus.Abort
1749 elif abort_status is None: 1749 ↛ 1751line 1749 didn't jump to line 1751
1750 abort_status = LoopingStatus.Quit
1751 abort_event.set()
1753 def _get_abort_status() -> LoopingStatus | None:
1754 with abort_lock:
1755 return abort_status
1757 def _worker(batch_items: list[tuple[int, Any]]) -> None:
1758 batch_results: list[tuple[LoopItem, LoopResult]] = []
1759 for i_item, item in batch_items:
1760 if abort_event.is_set():
1761 break
1762 with self._thread_loop_context(i_item, int(self.n_item or 0), item):
1763 self.looping_status = LoopingStatus.Continue
1764 loop_item = self._build_loop_item()
1765 with Timer(suppress_message=True) as timer:
1766 payload = self._call_with_optional_payload(self.process, loop_item)
1767 status = self.looping_status
1768 if status in (LoopingStatus.Abort, LoopingStatus.Quit):
1769 _set_abort(status)
1770 loop_result = self._build_loop_result(payload, timer.duration)
1771 batch_results.append((loop_item, loop_result))
1772 if status in (LoopingStatus.Abort, LoopingStatus.Quit):
1773 break
1774 if batch_results:
1775 result_queue.put(batch_results)
1777 def _consumer() -> None:
1778 self.consumer_start()
1779 try:
1780 while True:
1781 queued = result_queue.get()
1782 if queued is sentinel:
1783 break
1784 batch_results = cast(list[tuple[LoopItem, LoopResult]], queued)
1785 for loop_item, loop_result in batch_results:
1786 with self._thread_loop_context(loop_item.i_item, loop_item.n_item, loop_item.payload):
1787 self.looping_status = loop_result.looping_status
1788 self.format_progress_message()
1789 self._user_interface.display_progress_message(
1790 self.progress_message, loop_item.i_item, self.n_item, 0.1
1791 )
1792 self._process_durations.append(loop_result.duration)
1793 self._user_interface.update_task(self.replica_name, increment=1)
1795 if _get_abort_status() is None and loop_result.looping_status in (
1796 LoopingStatus.Continue,
1797 LoopingStatus.Skip,
1798 ):
1799 self._call_with_optional_payload(self.consumer_process, loop_result)
1800 finally:
1801 self.consumer_finish()
1803 pending: set[Future[None]] = set()
1804 items_iter = iter(enumerate(item_list))
1806 def _submit_next() -> bool:
1807 if abort_event.is_set():
1808 return False
1809 batch_items: list[tuple[int, Any]] = []
1810 for _ in range(self.queue_batch_size):
1811 try:
1812 idx, itm = next(items_iter)
1813 except StopIteration:
1814 break
1815 batch_items.append((idx, itm))
1816 if not batch_items:
1817 return False
1818 fut: Future[None] = executor.submit(_worker, batch_items)
1819 pending.add(fut)
1820 return True
1822 consumer_thread = threading.Thread(target=_consumer, name=f'{self.name}-consumer')
1823 consumer_thread.start()
1825 try:
1826 with ThreadPoolExecutor(max_workers=max_workers) as executor:
1827 while len(pending) < max_workers and _submit_next():
1828 pass
1830 while pending:
1831 done, _ = wait(pending, return_when=FIRST_COMPLETED)
1832 for fut in done:
1833 pending.remove(fut)
1834 fut.result()
1836 if abort_event.is_set():
1837 continue
1839 while len(pending) < max_workers and _submit_next():
1840 pass
1841 finally:
1842 result_queue.put(sentinel)
1843 consumer_thread.join()
1845 if abort_status is not None:
1846 self.looping_status = abort_status
    def _execute_while_loop(self) -> None:
        """Executes the processor within a while loop.

        **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
        appropriate implementation depending on the processor LoopType.

        The loop keeps running as long as :meth:`while_condition` returns True. At each iteration the looping status
        is reset to Continue, the progress message is displayed and :meth:`process` is timed. Then, depending on the
        looping status left by the user, :meth:`accept_item` or :meth:`skip_item` is called, or the loop is
        interrupted (any other status, i.e. Abort or Quit). The whole execution, including :meth:`start` and
        :meth:`finish`, happens inside the resource stack filled by :meth:`acquire_resources`.
        """
        # it is a while loop, so a priori we don't know how many iterations we will have, nevertheless, we
        # can have a progress bar with 'total' set to None, so that it goes in the so-called indeterminate
        # progress. See https://rich.readthedocs.io/en/stable/progress.html#indeterminate-progress
        # we initialise n_item outside the loop, because it is possible that the user has a way to define n_item
        # and he can do it within the loop.
        self.n_item = None
        with contextlib.ExitStack() as self._resource_stack:
            self.acquire_resources()
            # reference time for the wall-clock duration reported by the process statistics
            self._wall_clock_start = time.perf_counter()
            self.start()

            # turn the processor status to run
            self.processor_status = ProcessorStatus.Run

            self._user_interface.create_task(self.replica_name, self.description, completed=0, total=self.n_item)

            # we are ready to start the looping. For statistics, we can count the iterations.
            # NOTE(review): i_item is reset here but never incremented by this method; presumably the user is
            # expected to update it (e.g. in process). If nobody does, the progress update below always passes
            # i_item + 1 == 1 and n_item ends up being 0 when it is still None. TODO confirm the intended contract.
            self.i_item = 0
            while self.while_condition():
                # set the looping status to Continue. The user may want to change it in the process method.
                self.looping_status = LoopingStatus.Continue

                # send a message to the user interface
                self.format_progress_message()
                self._user_interface.display_progress_message(
                    self.progress_message, self.i_item, self.n_item, frequency=0.1
                )

                # wrap the execution in a timer to measure how long it took for statistical reasons.
                with Timer(suppress_message=True) as timer:
                    self.process()
                self._process_durations.append(timer.duration)

                # modify the loop depending on the looping status
                if self.looping_status == LoopingStatus.Continue:
                    self.accept_item()
                elif self.looping_status == LoopingStatus.Skip:
                    self.skip_item()
                else:  # equiv to if self.looping_status in [LoopingStatus.Abort, LoopingStatus.Quit]:
                    break

                # update the progress bar. if self.n_item is still None, then the progress bar will show indeterminate
                # progress.
                self._user_interface.update_task(self.replica_name, self.i_item + 1, 1, self.n_item)

            # now that the loop is finished, we know how many elements we processed
            if self.n_item is None:
                self.n_item = self.i_item
            self._user_interface.update_task(self.replica_name, completed=self.n_item, total=self.n_item)

            # finish runs while the resources (e.g. the database) are still available
            self.finish()
1906 def acquire_resources(self) -> None:
1907 """
1908 Acquires resources and add them to the resource stack.
1910 The whole body of the :meth:`execute` method is within a context structure. The idea is that if any part of
1911 the code inside should throw an exception that breaking the execution, we want to be sure that all stateful
1912 resources are properly closed.
1914 Since the number of resources may vary, the variable number of nested `with` statements has been replaced by
1915 an `ExitStack <https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack>`_. Resources,
1916 like open files, timers, db connections, need to be added to the resource stacks in this method.
1918 In the case a processor is being executed within a :class:`~mafw.processor.ProcessorList`, then some resources might be shared, and
1919 for this reason they are not added to the stack. This selection can be done via the private
1920 :attr:`local_resource_acquisition`. This is normally True, meaning that the processor will handle its resources
1921 independently, but when the processor is executed from a :class:`~mafw.processor.ProcessorList`, this flag is automatically turned to
1922 False.
1924 If the user wants to add additional resources, he has to overload this method calling the super to preserve
1925 the original resources. If he wants to have shared resources among different processors executed from inside
1926 a processor list, he has to overload the :class:`~mafw.processor.ProcessorList` class as well.
1927 """
1928 # Both the timer and the user interface will be added to the processor resource stack only if the processor is
1929 # set to acquire its own resources.
1930 # The timer and the user interface have in-built enter and exit method.
1931 if self._resource_acquisition:
1932 self.timer = self._resource_stack.enter_context(Timer(**self._timer_parameters))
1933 self._resource_stack.enter_context(self._user_interface)
1935 # For the database it is a bit different.
1936 if self._database is None and self._database_conf is None:
1937 # no database, nor configuration.
1938 # we cannot do anything
1939 pass
1940 elif self._database is None and self._database_conf is not None:
1941 # no db, but we got a configuration.
1942 # we can make a db.
1943 # This processor will try to make a valid connection, and in case it succeeds, it will add the database to
1944 # the resource stack.
1945 # The database has an enter method, but it is to generate transaction.
1946 # We will add the database.close via the callback method.
1947 if 'DBConfiguration' in self._database_conf:
1948 conf = self._database_conf['DBConfiguration'] # type1
1949 else:
1950 conf = self._database_conf # type2
1952 db_url, connection_parameters = build_connection_parameters(conf)
1954 self._database = connect(db_url, **connection_parameters) # type: ignore # peewee is not returning a DB
1955 self._resource_stack.callback(self._database.close)
1956 try:
1957 self._database.connect()
1958 except peewee.OperationalError as e:
1959 log.critical('Unable to connect to %s', db_url)
1960 raise e
1961 database_proxy.initialize(self._database)
1962 if self.create_standard_tables:
1963 standard_tables = mafw_model_register.get_standard_tables()
1964 self.database.create_tables(standard_tables)
1965 for table in standard_tables:
1966 table.init()
1968 else: # equivalent to: if self._database is not None:
1969 # we got a database, so very likely we are inside a processor list
1970 # the connection has been already set and the initialisation as well.
1971 # nothing else to do here.
1972 # do not put the database in the exit stack. who create it has also to close it.
1973 pass
1975 def start(self) -> None:
1976 """
1977 Start method.
1979 The user can overload this method, including all steps that should be performed at the beginning of the
1980 operation.
1982 If the user decides to overload it, it should include a call to the super method.
1983 """
1984 self._mark_super_call('start')
1985 self.processor_status = ProcessorStatus.Start
1986 self._remove_orphan_files()
1988 def get_items(self) -> Collection[Any]:
1989 """
1990 Returns the item collections for the processor loop.
1992 This method must be overloaded for the processor to work. Generally, this is getting a list of rows from the
1993 database, or a list of files from the disk to be processed.
1995 :return: A collection of items for the loop
1996 :rtype: Collection[Any]
1997 """
1998 return []
2000 def while_condition(self) -> bool:
2001 """
2002 Return the while condition
2004 :return: True if the while loop has to continue, false otherwise.
2005 :rtype: bool
2006 """
2007 return False
2009 def process(self) -> None:
2010 """
2011 Processes the current item.
2013 This is the core of the Processor, where the user has to define the calculations required.
2015 In parallel for loops, the method can optionally accept a :class:`~mafw.models.loop_payloads.LoopItem`
2016 parameter if the user prefers not to rely on thread-local access to :attr:`.Processor.item`, :attr:`.Processor.i_item` and
2017 :attr:`.Processor.n_item`.
2018 """
2019 pass
2021 def accept_item(self) -> None:
2022 """
2023 Does post process actions on a successfully processed item.
2025 Within the :meth:`process`, the user left the looping status to Continue, so it means that everything looks
2026 good and this is the right place to perform database updates or file savings.
2028 .. seealso:
2029 Have a look at :meth:`skip_item` for what to do in case something went wrong.
2031 In parallel for loops, the method can optionally accept a :class:`~mafw.models.loop_payloads.LoopResult`
2032 parameter for direct access to the processed payload and looping status.
2033 """
2034 pass
2036 def skip_item(self) -> None:
2037 """
2038 Does post process actions on a *NOT* successfully processed item.
2040 Within the :meth:`process`, the user set the looping status to Skip, so it means that something went wrong
2041 and here corrective actions can be taken if needed.
2043 .. seealso:
2044 Have a look at :meth:`accept_item` for what to do in case everything was OK.
2046 In parallel for loops, the method can optionally accept a :class:`~mafw.models.loop_payloads.LoopResult`
2047 parameter for direct access to the processed payload and looping status.
2048 """
2049 pass
2051 def consumer_start(self) -> None:
2052 """
2053 Executes once in the consumer thread for the parallel queue loop.
2055 This hook mirrors :meth:`start` but is only used with the queue-based parallel loop.
2056 """
2057 pass
2059 def consumer_process(self, loop_result: LoopResult | None = None) -> None:
2060 """
2061 Handle processed items in the consumer thread for the parallel queue loop.
2063 By default this dispatches to :meth:`accept_item` or :meth:`skip_item` based on the looping status. The method
2064 can optionally accept a :class:`~mafw.models.loop_payloads.LoopResult` payload.
2065 """
2066 if loop_result is None:
2067 loop_result = self._build_loop_result(None, 0.0)
2068 if self.looping_status == LoopingStatus.Continue:
2069 self._call_with_optional_payload(self.accept_item, loop_result)
2070 elif self.looping_status == LoopingStatus.Skip:
2071 self._call_with_optional_payload(self.skip_item, loop_result)
2073 def consumer_finish(self) -> None:
2074 """
2075 Executes once in the consumer thread after the queue has been drained.
2076 """
2077 pass
2079 def finish(self) -> None:
2080 """
2081 Concludes the execution.
2083 The user can reimplement this method if there are some conclusive tasks that must be achieved.
2084 Always include a call to super().
2085 """
2086 self._mark_super_call('finish')
2087 self.processor_status = ProcessorStatus.Finish
2088 if self.looping_status == LoopingStatus.Abort:
2089 self.processor_exit_status = ProcessorExitStatus.Aborted
2090 self.print_process_statistics()
2092 def print_process_statistics(self) -> None:
2093 """
2094 Print the process statistics.
2096 A utility method to display the fastest, the slowest and the average timing required to process on a single
2097 item. This is particularly useful when the looping processor is part of a ProcessorList.
2098 """
2099 if len(self._process_durations):
2100 log.info('[cyan] Processed %s items.' % len(self._process_durations))
2101 log.info(
2102 '[cyan] Fastest item process duration: %s '
2103 % pretty_format_duration(min(self._process_durations), n_digits=3)
2104 )
2105 log.info(
2106 '[cyan] Slowest item process duration: %s '
2107 % pretty_format_duration(max(self._process_durations), n_digits=3)
2108 )
2109 log.info(
2110 '[cyan] Average item process duration: %s '
2111 % pretty_format_duration((sum(self._process_durations) / len(self._process_durations)), n_digits=3)
2112 )
2113 if self._wall_clock_start is not None:
2114 total_duration = time.perf_counter() - self._wall_clock_start
2115 else:
2116 total_duration = sum(self._process_durations)
2117 log.info('[cyan] Total process duration: %s' % pretty_format_duration(total_duration, n_digits=3))
2119 def _remove_orphan_files(self) -> None:
2120 """
2121 Remove orphan files.
2123 If a connection to the database is available, then the OrphanFile standard table is queried for all its entries,
2124 and all the files are then removed.
2126 The user can turn off this behaviour by switching the :attr:`~mafw.processor.Processor.remove_orphan_files` to False.
2128 """
2129 if self._database is None or self.remove_orphan_files is False:
2130 # no database connection or no wish to remove orphan files, it does not make sense to continue
2131 return
2133 try:
2134 OrphanFile = cast(MAFwBaseModel, mafw_model_register.get_model('OrphanFile'))
2135 except KeyError:
2136 log.warning('OrphanFile table not found in DB. Please verify database integrity')
2137 return
2139 if TYPE_CHECKING:
2140 assert hasattr(OrphanFile, '_meta')
2142 orphan_files = OrphanFile.select().execute() # type: ignore[no-untyped-call]
2143 if len(orphan_files) != 0:
2144 msg = f'[yellow]Pruning orphan files ({sum(len(f.filenames) for f in orphan_files)})...'
2145 log.info(msg)
2146 for orphan in orphan_files:
2147 # filenames is a list of files:
2148 for f in orphan.filenames:
2149 f.unlink(missing_ok=True)
2151 OrphanFile.delete().execute() # type: ignore[no-untyped-call]
2154class ProcessorList(list[Union['Processor', 'ProcessorList']]):
2155 """
2156 A list like collection of processors.
2158 ProcessorList is a subclass of list containing only Processor subclasses or other ProcessorList.
2160 An attempt to add an element that is not a Processor or a ProcessorList will raise a TypeError.
2162 Along with an iterable of processors, a new processor list can be built using the following parameters.
2163 """
2165 def __init__(
2166 self,
2167 *args: Processor | ProcessorList,
2168 name: str | None = None,
2169 description: str | None = None,
2170 timer: Timer | None = None,
2171 timer_params: dict[str, Any] | None = None,
2172 user_interface: UserInterfaceBase | None = None,
2173 database: Database | None = None,
2174 database_conf: dict[str, Any] | None = None,
2175 create_standard_tables: bool = True,
2176 ):
2177 """
2178 Constructor parameters:
2180 :param name: The name of the processor list. Defaults to ProcessorList.
2181 :type name: str, Optional
2182 :param description: An optional short description. Default to ProcessorList.
2183 :type description: str, Optional
2184 :param timer: The timer object. If None is provided, a new one will be created. Defaults to None.
2185 :type timer: Timer, Optional
2186 :param timer_params: A dictionary of parameter to build the timer object. Defaults to None.
2187 :type timer_params: dict, Optional
2188 :param user_interface: A user interface. Defaults to None
2189 :type user_interface: UserInterfaceBase, Optional
2190 :param database: A database instance. Defaults to None.
2191 :type database: Database, Optional
2192 :param database_conf: Configuration for the database. Default to None.
2193 :type database_conf: dict, Optional
2194 :param create_standard_tables: Whether or not to create the standard tables. Defaults to True.
2195 :type create_standard_tables: bool, Optional
2196 """
2198 # validate_items takes a tuple of processors, that's why we don't unpack args.
2199 super().__init__(self.validate_items(args))
2200 self._name = name or self.__class__.__name__
2201 self.description = description or self._name
2203 self.timer = timer
2204 self.timer_params = timer_params or {}
2205 self._user_interface = user_interface or ConsoleInterface()
2207 self._resource_stack: contextlib.ExitStack
2208 self._processor_exit_status: ProcessorExitStatus = ProcessorExitStatus.Successful
2209 self.nested_list = False
2210 """
2211 Boolean flag to identify that this list is actually inside another list.
2213 Similarly to the local resource flag for the :class:`.Processor`, this flag prevent the user interface to be
2214 added to the resource stack.
2215 """
2217 # database stuff
2218 self._database: peewee.Database | None = database
2219 self._database_conf: dict[str, Any] | None = validate_database_conf(database_conf)
2220 self.create_standard_tables = create_standard_tables
2221 """The boolean flag to proceed or skip with standard table creation and initialisation"""
2223 def __setitem__( # type: ignore[override]
2224 self,
2225 __index: SupportsIndex,
2226 __object: Processor | ProcessorList,
2227 ) -> None:
2228 super().__setitem__(__index, self.validate_item(__object))
2230 def insert(self, __index: SupportsIndex, __object: Processor | ProcessorList) -> None:
2231 """Adds a new processor at the specified index."""
2232 super().insert(__index, self.validate_item(__object))
2234 def append(self, __object: Processor | ProcessorList) -> None:
2235 """Appends a new processor at the end of the list."""
2236 super().append(self.validate_item(__object))
2238 def extend(self, __iterable: Iterable[Processor | ProcessorList]) -> None:
2239 """Extends the processor list with a list of processors."""
2240 if isinstance(__iterable, type(self)):
2241 super().extend(__iterable)
2242 else:
2243 super().extend([self.validate_item(item) for item in __iterable])
2245 @staticmethod
2246 def validate_item(item: Processor | ProcessorList) -> Processor | ProcessorList:
2247 """Validates the item being added."""
2248 if isinstance(item, Processor):
2249 item.local_resource_acquisition = False
2250 return item
2251 elif isinstance(item, ProcessorList):
2252 item.timer_params = dict(suppress_message=True)
2253 item.nested_list = True
2254 return item
2255 else:
2256 raise TypeError(f'Expected Processor or ProcessorList, got {type(item).__name__}')
2258 @staticmethod
2259 def validate_items(items: tuple[Processor | ProcessorList, ...] = ()) -> tuple[Processor | ProcessorList, ...]:
2260 """Validates a tuple of items being added."""
2261 if not items:
2262 return tuple()
2263 return tuple([ProcessorList.validate_item(item) for item in items if item is not None])
2265 @property
2266 def name(self) -> str:
2267 """
2268 The name of the processor list
2270 :return: The name of the processor list
2271 :rtype: str
2272 """
2273 return self._name
2275 @name.setter
2276 def name(self, name: str) -> None:
2277 self._name = name
2279 @property
2280 def processor_exit_status(self) -> ProcessorExitStatus:
2281 """
2282 The processor exit status.
2284 It refers to the whole processor list execution.
2285 """
2286 return self._processor_exit_status
2288 @processor_exit_status.setter
2289 def processor_exit_status(self, status: ProcessorExitStatus) -> None:
2290 self._processor_exit_status = status
2292 @property
2293 def database(self) -> peewee.Database:
2294 """
2295 Returns the database instance
2297 :return: A database instance
2298 :raises MissingDatabase: if a database connection is missing.
2299 """
2300 if self._database is None:
2301 raise MissingDatabase('Database connection not initialized')
2302 return self._database
2304 def execute(self) -> ProcessorExitStatus:
2305 """
2306 Execute the list of processors.
2308 Similarly to the :class:`Processor`, ProcessorList can be executed. In simple words, the execute
2309 method of each processor in the list is called exactly in the same sequence as they were added.
2310 """
2311 with contextlib.ExitStack() as self._resource_stack:
2312 self.acquire_resources()
2313 self._user_interface.create_task(self.name, self.description, completed=0, increment=0, total=len(self))
2314 for i, item in enumerate(self):
2315 if isinstance(item, Processor):
2316 log.info('[bold]Executing [red]%s[/red] processor[/bold]' % item.replica_name)
2317 else:
2318 log.info('[bold]Executing [blue]%s[/blue] processor list[/bold]' % item.name)
2319 self.distribute_resources(item)
2320 item.execute()
2321 self._user_interface.update_task(self.name, increment=1) # i+1, 1, len(self))
2322 self._processor_exit_status = item.processor_exit_status
2323 if self._processor_exit_status == ProcessorExitStatus.Aborted:
2324 msg = 'Processor %s caused the processor list to abort' % item.name
2325 log.error(msg)
2326 raise AbortProcessorException(msg)
2327 self._user_interface.update_task(self.name, completed=len(self), total=len(self))
2328 return self._processor_exit_status
2330 def acquire_resources(self) -> None:
2331 """Acquires external resources."""
2332 # The strategy is similar to the one for processor. if we do get resources already active (not None) then we use
2333 # them, otherwise, we create them and we add them to the resource stack.
2334 if self.timer is None:
2335 self.timer = self._resource_stack.enter_context(Timer(**self.timer_params))
2336 # The user interface is very likely already initialised by the runner.
2337 # But if this is a nested list, then we must not push the user interface in the stack
2338 # otherwise the user interface context (progress for rich) will be stopped at the end
2339 # of the nested list.
2340 if not self.nested_list:
2341 self._resource_stack.enter_context(self._user_interface)
2342 if self._database is None and self._database_conf is None:
2343 # no database, nor configuration.
2344 # we cannot do anything
2345 pass
2346 elif self._database is None and self._database_conf is not None:
2347 # no db, but we got a configuration.
2348 # we can make a db
2349 if 'DBConfiguration' in self._database_conf:
2350 conf = self._database_conf['DBConfiguration'] # type1
2351 else:
2352 conf = self._database_conf # type2
2354 db_url, connection_parameters = build_connection_parameters(conf)
2356 self._database = connect(db_url, **connection_parameters) # type: ignore # peewee is not returning a DB
2357 try:
2358 self._database.connect()
2359 self._resource_stack.callback(self._database.close)
2360 except peewee.OperationalError as e:
2361 log.critical('Unable to connect to %s', db_url)
2362 raise e
2363 database_proxy.initialize(self._database)
2364 if self.create_standard_tables:
2365 standard_tables = mafw_model_register.get_standard_tables()
2366 self.database.create_tables(standard_tables)
2367 for table in standard_tables:
2368 table.init()
2369 else: # equiv to if self._database is not None:
2370 # we got a database, so very likely we are inside a processor list
2371 # the connection has been already set and the initialisation as well.
2372 # nothing else to do here.
2373 pass
2375 def distribute_resources(self, processor: Processor | Self) -> None:
2376 """Distributes the external resources to the items in the list."""
2377 processor.timer = self.timer
2378 processor._user_interface = self._user_interface
2379 processor._database = self._database