Coverage for src / mafw / tools / loop_tuning.py: 96%
44 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
1# Copyright 2025–2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Mixin utilities for tuning loop execution parameters via steering files.
7This module defines :class:`LoopTuningMixin`, which exposes loop operational settings as
8:class:`~mafw.processor.ActiveParameter` instances. This allows temporary tuning through
9steering files without modifying processor code.
11:Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
13.. versionadded:: v2.2.0
14"""
16from __future__ import annotations
18from typing import Any, cast
20from mafw.enumerators import LoopType
21from mafw.mafw_errors import ProcessorParameterError
22from mafw.processor import ActiveParameter, Processor
class LoopTuningMixin:
    """
    Expose the loop operational settings of a processor as steering-file parameters.

    Combine this mixin with a :class:`~mafw.processor.Processor` subclass while looking for the
    best settings of a parallel for-loop. When the optimal values are known, the mixin can be
    dropped and the values handed over through the processor constructor instead.
    """

    tuning_loop_type: ActiveParameter[str] = ActiveParameter(
        'loop_type',
        default=LoopType.ForLoop.value,
        help_doc='Loop type to use: for_loop, parallel_for_loop, parallel_for_loop_with_queue.',
    )
    """Kind of loop the processor runs."""

    tuning_max_workers: ActiveParameter[int] = ActiveParameter(
        'max_workers',
        default=Processor._compute_default_max_workers(),
        help_doc='Number of worker threads for parallel loops.',
    )
    """How many worker threads a parallel loop uses."""

    tuning_queue_size: ActiveParameter[int] = ActiveParameter(
        'queue_size',
        default=max(1, Processor._compute_default_max_workers() * 2),
        help_doc='Queue size for the queue-based parallel loop.',
    )
    """Size of the queue of :class:`~mafw.enumerators.LoopType.ParallelForLoopWithQueue`."""

    tuning_queue_batch_size: ActiveParameter[int] = ActiveParameter(
        'queue_batch_size',
        default=1,
        help_doc='Batch size for queue processing in the queue-based parallel loop.',
    )
    """Number of items processed per batch by the queue-based parallel loop."""

    def initialise_parameters(self) -> None:
        """
        Run the standard parameter initialisation, then apply the loop-tuning overrides.

        A tuning parameter replaces the corresponding processor attribute only when the
        parameter reports itself as set. When ``max_workers`` is set but ``queue_size`` is not,
        the queue size is recomputed as twice the number of workers (never less than 1).
        """
        proc = cast(Processor, self)
        Processor.initialise_parameters(proc)

        # Loop type: goes through validation before being stored.
        loop_param = proc.get_parameter('loop_type')
        if loop_param.is_set:
            proc.loop_type = self._validate_loop_type(loop_param.value)

        # Worker count: remember whether it was overridden, the queue size depends on it.
        workers_param = proc.get_parameter('max_workers')
        workers_overridden = workers_param.is_set
        if workers_overridden:
            proc.max_workers = int(workers_param.value)

        # Queue size: an explicit value wins, otherwise follow an overridden worker count.
        size_param = proc.get_parameter('queue_size')
        if size_param.is_set:
            proc.queue_size = int(size_param.value)
        elif workers_overridden:
            proc.queue_size = max(1, proc.max_workers * 2)

        # Batch size: values below 1 are raised to 1.
        batch_param = proc.get_parameter('queue_batch_size')
        if batch_param.is_set:
            requested_batch = int(batch_param.value)
            proc.queue_batch_size = requested_batch if requested_batch >= 1 else 1

    @staticmethod
    def _validate_loop_type(value: Any) -> LoopType:
        """
        Convert a value to a loop type and check that loop tuning supports it.

        :param value: A :class:`~mafw.enumerators.LoopType` member, or a value whose string
            form identifies one.
        :type value: Any
        :return: The corresponding loop type.
        :rtype: LoopType
        :raises ProcessorParameterError: If the value cannot be converted to a loop type or the
            resulting loop type is not one of the supported ones.
        """
        if isinstance(value, LoopType):
            candidate = value
        else:
            try:
                candidate = LoopType(str(value))
            except (TypeError, ValueError) as exc:
                raise ProcessorParameterError(f'Invalid loop type ({value}).') from exc

        supported = {
            LoopType.ForLoop,
            LoopType.ParallelForLoop,
            LoopType.ParallelForLoopWithQueue,
        }
        if candidate in supported:
            return candidate

        raise ProcessorParameterError(
            'Loop tuning only supports for_loop, parallel_for_loop, and parallel_for_loop_with_queue.'
        )
__all__ = ['LoopTuningMixin']
"""Names made available by a star-import of the loop tuning module."""