Coverage for src / mafw / tools / loop_tuning.py: 96%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 16:10 +0000

1# Copyright 2025–2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Mixin utilities for tuning loop execution parameters via steering files. 

6 

7This module defines :class:`LoopTuningMixin`, which exposes loop operational settings as 

8:class:`~mafw.processor.ActiveParameter` instances. This allows temporary tuning through 

9steering files without modifying processor code. 

10 

11:Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

12 

13.. versionadded:: v2.2.0 

14""" 

15 

16from __future__ import annotations 

17 

18from typing import Any, cast 

19 

20from mafw.enumerators import LoopType 

21from mafw.mafw_errors import ProcessorParameterError 

22from mafw.processor import ActiveParameter, Processor 

23 

24 

class LoopTuningMixin:
    """
    Expose the loop operational settings of a processor as steering-file parameters.

    Combine this mixin with a :class:`~mafw.processor.Processor` subclass while searching for
    the best parallel for-loop configuration. Each setting is published as an
    :class:`~mafw.processor.ActiveParameter`, so it can be changed from a steering file without
    editing the processor code. When the optimal values are known, the mixin can be dropped and
    the values handed over through the processor constructor instead.
    """

    tuning_loop_type: ActiveParameter[str] = ActiveParameter(
        'loop_type',
        default=LoopType.ForLoop.value,
        help_doc='Loop type to use: for_loop, parallel_for_loop, parallel_for_loop_with_queue.',
    )
    """Steering parameter ``loop_type``: the loop type the processor should run with."""

    tuning_max_workers: ActiveParameter[int] = ActiveParameter(
        'max_workers',
        default=Processor._compute_default_max_workers(),
        help_doc='Number of worker threads for parallel loops.',
    )
    """Steering parameter ``max_workers``: the number of worker threads for parallel loops."""

    tuning_queue_size: ActiveParameter[int] = ActiveParameter(
        'queue_size',
        default=max(1, Processor._compute_default_max_workers() * 2),
        help_doc='Queue size for the queue-based parallel loop.',
    )
    """Steering parameter ``queue_size``: the queue size of the queue-based parallel loop."""

    tuning_queue_batch_size: ActiveParameter[int] = ActiveParameter(
        'queue_batch_size',
        default=1,
        help_doc='Batch size for queue processing in the queue-based parallel loop.',
    )
    """Steering parameter ``queue_batch_size``: the batch size used when processing the queue."""

    def initialise_parameters(self) -> None:
        """
        Run the standard parameter initialisation, then apply the loop-tuning overrides.

        A processor attribute is overwritten only when the matching tuning parameter reports
        ``is_set``; otherwise the value already present on the processor is left untouched.

        Applied in this order:

        * ``loop_type`` is validated through :meth:`_validate_loop_type` before being assigned.
        * ``max_workers`` is converted to ``int``.
        * ``queue_size`` is converted to ``int`` when set. When it is not set but ``max_workers``
          is, it is recomputed as twice the new ``max_workers``, with a minimum of 1.
        * ``queue_batch_size`` is converted to ``int`` and never allowed to drop below 1.

        :raises ProcessorParameterError: If the requested loop type is invalid or unsupported.
        """
        proc = cast(Processor, self)
        # Explicit call on Processor: the mixin itself has no base class to delegate to.
        Processor.initialise_parameters(proc)

        requested_loop_type = proc.get_parameter('loop_type')
        if requested_loop_type.is_set:
            proc.loop_type = self._validate_loop_type(requested_loop_type.value)

        requested_workers = proc.get_parameter('max_workers')
        workers_overridden = requested_workers.is_set
        if workers_overridden:
            proc.max_workers = int(requested_workers.value)

        requested_queue_size = proc.get_parameter('queue_size')
        if requested_queue_size.is_set:
            proc.queue_size = int(requested_queue_size.value)
        elif workers_overridden:
            # No explicit queue size: keep it in step with the overridden worker count.
            proc.queue_size = max(1, proc.max_workers * 2)

        requested_batch_size = proc.get_parameter('queue_batch_size')
        if requested_batch_size.is_set:
            batch_size = int(requested_batch_size.value)
            proc.queue_batch_size = max(1, batch_size)

    @staticmethod
    def _validate_loop_type(value: Any) -> LoopType:
        """
        Convert a raw value into a loop type accepted by loop tuning.

        :param value: A :class:`~mafw.enumerators.LoopType` member, or any object whose string
            representation is the value of one.
        :type value: Any
        :return: The corresponding loop type member.
        :rtype: LoopType
        :raises ProcessorParameterError: If the value cannot be converted to a loop type, or if
            the resulting loop type is not one of the three supported by loop tuning.
        """
        if isinstance(value, LoopType):
            candidate = value
        else:
            try:
                candidate = LoopType(str(value))
            except (TypeError, ValueError) as exc:
                raise ProcessorParameterError(f'Invalid loop type ({value}).') from exc

        supported = (
            LoopType.ForLoop,
            LoopType.ParallelForLoop,
            LoopType.ParallelForLoopWithQueue,
        )
        if candidate in supported:
            return candidate

        raise ProcessorParameterError(
            'Loop tuning only supports for_loop, parallel_for_loop, and parallel_for_loop_with_queue.'
        )

117 

118 

# Restrict ``from mafw.tools.loop_tuning import *`` to the mixin class.
__all__ = ['LoopTuningMixin']
"""Public exports for the loop tuning module."""