Coverage for src / mafw / processor.py: 99%
1024 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
1# Copyright 2025–2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Module implements the basic Processor class, the ProcessorList and all helper classes to achieve the core
6functionality of the MAFw.
7"""
9from __future__ import annotations
11import contextlib
12import inspect
13import logging
14import os
15import queue
16import threading
17import time
18import warnings
19from collections import OrderedDict
20from collections.abc import Callable, Iterator
21from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
22from copy import copy, deepcopy
23from functools import wraps
24from itertools import count
25from typing import (
26 TYPE_CHECKING,
27 Any,
28 Collection,
29 Generic,
30 Iterable,
31 Self,
32 SupportsIndex,
33 TypeVar,
34 Union,
35 cast,
36 get_args,
37 get_origin,
38 get_type_hints,
39)
41import peewee
42from peewee import Database
44# noinspection PyUnresolvedReferences
45from playhouse.db_url import connect
47import mafw.db.db_filter
48from mafw.active import Active
49from mafw.db.db_model import MAFwBaseModel, database_proxy, mafw_model_register
50from mafw.enumerators import LoopingStatus, LoopType, ProcessorExitStatus, ProcessorStatus
51from mafw.mafw_errors import (
52 AbortProcessorException,
53 MissingDatabase,
54 MissingOverloadedMethod,
55 MissingSuperCall,
56 ProcessorParameterError,
57)
58from mafw.models.filter_schema import FilterSchema
59from mafw.models.loop_payloads import LoopItem, LoopResult
60from mafw.models.parameter_schema import ParameterSchema
61from mafw.models.processor_schema import ProcessorSchema
62from mafw.timer import Timer, pretty_format_duration
63from mafw.tools.generics import deep_update
64from mafw.tools.parallel import is_free_threading
65from mafw.tools.regexp import extract_protocol
66from mafw.ui.abstract_user_interface import UserInterfaceBase
67from mafw.ui.console_user_interface import ConsoleInterface
69log = logging.getLogger(__name__)
# Shared type variable parameterising both halves of a processor parameter.
ParameterType = TypeVar('ParameterType')
"""Generic variable type for the :class:`ActiveParameter` and :class:`PassiveParameter`."""
def validate_database_conf(database_conf: dict[str, Any] | None = None) -> dict[str, Any] | None:
    """
    Validate the database configuration.

    The input may be either the ``DBConfiguration`` table itself or a full steering dictionary
    containing a ``DBConfiguration`` key; in the latter case the nested table is the one inspected.
    A configuration is valid when it provides all required fields (currently only ``URL``).

    :param database_conf: The input database configuration. Defaults to None.
    :type database_conf: dict, Optional
    :return: The **original** ``database_conf`` (not the extracted table) when valid, or None if it is invalid.
    :rtype: dict, None
    """
    if database_conf is None:
        return None

    # Work on a shallow copy: dict is mutable, and validation must not alter the caller's object.
    conf = database_conf.copy()

    if 'DBConfiguration' in conf:
        # database_conf is the whole steering file: inspect the DBConfiguration table.
        conf = conf['DBConfiguration']

    required_fields = ('URL',)
    # Generator expression instead of a throwaway list inside all().
    if all(field in conf for field in required_fields):
        return database_conf
    return None
class PassiveParameter(Generic[ParameterType]):
    """
    Storage backend for a processor parameter value and its metadata.

    This class is the private interface used by the :class:`ActiveParameter` descriptor to hold the actual
    value together with its documentation and set/optional state. Whenever an :class:`.ActiveParameter` is
    declared on a processor class, one instance of this class is created per parameter and kept in the
    processor parameter :attr:`register <.Processor._processor_parameters>`.

    .. seealso::

        An explanation on how processor parameters work and should be used is given in :ref:`Understanding
        processor parameters <parameters>`

    .. versionchanged:: v2.0.0

        User should only use :class:`ActiveParameter` and never manually instantiate :class:`PassiveParameter`.
    """

    def __init__(
        self, name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = ''
    ):
        """
        Constructor parameters:

        :param name: The name of the parameter. It must be a valid python identifier.
        :type name: str
        :param value: The set value of the parameter. If None, then the default value will be used. Defaults to None.
        :type value: ParameterType, Optional
        :param default: The default value, used when ``value`` is not provided. Defaults to None.
        :type default: ParameterType, Optional
        :param help_doc: A brief explanation of the parameter.
        :type help_doc: str, Optional
        :raises ProcessorParameterError: if both `value` and `default` are None or if `name` is not a valid identifier.
        """
        if not name.isidentifier():
            raise ProcessorParameterError(f'{name} is not a valid python identifier.')

        if value is None and default is None:
            raise ProcessorParameterError('Processor parameter cannot have both value and default value set to None')

        self.name = name
        self.doc = help_doc
        # An explicitly provided value wins over the default and marks the parameter as set.
        self._is_set = value is not None
        self._is_optional = not self._is_set
        self._value: ParameterType = value if self._is_set else default  # type: ignore[assignment]

    def __rich_repr__(self) -> Iterator[Any]:
        # rich repr protocol: yield (key, value[, default-to-omit]) tuples.
        yield 'name', self.name
        yield 'value', self.value, None
        yield 'help_doc', self.doc, ''

    @property
    def is_set(self) -> bool:
        """
        True when the value was explicitly assigned.

        Useful for optional parameters to distinguish a user-provided value from the default one.
        """
        return self._is_set

    @property
    def value(self) -> ParameterType:
        """
        Gets the parameter value.

        :return: The parameter value.
        :rtype: ParameterType
        """
        return self._value

    @value.setter
    def value(self, new_value: ParameterType) -> None:
        """
        Sets the parameter value and marks the parameter as set.

        :param new_value: The value to be set.
        :type new_value: ParameterType
        """
        self._value = new_value
        self._is_set = True

    @property
    def is_optional(self) -> bool:
        """
        Property to check if the parameter is optional.

        :return: True if the parameter is optional
        :rtype: bool
        """
        return self._is_optional

    def __repr__(self) -> str:
        attributes = ', '.join(f'{field}={getattr(self, field)!r}' for field in ('name', 'value', 'doc'))
        return f'{self.__class__.__name__}({attributes})'
F = TypeVar('F', bound=Callable[..., Any])
"""Type variable for generic callable with any return value."""


def ensure_parameter_registration(func: F) -> F:
    """
    Decorator to ensure that before calling `func` the processor parameters have been registered.

    The decorated callable must be a bound :class:`Processor` method: the first positional
    argument is checked to be a Processor instance, and :meth:`Processor._register_parameters`
    is invoked when registration has not happened yet.

    :raises ProcessorParameterError: when applied to something that is not a Processor method.
    """

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # The first positional argument must be self (a Processor instance).
        if len(args) == 0 or not isinstance(args[0], Processor):
            raise ProcessorParameterError(
                'Attempt to apply the ensure_parameter_registration to something different to a Processor subclass.'
            )
        self = args[0]
        if self._parameter_registered is False:
            self._register_parameters()
        # Return func's result as-is; the previous cast to F mislabelled the value as a callable.
        return func(*args, **kwargs)

    return cast(F, wrapper)
234_ParameterRegistry = OrderedDict[str, 'ActiveParameter[Any]']
237def _copy_parent_parameter_definitions(owner: type[Processor]) -> _ParameterRegistry:
238 """
239 Build an ordered dictionary of ActiveParameters inherited from base classes.
240 """
241 definitions: _ParameterRegistry = OrderedDict()
242 for base in owner.__mro__[1:]:
243 base_definitions = getattr(base, '_parameter_definitions', None)
244 if base_definitions:
245 for name, descriptor in base_definitions.items():
246 definitions[name] = descriptor
247 return definitions
250def _ensure_parameter_definitions(owner: type[Processor]) -> _ParameterRegistry:
251 """
252 Return the class-level parameter registry, creating it by copying base definitions if necessary.
253 """
254 definitions: _ParameterRegistry | None = owner.__dict__.get('_parameter_definitions')
255 if definitions is not None:
256 return definitions
258 definitions = _copy_parent_parameter_definitions(owner)
259 setattr(owner, '_parameter_definitions', definitions)
260 return definitions
263def _validate_filter_schema(owner: type[Processor]) -> None:
264 """
265 Validate the optional :class:`~mafw.models.filter_schema.FilterSchema` declared on a Processor.
266 """
267 schema = getattr(owner, '_filter_schema', None)
268 if schema is None:
269 return
270 if not isinstance(schema, FilterSchema):
271 raise ProcessorParameterError('Processor._filter_schema must be a FilterSchema instance')
273 root_model = schema.root_model
274 if not (inspect.isclass(root_model) and issubclass(root_model, MAFwBaseModel)):
275 raise ProcessorParameterError('FilterSchema.root_model must inherit from MAFwBaseModel')
277 seen_models = {root_model}
278 for model in schema.allowed_models:
279 if not (inspect.isclass(model) and issubclass(model, MAFwBaseModel)):
280 raise ProcessorParameterError('FilterSchema.allowed_models must contain MAFwBaseModel subclasses')
281 if model in seen_models:
282 raise ProcessorParameterError(f'Model {model!r} already declared in FilterSchema')
283 seen_models.add(model)
class ActiveParameter(Generic[ParameterType]):
    r"""
    The public interface to the processor parameter.

    The behaviour of a :class:`Processor` can be customised by using processor parameters. The value of these
    parameters can be either set via a configuration file or directly when creating the class.

    If the user wants to benefit from this facility, they have to add in the instance of the Processor subclass an
    ActiveParameter instance in this way:

    .. code-block::

        class MyProcessor(Processor):

            # this is the input folder
            input_folder = ActiveParameter('input_folder', Path(r'C:\'), help_doc='This is where to look for input files')

            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)

                # change the input folder to something else
                self.input_folder = Path(r'D:\data')

                # get the value of the parameter
                print(self.input_folder)

    The ActiveParameter is a `descriptor <https://docs.python.org/3/glossary.html#term-descriptor>`_, it means that
    when you create one of them, a lot of work is done behind the scene.

    In simple words, a processor parameter is made by two objects: a public interface where the user can easily
    access the value of the parameter and a private interface where all other information (default, documentation...)
    is also stored.

    The user does not have to take care of all of this. When a new ActiveParameter instance is added to the class as
    in the code snippet above, the private interface is automatically created and will stay in the class instance
    until the end of the class lifetime.

    To access the private interface, the user can use the :meth:`Processor.get_parameter` method using the parameter
    name as a key.

    The user can assign to an ActiveParameter almost any name. There are just a few invalid parameter names that are
    used for other purposes. The list of reserved names is available :attr:`here <reserved_names>`. Should the user
    inadvertently use a reserved name, a :exc:`.ProcessorParameterError` is raised.

    .. seealso::

        The private counterpart in the :class:`PassiveParameter`.

        An explanation on how processor parameters work and should be used is given in :ref:`Understanding processor
        parameters <parameters>`

        The list of :attr:`reserved names <reserved_names>`.
    """

    reserved_names: list[str] = ['__logic__', '__filter__', '__new_only__', '__inheritance__', '__enable__']
    """A list of names that cannot be used as processor parameter names.

    - `__logic__`
    - `__filter__`
    - `__new_only__`
    - `__inheritance__`
    - `__enable__`
    """

    def __init__(
        self, name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = ''
    ):
        """
        Constructor parameters:

        :param name: The name of the parameter.
        :type name: str
        :param value: The initial value of the parameter. Defaults to None.
        :type value: ParameterType, Optional
        :param default: The default value of the parameter, to be used when ``value`` is not set. Defaults to None.
        :type default: ParameterType, Optional
        :param help_doc: An explanatory text describing the parameter.
        :type help_doc: str, Optional
        :raises ProcessorParameterError: if ``name`` is one of the :attr:`reserved_names`.
        """
        self._value = value
        self._default = default
        self._help_doc = help_doc
        # The external name is the key used in configuration files and in the parameter
        # register; reserved names are rejected immediately.
        self._external_name = self._validate_name(name)

    def _validate_name(self, proposed_name: str) -> str:
        """
        Validate that the proposed parameter name is not in the list of forbidden names.

        This private method checks if the provided name is allowed for use as a processor parameter.
        Names that are listed in :attr:`reserved_names` cannot be used as parameter names.

        :param proposed_name: The name to be validated for use as a processor parameter.
        :type proposed_name: str
        :return: The validated name if it passes the forbidden names check.
        :rtype: str
        :raises ProcessorParameterError: If the proposed name is in the list of forbidden names.
        """
        if proposed_name not in self.reserved_names:
            return proposed_name
        raise ProcessorParameterError(f'Attempt to use a forbidden name ({proposed_name})')

    def __set_name__(self, owner: type[Processor], name: str) -> None:
        # Called while the owning class body is being evaluated: remember where the
        # descriptor lives and under which attribute name.
        self.public_name = name
        self.private_name = f'param_{name}'
        self._owner = owner

        definitions = _ensure_parameter_definitions(owner)
        existing = definitions.get(self._external_name)
        if existing is not None and getattr(existing, '_owner', None) is owner:
            # Two descriptors declared on the *same* class share an external name.
            # Raising here would corrupt class creation, so the error is parked on the
            # class and re-raised later by ProcessorMeta.__init__.
            if not hasattr(owner, '_parameter_definition_error'):
                setattr(
                    owner,
                    '_parameter_definition_error',
                    ProcessorParameterError(f'Duplicated parameter name ({self._external_name}).'),
                )
            return
        definitions[self._external_name] = self

    def __get__(
        self, obj: Processor | None, obj_type: type[Processor]
    ) -> ActiveParameter[ParameterType] | ParameterType:
        # Class-level access returns the descriptor itself; instance access returns the value.
        if obj is None:
            return self

        # retrieve instance-level passive parameter
        param = cast(PassiveParameter[ParameterType], obj._processor_parameters[self._external_name])
        return param.value

    def __set__(self, obj: Processor, value: ParameterType) -> None:
        # Delegate storage to the instance's passive parameter (marks it as set).
        param = obj._processor_parameters[self._external_name]
        param.value = value

    def to_schema(self) -> ParameterSchema:
        """
        Returns the static schema describing this parameter.

        The schema is derived solely from the descriptor metadata and does not instantiate the owning processor.
        """
        annotation = self._resolve_parameter_annotation()
        default_value = self._schema_default_value()
        help_text = self._help_doc or None
        is_list = self._is_list_annotation(annotation, default_value)
        is_dict = self._is_dict_annotation(annotation, default_value)
        return ParameterSchema(
            name=self._external_name,
            annotation=annotation,
            default=default_value,
            help=help_text,
            is_list=is_list,
            is_dict=is_dict,
        )

    def _resolve_parameter_annotation(self) -> type | None:
        """Best-effort resolution of the parameter's declared type on the owning class."""
        # Without __set_name__ having run there is no owner to inspect.
        if not hasattr(self, 'public_name') or getattr(self, '_owner', None) is None:
            return None

        target = getattr(self, '_owner', None)

        try:
            hints = get_type_hints(target)
        except Exception:
            # Unresolvable forward references (or similar) are not fatal: fall back below.
            hints = {}

        hint = hints.get(self.public_name)
        if hint is None:
            # No class annotation: infer the type from the default (preferred) or the value.
            fallback = self._default if self._default is not None else self._value
            if fallback is not None:
                return type(fallback)
            return None

        origin = get_origin(hint)
        if origin is ActiveParameter:
            # Annotated as ActiveParameter[T]: unwrap and report T.
            args = get_args(hint)
            if args:
                return cast(type | None, args[0])
            return None
        return cast(type | None, hint)

    def _schema_default_value(self) -> Any:
        """Return a defensive copy of the default (or value) exposed in the schema."""
        candidate = self._default if self._default is not None else self._value
        if candidate is None:
            return None
        try:
            return deepcopy(candidate)
        except Exception:
            # Some objects are not deep-copyable; expose the original rather than fail.
            return candidate

    def _is_list_annotation(self, annotation: type | None, default_value: Any) -> bool:
        """True when the parameter is list-like (by annotation or by default value)."""
        return self._matches_container(annotation, default_value, list)

    def _is_dict_annotation(self, annotation: type | None, default_value: Any) -> bool:
        """True when the parameter is dict-like (by annotation or by default value)."""
        return self._matches_container(annotation, default_value, dict)

    def _matches_container(self, annotation: type | None, default_value: Any, container: type) -> bool:
        """Check whether the annotation (or the default's type) is *container* or a subclass/generic of it."""
        type_hint = annotation
        if type_hint is None and default_value is not None:
            type_hint = type(default_value)

        if type_hint is None:
            return False

        # Generic alias (e.g. list[int]) matches via its origin ...
        origin = get_origin(type_hint)
        if origin is container:
            return True

        # ... a plain class matches via subclassing.
        if isinstance(type_hint, type) and issubclass(type_hint, container):
            return True

        return False
class ProcessorMeta(type):
    """Metaclass that finalizes Processor subclasses before instantiation."""

    def __init__(cls, name: str, bases: tuple[type, ...], namespace: dict[str, Any]) -> None:
        """
        Set up new Processor subclasses.

        This method first checks the class for well-formed parameter definitions and filter metadata, then runs the
        super-call wrapping machinery described below. The wrappers are installed at class-creation time so that any
        later instance will be guarded before it even starts executing.
        """
        super().__init__(name, bases, namespace)
        # Materialise the parameter registry and validate the optional filter schema.
        _ensure_parameter_definitions(cast(type['Processor'], cls))
        _validate_filter_schema(cast(type['Processor'], cls))
        # ActiveParameter.__set_name__ cannot raise safely during class-body evaluation, so a
        # duplicated-name error is parked on the class; re-raise it now (and clean it up so the
        # attribute does not linger on the class object).
        error = getattr(cls, '_parameter_definition_error', None)
        if error is not None:
            delattr(cls, '_parameter_definition_error')
            raise error
        # Only genuine subclasses (those with 'Processor' somewhere above them in the MRO)
        # get the super-call wrappers; the Processor base class itself is left untouched.
        if any(base.__name__ == 'Processor' for base in cls.__mro__[1:]):
            apply_wrappers = getattr(cls, '_apply_super_call_wrappers', None)
            if apply_wrappers is not None:
                apply_wrappers()

    def __call__(cls, *args: Any, **kwargs: Any) -> 'ProcessorMeta':
        # Run the regular construction, then the framework's post-init hook.
        obj = type.__call__(cls, *args, **kwargs)
        obj.__post_init__()
        return cast(ProcessorMeta, obj)
525# noinspection PyProtectedMember
526class Processor(metaclass=ProcessorMeta):
527 """
528 The basic processor.
530 A very comprehensive description of what a Processor does and how it works is available at :ref:`doc_processor`.
531 """
    processor_status = Active(ProcessorStatus.Unknown)
    """Processor execution status"""

    # NOTE(review): this f-string is evaluated once, while the Processor class body runs, so the
    # stored text is literally 'Processor is working'; subclasses inherit that same string unless
    # they redefine progress_message themselves — confirm this is intended.
    progress_message: str = f'{__qualname__} is working'
    """Message displayed to show the progress.

    It can be customized with information about the current item in the loop by overloading the
    :meth:`format_progress_message`."""

    #: List of methods that should invoke their super implementation when overridden.
    _methods_to_be_checked_for_super: tuple[str, ...] = ('start', 'finish')
545 @classmethod
546 def parameter_schema(cls) -> list[ParameterSchema]:
547 """
548 Return the ordered static schema for the processor parameters defined on the class.
550 The schema is derived without instantiating the processor, keeping toolchains free from side effects.
551 """
552 definitions = getattr(cls, '_parameter_definitions', None)
553 if definitions is None:
554 definitions = _ensure_parameter_definitions(cls)
555 return [param.to_schema() for param in definitions.values()]
557 @classmethod
558 def filter_schema(cls) -> FilterSchema | None:
559 """
560 Optional metadata describing the models available for filtering.
561 """
562 return getattr(cls, '_filter_schema', None)
564 @classmethod
565 def processor_schema(cls) -> ProcessorSchema:
566 """
567 Return the static schema for the processor.
568 """
569 return ProcessorSchema(parameters=cls.parameter_schema(), filter=cls.filter_schema())
    _ids = count(0)
    """A counter for all processor instances"""

    # NOTE(review): class-level mutable dict — it is shared by every subclass that does not
    # shadow it with its own dictionary.
    new_defaults: dict[str, Any] = {}
    """
    A dictionary containing defaults value for the parameters to be overridden

    .. versionadded:: v2.0.0
    """

    # Steering-file key controlling the filter 'new only' behaviour
    # (read in _load_parameter_configuration).
    new_only_flag = 'new_only'
    def __init__(
        self,
        name: str | None = None,
        description: str | None = None,
        config: dict[str, Any] | None = None,
        looper: LoopType | str = LoopType.ForLoop,
        user_interface: UserInterfaceBase | None = None,
        timer: Timer | None = None,
        timer_params: dict[str, Any] | None = None,
        database: Database | None = None,
        database_conf: dict[str, Any] | None = None,
        remove_orphan_files: bool = True,
        replica_id: str | None = None,
        create_standard_tables: bool = True,
        max_workers: int | None = None,
        queue_size: int | None = None,
        queue_batch_size: int | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Constructor parameters

        :param name: The name of the processor. If None is provided, the class name is used instead. Defaults to None.
        :type name: str, Optional
        :param description: A short description of the processor task. Defaults to the processor name.
        :type description: str, Optional
        :param config: A configuration dictionary for this processor. Defaults to None.
        :type config: dict, Optional
        :param looper: Enumerator to define the looping type. Defaults to LoopType.ForLoop
        :type looper: LoopType, Optional
        :param user_interface: A user interface instance to be used by the processor to interact with the user.
        :type user_interface: UserInterfaceBase, Optional
        :param timer: A timer object to measure process duration.
        :type timer: Timer, Optional
        :param timer_params: Parameters for the timer object.
        :type timer_params: dict, Optional
        :param database: A database instance. Defaults to None.
        :type database: Database, Optional
        :param database_conf: Configuration for the database. Default to None.
        :type database_conf: dict, Optional
        :param remove_orphan_files: Boolean flag to remove files on disc without a reference to the database.
            See :ref:`std_tables` and :meth:`~mafw.processor.Processor._remove_orphan_files`. Defaults to True
        :type remove_orphan_files: bool, Optional
        :param replica_id: The replica identifier for the current processor.
        :type replica_id: str, Optional
        :param create_standard_tables: Boolean flag to create std tables on disk. Defaults to True
        :type create_standard_tables: bool, Optional
        :param max_workers: Number of worker threads for parallel loops.
        :type max_workers: int, Optional
        :param queue_size: Maximum size of the internal queue for the queue-based parallel loop.
        :type queue_size: int, Optional
        :param queue_batch_size: Number of items processed per worker task in the queue-based parallel loop.
        :type queue_batch_size: int, Optional
        :param kwargs: Keyword arguments that can be used to set processor parameters.
        """

        self.name = name or self.__class__.__name__
        """The name of the processor."""

        self.unique_id = next(self._ids)
        """A unique identifier representing how many instances of Processor has been created."""

        self.replica_id = replica_id
        """
        The replica identifier specified in the constructor

        .. versionadded:: v2.0.0
        """

        self.description = description or self.name
        """A short description of the processor task."""

        self._item: Any = None
        """The current item of the loop."""

        self._looping_status: LoopingStatus = LoopingStatus.Continue
        """The looping status for the main thread."""

        self._thread_local = threading.local()
        """Thread-local storage for loop attributes in parallel execution.

        .. versionadded:: v2.1.0
        """

        self._wall_clock_start: float | None = None
        """Timestamp when the looping execution began."""

        self.processor_exit_status = ProcessorExitStatus.Successful
        """Processor exit status"""

        self.loop_type: LoopType = LoopType(looper)
        """
        The loop type.

        The value of this parameter can also be changed by the :func:`~mafw.decorators.execution_workflow` decorator
        factory.

        See :class:`~mafw.enumerators.LoopType` for more details.
        """

        self.create_standard_tables = create_standard_tables
        """The boolean flag to proceed or skip with standard table creation and initialisation"""

        self.max_workers = max_workers if max_workers is not None else self._compute_default_max_workers()
        """Maximum number of worker threads used in parallel loops."""

        # When no explicit queue size is given, allow two pending items per worker.
        computed_queue_size = max(1, self.max_workers * 2)
        self.queue_size = queue_size if queue_size is not None else computed_queue_size
        """Maximum size of the queue used by :class:`~mafw.enumerators.LoopType.ParallelForLoopWithQueue`."""

        # Coerce falsy/None batch sizes to the minimum of 1.
        self.queue_batch_size = max(1, queue_batch_size or 1)
        """Number of items processed per worker task in :class:`~mafw.enumerators.LoopType.ParallelForLoopWithQueue`."""

        # private attributes
        self._config: dict[str, Any] = {}
        """
        A dictionary containing the processor configuration object.

        This dictionary is populated with configuration parameter (always type 2) during the
        :meth:`._load_parameter_configuration` method.

        The original value of the configuration dictionary that is passed to the constructor is stored in
        :attr:`._orig_config`.

        .. versionchanged:: v2.0.0
            Now it is an empty dictionary until the :meth:`._load_parameter_configuration` is called.

        """

        self._orig_config = deepcopy(config) if config is not None else {}
        """
        A copy of the original configuration dictionary.

        .. versionadded:: v2.0.0
        """

        self._processor_parameters: OrderedDict[str, PassiveParameter[Any]] = OrderedDict()
        """
        A dictionary to store all the processor parameter instances.

        The name of the parameter is used as a key, while for the value an instance of the
        :class:`.PassiveParameter` is used.
        """
        self._parameter_registered = False
        """A boolean flag to confirm successful parameter registration."""
        # Remaining keyword arguments are kept to overrule parameters later on.
        self._kwargs = kwargs

        # loops attributes
        self._i_item: int = -1
        self._n_item: int | None = -1
        self._process_durations: list[float] = []
        self._super_call_flags: dict[str, bool] = {}
        """Tracks super-call usage for methods that require it."""

        # resource stack (declared only; assigned elsewhere)
        self._resource_stack: contextlib.ExitStack
        self._resource_acquisition: bool = True

        # processor timer
        self.timer: Timer | None = timer
        self._timer_parameters: dict[str, Any] = timer_params or {}

        # user interface: fall back to the console implementation when none is supplied
        if user_interface is None:
            self._user_interface: UserInterfaceBase = ConsoleInterface()
        else:
            self._user_interface = user_interface

        # database stuff
        self._database: peewee.Database | None = database
        self._database_conf: dict[str, Any] | None = validate_database_conf(database_conf)
        self.filter_register: mafw.db.db_filter.ProcessorFilter = mafw.db.db_filter.ProcessorFilter()
        """The DB filter register of the Processor."""
        self.remove_orphan_files: bool = remove_orphan_files
        """The flag to remove or protect the orphan files. Defaults to True"""

        # Must run last: registration/overriding reads attributes assigned above.
        self.initialise_parameters()
    def initialise_parameters(self) -> None:
        """
        Initialises processor parameters by registering them and applying various configuration sources.

        This method orchestrates the parameter initialisation process by performing the following steps in order:

        #. Registers processor parameters defined as :class:`ActiveParameter` instances
        #. Overrides default parameter values with any configured overrides
        #. Loads parameter configuration from the processor's configuration dictionary
        #. Applies keyword arguments as parameter overrides

        The method ensures that all processor parameters are properly configured before the processor
        execution begins. It is automatically called during processor initialisation and should not
        typically be called directly by users.

        .. seealso::
            :meth:`_register_parameters`, :meth:`_override_defaults`,
            :meth:`_load_parameter_configuration`, :meth:`_overrule_kws_parameters`

        .. versionadded:: v2.0.0
        """
        # The order is the contract: each step may overwrite values set by the previous one,
        # ending with explicit keyword arguments having the final say.
        self._register_parameters()
        self._override_defaults()
        self._load_parameter_configuration()
        self._overrule_kws_parameters()
    def __post_init__(self) -> None:
        """
        Performs post-initialisation tasks for the processor.

        This method is automatically called (by :meth:`ProcessorMeta.__call__`) after the processor
        initialisation is complete. It performs validation checks on overloaded methods and sets the
        initial processor status.

        .. seealso::
            :meth:`validate_configuration`, :meth:`_check_method_overload`,
            :attr:`~mafw.processor.Processor.processor_status`

        .. versionchanged:: v2.0.0
            Moved the parameter initialisation to :meth:`initialise_parameters` and now executed as last step of the
            init method.

            Added the validate configuration check. This method should silently check that configuration provided
            with the processor parameters is valid. If not, a :exc:`.ProcessorParameterError` is raised.
        """
        self.validate_configuration()
        self._check_method_overload()
        self.processor_status = ProcessorStatus.Init
811 def _register_parameters(self) -> None:
812 """
813 Register processor parameters defined as ActiveParameter instances in the class.
815 This private method scans the class definition for any :class:`.ActiveParameter` instances and creates
816 corresponding :class:`.PassiveParameter` instances to store the actual parameter values and metadata.
817 It ensures that all processor parameters are properly initialised and available for configuration
818 through the processor's configuration system.
820 The method checks for duplicate parameter names and raises a :exc:`.ProcessorParameterError` if duplicates
821 are detected. It also sets the internal flag :attr:`._parameter_registered` to True once registration is
822 complete.
824 .. note::
825 This method is automatically called during processor initialisation and should not be called directly
826 by users.
828 .. seealso::
829 :class:`.Processor`, :meth:`.Processor._override_defaults`,
830 :meth:`.Processor._load_parameter_configuration`, :meth:`.Processor._overrule_kws_parameters`
832 .. versionchanged:: v2.0.0
833 Only :class:`ActiveParameter` are not registered. The use of :class:`PassiveParameter` is only meant to
834 store the value and metadata of the active counter part.
835 """
836 if self._parameter_registered:
837 return
839 definitions = getattr(self.__class__, '_parameter_definitions', None)
840 if definitions is None:
841 definitions = _ensure_parameter_definitions(self.__class__)
843 for attr in definitions.values():
844 ext_name = attr._external_name
845 if ext_name in self._processor_parameters:
846 raise ProcessorParameterError(f'Duplicated parameter name ({ext_name}).')
847 self._processor_parameters[ext_name] = PassiveParameter(
848 ext_name, attr._value, attr._default, attr._help_doc
849 )
851 self._parameter_registered = True
853 def _override_defaults(self) -> None:
854 """
855 Override default parameter values with values from :attr:`new_defaults`.
857 This private method iterates through the :attr:`new_defaults` dictionary and updates
858 the corresponding processor parameters with new values. Only parameters that exist
859 in both :attr:`new_defaults` and :attr:`_processor_parameters` are updated.
861 .. versionadded:: v2.0.0
862 """
863 for key, value in self.new_defaults.items():
864 if key in self._processor_parameters:
865 self._processor_parameters[key].value = value
867 def _reset_parameters(self) -> None:
868 """
869 Reset processor parameters to their initial state.
871 This method clears all currently registered processor parameters and triggers
872 a fresh registration process. It's useful when parameter configurations need
873 to be reinitialized or when parameters have been modified and need to be reset.
875 .. seealso::
876 :meth:`_register_parameters`, :meth:`_register_parameters`
877 """
878 self._processor_parameters = OrderedDict()
879 self._parameter_registered = False
880 self._register_parameters()
    @ensure_parameter_registration
    def _load_parameter_configuration(self) -> None:
        """
        Load processor parameter configuration from the internal configuration dictionary.

        This method processes the processor's configuration dictionary to set parameter values.
        It handles two configuration formats:

        1. Nested format: ``{'ProcessorName': {'param1': value1, ...}}``
        2. Flat format: ``{'param1': value1, ...}``

        The method also handles filter configurations by collecting filter table names
        and deferring their initialisation until after the global filter has been processed.

        .. versionchanged:: v2.0.0
            For option 1 combining configuration from name and name_replica

        :raises ProcessorParameterError: If a parameter in the configuration is not registered.

        .. seealso::
            :meth:`mafw.db.db_filter.ModelFilter.from_conf`
        """
        original_config = copy(self._orig_config)
        flt_list = []

        # by default the flag new_only is set to true
        # unless the user specify differently in the general section of the steering file
        self.filter_register.new_only = original_config.get(self.new_only_flag, True)

        # we need to check if the configuration object is of type 1 (nested) or type 2 (flat)
        if any([name for name in [self.name, self.replica_name] if name in original_config]):
            # one of the two names (the base or the replica) must be present in case of option 1
            # we start from the base name. If not there, then take an empty dict
            option1_config_base = original_config.get(self.name, {})
            if self.name != self.replica_name:
                # if there is the replica name, then update the base configuration with the replica value
                # we get the replica configuration
                option1_config_replica = original_config.get(self.replica_name, {})

                # let's check if the user wants to have inheritance default
                # by default is True
                inheritance = option1_config_replica.get('__inheritance__', True)
                if inheritance:
                    # we update the base with the replica without changing the base
                    option1_config_update = deep_update(option1_config_base, option1_config_replica, copy_first=True)
                else:
                    # we do not use the base with the replica specific, we pass the replica as the updated
                    option1_config_update = option1_config_replica

                # we modify the type 1 original so that the table for the replica has the updated configuration
                # this is used for the filter configuration at the end.
                original_config[self.replica_name] = option1_config_update
            else:
                # there is not replica, so the update is equal to the base.
                option1_config_update = option1_config_base

            self._config = option1_config_update
        else:
            # for type 2 we are already good to go
            self._config = original_config

        # work on a deep copy: the sanitizer below mutates nested dicts and must not touch the
        # configuration used for parameter assignment.
        filter_config = deepcopy(original_config)

        def _sanitize_filter_config(processor_name: str) -> None:
            # Strip all the `__enable__` switches out of the filter table of `processor_name`,
            # dropping every model / field / conditional that the user disabled.
            processor_config = filter_config.get(processor_name)
            if not isinstance(processor_config, dict):
                return
            filter_table = processor_config.get('__filter__')
            if not isinstance(filter_table, dict):
                return

            sanitized_table: dict[str, Any] = {}
            for model_name, model_config in filter_table.items():
                if not isinstance(model_config, dict):
                    # non-dict entries are passed through untouched
                    sanitized_table[model_name] = model_config
                    continue

                model_config_copy = deepcopy(model_config)
                # a disabled model is removed from the sanitized table altogether
                if not bool(model_config_copy.pop('__enable__', True)):
                    continue

                for field_name, field_value in list(model_config_copy.items()):
                    # a dict that looks like an op/value specification is a filter condition, not a
                    # field group; only field groups carry their own `__enable__` switch.
                    if (
                        isinstance(field_value, dict)
                        and not ('op' in field_value and 'value' in field_value)
                        and '__enable__' in field_value
                    ):
                        field_enabled = bool(field_value.pop('__enable__', True))
                        if not field_enabled:
                            model_config_copy.pop(field_name, None)

                conditionals = model_config_copy.get('__conditional__')
                if isinstance(conditionals, list):
                    # keep only the enabled conditional entries
                    filtered_conditionals: list[Any] = []
                    for conditional in conditionals:
                        if isinstance(conditional, dict):
                            conditional_enabled = bool(conditional.pop('__enable__', True))
                            if not conditional_enabled:
                                continue
                        filtered_conditionals.append(conditional)
                    model_config_copy['__conditional__'] = filtered_conditionals

                sanitized_table[model_name] = model_config_copy

            processor_config['__filter__'] = sanitized_table

        _sanitize_filter_config(self.replica_name)

        for key, value in self._config.items():
            if key in self._processor_parameters:
                # cast the configured value to the current parameter value type before storing it
                type_: ParameterType = type(self.get_parameter(key).value)  # type: ignore # wait for answer from SO
                self.set_parameter_value(key, type_(value))  # type: ignore # no idea how to fix it, may be linked with above
            elif key == '__filter__':
                # we got a filter table!
                # it should contain one table for each model
                # we add all the names to a list for deferred initialisation
                flt_table = self._config[key]
                if isinstance(flt_table, dict):
                    for model_name, model_config in flt_table.items():
                        # skip models explicitly disabled in the steering file
                        if isinstance(model_config, dict) and not bool(model_config.get('__enable__', True)):
                            continue
                        flt_list.append(f'{self.replica_name}.__filter__.{model_name}')
            elif key == '__logic__':
                # we got a filter logic string
                # we store it in the filter register directly
                self.filter_register._logic = self._config[key]
            elif key == '__new_only__':
                # we got a new only boolean, we store it in the filter register
                self.filter_register.new_only = self._config[key]

        # only now, after the configuration file has been totally read, we can do the real filter initialisation.
        # This is to be sure that if there were a GlobalFilter table, this has been read.
        # The global filter region will be used as a starting point for the construction of a new filter (default
        # parameter in the from_conf class method).
        for flt_name in flt_list:
            model_name = flt_name.split('.')[-1]
            self.filter_register[model_name] = mafw.db.db_filter.ModelFilter.from_conf(flt_name, filter_config)
1020 @ensure_parameter_registration
1021 def _overrule_kws_parameters(self) -> None:
1022 """
1023 Override processor parameters with values from keyword arguments.
1025 This method applies parameter values passed as keyword arguments during processor
1026 initialisation. It ensures that the parameter types match the expected types
1027 before setting the values.
1029 .. seealso::
1030 :meth:`_register_parameters`, :meth:`_load_parameter_configuration`,
1031 :meth:`set_parameter_value`
1032 """
1033 for key, value in self._kwargs.items():
1034 if key in self._processor_parameters:
1035 type_: ParameterType = type(self.get_parameter(key).value) # type: ignore # wait for answer from SO
1036 self.set_parameter_value(key, type_(value)) # type: ignore # no idea how to fix it, may be linked with above
1038 def validate_configuration(self) -> None:
1039 """
1040 Validate the configuration provided via the processor parameters.
1042 Method to be implemented by subclasses if a configuration validation is needed.
1044 The method should silently check for the proper configuration, if this is not obtained,
1045 then the :exc:`.InvalidConfigurationError` must be raised.
1047 .. versionadded:: v2.0.0
1048 """
1049 pass
1051 def _check_method_overload(self) -> None:
1052 """
1053 Check if the user overloaded the required methods.
1055 Depending on the loop type, the user must overload different methods.
1056 This method is doing the check and if the required methods are not overloaded a warning is emitted.
1057 """
1058 methods_dict: dict[LoopType, list[str]] = {
1059 LoopType.WhileLoop: ['while_condition'],
1060 LoopType.ForLoop: ['get_items'],
1061 LoopType.ParallelForLoop: ['get_items'],
1062 LoopType.ParallelForLoopWithQueue: ['get_items'],
1063 }
1064 required_methods: list[str] = methods_dict.get(self.loop_type, [])
1065 for method in required_methods:
1066 if getattr(type(self), method) == getattr(Processor, method):
1067 warnings.warn(
1068 MissingOverloadedMethod(
1069 '%s was not overloaded. The process execution workflow might not work.' % method
1070 )
1071 )
    @classmethod
    def _apply_super_call_wrappers(cls) -> None:
        """
        Wraps overridden methods so the class can detect whether they called `super()`.

        This method runs immediately after the class is created (see :class:`.ProcessorMeta`). For every
        method that we expect scientists to extend (start, finish, etc.) we replace their implementation with a
        wrapper. The wrapper resets a per-instance flag, invokes the real override, and only after that method returns
        it checks whether the real `super()` was ever reached; if not, it emits a :class:`~mafw.mafw_errors.MissingSuperCall`
        warning. In other words, the wiring happens while the subclass is defined, and the actual smoke test executes
        each time the method runs.
        """
        methods = getattr(cls, '_methods_to_be_checked_for_super', ())
        for method in methods:
            # only wrap methods that this very class (re)defines ...
            if method not in cls.__dict__:
                continue
            # ... and that actually override something further up the MRO.
            if not any(hasattr(base, method) for base in cls.__mro__[1:]):
                continue
            original: Callable[..., Any] = getattr(cls, method)
            # we are adding an attribute to the method, it looks strange, but it is possible
            # in this way we avoid wrapping the same method more than once.
            if getattr(original, '_mafw_super_check_wrapped', False):
                continue

            # factory closure: binds the original method and its name so that the loop variable
            # `method` is not captured late by the wrapper.
            def _make_wrapper(
                __orig: Callable[..., Any],
                __method: str,  # pragma: no cover
            ) -> Callable[..., Any]:
                @wraps(__orig)
                def _wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
                    # self is the processor instance.
                    # _reset_super_call_flag is resetting the call status
                    # for method as False
                    self._reset_super_call_flag(__method)
                    # in the base method, the super call flag is set to True
                    result = __orig(self, *args, **kwargs)
                    # if the super call flag is not True, then it is because the base
                    # method was not called.
                    # emit the warning and return the original method return value
                    if not self._did_call_super(__method):
                        warnings.warn(
                            MissingSuperCall(
                                'The overloaded %s is not invoking its super method. The processor might not work.'
                                % __method
                            )
                        )
                    return result

                return _wrapper

            # we create a wrapped method from the original
            wrapper = _make_wrapper(original, method)
            # we set a flag for the method to avoid multiple wrapping
            wrapper._mafw_super_check_wrapped = True  # type: ignore[attr-defined]
            setattr(cls, method, wrapper)
1129 def _reset_super_call_flag(self, method: str) -> None:
1130 """
1131 Reset the super-call flag for a method.
1132 """
1133 self._super_call_flags[method] = False
1135 def _mark_super_call(self, method: str) -> None:
1136 """
1137 Mark a method as having called its super implementation.
1138 """
1139 self._super_call_flags[method] = True
1141 def _did_call_super(self, method: str) -> bool:
1142 """
1143 Check whether a method called its super implementation.
1144 """
1145 return self._super_call_flags.get(method, False)
1147 @ensure_parameter_registration
1148 def dump_parameter_configuration(self, option: int = 1) -> dict[str, Any]:
1149 """
1150 Dumps the processor parameter values in a dictionary.
1152 The snippet below explains the meaning of `option`.
1154 .. code-block:: python
1156 # option 1
1157 conf_dict1 = {
1158 'Processor': {'param1': 5, 'input_table': 'my_table'}
1159 }
1161 # option 2
1162 conf_dict2 = {'param1': 5, 'input_table': 'my_table'}
1164 In the case of option 1, the replica aware name (:meth:`.replica_name`) will be used as a key for the
1165 configuration dictionary.
1167 .. versionchanged:: v2.0.0
1168 With option 1, using :meth:`.replica_name` instead of :attr:`~.Processor.name` as key of the configuration
1169 dictionary.
1171 :param option: Select the dictionary style. Defaults to 1.
1172 :type option: int, Optional
1173 :return: A parameter configuration dictionary.
1174 :rtype: dict
1175 """
1176 inner_dict = {}
1177 for key, value in self._processor_parameters.items():
1178 inner_dict[key] = value.value
1180 if option == 1:
1181 outer_dict = {self.replica_name: inner_dict}
1182 elif option == 2:
1183 outer_dict = inner_dict
1184 else:
1185 log.warning('Unknown option %s. Using option 2' % option)
1186 outer_dict = inner_dict
1187 return outer_dict
1189 @ensure_parameter_registration
1190 def get_parameter(self, name: str) -> PassiveParameter[Any]:
1191 """
1192 Gets the processor parameter named name.
1194 :param name: The name of the parameter.
1195 :type name: str
1196 :return: The processor parameter
1197 :rtype: PassiveParameter
1198 :raises ProcessorParameterError: If a parameter with `name` is not registered.
1199 """
1200 if name in self._processor_parameters:
1201 return self._processor_parameters[name]
1202 raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}')
1204 @ensure_parameter_registration
1205 def get_parameters(self) -> dict[str, PassiveParameter[Any]]:
1206 """
1207 Returns the full dictionary of registered parameters for this processor.
1209 Useful when dumping the parameter specification in a configuration file, for example.
1211 :return: The dictionary with the registered parameters.
1212 :rtype: dict[str, PassiveParameter[ParameterType]
1213 """
1214 return self._processor_parameters
1216 @ensure_parameter_registration
1217 def delete_parameter(self, name: str) -> None:
1218 """
1219 Deletes a processor parameter.
1221 :param name: The name of the parameter to be deleted.
1222 :type name: str
1223 :raises ProcessorParameterError: If a parameter with `name` is not registered.
1224 """
1225 if name in self._processor_parameters:
1226 del self._processor_parameters[name]
1227 else:
1228 raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}')
1230 @ensure_parameter_registration
1231 def set_parameter_value(self, name: str, value: ParameterType) -> None:
1232 """
1233 Sets the value of a processor parameter.
1235 :param name: The name of the parameter to be deleted.
1236 :type name: str
1237 :param value: The value to be assigned to the parameter.
1238 :type value: ParameterType
1239 :raises ProcessorParameterError: If a parameter with `name` is not registered.
1240 """
1241 if name in self._processor_parameters:
1242 self._processor_parameters[name].value = value
1243 else:
1244 raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}')
1246 def get_filter(self, model_name: str) -> mafw.db.db_filter.ModelFilter:
1247 """
1248 Returns a registered :class:`~mafw.db.db_filter.ModelFilter` via the model name.
1250 If a filter for the provided model_name does not exist, a KeyError is raised.
1252 :param model_name: The model name for which the filter will be returned.
1253 :type model_name: str
1254 :return: The registered filter
1255 :rtype: mafw.db.db_filter.ModelFilter
1256 :raises: KeyError is a filter with the give name is not found.
1257 """
1258 return self.filter_register[model_name]
1260 def on_processor_status_change(self, old_status: ProcessorStatus, new_status: ProcessorStatus) -> None:
1261 """
1262 Callback invoked when the processor status is changed.
1264 :param old_status: The old processor status.
1265 :type old_status: ProcessorStatus
1266 :param new_status: The new processor status.
1267 :type new_status: ProcessorStatus
1268 """
1269 self._user_interface.change_of_processor_status(self.name, old_status, new_status)
1271 def on_looping_status_set(self, status: LoopingStatus) -> None:
1272 """
1273 Call back invoked when the looping status is set.
1275 The user can overload this method according to the needs.
1277 :param status: The set looping status.
1278 :type status: LoopingStatus
1279 """
1280 if status == LoopingStatus.Skip:
1281 log.warning('Skipping item %s' % self.i_item)
1282 elif status == LoopingStatus.Abort:
1283 log.error('Looping has been aborted')
1284 elif status == LoopingStatus.Quit:
1285 log.warning('Looping has been quit')
1287 def format_progress_message(self) -> None:
1288 """Customizes the progress message with information about the current item.
1290 The user can overload this method in order to modify the message being displayed during the process loop with
1291 information about the current item.
1293 The user can access the current value, its position in the looping cycle and the total number of items using
1294 :attr:`.Processor.item`, :obj:`.Processor.i_item` and :obj:`.Processor.n_item`.
1295 """
1296 pass
1298 @contextlib.contextmanager
1299 def _thread_loop_context(self, i_item: int, n_item: int, item: Any) -> Iterator[None]:
1300 """
1301 Context manager to set thread-local loop attributes for parallel execution.
1303 :param i_item: Item index for the loop.
1304 :type i_item: int
1305 :param n_item: Total number of items in the loop.
1306 :type n_item: int
1307 :param item: Item payload.
1308 :type item: Any
1309 """
1310 self._thread_local.in_worker = True
1311 self._thread_local.i_item = i_item
1312 self._thread_local.n_item = n_item
1313 self._thread_local.item = item
1314 self._thread_local.looping_status = LoopingStatus.Continue
1315 try:
1316 yield
1317 finally:
1318 for name in ('i_item', 'n_item', 'item', 'looping_status', 'in_worker'):
1319 if hasattr(self._thread_local, name): 1319 ↛ 1318line 1319 didn't jump to line 1318 because the condition on line 1319 was always true
1320 delattr(self._thread_local, name)
1322 def _in_thread_context(self) -> bool:
1323 """Return True when running inside a parallel worker thread."""
1324 return bool(getattr(self._thread_local, 'in_worker', False))
1326 @property
1327 def item(self) -> Any:
1328 """The current item of the loop."""
1329 if self._in_thread_context() and hasattr(self._thread_local, 'item'):
1330 return self._thread_local.item
1331 return self._item
1333 @item.setter
1334 def item(self, value: Any) -> None:
1335 if self._in_thread_context():
1336 self._thread_local.item = value
1337 else:
1338 self._item = value
1340 @property
1341 def i_item(self) -> int:
1342 """The enumeration of the current item being processed."""
1343 if self._in_thread_context() and hasattr(self._thread_local, 'i_item'):
1344 return cast(int, self._thread_local.i_item)
1345 return self._i_item
1347 @i_item.setter
1348 def i_item(self, value: int) -> None:
1349 if self._in_thread_context():
1350 self._thread_local.i_item = value
1351 else:
1352 self._i_item = value
1354 @property
1355 def n_item(self) -> int | None:
1356 """The total number of items to be processed or None for an undefined loop"""
1357 if self._in_thread_context() and hasattr(self._thread_local, 'n_item'):
1358 return cast(int | None, self._thread_local.n_item)
1359 return self._n_item
1361 @n_item.setter
1362 def n_item(self, value: int | None) -> None:
1363 if self._in_thread_context():
1364 self._thread_local.n_item = value
1365 else:
1366 self._n_item = value
    @property
    def looping_status(self) -> LoopingStatus:
        """The looping status for the current thread context."""
        # in a worker thread read the thread-local status; otherwise the instance-wide one.
        if self._in_thread_context():
            value = getattr(self._thread_local, 'looping_status', LoopingStatus.Continue)
        else:
            value = self._looping_status
        # the get-hook is optional: only call it when the (sub)class defines it.
        if hasattr(self, 'on_looping_status_get'):
            self.on_looping_status_get(value)
        return value

    @looping_status.setter
    def looping_status(self, value: LoopingStatus) -> None:
        # capture the previous value from the same storage we are about to write,
        # then store the new one.
        if self._in_thread_context():
            current = getattr(self._thread_local, 'looping_status', LoopingStatus.Continue)
            self._thread_local.looping_status = value
        else:
            current = self._looping_status
            self._looping_status = value
        # dispatch: a real transition fires the change-hook, re-setting the same value fires the set-hook.
        # both hooks are optional and only invoked when defined.
        if current != value:
            if hasattr(self, 'on_looping_status_change'):
                self.on_looping_status_change(current, value)
        else:
            if hasattr(self, 'on_looping_status_set'):
                self.on_looping_status_set(value)
1394 @property
1395 def unique_name(self) -> str:
1396 """Returns the unique name for the processor."""
1397 return f'{self.name}_{self.unique_id}'
1399 @property
1400 def replica_name(self) -> str:
1401 """
1402 Returns the replica aware name of the processor.
1404 If no replica_id is specified, then return the pure name, otherwise join the two string using the '#' symbol.
1406 .. versionadded:: v2.0.0
1408 :return: The replica aware name of the processor.
1409 :rtype: str
1410 """
1411 if self.replica_id is None:
1412 return self.name
1413 else:
1414 return self.name + '#' + self.replica_id
1416 @property
1417 def local_resource_acquisition(self) -> bool:
1418 """
1419 Checks if resources should be acquired locally.
1421 When the processor is executed in stand-alone mode, it is responsible to acquire and release its own external
1422 resources, but when it is executed from a ProcessorList, then is a good practice to share and distribute
1423 resources among the whole processor list. In this case, resources should not be acquired locally by the
1424 single processor, but from the parent execution context.
1426 :return: True if resources are to be acquired locally by the processor. False, otherwise.
1427 :rtype: bool
1428 """
1429 return self._resource_acquisition
1431 @local_resource_acquisition.setter
1432 def local_resource_acquisition(self, flag: bool) -> None:
1433 self._resource_acquisition = flag
1435 @property
1436 def database(self) -> peewee.Database:
1437 """
1438 Returns the database instance
1440 :return: A database object.
1441 :raises MissingDatabase: If the database connection has not been established.
1442 """
1443 if self._database is None:
1444 raise MissingDatabase('Database connection not initialized')
1445 return self._database
1447 def execute(self) -> None:
1448 """Execute the processor tasks.
1450 This method works as a dispatcher, reassigning the call to a more specific execution implementation depending
1451 on the :attr:`~mafw.processor.Processor.loop_type`.
1452 """
1453 dispatcher: dict[LoopType, Callable[[], None]] = {
1454 LoopType.SingleLoop: self._execute_single,
1455 LoopType.ForLoop: self._execute_for_loop,
1456 LoopType.ParallelForLoop: self._execute_for_loop,
1457 LoopType.ParallelForLoopWithQueue: self._execute_for_loop,
1458 LoopType.WhileLoop: self._execute_while_loop,
1459 }
1460 dispatcher[self.loop_type]()
1462 @staticmethod
1463 def _compute_default_max_workers() -> int:
1464 """
1465 Helper to compute the default number of workers for parallel execution.
1467 Returns min(32, cpu_count + 4).
1468 """
1469 return min(32, (os.cpu_count() or 1) + 4)
    def _execute_single(self) -> None:
        """Execute the processor in single mode.

        **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
        appropriate implementation depending on the processor LoopType.
        """
        # the ExitStack collects every resource acquired below and releases them all on exit.
        with contextlib.ExitStack() as self._resource_stack:
            self.acquire_resources()
            # record the wall-clock start before the user start() hook runs
            self._wall_clock_start = time.perf_counter()
            self.start()
            self.processor_status = ProcessorStatus.Run
            # single mode: process() is invoked exactly once, then finish() wraps up.
            self.process()
            self.finish()
    def _execute_for_loop(self) -> None:
        """Executes the processor within a for loop.

        **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
        appropriate implementation depending on the processor LoopType.
        """

        with contextlib.ExitStack() as self._resource_stack:
            self.acquire_resources()
            # we cannot use a Timer context here to measure the whole duration because it spans
            # over different methods. Instead we directly use a performance clock.
            self._wall_clock_start = time.perf_counter()
            self.start()

            # get the input item list and filter it
            item_list = self.get_items()

            # get the total number of items.
            self.n_item = len(item_list)

            # turn the processor status to run
            self.processor_status = ProcessorStatus.Run

            # create a new task in the progress bar interface
            self._user_interface.create_task(self.replica_name, self.description, completed=0, total=self.n_item)

            # verify if we can use parallel for loop. If not, switch back to a serial for loop.
            # parallel loops need a free-threading (no-GIL) interpreter build to be effective.
            if (
                self.loop_type in (LoopType.ParallelForLoop, LoopType.ParallelForLoopWithQueue)
                and not is_free_threading()
            ):
                warnings.warn(
                    'Parallel for-loop requires free-threading; falling back to serial for loop.',
                    stacklevel=2,
                )
                self.loop_type = LoopType.ForLoop

            # dispatch to the specific loop implementation
            if self.loop_type == LoopType.ParallelForLoopWithQueue:
                self._process_parallel_for_loop_with_queue(item_list)
            elif self.loop_type == LoopType.ParallelForLoop:
                self._process_parallel_for_loop(item_list)
            else:
                self._process_for_loop(item_list)

            # force the progress bar to its completed state regardless of how the loop ended
            self._user_interface.update_task(self.replica_name, completed=self.n_item, total=self.n_item)

            self.finish()
1533 def _build_loop_item(self) -> LoopItem:
1534 """
1535 Build a LoopItem payload for the current loop context.
1537 :return: The LoopItem payload.
1538 :rtype: mafw.models.LoopItem
1539 """
1540 return LoopItem(self.i_item, int(self.n_item or 0), self.item)
1542 def _build_loop_result(self, payload: Any, duration: float) -> LoopResult:
1543 """
1544 Build a LoopResult payload for the current loop context.
1546 :param payload: The optional payload returned by process.
1547 :type payload: Any
1548 :param duration: Wall-clock duration of the item processing.
1549 :type duration: float
1550 :return: The LoopResult payload.
1551 :rtype: mafw.models.LoopResult
1552 """
1553 return LoopResult(self.i_item, int(self.n_item or 0), self.looping_status, payload, duration)
1555 def _payload_annotation_matches(self, annotation: Any) -> bool:
1556 """
1557 Check whether an annotation matches LoopItem or LoopResult.
1559 :param annotation: The annotation to inspect.
1560 :type annotation: Any
1561 :return: True if the annotation matches LoopItem or LoopResult.
1562 :rtype: bool
1563 """
1564 if annotation in (LoopItem, LoopResult):
1565 return True
1566 origin = get_origin(annotation)
1567 if origin is Union:
1568 return any(arg in (LoopItem, LoopResult) for arg in get_args(annotation))
1569 return False
1571 def _call_with_optional_payload(self, func: Callable[..., Any], payload: Any) -> Any:
1572 """
1573 Invoke a processor hook with an optional payload if the signature allows it.
1575 :param func: The callable to invoke.
1576 :type func: Callable
1577 :param payload: The payload to pass if supported.
1578 :type payload: Any
1579 :return: The callable return value.
1580 :rtype: Any
1581 """
1582 signature = inspect.signature(func)
1583 parameters = list(signature.parameters.values())
1584 if parameters and parameters[0].name == 'self':
1585 parameters = parameters[1:]
1586 if not parameters:
1587 return func()
1588 if any(param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD) for param in parameters):
1589 return func(payload)
1590 first = parameters[0]
1591 if first.kind in (first.POSITIONAL_ONLY, first.POSITIONAL_OR_KEYWORD): 1591 ↛ 1596line 1591 didn't jump to line 1596 because the condition on line 1591 was always true
1592 if first.name in ('item', 'loop_item', 'result', 'loop_result', 'payload'):
1593 return func(payload)
1594 if self._payload_annotation_matches(first.annotation): 1594 ↛ 1595line 1594 didn't jump to line 1595 because the condition on line 1594 was never true
1595 return func(payload)
1596 return func()
    def _process_for_loop(self, item_list: Collection[Any]) -> None:
        """
        Process items with the standard serial for-loop.

        For each item, the looping status is reset to Continue, the progress display is refreshed,
        :meth:`process` is invoked (timed, with an optional LoopItem payload) and finally the item is either
        accepted, skipped, or the whole loop is stopped depending on the looping status left by process().

        :param item_list: The list of items to process.
        :type item_list: Collection[Any]
        """
        # note: the for-targets are the `i_item` / `item` properties, so the loop state is
        # published on the instance at every iteration.
        for self.i_item, self.item in enumerate(item_list):
            self.looping_status = LoopingStatus.Continue

            self.format_progress_message()
            self._user_interface.display_progress_message(self.progress_message, self.i_item, self.n_item, 0.1)

            loop_item = self._build_loop_item()
            with Timer(suppress_message=True) as timer:
                payload = self._call_with_optional_payload(self.process, loop_item)
            self._process_durations.append(timer.duration)

            loop_result = self._build_loop_result(payload, timer.duration)

            # dispatch on the status that process() left behind
            if self.looping_status == LoopingStatus.Continue:
                self._call_with_optional_payload(self.accept_item, loop_result)
            elif self.looping_status == LoopingStatus.Skip:
                self._call_with_optional_payload(self.skip_item, loop_result)
            else:  # Abort or Quit
                break

            self._user_interface.update_task(self.replica_name, increment=1)
    def _process_parallel_for_loop(self, item_list: Collection[Any]) -> None:
        """
        Process items in parallel using a thread pool.

        Workers run :meth:`process` (plus the accept/skip hooks) inside a thread-local loop context; the main
        thread drains completed futures, updates the progress display and keeps the pool topped up with at
        most ``max_workers`` in-flight items. An Abort/Quit raised by any worker stops further submissions and
        is propagated to :attr:`looping_status` at the end.

        :param item_list: The list of items to process.
        :type item_list: Collection[Any]
        """
        max_workers = self.max_workers
        abort_event = threading.Event()
        abort_lock = threading.Lock()
        abort_status: LoopingStatus | None = None

        def _set_abort(status: LoopingStatus) -> None:
            # record the strongest stop request seen so far: Abort wins over Quit,
            # the first request wins otherwise. Guarded by a lock because several
            # workers may report concurrently.
            nonlocal abort_status
            with abort_lock:
                if abort_status == LoopingStatus.Abort:
                    return
                if status == LoopingStatus.Abort:
                    abort_status = LoopingStatus.Abort
                elif abort_status is None:
                    abort_status = LoopingStatus.Quit
                abort_event.set()

        def _worker(i_item: int, item: Any) -> tuple[int, Any, LoopingStatus, Any, float]:
            # runs in a pool thread: the loop context makes item/i_item/n_item/looping_status thread-local.
            with self._thread_loop_context(i_item, int(self.n_item or 0), item):
                self.looping_status = LoopingStatus.Continue
                loop_item = self._build_loop_item()
                with Timer(suppress_message=True) as timer:
                    payload = self._call_with_optional_payload(self.process, loop_item)
                status = self.looping_status

                # a stop request from this worker is recorded and the item is returned unprocessed further
                if status in (LoopingStatus.Abort, LoopingStatus.Quit):
                    _set_abort(status)
                    return i_item, item, status, payload, timer.duration

                # another worker already requested a stop: skip the accept/skip hooks
                if abort_event.is_set():
                    return i_item, item, status, payload, timer.duration

                loop_result = self._build_loop_result(payload, timer.duration)
                if status == LoopingStatus.Continue:
                    self._call_with_optional_payload(self.accept_item, loop_result)
                elif status == LoopingStatus.Skip:
                    self._call_with_optional_payload(self.skip_item, loop_result)

                return i_item, item, status, payload, timer.duration

        pending: set[Future[tuple[int, Any, LoopingStatus, Any, float]]] = set()
        items_iter = iter(enumerate(item_list))

        def _submit_next() -> bool:
            # submit the next item to the pool; False when the iterator is exhausted.
            try:
                idx, itm = next(items_iter)
            except StopIteration:
                return False
            fut: Future[tuple[int, Any, LoopingStatus, Any, float]] = executor.submit(_worker, idx, itm)
            pending.add(fut)
            return True

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # prime the pool with up to max_workers items
            while len(pending) < max_workers and _submit_next():
                pass

            while pending:
                done, _ = wait(pending, return_when=FIRST_COMPLETED)
                for fut in done:
                    pending.remove(fut)
                    i_item, item, status, _payload, duration = fut.result()
                    self._process_durations.append(duration)
                    # publish the last finished item on the instance for the progress message hook
                    self.item = item
                    self.i_item = i_item
                    self.format_progress_message()
                    self._user_interface.display_progress_message(self.progress_message, i_item, self.n_item, 0.1)
                    self._user_interface.update_task(self.replica_name, increment=1)

                # once a stop was requested, drain the in-flight futures but submit nothing new
                if abort_event.is_set():
                    continue

                while len(pending) < max_workers and _submit_next():
                    pass

        # propagate the collective stop request to the instance-level looping status
        if abort_status is not None:
            self.looping_status = abort_status
    def _process_parallel_for_loop_with_queue(self, item_list: Collection[Any]) -> None:
        """
        Process items in parallel using a producer/consumer queue.

        Items are processed in worker threads, while a dedicated consumer thread handles the post-processing
        hooks (:meth:`consumer_start`, :meth:`consumer_process` and :meth:`consumer_finish`).

        :param item_list: The list of items to process.
        :type item_list: Collection[Any]
        """
        # The idea is the following: we have a single consumer thread that constantly pulls batches of results
        # out of a shared queue, and a pool of producer threads that put batches into the queue as long as
        # there is space in it (back-pressure).
        # If the queue is full, queue.Queue blocks the producer threads until the consumer has drained some
        # entries, so producers are implicitly paced by the consumer.
        # When there are no more items to execute, the main thread pushes a sentinel object into the queue
        # marking the end of the processing, so that the consumer thread can be gracefully terminated.
        max_workers = self.max_workers
        result_queue: queue.Queue[object] = queue.Queue(maxsize=self.queue_size)
        abort_event = threading.Event()
        abort_lock = threading.Lock()
        # Abort/Quit status requested by any worker; stays None while everything runs normally.
        abort_status: LoopingStatus | None = None
        # Unique marker pushed into the queue to tell the consumer to stop.
        sentinel = object()

        def _set_abort(status: LoopingStatus) -> None:
            # Record the strongest abort request seen so far: Abort wins over Quit and is never downgraded.
            nonlocal abort_status
            with abort_lock:
                if abort_status == LoopingStatus.Abort:
                    return
                if status == LoopingStatus.Abort:
                    abort_status = LoopingStatus.Abort
                elif abort_status is None:
                    abort_status = LoopingStatus.Quit
                abort_event.set()

        def _get_abort_status() -> LoopingStatus | None:
            # Thread-safe read of the shared abort status.
            with abort_lock:
                return abort_status

        def _worker(batch_items: list[tuple[int, Any]]) -> None:
            # Process a batch of items and push the collected (LoopItem, LoopResult) pairs onto the queue.
            batch_results: list[tuple[LoopItem, LoopResult]] = []
            for i_item, item in batch_items:
                if abort_event.is_set():
                    break
                with self._thread_loop_context(i_item, int(self.n_item or 0), item):
                    self.looping_status = LoopingStatus.Continue
                    loop_item = self._build_loop_item()
                    with Timer(suppress_message=True) as timer:
                        payload = self._call_with_optional_payload(self.process, loop_item)
                    status = self.looping_status
                    if status in (LoopingStatus.Abort, LoopingStatus.Quit):
                        _set_abort(status)
                    loop_result = self._build_loop_result(payload, timer.duration)
                    batch_results.append((loop_item, loop_result))
                if status in (LoopingStatus.Abort, LoopingStatus.Quit):
                    break
            if batch_results:
                # Partial batches are still forwarded so that the consumer can account for their progress.
                result_queue.put(batch_results)

        def _consumer() -> None:
            # Runs in its own thread: drains the queue and invokes the post-processing hooks sequentially.
            self.consumer_start()
            try:
                while True:
                    queued = result_queue.get()
                    if queued is sentinel:
                        break
                    batch_results = cast(list[tuple[LoopItem, LoopResult]], queued)
                    for loop_item, loop_result in batch_results:
                        with self._thread_loop_context(loop_item.i_item, loop_item.n_item, loop_item.payload):
                            self.looping_status = loop_result.looping_status
                            self.format_progress_message()
                            self._user_interface.display_progress_message(
                                self.progress_message, loop_item.i_item, self.n_item, 0.1
                            )
                            self._process_durations.append(loop_result.duration)
                            self._user_interface.update_task(self.replica_name, increment=1)

                            # Post-process only while no abort/quit has been requested globally and this
                            # item itself finished with Continue or Skip.
                            if _get_abort_status() is None and loop_result.looping_status in (
                                LoopingStatus.Continue,
                                LoopingStatus.Skip,
                            ):
                                self._call_with_optional_payload(self.consumer_process, loop_result)
            finally:
                # Always run the consumer teardown hook, even if a hook above raised.
                self.consumer_finish()

        pending: set[Future[None]] = set()
        items_iter = iter(enumerate(item_list))

        def _submit_next() -> bool:
            # Submit the next batch of items to the executor; return False when nothing more can be submitted.
            if abort_event.is_set():
                return False
            batch_items: list[tuple[int, Any]] = []
            for _ in range(self.queue_batch_size):
                try:
                    idx, itm = next(items_iter)
                except StopIteration:
                    break
                batch_items.append((idx, itm))
            if not batch_items:
                return False
            fut: Future[None] = executor.submit(_worker, batch_items)
            pending.add(fut)
            return True

        consumer_thread = threading.Thread(target=_consumer, name=f'{self.name}-consumer')
        consumer_thread.start()

        try:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Prime the pool with up to max_workers batches.
                while len(pending) < max_workers and _submit_next():
                    pass

                while pending:
                    done, _ = wait(pending, return_when=FIRST_COMPLETED)
                    for fut in done:
                        pending.remove(fut)
                        # Re-raise any exception that occurred inside the worker thread.
                        fut.result()

                    if abort_event.is_set():
                        # Stop submitting new batches; just drain the in-flight ones.
                        continue

                    # Keep the pool saturated with fresh batches.
                    while len(pending) < max_workers and _submit_next():
                        pass
        finally:
            # Wake up the consumer and wait for it to drain the queue before leaving.
            result_queue.put(sentinel)
            consumer_thread.join()

        if abort_status is not None:
            # Propagate the abort/quit request to the processor itself.
            self.looping_status = abort_status
    def _execute_while_loop(self) -> None:
        """Executes the processor within a while loop.

        **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
        appropriate implementation depending on the processor LoopType.
        """
        # It is a while loop, so a priori we don't know how many iterations we will have; nevertheless, we
        # can have a progress bar with 'total' set to None, so that it goes in the so-called indeterminate
        # progress. See https://rich.readthedocs.io/en/stable/progress.html#indeterminate-progress
        # We initialise n_item outside the loop, because it is possible that the user has a way to define
        # n_item, and can do so within the loop.
        self.n_item = None
        with contextlib.ExitStack() as self._resource_stack:
            self.acquire_resources()
            self._wall_clock_start = time.perf_counter()
            self.start()

            # turn the processor status to run
            self.processor_status = ProcessorStatus.Run

            self._user_interface.create_task(self.replica_name, self.description, completed=0, total=self.n_item)

            # we are ready to start the looping. For statistics, we can count the iterations.
            # NOTE(review): i_item is not incremented anywhere in this loop body — presumably the user's
            # while_condition() or process() overload advances it; confirm, otherwise the progress counter
            # and the final n_item would remain at 0.
            self.i_item = 0
            while self.while_condition():
                # set the looping status to Continue. The user may want to change it in the process method.
                self.looping_status = LoopingStatus.Continue

                # send a message to the user interface
                self.format_progress_message()
                self._user_interface.display_progress_message(
                    self.progress_message, self.i_item, self.n_item, frequency=0.1
                )

                # wrap the execution in a timer to measure how long it took, for statistical reasons.
                with Timer(suppress_message=True) as timer:
                    self.process()
                self._process_durations.append(timer.duration)

                # modify the loop depending on the looping status
                if self.looping_status == LoopingStatus.Continue:
                    self.accept_item()
                elif self.looping_status == LoopingStatus.Skip:
                    self.skip_item()
                else:  # equiv to if self.looping_status in [LoopingStatus.Abort, LoopingStatus.Quit]:
                    break

                # update the progress bar. If self.n_item is still None, then the progress bar will show
                # indeterminate progress.
                self._user_interface.update_task(self.replica_name, self.i_item + 1, 1, self.n_item)

            # now that the loop is finished, we know how many elements we processed
            if self.n_item is None:
                self.n_item = self.i_item
            self._user_interface.update_task(self.replica_name, completed=self.n_item, total=self.n_item)

            self.finish()
    def acquire_resources(self) -> None:
        """
        Acquires resources and add them to the resource stack.

        The whole body of the :meth:`execute` method is within a context structure. The idea is that if any part of
        the code inside should throw an exception that breaking the execution, we want to be sure that all stateful
        resources are properly closed.

        Since the number of resources may vary, the variable number of nested `with` statements has been replaced by
        an `ExitStack <https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack>`_. Resources,
        like open files, timers, db connections, need to be added to the resource stacks in this method.

        In the case a processor is being executed within a :class:`~mafw.processor.ProcessorList`, then some resources might be shared, and
        for this reason they are not added to the stack. This selection can be done via the private
        :attr:`local_resource_acquisition`. This is normally True, meaning that the processor will handle its resources
        independently, but when the processor is executed from a :class:`~mafw.processor.ProcessorList`, this flag is automatically turned to
        False.

        If the user wants to add additional resources, he has to overload this method calling the super to preserve
        the original resources. If he wants to have shared resources among different processors executed from inside
        a processor list, he has to overload the :class:`~mafw.processor.ProcessorList` class as well.
        """
        # Both the timer and the user interface will be added to the processor resource stack only if the processor is
        # set to acquire its own resources.
        # The timer and the user interface have in-built enter and exit methods.
        if self._resource_acquisition:
            self.timer = self._resource_stack.enter_context(Timer(**self._timer_parameters))
            self._resource_stack.enter_context(self._user_interface)

        # For the database it is a bit different.
        if self._database is None and self._database_conf is None:
            # no database, nor configuration.
            # we cannot do anything
            pass
        elif self._database is None and self._database_conf is not None:
            # no db, but we got a configuration.
            # we can make a db.
            # This processor will try to make a valid connection, and in case it succeeds, it will add the database to
            # the resource stack.
            # The database has an enter method, but it is to generate transaction.
            # We will add the database.close via the callback method.
            if 'DBConfiguration' in self._database_conf:
                conf = self._database_conf['DBConfiguration']  # type1: configuration nested under a group key
            else:
                conf = self._database_conf  # type2: flat configuration dictionary
            # guess the database type from the URL
            protocol = extract_protocol(conf.get('URL'))

            # build the connection parameter
            # in case of sqlite, we add the pragmas group as well
            connection_parameters = {}
            if protocol == 'sqlite':
                connection_parameters['pragmas'] = conf.get('pragmas', {})
            for key, value in conf.items():
                if key not in ['URL', 'pragmas']:
                    connection_parameters[key] = value

            self._database = connect(conf.get('URL'), **connection_parameters)  # type: ignore # peewee is not returning a DB
            # NOTE(review): close is registered *before* connect(), so a failed connect still triggers close
            # on a never-opened connection on stack unwind — presumably harmless with peewee, but it is
            # asymmetric with ProcessorList.acquire_resources, which registers close only after a successful
            # connect; confirm this is intentional.
            self._resource_stack.callback(self._database.close)
            try:
                self._database.connect()
            except peewee.OperationalError as e:
                log.critical('Unable to connect to %s', self._database_conf.get('URL'))
                raise e
            # Point the shared model proxy to this concrete database so that the models can use it.
            database_proxy.initialize(self._database)
            if self.create_standard_tables:
                standard_tables = mafw_model_register.get_standard_tables()
                self.database.create_tables(standard_tables)
                for table in standard_tables:
                    table.init()

        else:  # equivalent to: if self._database is not None:
            # we got a database, so very likely we are inside a processor list
            # the connection has been already set and the initialisation as well.
            # nothing else to do here.
            # do not put the database in the exit stack. who created it has also to close it.
            pass
1976 def start(self) -> None:
1977 """
1978 Start method.
1980 The user can overload this method, including all steps that should be performed at the beginning of the
1981 operation.
1983 If the user decides to overload it, it should include a call to the super method.
1984 """
1985 self._mark_super_call('start')
1986 self.processor_status = ProcessorStatus.Start
1987 self._remove_orphan_files()
1989 def get_items(self) -> Collection[Any]:
1990 """
1991 Returns the item collections for the processor loop.
1993 This method must be overloaded for the processor to work. Generally, this is getting a list of rows from the
1994 database, or a list of files from the disk to be processed.
1996 :return: A collection of items for the loop
1997 :rtype: Collection[Any]
1998 """
1999 return []
2001 def while_condition(self) -> bool:
2002 """
2003 Return the while condition
2005 :return: True if the while loop has to continue, false otherwise.
2006 :rtype: bool
2007 """
2008 return False
2010 def process(self) -> None:
2011 """
2012 Processes the current item.
2014 This is the core of the Processor, where the user has to define the calculations required.
2016 In parallel for loops, the method can optionally accept a :class:`~mafw.models.loop_payloads.LoopItem`
2017 parameter if the user prefers not to rely on thread-local access to :attr:`.Processor.item`, :attr:`.Processor.i_item` and
2018 :attr:`.Processor.n_item`.
2019 """
2020 pass
2022 def accept_item(self) -> None:
2023 """
2024 Does post process actions on a successfully processed item.
2026 Within the :meth:`process`, the user left the looping status to Continue, so it means that everything looks
2027 good and this is the right place to perform database updates or file savings.
2029 .. seealso:
2030 Have a look at :meth:`skip_item` for what to do in case something went wrong.
2032 In parallel for loops, the method can optionally accept a :class:`~mafw.models.loop_payloads.LoopResult`
2033 parameter for direct access to the processed payload and looping status.
2034 """
2035 pass
2037 def skip_item(self) -> None:
2038 """
2039 Does post process actions on a *NOT* successfully processed item.
2041 Within the :meth:`process`, the user set the looping status to Skip, so it means that something went wrong
2042 and here corrective actions can be taken if needed.
2044 .. seealso:
2045 Have a look at :meth:`accept_item` for what to do in case everything was OK.
2047 In parallel for loops, the method can optionally accept a :class:`~mafw.models.loop_payloads.LoopResult`
2048 parameter for direct access to the processed payload and looping status.
2049 """
2050 pass
2052 def consumer_start(self) -> None:
2053 """
2054 Executes once in the consumer thread for the parallel queue loop.
2056 This hook mirrors :meth:`start` but is only used with the queue-based parallel loop.
2057 """
2058 pass
2060 def consumer_process(self, loop_result: LoopResult | None = None) -> None:
2061 """
2062 Handle processed items in the consumer thread for the parallel queue loop.
2064 By default this dispatches to :meth:`accept_item` or :meth:`skip_item` based on the looping status. The method
2065 can optionally accept a :class:`~mafw.models.loop_payloads.LoopResult` payload.
2066 """
2067 if loop_result is None:
2068 loop_result = self._build_loop_result(None, 0.0)
2069 if self.looping_status == LoopingStatus.Continue:
2070 self._call_with_optional_payload(self.accept_item, loop_result)
2071 elif self.looping_status == LoopingStatus.Skip: 2071 ↛ exitline 2071 didn't return from function 'consumer_process' because the condition on line 2071 was always true
2072 self._call_with_optional_payload(self.skip_item, loop_result)
2074 def consumer_finish(self) -> None:
2075 """
2076 Executes once in the consumer thread after the queue has been drained.
2077 """
2078 pass
2080 def finish(self) -> None:
2081 """
2082 Concludes the execution.
2084 The user can reimplement this method if there are some conclusive tasks that must be achieved.
2085 Always include a call to super().
2086 """
2087 self._mark_super_call('finish')
2088 self.processor_status = ProcessorStatus.Finish
2089 if self.looping_status == LoopingStatus.Abort:
2090 self.processor_exit_status = ProcessorExitStatus.Aborted
2091 self.print_process_statistics()
2093 def print_process_statistics(self) -> None:
2094 """
2095 Print the process statistics.
2097 A utility method to display the fastest, the slowest and the average timing required to process on a single
2098 item. This is particularly useful when the looping processor is part of a ProcessorList.
2099 """
2100 if len(self._process_durations):
2101 log.info('[cyan] Processed %s items.' % len(self._process_durations))
2102 log.info(
2103 '[cyan] Fastest item process duration: %s '
2104 % pretty_format_duration(min(self._process_durations), n_digits=3)
2105 )
2106 log.info(
2107 '[cyan] Slowest item process duration: %s '
2108 % pretty_format_duration(max(self._process_durations), n_digits=3)
2109 )
2110 log.info(
2111 '[cyan] Average item process duration: %s '
2112 % pretty_format_duration((sum(self._process_durations) / len(self._process_durations)), n_digits=3)
2113 )
2114 if self._wall_clock_start is not None:
2115 total_duration = time.perf_counter() - self._wall_clock_start
2116 else:
2117 total_duration = sum(self._process_durations)
2118 log.info('[cyan] Total process duration: %s' % pretty_format_duration(total_duration, n_digits=3))
    def _remove_orphan_files(self) -> None:
        """
        Remove orphan files.

        If a connection to the database is available, then the OrphanFile standard table is queried for all its entries,
        and all the files are then removed.

        The user can turn off this behaviour by switching the :attr:`~mafw.processor.Processor.remove_orphan_files` to False.

        """
        if self._database is None or self.remove_orphan_files is False:
            # no database connection or no wish to remove orphan files, it does not make sense to continue
            return

        try:
            # Look the model up at runtime: the OrphanFile table is registered by the framework, not imported.
            OrphanFile = cast(MAFwBaseModel, mafw_model_register.get_model('OrphanFile'))
        except KeyError:
            log.warning('OrphanFile table not found in DB. Please verify database integrity')
            return

        if TYPE_CHECKING:
            # Static-analysis only: reassure the type checker that the model carries peewee metadata.
            assert hasattr(OrphanFile, '_meta')

        orphan_files = OrphanFile.select().execute()  # type: ignore[no-untyped-call]
        if len(orphan_files) != 0:
            msg = f'[yellow]Pruning orphan files ({sum(len(f.filenames) for f in orphan_files)})...'
            log.info(msg)
            for orphan in orphan_files:
                # filenames is a list of files:
                for f in orphan.filenames:
                    # missing_ok: the file may have already been removed outside the framework.
                    f.unlink(missing_ok=True)

            # All rows handled: clear the whole table in a single statement.
            OrphanFile.delete().execute()  # type: ignore[no-untyped-call]
2155class ProcessorList(list[Union['Processor', 'ProcessorList']]):
2156 """
2157 A list like collection of processors.
2159 ProcessorList is a subclass of list containing only Processor subclasses or other ProcessorList.
2161 An attempt to add an element that is not a Processor or a ProcessorList will raise a TypeError.
2163 Along with an iterable of processors, a new processor list can be built using the following parameters.
2164 """
    def __init__(
        self,
        *args: Processor | ProcessorList,
        name: str | None = None,
        description: str | None = None,
        timer: Timer | None = None,
        timer_params: dict[str, Any] | None = None,
        user_interface: UserInterfaceBase | None = None,
        database: Database | None = None,
        database_conf: dict[str, Any] | None = None,
        create_standard_tables: bool = True,
    ):
        """
        Constructor parameters:

        :param name: The name of the processor list. Defaults to ProcessorList.
        :type name: str, Optional
        :param description: An optional short description. Default to ProcessorList.
        :type description: str, Optional
        :param timer: The timer object. If None is provided, a new one will be created. Defaults to None.
        :type timer: Timer, Optional
        :param timer_params: A dictionary of parameter to build the timer object. Defaults to None.
        :type timer_params: dict, Optional
        :param user_interface: A user interface. Defaults to None
        :type user_interface: UserInterfaceBase, Optional
        :param database: A database instance. Defaults to None.
        :type database: Database, Optional
        :param database_conf: Configuration for the database. Default to None.
        :type database_conf: dict, Optional
        :param create_standard_tables: Whether or not to create the standard tables. Defaults to True.
        :type create_standard_tables: bool, Optional
        """

        # validate_items takes a tuple of processors, that's why we don't unpack args.
        super().__init__(self.validate_items(args))
        # Fall back to the concrete class name so that subclasses get a sensible default.
        self._name = name or self.__class__.__name__
        self.description = description or self._name

        self.timer = timer
        self.timer_params = timer_params or {}
        self._user_interface = user_interface or ConsoleInterface()

        # Declared here; a fresh ExitStack is created by every execute() call.
        self._resource_stack: contextlib.ExitStack
        self._processor_exit_status: ProcessorExitStatus = ProcessorExitStatus.Successful
        self.nested_list = False
        """
        Boolean flag to identify that this list is actually inside another list.

        Similarly to the local resource flag for the :class:`.Processor`, this flag prevent the user interface to be
        added to the resource stack.
        """

        # database stuff
        self._database: peewee.Database | None = database
        self._database_conf: dict[str, Any] | None = validate_database_conf(database_conf)
        self.create_standard_tables = create_standard_tables
        """The boolean flag to proceed or skip with standard table creation and initialisation"""
2224 def __setitem__( # type: ignore[override]
2225 self,
2226 __index: SupportsIndex,
2227 __object: Processor | ProcessorList,
2228 ) -> None:
2229 super().__setitem__(__index, self.validate_item(__object))
2231 def insert(self, __index: SupportsIndex, __object: Processor | ProcessorList) -> None:
2232 """Adds a new processor at the specified index."""
2233 super().insert(__index, self.validate_item(__object))
2235 def append(self, __object: Processor | ProcessorList) -> None:
2236 """Appends a new processor at the end of the list."""
2237 super().append(self.validate_item(__object))
2239 def extend(self, __iterable: Iterable[Processor | ProcessorList]) -> None:
2240 """Extends the processor list with a list of processors."""
2241 if isinstance(__iterable, type(self)):
2242 super().extend(__iterable)
2243 else:
2244 super().extend([self.validate_item(item) for item in __iterable])
2246 @staticmethod
2247 def validate_item(item: Processor | ProcessorList) -> Processor | ProcessorList:
2248 """Validates the item being added."""
2249 if isinstance(item, Processor):
2250 item.local_resource_acquisition = False
2251 return item
2252 elif isinstance(item, ProcessorList):
2253 item.timer_params = dict(suppress_message=True)
2254 item.nested_list = True
2255 return item
2256 else:
2257 raise TypeError(f'Expected Processor or ProcessorList, got {type(item).__name__}')
2259 @staticmethod
2260 def validate_items(items: tuple[Processor | ProcessorList, ...] = ()) -> tuple[Processor | ProcessorList, ...]:
2261 """Validates a tuple of items being added."""
2262 if not items:
2263 return tuple()
2264 return tuple([ProcessorList.validate_item(item) for item in items if item is not None])
    @property
    def name(self) -> str:
        """
        The name of the processor list.

        Defaults to the class name when not provided at construction time.

        :return: The name of the processor list
        :rtype: str
        """
        return self._name
    @name.setter
    def name(self, name: str) -> None:
        # Allow renaming the list after construction.
        self._name = name
    @property
    def processor_exit_status(self) -> ProcessorExitStatus:
        """
        The processor exit status.

        It refers to the whole processor list execution; initialised to Successful and updated after each
        item execution.
        """
        return self._processor_exit_status
    @processor_exit_status.setter
    def processor_exit_status(self, status: ProcessorExitStatus) -> None:
        # Plain pass-through setter: no validation is performed on the status.
        self._processor_exit_status = status
2293 @property
2294 def database(self) -> peewee.Database:
2295 """
2296 Returns the database instance
2298 :return: A database instance
2299 :raises MissingDatabase: if a database connection is missing.
2300 """
2301 if self._database is None:
2302 raise MissingDatabase('Database connection not initialized')
2303 return self._database
    def execute(self) -> ProcessorExitStatus:
        """
        Execute the list of processors.

        Similarly to the :class:`Processor`, ProcessorList can be executed. In simple words, the execute
        method of each processor in the list is called exactly in the same sequence as they were added.

        :return: The exit status of the last executed item.
        :rtype: ProcessorExitStatus
        :raises AbortProcessorException: if one of the executed items exits with an Aborted status.
        """
        with contextlib.ExitStack() as self._resource_stack:
            self.acquire_resources()
            self._user_interface.create_task(self.name, self.description, completed=0, increment=0, total=len(self))
            for i, item in enumerate(self):
                if isinstance(item, Processor):
                    log.info('[bold]Executing [red]%s[/red] processor[/bold]' % item.replica_name)
                else:
                    log.info('[bold]Executing [blue]%s[/blue] processor list[/bold]' % item.name)
                # Hand the shared timer / user interface / database down to the item before running it.
                self.distribute_resources(item)
                item.execute()
                self._user_interface.update_task(self.name, increment=1)
                self._processor_exit_status = item.processor_exit_status
                if self._processor_exit_status == ProcessorExitStatus.Aborted:
                    # Abort the whole list: remaining items are not executed.
                    msg = 'Processor %s caused the processor list to abort' % item.name
                    log.error(msg)
                    raise AbortProcessorException(msg)
            self._user_interface.update_task(self.name, completed=len(self), total=len(self))
        return self._processor_exit_status
    def acquire_resources(self) -> None:
        """
        Acquires external resources.

        Creates (and pushes onto the resource stack) whatever is not already provided from outside: the timer,
        the user interface (for top-level lists only) and the database connection.
        """
        # The strategy is similar to the one for Processor: if we do get resources already active (not None)
        # then we use them; otherwise, we create them and add them to the resource stack.
        if self.timer is None:
            self.timer = self._resource_stack.enter_context(Timer(**self.timer_params))
        # The user interface is very likely already initialised by the runner.
        # But if this is a nested list, then we must not push the user interface in the stack,
        # otherwise the user interface context (progress for rich) will be stopped at the end
        # of the nested list.
        if not self.nested_list:
            self._resource_stack.enter_context(self._user_interface)
        if self._database is None and self._database_conf is None:
            # no database, nor configuration.
            # we cannot do anything
            pass
        elif self._database is None and self._database_conf is not None:
            # no db, but we got a configuration.
            # we can make a db
            if 'DBConfiguration' in self._database_conf:
                conf = self._database_conf['DBConfiguration']  # type1: configuration nested under a group key
            else:
                conf = self._database_conf  # type2: flat configuration dictionary

            # guess the database type from the URL
            protocol = extract_protocol(conf.get('URL'))

            # build the connection parameter
            # in case of sqlite, we add the pragmas group as well
            connection_parameters = {}
            if protocol == 'sqlite':
                connection_parameters['pragmas'] = conf.get('pragmas', {})
            for key, value in conf.items():
                if key not in ['URL', 'pragmas']:
                    connection_parameters[key] = value

            self._database = connect(conf.get('URL'), **connection_parameters)  # type: ignore # peewee is not returning a DB
            try:
                self._database.connect()
                # Register close only once the connection has actually been opened.
                self._resource_stack.callback(self._database.close)
            except peewee.OperationalError as e:
                log.critical('Unable to connect to %s', self._database_conf.get('URL'))
                raise e
            # Point the shared model proxy to the newly created database.
            database_proxy.initialize(self._database)
            if self.create_standard_tables:
                standard_tables = mafw_model_register.get_standard_tables()
                self.database.create_tables(standard_tables)
                for table in standard_tables:
                    table.init()
        else:  # equiv to if self._database is not None:
            # we got a database, so very likely we are inside a processor list
            # the connection has been already set and the initialisation as well.
            # nothing else to do here.
            pass
2386 def distribute_resources(self, processor: Processor | Self) -> None:
2387 """Distributes the external resources to the items in the list."""
2388 processor.timer = self.timer
2389 processor._user_interface = self._user_interface
2390 processor._database = self._database