# Copyright 2025–2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""
Mixin utilities for tuning loop execution parameters via steering files.
This module defines :class:`LoopTuningMixin`, which exposes loop operational settings as
:class:`~mafw.processor.ActiveParameter` instances. This allows temporary tuning through
steering files without modifying processor code.
:Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
.. versionadded:: v2.2.0
"""
from __future__ import annotations
from typing import Any, cast
from mafw.enumerators import LoopType
from mafw.mafw_errors import ProcessorParameterError
from mafw.processor import ActiveParameter, Processor
[docs]
class LoopTuningMixin:
"""
Provide steering-file access to loop operational parameters.
This mixin is meant to be combined with :class:`~mafw.processor.Processor` subclasses when
tuning parallel for-loop performance. Once optimal values are found, users can remove the mixin
and pass the values via the processor constructor.
"""
tuning_loop_type: ActiveParameter[str] = ActiveParameter(
'loop_type',
default=LoopType.ForLoop.value,
help_doc='Loop type to use: for_loop, parallel_for_loop, parallel_for_loop_with_queue.',
)
"""Loop type used by the processor."""
tuning_max_workers: ActiveParameter[int] = ActiveParameter(
'max_workers',
default=Processor._compute_default_max_workers(),
help_doc='Number of worker threads for parallel loops.',
)
"""Number of worker threads for parallel loops."""
tuning_queue_size: ActiveParameter[int] = ActiveParameter(
'queue_size',
default=max(1, Processor._compute_default_max_workers() * 2),
help_doc='Queue size for the queue-based parallel loop.',
)
"""Queue size used by :class:`~mafw.enumerators.LoopType.ParallelForLoopWithQueue`."""
tuning_queue_batch_size: ActiveParameter[int] = ActiveParameter(
'queue_batch_size',
default=1,
help_doc='Batch size for queue processing in the queue-based parallel loop.',
)
"""Batch size for queue processing in the queue-based parallel loop."""
[docs]
def initialise_parameters(self) -> None:
"""
Initialises parameters and applies loop-tuning overrides if provided.
The tuning parameters override constructor settings only when they are explicitly set in the
steering file or via keyword arguments.
"""
processor = cast(Processor, self)
Processor.initialise_parameters(processor)
loop_type_param = processor.get_parameter('loop_type')
if loop_type_param.is_set:
processor.loop_type = self._validate_loop_type(loop_type_param.value)
max_workers_param = processor.get_parameter('max_workers')
max_workers_set = max_workers_param.is_set
if max_workers_set:
processor.max_workers = int(max_workers_param.value)
queue_size_param = processor.get_parameter('queue_size')
if queue_size_param.is_set:
processor.queue_size = int(queue_size_param.value)
elif max_workers_set:
processor.queue_size = max(1, processor.max_workers * 2)
queue_batch_param = processor.get_parameter('queue_batch_size')
if queue_batch_param.is_set:
processor.queue_batch_size = max(1, int(queue_batch_param.value))
[docs]
@staticmethod
def _validate_loop_type(value: Any) -> LoopType:
"""
Validate loop type values supported by loop tuning.
:param value: The loop type value to validate.
:type value: Any
:return: The validated loop type.
:rtype: LoopType
:raises ProcessorParameterError: If the loop type is not supported.
"""
try:
loop_type = value if isinstance(value, LoopType) else LoopType(str(value))
except (TypeError, ValueError) as exc:
raise ProcessorParameterError(f'Invalid loop type ({value}).') from exc
allowed = {
LoopType.ForLoop,
LoopType.ParallelForLoop,
LoopType.ParallelForLoopWithQueue,
}
if loop_type not in allowed:
raise ProcessorParameterError(
'Loop tuning only supports for_loop, parallel_for_loop, and parallel_for_loop_with_queue.'
)
return loop_type
__all__ = ['LoopTuningMixin']
"""Public exports for the loop tuning module."""