Source code for mafw.tools.loop_tuning

#  Copyright 2025–2026 European Union
#  Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
#  SPDX-License-Identifier: EUPL-1.2
"""
Mixin utilities for tuning loop execution parameters via steering files.

This module defines :class:`LoopTuningMixin`, which exposes loop operational settings as
:class:`~mafw.processor.ActiveParameter` instances. This allows temporary tuning through
steering files without modifying processor code.

:Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)

.. versionadded:: v2.2.0
"""

from __future__ import annotations

from typing import Any, cast

from mafw.enumerators import LoopType
from mafw.mafw_errors import ProcessorParameterError
from mafw.processor import ActiveParameter, Processor


class LoopTuningMixin:
    """
    Expose the loop operational settings of a processor as steering-file parameters.

    Combine this mixin with a :class:`~mafw.processor.Processor` subclass while
    searching for the best parallel for-loop configuration. Once suitable values
    have been found, the mixin can be dropped and the values handed straight to
    the processor constructor.
    """

    tuning_loop_type: ActiveParameter[str] = ActiveParameter(
        'loop_type',
        default=LoopType.ForLoop.value,
        help_doc='Loop type to use: for_loop, parallel_for_loop, parallel_for_loop_with_queue.',
    )
    """Loop type the processor should run with."""

    tuning_max_workers: ActiveParameter[int] = ActiveParameter(
        'max_workers',
        default=Processor._compute_default_max_workers(),
        help_doc='Number of worker threads for parallel loops.',
    )
    """Worker-thread count for the parallel loops."""

    tuning_queue_size: ActiveParameter[int] = ActiveParameter(
        'queue_size',
        default=max(1, Processor._compute_default_max_workers() * 2),
        help_doc='Queue size for the queue-based parallel loop.',
    )
    """Queue size for the queue-based parallel loop; defaults to twice the default worker count, never below 1."""

    tuning_queue_batch_size: ActiveParameter[int] = ActiveParameter(
        'queue_batch_size',
        default=1,
        help_doc='Batch size for queue processing in the queue-based parallel loop.',
    )
    """Number of items handled per batch by the queue-based parallel loop."""

    def initialise_parameters(self) -> None:
        """
        Run the standard parameter initialisation, then apply the loop-tuning overrides.

        A tuning parameter replaces the value already stored on the processor only
        when that parameter reports ``is_set``. Additional rules:

        * if ``max_workers`` is set but ``queue_size`` is not, ``queue_size`` is
          recomputed as twice the processor's ``max_workers``, with a floor of 1;
        * ``queue_batch_size`` is floored at 1.

        :raises ProcessorParameterError: If the ``loop_type`` parameter is set to an
            unsupported value.
        """
        # The mixin has no Processor base of its own; the cast only informs the type checker.
        host = cast(Processor, self)
        Processor.initialise_parameters(host)

        loop_type_setting = host.get_parameter('loop_type')
        if loop_type_setting.is_set:
            host.loop_type = self._validate_loop_type(loop_type_setting.value)

        workers_setting = host.get_parameter('max_workers')
        # Remembered because it also drives the queue-size fallback below.
        workers_overridden = workers_setting.is_set
        if workers_overridden:
            host.max_workers = int(workers_setting.value)

        queue_size_setting = host.get_parameter('queue_size')
        if queue_size_setting.is_set:
            host.queue_size = int(queue_size_setting.value)
        elif workers_overridden:
            # No explicit queue size: scale it with the freshly applied worker count.
            host.queue_size = max(1, host.max_workers * 2)

        batch_setting = host.get_parameter('queue_batch_size')
        if batch_setting.is_set:
            host.queue_batch_size = max(1, int(batch_setting.value))

    @staticmethod
    def _validate_loop_type(value: Any) -> LoopType:
        """
        Convert *value* to a :class:`~mafw.enumerators.LoopType` accepted by loop tuning.

        :param value: Either a :class:`~mafw.enumerators.LoopType` member or an object
            whose string form is a valid :class:`~mafw.enumerators.LoopType` value.
        :type value: Any
        :return: The matching loop type.
        :rtype: LoopType
        :raises ProcessorParameterError: If *value* cannot be converted to a loop type,
            or if the resulting loop type is not one of the three supported ones.
        """
        try:
            if isinstance(value, LoopType):
                candidate = value
            else:
                candidate = LoopType(str(value))
        except (TypeError, ValueError) as exc:
            raise ProcessorParameterError(f'Invalid loop type ({value}).') from exc

        supported = {
            LoopType.ForLoop,
            LoopType.ParallelForLoop,
            LoopType.ParallelForLoopWithQueue,
        }
        if candidate in supported:
            return candidate

        raise ProcessorParameterError(
            'Loop tuning only supports for_loop, parallel_for_loop, and parallel_for_loop_with_queue.'
        )
__all__ = ['LoopTuningMixin']
"""Names exported by this module; only :class:`LoopTuningMixin` is public."""