# Copyright 2025–2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""
Module implements the basic Processor class, the ProcessorList and all helper classes to achieve the core
functionality of the MAFw.
"""
from __future__ import annotations
import contextlib
import inspect
import logging
import os
import queue
import threading
import time
import warnings
from collections import OrderedDict
from collections.abc import Callable, Iterator
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from copy import copy, deepcopy
from functools import wraps
from itertools import count
from typing import (
TYPE_CHECKING,
Any,
Collection,
Generic,
Iterable,
Self,
SupportsIndex,
TypeVar,
Union,
cast,
get_args,
get_origin,
get_type_hints,
)
import peewee
from peewee import Database
# noinspection PyUnresolvedReferences
from playhouse.db_url import connect
import mafw.db.db_filter
from mafw.active import Active
from mafw.db.db_model import MAFwBaseModel, database_proxy, mafw_model_register
from mafw.enumerators import LoopingStatus, LoopType, ProcessorExitStatus, ProcessorStatus
from mafw.mafw_errors import (
AbortProcessorException,
MissingDatabase,
MissingOverloadedMethod,
MissingSuperCall,
ProcessorParameterError,
)
from mafw.models.filter_schema import FilterSchema
from mafw.models.loop_payloads import LoopItem, LoopResult
from mafw.models.parameter_schema import ParameterSchema
from mafw.models.processor_schema import ProcessorSchema
from mafw.timer import Timer, pretty_format_duration
from mafw.tools.generics import deep_update
from mafw.tools.parallel import is_free_threading
from mafw.tools.regexp import extract_protocol
from mafw.ui.abstract_user_interface import UserInterfaceBase
from mafw.ui.console_user_interface import ConsoleInterface
log = logging.getLogger(__name__)
ParameterType = TypeVar('ParameterType')
"""Generic variable type for the :class:`ActiveParameter` and :class:`PassiveParameter`."""
[docs]
def validate_database_conf(database_conf: dict[str, Any] | None = None) -> dict[str, Any] | None:
"""
Validates the database configuration.
:param database_conf: The input database configuration. Defaults to None.
:type database_conf: dict, Optional
:return: Either the validated database configuration or None if it is invalid.
:rtype: dict, None
"""
if database_conf is None:
return None
# dict is mutable, if I change it inside the function, I change it also outside.
conf = database_conf.copy()
if 'DBConfiguration' in conf:
# the database_conf is the steering file. Extract the DBConfiguration table
conf = conf['DBConfiguration']
required_fields = ['URL']
if all([field in conf for field in required_fields]):
return database_conf
else:
return None
[docs]
class PassiveParameter(Generic[ParameterType]):
"""
An helper class to store processor parameter value and metadata.
This class is the private interface used by the :class:`ActiveParameter` descriptor to store its value and metadata.
When a new :class:`.ActiveParameter` is added to a class, an instance of a PassiveParameter is added to the
processor parameter :attr:`register <.Processor._processor_parameters>`.
.. seealso::
An explanation on how processor parameters work and should be used is given in :ref:`Understanding processor
parameters <parameters>`
.. versionchanged:: v2.0.0
User should only use :class:`ActiveParameter` and never manually instantiate :class:`PassiveParameter`.
"""
def __init__(
self, name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = ''
):
"""
Constructor parameters:
:param name: The name of the parameter. It must be a valid python identifier.
:type name: str
:param value: The set value of the parameter. If None, then the default value will be used. Defaults to None.
:type value: ParameterType, Optional
:param default: The default value for the parameter. It is used if the :attr:`value` is not provided. Defaults to None.
:type default: ParameterType, Optional
:param help_doc: A brief explanation of the parameter.
:type help_doc: str, Optional
:raises ProcessorParameterError: if both `value` and `default` are not provided or if `name` is not a valid identifier.
"""
if not name.isidentifier():
raise ProcessorParameterError(f'{name} is not a valid python identifier.')
self.name = name
if value is not None:
self._value: ParameterType = value
self._is_set = True
self._is_optional = False
elif default is not None:
self._value = default
self._is_set = False
self._is_optional = True
else:
raise ProcessorParameterError('Processor parameter cannot have both value and default value set to None')
self.doc = help_doc
def __rich_repr__(self) -> Iterator[Any]:
yield 'name', self.name
yield 'value', self.value, None
yield 'help_doc', self.doc, ''
@property
def is_set(self) -> bool:
"""
Property to check if the value has been set.
It is useful for optional parameter to see if the current value is the default one, or if the user set it.
"""
return self._is_set
@property
def value(self) -> ParameterType:
"""
Gets the parameter value.
:return: The parameter value.
:rtype: ParameterType
:raises ProcessorParameterError: if both value and default were not defined.
"""
return self._value
@value.setter
def value(self, value: ParameterType) -> None:
"""
Sets the parameter value.
:param value: The value to be set.
:type value: ParameterType
"""
self._value = value
self._is_set = True
@property
def is_optional(self) -> bool:
"""
Property to check if the parameter is optional.
:return: True if the parameter is optional
:rtype: bool
"""
return self._is_optional
def __repr__(self) -> str:
args = ['name', 'value', 'doc']
values = [getattr(self, arg) for arg in args]
return '{klass}({attrs})'.format(
klass=self.__class__.__name__,
attrs=', '.join('{}={!r}'.format(k, v) for k, v in zip(args, values)),
)
F = TypeVar('F', bound=Callable[..., Any])
"""Type variable for generic callable with any return value."""
[docs]
def ensure_parameter_registration(func: F) -> F:
"""Decorator to ensure that before calling `func` the processor parameters have been registered."""
@wraps(func)
def wrapper(*args: Processor, **kwargs: Any) -> F:
# the first positional arguments must be self
if len(args) == 0:
raise ProcessorParameterError(
'Attempt to apply the ensure_parameter_registration to something different to a Processor subclass.'
)
self = args[0]
if not isinstance(self, Processor):
raise ProcessorParameterError(
'Attempt to apply the ensure_parameter_registration to something different to a Processor subclass.'
)
if self._parameter_registered is False:
self._register_parameters()
return cast(F, func(*args, **kwargs))
return cast(F, wrapper)
_ParameterRegistry = OrderedDict[str, 'ActiveParameter[Any]']
[docs]
def _copy_parent_parameter_definitions(owner: type[Processor]) -> _ParameterRegistry:
"""
Build an ordered dictionary of ActiveParameters inherited from base classes.
"""
definitions: _ParameterRegistry = OrderedDict()
for base in owner.__mro__[1:]:
base_definitions = getattr(base, '_parameter_definitions', None)
if base_definitions:
for name, descriptor in base_definitions.items():
definitions[name] = descriptor
return definitions
[docs]
def _ensure_parameter_definitions(owner: type[Processor]) -> _ParameterRegistry:
"""
Return the class-level parameter registry, creating it by copying base definitions if necessary.
"""
definitions: _ParameterRegistry | None = owner.__dict__.get('_parameter_definitions')
if definitions is not None:
return definitions
definitions = _copy_parent_parameter_definitions(owner)
setattr(owner, '_parameter_definitions', definitions)
return definitions
[docs]
def _validate_filter_schema(owner: type[Processor]) -> None:
"""
Validate the optional :class:`~mafw.models.filter_schema.FilterSchema` declared on a Processor.
"""
schema = getattr(owner, '_filter_schema', None)
if schema is None:
return
if not isinstance(schema, FilterSchema):
raise ProcessorParameterError('Processor._filter_schema must be a FilterSchema instance')
root_model = schema.root_model
if not (inspect.isclass(root_model) and issubclass(root_model, MAFwBaseModel)):
raise ProcessorParameterError('FilterSchema.root_model must inherit from MAFwBaseModel')
seen_models = {root_model}
for model in schema.allowed_models:
if not (inspect.isclass(model) and issubclass(model, MAFwBaseModel)):
raise ProcessorParameterError('FilterSchema.allowed_models must contain MAFwBaseModel subclasses')
if model in seen_models:
raise ProcessorParameterError(f'Model {model!r} already declared in FilterSchema')
seen_models.add(model)
[docs]
class ActiveParameter(Generic[ParameterType]):
r"""
The public interface to the processor parameter.
The behaviour of a :class:`Processor` can be customised by using processor parameters. The value of these
parameters can be either set via a configuration file or directly when creating the class.
If the user wants to benefit from this facility, they have to add in the instance of the Processor subclass an
ActiveParameter instance in this way:
.. code-block::
class MyProcessor(Processor):
# this is the input folder
input_folder = ActiveParameter('input_folder', Path(r'C:\'), help_doc='This is where to look for input files')
def __init__(self, *args, **kwargs):
super().__init(*args, **kwargs)
# change the input folder to something else
self.input_folder = Path(r'D:\data')
# get the value of the parameter
print(self.input_folder)
The ActiveParameter is a `descriptor <https://docs.python.org/3/glossary.html#term-descriptor>`_, it means that
when you create one of them, a lot of work is done behind the scene.
In simple words, a processor parameter is made by two objects: a public interface where the user can easily
access the value of the parameter and a private interface where all other information (default, documentation...)
is also stored.
The user does not have to take care of all of this. When a new ActiveParameter instance is added to the class as
in the code snippet above, the private interface is automatically created and will stay in the class instance
until the end of the class lifetime.
To access the private interface, the user can use the :meth:`Processor.get_parameter` method using the parameter
name as a key.
The user can assign to an ActiveParameter almost any name. There are just a few invalid parameter names that are
used for other purposes. The list of reserved names is available :attr:`here <reserved_names>`. Should the user
inadvertently use a reserved named, a :exc:`.ProcessorParameterError` is raised.
.. seealso::
The private counter part in the :class:`PassiveParameter`.
An explanation on how processor parameters work and should be used is given in :ref:`Understanding processor
parameters <parameters>`
The list of :attr:`reserved names <reserved_names>`.
"""
reserved_names: list[str] = ['__logic__', '__filter__', '__new_only__', '__inheritance__', '__enable__']
"""A list of names that cannot be used as processor parameter names.
- `__logic__`
- `__filter__`
- `__new_only__`
- `__inheritance__`
- `__enable__`
"""
def __init__(
self, name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = ''
):
"""
Constructor parameters:
:param name: The name of the parameter.
:type name: str
:param value: The initial value of the parameter. Defaults to None.
:type value: ParameterType, Optional
:param default: The default value of the parameter, to be used when ``value`` is not set., Defaults to None.
:type default: ParameterType, Optional
:param help_doc: An explanatory text describing the parameter.
:type help_doc: str, Optional
"""
self._value = value
self._default = default
self._help_doc = help_doc
self._external_name = self._validate_name(name)
[docs]
def _validate_name(self, proposed_name: str) -> str:
"""
Validate that the proposed parameter name is not in the list of forbidden names.
This private method checks if the provided name is allowed for use as a processor parameter.
Names that are listed in :attr:`reserved_names` cannot be used as parameter names.
:param proposed_name: The name to be validated for use as a processor parameter.
:type proposed_name: str
:return: The validated name if it passes the forbidden names check.
:rtype: str
:raises ProcessorParameterError: If the proposed name is in the list of forbidden names.
"""
if proposed_name not in self.reserved_names:
return proposed_name
raise ProcessorParameterError(f'Attempt to use a forbidden name ({proposed_name})')
def __set_name__(self, owner: type[Processor], name: str) -> None:
self.public_name = name
self.private_name = f'param_{name}'
self._owner = owner
definitions = _ensure_parameter_definitions(owner)
existing = definitions.get(self._external_name)
if existing is not None and getattr(existing, '_owner', None) is owner:
if not hasattr(owner, '_parameter_definition_error'):
setattr(
owner,
'_parameter_definition_error',
ProcessorParameterError(f'Duplicated parameter name ({self._external_name}).'),
)
return
definitions[self._external_name] = self
def __get__(self, obj: Processor, obj_type: type[Processor]) -> ActiveParameter[ParameterType] | ParameterType:
if obj is None:
return self
# retrieve instance-level passive parameter
param = cast(PassiveParameter[ParameterType], obj._processor_parameters[self._external_name])
return param.value
def __set__(self, obj: Processor, value: ParameterType) -> None:
param = obj._processor_parameters[self._external_name]
param.value = value
[docs]
def to_schema(self) -> ParameterSchema:
"""
Returns the static schema describing this parameter.
The schema is derived solely from the descriptor metadata and does not instantiate the owning processor.
"""
annotation = self._resolve_parameter_annotation()
default_value = self._schema_default_value()
help_text = self._help_doc or None
is_list = self._is_list_annotation(annotation, default_value)
is_dict = self._is_dict_annotation(annotation, default_value)
return ParameterSchema(
name=self._external_name,
annotation=annotation,
default=default_value,
help=help_text,
is_list=is_list,
is_dict=is_dict,
)
def _resolve_parameter_annotation(self) -> type | None:
if not hasattr(self, 'public_name') or getattr(self, '_owner', None) is None:
return None
target = getattr(self, '_owner', None)
try:
hints = get_type_hints(target)
except Exception:
hints = {}
hint = hints.get(self.public_name)
if hint is None:
fallback = self._default if self._default is not None else self._value
if fallback is not None:
return type(fallback)
return None
origin = get_origin(hint)
if origin is ActiveParameter:
args = get_args(hint)
if args:
return cast(type | None, args[0])
return None
return cast(type | None, hint)
def _schema_default_value(self) -> Any:
candidate = self._default if self._default is not None else self._value
if candidate is None:
return None
try:
return deepcopy(candidate)
except Exception:
return candidate
def _is_list_annotation(self, annotation: type | None, default_value: Any) -> bool:
return self._matches_container(annotation, default_value, list)
def _is_dict_annotation(self, annotation: type | None, default_value: Any) -> bool:
return self._matches_container(annotation, default_value, dict)
def _matches_container(self, annotation: type | None, default_value: Any, container: type) -> bool:
type_hint = annotation
if type_hint is None and default_value is not None:
type_hint = type(default_value)
if type_hint is None:
return False
origin = get_origin(type_hint)
if origin is container:
return True
if isinstance(type_hint, type) and issubclass(type_hint, container):
return True
return False
# noinspection PyProtectedMember
[docs]
class Processor(metaclass=ProcessorMeta):
"""
The basic processor.
A very comprehensive description of what a Processor does and how it works is available at :ref:`doc_processor`.
"""
processor_status = Active(ProcessorStatus.Unknown)
"""Processor execution status"""
progress_message: str = f'{__qualname__} is working'
"""Message displayed to show the progress.
It can be customized with information about the current item in the loop by overloading the
:meth:`format_progress_message`."""
#: List of methods that should invoke their super implementation when overridden.
_methods_to_be_checked_for_super: tuple[str, ...] = ('start', 'finish')
[docs]
@classmethod
def parameter_schema(cls) -> list[ParameterSchema]:
"""
Return the ordered static schema for the processor parameters defined on the class.
The schema is derived without instantiating the processor, keeping toolchains free from side effects.
"""
definitions = getattr(cls, '_parameter_definitions', None)
if definitions is None:
definitions = _ensure_parameter_definitions(cls)
return [param.to_schema() for param in definitions.values()]
[docs]
@classmethod
def filter_schema(cls) -> FilterSchema | None:
"""
Optional metadata describing the models available for filtering.
"""
return getattr(cls, '_filter_schema', None)
[docs]
@classmethod
def processor_schema(cls) -> ProcessorSchema:
"""
Return the static schema for the processor.
"""
return ProcessorSchema(parameters=cls.parameter_schema(), filter=cls.filter_schema())
_ids = count(0)
"""A counter for all processor instances"""
new_defaults: dict[str, Any] = {}
"""
A dictionary containing defaults value for the parameters to be overridden
.. versionadded:: v2.0.0
"""
new_only_flag = 'new_only'
def __init__(
self,
name: str | None = None,
description: str | None = None,
config: dict[str, Any] | None = None,
looper: LoopType | str = LoopType.ForLoop,
user_interface: UserInterfaceBase | None = None,
timer: Timer | None = None,
timer_params: dict[str, Any] | None = None,
database: Database | None = None,
database_conf: dict[str, Any] | None = None,
remove_orphan_files: bool = True,
replica_id: str | None = None,
create_standard_tables: bool = True,
max_workers: int | None = None,
queue_size: int | None = None,
queue_batch_size: int | None = None,
*args: Any,
**kwargs: Any,
) -> None:
"""
Constructor parameters
:param name: The name of the processor. If None is provided, the class name is used instead. Defaults to None.
:type name: str, Optional
:param description: A short description of the processor task. Defaults to the processor name.
:type description: str, Optional
:param config: A configuration dictionary for this processor. Defaults to None.
:type config: dict, Optional
:param looper: Enumerator to define the looping type. Defaults to LoopType.ForLoop
:type looper: LoopType, Optional
:param user_interface: A user interface instance to be used by the processor to interact with the user.
:type user_interface: UserInterfaceBase, Optional
:param timer: A timer object to measure process duration.
:type timer: Timer, Optional
:param timer_params: Parameters for the timer object.
:type timer_params: dict, Optional
:param database: A database instance. Defaults to None.
:type database: Database, Optional
:param database_conf: Configuration for the database. Default to None.
:type database_conf: dict, Optional
:param remove_orphan_files: Boolean flag to remove files on disc without a reference to the database.
See :ref:`std_tables` and :meth:`~mafw.processor.Processor._remove_orphan_files`. Defaults to True
:type remove_orphan_files: bool, Optional
:param replica_id: The replica identifier for the current processor.
:type replica_id: str, Optional
:param create_standard_tables: Boolean flag to create std tables on disk. Defaults to True
:type create_standard_tables: bool, Optional
:param max_workers: Number of worker threads for parallel loops.
:type max_workers: int, Optional
:param queue_size: Maximum size of the internal queue for the queue-based parallel loop.
:type queue_size: int, Optional
:param queue_batch_size: Number of items processed per worker task in the queue-based parallel loop.
:type queue_batch_size: int, Optional
:param kwargs: Keyword arguments that can be used to set processor parameters.
"""
self.name = name or self.__class__.__name__
"""The name of the processor."""
self.unique_id = next(self._ids)
"""A unique identifier representing how many instances of Processor has been created."""
self.replica_id = replica_id
"""
The replica identifier specified in the constructor
.. versionadded:: v2.0.0
"""
self.description = description or self.name
"""A short description of the processor task."""
self._item: Any = None
"""The current item of the loop."""
self._looping_status: LoopingStatus = LoopingStatus.Continue
"""The looping status for the main thread."""
self._thread_local = threading.local()
"""Thread-local storage for loop attributes in parallel execution.
.. versionadded:: v2.1.0
"""
self._wall_clock_start: float | None = None
"""Timestamp when the looping execution began."""
self.processor_exit_status = ProcessorExitStatus.Successful
"""Processor exit status"""
self.loop_type: LoopType = LoopType(looper)
"""
The loop type.
The value of this parameter can also be changed by the :func:`~mafw.decorators.execution_workflow` decorator
factory.
See :class:`~mafw.enumerators.LoopType` for more details.
"""
self.create_standard_tables = create_standard_tables
"""The boolean flag to proceed or skip with standard table creation and initialisation"""
self.max_workers = max_workers if max_workers is not None else self._compute_default_max_workers()
"""Maximum number of worker threads used in parallel loops."""
computed_queue_size = max(1, self.max_workers * 2)
self.queue_size = queue_size if queue_size is not None else computed_queue_size
"""Maximum size of the queue used by :class:`~mafw.enumerators.LoopType.ParallelForLoopWithQueue`."""
self.queue_batch_size = max(1, queue_batch_size or 1)
"""Number of items processed per worker task in :class:`~mafw.enumerators.LoopType.ParallelForLoopWithQueue`."""
# private attributes
self._config: dict[str, Any] = {} # deepcopy(config) if config is not None else {}
"""
A dictionary containing the processor configuration object.
This dictionary is populated with configuration parameter (always type 2) during the
:meth:`._load_parameter_configuration` method.
The original value of the configuration dictionary that is passed to the constructor is stored in
:attr:`._orig_config`.
.. versionchanged:: v2.0.0
Now it is an empty dictionary until the :meth:`._load_parameter_configuration` is called.
"""
self._orig_config = deepcopy(config) if config is not None else {}
"""
A copy of the original configuration dictionary.
.. versionadded:: v2.0.0
"""
self._processor_parameters: OrderedDict[str, PassiveParameter[Any]] = OrderedDict()
"""
A dictionary to store all the processor parameter instances.
The name of the parameter is used as a key, while for the value an instance of the
:class:`.PassiveParameter` is used.
"""
# wait for answer from SO
self._parameter_registered = False
"""A boolean flag to confirm successful parameter registration."""
self._kwargs = kwargs
# loops attributes
self._i_item: int = -1
self._n_item: int | None = -1
self._process_durations: list[float] = []
self._super_call_flags: dict[str, bool] = {}
"""Tracks super-call usage for methods that require it."""
# resource stack
self._resource_stack: contextlib.ExitStack
self._resource_acquisition: bool = True
# processor timer
self.timer: Timer | None = timer
self._timer_parameters: dict[str, Any] = timer_params or {}
# user interface
if user_interface is None:
self._user_interface: UserInterfaceBase = ConsoleInterface()
else:
self._user_interface = user_interface
# database stuff
self._database: peewee.Database | None = database
self._database_conf: dict[str, Any] | None = validate_database_conf(database_conf)
self.filter_register: mafw.db.db_filter.ProcessorFilter = mafw.db.db_filter.ProcessorFilter()
"""The DB filter register of the Processor."""
self.remove_orphan_files: bool = remove_orphan_files
"""The flag to remove or protect the orphan files. Defaults to True"""
self.initialise_parameters()
[docs]
def initialise_parameters(self) -> None:
"""
Initialises processor parameters by registering them and applying various configuration sources.
This method orchestrates the parameter initialisation process by performing the following steps in order:
#. Registers processor parameters defined as :class:`ActiveParameter` instances
#. Overrides default parameter values with any configured overrides
#. Loads parameter configuration from the processor's configuration dictionary
#. Applies keyword arguments as parameter overrides
The method ensures that all processor parameters are properly configured before the processor
execution begins. It is automatically called during processor initialisation and should not
typically be called directly by users.
.. seealso::
:meth:`_register_parameters`, :meth:`_override_defaults`,
:meth:`_load_parameter_configuration`, :meth:`_overrule_kws_parameters`
.. versionadded:: v2.0.0
"""
self._register_parameters()
self._override_defaults()
self._load_parameter_configuration()
self._overrule_kws_parameters()
def __post_init__(self) -> None:
"""
Performs post-initialisation tasks for the processor.
This method is automatically called after the processor initialisation is complete.
It performs validation checks on overloaded methods and sets the initial processor status.
.. seealso::
:meth:`validate_configuration`, :meth:`_check_method_overload`,
:attr:`~mafw.processor.Processor.processor_status`
.. versionchanged:: v2.0.0
Moved the parameter initialisation to :meth:`initialise_parameters` and now executed as last step of the
init method.
Added the validate configuration check. This method should silently check that configuration provided
with the processor parameters is valid. If not, a :exc:`.ProcessorParameterError` is raised.
"""
self.validate_configuration()
self._check_method_overload()
self.processor_status = ProcessorStatus.Init
[docs]
def _register_parameters(self) -> None:
"""
Register processor parameters defined as ActiveParameter instances in the class.
This private method scans the class definition for any :class:`.ActiveParameter` instances and creates
corresponding :class:`.PassiveParameter` instances to store the actual parameter values and metadata.
It ensures that all processor parameters are properly initialised and available for configuration
through the processor's configuration system.
The method checks for duplicate parameter names and raises a :exc:`.ProcessorParameterError` if duplicates
are detected. It also sets the internal flag :attr:`._parameter_registered` to True once registration is
complete.
.. note::
This method is automatically called during processor initialisation and should not be called directly
by users.
.. seealso::
:class:`.Processor`, :meth:`.Processor._override_defaults`,
:meth:`.Processor._load_parameter_configuration`, :meth:`.Processor._overrule_kws_parameters`
.. versionchanged:: v2.0.0
Only :class:`ActiveParameter` are not registered. The use of :class:`PassiveParameter` is only meant to
store the value and metadata of the active counter part.
"""
if self._parameter_registered:
return
definitions = getattr(self.__class__, '_parameter_definitions', None)
if definitions is None:
definitions = _ensure_parameter_definitions(self.__class__)
for attr in definitions.values():
ext_name = attr._external_name
if ext_name in self._processor_parameters:
raise ProcessorParameterError(f'Duplicated parameter name ({ext_name}).')
self._processor_parameters[ext_name] = PassiveParameter(
ext_name, attr._value, attr._default, attr._help_doc
)
self._parameter_registered = True
[docs]
def _override_defaults(self) -> None:
"""
Override default parameter values with values from :attr:`new_defaults`.
This private method iterates through the :attr:`new_defaults` dictionary and updates
the corresponding processor parameters with new values. Only parameters that exist
in both :attr:`new_defaults` and :attr:`_processor_parameters` are updated.
.. versionadded:: v2.0.0
"""
for key, value in self.new_defaults.items():
if key in self._processor_parameters:
self._processor_parameters[key].value = value
[docs]
def _reset_parameters(self) -> None:
"""
Reset processor parameters to their initial state.
This method clears all currently registered processor parameters and triggers
a fresh registration process. It's useful when parameter configurations need
to be reinitialized or when parameters have been modified and need to be reset.
.. seealso::
:meth:`_register_parameters`, :meth:`_register_parameters`
"""
self._processor_parameters = OrderedDict()
self._parameter_registered = False
self._register_parameters()
[docs]
@ensure_parameter_registration
def _load_parameter_configuration(self) -> None:
"""
Load processor parameter configuration from the internal configuration dictionary.
This method processes the processor's configuration dictionary to set parameter values.
It handles two configuration formats:
1. Nested format: ``{'ProcessorName': {'param1': value1, ...}}``
2. Flat format: ``{'param1': value1, ...}``
The method also handles filter configurations by collecting filter table names
and deferring their initialisation until after the global filter has been processed.
.. versionchanged:: v2.0.0
For option 1 combining configuration from name and name_replica
:raises ProcessorParameterError: If a parameter in the configuration is not registered.
.. seealso::
:meth:`mafw.db.db_filter.ModelFilter.from_conf`
"""
original_config = copy(self._orig_config)
flt_list = []
# by default the flag new_only is set to true
# unless the user specify differently in the general section of the steering file
self.filter_register.new_only = original_config.get(self.new_only_flag, True)
# we need to check if the configuration object is of type 1 or type 2
if any([name for name in [self.name, self.replica_name] if name in original_config]):
# one of the two names (the base or the replica) must be present in case of option 1
# we start from the base name. If not there, then take an empty dict
option1_config_base = original_config.get(self.name, {})
if self.name != self.replica_name:
# if there is the replica name, then update the base configuration with the replica value
# we get the replica configuration
option1_config_replica = original_config.get(self.replica_name, {})
# let's check if the user wants to have inheritance default
# by default is True
inheritance = option1_config_replica.get('__inheritance__', True)
if inheritance:
# we update the base with the replica without changing the base
option1_config_update = deep_update(option1_config_base, option1_config_replica, copy_first=True)
else:
# we do not use the base with the replica specific, we pass the replica as the updated
option1_config_update = option1_config_replica
# we modify the type 1 original so that the table for the replica has the updated configuration
# this is used for the filter configuration at the end.
original_config[self.replica_name] = option1_config_update
else:
# there is not replica, so the update is equal to the base.
option1_config_update = option1_config_base
self._config = option1_config_update
else:
# for type 2 we are already good to go
self._config = original_config
filter_config = deepcopy(original_config)
def _sanitize_filter_config(processor_name: str) -> None:
processor_config = filter_config.get(processor_name)
if not isinstance(processor_config, dict):
return
filter_table = processor_config.get('__filter__')
if not isinstance(filter_table, dict):
return
sanitized_table: dict[str, Any] = {}
for model_name, model_config in filter_table.items():
if not isinstance(model_config, dict):
sanitized_table[model_name] = model_config
continue
model_config_copy = deepcopy(model_config)
if not bool(model_config_copy.pop('__enable__', True)):
continue
for field_name, field_value in list(model_config_copy.items()):
if (
isinstance(field_value, dict)
and not ('op' in field_value and 'value' in field_value)
and '__enable__' in field_value
):
field_enabled = bool(field_value.pop('__enable__', True))
if not field_enabled:
model_config_copy.pop(field_name, None)
conditionals = model_config_copy.get('__conditional__')
if isinstance(conditionals, list):
filtered_conditionals: list[Any] = []
for conditional in conditionals:
if isinstance(conditional, dict):
conditional_enabled = bool(conditional.pop('__enable__', True))
if not conditional_enabled:
continue
filtered_conditionals.append(conditional)
model_config_copy['__conditional__'] = filtered_conditionals
sanitized_table[model_name] = model_config_copy
processor_config['__filter__'] = sanitized_table
_sanitize_filter_config(self.replica_name)
for key, value in self._config.items():
if key in self._processor_parameters:
type_: ParameterType = type(self.get_parameter(key).value) # type: ignore # wait for answer from SO
self.set_parameter_value(key, type_(value)) # type: ignore # no idea how to fix it, may be linked with above
elif key == '__filter__':
# we got a filter table!
# it should contain one table for each model
# we add all the names to a list for deferred initialisation
flt_table = self._config[key]
if isinstance(flt_table, dict):
for model_name, model_config in flt_table.items():
if isinstance(model_config, dict) and not bool(model_config.get('__enable__', True)):
continue
flt_list.append(f'{self.replica_name}.__filter__.{model_name}')
elif key == '__logic__':
# we got a filter logic string
# we store it in the filter register directly
self.filter_register._logic = self._config[key]
elif key == '__new_only__':
# we got a new only boolean, we store it in the filter register
self.filter_register.new_only = self._config[key]
# only now, after the configuration file has been totally read, we can do the real filter initialisation.
# This is to be sure that if there were a GlobalFilter table, this has been read.
# The global filter region will be used as a starting point for the construction of a new filter (default
# parameter in the from_conf class method).
for flt_name in flt_list:
model_name = flt_name.split('.')[-1]
self.filter_register[model_name] = mafw.db.db_filter.ModelFilter.from_conf(flt_name, filter_config)
[docs]
@ensure_parameter_registration
def _overrule_kws_parameters(self) -> None:
"""
Override processor parameters with values from keyword arguments.
This method applies parameter values passed as keyword arguments during processor
initialisation. It ensures that the parameter types match the expected types
before setting the values.
.. seealso::
:meth:`_register_parameters`, :meth:`_load_parameter_configuration`,
:meth:`set_parameter_value`
"""
for key, value in self._kwargs.items():
if key in self._processor_parameters:
type_: ParameterType = type(self.get_parameter(key).value) # type: ignore # wait for answer from SO
self.set_parameter_value(key, type_(value)) # type: ignore # no idea how to fix it, may be linked with above
[docs]
def validate_configuration(self) -> None:
"""
Validate the configuration provided via the processor parameters.
Method to be implemented by subclasses if a configuration validation is needed.
The method should silently check for the proper configuration, if this is not obtained,
then the :exc:`.InvalidConfigurationError` must be raised.
.. versionadded:: v2.0.0
"""
pass
[docs]
def _check_method_overload(self) -> None:
"""
Check if the user overloaded the required methods.
Depending on the loop type, the user must overload different methods.
This method is doing the check and if the required methods are not overloaded a warning is emitted.
"""
methods_dict: dict[LoopType, list[str]] = {
LoopType.WhileLoop: ['while_condition'],
LoopType.ForLoop: ['get_items'],
LoopType.ParallelForLoop: ['get_items'],
LoopType.ParallelForLoopWithQueue: ['get_items'],
}
required_methods: list[str] = methods_dict.get(self.loop_type, [])
for method in required_methods:
if getattr(type(self), method) == getattr(Processor, method):
warnings.warn(
MissingOverloadedMethod(
'%s was not overloaded. The process execution workflow might not work.' % method
)
)
[docs]
@classmethod
def _apply_super_call_wrappers(cls) -> None:
"""
Wraps overridden methods so the class can detect whether they called `super()`.
This method runs immediately after the class is created (see :class:`.ProcessorMeta`). For every
method that we expect scientists to extend (start, finish, etc.) we replace their implementation with a
wrapper. The wrapper resets a per-instance flag, invokes the real override, and only after that method returns
it checks whether the real `super()` was ever reached; if not, it emits a :class:`~mafw.mafw_errors.MissingSuperCall`
warning. In other words, the wiring happens while the subclass is defined, and the actual smoke test executes
each time the method runs.
"""
methods = getattr(cls, '_methods_to_be_checked_for_super', ())
for method in methods:
if method not in cls.__dict__:
continue
if not any(hasattr(base, method) for base in cls.__mro__[1:]):
continue
original: Callable[..., Any] = getattr(cls, method)
# we are adding an attribute to the method, it looks strange, but it is possible
# in this way we avoid wrapping the same method more than once.
if getattr(original, '_mafw_super_check_wrapped', False):
continue
def _make_wrapper(
__orig: Callable[..., Any],
__method: str, # pragma: no cover
) -> Callable[..., Any]:
@wraps(__orig)
def _wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
# self is the processor instance.
# _reset_super_call_flag is resetting the call status
# for method as False
self._reset_super_call_flag(__method)
# in the base method, the super call flag is set to True
result = __orig(self, *args, **kwargs)
# if the super call flag is not True, then it is because the base
# method was not called.
# emit the warning and return the original method return value
if not self._did_call_super(__method):
warnings.warn(
MissingSuperCall(
'The overloaded %s is not invoking its super method. The processor might not work.'
% __method
)
)
return result
return _wrapper
# we create a wrapped method from the original
wrapper = _make_wrapper(original, method)
# we set a flag for the method to avoid multiple wrapping
wrapper._mafw_super_check_wrapped = True # type: ignore[attr-defined]
setattr(cls, method, wrapper)
[docs]
def _reset_super_call_flag(self, method: str) -> None:
"""
Reset the super-call flag for a method.
"""
self._super_call_flags[method] = False
[docs]
def _mark_super_call(self, method: str) -> None:
"""
Mark a method as having called its super implementation.
"""
self._super_call_flags[method] = True
[docs]
def _did_call_super(self, method: str) -> bool:
"""
Check whether a method called its super implementation.
"""
return self._super_call_flags.get(method, False)
[docs]
@ensure_parameter_registration
def dump_parameter_configuration(self, option: int = 1) -> dict[str, Any]:
"""
Dumps the processor parameter values in a dictionary.
The snippet below explains the meaning of `option`.
.. code-block:: python
# option 1
conf_dict1 = {
'Processor': {'param1': 5, 'input_table': 'my_table'}
}
# option 2
conf_dict2 = {'param1': 5, 'input_table': 'my_table'}
In the case of option 1, the replica aware name (:meth:`.replica_name`) will be used as a key for the
configuration dictionary.
.. versionchanged:: v2.0.0
With option 1, using :meth:`.replica_name` instead of :attr:`~.Processor.name` as key of the configuration
dictionary.
:param option: Select the dictionary style. Defaults to 1.
:type option: int, Optional
:return: A parameter configuration dictionary.
:rtype: dict
"""
inner_dict = {}
for key, value in self._processor_parameters.items():
inner_dict[key] = value.value
if option == 1:
outer_dict = {self.replica_name: inner_dict}
elif option == 2:
outer_dict = inner_dict
else:
log.warning('Unknown option %s. Using option 2' % option)
outer_dict = inner_dict
return outer_dict
[docs]
@ensure_parameter_registration
def get_parameter(self, name: str) -> PassiveParameter[Any]:
"""
Gets the processor parameter named name.
:param name: The name of the parameter.
:type name: str
:return: The processor parameter
:rtype: PassiveParameter
:raises ProcessorParameterError: If a parameter with `name` is not registered.
"""
if name in self._processor_parameters:
return self._processor_parameters[name]
raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}')
[docs]
@ensure_parameter_registration
def get_parameters(self) -> dict[str, PassiveParameter[Any]]:
"""
Returns the full dictionary of registered parameters for this processor.
Useful when dumping the parameter specification in a configuration file, for example.
:return: The dictionary with the registered parameters.
:rtype: dict[str, PassiveParameter[ParameterType]
"""
return self._processor_parameters
[docs]
@ensure_parameter_registration
def delete_parameter(self, name: str) -> None:
"""
Deletes a processor parameter.
:param name: The name of the parameter to be deleted.
:type name: str
:raises ProcessorParameterError: If a parameter with `name` is not registered.
"""
if name in self._processor_parameters:
del self._processor_parameters[name]
else:
raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}')
[docs]
@ensure_parameter_registration
def set_parameter_value(self, name: str, value: ParameterType) -> None:
"""
Sets the value of a processor parameter.
:param name: The name of the parameter to be deleted.
:type name: str
:param value: The value to be assigned to the parameter.
:type value: ParameterType
:raises ProcessorParameterError: If a parameter with `name` is not registered.
"""
if name in self._processor_parameters:
self._processor_parameters[name].value = value
else:
raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}')
[docs]
def get_filter(self, model_name: str) -> mafw.db.db_filter.ModelFilter:
"""
Returns a registered :class:`~mafw.db.db_filter.ModelFilter` via the model name.
If a filter for the provided model_name does not exist, a KeyError is raised.
:param model_name: The model name for which the filter will be returned.
:type model_name: str
:return: The registered filter
:rtype: mafw.db.db_filter.ModelFilter
:raises: KeyError is a filter with the give name is not found.
"""
return self.filter_register[model_name]
[docs]
def on_processor_status_change(self, old_status: ProcessorStatus, new_status: ProcessorStatus) -> None:
"""
Callback invoked when the processor status is changed.
:param old_status: The old processor status.
:type old_status: ProcessorStatus
:param new_status: The new processor status.
:type new_status: ProcessorStatus
"""
self._user_interface.change_of_processor_status(self.name, old_status, new_status)
[docs]
def on_looping_status_set(self, status: LoopingStatus) -> None:
"""
Call back invoked when the looping status is set.
The user can overload this method according to the needs.
:param status: The set looping status.
:type status: LoopingStatus
"""
if status == LoopingStatus.Skip:
log.warning('Skipping item %s' % self.i_item)
elif status == LoopingStatus.Abort:
log.error('Looping has been aborted')
elif status == LoopingStatus.Quit:
log.warning('Looping has been quit')
[docs]
@contextlib.contextmanager
def _thread_loop_context(self, i_item: int, n_item: int, item: Any) -> Iterator[None]:
"""
Context manager to set thread-local loop attributes for parallel execution.
:param i_item: Item index for the loop.
:type i_item: int
:param n_item: Total number of items in the loop.
:type n_item: int
:param item: Item payload.
:type item: Any
"""
self._thread_local.in_worker = True
self._thread_local.i_item = i_item
self._thread_local.n_item = n_item
self._thread_local.item = item
self._thread_local.looping_status = LoopingStatus.Continue
try:
yield
finally:
for name in ('i_item', 'n_item', 'item', 'looping_status', 'in_worker'):
if hasattr(self._thread_local, name):
delattr(self._thread_local, name)
[docs]
def _in_thread_context(self) -> bool:
"""Return True when running inside a parallel worker thread."""
return bool(getattr(self._thread_local, 'in_worker', False))
@property
def item(self) -> Any:
"""The current item of the loop."""
if self._in_thread_context() and hasattr(self._thread_local, 'item'):
return self._thread_local.item
return self._item
@item.setter
def item(self, value: Any) -> None:
if self._in_thread_context():
self._thread_local.item = value
else:
self._item = value
@property
def i_item(self) -> int:
"""The enumeration of the current item being processed."""
if self._in_thread_context() and hasattr(self._thread_local, 'i_item'):
return cast(int, self._thread_local.i_item)
return self._i_item
@i_item.setter
def i_item(self, value: int) -> None:
if self._in_thread_context():
self._thread_local.i_item = value
else:
self._i_item = value
@property
def n_item(self) -> int | None:
"""The total number of items to be processed or None for an undefined loop"""
if self._in_thread_context() and hasattr(self._thread_local, 'n_item'):
return cast(int | None, self._thread_local.n_item)
return self._n_item
@n_item.setter
def n_item(self, value: int | None) -> None:
if self._in_thread_context():
self._thread_local.n_item = value
else:
self._n_item = value
@property
def looping_status(self) -> LoopingStatus:
"""The looping status for the current thread context."""
if self._in_thread_context():
value = getattr(self._thread_local, 'looping_status', LoopingStatus.Continue)
else:
value = self._looping_status
if hasattr(self, 'on_looping_status_get'):
self.on_looping_status_get(value)
return value
@looping_status.setter
def looping_status(self, value: LoopingStatus) -> None:
if self._in_thread_context():
current = getattr(self._thread_local, 'looping_status', LoopingStatus.Continue)
self._thread_local.looping_status = value
else:
current = self._looping_status
self._looping_status = value
if current != value:
if hasattr(self, 'on_looping_status_change'):
self.on_looping_status_change(current, value)
else:
if hasattr(self, 'on_looping_status_set'):
self.on_looping_status_set(value)
@property
def unique_name(self) -> str:
"""Returns the unique name for the processor."""
return f'{self.name}_{self.unique_id}'
@property
def replica_name(self) -> str:
"""
Returns the replica aware name of the processor.
If no replica_id is specified, then return the pure name, otherwise join the two string using the '#' symbol.
.. versionadded:: v2.0.0
:return: The replica aware name of the processor.
:rtype: str
"""
if self.replica_id is None:
return self.name
else:
return self.name + '#' + self.replica_id
@property
def local_resource_acquisition(self) -> bool:
"""
Checks if resources should be acquired locally.
When the processor is executed in stand-alone mode, it is responsible to acquire and release its own external
resources, but when it is executed from a ProcessorList, then is a good practice to share and distribute
resources among the whole processor list. In this case, resources should not be acquired locally by the
single processor, but from the parent execution context.
:return: True if resources are to be acquired locally by the processor. False, otherwise.
:rtype: bool
"""
return self._resource_acquisition
@local_resource_acquisition.setter
def local_resource_acquisition(self, flag: bool) -> None:
self._resource_acquisition = flag
@property
def database(self) -> peewee.Database:
"""
Returns the database instance
:return: A database object.
:raises MissingDatabase: If the database connection has not been established.
"""
if self._database is None:
raise MissingDatabase('Database connection not initialized')
return self._database
[docs]
def execute(self) -> None:
"""Execute the processor tasks.
This method works as a dispatcher, reassigning the call to a more specific execution implementation depending
on the :attr:`~mafw.processor.Processor.loop_type`.
"""
dispatcher: dict[LoopType, Callable[[], None]] = {
LoopType.SingleLoop: self._execute_single,
LoopType.ForLoop: self._execute_for_loop,
LoopType.ParallelForLoop: self._execute_for_loop,
LoopType.ParallelForLoopWithQueue: self._execute_for_loop,
LoopType.WhileLoop: self._execute_while_loop,
}
dispatcher[self.loop_type]()
[docs]
@staticmethod
def _compute_default_max_workers() -> int:
"""
Helper to compute the default number of workers for parallel execution.
Returns min(32, cpu_count + 4).
"""
return min(32, (os.cpu_count() or 1) + 4)
[docs]
def _execute_single(self) -> None:
"""Execute the processor in single mode.
**Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
appropriate implementation depending on the processor LoopType.
"""
with contextlib.ExitStack() as self._resource_stack:
self.acquire_resources()
self._wall_clock_start = time.perf_counter()
self.start()
self.processor_status = ProcessorStatus.Run
self.process()
self.finish()
[docs]
def _execute_for_loop(self) -> None:
"""Executes the processor within a for loop.
**Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
appropriate implementation depending on the processor LoopType.
"""
with contextlib.ExitStack() as self._resource_stack:
self.acquire_resources()
# we cannot use a Timer context here to measure the whole duration because it spans
# over different methods. Instead we directly use a performance clock.
self._wall_clock_start = time.perf_counter()
self.start()
# get the input item list and filter it
item_list = self.get_items()
# get the total number of items.
self.n_item = len(item_list)
# turn the processor status to run
self.processor_status = ProcessorStatus.Run
# create a new task in the progress bar interface
self._user_interface.create_task(self.replica_name, self.description, completed=0, total=self.n_item)
# verify if we can use parallel for loop. If not, switch back to a serial for loop.
if (
self.loop_type in (LoopType.ParallelForLoop, LoopType.ParallelForLoopWithQueue)
and not is_free_threading()
):
warnings.warn(
'Parallel for-loop requires free-threading; falling back to serial for loop.',
stacklevel=2,
)
self.loop_type = LoopType.ForLoop
if self.loop_type == LoopType.ParallelForLoopWithQueue:
self._process_parallel_for_loop_with_queue(item_list)
elif self.loop_type == LoopType.ParallelForLoop:
self._process_parallel_for_loop(item_list)
else:
self._process_for_loop(item_list)
self._user_interface.update_task(self.replica_name, completed=self.n_item, total=self.n_item)
self.finish()
[docs]
def _build_loop_item(self) -> LoopItem:
"""
Build a LoopItem payload for the current loop context.
:return: The LoopItem payload.
:rtype: mafw.models.LoopItem
"""
return LoopItem(self.i_item, int(self.n_item or 0), self.item)
[docs]
def _build_loop_result(self, payload: Any, duration: float) -> LoopResult:
"""
Build a LoopResult payload for the current loop context.
:param payload: The optional payload returned by process.
:type payload: Any
:param duration: Wall-clock duration of the item processing.
:type duration: float
:return: The LoopResult payload.
:rtype: mafw.models.LoopResult
"""
return LoopResult(self.i_item, int(self.n_item or 0), self.looping_status, payload, duration)
[docs]
def _payload_annotation_matches(self, annotation: Any) -> bool:
"""
Check whether an annotation matches LoopItem or LoopResult.
:param annotation: The annotation to inspect.
:type annotation: Any
:return: True if the annotation matches LoopItem or LoopResult.
:rtype: bool
"""
if annotation in (LoopItem, LoopResult):
return True
origin = get_origin(annotation)
if origin is Union:
return any(arg in (LoopItem, LoopResult) for arg in get_args(annotation))
return False
[docs]
def _call_with_optional_payload(self, func: Callable[..., Any], payload: Any) -> Any:
"""
Invoke a processor hook with an optional payload if the signature allows it.
:param func: The callable to invoke.
:type func: Callable
:param payload: The payload to pass if supported.
:type payload: Any
:return: The callable return value.
:rtype: Any
"""
signature = inspect.signature(func)
parameters = list(signature.parameters.values())
if parameters and parameters[0].name == 'self':
parameters = parameters[1:]
if not parameters:
return func()
if any(param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD) for param in parameters):
return func(payload)
first = parameters[0]
if first.kind in (first.POSITIONAL_ONLY, first.POSITIONAL_OR_KEYWORD):
if first.name in ('item', 'loop_item', 'result', 'loop_result', 'payload'):
return func(payload)
if self._payload_annotation_matches(first.annotation):
return func(payload)
return func()
[docs]
def _process_for_loop(self, item_list: Collection[Any]) -> None:
"""
Process items with the standard serial for-loop.
:param item_list: The list of items to process.
:type item_list: Collection[Any]
"""
for self.i_item, self.item in enumerate(item_list):
self.looping_status = LoopingStatus.Continue
self.format_progress_message()
self._user_interface.display_progress_message(self.progress_message, self.i_item, self.n_item, 0.1)
loop_item = self._build_loop_item()
with Timer(suppress_message=True) as timer:
payload = self._call_with_optional_payload(self.process, loop_item)
self._process_durations.append(timer.duration)
loop_result = self._build_loop_result(payload, timer.duration)
if self.looping_status == LoopingStatus.Continue:
self._call_with_optional_payload(self.accept_item, loop_result)
elif self.looping_status == LoopingStatus.Skip:
self._call_with_optional_payload(self.skip_item, loop_result)
else: # Abort or Quit
break
self._user_interface.update_task(self.replica_name, increment=1)
[docs]
def _process_parallel_for_loop(self, item_list: Collection[Any]) -> None:
"""
Process items in parallel using a thread pool.
:param item_list: The list of items to process.
:type item_list: Collection[Any]
"""
max_workers = self.max_workers
abort_event = threading.Event()
abort_lock = threading.Lock()
abort_status: LoopingStatus | None = None
def _set_abort(status: LoopingStatus) -> None:
nonlocal abort_status
with abort_lock:
if abort_status == LoopingStatus.Abort:
return
if status == LoopingStatus.Abort:
abort_status = LoopingStatus.Abort
elif abort_status is None:
abort_status = LoopingStatus.Quit
abort_event.set()
def _worker(i_item: int, item: Any) -> tuple[int, Any, LoopingStatus, Any, float]:
with self._thread_loop_context(i_item, int(self.n_item or 0), item):
self.looping_status = LoopingStatus.Continue
loop_item = self._build_loop_item()
with Timer(suppress_message=True) as timer:
payload = self._call_with_optional_payload(self.process, loop_item)
status = self.looping_status
if status in (LoopingStatus.Abort, LoopingStatus.Quit):
_set_abort(status)
return i_item, item, status, payload, timer.duration
if abort_event.is_set():
return i_item, item, status, payload, timer.duration
loop_result = self._build_loop_result(payload, timer.duration)
if status == LoopingStatus.Continue:
self._call_with_optional_payload(self.accept_item, loop_result)
elif status == LoopingStatus.Skip:
self._call_with_optional_payload(self.skip_item, loop_result)
return i_item, item, status, payload, timer.duration
pending: set[Future[tuple[int, Any, LoopingStatus, Any, float]]] = set()
items_iter = iter(enumerate(item_list))
def _submit_next() -> bool:
try:
idx, itm = next(items_iter)
except StopIteration:
return False
fut: Future[tuple[int, Any, LoopingStatus, Any, float]] = executor.submit(_worker, idx, itm)
pending.add(fut)
return True
with ThreadPoolExecutor(max_workers=max_workers) as executor:
while len(pending) < max_workers and _submit_next():
pass
while pending:
done, _ = wait(pending, return_when=FIRST_COMPLETED)
for fut in done:
pending.remove(fut)
i_item, item, status, _payload, duration = fut.result()
self._process_durations.append(duration)
self.item = item
self.i_item = i_item
self.format_progress_message()
self._user_interface.display_progress_message(self.progress_message, i_item, self.n_item, 0.1)
self._user_interface.update_task(self.replica_name, increment=1)
if abort_event.is_set():
continue
while len(pending) < max_workers and _submit_next():
pass
if abort_status is not None:
self.looping_status = abort_status
[docs]
def _process_parallel_for_loop_with_queue(self, item_list: Collection[Any]) -> None:
"""
Process items in parallel using a producer/consumer queue.
Items are processed in worker threads, while a dedicated consumer thread handles the post-processing hooks.
:param item_list: The list of items to process.
:type item_list: Collection[Any]
"""
# the idea is the following, we have a single consumer thread that is constantly trying to pull
# items out of a shared queue and we have a pool of producer threads that are putting items in the queue as
# long as there is space into it (back-pressure).
# it the queue is full, then the producer threads are set to sleep for a short interval of time and waken up
# again after some times for another attempt to write the output in the queue.
# when there are no more items to execute, the main thread is pushing a sentinel object into the queue
# marking the end of the processing, so that the consumer thread can be gracefully terminated.
max_workers = self.max_workers
result_queue: queue.Queue[object] = queue.Queue(maxsize=self.queue_size)
abort_event = threading.Event()
abort_lock = threading.Lock()
abort_status: LoopingStatus | None = None
sentinel = object()
def _set_abort(status: LoopingStatus) -> None:
nonlocal abort_status
with abort_lock:
if abort_status == LoopingStatus.Abort:
return
if status == LoopingStatus.Abort:
abort_status = LoopingStatus.Abort
elif abort_status is None:
abort_status = LoopingStatus.Quit
abort_event.set()
def _get_abort_status() -> LoopingStatus | None:
with abort_lock:
return abort_status
def _worker(batch_items: list[tuple[int, Any]]) -> None:
batch_results: list[tuple[LoopItem, LoopResult]] = []
for i_item, item in batch_items:
if abort_event.is_set():
break
with self._thread_loop_context(i_item, int(self.n_item or 0), item):
self.looping_status = LoopingStatus.Continue
loop_item = self._build_loop_item()
with Timer(suppress_message=True) as timer:
payload = self._call_with_optional_payload(self.process, loop_item)
status = self.looping_status
if status in (LoopingStatus.Abort, LoopingStatus.Quit):
_set_abort(status)
loop_result = self._build_loop_result(payload, timer.duration)
batch_results.append((loop_item, loop_result))
if status in (LoopingStatus.Abort, LoopingStatus.Quit):
break
if batch_results:
result_queue.put(batch_results)
def _consumer() -> None:
self.consumer_start()
try:
while True:
queued = result_queue.get()
if queued is sentinel:
break
batch_results = cast(list[tuple[LoopItem, LoopResult]], queued)
for loop_item, loop_result in batch_results:
with self._thread_loop_context(loop_item.i_item, loop_item.n_item, loop_item.payload):
self.looping_status = loop_result.looping_status
self.format_progress_message()
self._user_interface.display_progress_message(
self.progress_message, loop_item.i_item, self.n_item, 0.1
)
self._process_durations.append(loop_result.duration)
self._user_interface.update_task(self.replica_name, increment=1)
if _get_abort_status() is None and loop_result.looping_status in (
LoopingStatus.Continue,
LoopingStatus.Skip,
):
self._call_with_optional_payload(self.consumer_process, loop_result)
finally:
self.consumer_finish()
pending: set[Future[None]] = set()
items_iter = iter(enumerate(item_list))
def _submit_next() -> bool:
if abort_event.is_set():
return False
batch_items: list[tuple[int, Any]] = []
for _ in range(self.queue_batch_size):
try:
idx, itm = next(items_iter)
except StopIteration:
break
batch_items.append((idx, itm))
if not batch_items:
return False
fut: Future[None] = executor.submit(_worker, batch_items)
pending.add(fut)
return True
consumer_thread = threading.Thread(target=_consumer, name=f'{self.name}-consumer')
consumer_thread.start()
try:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
while len(pending) < max_workers and _submit_next():
pass
while pending:
done, _ = wait(pending, return_when=FIRST_COMPLETED)
for fut in done:
pending.remove(fut)
fut.result()
if abort_event.is_set():
continue
while len(pending) < max_workers and _submit_next():
pass
finally:
result_queue.put(sentinel)
consumer_thread.join()
if abort_status is not None:
self.looping_status = abort_status
[docs]
def _execute_while_loop(self) -> None:
"""Executes the processor within a while loop.
**Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
appropriate implementation depending on the processor LoopType.
"""
# it is a while loop, so a priori we don't know how many iterations we will have, nevertheless, we
# can have a progress bar with 'total' set to None, so that it goes in the so-called indeterminate
# progress. See https://rich.readthedocs.io/en/stable/progress.html#indeterminate-progress
# we initialise n_item outside the loop, because it is possible that the user has a way to define n_item
# and he can do it within the loop.
self.n_item = None
with contextlib.ExitStack() as self._resource_stack:
self.acquire_resources()
self._wall_clock_start = time.perf_counter()
self.start()
# turn the processor status to run
self.processor_status = ProcessorStatus.Run
self._user_interface.create_task(self.replica_name, self.description, completed=0, total=self.n_item)
# we are ready to start the looping. For statistics, we can count the iterations.
self.i_item = 0
while self.while_condition():
# set the looping status to Continue. The user may want to change it in the process method.
self.looping_status = LoopingStatus.Continue
# send a message to the user interface
self.format_progress_message()
self._user_interface.display_progress_message(
self.progress_message, self.i_item, self.n_item, frequency=0.1
)
# wrap the execution in a timer to measure how long it too for statistical reasons.
with Timer(suppress_message=True) as timer:
self.process()
self._process_durations.append(timer.duration)
# modify the loop depending on the looping status
if self.looping_status == LoopingStatus.Continue:
self.accept_item()
elif self.looping_status == LoopingStatus.Skip:
self.skip_item()
else: # equiv to if self.looping_status in [LoopingStatus.Abort, LoopingStatus.Quit]:
break
# update the progress bar. if self.n_item is still None, then the progress bar will show indeterminate
# progress.
self._user_interface.update_task(self.replica_name, self.i_item + 1, 1, self.n_item)
# now that the loop is finished, we know how many elements we processed
if self.n_item is None:
self.n_item = self.i_item
self._user_interface.update_task(self.replica_name, completed=self.n_item, total=self.n_item)
self.finish()
[docs]
def acquire_resources(self) -> None:
"""
Acquires resources and add them to the resource stack.
The whole body of the :meth:`execute` method is within a context structure. The idea is that if any part of
the code inside should throw an exception that breaking the execution, we want to be sure that all stateful
resources are properly closed.
Since the number of resources may vary, the variable number of nested `with` statements has been replaced by
an `ExitStack <https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack>`_. Resources,
like open files, timers, db connections, need to be added to the resource stacks in this method.
In the case a processor is being executed within a :class:`~mafw.processor.ProcessorList`, then some resources might be shared, and
for this reason they are not added to the stack. This selection can be done via the private
:attr:`local_resource_acquisition`. This is normally True, meaning that the processor will handle its resources
independently, but when the processor is executed from a :class:`~mafw.processor.ProcessorList`, this flag is automatically turned to
False.
If the user wants to add additional resources, he has to overload this method calling the super to preserve
the original resources. If he wants to have shared resources among different processors executed from inside
a processor list, he has to overload the :class:`~mafw.processor.ProcessorList` class as well.
"""
# Both the timer and the user interface will be added to the processor resource stack only if the processor is
# set to acquire its own resources.
# The timer and the user interface have in-built enter and exit method.
if self._resource_acquisition:
self.timer = self._resource_stack.enter_context(Timer(**self._timer_parameters))
self._resource_stack.enter_context(self._user_interface)
# For the database it is is a bit different.
if self._database is None and self._database_conf is None:
# no database, nor configuration.
# we cannot do anything
pass
elif self._database is None and self._database_conf is not None:
# no db, but we got a configuration.
# we can make a db.
# This processor will try to make a valid connection, and in case it succeeds, it will add the database to
# the resource stack.
# The database has an enter method, but it is to generate transaction.
# We will add the database.close via the callback method.
if 'DBConfiguration' in self._database_conf:
conf = self._database_conf['DBConfiguration'] # type1
else:
conf = self._database_conf # type2
# guess the database type from the URL
protocol = extract_protocol(conf.get('URL'))
# build the connection parameter
# in case of sqlite, we add the pragmas group as well
connection_parameters = {}
if protocol == 'sqlite':
connection_parameters['pragmas'] = conf.get('pragmas', {})
for key, value in conf.items():
if key not in ['URL', 'pragmas']:
connection_parameters[key] = value
self._database = connect(conf.get('URL'), **connection_parameters) # type: ignore # peewee is not returning a DB
self._resource_stack.callback(self._database.close)
try:
self._database.connect()
except peewee.OperationalError as e:
log.critical('Unable to connect to %s', self._database_conf.get('URL'))
raise e
database_proxy.initialize(self._database)
if self.create_standard_tables:
standard_tables = mafw_model_register.get_standard_tables()
self.database.create_tables(standard_tables)
for table in standard_tables:
table.init()
else: # equivalent to: if self._database is not None:
# we got a database, so very likely we are inside a processor list
# the connection has been already set and the initialisation as well.
# nothing else to do here.
# do not put the database in the exit stack. who create it has also to close it.
pass
[docs]
def start(self) -> None:
"""
Start method.
The user can overload this method, including all steps that should be performed at the beginning of the
operation.
If the user decides to overload it, it should include a call to the super method.
"""
self._mark_super_call('start')
self.processor_status = ProcessorStatus.Start
self._remove_orphan_files()
[docs]
def get_items(self) -> Collection[Any]:
"""
Returns the item collections for the processor loop.
This method must be overloaded for the processor to work. Generally, this is getting a list of rows from the
database, or a list of files from the disk to be processed.
:return: A collection of items for the loop
:rtype: Collection[Any]
"""
return []
[docs]
def while_condition(self) -> bool:
"""
Return the while condition
:return: True if the while loop has to continue, false otherwise.
:rtype: bool
"""
return False
[docs]
def process(self) -> None:
"""
Processes the current item.
This is the core of the Processor, where the user has to define the calculations required.
In parallel for loops, the method can optionally accept a :class:`~mafw.models.loop_payloads.LoopItem`
parameter if the user prefers not to rely on thread-local access to :attr:`.Processor.item`, :attr:`.Processor.i_item` and
:attr:`.Processor.n_item`.
"""
pass
[docs]
def accept_item(self) -> None:
"""
Does post process actions on a successfully processed item.
Within the :meth:`process`, the user left the looping status to Continue, so it means that everything looks
good and this is the right place to perform database updates or file savings.
.. seealso:
Have a look at :meth:`skip_item` for what to do in case something went wrong.
In parallel for loops, the method can optionally accept a :class:`~mafw.models.loop_payloads.LoopResult`
parameter for direct access to the processed payload and looping status.
"""
pass
[docs]
def skip_item(self) -> None:
"""
Does post process actions on a *NOT* successfully processed item.
Within the :meth:`process`, the user set the looping status to Skip, so it means that something went wrong
and here corrective actions can be taken if needed.
.. seealso:
Have a look at :meth:`accept_item` for what to do in case everything was OK.
In parallel for loops, the method can optionally accept a :class:`~mafw.models.loop_payloads.LoopResult`
parameter for direct access to the processed payload and looping status.
"""
pass
[docs]
def consumer_start(self) -> None:
"""
Executes once in the consumer thread for the parallel queue loop.
This hook mirrors :meth:`start` but is only used with the queue-based parallel loop.
"""
pass
[docs]
def consumer_process(self, loop_result: LoopResult | None = None) -> None:
"""
Handle processed items in the consumer thread for the parallel queue loop.
By default this dispatches to :meth:`accept_item` or :meth:`skip_item` based on the looping status. The method
can optionally accept a :class:`~mafw.models.loop_payloads.LoopResult` payload.
"""
if loop_result is None:
loop_result = self._build_loop_result(None, 0.0)
if self.looping_status == LoopingStatus.Continue:
self._call_with_optional_payload(self.accept_item, loop_result)
elif self.looping_status == LoopingStatus.Skip:
self._call_with_optional_payload(self.skip_item, loop_result)
[docs]
def consumer_finish(self) -> None:
"""
Executes once in the consumer thread after the queue has been drained.
"""
pass
[docs]
def finish(self) -> None:
"""
Concludes the execution.
The user can reimplement this method if there are some conclusive tasks that must be achieved.
Always include a call to super().
"""
self._mark_super_call('finish')
self.processor_status = ProcessorStatus.Finish
if self.looping_status == LoopingStatus.Abort:
self.processor_exit_status = ProcessorExitStatus.Aborted
self.print_process_statistics()
[docs]
def print_process_statistics(self) -> None:
"""
Print the process statistics.
A utility method to display the fastest, the slowest and the average timing required to process on a single
item. This is particularly useful when the looping processor is part of a ProcessorList.
"""
if len(self._process_durations):
log.info('[cyan] Processed %s items.' % len(self._process_durations))
log.info(
'[cyan] Fastest item process duration: %s '
% pretty_format_duration(min(self._process_durations), n_digits=3)
)
log.info(
'[cyan] Slowest item process duration: %s '
% pretty_format_duration(max(self._process_durations), n_digits=3)
)
log.info(
'[cyan] Average item process duration: %s '
% pretty_format_duration((sum(self._process_durations) / len(self._process_durations)), n_digits=3)
)
if self._wall_clock_start is not None:
total_duration = time.perf_counter() - self._wall_clock_start
else:
total_duration = sum(self._process_durations)
log.info('[cyan] Total process duration: %s' % pretty_format_duration(total_duration, n_digits=3))
[docs]
def _remove_orphan_files(self) -> None:
"""
Remove orphan files.
If a connection to the database is available, then the OrphanFile standard table is queried for all its entries,
and all the files are then removed.
The user can turn off this behaviour by switching the :attr:`~mafw.processor.Processor.remove_orphan_files` to False.
"""
if self._database is None or self.remove_orphan_files is False:
# no database connection or no wish to remove orphan files, it does not make sense to continue
return
try:
OrphanFile = cast(MAFwBaseModel, mafw_model_register.get_model('OrphanFile'))
except KeyError:
log.warning('OrphanFile table not found in DB. Please verify database integrity')
return
if TYPE_CHECKING:
assert hasattr(OrphanFile, '_meta')
orphan_files = OrphanFile.select().execute() # type: ignore[no-untyped-call]
if len(orphan_files) != 0:
msg = f'[yellow]Pruning orphan files ({sum(len(f.filenames) for f in orphan_files)})...'
log.info(msg)
for orphan in orphan_files:
# filenames is a list of files:
for f in orphan.filenames:
f.unlink(missing_ok=True)
OrphanFile.delete().execute() # type: ignore[no-untyped-call]
[docs]
class ProcessorList(list[Union['Processor', 'ProcessorList']]):
"""
A list like collection of processors.
ProcessorList is a subclass of list containing only Processor subclasses or other ProcessorList.
An attempt to add an element that is not a Processor or a ProcessorList will raise a TypeError.
Along with an iterable of processors, a new processor list can be built using the following parameters.
"""
def __init__(
self,
*args: Processor | ProcessorList,
name: str | None = None,
description: str | None = None,
timer: Timer | None = None,
timer_params: dict[str, Any] | None = None,
user_interface: UserInterfaceBase | None = None,
database: Database | None = None,
database_conf: dict[str, Any] | None = None,
create_standard_tables: bool = True,
):
"""
Constructor parameters:
:param name: The name of the processor list. Defaults to ProcessorList.
:type name: str, Optional
:param description: An optional short description. Default to ProcessorList.
:type description: str, Optional
:param timer: The timer object. If None is provided, a new one will be created. Defaults to None.
:type timer: Timer, Optional
:param timer_params: A dictionary of parameter to build the timer object. Defaults to None.
:type timer_params: dict, Optional
:param user_interface: A user interface. Defaults to None
:type user_interface: UserInterfaceBase, Optional
:param database: A database instance. Defaults to None.
:type database: Database, Optional
:param database_conf: Configuration for the database. Default to None.
:type database_conf: dict, Optional
:param create_standard_tables: Whether or not to create the standard tables. Defaults to True.
:type create_standard_tables: bool, Optional
"""
# validate_items takes a tuple of processors, that's why we don't unpack args.
super().__init__(self.validate_items(args))
self._name = name or self.__class__.__name__
self.description = description or self._name
self.timer = timer
self.timer_params = timer_params or {}
self._user_interface = user_interface or ConsoleInterface()
self._resource_stack: contextlib.ExitStack
self._processor_exit_status: ProcessorExitStatus = ProcessorExitStatus.Successful
self.nested_list = False
"""
Boolean flag to identify that this list is actually inside another list.
Similarly to the local resource flag for the :class:`.Processor`, this flag prevent the user interface to be
added to the resource stack.
"""
# database stuff
self._database: peewee.Database | None = database
self._database_conf: dict[str, Any] | None = validate_database_conf(database_conf)
self.create_standard_tables = create_standard_tables
"""The boolean flag to proceed or skip with standard table creation and initialisation"""
def __setitem__( # type: ignore[override]
self,
__index: SupportsIndex,
__object: Processor | ProcessorList,
) -> None:
super().__setitem__(__index, self.validate_item(__object))
[docs]
def insert(self, __index: SupportsIndex, __object: Processor | ProcessorList) -> None:
"""Adds a new processor at the specified index."""
super().insert(__index, self.validate_item(__object))
[docs]
def append(self, __object: Processor | ProcessorList) -> None:
"""Appends a new processor at the end of the list."""
super().append(self.validate_item(__object))
[docs]
def extend(self, __iterable: Iterable[Processor | ProcessorList]) -> None:
"""Extends the processor list with a list of processors."""
if isinstance(__iterable, type(self)):
super().extend(__iterable)
else:
super().extend([self.validate_item(item) for item in __iterable])
[docs]
@staticmethod
def validate_item(item: Processor | ProcessorList) -> Processor | ProcessorList:
"""Validates the item being added."""
if isinstance(item, Processor):
item.local_resource_acquisition = False
return item
elif isinstance(item, ProcessorList):
item.timer_params = dict(suppress_message=True)
item.nested_list = True
return item
else:
raise TypeError(f'Expected Processor or ProcessorList, got {type(item).__name__}')
[docs]
@staticmethod
def validate_items(items: tuple[Processor | ProcessorList, ...] = ()) -> tuple[Processor | ProcessorList, ...]:
"""Validates a tuple of items being added."""
if not items:
return tuple()
return tuple([ProcessorList.validate_item(item) for item in items if item is not None])
@property
def name(self) -> str:
"""
The name of the processor list
:return: The name of the processor list
:rtype: str
"""
return self._name
@name.setter
def name(self, name: str) -> None:
self._name = name
@property
def processor_exit_status(self) -> ProcessorExitStatus:
"""
The processor exit status.
It refers to the whole processor list execution.
"""
return self._processor_exit_status
@processor_exit_status.setter
def processor_exit_status(self, status: ProcessorExitStatus) -> None:
self._processor_exit_status = status
@property
def database(self) -> peewee.Database:
"""
Returns the database instance
:return: A database instance
:raises MissingDatabase: if a database connection is missing.
"""
if self._database is None:
raise MissingDatabase('Database connection not initialized')
return self._database
[docs]
def execute(self) -> ProcessorExitStatus:
"""
Execute the list of processors.
Similarly to the :class:`Processor`, ProcessorList can be executed. In simple words, the execute
method of each processor in the list is called exactly in the same sequence as they were added.
"""
with contextlib.ExitStack() as self._resource_stack:
self.acquire_resources()
self._user_interface.create_task(self.name, self.description, completed=0, increment=0, total=len(self))
for i, item in enumerate(self):
if isinstance(item, Processor):
log.info('[bold]Executing [red]%s[/red] processor[/bold]' % item.replica_name)
else:
log.info('[bold]Executing [blue]%s[/blue] processor list[/bold]' % item.name)
self.distribute_resources(item)
item.execute()
self._user_interface.update_task(self.name, increment=1) # i+1, 1, len(self))
self._processor_exit_status = item.processor_exit_status
if self._processor_exit_status == ProcessorExitStatus.Aborted:
msg = 'Processor %s caused the processor list to abort' % item.name
log.error(msg)
raise AbortProcessorException(msg)
self._user_interface.update_task(self.name, completed=len(self), total=len(self))
return self._processor_exit_status
[docs]
def acquire_resources(self) -> None:
"""Acquires external resources."""
# The strategy is similar to the one for processor. if we do get resources already active (not None) then we use
# them, otherwise, we create them and we add them to the resource stack.
if self.timer is None:
self.timer = self._resource_stack.enter_context(Timer(**self.timer_params))
# The user interface is very likely already initialised by the runner.
# But if this is a nested list, then we must not push the user interface in the stack
# otherwise the user interface context (progress for rich) will be stopped at the end
# of the nested list.
if not self.nested_list:
self._resource_stack.enter_context(self._user_interface)
if self._database is None and self._database_conf is None:
# no database, nor configuration.
# we cannot do anything
pass
elif self._database is None and self._database_conf is not None:
# no db, but we got a configuration.
# we can make a db
if 'DBConfiguration' in self._database_conf:
conf = self._database_conf['DBConfiguration'] # type1
else:
conf = self._database_conf # type2
# guess the database type from the URL
protocol = extract_protocol(conf.get('URL'))
# build the connection parameter
# in case of sqlite, we add the pragmas group as well
connection_parameters = {}
if protocol == 'sqlite':
connection_parameters['pragmas'] = conf.get('pragmas', {})
for key, value in conf.items():
if key not in ['URL', 'pragmas']:
connection_parameters[key] = value
self._database = connect(conf.get('URL'), **connection_parameters) # type: ignore # peewee is not returning a DB
try:
self._database.connect()
self._resource_stack.callback(self._database.close)
except peewee.OperationalError as e:
log.critical('Unable to connect to %s', self._database_conf.get('URL'))
raise e
database_proxy.initialize(self._database)
if self.create_standard_tables:
standard_tables = mafw_model_register.get_standard_tables()
self.database.create_tables(standard_tables)
for table in standard_tables:
table.init()
else: # equiv to if self._database is not None:
# we got a database, so very likely we are inside a processor list
# the connection has been already set and the initialisation as well.
# nothing else to do here.
pass
[docs]
def distribute_resources(self, processor: Processor | Self) -> None:
"""Distributes the external resources to the items in the list."""
processor.timer = self.timer
processor._user_interface = self._user_interface
processor._database = self._database