Coverage for src / mafw / processor.py: 99%

1014 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-06-28 13:34 +0000

1# Copyright 2025–2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Module implements the basic Processor class, the ProcessorList and all helper classes to achieve the core 

6functionality of the MAFw. 

7""" 

8 

9from __future__ import annotations 

10 

11import contextlib 

12import inspect 

13import logging 

14import os 

15import queue 

16import threading 

17import time 

18import warnings 

19from collections import OrderedDict 

20from collections.abc import Callable, Iterator 

21from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait 

22from copy import copy, deepcopy 

23from functools import wraps 

24from itertools import count 

25from typing import ( 

26 TYPE_CHECKING, 

27 Any, 

28 Collection, 

29 Generic, 

30 Iterable, 

31 Self, 

32 SupportsIndex, 

33 TypeVar, 

34 Union, 

35 cast, 

36 get_args, 

37 get_origin, 

38 get_type_hints, 

39) 

40 

41import peewee 

42from peewee import Database 

43 

44# noinspection PyUnresolvedReferences 

45from playhouse.db_url import connect 

46 

47import mafw.db.db_filter 

48from mafw.active import Active 

49from mafw.db.db_connection import build_connection_parameters 

50from mafw.db.db_model import MAFwBaseModel, database_proxy, mafw_model_register 

51from mafw.enumerators import LoopingStatus, LoopType, ProcessorExitStatus, ProcessorStatus 

52from mafw.mafw_errors import ( 

53 AbortProcessorException, 

54 MissingDatabase, 

55 MissingOverloadedMethod, 

56 MissingSuperCall, 

57 ProcessorParameterError, 

58) 

59from mafw.models.filter_schema import FilterSchema 

60from mafw.models.loop_payloads import LoopItem, LoopResult 

61from mafw.models.parameter_schema import ParameterSchema 

62from mafw.models.processor_schema import ProcessorSchema 

63from mafw.timer import Timer, pretty_format_duration 

64from mafw.tools.generics import deep_update 

65from mafw.tools.parallel import is_free_threading 

66from mafw.ui.abstract_user_interface import UserInterfaceBase 

67from mafw.ui.console_user_interface import ConsoleInterface 

68 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Type variable used to parametrise the value type of ActiveParameter / PassiveParameter.
ParameterType = TypeVar('ParameterType')
"""Generic variable type for the :class:`ActiveParameter` and :class:`PassiveParameter`."""

73 

74 

75def validate_database_conf(database_conf: dict[str, Any] | None = None) -> dict[str, Any] | None: 

76 """ 

77 Validates the database configuration. 

78 

79 :param database_conf: The input database configuration. Defaults to None. 

80 :type database_conf: dict, Optional 

81 :return: Either the validated database configuration or None if it is invalid. 

82 :rtype: dict, None 

83 """ 

84 if database_conf is None: 

85 return None 

86 

87 # dict is mutable, if I change it inside the function, I change it also outside. 

88 conf = database_conf.copy() 

89 

90 if 'DBConfiguration' in conf: 

91 # the database_conf is the steering file. Extract the DBConfiguration table 

92 conf = conf['DBConfiguration'] 

93 required_fields = ['URL'] 

94 if all([field in conf for field in required_fields]): 

95 return database_conf 

96 else: 

97 return None 

98 

99 

class PassiveParameter(Generic[ParameterType]):
    """
    A helper class to store processor parameter value and metadata.

    This class is the private interface used by the :class:`ActiveParameter` descriptor to store its value and
    metadata.

    When a processor is registering its parameters, one PassiveParameter is created for each :class:`.ActiveParameter`
    declared on its class and stored in the processor parameter :attr:`register <.Processor._processor_parameters>`.

    .. seealso::

        An explanation on how processor parameters work and should be used is given in :ref:`Understanding processor
        parameters <parameters>`

    .. versionchanged:: v2.0.0

        User should only use :class:`ActiveParameter` and never manually instantiate :class:`PassiveParameter`.
    """

    def __init__(
        self, name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = ''
    ) -> None:
        """
        Constructor parameters:

        :param name: The name of the parameter. It must be a valid python identifier.
        :type name: str
        :param value: The set value of the parameter. If None, then the default value will be used. Defaults to None.
        :type value: ParameterType, Optional
        :param default: The default value for the parameter. It is used if the :attr:`value` is not provided.
            Defaults to None.
        :type default: ParameterType, Optional
        :param help_doc: A brief explanation of the parameter.
        :type help_doc: str, Optional
        :raises ProcessorParameterError: if both `value` and `default` are None or if `name` is not a valid
            identifier.
        """
        if not name.isidentifier():
            raise ProcessorParameterError(f'{name} is not a valid python identifier.')

        self.name = name

        # An explicit value wins over the default: the parameter is then flagged as set and not optional.
        # Only ``None`` means "not provided", so falsy values such as 0, '' or False are accepted.
        if value is not None:
            self._value: ParameterType = value
            self._is_set = True
            self._is_optional = False
        elif default is not None:
            self._value = default
            self._is_set = False
            self._is_optional = True
        else:
            raise ProcessorParameterError('Processor parameter cannot have both value and default value set to None')

        self.doc = help_doc

    def __rich_repr__(self) -> Iterator[Any]:
        """
        Yield the fields shown by the ``rich`` pretty printer.

        Following the rich repr protocol, the third tuple element is a default value: ``value`` is hidden when it is
        None and ``help_doc`` when it is an empty string.
        """
        yield 'name', self.name
        yield 'value', self.value, None
        yield 'help_doc', self.doc, ''

    @property
    def is_set(self) -> bool:
        """
        Property to check if the value has been set.

        It is useful for optional parameter to see if the current value is the default one, or if the user set it.

        :return: True if a value was given to the constructor or assigned afterwards.
        :rtype: bool
        """
        return self._is_set

    @property
    def value(self) -> ParameterType:
        """
        Gets the parameter value.

        The constructor guarantees that either a value or a default is available, so this getter never fails.

        :return: The parameter value (the default one, if no value was set).
        :rtype: ParameterType
        """
        return self._value

    @value.setter
    def value(self, value: ParameterType) -> None:
        """
        Sets the parameter value and marks the parameter as set.

        :param value: The value to be set.
        :type value: ParameterType
        """
        self._value = value
        self._is_set = True

    @property
    def is_optional(self) -> bool:
        """
        Property to check if the parameter is optional.

        A parameter is optional when it was created with only a default value.

        :return: True if the parameter is optional
        :rtype: bool
        """
        return self._is_optional

    def __repr__(self) -> str:
        # e.g. PassiveParameter(name='threshold', value=3, doc='The threshold')
        return f'{self.__class__.__name__}(name={self.name!r}, value={self.value!r}, doc={self.doc!r})'

206 

207 

F = TypeVar('F', bound=Callable[..., Any])
"""Type variable for generic callable with any return value."""


def ensure_parameter_registration(func: F) -> F:
    """
    Decorator to ensure that before calling `func` the processor parameters have been registered.

    The decorated callable must be a :class:`Processor` method. Its first positional argument (``self``) is used to
    check :attr:`Processor._parameter_registered` and, if the parameters are not registered yet, to call
    :meth:`Processor._register_parameters` before delegating to `func`.

    :param func: The processor method to be wrapped.
    :type func: Callable
    :return: The wrapped method, carrying the metadata (name, docstring, ``__wrapped__``) of `func`.
    :rtype: Callable
    :raises ProcessorParameterError: (when the wrapper is called) if no positional argument is given or if the first
        one is not a :class:`Processor` instance.
    """

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # The first positional argument must be the Processor instance (self).
        if not args or not isinstance(args[0], Processor):
            raise ProcessorParameterError(
                'Attempt to apply the ensure_parameter_registration to something different to a Processor subclass.'
            )
        processor = args[0]
        # Register lazily, e.g. when the method is reached before the regular initialisation did it.
        if not processor._parameter_registered:
            processor._register_parameters()
        return func(*args, **kwargs)

    return cast(F, wrapper)

232 

233 

# Type alias for the class-level parameter registry: maps the external parameter name to its ActiveParameter
# descriptor. Insertion order is preserved, so iterating the registry follows the order in which parameters were added.
_ParameterRegistry = OrderedDict[str, 'ActiveParameter[Any]']

235 

236 

def _copy_parent_parameter_definitions(owner: type[Processor]) -> _ParameterRegistry:
    """
    Build an ordered dictionary of ActiveParameters inherited from base classes.

    The base classes are visited from the most generic to the most specific one (reverse MRO). This way a parameter
    re-declared in a subclass overrides the definition inherited from its ancestors, as normal attribute lookup does.

    Every base registry also contains the definitions that base inherited. Therefore an already collected entry is
    only replaced by a descriptor declared in that very base (its ``_owner``). This keeps the MRO precedence correct
    in multiple inheritance (diamond) hierarchies as well.

    Each parameter keeps the position of its first appearance, so the most generic parameters come first.

    :param owner: The class whose base classes are inspected.
    :type owner: type[Processor]
    :return: A new registry mapping the external parameter names to their ActiveParameter descriptors.
    :rtype: OrderedDict
    """
    definitions: _ParameterRegistry = OrderedDict()
    for base in reversed(owner.__mro__[1:]):
        base_definitions = getattr(base, '_parameter_definitions', None)
        if not base_definitions:
            continue
        for name, descriptor in base_definitions.items():
            # New names are always collected; an existing one is overridden only by the class declaring it.
            if name not in definitions or getattr(descriptor, '_owner', None) is base:
                definitions[name] = descriptor
    return definitions

248 

249 

def _ensure_parameter_definitions(owner: type[Processor]) -> _ParameterRegistry:
    """
    Return the parameter registry belonging to ``owner`` itself, creating it on first use.

    Only the class own namespace is searched, so a registry inherited from a base class is never returned. When
    ``owner`` has none yet, a new registry is built from the definitions of its base classes and stored on ``owner``
    as ``_parameter_definitions``.

    :param owner: The Processor class whose registry is requested.
    :type owner: type[Processor]
    :return: The registry stored on ``owner``.
    :rtype: OrderedDict
    """
    registry: _ParameterRegistry | None = vars(owner).get('_parameter_definitions')
    if registry is None:
        registry = _copy_parent_parameter_definitions(owner)
        setattr(owner, '_parameter_definitions', registry)
    return registry

261 

262 

def _validate_filter_schema(owner: type[Processor]) -> None:
    """
    Check the optional :class:`~mafw.models.filter_schema.FilterSchema` attached to a Processor class.

    Nothing is checked when the class has no ``_filter_schema`` attribute or when it is None. Otherwise the schema
    must be a FilterSchema whose ``root_model`` and ``allowed_models`` are all distinct MAFwBaseModel subclasses.

    :param owner: The Processor class to be inspected.
    :type owner: type[Processor]
    :raises ProcessorParameterError: if the schema is not a FilterSchema, if one of its models is not a
        MAFwBaseModel subclass or if a model is declared more than once.
    """
    schema = getattr(owner, '_filter_schema', None)
    if schema is None:
        return

    def is_model_class(candidate: Any) -> bool:
        # isclass is checked first because issubclass raises TypeError on non-class objects.
        return inspect.isclass(candidate) and issubclass(candidate, MAFwBaseModel)

    if not isinstance(schema, FilterSchema):
        raise ProcessorParameterError('Processor._filter_schema must be a FilterSchema instance')

    root = schema.root_model
    if not is_model_class(root):
        raise ProcessorParameterError('FilterSchema.root_model must inherit from MAFwBaseModel')

    declared = {root}
    for candidate_model in schema.allowed_models:
        if not is_model_class(candidate_model):
            raise ProcessorParameterError('FilterSchema.allowed_models must contain MAFwBaseModel subclasses')
        if candidate_model in declared:
            raise ProcessorParameterError(f'Model {candidate_model!r} already declared in FilterSchema')
        declared.add(candidate_model)

284 

285 

class ActiveParameter(Generic[ParameterType]):
    r"""
    The public interface to the processor parameter.

    The behaviour of a :class:`Processor` can be customised by using processor parameters. The value of these
    parameters can be either set via a configuration file or directly when creating the class.

    To benefit from this facility, declare an ActiveParameter as a class attribute of the Processor subclass, in this
    way:

    .. code-block::

        class MyProcessor(Processor):

            # this is the input folder
            input_folder = ActiveParameter('input_folder', Path(r'C:\input'), help_doc='This is where to look for input files')

            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)

                # change the input folder to something else
                self.input_folder = Path(r'D:\data')

                # get the value of the parameter
                print(self.input_folder)

    The ActiveParameter is a `descriptor <https://docs.python.org/3/glossary.html#term-descriptor>`_, it means that
    when you create one of them, a lot of work is done behind the scene.

    In simple words, a processor parameter is made by two objects: a public interface where the user can easily
    access the value of the parameter and a private interface where all other information (default, documentation...)
    is also stored.

    The user does not have to take care of all of this. When a new ActiveParameter instance is added to the class as
    in the code snippet above, the descriptor is recorded in the class-level parameter registry. When the processor is
    instantiated, a private :class:`PassiveParameter` holding the actual value is created and kept in the processor
    instance.

    To access the private interface, the user can use the :meth:`Processor.get_parameter` method using the parameter
    name as a key.

    The user can assign to an ActiveParameter almost any name. There are just a few invalid parameter names that are
    used for other purposes. The list of reserved names is available :attr:`here <reserved_names>`. Should the user
    inadvertently use a reserved name, a :exc:`.ProcessorParameterError` is raised.

    .. seealso::

        The private counterpart is the :class:`PassiveParameter`.

        An explanation on how processor parameters work and should be used is given in :ref:`Understanding processor
        parameters <parameters>`

        The list of :attr:`reserved names <reserved_names>`.
    """

    reserved_names: list[str] = ['__logic__', '__filter__', '__new_only__', '__inheritance__', '__enable__']
    """A list of names that cannot be used as processor parameter names.

    - `__logic__`
    - `__filter__`
    - `__new_only__`
    - `__inheritance__`
    - `__enable__`
    """

    def __init__(
        self, name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = ''
    ) -> None:
        """
        Constructor parameters:

        :param name: The external name of the parameter, used as key in the processor parameter register.
        :type name: str
        :param value: The initial value of the parameter. Defaults to None.
        :type value: ParameterType, Optional
        :param default: The default value of the parameter, to be used when ``value`` is not set. Defaults to None.
        :type default: ParameterType, Optional
        :param help_doc: An explanatory text describing the parameter.
        :type help_doc: str, Optional
        :raises ProcessorParameterError: if ``name`` is one of the :attr:`reserved_names`.
        """

        self._value = value
        self._default = default
        self._help_doc = help_doc
        self._external_name = self._validate_name(name)

    def _validate_name(self, proposed_name: str) -> str:
        """
        Validate that the proposed parameter name is not in the list of forbidden names.

        This private method checks if the provided name is allowed for use as a processor parameter.
        Names that are listed in :attr:`reserved_names` cannot be used as parameter names.

        :param proposed_name: The name to be validated for use as a processor parameter.
        :type proposed_name: str
        :return: The validated name if it passes the forbidden names check.
        :rtype: str
        :raises ProcessorParameterError: If the proposed name is in the list of forbidden names.
        """
        if proposed_name not in self.reserved_names:
            return proposed_name
        raise ProcessorParameterError(f'Attempt to use a forbidden name ({proposed_name})')

    def __set_name__(self, owner: type[Processor], name: str) -> None:
        """
        Bind the descriptor to its owner class and record it in the class-level parameter registry.

        Python calls this method automatically while the owner class is being created. A parameter whose external
        name is already declared *in the same class* is not registered. Instead, the error is stored on the owner
        and :class:`ProcessorMeta` raises it once the class is created. Re-declaring a parameter inherited from a
        base class is allowed and replaces the inherited definition.

        :param owner: The class in which the descriptor is declared.
        :type owner: type[Processor]
        :param name: The attribute name used in the class body.
        :type name: str
        """
        self.public_name = name
        self.private_name = f'param_{name}'
        self._owner = owner

        definitions = _ensure_parameter_definitions(owner)
        existing = definitions.get(self._external_name)
        if existing is not None and getattr(existing, '_owner', None) is owner:
            # Only the first duplication error is kept.
            if not hasattr(owner, '_parameter_definition_error'):
                setattr(
                    owner,
                    '_parameter_definition_error',
                    ProcessorParameterError(f'Duplicated parameter name ({self._external_name}).'),
                )
            return
        definitions[self._external_name] = self

    def __get__(
        self, obj: Processor | None, obj_type: type[Processor] | None = None
    ) -> ActiveParameter[ParameterType] | ParameterType:
        """
        Return the descriptor itself on class access, otherwise the current parameter value of ``obj``.

        :param obj: The processor instance, or None when the attribute is accessed on the class.
        :param obj_type: The owner class. It is not used.
        :return: The descriptor (class access) or the parameter value (instance access).
        """
        if obj is None:
            return self

        # retrieve instance-level passive parameter
        param = cast(PassiveParameter[ParameterType], obj._processor_parameters[self._external_name])
        return param.value

    def __set__(self, obj: Processor, value: ParameterType) -> None:
        """
        Store ``value`` in the instance-level passive parameter, which marks it as set.

        :param obj: The processor instance.
        :param value: The new parameter value.
        """
        param = obj._processor_parameters[self._external_name]
        param.value = value

    def to_schema(self) -> ParameterSchema:
        """
        Returns the static schema describing this parameter.

        The schema is derived solely from the descriptor metadata and does not instantiate the owning processor.

        :return: The parameter schema.
        :rtype: ParameterSchema
        """
        annotation = self._resolve_parameter_annotation()
        default_value = self._schema_default_value()
        help_text = self._help_doc or None
        is_list = self._is_list_annotation(annotation, default_value)
        is_dict = self._is_dict_annotation(annotation, default_value)
        return ParameterSchema(
            name=self._external_name,
            annotation=annotation,
            default=default_value,
            help=help_text,
            is_list=is_list,
            is_dict=is_dict,
        )

    def _resolve_parameter_annotation(self) -> type | None:
        """
        Determine the value type of the parameter.

        The type hints of the owner class are searched for the attribute name. An ``ActiveParameter[T]`` hint
        resolves to ``T``. When there is no usable hint (missing, unresolvable, or a bare ``ActiveParameter``), the
        type of the default (or, if missing, of the value) is used.

        :return: The resolved type, or None if the descriptor is not bound to a class or no type can be inferred.
        :rtype: type | None
        """
        if not hasattr(self, 'public_name') or getattr(self, '_owner', None) is None:
            return None

        try:
            hints = get_type_hints(self._owner)
        except Exception:
            # Unresolvable forward references must not prevent building the schema.
            hints = {}

        hint = hints.get(self.public_name)
        if get_origin(hint) is ActiveParameter:
            args = get_args(hint)
            hint = args[0] if args else None
        elif hint is ActiveParameter:
            # A bare ``ActiveParameter`` annotation carries no information on the value type.
            hint = None

        if hint is None:
            fallback = self._default if self._default is not None else self._value
            if fallback is not None:
                return type(fallback)
            return None

        return cast(type | None, hint)

    def _schema_default_value(self) -> Any:
        """
        Return a copy of the default (or, if missing, of the value) to be exposed in the schema.

        A deep copy is attempted so that the schema cannot alias the descriptor data. Objects that cannot be deep
        copied are returned as they are.
        """
        candidate = self._default if self._default is not None else self._value
        if candidate is None:
            return None
        try:
            return deepcopy(candidate)
        except Exception:
            return candidate

    def _is_list_annotation(self, annotation: type | None, default_value: Any) -> bool:
        """Return True if the parameter is a list, according to its annotation or default value."""
        return self._matches_container(annotation, default_value, list)

    def _is_dict_annotation(self, annotation: type | None, default_value: Any) -> bool:
        """Return True if the parameter is a dict, according to its annotation or default value."""
        return self._matches_container(annotation, default_value, dict)

    def _matches_container(self, annotation: type | None, default_value: Any, container: type) -> bool:
        """
        Check whether the annotation (or, if missing, the type of the default value) refers to ``container``.

        Both parametrised generics (e.g. ``list[int]``) and plain subclasses of ``container`` match.
        """
        type_hint = annotation
        if type_hint is None and default_value is not None:
            type_hint = type(default_value)

        if type_hint is None:
            return False

        origin = get_origin(type_hint)
        if origin is container:
            return True

        if isinstance(type_hint, type) and issubclass(type_hint, container):
            return True

        return False

494 

495 

class ProcessorMeta(type):
    """
    Metaclass that finalizes Processor subclasses before instantiation.

    At class creation, it builds the class-level parameter registry and validates the optional filter schema. It then
    raises any parameter definition error recorded while the class body was executed and, for subclasses of
    ``Processor``, installs the super-call wrappers. At instantiation, it runs ``__post_init__`` right after
    ``__init__``.
    """

    def __init__(cls, name: str, bases: tuple[type, ...], namespace: dict[str, Any]) -> None:
        """
        Set up new Processor subclasses.

        The class-level parameter registry is created, unless an :class:`ActiveParameter` already did it, and the
        optional filter schema is validated. Any error recorded by :meth:`ActiveParameter.__set_name__` is removed
        from the class and raised. Finally, the super-call wrappers are installed at class-creation time, so that
        any later instance is guarded before it even starts executing.

        :raises ProcessorParameterError: if a parameter is declared twice in the class or the filter schema is
            invalid.
        """
        super().__init__(name, bases, namespace)
        processor_cls = cast(type['Processor'], cls)
        _ensure_parameter_definitions(processor_cls)
        _validate_filter_schema(processor_cls)
        error = getattr(cls, '_parameter_definition_error', None)
        if error is not None:
            delattr(cls, '_parameter_definition_error')
            raise error
        # Compared by name because this metaclass also creates the Processor base class, when the module-level name
        # ``Processor`` is not bound yet. The base class itself has no Processor ancestor and is skipped.
        if any(base.__name__ == 'Processor' for base in cls.__mro__[1:]):
            apply_wrappers = getattr(cls, '_apply_super_call_wrappers', None)
            if apply_wrappers is not None:
                apply_wrappers()

    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        """
        Create an instance of ``cls`` and run its ``__post_init__`` once ``__init__`` has returned.

        The return type is deliberately left as ``Any`` rather than ``ProcessorMeta``. This lets type checkers infer
        the concrete processor type from the class constructor.

        :return: The initialised instance of ``cls``.
        """
        obj = type.__call__(cls, *args, **kwargs)
        obj.__post_init__()
        return obj

523 

524 

525# noinspection PyProtectedMember 

526class Processor(metaclass=ProcessorMeta): 

527 """ 

528 The basic processor. 

529 

530 A very comprehensive description of what a Processor does and how it works is available at :ref:`doc_processor`. 

531 """ 

532 

    # Execution status, handled by an Active descriptor (see mafw.active) initialised with ProcessorStatus.Unknown;
    # __post_init__ sets it to ProcessorStatus.Init.
    processor_status = Active(ProcessorStatus.Unknown)
    """Processor execution status"""

    # NOTE(review): ``__qualname__`` is resolved while this class body executes, so the default message is always
    # 'Processor is working', also for subclasses that do not redefine it.
    progress_message: str = f'{__qualname__} is working'
    """Message displayed to show the progress.

    It can be customized with information about the current item in the loop by overloading the
    :meth:`format_progress_message`."""

    #: List of methods that should invoke their super implementation when overridden.
    _methods_to_be_checked_for_super: tuple[str, ...] = ('start', 'finish')

544 

545 @classmethod 

546 def parameter_schema(cls) -> list[ParameterSchema]: 

547 """ 

548 Return the ordered static schema for the processor parameters defined on the class. 

549 

550 The schema is derived without instantiating the processor, keeping toolchains free from side effects. 

551 """ 

552 definitions = getattr(cls, '_parameter_definitions', None) 

553 if definitions is None: 

554 definitions = _ensure_parameter_definitions(cls) 

555 return [param.to_schema() for param in definitions.values()] 

556 

557 @classmethod 

558 def filter_schema(cls) -> FilterSchema | None: 

559 """ 

560 Optional metadata describing the models available for filtering. 

561 """ 

562 return getattr(cls, '_filter_schema', None) 

563 

564 @classmethod 

565 def processor_schema(cls) -> ProcessorSchema: 

566 """ 

567 Return the static schema for the processor. 

568 """ 

569 return ProcessorSchema(parameters=cls.parameter_schema(), filter=cls.filter_schema()) 

570 

    # Class-level counter shared by all instances (unless a subclass redefines it): each __init__ takes the next
    # integer from it to set ``unique_id``.
    _ids = count(0)
    """A counter for all processor instances"""

    # NOTE(review): this dictionary is a class attribute shared by every subclass that does not redefine it; mutating
    # it in place (instead of assigning a new dict in the subclass) would affect all of them.
    new_defaults: dict[str, Any] = {}
    """
    A dictionary containing the default values overriding those declared by the parameters.

    .. versionadded:: v2.0.0
    """

    # NOTE(review): not used in this part of the file; presumably the name of the flag enabling the "new only"
    # filtering — confirm against its callers.
    new_only_flag = 'new_only'

582 

    def __init__(
        self,
        name: str | None = None,
        description: str | None = None,
        config: dict[str, Any] | None = None,
        looper: LoopType | str = LoopType.ForLoop,
        user_interface: UserInterfaceBase | None = None,
        timer: Timer | None = None,
        timer_params: dict[str, Any] | None = None,
        database: Database | None = None,
        database_conf: dict[str, Any] | None = None,
        remove_orphan_files: bool = True,
        replica_id: str | None = None,
        create_standard_tables: bool = True,
        max_workers: int | None = None,
        queue_size: int | None = None,
        queue_batch_size: int | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Constructor parameters

        :param name: The name of the processor. If None is provided, the class name is used instead. Defaults to None.
        :type name: str, Optional
        :param description: A short description of the processor task. Defaults to the processor name.
        :type description: str, Optional
        :param config: A configuration dictionary for this processor. A deep copy is stored. Defaults to None.
        :type config: dict, Optional
        :param looper: Enumerator (or its string value) to define the looping type. Defaults to LoopType.ForLoop
        :type looper: LoopType, Optional
        :param user_interface: A user interface instance to be used by the processor to interact with the user.
            If None, a :class:`ConsoleInterface` is created.
        :type user_interface: UserInterfaceBase, Optional
        :param timer: A timer object to measure process duration.
        :type timer: Timer, Optional
        :param timer_params: Parameters for the timer object.
        :type timer_params: dict, Optional
        :param database: A database instance. Defaults to None.
        :type database: Database, Optional
        :param database_conf: Configuration for the database. It is stored only if it passes
            :func:`validate_database_conf`, otherwise None is stored. Default to None.
        :type database_conf: dict, Optional
        :param remove_orphan_files: Boolean flag to remove files on disc without a reference to the database.
            See :ref:`std_tables` and :meth:`~mafw.processor.Processor._remove_orphan_files`. Defaults to True
        :type remove_orphan_files: bool, Optional
        :param replica_id: The replica identifier for the current processor.
        :type replica_id: str, Optional
        :param create_standard_tables: Boolean flag to create std tables on disk. Defaults to True
            When a nested steering configuration is loaded, this value can be overridden by the
            global create_standard_tables entry. Flat processor configurations keep the constructor value.
        :type create_standard_tables: bool, Optional
        :param max_workers: Number of worker threads for parallel loops. If None, it is computed by
            :meth:`_compute_default_max_workers`.
        :type max_workers: int, Optional
        :param queue_size: Maximum size of the internal queue for the queue-based parallel loop. If None, twice the
            number of workers (at least 1) is used.
        :type queue_size: int, Optional
        :param queue_batch_size: Number of items processed per worker task in the queue-based parallel loop.
            None, zero or negative values are replaced by 1.
        :type queue_batch_size: int, Optional
        :param args: Extra positional arguments. They are accepted but not used by this constructor.
        :param kwargs: Keyword arguments that can be used to set processor parameters.
        """

        self.name = name or self.__class__.__name__
        """The name of the processor."""

        # Draws from the class-level counter shared by all instances.
        self.unique_id = next(self._ids)
        """A unique identifier representing how many instances of Processor have been created."""

        self.replica_id = replica_id
        """
        The replica identifier specified in the constructor

        .. versionadded:: v2.0.0
        """

        self.description = description or self.name
        """A short description of the processor task."""

        self._item: Any = None
        """The current item of the loop."""

        self._looping_status: LoopingStatus = LoopingStatus.Continue
        """The looping status for the main thread."""

        self._thread_local = threading.local()
        """Thread-local storage for loop attributes in parallel execution.

        .. versionadded:: v2.1.0
        """

        self._wall_clock_start: float | None = None
        """Timestamp when the looping execution began."""

        self.processor_exit_status = ProcessorExitStatus.Successful
        """Processor exit status"""

        # Accepts either a LoopType member or its value (e.g. a string); invalid values raise ValueError.
        self.loop_type: LoopType = LoopType(looper)
        """
        The loop type.

        The value of this parameter can also be changed by the :func:`~mafw.decorators.execution_workflow` decorator
        factory.

        See :class:`~mafw.enumerators.LoopType` for more details.
        """

        self.create_standard_tables = create_standard_tables
        """The boolean flag to proceed or skip with standard table creation and initialisation"""

        # _compute_default_max_workers is defined elsewhere in the class; it runs before the attributes below exist.
        self.max_workers = max_workers if max_workers is not None else self._compute_default_max_workers()
        """Maximum number of worker threads used in parallel loops."""

        # Default queue size: twice the number of workers, never below 1.
        computed_queue_size = max(1, self.max_workers * 2)
        self.queue_size = queue_size if queue_size is not None else computed_queue_size
        """Maximum size of the queue used by :attr:`~mafw.enumerators.LoopType.ParallelForLoopWithQueue`."""

        # None and 0 fall back to 1 through ``or``; negative values are clamped to 1 by max().
        self.queue_batch_size = max(1, queue_batch_size or 1)
        """Number of items processed per worker task in :attr:`~mafw.enumerators.LoopType.ParallelForLoopWithQueue`."""

        # private attributes
        self._config: dict[str, Any] = {}
        """
        A dictionary containing the processor configuration object.

        This dictionary is populated with configuration parameter (always type 2) during the
        :meth:`._load_parameter_configuration` method.

        The original value of the configuration dictionary that is passed to the constructor is stored in
        :attr:`._orig_config`.

        .. versionchanged:: v2.0.0
            Now it is an empty dictionary until the :meth:`._load_parameter_configuration` is called.

        """

        # Deep copy, so that later changes to the caller's dictionary do not leak into the processor.
        self._orig_config = deepcopy(config) if config is not None else {}
        """
        A copy of the original configuration dictionary.

        .. versionadded:: v2.0.0
        """

        self._processor_parameters: OrderedDict[str, PassiveParameter[Any]] = OrderedDict()
        """
        A dictionary to store all the processor parameter instances.

        The name of the parameter is used as a key, while for the value an instance of the
        :class:`.PassiveParameter` is used.
        """
        # Set to True by _register_parameters once the PassiveParameters have been created.
        self._parameter_registered = False
        """A boolean flag to confirm successful parameter registration."""
        # NOTE(review): presumably consumed by _overrule_kws_parameters (not visible here) to set parameter values.
        self._kwargs = kwargs

        # loops attributes
        self._i_item: int = -1
        self._n_item: int | None = -1
        self._process_durations: list[float] = []
        self._super_call_flags: dict[str, bool] = {}
        """Tracks super-call usage for methods that require it."""

        # resource stack
        # Declaration only: no value is assigned here, so the attribute does not exist until it is set elsewhere.
        self._resource_stack: contextlib.ExitStack
        self._resource_acquisition: bool = True

        # processor timer
        self.timer: Timer | None = timer
        self._timer_parameters: dict[str, Any] = timer_params or {}

        # user interface
        if user_interface is None:
            self._user_interface: UserInterfaceBase = ConsoleInterface()
        else:
            self._user_interface = user_interface

        # database stuff
        self._database: peewee.Database | None = database
        # None when no configuration is given or it lacks the required fields.
        self._database_conf: dict[str, Any] | None = validate_database_conf(database_conf)
        self.filter_register: mafw.db.db_filter.ProcessorFilter = mafw.db.db_filter.ProcessorFilter()
        """The DB filter register of the Processor."""
        self.remove_orphan_files: bool = remove_orphan_files
        """The flag to remove or protect the orphan files. Defaults to True"""

        # Must stay last: _register_parameters relies on _processor_parameters and _parameter_registered, and the
        # other helpers (defined elsewhere) presumably read _orig_config and _kwargs.
        self.initialise_parameters()

764 

765 def initialise_parameters(self) -> None: 

766 """ 

767 Initialises processor parameters by registering them and applying various configuration sources. 

768 

769 This method orchestrates the parameter initialisation process by performing the following steps in order: 

770 

771 #. Registers processor parameters defined as :class:`ActiveParameter` instances 

772 #. Overrides default parameter values with any configured overrides 

773 #. Loads parameter configuration from the processor's configuration dictionary 

774 #. Applies keyword arguments as parameter overrides 

775 

776 The method ensures that all processor parameters are properly configured before the processor 

777 execution begins. It is automatically called during processor initialisation and should not 

778 typically be called directly by users. 

779 

780 .. seealso:: 

781 :meth:`_register_parameters`, :meth:`_override_defaults`, 

782 :meth:`_load_parameter_configuration`, :meth:`_overrule_kws_parameters` 

783 

784 .. versionadded:: v2.0.0 

785 """ 

786 self._register_parameters() 

787 self._override_defaults() 

788 self._load_parameter_configuration() 

789 self._overrule_kws_parameters() 

790 

791 def __post_init__(self) -> None: 

792 """ 

793 Performs post-initialisation tasks for the processor. 

794 

795 This method is automatically called after the processor initialisation is complete. 

796 It performs validation checks on overloaded methods and sets the initial processor status. 

797 

798 .. seealso:: 

799 :meth:`validate_configuration`, :meth:`_check_method_overload`, 

800 :attr:`~mafw.processor.Processor.processor_status` 

801 

802 .. versionchanged:: v2.0.0 

803 Moved the parameter initialisation to :meth:`initialise_parameters` and now executed as last step of the 

804 init method. 

805 

806 Added the validate configuration check. This method should silently check that configuration provided 

807 with the processor parameters is valid. If not, a :exc:`.ProcessorParameterError` is raised. 

808 """ 

809 self.validate_configuration() 

810 self._check_method_overload() 

811 self.processor_status = ProcessorStatus.Init 

812 

813 def _register_parameters(self) -> None: 

814 """ 

815 Register processor parameters defined as ActiveParameter instances in the class. 

816 

817 This private method scans the class definition for any :class:`.ActiveParameter` instances and creates 

818 corresponding :class:`.PassiveParameter` instances to store the actual parameter values and metadata. 

819 It ensures that all processor parameters are properly initialised and available for configuration 

820 through the processor's configuration system. 

821 

822 The method checks for duplicate parameter names and raises a :exc:`.ProcessorParameterError` if duplicates 

823 are detected. It also sets the internal flag :attr:`._parameter_registered` to True once registration is 

824 complete. 

825 

826 .. note:: 

827 This method is automatically called during processor initialisation and should not be called directly 

828 by users. 

829 

830 .. seealso:: 

831 :class:`.Processor`, :meth:`.Processor._override_defaults`, 

832 :meth:`.Processor._load_parameter_configuration`, :meth:`.Processor._overrule_kws_parameters` 

833 

834 .. versionchanged:: v2.0.0 

835 Only :class:`ActiveParameter` are not registered. The use of :class:`PassiveParameter` is only meant to 

836 store the value and metadata of the active counter part. 

837 """ 

838 if self._parameter_registered: 

839 return 

840 

841 definitions = getattr(self.__class__, '_parameter_definitions', None) 

842 if definitions is None: 

843 definitions = _ensure_parameter_definitions(self.__class__) 

844 

845 for attr in definitions.values(): 

846 ext_name = attr._external_name 

847 if ext_name in self._processor_parameters: 

848 raise ProcessorParameterError(f'Duplicated parameter name ({ext_name}).') 

849 self._processor_parameters[ext_name] = PassiveParameter( 

850 ext_name, attr._value, attr._default, attr._help_doc 

851 ) 

852 

853 self._parameter_registered = True 

854 

855 def _override_defaults(self) -> None: 

856 """ 

857 Override default parameter values with values from :attr:`new_defaults`. 

858 

859 This private method iterates through the :attr:`new_defaults` dictionary and updates 

860 the corresponding processor parameters with new values. Only parameters that exist 

861 in both :attr:`new_defaults` and :attr:`_processor_parameters` are updated. 

862 

863 .. versionadded:: v2.0.0 

864 """ 

865 for key, value in self.new_defaults.items(): 

866 if key in self._processor_parameters: 

867 self._processor_parameters[key].value = value 

868 

869 def _reset_parameters(self) -> None: 

870 """ 

871 Reset processor parameters to their initial state. 

872 

873 This method clears all currently registered processor parameters and triggers 

874 a fresh registration process. It's useful when parameter configurations need 

875 to be reinitialized or when parameters have been modified and need to be reset. 

876 

877 .. seealso:: 

878 :meth:`_register_parameters`, :meth:`_register_parameters` 

879 """ 

880 self._processor_parameters = OrderedDict() 

881 self._parameter_registered = False 

882 self._register_parameters() 

883 

884 @ensure_parameter_registration 

885 def _load_parameter_configuration(self) -> None: 

886 """ 

887 Load processor parameter configuration from the internal configuration dictionary. 

888 

889 This method processes the processor's configuration dictionary to set parameter values. 

890 It handles two configuration formats: 

891 

892 1. Nested format: ``{'ProcessorName': {'param1': value1, ...}}`` 

893 2. Flat format: ``{'param1': value1, ...}`` 

894 

895 The method also handles filter configurations by collecting filter table names 

896 and deferring their initialisation until after the global filter has been processed. 

897 

898 .. versionchanged:: v2.0.0 

899 For option 1 combining configuration from name and name_replica 

900 

901 .. versionchanged:: v2.1.2 

902 When a nested steering configuration is loaded, the processor-level 

903 create_standard_tables value is overridden from the global steering-file setting. 

904 Flat configurations keep the constructor value untouched. 

905 

906 :raises ProcessorParameterError: If a parameter in the configuration is not registered. 

907 

908 .. seealso:: 

909 :meth:`mafw.db.db_filter.ModelFilter.from_conf` 

910 """ 

911 original_config = copy(self._orig_config) 

912 flt_list = [] 

913 

914 # by default the flag new_only is set to true 

915 # unless the user specify differently in the general section of the steering file 

916 self.filter_register.new_only = original_config.get(self.new_only_flag, True) 

917 

918 # we need to check if the configuration object is of type 1 or type 2 

919 if any([name for name in [self.name, self.replica_name] if name in original_config]): 

920 # one of the two names (the base or the replica) must be present in case of option 1 

921 # we start from the base name. If not there, then take an empty dict 

922 option1_config_base = original_config.get(self.name, {}) 

923 if self.name != self.replica_name: 

924 # if there is the replica name, then update the base configuration with the replica value 

925 # we get the replica configuration 

926 option1_config_replica = original_config.get(self.replica_name, {}) 

927 

928 # let's check if the user wants to have inheritance default 

929 # by default is True 

930 inheritance = option1_config_replica.get('__inheritance__', True) 

931 if inheritance: 

932 # we update the base with the replica without changing the base 

933 option1_config_update = deep_update(option1_config_base, option1_config_replica, copy_first=True) 

934 else: 

935 # we do not use the base with the replica specific, we pass the replica as the updated 

936 option1_config_update = option1_config_replica 

937 

938 # we modify the type 1 original so that the table for the replica has the updated configuration 

939 # this is used for the filter configuration at the end. 

940 original_config[self.replica_name] = option1_config_update 

941 else: 

942 # there is not replica, so the update is equal to the base. 

943 option1_config_update = option1_config_base 

944 

945 self._config = option1_config_update 

946 if 'create_standard_tables' in original_config: 

947 self.create_standard_tables = bool(original_config['create_standard_tables']) 

948 else: 

949 # for type 2 we are already good to go 

950 self._config = original_config 

951 

952 filter_config = deepcopy(original_config) 

953 

954 def _sanitize_filter_config(processor_name: str) -> None: 

955 processor_config = filter_config.get(processor_name) 

956 if not isinstance(processor_config, dict): 

957 return 

958 filter_table = processor_config.get('__filter__') 

959 if not isinstance(filter_table, dict): 

960 return 

961 

962 sanitized_table: dict[str, Any] = {} 

963 for model_name, model_config in filter_table.items(): 

964 if not isinstance(model_config, dict): 

965 sanitized_table[model_name] = model_config 

966 continue 

967 

968 model_config_copy = deepcopy(model_config) 

969 if not bool(model_config_copy.pop('__enable__', True)): 

970 continue 

971 

972 for field_name, field_value in list(model_config_copy.items()): 

973 if ( 

974 isinstance(field_value, dict) 

975 and not ('op' in field_value and 'value' in field_value) 

976 and '__enable__' in field_value 

977 ): 

978 field_enabled = bool(field_value.pop('__enable__', True)) 

979 if not field_enabled: 

980 model_config_copy.pop(field_name, None) 

981 

982 conditionals = model_config_copy.get('__conditional__') 

983 if isinstance(conditionals, list): 

984 filtered_conditionals: list[Any] = [] 

985 for conditional in conditionals: 

986 if isinstance(conditional, dict): 

987 conditional_enabled = bool(conditional.pop('__enable__', True)) 

988 if not conditional_enabled: 

989 continue 

990 filtered_conditionals.append(conditional) 

991 model_config_copy['__conditional__'] = filtered_conditionals 

992 

993 sanitized_table[model_name] = model_config_copy 

994 

995 processor_config['__filter__'] = sanitized_table 

996 

997 _sanitize_filter_config(self.replica_name) 

998 

999 for key, value in self._config.items(): 

1000 if key in self._processor_parameters: 

1001 type_: ParameterType = type(self.get_parameter(key).value) # type: ignore # wait for answer from SO 

1002 self.set_parameter_value(key, type_(value)) # type: ignore # no idea how to fix it, may be linked with above 

1003 elif key == '__filter__': 

1004 # we got a filter table! 

1005 # it should contain one table for each model 

1006 # we add all the names to a list for deferred initialisation 

1007 flt_table = self._config[key] 

1008 if isinstance(flt_table, dict): 

1009 for model_name, model_config in flt_table.items(): 

1010 if isinstance(model_config, dict) and not bool(model_config.get('__enable__', True)): 

1011 continue 

1012 flt_list.append(f'{self.replica_name}.__filter__.{model_name}') 

1013 elif key == '__logic__': 

1014 # we got a filter logic string 

1015 # we store it in the filter register directly 

1016 self.filter_register._logic = self._config[key] 

1017 elif key == '__new_only__': 

1018 # we got a new only boolean, we store it in the filter register 

1019 self.filter_register.new_only = self._config[key] 

1020 

1021 # only now, after the configuration file has been totally read, we can do the real filter initialisation. 

1022 # This is to be sure that if there were a GlobalFilter table, this has been read. 

1023 # The global filter region will be used as a starting point for the construction of a new filter (default 

1024 # parameter in the from_conf class method). 

1025 for flt_name in flt_list: 

1026 model_name = flt_name.split('.')[-1] 

1027 self.filter_register[model_name] = mafw.db.db_filter.ModelFilter.from_conf(flt_name, filter_config) 

1028 

1029 @ensure_parameter_registration 

1030 def _overrule_kws_parameters(self) -> None: 

1031 """ 

1032 Override processor parameters with values from keyword arguments. 

1033 

1034 This method applies parameter values passed as keyword arguments during processor 

1035 initialisation. It ensures that the parameter types match the expected types 

1036 before setting the values. 

1037 

1038 .. seealso:: 

1039 :meth:`_register_parameters`, :meth:`_load_parameter_configuration`, 

1040 :meth:`set_parameter_value` 

1041 """ 

1042 for key, value in self._kwargs.items(): 

1043 if key in self._processor_parameters: 

1044 type_: ParameterType = type(self.get_parameter(key).value) # type: ignore # wait for answer from SO 

1045 self.set_parameter_value(key, type_(value)) # type: ignore # no idea how to fix it, may be linked with above 

1046 

1047 def validate_configuration(self) -> None: 

1048 """ 

1049 Validate the configuration provided via the processor parameters. 

1050 

1051 Method to be implemented by subclasses if a configuration validation is needed. 

1052 

1053 The method should silently check for the proper configuration, if this is not obtained, 

1054 then the :exc:`.InvalidConfigurationError` must be raised. 

1055 

1056 .. versionadded:: v2.0.0 

1057 """ 

1058 pass 

1059 

1060 def _check_method_overload(self) -> None: 

1061 """ 

1062 Check if the user overloaded the required methods. 

1063 

1064 Depending on the loop type, the user must overload different methods. 

1065 This method is doing the check and if the required methods are not overloaded a warning is emitted. 

1066 """ 

1067 methods_dict: dict[LoopType, list[str]] = { 

1068 LoopType.WhileLoop: ['while_condition'], 

1069 LoopType.ForLoop: ['get_items'], 

1070 LoopType.ParallelForLoop: ['get_items'], 

1071 LoopType.ParallelForLoopWithQueue: ['get_items'], 

1072 } 

1073 required_methods: list[str] = methods_dict.get(self.loop_type, []) 

1074 for method in required_methods: 

1075 if getattr(type(self), method) == getattr(Processor, method): 

1076 warnings.warn( 

1077 MissingOverloadedMethod( 

1078 '%s was not overloaded. The process execution workflow might not work.' % method 

1079 ) 

1080 ) 

1081 

1082 @classmethod 

1083 def _apply_super_call_wrappers(cls) -> None: 

1084 """ 

1085 Wraps overridden methods so the class can detect whether they called `super()`. 

1086 

1087 This method runs immediately after the class is created (see :class:`.ProcessorMeta`). For every 

1088 method that we expect scientists to extend (start, finish, etc.) we replace their implementation with a 

1089 wrapper. The wrapper resets a per-instance flag, invokes the real override, and only after that method returns 

1090 it checks whether the real `super()` was ever reached; if not, it emits a :class:`~mafw.mafw_errors.MissingSuperCall` 

1091 warning. In other words, the wiring happens while the subclass is defined, and the actual smoke test executes 

1092 each time the method runs. 

1093 """ 

1094 methods = getattr(cls, '_methods_to_be_checked_for_super', ()) 

1095 for method in methods: 

1096 if method not in cls.__dict__: 

1097 continue 

1098 if not any(hasattr(base, method) for base in cls.__mro__[1:]): 

1099 continue 

1100 original: Callable[..., Any] = getattr(cls, method) 

1101 # we are adding an attribute to the method, it looks strange, but it is possible 

1102 # in this way we avoid wrapping the same method more than once. 

1103 if getattr(original, '_mafw_super_check_wrapped', False): 

1104 continue 

1105 

1106 def _make_wrapper( 

1107 __orig: Callable[..., Any], 

1108 __method: str, # pragma: no cover 

1109 ) -> Callable[..., Any]: 

1110 @wraps(__orig) 

1111 def _wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: 

1112 # self is the processor instance. 

1113 # _reset_super_call_flag is resetting the call status 

1114 # for method as False 

1115 self._reset_super_call_flag(__method) 

1116 # in the base method, the super call flag is set to True 

1117 result = __orig(self, *args, **kwargs) 

1118 # if the super call flag is not True, then it is because the base 

1119 # method was not called. 

1120 # emit the warning and return the original method return value 

1121 if not self._did_call_super(__method): 

1122 warnings.warn( 

1123 MissingSuperCall( 

1124 'The overloaded %s is not invoking its super method. The processor might not work.' 

1125 % __method 

1126 ) 

1127 ) 

1128 return result 

1129 

1130 return _wrapper 

1131 

1132 # we create a wrapped method from the original 

1133 wrapper = _make_wrapper(original, method) 

1134 # we set a flag for the method to avoid multiple wrapping 

1135 wrapper._mafw_super_check_wrapped = True # type: ignore[attr-defined] 

1136 setattr(cls, method, wrapper) 

1137 

1138 def _reset_super_call_flag(self, method: str) -> None: 

1139 """ 

1140 Reset the super-call flag for a method. 

1141 """ 

1142 self._super_call_flags[method] = False 

1143 

1144 def _mark_super_call(self, method: str) -> None: 

1145 """ 

1146 Mark a method as having called its super implementation. 

1147 """ 

1148 self._super_call_flags[method] = True 

1149 

1150 def _did_call_super(self, method: str) -> bool: 

1151 """ 

1152 Check whether a method called its super implementation. 

1153 """ 

1154 return self._super_call_flags.get(method, False) 

1155 

1156 @ensure_parameter_registration 

1157 def dump_parameter_configuration(self, option: int = 1) -> dict[str, Any]: 

1158 """ 

1159 Dumps the processor parameter values in a dictionary. 

1160 

1161 The snippet below explains the meaning of `option`. 

1162 

1163 .. code-block:: python 

1164 

1165 # option 1 

1166 conf_dict1 = { 

1167 'Processor': {'param1': 5, 'input_table': 'my_table'} 

1168 } 

1169 

1170 # option 2 

1171 conf_dict2 = {'param1': 5, 'input_table': 'my_table'} 

1172 

1173 In the case of option 1, the replica aware name (:meth:`.replica_name`) will be used as a key for the 

1174 configuration dictionary. 

1175 

1176 .. versionchanged:: v2.0.0 

1177 With option 1, using :meth:`.replica_name` instead of :attr:`~.Processor.name` as key of the configuration 

1178 dictionary. 

1179 

1180 :param option: Select the dictionary style. Defaults to 1. 

1181 :type option: int, Optional 

1182 :return: A parameter configuration dictionary. 

1183 :rtype: dict 

1184 """ 

1185 inner_dict = {} 

1186 for key, value in self._processor_parameters.items(): 

1187 inner_dict[key] = value.value 

1188 

1189 if option == 1: 

1190 outer_dict = {self.replica_name: inner_dict} 

1191 elif option == 2: 

1192 outer_dict = inner_dict 

1193 else: 

1194 log.warning('Unknown option %s. Using option 2' % option) 

1195 outer_dict = inner_dict 

1196 return outer_dict 

1197 

1198 @ensure_parameter_registration 

1199 def get_parameter(self, name: str) -> PassiveParameter[Any]: 

1200 """ 

1201 Gets the processor parameter named name. 

1202 

1203 :param name: The name of the parameter. 

1204 :type name: str 

1205 :return: The processor parameter 

1206 :rtype: PassiveParameter 

1207 :raises ProcessorParameterError: If a parameter with `name` is not registered. 

1208 """ 

1209 if name in self._processor_parameters: 

1210 return self._processor_parameters[name] 

1211 raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}') 

1212 

1213 @ensure_parameter_registration 

1214 def get_parameters(self) -> dict[str, PassiveParameter[Any]]: 

1215 """ 

1216 Returns the full dictionary of registered parameters for this processor. 

1217 

1218 Useful when dumping the parameter specification in a configuration file, for example. 

1219 

1220 :return: The dictionary with the registered parameters. 

1221 :rtype: dict[str, PassiveParameter[ParameterType] 

1222 """ 

1223 return self._processor_parameters 

1224 

1225 @ensure_parameter_registration 

1226 def delete_parameter(self, name: str) -> None: 

1227 """ 

1228 Deletes a processor parameter. 

1229 

1230 :param name: The name of the parameter to be deleted. 

1231 :type name: str 

1232 :raises ProcessorParameterError: If a parameter with `name` is not registered. 

1233 """ 

1234 if name in self._processor_parameters: 

1235 del self._processor_parameters[name] 

1236 else: 

1237 raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}') 

1238 

1239 @ensure_parameter_registration 

1240 def set_parameter_value(self, name: str, value: ParameterType) -> None: 

1241 """ 

1242 Sets the value of a processor parameter. 

1243 

1244 :param name: The name of the parameter to be deleted. 

1245 :type name: str 

1246 :param value: The value to be assigned to the parameter. 

1247 :type value: ParameterType 

1248 :raises ProcessorParameterError: If a parameter with `name` is not registered. 

1249 """ 

1250 if name in self._processor_parameters: 

1251 self._processor_parameters[name].value = value 

1252 else: 

1253 raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}') 

1254 

1255 def get_filter(self, model_name: str) -> mafw.db.db_filter.ModelFilter: 

1256 """ 

1257 Returns a registered :class:`~mafw.db.db_filter.ModelFilter` via the model name. 

1258 

1259 If a filter for the provided model_name does not exist, a KeyError is raised. 

1260 

1261 :param model_name: The model name for which the filter will be returned. 

1262 :type model_name: str 

1263 :return: The registered filter 

1264 :rtype: mafw.db.db_filter.ModelFilter 

1265 :raises: KeyError is a filter with the give name is not found. 

1266 """ 

1267 return self.filter_register[model_name] 

1268 

1269 def on_processor_status_change(self, old_status: ProcessorStatus, new_status: ProcessorStatus) -> None: 

1270 """ 

1271 Callback invoked when the processor status is changed. 

1272 

1273 :param old_status: The old processor status. 

1274 :type old_status: ProcessorStatus 

1275 :param new_status: The new processor status. 

1276 :type new_status: ProcessorStatus 

1277 """ 

1278 self._user_interface.change_of_processor_status(self.name, old_status, new_status) 

1279 

1280 def on_looping_status_set(self, status: LoopingStatus) -> None: 

1281 """ 

1282 Call back invoked when the looping status is set. 

1283 

1284 The user can overload this method according to the needs. 

1285 

1286 :param status: The set looping status. 

1287 :type status: LoopingStatus 

1288 """ 

1289 if status == LoopingStatus.Skip: 

1290 log.warning('Skipping item %s' % self.i_item) 

1291 elif status == LoopingStatus.Abort: 

1292 log.error('Looping has been aborted') 

1293 elif status == LoopingStatus.Quit: 

1294 log.warning('Looping has been quit') 

1295 

1296 def format_progress_message(self) -> None: 

1297 """Customizes the progress message with information about the current item. 

1298 

1299 The user can overload this method in order to modify the message being displayed during the process loop with 

1300 information about the current item. 

1301 

1302 The user can access the current value, its position in the looping cycle and the total number of items using 

1303 :attr:`.Processor.item`, :obj:`.Processor.i_item` and :obj:`.Processor.n_item`. 

1304 """ 

1305 pass 

1306 

1307 @contextlib.contextmanager 

1308 def _thread_loop_context(self, i_item: int, n_item: int, item: Any) -> Iterator[None]: 

1309 """ 

1310 Context manager to set thread-local loop attributes for parallel execution. 

1311 

1312 :param i_item: Item index for the loop. 

1313 :type i_item: int 

1314 :param n_item: Total number of items in the loop. 

1315 :type n_item: int 

1316 :param item: Item payload. 

1317 :type item: Any 

1318 """ 

1319 self._thread_local.in_worker = True 

1320 self._thread_local.i_item = i_item 

1321 self._thread_local.n_item = n_item 

1322 self._thread_local.item = item 

1323 self._thread_local.looping_status = LoopingStatus.Continue 

1324 try: 

1325 yield 

1326 finally: 

1327 for name in ('i_item', 'n_item', 'item', 'looping_status', 'in_worker'): 

1328 if hasattr(self._thread_local, name): 

1329 delattr(self._thread_local, name) 

1330 

1331 def _in_thread_context(self) -> bool: 

1332 """Return True when running inside a parallel worker thread.""" 

1333 return bool(getattr(self._thread_local, 'in_worker', False)) 

1334 

1335 @property 

1336 def item(self) -> Any: 

1337 """The current item of the loop.""" 

1338 if self._in_thread_context() and hasattr(self._thread_local, 'item'): 

1339 return self._thread_local.item 

1340 return self._item 

1341 

1342 @item.setter 

1343 def item(self, value: Any) -> None: 

1344 if self._in_thread_context(): 

1345 self._thread_local.item = value 

1346 else: 

1347 self._item = value 

1348 

1349 @property 

1350 def i_item(self) -> int: 

1351 """The enumeration of the current item being processed.""" 

1352 if self._in_thread_context() and hasattr(self._thread_local, 'i_item'): 

1353 return cast(int, self._thread_local.i_item) 

1354 return self._i_item 

1355 

1356 @i_item.setter 

1357 def i_item(self, value: int) -> None: 

1358 if self._in_thread_context(): 

1359 self._thread_local.i_item = value 

1360 else: 

1361 self._i_item = value 

1362 

1363 @property 

1364 def n_item(self) -> int | None: 

1365 """The total number of items to be processed or None for an undefined loop""" 

1366 if self._in_thread_context() and hasattr(self._thread_local, 'n_item'): 

1367 return cast(int | None, self._thread_local.n_item) 

1368 return self._n_item 

1369 

1370 @n_item.setter 

1371 def n_item(self, value: int | None) -> None: 

1372 if self._in_thread_context(): 

1373 self._thread_local.n_item = value 

1374 else: 

1375 self._n_item = value 

1376 

1377 @property 

1378 def looping_status(self) -> LoopingStatus: 

1379 """The looping status for the current thread context.""" 

1380 if self._in_thread_context(): 

1381 value = getattr(self._thread_local, 'looping_status', LoopingStatus.Continue) 

1382 else: 

1383 value = self._looping_status 

1384 if hasattr(self, 'on_looping_status_get'): 

1385 self.on_looping_status_get(value) 

1386 return value 

1387 

1388 @looping_status.setter 

1389 def looping_status(self, value: LoopingStatus) -> None: 

1390 if self._in_thread_context(): 

1391 current = getattr(self._thread_local, 'looping_status', LoopingStatus.Continue) 

1392 self._thread_local.looping_status = value 

1393 else: 

1394 current = self._looping_status 

1395 self._looping_status = value 

1396 if current != value: 

1397 if hasattr(self, 'on_looping_status_change'): 

1398 self.on_looping_status_change(current, value) 

1399 else: 

1400 if hasattr(self, 'on_looping_status_set'): 

1401 self.on_looping_status_set(value) 

1402 

1403 @property 

1404 def unique_name(self) -> str: 

1405 """Returns the unique name for the processor.""" 

1406 return f'{self.name}_{self.unique_id}' 

1407 

1408 @property 

1409 def replica_name(self) -> str: 

1410 """ 

1411 Returns the replica aware name of the processor. 

1412 

1413 If no replica_id is specified, then return the pure name, otherwise join the two string using the '#' symbol. 

1414 

1415 .. versionadded:: v2.0.0 

1416 

1417 :return: The replica aware name of the processor. 

1418 :rtype: str 

1419 """ 

1420 if self.replica_id is None: 

1421 return self.name 

1422 else: 

1423 return self.name + '#' + self.replica_id 

1424 

1425 @property 

1426 def local_resource_acquisition(self) -> bool: 

1427 """ 

1428 Checks if resources should be acquired locally. 

1429 

1430 When the processor is executed in stand-alone mode, it is responsible to acquire and release its own external 

1431 resources, but when it is executed from a ProcessorList, then is a good practice to share and distribute 

1432 resources among the whole processor list. In this case, resources should not be acquired locally by the 

1433 single processor, but from the parent execution context. 

1434 

1435 :return: True if resources are to be acquired locally by the processor. False, otherwise. 

1436 :rtype: bool 

1437 """ 

1438 return self._resource_acquisition 

1439 

1440 @local_resource_acquisition.setter 

1441 def local_resource_acquisition(self, flag: bool) -> None: 

1442 self._resource_acquisition = flag 

1443 

1444 @property 

1445 def database(self) -> peewee.Database: 

1446 """ 

1447 Returns the database instance 

1448 

1449 :return: A database object. 

1450 :raises MissingDatabase: If the database connection has not been established. 

1451 """ 

1452 if self._database is None: 

1453 raise MissingDatabase('Database connection not initialized') 

1454 return self._database 

1455 

1456 def execute(self) -> None: 

1457 """Execute the processor tasks. 

1458 

1459 This method works as a dispatcher, reassigning the call to a more specific execution implementation depending 

1460 on the :attr:`~mafw.processor.Processor.loop_type`. 

1461 """ 

1462 dispatcher: dict[LoopType, Callable[[], None]] = { 

1463 LoopType.SingleLoop: self._execute_single, 

1464 LoopType.ForLoop: self._execute_for_loop, 

1465 LoopType.ParallelForLoop: self._execute_for_loop, 

1466 LoopType.ParallelForLoopWithQueue: self._execute_for_loop, 

1467 LoopType.WhileLoop: self._execute_while_loop, 

1468 } 

1469 dispatcher[self.loop_type]() 

1470 

1471 @staticmethod 

1472 def _compute_default_max_workers() -> int: 

1473 """ 

1474 Helper to compute the default number of workers for parallel execution. 

1475 

1476 Returns min(32, cpu_count + 4). 

1477 """ 

1478 return min(32, (os.cpu_count() or 1) + 4) 

1479 

1480 def _execute_single(self) -> None: 

1481 """Execute the processor in single mode. 

1482 

1483 **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the 

1484 appropriate implementation depending on the processor LoopType. 

1485 """ 

1486 with contextlib.ExitStack() as self._resource_stack: 

1487 self.acquire_resources() 

1488 self._wall_clock_start = time.perf_counter() 

1489 self.start() 

1490 self.processor_status = ProcessorStatus.Run 

1491 self.process() 

1492 self.finish() 

1493 

1494 def _execute_for_loop(self) -> None: 

1495 """Executes the processor within a for loop. 

1496 

1497 **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the 

1498 appropriate implementation depending on the processor LoopType. 

1499 """ 

1500 

1501 with contextlib.ExitStack() as self._resource_stack: 

1502 self.acquire_resources() 

1503 # we cannot use a Timer context here to measure the whole duration because it spans 

1504 # over different methods. Instead we directly use a performance clock. 

1505 self._wall_clock_start = time.perf_counter() 

1506 self.start() 

1507 

1508 # get the input item list and filter it 

1509 item_list = self.get_items() 

1510 

1511 # get the total number of items. 

1512 self.n_item = len(item_list) 

1513 

1514 # turn the processor status to run 

1515 self.processor_status = ProcessorStatus.Run 

1516 

1517 # create a new task in the progress bar interface 

1518 self._user_interface.create_task(self.replica_name, self.description, completed=0, total=self.n_item) 

1519 

1520 # verify if we can use parallel for loop. If not, switch back to a serial for loop. 

1521 if ( 

1522 self.loop_type in (LoopType.ParallelForLoop, LoopType.ParallelForLoopWithQueue) 

1523 and not is_free_threading() 

1524 ): 

1525 warnings.warn( 

1526 'Parallel for-loop requires free-threading; falling back to serial for loop.', 

1527 stacklevel=2, 

1528 ) 

1529 self.loop_type = LoopType.ForLoop 

1530 

1531 if self.loop_type == LoopType.ParallelForLoopWithQueue: 

1532 self._process_parallel_for_loop_with_queue(item_list) 

1533 elif self.loop_type == LoopType.ParallelForLoop: 

1534 self._process_parallel_for_loop(item_list) 

1535 else: 

1536 self._process_for_loop(item_list) 

1537 

1538 self._user_interface.update_task(self.replica_name, completed=self.n_item, total=self.n_item) 

1539 

1540 self.finish() 

1541 

1542 def _build_loop_item(self) -> LoopItem: 

1543 """ 

1544 Build a LoopItem payload for the current loop context. 

1545 

1546 :return: The LoopItem payload. 

1547 :rtype: mafw.models.LoopItem 

1548 """ 

1549 return LoopItem(self.i_item, int(self.n_item or 0), self.item) 

1550 

1551 def _build_loop_result(self, payload: Any, duration: float) -> LoopResult: 

1552 """ 

1553 Build a LoopResult payload for the current loop context. 

1554 

1555 :param payload: The optional payload returned by process. 

1556 :type payload: Any 

1557 :param duration: Wall-clock duration of the item processing. 

1558 :type duration: float 

1559 :return: The LoopResult payload. 

1560 :rtype: mafw.models.LoopResult 

1561 """ 

1562 return LoopResult(self.i_item, int(self.n_item or 0), self.looping_status, payload, duration) 

1563 

1564 def _payload_annotation_matches(self, annotation: Any) -> bool: 

1565 """ 

1566 Check whether an annotation matches LoopItem or LoopResult. 

1567 

1568 :param annotation: The annotation to inspect. 

1569 :type annotation: Any 

1570 :return: True if the annotation matches LoopItem or LoopResult. 

1571 :rtype: bool 

1572 """ 

1573 if annotation in (LoopItem, LoopResult): 

1574 return True 

1575 origin = get_origin(annotation) 

1576 if origin is Union: 

1577 return any(arg in (LoopItem, LoopResult) for arg in get_args(annotation)) 

1578 return False 

1579 

1580 def _call_with_optional_payload(self, func: Callable[..., Any], payload: Any) -> Any: 

1581 """ 

1582 Invoke a processor hook with an optional payload if the signature allows it. 

1583 

1584 :param func: The callable to invoke. 

1585 :type func: Callable 

1586 :param payload: The payload to pass if supported. 

1587 :type payload: Any 

1588 :return: The callable return value. 

1589 :rtype: Any 

1590 """ 

1591 signature = inspect.signature(func) 

1592 parameters = list(signature.parameters.values()) 

1593 if parameters and parameters[0].name == 'self': 

1594 parameters = parameters[1:] 

1595 if not parameters: 

1596 return func() 

1597 if any(param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD) for param in parameters): 

1598 return func(payload) 

1599 first = parameters[0] 

1600 if first.kind in (first.POSITIONAL_ONLY, first.POSITIONAL_OR_KEYWORD): 

1601 if first.name in ('item', 'loop_item', 'result', 'loop_result', 'payload'): 

1602 return func(payload) 

1603 if self._payload_annotation_matches(first.annotation): 

1604 return func(payload) 

1605 return func() 

1606 

1607 def _process_for_loop(self, item_list: Collection[Any]) -> None: 

1608 """ 

1609 Process items with the standard serial for-loop. 

1610 

1611 :param item_list: The list of items to process. 

1612 :type item_list: Collection[Any] 

1613 """ 

1614 for self.i_item, self.item in enumerate(item_list): 

1615 self.looping_status = LoopingStatus.Continue 

1616 

1617 self.format_progress_message() 

1618 self._user_interface.display_progress_message(self.progress_message, self.i_item, self.n_item, 0.1) 

1619 

1620 loop_item = self._build_loop_item() 

1621 with Timer(suppress_message=True) as timer: 

1622 payload = self._call_with_optional_payload(self.process, loop_item) 

1623 self._process_durations.append(timer.duration) 

1624 

1625 loop_result = self._build_loop_result(payload, timer.duration) 

1626 

1627 if self.looping_status == LoopingStatus.Continue: 

1628 self._call_with_optional_payload(self.accept_item, loop_result) 

1629 elif self.looping_status == LoopingStatus.Skip: 

1630 self._call_with_optional_payload(self.skip_item, loop_result) 

1631 else: # Abort or Quit 

1632 break 

1633 

1634 self._user_interface.update_task(self.replica_name, increment=1) 

1635 

def _process_parallel_for_loop(self, item_list: Collection[Any]) -> None:
    """
    Process items in parallel using a thread pool.

    At most ``max_workers`` items are in flight at any time: a new item is submitted only after a previous one has
    completed. Each worker runs :meth:`process` on its item and then, unless the loop has been aborted in the
    meantime, calls :meth:`accept_item` (Continue) or :meth:`skip_item` (Skip) in the same worker thread. The main
    thread collects the results, records the durations and updates the progress information.

    When an item ends with Abort or Quit, no further item is submitted; items already running still complete their
    :meth:`process` call but skip the accept/skip hooks. Abort takes precedence over Quit, and the resulting status
    is stored in :attr:`looping_status` at the end.

    :param item_list: The list of items to process.
    :type item_list: Collection[Any]
    """
    max_workers = self.max_workers
    abort_event = threading.Event()
    abort_lock = threading.Lock()
    abort_status: LoopingStatus | None = None

    def _set_abort(status: LoopingStatus) -> None:
        # Record an Abort/Quit request from a worker. Once Abort is recorded it is never downgraded; a Quit is only
        # recorded if nothing has been recorded yet.
        nonlocal abort_status
        with abort_lock:
            if abort_status == LoopingStatus.Abort:
                return
            if status == LoopingStatus.Abort:
                abort_status = LoopingStatus.Abort
            elif abort_status is None:
                abort_status = LoopingStatus.Quit
            abort_event.set()

    def _worker(i_item: int, item: Any) -> tuple[int, Any, LoopingStatus, Any, float]:
        # Runs in a pool thread: i_item/item/looping_status are set through the per-thread loop context.
        with self._thread_loop_context(i_item, int(self.n_item or 0), item):
            self.looping_status = LoopingStatus.Continue
            loop_item = self._build_loop_item()
            with Timer(suppress_message=True) as timer:
                payload = self._call_with_optional_payload(self.process, loop_item)
            status = self.looping_status

            if status in (LoopingStatus.Abort, LoopingStatus.Quit):
                _set_abort(status)
                return i_item, item, status, payload, timer.duration

            # another worker requested an abort: do not run the post-processing hooks
            if abort_event.is_set():
                return i_item, item, status, payload, timer.duration

            loop_result = self._build_loop_result(payload, timer.duration)
            if status == LoopingStatus.Continue:
                self._call_with_optional_payload(self.accept_item, loop_result)
            elif status == LoopingStatus.Skip:
                self._call_with_optional_payload(self.skip_item, loop_result)

            return i_item, item, status, payload, timer.duration

    pending: set[Future[tuple[int, Any, LoopingStatus, Any, float]]] = set()
    items_iter = iter(enumerate(item_list))

    def _submit_next() -> bool:
        # Submit the next item, if any. ``executor`` is bound by the ``with`` statement below before this is called.
        try:
            idx, itm = next(items_iter)
        except StopIteration:
            return False
        fut: Future[tuple[int, Any, LoopingStatus, Any, float]] = executor.submit(_worker, idx, itm)
        pending.add(fut)
        return True

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # fill the pool
        while len(pending) < max_workers and _submit_next():
            pass

        while pending:
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                pending.remove(fut)
                # re-raises in the main thread any exception raised by the worker
                i_item, item, status, _payload, duration = fut.result()
                self._process_durations.append(duration)
                self.item = item
                self.i_item = i_item
                self.format_progress_message()
                self._user_interface.display_progress_message(self.progress_message, i_item, self.n_item, 0.1)
                self._user_interface.update_task(self.replica_name, increment=1)

            # after an abort request, only wait for the already submitted items
            if abort_event.is_set():
                continue

            # refill the pool up to max_workers in-flight items
            while len(pending) < max_workers and _submit_next():
                pass

    if abort_status is not None:
        self.looping_status = abort_status

1718 

1719 def _process_parallel_for_loop_with_queue(self, item_list: Collection[Any]) -> None: 

1720 """ 

1721 Process items in parallel using a producer/consumer queue. 

1722 

1723 Items are processed in worker threads, while a dedicated consumer thread handles the post-processing hooks. 

1724 

1725 :param item_list: The list of items to process. 

1726 :type item_list: Collection[Any] 

1727 """ 

1728 # the idea is the following, we have a single consumer thread that is constantly trying to pull 

1729 # items out of a shared queue and we have a pool of producer threads that are putting items in the queue as 

1730 # long as there is space into it (back-pressure). 

1731 # it the queue is full, then the producer threads are set to sleep for a short interval of time and waken up 

1732 # again after some times for another attempt to write the output in the queue. 

1733 # when there are no more items to execute, the main thread is pushing a sentinel object into the queue 

1734 # marking the end of the processing, so that the consumer thread can be gracefully terminated. 

1735 max_workers = self.max_workers 

1736 result_queue: queue.Queue[object] = queue.Queue(maxsize=self.queue_size) 

1737 abort_event = threading.Event() 

1738 abort_lock = threading.Lock() 

1739 abort_status: LoopingStatus | None = None 

1740 sentinel = object() 

1741 

1742 def _set_abort(status: LoopingStatus) -> None: 

1743 nonlocal abort_status 

1744 with abort_lock: 

1745 if abort_status == LoopingStatus.Abort: 1745 ↛ 1746line 1745 didn't jump to line 1746 because the condition on line 1745 was never true

1746 return 

1747 if status == LoopingStatus.Abort: 

1748 abort_status = LoopingStatus.Abort 

1749 elif abort_status is None: 1749 ↛ 1751line 1749 didn't jump to line 1751

1750 abort_status = LoopingStatus.Quit 

1751 abort_event.set() 

1752 

1753 def _get_abort_status() -> LoopingStatus | None: 

1754 with abort_lock: 

1755 return abort_status 

1756 

1757 def _worker(batch_items: list[tuple[int, Any]]) -> None: 

1758 batch_results: list[tuple[LoopItem, LoopResult]] = [] 

1759 for i_item, item in batch_items: 

1760 if abort_event.is_set(): 

1761 break 

1762 with self._thread_loop_context(i_item, int(self.n_item or 0), item): 

1763 self.looping_status = LoopingStatus.Continue 

1764 loop_item = self._build_loop_item() 

1765 with Timer(suppress_message=True) as timer: 

1766 payload = self._call_with_optional_payload(self.process, loop_item) 

1767 status = self.looping_status 

1768 if status in (LoopingStatus.Abort, LoopingStatus.Quit): 

1769 _set_abort(status) 

1770 loop_result = self._build_loop_result(payload, timer.duration) 

1771 batch_results.append((loop_item, loop_result)) 

1772 if status in (LoopingStatus.Abort, LoopingStatus.Quit): 

1773 break 

1774 if batch_results: 

1775 result_queue.put(batch_results) 

1776 

1777 def _consumer() -> None: 

1778 self.consumer_start() 

1779 try: 

1780 while True: 

1781 queued = result_queue.get() 

1782 if queued is sentinel: 

1783 break 

1784 batch_results = cast(list[tuple[LoopItem, LoopResult]], queued) 

1785 for loop_item, loop_result in batch_results: 

1786 with self._thread_loop_context(loop_item.i_item, loop_item.n_item, loop_item.payload): 

1787 self.looping_status = loop_result.looping_status 

1788 self.format_progress_message() 

1789 self._user_interface.display_progress_message( 

1790 self.progress_message, loop_item.i_item, self.n_item, 0.1 

1791 ) 

1792 self._process_durations.append(loop_result.duration) 

1793 self._user_interface.update_task(self.replica_name, increment=1) 

1794 

1795 if _get_abort_status() is None and loop_result.looping_status in ( 

1796 LoopingStatus.Continue, 

1797 LoopingStatus.Skip, 

1798 ): 

1799 self._call_with_optional_payload(self.consumer_process, loop_result) 

1800 finally: 

1801 self.consumer_finish() 

1802 

1803 pending: set[Future[None]] = set() 

1804 items_iter = iter(enumerate(item_list)) 

1805 

1806 def _submit_next() -> bool: 

1807 if abort_event.is_set(): 

1808 return False 

1809 batch_items: list[tuple[int, Any]] = [] 

1810 for _ in range(self.queue_batch_size): 

1811 try: 

1812 idx, itm = next(items_iter) 

1813 except StopIteration: 

1814 break 

1815 batch_items.append((idx, itm)) 

1816 if not batch_items: 

1817 return False 

1818 fut: Future[None] = executor.submit(_worker, batch_items) 

1819 pending.add(fut) 

1820 return True 

1821 

1822 consumer_thread = threading.Thread(target=_consumer, name=f'{self.name}-consumer') 

1823 consumer_thread.start() 

1824 

1825 try: 

1826 with ThreadPoolExecutor(max_workers=max_workers) as executor: 

1827 while len(pending) < max_workers and _submit_next(): 

1828 pass 

1829 

1830 while pending: 

1831 done, _ = wait(pending, return_when=FIRST_COMPLETED) 

1832 for fut in done: 

1833 pending.remove(fut) 

1834 fut.result() 

1835 

1836 if abort_event.is_set(): 

1837 continue 

1838 

1839 while len(pending) < max_workers and _submit_next(): 

1840 pass 

1841 finally: 

1842 result_queue.put(sentinel) 

1843 consumer_thread.join() 

1844 

1845 if abort_status is not None: 

1846 self.looping_status = abort_status 

1847 

def _execute_while_loop(self) -> None:
    """
    Execute the processor within a while loop.

    **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
    appropriate implementation depending on the processor LoopType.

    The loop runs as long as :meth:`while_condition` returns True. For every iteration :meth:`process` is timed and
    its outcome routed to :meth:`accept_item` (Continue) or :meth:`skip_item` (Skip); any other status (Abort,
    Quit) ends the loop. The number of iterations is not known in advance, so the progress task is created with
    ``total=None`` (indeterminate progress) unless user code assigns :attr:`n_item`.
    """
    # n_item is reset before the loop on purpose: user code may assign it while iterating.
    self.n_item = None
    with contextlib.ExitStack() as self._resource_stack:
        self.acquire_resources()
        self._wall_clock_start = time.perf_counter()
        self.start()

        self.processor_status = ProcessorStatus.Run
        self._user_interface.create_task(self.replica_name, self.description, completed=0, total=self.n_item)

        # NOTE(review): i_item is initialised here but never advanced by this loop; unless process() updates it,
        # the progress stays at the first iteration and n_item falls back to 0 below. Confirm this is intended.
        self.i_item = 0
        while self.while_condition():
            # every iteration starts from Continue; process() may change it
            self.looping_status = LoopingStatus.Continue

            self.format_progress_message()
            self._user_interface.display_progress_message(
                self.progress_message, self.i_item, self.n_item, frequency=0.1
            )

            with Timer(suppress_message=True) as timer:
                self.process()
            self._process_durations.append(timer.duration)

            if self.looping_status == LoopingStatus.Skip:
                self.skip_item()
            elif self.looping_status == LoopingStatus.Continue:
                self.accept_item()
            else:
                # Abort or Quit
                break

            # while n_item is still None the progress bar keeps showing indeterminate progress
            self._user_interface.update_task(self.replica_name, self.i_item + 1, 1, self.n_item)

        # loop over: when n_item was never assigned, fall back to the iteration counter
        if self.n_item is None:
            self.n_item = self.i_item
        self._user_interface.update_task(self.replica_name, completed=self.n_item, total=self.n_item)

        self.finish()

1905 

def acquire_resources(self) -> None:
    """
    Acquires resources and add them to the resource stack.

    The whole body of the :meth:`execute` method runs inside an
    `ExitStack <https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack>`_, so that every stateful
    resource (open files, timers, db connections, ...) registered here is released even if the execution is
    interrupted by an exception.

    When the processor is executed from a :class:`~mafw.processor.ProcessorList`, some resources are shared and
    owned by the list: in that case the private resource acquisition flag is False and the timer and the user
    interface are not pushed on this processor's stack. A database object already present is never closed here:
    whoever created it is responsible for closing it.

    To add further resources, overload this method calling super to keep the default ones. To share additional
    resources among the processors of a list, the :class:`~mafw.processor.ProcessorList` must be overloaded as well.
    """
    # Timer and user interface are context managers: they go on the stack only if this processor owns them.
    if self._resource_acquisition:
        self.timer = self._resource_stack.enter_context(Timer(**self._timer_parameters))
        self._resource_stack.enter_context(self._user_interface)

    # A database can be built only when none was provided and a configuration is available.
    if self._database is not None or self._database_conf is None:
        return

    # the configuration may be wrapped in a 'DBConfiguration' section (type1) or be the section itself (type2)
    conf = (
        self._database_conf['DBConfiguration']
        if 'DBConfiguration' in self._database_conf
        else self._database_conf
    )
    db_url, connection_parameters = build_connection_parameters(conf)

    self._database = connect(db_url, **connection_parameters)  # type: ignore # peewee is not returning a DB
    # Database.__enter__ opens a transaction, so closing is registered as a plain callback instead.
    self._resource_stack.callback(self._database.close)
    try:
        self._database.connect()
    except peewee.OperationalError:
        log.critical('Unable to connect to %s', db_url)
        raise
    database_proxy.initialize(self._database)

    if not self.create_standard_tables:
        return
    tables = mafw_model_register.get_standard_tables()
    self.database.create_tables(tables)
    for table in tables:
        table.init()

1974 

def start(self) -> None:
    """
    Start method, called before the processing loop begins.

    Overload it with every step to be performed once at the beginning of the operation; an overloaded version
    must call the super method.

    The default implementation records the super call, switches the processor status to Start and prunes the
    orphan files.
    """
    self._mark_super_call('start')
    self.processor_status = ProcessorStatus.Start
    self._remove_orphan_files()

1987 

def get_items(self) -> Collection[Any]:
    """
    Provide the collection of items for the processor loop.

    Processors looping over items must overload this method, typically returning rows from the database or
    files from the disk. The default implementation returns an empty list.

    :return: A collection of items for the loop
    :rtype: Collection[Any]
    """
    items: list[Any] = []
    return items

1999 

def while_condition(self) -> bool:
    """
    Tell whether the while loop must run another iteration.

    The default implementation always answers False, i.e. no iteration is performed.

    :return: True if the while loop has to continue, false otherwise.
    :rtype: bool
    """
    keep_going = False
    return keep_going

2008 

def process(self) -> None:
    """
    Process the current item.

    This is the core of the Processor: overload it with the required calculations. The default implementation
    does nothing.

    In parallel for loops, the method may optionally take a :class:`~mafw.models.loop_payloads.LoopItem`
    parameter, for users who prefer it to the thread-local :attr:`.Processor.item`, :attr:`.Processor.i_item` and
    :attr:`.Processor.n_item` attributes.
    """

2020 

def accept_item(self) -> None:
    """
    Post-process an item that was processed successfully.

    It is called when :meth:`process` left the looping status to Continue: everything went fine, so this is the
    right place for database updates or for saving files. The default implementation does nothing.

    .. seealso:
        :meth:`skip_item` for the case where something went wrong.

    In parallel for loops, the method may optionally take a :class:`~mafw.models.loop_payloads.LoopResult`
    parameter, giving direct access to the processed payload and to the looping status.
    """

2035 

def skip_item(self) -> None:
    """
    Post-process an item that was *NOT* processed successfully.

    It is called when :meth:`process` set the looping status to Skip: something went wrong and corrective actions
    can be taken here if needed. The default implementation does nothing.

    .. seealso:
        :meth:`accept_item` for the case where everything was OK.

    In parallel for loops, the method may optionally take a :class:`~mafw.models.loop_payloads.LoopResult`
    parameter, giving direct access to the processed payload and to the looping status.
    """

2050 

def consumer_start(self) -> None:
    """
    Run once in the consumer thread of the queue-based parallel loop, before any item is consumed.

    It is the consumer-side counterpart of :meth:`start` and is not used by the other loop types. The default
    implementation does nothing.
    """

2058 

def consumer_process(self, loop_result: LoopResult | None = None) -> None:
    """
    Handle processed items in the consumer thread for the parallel queue loop.

    The default implementation forwards the item to :meth:`accept_item` when the current looping status is
    Continue, or to :meth:`skip_item` when it is Skip; any other status is ignored. The hooks receive the
    LoopResult if their signature asks for it.

    :param loop_result: The result of the processed item. When omitted, a LoopResult is built from the current loop
        context, with no payload and a zero duration.
    :type loop_result: mafw.models.loop_payloads.LoopResult, Optional
    """
    result = self._build_loop_result(None, 0.0) if loop_result is None else loop_result
    dispatch = {
        LoopingStatus.Continue: self.accept_item,
        LoopingStatus.Skip: self.skip_item,
    }
    hook = dispatch.get(self.looping_status)
    if hook is not None:
        self._call_with_optional_payload(hook, result)

2072 

def consumer_finish(self) -> None:
    """
    Run once in the consumer thread of the queue-based parallel loop, after the queue has been drained.

    The default implementation does nothing.
    """

2078 

def finish(self) -> None:
    """
    Conclude the execution.

    Overload it for conclusive tasks; an overloaded version must always call super().

    The default implementation records the super call, switches the processor status to Finish, marks the exit
    status as Aborted when the loop ended with an Abort, and prints the process statistics.
    """
    self._mark_super_call('finish')
    self.processor_status = ProcessorStatus.Finish
    was_aborted = self.looping_status == LoopingStatus.Abort
    if was_aborted:
        self.processor_exit_status = ProcessorExitStatus.Aborted
    self.print_process_statistics()

2091 

def print_process_statistics(self) -> None:
    """
    Print the process statistics.

    A utility method to log the number of processed items together with the fastest, the slowest and the average
    time required to process a single item, followed by the total duration. This is particularly useful when the
    looping processor is part of a ProcessorList.

    The total duration is the wall-clock time elapsed since ``_wall_clock_start`` was recorded, when available,
    otherwise the sum of the single item durations. Nothing is logged when no item has been processed.
    """
    durations = self._process_durations
    if not durations:
        return

    # Lazy %-style arguments: logging formats the message only if the record is actually emitted.
    log.info('[cyan] Processed %s items.', len(durations))
    log.info('[cyan] Fastest item process duration: %s ', pretty_format_duration(min(durations), n_digits=3))
    log.info('[cyan] Slowest item process duration: %s ', pretty_format_duration(max(durations), n_digits=3))
    log.info(
        '[cyan] Average item process duration: %s ',
        pretty_format_duration(sum(durations) / len(durations), n_digits=3),
    )

    if self._wall_clock_start is not None:
        # with parallel loops the per-item durations overlap, so their sum would overestimate the elapsed time
        total_duration = time.perf_counter() - self._wall_clock_start
    else:
        total_duration = sum(durations)
    log.info('[cyan] Total process duration: %s', pretty_format_duration(total_duration, n_digits=3))

2118 

2119 def _remove_orphan_files(self) -> None: 

2120 """ 

2121 Remove orphan files. 

2122 

2123 If a connection to the database is available, then the OrphanFile standard table is queried for all its entries, 

2124 and all the files are then removed. 

2125 

2126 The user can turn off this behaviour by switching the :attr:`~mafw.processor.Processor.remove_orphan_files` to False. 

2127 

2128 """ 

2129 if self._database is None or self.remove_orphan_files is False: 

2130 # no database connection or no wish to remove orphan files, it does not make sense to continue 

2131 return 

2132 

2133 try: 

2134 OrphanFile = cast(MAFwBaseModel, mafw_model_register.get_model('OrphanFile')) 

2135 except KeyError: 

2136 log.warning('OrphanFile table not found in DB. Please verify database integrity') 

2137 return 

2138 

2139 if TYPE_CHECKING: 

2140 assert hasattr(OrphanFile, '_meta') 

2141 

2142 orphan_files = OrphanFile.select().execute() # type: ignore[no-untyped-call] 

2143 if len(orphan_files) != 0: 

2144 msg = f'[yellow]Pruning orphan files ({sum(len(f.filenames) for f in orphan_files)})...' 

2145 log.info(msg) 

2146 for orphan in orphan_files: 

2147 # filenames is a list of files: 

2148 for f in orphan.filenames: 

2149 f.unlink(missing_ok=True) 

2150 

2151 OrphanFile.delete().execute() # type: ignore[no-untyped-call] 

2152 

2153 

2154class ProcessorList(list[Union['Processor', 'ProcessorList']]): 

2155 """ 

2156 A list like collection of processors. 

2157 

2158 ProcessorList is a subclass of list containing only Processor subclasses or other ProcessorList. 

2159 

2160 An attempt to add an element that is not a Processor or a ProcessorList will raise a TypeError. 

2161 

2162 Along with an iterable of processors, a new processor list can be built using the following parameters. 

2163 """ 

2164 

def __init__(
    self,
    *args: Processor | ProcessorList,
    name: str | None = None,
    description: str | None = None,
    timer: Timer | None = None,
    timer_params: dict[str, Any] | None = None,
    user_interface: UserInterfaceBase | None = None,
    database: Database | None = None,
    database_conf: dict[str, Any] | None = None,
    create_standard_tables: bool = True,
):
    """
    Constructor parameters:

    :param args: The processors and/or processor lists to include, in execution order. None entries are dropped.
    :type args: Processor | ProcessorList
    :param name: The name of the processor list. Defaults to the class name.
    :type name: str, Optional
    :param description: An optional short description. Defaults to the name.
    :type description: str, Optional
    :param timer: The timer object. If None is provided, a new one will be created. Defaults to None.
    :type timer: Timer, Optional
    :param timer_params: A dictionary of parameter to build the timer object. Defaults to None.
    :type timer_params: dict, Optional
    :param user_interface: A user interface. Defaults to None, meaning a ConsoleInterface.
    :type user_interface: UserInterfaceBase, Optional
    :param database: A database instance. Defaults to None.
    :type database: Database, Optional
    :param database_conf: Configuration for the database. Default to None.
    :type database_conf: dict, Optional
    :param create_standard_tables: Whether or not to create the standard tables. Defaults to True.
    :type create_standard_tables: bool, Optional
    """
    # validate_items works on the whole tuple of positional arguments, so args is passed without unpacking
    super().__init__(self.validate_items(args))
    self._name = name or self.__class__.__name__
    self.description = description or self._name

    self.timer = timer
    self.timer_params = timer_params or {}
    self._user_interface = user_interface or ConsoleInterface()

    self._resource_stack: contextlib.ExitStack
    self._processor_exit_status: ProcessorExitStatus = ProcessorExitStatus.Successful
    # True when this list is nested inside another ProcessorList. Like the local resource flag of Processor, it
    # prevents the user interface from being added to the resource stack.
    self.nested_list = False

    # database stuff
    self._database: peewee.Database | None = database
    self._database_conf: dict[str, Any] | None = validate_database_conf(database_conf)
    # whether the standard tables are created and initialised when the database is set up
    self.create_standard_tables = create_standard_tables

2222 

def __setitem__(  # type: ignore[override]
    self,
    __index: SupportsIndex | slice,
    __object: Processor | ProcessorList | Iterable[Processor | ProcessorList],
) -> None:
    """
    Replace one element, or a slice of elements, of the processor list.

    Every new element is validated with :meth:`validate_item`. Slice assignment (``plist[1:3] = [p1, p2]``) is
    supported: each element of the assigned iterable is validated individually, instead of the iterable itself
    being rejected as a non-processor.

    :raises TypeError: if a new element is neither a Processor nor a ProcessorList.
    """
    if isinstance(__index, slice):
        # list.__setitem__ expects an iterable for slices: validate its elements one by one
        super().__setitem__(__index, [self.validate_item(item) for item in __object])
    else:
        super().__setitem__(__index, self.validate_item(__object))

2229 

def insert(self, __index: SupportsIndex, __object: Processor | ProcessorList) -> None:
    """
    Insert a processor (or a nested processor list) before position ``__index``, after validating it.

    :raises TypeError: if the object is neither a Processor nor a ProcessorList.
    """
    validated = self.validate_item(__object)
    super().insert(__index, validated)

2233 

def append(self, __object: Processor | ProcessorList) -> None:
    """
    Add a processor (or a nested processor list) at the end of the list, after validating it.

    :raises TypeError: if the object is neither a Processor nor a ProcessorList.
    """
    validated = self.validate_item(__object)
    super().append(validated)

2237 

def extend(self, __iterable: Iterable[Processor | ProcessorList]) -> None:
    """
    Extend the processor list with the elements of an iterable.

    Elements coming from a list of the same type have already been validated and are added as they are; any
    other iterable is fully validated before the list is modified.

    :raises TypeError: if an element is neither a Processor nor a ProcessorList.
    """
    if isinstance(__iterable, type(self)):
        additions = __iterable
    else:
        additions = [self.validate_item(item) for item in __iterable]
    super().extend(additions)

2244 

@staticmethod
def validate_item(item: Processor | ProcessorList) -> Processor | ProcessorList:
    """
    Validate an element being added to a processor list and prepare it for execution within the list.

    A Processor stops acquiring its own resources. A nested ProcessorList gets a silent timer configuration and is
    flagged as nested.

    :return: The same item, modified in place.
    :raises TypeError: if the item is neither a Processor nor a ProcessorList.
    """
    if isinstance(item, Processor):
        # timer, user interface and database are handed over by the list
        item.local_resource_acquisition = False
    elif isinstance(item, ProcessorList):
        # a nested list must not push the user interface on its own resource stack
        item.timer_params = {'suppress_message': True}
        item.nested_list = True
    else:
        raise TypeError(f'Expected Processor or ProcessorList, got {type(item).__name__}')
    return item

2257 

2258 @staticmethod 

2259 def validate_items(items: tuple[Processor | ProcessorList, ...] = ()) -> tuple[Processor | ProcessorList, ...]: 

2260 """Validates a tuple of items being added.""" 

2261 if not items: 

2262 return tuple() 

2263 return tuple([ProcessorList.validate_item(item) for item in items if item is not None]) 

2264 

@property
def name(self) -> str:
    """
    The name of the processor list.

    When no name is given to the constructor, it defaults to the class name.

    :return: The name of the processor list
    :rtype: str
    """
    return self._name

2274 

@name.setter
def name(self, name: str) -> None:
    # rename the processor list; the description is not updated
    self._name = name

2278 

@property
def processor_exit_status(self) -> ProcessorExitStatus:
    """
    The exit status of the whole processor list execution.

    It is updated after each executed item with that item's exit status.
    """
    return self._processor_exit_status

2287 

@processor_exit_status.setter
def processor_exit_status(self, status: ProcessorExitStatus) -> None:
    # overwrite the stored exit status of the list
    self._processor_exit_status = status

2291 

@property
def database(self) -> peewee.Database:
    """
    The database instance used by the processor list.

    :return: A database instance
    :raises MissingDatabase: if a database connection is missing.
    """
    db = self._database
    if db is not None:
        return db
    raise MissingDatabase('Database connection not initialized')

2303 

def execute(self) -> ProcessorExitStatus:
    """
    Execute the list of processors.

    Similarly to the :class:`Processor`, ProcessorList can be executed. In simple words, the execute
    method of each processor in the list is called exactly in the same sequence as they were added.
    Before each execution the list hands its resources over to the item via :meth:`distribute_resources`.

    :return: The exit status of the last executed item, or the status stored on the list if it is empty.
    :rtype: ProcessorExitStatus
    :raises AbortProcessorException: if an item ends with an Aborted exit status; the remaining items are skipped.
    """
    with contextlib.ExitStack() as self._resource_stack:
        self.acquire_resources()
        self._user_interface.create_task(self.name, self.description, completed=0, increment=0, total=len(self))
        for item in self:
            # lazy %-style arguments: the message is formatted only if the record is emitted
            if isinstance(item, Processor):
                log.info('[bold]Executing [red]%s[/red] processor[/bold]', item.replica_name)
            else:
                log.info('[bold]Executing [blue]%s[/blue] processor list[/bold]', item.name)
            self.distribute_resources(item)
            item.execute()
            self._user_interface.update_task(self.name, increment=1)
            self._processor_exit_status = item.processor_exit_status
            if self._processor_exit_status == ProcessorExitStatus.Aborted:
                msg = 'Processor %s caused the processor list to abort' % item.name
                log.error(msg)
                raise AbortProcessorException(msg)
        self._user_interface.update_task(self.name, completed=len(self), total=len(self))
    return self._processor_exit_status

2329 

def acquire_resources(self) -> None:
    """
    Acquires external resources.

    Resources already available (timer, database) are used as they are; missing ones are created and registered on
    the resource stack so that they are released at the end of :meth:`execute`. A database received from outside
    is never closed here.
    """
    if self.timer is None:
        self.timer = self._resource_stack.enter_context(Timer(**self.timer_params))
    # The user interface is very likely already initialised by the runner. A nested list must not push it on its
    # own stack, otherwise the interface context (the rich progress, for instance) would be stopped at the end of
    # the nested list.
    if not self.nested_list:
        self._resource_stack.enter_context(self._user_interface)

    # A database can be built only when none was provided and a configuration is available.
    if self._database is not None or self._database_conf is None:
        return

    # the configuration may be wrapped in a 'DBConfiguration' section (type1) or be the section itself (type2)
    conf = (
        self._database_conf['DBConfiguration']
        if 'DBConfiguration' in self._database_conf
        else self._database_conf
    )
    db_url, connection_parameters = build_connection_parameters(conf)

    self._database = connect(db_url, **connection_parameters)  # type: ignore # peewee is not returning a DB
    try:
        self._database.connect()
        self._resource_stack.callback(self._database.close)
    except peewee.OperationalError:
        log.critical('Unable to connect to %s', db_url)
        raise
    database_proxy.initialize(self._database)

    if not self.create_standard_tables:
        return
    tables = mafw_model_register.get_standard_tables()
    self.database.create_tables(tables)
    for table in tables:
        table.init()

2374 

2375 def distribute_resources(self, processor: Processor | Self) -> None: 

2376 """Distributes the external resources to the items in the list.""" 

2377 processor.timer = self.timer 

2378 processor._user_interface = self._user_interface 

2379 processor._database = self._database