Coverage for src / mafw / processor.py: 99%

1024 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 16:10 +0000

1# Copyright 2025–2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Module implements the basic Processor class, the ProcessorList and all helper classes to achieve the core 

6functionality of the MAFw. 

7""" 

8 

9from __future__ import annotations 

10 

11import contextlib 

12import inspect 

13import logging 

14import os 

15import queue 

16import threading 

17import time 

18import warnings 

19from collections import OrderedDict 

20from collections.abc import Callable, Iterator 

21from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait 

22from copy import copy, deepcopy 

23from functools import wraps 

24from itertools import count 

25from typing import ( 

26 TYPE_CHECKING, 

27 Any, 

28 Collection, 

29 Generic, 

30 Iterable, 

31 Self, 

32 SupportsIndex, 

33 TypeVar, 

34 Union, 

35 cast, 

36 get_args, 

37 get_origin, 

38 get_type_hints, 

39) 

40 

41import peewee 

42from peewee import Database 

43 

44# noinspection PyUnresolvedReferences 

45from playhouse.db_url import connect 

46 

47import mafw.db.db_filter 

48from mafw.active import Active 

49from mafw.db.db_model import MAFwBaseModel, database_proxy, mafw_model_register 

50from mafw.enumerators import LoopingStatus, LoopType, ProcessorExitStatus, ProcessorStatus 

51from mafw.mafw_errors import ( 

52 AbortProcessorException, 

53 MissingDatabase, 

54 MissingOverloadedMethod, 

55 MissingSuperCall, 

56 ProcessorParameterError, 

57) 

58from mafw.models.filter_schema import FilterSchema 

59from mafw.models.loop_payloads import LoopItem, LoopResult 

60from mafw.models.parameter_schema import ParameterSchema 

61from mafw.models.processor_schema import ProcessorSchema 

62from mafw.timer import Timer, pretty_format_duration 

63from mafw.tools.generics import deep_update 

64from mafw.tools.parallel import is_free_threading 

65from mafw.tools.regexp import extract_protocol 

66from mafw.ui.abstract_user_interface import UserInterfaceBase 

67from mafw.ui.console_user_interface import ConsoleInterface 

68 

69log = logging.getLogger(__name__) 

70 

71ParameterType = TypeVar('ParameterType') 

72"""Generic variable type for the :class:`ActiveParameter` and :class:`PassiveParameter`.""" 

73 

74 

def validate_database_conf(database_conf: dict[str, Any] | None = None) -> dict[str, Any] | None:
    """
    Validates the database configuration.

    A configuration is considered valid when it provides all the required fields
    (currently only ``URL``), either at the top level or inside a ``DBConfiguration``
    sub-table (the layout used by steering files).

    :param database_conf: The input database configuration. Defaults to None.
    :type database_conf: dict, Optional
    :return: The original database configuration when valid, otherwise None.
    :rtype: dict, None
    """
    if database_conf is None:
        return None

    # work on a copy: the input dict is mutable and must not be modified here.
    conf = database_conf.copy()

    if 'DBConfiguration' in conf:
        # the database_conf is a steering file. Extract the DBConfiguration table
        conf = conf['DBConfiguration']

    required_fields = ('URL',)
    if all(field in conf for field in required_fields):
        # the *original* dict is returned so that callers keep the full structure
        return database_conf
    return None

98 

99 

class PassiveParameter(Generic[ParameterType]):
    """
    Storage backend for a processor parameter value and its metadata.

    Instances of this class are the private side of the :class:`ActiveParameter`
    descriptor: whenever an :class:`.ActiveParameter` is declared on a processor class,
    a PassiveParameter is created in the processor parameter
    :attr:`register <.Processor._processor_parameters>` to hold the actual value, the
    default and the help text.

    .. seealso::

        An explanation on how processor parameters work and should be used is given in
        :ref:`Understanding processor parameters <parameters>`

    .. versionchanged:: v2.0.0

        User should only use :class:`ActiveParameter` and never manually instantiate
        :class:`PassiveParameter`.
    """

    def __init__(
        self, name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = ''
    ):
        """
        Constructor parameters:

        :param name: The name of the parameter. It must be a valid python identifier.
        :type name: str
        :param value: The user supplied value. When None, the default is used instead. Defaults to None.
        :type value: ParameterType, Optional
        :param default: Fallback value used when ``value`` is not provided. Defaults to None.
        :type default: ParameterType, Optional
        :param help_doc: A brief explanation of the parameter.
        :type help_doc: str, Optional
        :raises ProcessorParameterError: if neither `value` nor `default` is provided or if `name` is not a valid identifier.
        """
        if not name.isidentifier():
            raise ProcessorParameterError(f'{name} is not a valid python identifier.')

        self.name = name

        effective = value if value is not None else default
        if effective is None:
            raise ProcessorParameterError('Processor parameter cannot have both value and default value set to None')

        self._value: ParameterType = effective
        # an explicitly provided value marks the parameter as "set"; a default-only
        # parameter is optional and stays "unset" until assigned.
        self._is_set = value is not None
        self._is_optional = value is None

        self.doc = help_doc

    def __rich_repr__(self) -> Iterator[Any]:
        # rich-style repr protocol: 2-tuples are always shown, the third element of a
        # 3-tuple is the default that suppresses the field when equal.
        yield 'name', self.name
        yield 'value', self.value, None
        yield 'help_doc', self.doc, ''

    @property
    def is_set(self) -> bool:
        """
        Property to check if the value has been set.

        It is useful for optional parameters to see if the current value is the default
        one, or if the user set it.
        """
        return self._is_set

    @property
    def value(self) -> ParameterType:
        """
        Gets the parameter value.

        :return: The parameter value.
        :rtype: ParameterType
        """
        return self._value

    @value.setter
    def value(self, value: ParameterType) -> None:
        """
        Sets the parameter value and marks the parameter as set.

        :param value: The value to be set.
        :type value: ParameterType
        """
        self._value = value
        self._is_set = True

    @property
    def is_optional(self) -> bool:
        """
        Property to check if the parameter is optional.

        :return: True if the parameter is optional
        :rtype: bool
        """
        return self._is_optional

    def __repr__(self) -> str:
        attributes = ('name', 'value', 'doc')
        rendered = ', '.join(f'{attribute}={getattr(self, attribute)!r}' for attribute in attributes)
        return f'{self.__class__.__name__}({rendered})'

206 

207 

# Bound TypeVar: decorated callables keep their original signature for type checkers.
F = TypeVar('F', bound=Callable[..., Any])
"""Type variable for a generic callable with any return value."""

210 

211 

def ensure_parameter_registration(func: F) -> F:
    """
    Decorator to ensure that before calling `func` the processor parameters have been registered.

    The decorated callable must be a method of a :class:`Processor` subclass: the first
    positional argument is verified to be a :class:`Processor` instance and, if the
    parameters have not been registered yet, :meth:`Processor._register_parameters` is
    invoked before delegating to ``func``.

    :raises ProcessorParameterError: when the first positional argument is missing or is
        not a :class:`Processor` instance.
    """

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # the first positional argument must be self; isinstance(None, Processor) is
        # False, so a missing argument fails the same check with the same error.
        self = args[0] if args else None
        if not isinstance(self, Processor):
            raise ProcessorParameterError(
                'Attempt to apply the ensure_parameter_registration to something different to a Processor subclass.'
            )
        if not self._parameter_registered:
            self._register_parameters()
        return func(*args, **kwargs)

    return cast(F, wrapper)

232 

233 

# Alias for the class-level registry mapping external parameter names to their descriptors.
_ParameterRegistry = OrderedDict[str, 'ActiveParameter[Any]']

235 

236 

237def _copy_parent_parameter_definitions(owner: type[Processor]) -> _ParameterRegistry: 

238 """ 

239 Build an ordered dictionary of ActiveParameters inherited from base classes. 

240 """ 

241 definitions: _ParameterRegistry = OrderedDict() 

242 for base in owner.__mro__[1:]: 

243 base_definitions = getattr(base, '_parameter_definitions', None) 

244 if base_definitions: 

245 for name, descriptor in base_definitions.items(): 

246 definitions[name] = descriptor 

247 return definitions 

248 

249 

def _ensure_parameter_definitions(owner: type[Processor]) -> _ParameterRegistry:
    """
    Return the class-level parameter registry, creating it by copying base definitions if necessary.

    The registry is looked up in ``owner.__dict__`` directly (not through the MRO) so
    that every class in a hierarchy gets its own independent registry.
    """
    registry: _ParameterRegistry | None = owner.__dict__.get('_parameter_definitions')
    if registry is None:
        # first access on this class: seed the registry from the base classes
        registry = _copy_parent_parameter_definitions(owner)
        setattr(owner, '_parameter_definitions', registry)
    return registry

261 

262 

def _validate_filter_schema(owner: type[Processor]) -> None:
    """
    Validate the optional :class:`~mafw.models.filter_schema.FilterSchema` declared on a Processor.

    A no-op when the class declares no ``_filter_schema``.

    :raises ProcessorParameterError: when the schema, its root model or any of the
        allowed models is malformed, or when a model is declared more than once.
    """
    declared = getattr(owner, '_filter_schema', None)
    if declared is None:
        return
    if not isinstance(declared, FilterSchema):
        raise ProcessorParameterError('Processor._filter_schema must be a FilterSchema instance')

    def _is_model_class(candidate: Any) -> bool:
        # every model taking part in the filter must be a MAFwBaseModel subclass
        return inspect.isclass(candidate) and issubclass(candidate, MAFwBaseModel)

    root = declared.root_model
    if not _is_model_class(root):
        raise ProcessorParameterError('FilterSchema.root_model must inherit from MAFwBaseModel')

    registered = {root}
    for model in declared.allowed_models:
        if not _is_model_class(model):
            raise ProcessorParameterError('FilterSchema.allowed_models must contain MAFwBaseModel subclasses')
        if model in registered:
            raise ProcessorParameterError(f'Model {model!r} already declared in FilterSchema')
        registered.add(model)

284 

285 

class ActiveParameter(Generic[ParameterType]):
    r"""
    The public interface to the processor parameter.

    The behaviour of a :class:`Processor` can be customised by using processor parameters. The value of these
    parameters can be either set via a configuration file or directly when creating the class.

    If the user wants to benefit from this facility, they have to add in the instance of the Processor subclass an
    ActiveParameter instance in this way:

    .. code-block::

        class MyProcessor(Processor):

            # this is the input folder
            input_folder = ActiveParameter('input_folder', Path(r'C:\'), help_doc='This is where to look for input files')

            def __init__(self, *args, **kwargs):
                super().__init(*args, **kwargs)

                # change the input folder to something else
                self.input_folder = Path(r'D:\data')

                # get the value of the parameter
                print(self.input_folder)

    The ActiveParameter is a `descriptor <https://docs.python.org/3/glossary.html#term-descriptor>`_, it means that
    when you create one of them, a lot of work is done behind the scene.

    In simple words, a processor parameter is made by two objects: a public interface where the user can easily
    access the value of the parameter and a private interface where all other information (default, documentation...)
    is also stored.

    The user does not have to take care of all of this. When a new ActiveParameter instance is added to the class as
    in the code snippet above, the private interface is automatically created and will stay in the class instance
    until the end of the class lifetime.

    To access the private interface, the user can use the :meth:`Processor.get_parameter` method using the parameter
    name as a key.

    The user can assign to an ActiveParameter almost any name. There are just a few invalid parameter names that are
    used for other purposes. The list of reserved names is available :attr:`here <reserved_names>`. Should the user
    inadvertently use a reserved named, a :exc:`.ProcessorParameterError` is raised.

    .. seealso::

        The private counter part in the :class:`PassiveParameter`.

        An explanation on how processor parameters work and should be used is given in :ref:`Understanding processor
        parameters <parameters>`

        The list of :attr:`reserved names <reserved_names>`.
    """

    reserved_names: list[str] = ['__logic__', '__filter__', '__new_only__', '__inheritance__', '__enable__']
    """A list of names that cannot be used as processor parameter names.

    - `__logic__`
    - `__filter__`
    - `__new_only__`
    - `__inheritance__`
    - `__enable__`
    """

    def __init__(
        self, name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = ''
    ):
        """
        Constructor parameters:

        :param name: The name of the parameter.
        :type name: str
        :param value: The initial value of the parameter. Defaults to None.
        :type value: ParameterType, Optional
        :param default: The default value of the parameter, to be used when ``value`` is not set. Defaults to None.
        :type default: ParameterType, Optional
        :param help_doc: An explanatory text describing the parameter.
        :type help_doc: str, Optional
        :raises ProcessorParameterError: if ``name`` is one of the :attr:`reserved_names`.
        """

        self._value = value
        self._default = default
        self._help_doc = help_doc
        # validate before storing: reserved names raise ProcessorParameterError.
        self._external_name = self._validate_name(name)

    def _validate_name(self, proposed_name: str) -> str:
        """
        Validate that the proposed parameter name is not in the list of forbidden names.

        This private method checks if the provided name is allowed for use as a processor parameter.
        Names that are listed in :attr:`reserved_names` cannot be used as parameter names.

        :param proposed_name: The name to be validated for use as a processor parameter.
        :type proposed_name: str
        :return: The validated name if it passes the forbidden names check.
        :rtype: str
        :raises ProcessorParameterError: If the proposed name is in the list of forbidden names.
        """
        if proposed_name not in self.reserved_names:
            return proposed_name
        raise ProcessorParameterError(f'Attempt to use a forbidden name ({proposed_name})')

    def __set_name__(self, owner: type[Processor], name: str) -> None:
        """
        Register this descriptor in the owner's class-level parameter registry.

        Called automatically by Python when the descriptor is assigned to a class
        attribute. A duplicate external name on the *same* owner is not raised here:
        the error is recorded on the owner as ``_parameter_definition_error`` and
        re-raised at class-creation time by :class:`ProcessorMeta`.
        """
        self.public_name = name
        self.private_name = f'param_{name}'
        self._owner = owner

        definitions = _ensure_parameter_definitions(owner)
        existing = definitions.get(self._external_name)
        if existing is not None and getattr(existing, '_owner', None) is owner:
            # duplicate on the same class: remember only the first error encountered
            if not hasattr(owner, '_parameter_definition_error'):
                setattr(
                    owner,
                    '_parameter_definition_error',
                    ProcessorParameterError(f'Duplicated parameter name ({self._external_name}).'),
                )
            return
        # a redefinition in a subclass (different owner) simply replaces the inherited entry
        definitions[self._external_name] = self

    def __get__(self, obj: Processor | None, obj_type: type[Processor]) -> ActiveParameter[ParameterType] | ParameterType:
        """
        Descriptor read access.

        Class-level access (``obj is None``) returns the descriptor itself; instance
        access returns the current value stored in the instance-level :class:`PassiveParameter`.
        """
        if obj is None:
            return self

        # retrieve instance-level passive parameter
        param = cast(PassiveParameter[ParameterType], obj._processor_parameters[self._external_name])
        return param.value

    def __set__(self, obj: Processor, value: ParameterType) -> None:
        """Descriptor write access: forward the value to the instance-level :class:`PassiveParameter`."""
        param = obj._processor_parameters[self._external_name]
        param.value = value

    def to_schema(self) -> ParameterSchema:
        """
        Returns the static schema describing this parameter.

        The schema is derived solely from the descriptor metadata and does not instantiate the owning processor.
        """
        annotation = self._resolve_parameter_annotation()
        default_value = self._schema_default_value()
        # an empty help string is exposed as None in the schema
        help_text = self._help_doc or None
        is_list = self._is_list_annotation(annotation, default_value)
        is_dict = self._is_dict_annotation(annotation, default_value)
        return ParameterSchema(
            name=self._external_name,
            annotation=annotation,
            default=default_value,
            help=help_text,
            is_list=is_list,
            is_dict=is_dict,
        )

    def _resolve_parameter_annotation(self) -> type | None:
        """
        Best-effort resolution of the parameter type.

        The owner's type hints are inspected first: an ``ActiveParameter[T]`` hint
        yields ``T``, any other hint is returned as-is. When no hint is available, the
        type of the default (or initial) value is used. Returns None when nothing can
        be inferred.
        """
        if not hasattr(self, 'public_name') or getattr(self, '_owner', None) is None:
            # the descriptor was never bound to a class: nothing to inspect
            return None

        target = getattr(self, '_owner', None)

        try:
            hints = get_type_hints(target)
        except Exception:
            # unresolvable forward references and similar issues: fall back gracefully
            hints = {}

        hint = hints.get(self.public_name)
        if hint is None:
            # no annotation: infer the type from the default (preferred) or initial value
            fallback = self._default if self._default is not None else self._value
            if fallback is not None:
                return type(fallback)
            return None

        origin = get_origin(hint)
        if origin is ActiveParameter:
            # ActiveParameter[T] annotation: the parameter type is T
            args = get_args(hint)
            if args:
                return cast(type | None, args[0])
            return None
        return cast(type | None, hint)

    def _schema_default_value(self) -> Any:
        """Return a deep copy of the default (or initial) value; fall back to the original object when it cannot be copied."""
        candidate = self._default if self._default is not None else self._value
        if candidate is None:
            return None
        try:
            return deepcopy(candidate)
        except Exception:
            # non-copyable defaults are exposed as-is
            return candidate

    def _is_list_annotation(self, annotation: type | None, default_value: Any) -> bool:
        """True when the parameter is list-like, by annotation or by default value type."""
        return self._matches_container(annotation, default_value, list)

    def _is_dict_annotation(self, annotation: type | None, default_value: Any) -> bool:
        """True when the parameter is dict-like, by annotation or by default value type."""
        return self._matches_container(annotation, default_value, dict)

    def _matches_container(self, annotation: type | None, default_value: Any, container: type) -> bool:
        """
        Check whether the annotation (or, failing that, the default value's type) denotes
        the given container: either as a generic origin (e.g. ``list[int]``) or as a
        plain subclass of ``container``.
        """
        type_hint = annotation
        if type_hint is None and default_value is not None:
            type_hint = type(default_value)

        if type_hint is None:
            return False

        origin = get_origin(type_hint)
        if origin is container:
            return True

        if isinstance(type_hint, type) and issubclass(type_hint, container):
            return True

        return False

494 

495 

class ProcessorMeta(type):
    """Metaclass that finalizes Processor subclasses before instantiation."""

    def __init__(cls, name: str, bases: tuple[type, ...], namespace: dict[str, Any]) -> None:
        """
        Set up new Processor subclasses.

        The class-level parameter registry and the optional filter schema are validated
        first; any duplicate-parameter error deferred by
        :meth:`ActiveParameter.__set_name__` is re-raised here. Finally, for genuine
        Processor subclasses, the super-call wrapping machinery is installed so that
        instances are guarded before they ever execute.
        """
        super().__init__(name, bases, namespace)
        processor_cls = cast(type['Processor'], cls)
        _ensure_parameter_definitions(processor_cls)
        _validate_filter_schema(processor_cls)

        pending_error = getattr(cls, '_parameter_definition_error', None)
        if pending_error is not None:
            # consume the deferred error so subclasses do not re-raise it
            delattr(cls, '_parameter_definition_error')
            raise pending_error

        is_processor_subclass = any(ancestor.__name__ == 'Processor' for ancestor in cls.__mro__[1:])
        if is_processor_subclass:
            wrapper_installer = getattr(cls, '_apply_super_call_wrappers', None)
            if wrapper_installer is not None:
                wrapper_installer()

    def __call__(cls, *args: Any, **kwargs: Any) -> 'ProcessorMeta':
        # create the instance normally, then run the post-init hook exactly once
        instance = type.__call__(cls, *args, **kwargs)
        instance.__post_init__()
        return cast(ProcessorMeta, instance)

523 

524 

525# noinspection PyProtectedMember 

526class Processor(metaclass=ProcessorMeta): 

527 """ 

528 The basic processor. 

529 

530 A very comprehensive description of what a Processor does and how it works is available at :ref:`doc_processor`. 

531 """ 

532 

    processor_status = Active(ProcessorStatus.Unknown)
    """Processor execution status"""

    progress_message: str = f'{__qualname__} is working'
    """Message displayed to show the progress.

    It can be customized with information about the current item in the loop by overloading the
    :meth:`format_progress_message`."""

    #: List of methods that should invoke their super implementation when overridden.
    #: NOTE(review): presumably consumed by the ``_apply_super_call_wrappers`` machinery
    #: installed by :class:`ProcessorMeta` — confirm against that method.
    _methods_to_be_checked_for_super: tuple[str, ...] = ('start', 'finish')

544 

545 @classmethod 

546 def parameter_schema(cls) -> list[ParameterSchema]: 

547 """ 

548 Return the ordered static schema for the processor parameters defined on the class. 

549 

550 The schema is derived without instantiating the processor, keeping toolchains free from side effects. 

551 """ 

552 definitions = getattr(cls, '_parameter_definitions', None) 

553 if definitions is None: 

554 definitions = _ensure_parameter_definitions(cls) 

555 return [param.to_schema() for param in definitions.values()] 

556 

557 @classmethod 

558 def filter_schema(cls) -> FilterSchema | None: 

559 """ 

560 Optional metadata describing the models available for filtering. 

561 """ 

562 return getattr(cls, '_filter_schema', None) 

563 

564 @classmethod 

565 def processor_schema(cls) -> ProcessorSchema: 

566 """ 

567 Return the static schema for the processor. 

568 """ 

569 return ProcessorSchema(parameters=cls.parameter_schema(), filter=cls.filter_schema()) 

570 

    _ids = count(0)
    """A counter for all processor instances"""

    new_defaults: dict[str, Any] = {}
    """
    A dictionary containing defaults value for the parameters to be overridden

    .. versionadded:: v2.0.0
    """

    # Configuration key for the "new items only" option.
    # NOTE(review): presumably related to the `__new_only__` reserved filter name — confirm
    # against the configuration-loading code.
    new_only_flag = 'new_only'

582 

    def __init__(
        self,
        name: str | None = None,
        description: str | None = None,
        config: dict[str, Any] | None = None,
        looper: LoopType | str = LoopType.ForLoop,
        user_interface: UserInterfaceBase | None = None,
        timer: Timer | None = None,
        timer_params: dict[str, Any] | None = None,
        database: Database | None = None,
        database_conf: dict[str, Any] | None = None,
        remove_orphan_files: bool = True,
        replica_id: str | None = None,
        create_standard_tables: bool = True,
        max_workers: int | None = None,
        queue_size: int | None = None,
        queue_batch_size: int | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Constructor parameters

        :param name: The name of the processor. If None is provided, the class name is used instead. Defaults to None.
        :type name: str, Optional
        :param description: A short description of the processor task. Defaults to the processor name.
        :type description: str, Optional
        :param config: A configuration dictionary for this processor. Defaults to None.
        :type config: dict, Optional
        :param looper: Enumerator to define the looping type. Defaults to LoopType.ForLoop
        :type looper: LoopType, Optional
        :param user_interface: A user interface instance to be used by the processor to interact with the user.
        :type user_interface: UserInterfaceBase, Optional
        :param timer: A timer object to measure process duration.
        :type timer: Timer, Optional
        :param timer_params: Parameters for the timer object.
        :type timer_params: dict, Optional
        :param database: A database instance. Defaults to None.
        :type database: Database, Optional
        :param database_conf: Configuration for the database. Default to None.
        :type database_conf: dict, Optional
        :param remove_orphan_files: Boolean flag to remove files on disc without a reference to the database.
            See :ref:`std_tables` and :meth:`~mafw.processor.Processor._remove_orphan_files`. Defaults to True
        :type remove_orphan_files: bool, Optional
        :param replica_id: The replica identifier for the current processor.
        :type replica_id: str, Optional
        :param create_standard_tables: Boolean flag to create std tables on disk. Defaults to True
        :type create_standard_tables: bool, Optional
        :param max_workers: Number of worker threads for parallel loops.
        :type max_workers: int, Optional
        :param queue_size: Maximum size of the internal queue for the queue-based parallel loop.
        :type queue_size: int, Optional
        :param queue_batch_size: Number of items processed per worker task in the queue-based parallel loop.
        :type queue_batch_size: int, Optional
        :param args: Extra positional arguments; accepted for subclass flexibility but not used here.
        :param kwargs: Keyword arguments that can be used to set processor parameters.
        """

        self.name = name or self.__class__.__name__
        """The name of the processor."""

        self.unique_id = next(self._ids)
        """A unique identifier representing how many instances of Processor has been created."""

        self.replica_id = replica_id
        """
        The replica identifier specified in the constructor

        .. versionadded:: v2.0.0
        """

        self.description = description or self.name
        """A short description of the processor task."""

        self._item: Any = None
        """The current item of the loop."""

        self._looping_status: LoopingStatus = LoopingStatus.Continue
        """The looping status for the main thread."""

        self._thread_local = threading.local()
        """Thread-local storage for loop attributes in parallel execution.

        .. versionadded:: v2.1.0
        """

        self._wall_clock_start: float | None = None
        """Timestamp when the looping execution began."""

        self.processor_exit_status = ProcessorExitStatus.Successful
        """Processor exit status"""

        self.loop_type: LoopType = LoopType(looper)
        """
        The loop type.

        The value of this parameter can also be changed by the :func:`~mafw.decorators.execution_workflow` decorator
        factory.

        See :class:`~mafw.enumerators.LoopType` for more details.
        """

        self.create_standard_tables = create_standard_tables
        """The boolean flag to proceed or skip with standard table creation and initialisation"""

        self.max_workers = max_workers if max_workers is not None else self._compute_default_max_workers()
        """Maximum number of worker threads used in parallel loops."""

        # default queue size: twice the worker count, so producers stay ahead of consumers
        computed_queue_size = max(1, self.max_workers * 2)
        self.queue_size = queue_size if queue_size is not None else computed_queue_size
        """Maximum size of the queue used by :class:`~mafw.enumerators.LoopType.ParallelForLoopWithQueue`."""

        # queue_batch_size of None or 0 is normalised to 1
        self.queue_batch_size = max(1, queue_batch_size or 1)
        """Number of items processed per worker task in :class:`~mafw.enumerators.LoopType.ParallelForLoopWithQueue`."""

        # private attributes
        # intentionally left empty here; filled by _load_parameter_configuration (see docstring below)
        self._config: dict[str, Any] = {}
        """
        A dictionary containing the processor configuration object.

        This dictionary is populated with configuration parameter (always type 2) during the
        :meth:`._load_parameter_configuration` method.

        The original value of the configuration dictionary that is passed to the constructor is stored in
        :attr:`._orig_config`.

        .. versionchanged:: v2.0.0
            Now it is an empty dictionary until the :meth:`._load_parameter_configuration` is called.

        """

        self._orig_config = deepcopy(config) if config is not None else {}
        """
        A copy of the original configuration dictionary.

        .. versionadded:: v2.0.0
        """

        self._processor_parameters: OrderedDict[str, PassiveParameter[Any]] = OrderedDict()
        """
        A dictionary to store all the processor parameter instances.

        The name of the parameter is used as a key, while for the value an instance of the
        :class:`.PassiveParameter` is used.
        """
        # flipped to True by _register_parameters once all ActiveParameters have been mirrored
        self._parameter_registered = False
        """A boolean flag to confirm successful parameter registration."""
        # keyword arguments are kept aside and applied later as parameter overrides
        self._kwargs = kwargs

        # loops attributes
        self._i_item: int = -1
        self._n_item: int | None = -1
        self._process_durations: list[float] = []
        self._super_call_flags: dict[str, bool] = {}
        """Tracks super-call usage for methods that require it."""

        # resource stack (declared only; created when resources are actually acquired)
        self._resource_stack: contextlib.ExitStack
        self._resource_acquisition: bool = True

        # processor timer
        self.timer: Timer | None = timer
        self._timer_parameters: dict[str, Any] = timer_params or {}

        # user interface: default to the console implementation when none is supplied
        if user_interface is None:
            self._user_interface: UserInterfaceBase = ConsoleInterface()
        else:
            self._user_interface = user_interface

        # database stuff
        self._database: peewee.Database | None = database
        self._database_conf: dict[str, Any] | None = validate_database_conf(database_conf)
        self.filter_register: mafw.db.db_filter.ProcessorFilter = mafw.db.db_filter.ProcessorFilter()
        """The DB filter register of the Processor."""
        self.remove_orphan_files: bool = remove_orphan_files
        """The flag to remove or protect the orphan files. Defaults to True"""

        self.initialise_parameters()

762 

763 def initialise_parameters(self) -> None: 

764 """ 

765 Initialises processor parameters by registering them and applying various configuration sources. 

766 

767 This method orchestrates the parameter initialisation process by performing the following steps in order: 

768 

769 #. Registers processor parameters defined as :class:`ActiveParameter` instances 

770 #. Overrides default parameter values with any configured overrides 

771 #. Loads parameter configuration from the processor's configuration dictionary 

772 #. Applies keyword arguments as parameter overrides 

773 

774 The method ensures that all processor parameters are properly configured before the processor 

775 execution begins. It is automatically called during processor initialisation and should not 

776 typically be called directly by users. 

777 

778 .. seealso:: 

779 :meth:`_register_parameters`, :meth:`_override_defaults`, 

780 :meth:`_load_parameter_configuration`, :meth:`_overrule_kws_parameters` 

781 

782 .. versionadded:: v2.0.0 

783 """ 

784 self._register_parameters() 

785 self._override_defaults() 

786 self._load_parameter_configuration() 

787 self._overrule_kws_parameters() 

788 

789 def __post_init__(self) -> None: 

790 """ 

791 Performs post-initialisation tasks for the processor. 

792 

793 This method is automatically called after the processor initialisation is complete. 

794 It performs validation checks on overloaded methods and sets the initial processor status. 

795 

796 .. seealso:: 

797 :meth:`validate_configuration`, :meth:`_check_method_overload`, 

798 :attr:`~mafw.processor.Processor.processor_status` 

799 

800 .. versionchanged:: v2.0.0 

801 Moved the parameter initialisation to :meth:`initialise_parameters` and now executed as last step of the 

802 init method. 

803 

804 Added the validate configuration check. This method should silently check that configuration provided 

805 with the processor parameters is valid. If not, a :exc:`.ProcessorParameterError` is raised. 

806 """ 

807 self.validate_configuration() 

808 self._check_method_overload() 

809 self.processor_status = ProcessorStatus.Init 

810 

811 def _register_parameters(self) -> None: 

812 """ 

813 Register processor parameters defined as ActiveParameter instances in the class. 

814 

815 This private method scans the class definition for any :class:`.ActiveParameter` instances and creates 

816 corresponding :class:`.PassiveParameter` instances to store the actual parameter values and metadata. 

817 It ensures that all processor parameters are properly initialised and available for configuration 

818 through the processor's configuration system. 

819 

820 The method checks for duplicate parameter names and raises a :exc:`.ProcessorParameterError` if duplicates 

821 are detected. It also sets the internal flag :attr:`._parameter_registered` to True once registration is 

822 complete. 

823 

824 .. note:: 

825 This method is automatically called during processor initialisation and should not be called directly 

826 by users. 

827 

828 .. seealso:: 

829 :class:`.Processor`, :meth:`.Processor._override_defaults`, 

830 :meth:`.Processor._load_parameter_configuration`, :meth:`.Processor._overrule_kws_parameters` 

831 

832 .. versionchanged:: v2.0.0 

833 Only :class:`ActiveParameter` are not registered. The use of :class:`PassiveParameter` is only meant to 

834 store the value and metadata of the active counter part. 

835 """ 

836 if self._parameter_registered: 

837 return 

838 

839 definitions = getattr(self.__class__, '_parameter_definitions', None) 

840 if definitions is None: 

841 definitions = _ensure_parameter_definitions(self.__class__) 

842 

843 for attr in definitions.values(): 

844 ext_name = attr._external_name 

845 if ext_name in self._processor_parameters: 

846 raise ProcessorParameterError(f'Duplicated parameter name ({ext_name}).') 

847 self._processor_parameters[ext_name] = PassiveParameter( 

848 ext_name, attr._value, attr._default, attr._help_doc 

849 ) 

850 

851 self._parameter_registered = True 

852 

853 def _override_defaults(self) -> None: 

854 """ 

855 Override default parameter values with values from :attr:`new_defaults`. 

856 

857 This private method iterates through the :attr:`new_defaults` dictionary and updates 

858 the corresponding processor parameters with new values. Only parameters that exist 

859 in both :attr:`new_defaults` and :attr:`_processor_parameters` are updated. 

860 

861 .. versionadded:: v2.0.0 

862 """ 

863 for key, value in self.new_defaults.items(): 

864 if key in self._processor_parameters: 

865 self._processor_parameters[key].value = value 

866 

867 def _reset_parameters(self) -> None: 

868 """ 

869 Reset processor parameters to their initial state. 

870 

871 This method clears all currently registered processor parameters and triggers 

872 a fresh registration process. It's useful when parameter configurations need 

873 to be reinitialized or when parameters have been modified and need to be reset. 

874 

875 .. seealso:: 

876 :meth:`_register_parameters`, :meth:`_register_parameters` 

877 """ 

878 self._processor_parameters = OrderedDict() 

879 self._parameter_registered = False 

880 self._register_parameters() 

881 

    @ensure_parameter_registration
    def _load_parameter_configuration(self) -> None:
        """
        Load processor parameter configuration from the internal configuration dictionary.

        This method processes the processor's configuration dictionary to set parameter values.
        It handles two configuration formats:

        1. Nested format: ``{'ProcessorName': {'param1': value1, ...}}``
        2. Flat format: ``{'param1': value1, ...}``

        The method also handles filter configurations by collecting filter table names
        and deferring their initialisation until after the global filter has been processed.

        .. versionchanged:: v2.0.0
            For option 1 combining configuration from name and name_replica

        :raises ProcessorParameterError: If a parameter in the configuration is not registered.

        .. seealso::
            :meth:`mafw.db.db_filter.ModelFilter.from_conf`
        """
        original_config = copy(self._orig_config)
        flt_list = []

        # by default the flag new_only is set to true
        # unless the user specify differently in the general section of the steering file
        self.filter_register.new_only = original_config.get(self.new_only_flag, True)

        # we need to check if the configuration object is of type 1 or type 2
        if any([name for name in [self.name, self.replica_name] if name in original_config]):
            # one of the two names (the base or the replica) must be present in case of option 1
            # we start from the base name. If not there, then take an empty dict
            option1_config_base = original_config.get(self.name, {})
            if self.name != self.replica_name:
                # if there is the replica name, then update the base configuration with the replica value
                # we get the replica configuration
                option1_config_replica = original_config.get(self.replica_name, {})

                # let's check if the user wants to have inheritance default
                # by default is True
                inheritance = option1_config_replica.get('__inheritance__', True)
                if inheritance:
                    # we update the base with the replica without changing the base
                    option1_config_update = deep_update(option1_config_base, option1_config_replica, copy_first=True)
                else:
                    # we do not use the base with the replica specific, we pass the replica as the updated
                    option1_config_update = option1_config_replica

                # we modify the type 1 original so that the table for the replica has the updated configuration
                # this is used for the filter configuration at the end.
                original_config[self.replica_name] = option1_config_update
            else:
                # there is not replica, so the update is equal to the base.
                option1_config_update = option1_config_base

            self._config = option1_config_update
        else:
            # for type 2 we are already good to go
            self._config = original_config

        # deep copy so that the sanitisation below cannot alter the live configuration dictionary.
        filter_config = deepcopy(original_config)

        def _sanitize_filter_config(processor_name: str) -> None:
            # Strip the __enable__ switches from the filter table of processor_name,
            # dropping every model, field or conditional entry that has been disabled.
            processor_config = filter_config.get(processor_name)
            if not isinstance(processor_config, dict):
                return
            filter_table = processor_config.get('__filter__')
            if not isinstance(filter_table, dict):
                return

            sanitized_table: dict[str, Any] = {}
            for model_name, model_config in filter_table.items():
                if not isinstance(model_config, dict):
                    # non-dict entries are passed through unmodified.
                    sanitized_table[model_name] = model_config
                    continue

                model_config_copy = deepcopy(model_config)
                if not bool(model_config_copy.pop('__enable__', True)):
                    # the whole model filter is disabled: drop it.
                    continue

                for field_name, field_value in list(model_config_copy.items()):
                    # a dict without the op/value pair but with __enable__ is a switchable field.
                    if (
                        isinstance(field_value, dict)
                        and not ('op' in field_value and 'value' in field_value)
                        and '__enable__' in field_value
                    ):
                        field_enabled = bool(field_value.pop('__enable__', True))
                        if not field_enabled:
                            model_config_copy.pop(field_name, None)

                conditionals = model_config_copy.get('__conditional__')
                if isinstance(conditionals, list):
                    # keep only the conditional entries that are not explicitly disabled.
                    filtered_conditionals: list[Any] = []
                    for conditional in conditionals:
                        if isinstance(conditional, dict):
                            conditional_enabled = bool(conditional.pop('__enable__', True))
                            if not conditional_enabled:
                                continue
                        filtered_conditionals.append(conditional)
                    model_config_copy['__conditional__'] = filtered_conditionals

                sanitized_table[model_name] = model_config_copy

            processor_config['__filter__'] = sanitized_table

        _sanitize_filter_config(self.replica_name)

        for key, value in self._config.items():
            if key in self._processor_parameters:
                # coerce the configured value to the type of the current parameter value.
                type_: ParameterType = type(self.get_parameter(key).value)  # type: ignore # wait for answer from SO
                self.set_parameter_value(key, type_(value))  # type: ignore # no idea how to fix it, may be linked with above
            elif key == '__filter__':
                # we got a filter table!
                # it should contain one table for each model
                # we add all the names to a list for deferred initialisation
                flt_table = self._config[key]
                if isinstance(flt_table, dict):
                    for model_name, model_config in flt_table.items():
                        if isinstance(model_config, dict) and not bool(model_config.get('__enable__', True)):
                            continue
                        flt_list.append(f'{self.replica_name}.__filter__.{model_name}')
            elif key == '__logic__':
                # we got a filter logic string
                # we store it in the filter register directly
                self.filter_register._logic = self._config[key]
            elif key == '__new_only__':
                # we got a new only boolean, we store it in the filter register
                self.filter_register.new_only = self._config[key]

        # only now, after the configuration file has been totally read, we can do the real filter initialisation.
        # This is to be sure that if there were a GlobalFilter table, this has been read.
        # The global filter region will be used as a starting point for the construction of a new filter (default
        # parameter in the from_conf class method).
        for flt_name in flt_list:
            model_name = flt_name.split('.')[-1]
            self.filter_register[model_name] = mafw.db.db_filter.ModelFilter.from_conf(flt_name, filter_config)

1019 

1020 @ensure_parameter_registration 

1021 def _overrule_kws_parameters(self) -> None: 

1022 """ 

1023 Override processor parameters with values from keyword arguments. 

1024 

1025 This method applies parameter values passed as keyword arguments during processor 

1026 initialisation. It ensures that the parameter types match the expected types 

1027 before setting the values. 

1028 

1029 .. seealso:: 

1030 :meth:`_register_parameters`, :meth:`_load_parameter_configuration`, 

1031 :meth:`set_parameter_value` 

1032 """ 

1033 for key, value in self._kwargs.items(): 

1034 if key in self._processor_parameters: 

1035 type_: ParameterType = type(self.get_parameter(key).value) # type: ignore # wait for answer from SO 

1036 self.set_parameter_value(key, type_(value)) # type: ignore # no idea how to fix it, may be linked with above 

1037 

1038 def validate_configuration(self) -> None: 

1039 """ 

1040 Validate the configuration provided via the processor parameters. 

1041 

1042 Method to be implemented by subclasses if a configuration validation is needed. 

1043 

1044 The method should silently check for the proper configuration, if this is not obtained, 

1045 then the :exc:`.InvalidConfigurationError` must be raised. 

1046 

1047 .. versionadded:: v2.0.0 

1048 """ 

1049 pass 

1050 

1051 def _check_method_overload(self) -> None: 

1052 """ 

1053 Check if the user overloaded the required methods. 

1054 

1055 Depending on the loop type, the user must overload different methods. 

1056 This method is doing the check and if the required methods are not overloaded a warning is emitted. 

1057 """ 

1058 methods_dict: dict[LoopType, list[str]] = { 

1059 LoopType.WhileLoop: ['while_condition'], 

1060 LoopType.ForLoop: ['get_items'], 

1061 LoopType.ParallelForLoop: ['get_items'], 

1062 LoopType.ParallelForLoopWithQueue: ['get_items'], 

1063 } 

1064 required_methods: list[str] = methods_dict.get(self.loop_type, []) 

1065 for method in required_methods: 

1066 if getattr(type(self), method) == getattr(Processor, method): 

1067 warnings.warn( 

1068 MissingOverloadedMethod( 

1069 '%s was not overloaded. The process execution workflow might not work.' % method 

1070 ) 

1071 ) 

1072 

    @classmethod
    def _apply_super_call_wrappers(cls) -> None:
        """
        Wraps overridden methods so the class can detect whether they called `super()`.

        This method runs immediately after the class is created (see :class:`.ProcessorMeta`). For every
        method that we expect scientists to extend (start, finish, etc.) we replace their implementation with a
        wrapper. The wrapper resets a per-instance flag, invokes the real override, and only after that method returns
        it checks whether the real `super()` was ever reached; if not, it emits a :class:`~mafw.mafw_errors.MissingSuperCall`
        warning. In other words, the wiring happens while the subclass is defined, and the actual smoke test executes
        each time the method runs.
        """
        methods = getattr(cls, '_methods_to_be_checked_for_super', ())
        for method in methods:
            # Only wrap methods that this very class re-defines...
            if method not in cls.__dict__:
                continue
            # ...and that actually override an implementation found in a base class.
            if not any(hasattr(base, method) for base in cls.__mro__[1:]):
                continue
            original: Callable[..., Any] = getattr(cls, method)
            # we are adding an attribute to the method, it looks strange, but it is possible
            # in this way we avoid wrapping the same method more than once.
            if getattr(original, '_mafw_super_check_wrapped', False):
                continue

            # Factory binding __orig/__method early, avoiding the late-binding closure pitfall
            # that would otherwise make every wrapper see the last loop values.
            def _make_wrapper(
                __orig: Callable[..., Any],
                __method: str,  # pragma: no cover
            ) -> Callable[..., Any]:
                @wraps(__orig)
                def _wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
                    # self is the processor instance.
                    # _reset_super_call_flag is resetting the call status
                    # for method as False
                    self._reset_super_call_flag(__method)
                    # in the base method, the super call flag is set to True
                    result = __orig(self, *args, **kwargs)
                    # if the super call flag is not True, then it is because the base
                    # method was not called.
                    # emit the warning and return the original method return value
                    if not self._did_call_super(__method):
                        warnings.warn(
                            MissingSuperCall(
                                'The overloaded %s is not invoking its super method. The processor might not work.'
                                % __method
                            )
                        )
                    return result

                return _wrapper

            # we create a wrapped method from the original
            wrapper = _make_wrapper(original, method)
            # we set a flag for the method to avoid multiple wrapping
            wrapper._mafw_super_check_wrapped = True  # type: ignore[attr-defined]
            setattr(cls, method, wrapper)

1128 

1129 def _reset_super_call_flag(self, method: str) -> None: 

1130 """ 

1131 Reset the super-call flag for a method. 

1132 """ 

1133 self._super_call_flags[method] = False 

1134 

1135 def _mark_super_call(self, method: str) -> None: 

1136 """ 

1137 Mark a method as having called its super implementation. 

1138 """ 

1139 self._super_call_flags[method] = True 

1140 

1141 def _did_call_super(self, method: str) -> bool: 

1142 """ 

1143 Check whether a method called its super implementation. 

1144 """ 

1145 return self._super_call_flags.get(method, False) 

1146 

1147 @ensure_parameter_registration 

1148 def dump_parameter_configuration(self, option: int = 1) -> dict[str, Any]: 

1149 """ 

1150 Dumps the processor parameter values in a dictionary. 

1151 

1152 The snippet below explains the meaning of `option`. 

1153 

1154 .. code-block:: python 

1155 

1156 # option 1 

1157 conf_dict1 = { 

1158 'Processor': {'param1': 5, 'input_table': 'my_table'} 

1159 } 

1160 

1161 # option 2 

1162 conf_dict2 = {'param1': 5, 'input_table': 'my_table'} 

1163 

1164 In the case of option 1, the replica aware name (:meth:`.replica_name`) will be used as a key for the 

1165 configuration dictionary. 

1166 

1167 .. versionchanged:: v2.0.0 

1168 With option 1, using :meth:`.replica_name` instead of :attr:`~.Processor.name` as key of the configuration 

1169 dictionary. 

1170 

1171 :param option: Select the dictionary style. Defaults to 1. 

1172 :type option: int, Optional 

1173 :return: A parameter configuration dictionary. 

1174 :rtype: dict 

1175 """ 

1176 inner_dict = {} 

1177 for key, value in self._processor_parameters.items(): 

1178 inner_dict[key] = value.value 

1179 

1180 if option == 1: 

1181 outer_dict = {self.replica_name: inner_dict} 

1182 elif option == 2: 

1183 outer_dict = inner_dict 

1184 else: 

1185 log.warning('Unknown option %s. Using option 2' % option) 

1186 outer_dict = inner_dict 

1187 return outer_dict 

1188 

1189 @ensure_parameter_registration 

1190 def get_parameter(self, name: str) -> PassiveParameter[Any]: 

1191 """ 

1192 Gets the processor parameter named name. 

1193 

1194 :param name: The name of the parameter. 

1195 :type name: str 

1196 :return: The processor parameter 

1197 :rtype: PassiveParameter 

1198 :raises ProcessorParameterError: If a parameter with `name` is not registered. 

1199 """ 

1200 if name in self._processor_parameters: 

1201 return self._processor_parameters[name] 

1202 raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}') 

1203 

1204 @ensure_parameter_registration 

1205 def get_parameters(self) -> dict[str, PassiveParameter[Any]]: 

1206 """ 

1207 Returns the full dictionary of registered parameters for this processor. 

1208 

1209 Useful when dumping the parameter specification in a configuration file, for example. 

1210 

1211 :return: The dictionary with the registered parameters. 

1212 :rtype: dict[str, PassiveParameter[ParameterType] 

1213 """ 

1214 return self._processor_parameters 

1215 

1216 @ensure_parameter_registration 

1217 def delete_parameter(self, name: str) -> None: 

1218 """ 

1219 Deletes a processor parameter. 

1220 

1221 :param name: The name of the parameter to be deleted. 

1222 :type name: str 

1223 :raises ProcessorParameterError: If a parameter with `name` is not registered. 

1224 """ 

1225 if name in self._processor_parameters: 

1226 del self._processor_parameters[name] 

1227 else: 

1228 raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}') 

1229 

1230 @ensure_parameter_registration 

1231 def set_parameter_value(self, name: str, value: ParameterType) -> None: 

1232 """ 

1233 Sets the value of a processor parameter. 

1234 

1235 :param name: The name of the parameter to be deleted. 

1236 :type name: str 

1237 :param value: The value to be assigned to the parameter. 

1238 :type value: ParameterType 

1239 :raises ProcessorParameterError: If a parameter with `name` is not registered. 

1240 """ 

1241 if name in self._processor_parameters: 

1242 self._processor_parameters[name].value = value 

1243 else: 

1244 raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}') 

1245 

1246 def get_filter(self, model_name: str) -> mafw.db.db_filter.ModelFilter: 

1247 """ 

1248 Returns a registered :class:`~mafw.db.db_filter.ModelFilter` via the model name. 

1249 

1250 If a filter for the provided model_name does not exist, a KeyError is raised. 

1251 

1252 :param model_name: The model name for which the filter will be returned. 

1253 :type model_name: str 

1254 :return: The registered filter 

1255 :rtype: mafw.db.db_filter.ModelFilter 

1256 :raises: KeyError is a filter with the give name is not found. 

1257 """ 

1258 return self.filter_register[model_name] 

1259 

1260 def on_processor_status_change(self, old_status: ProcessorStatus, new_status: ProcessorStatus) -> None: 

1261 """ 

1262 Callback invoked when the processor status is changed. 

1263 

1264 :param old_status: The old processor status. 

1265 :type old_status: ProcessorStatus 

1266 :param new_status: The new processor status. 

1267 :type new_status: ProcessorStatus 

1268 """ 

1269 self._user_interface.change_of_processor_status(self.name, old_status, new_status) 

1270 

1271 def on_looping_status_set(self, status: LoopingStatus) -> None: 

1272 """ 

1273 Call back invoked when the looping status is set. 

1274 

1275 The user can overload this method according to the needs. 

1276 

1277 :param status: The set looping status. 

1278 :type status: LoopingStatus 

1279 """ 

1280 if status == LoopingStatus.Skip: 

1281 log.warning('Skipping item %s' % self.i_item) 

1282 elif status == LoopingStatus.Abort: 

1283 log.error('Looping has been aborted') 

1284 elif status == LoopingStatus.Quit: 

1285 log.warning('Looping has been quit') 

1286 

1287 def format_progress_message(self) -> None: 

1288 """Customizes the progress message with information about the current item. 

1289 

1290 The user can overload this method in order to modify the message being displayed during the process loop with 

1291 information about the current item. 

1292 

1293 The user can access the current value, its position in the looping cycle and the total number of items using 

1294 :attr:`.Processor.item`, :obj:`.Processor.i_item` and :obj:`.Processor.n_item`. 

1295 """ 

1296 pass 

1297 

1298 @contextlib.contextmanager 

1299 def _thread_loop_context(self, i_item: int, n_item: int, item: Any) -> Iterator[None]: 

1300 """ 

1301 Context manager to set thread-local loop attributes for parallel execution. 

1302 

1303 :param i_item: Item index for the loop. 

1304 :type i_item: int 

1305 :param n_item: Total number of items in the loop. 

1306 :type n_item: int 

1307 :param item: Item payload. 

1308 :type item: Any 

1309 """ 

1310 self._thread_local.in_worker = True 

1311 self._thread_local.i_item = i_item 

1312 self._thread_local.n_item = n_item 

1313 self._thread_local.item = item 

1314 self._thread_local.looping_status = LoopingStatus.Continue 

1315 try: 

1316 yield 

1317 finally: 

1318 for name in ('i_item', 'n_item', 'item', 'looping_status', 'in_worker'): 

1319 if hasattr(self._thread_local, name): 1319 ↛ 1318line 1319 didn't jump to line 1318 because the condition on line 1319 was always true

1320 delattr(self._thread_local, name) 

1321 

1322 def _in_thread_context(self) -> bool: 

1323 """Return True when running inside a parallel worker thread.""" 

1324 return bool(getattr(self._thread_local, 'in_worker', False)) 

1325 

1326 @property 

1327 def item(self) -> Any: 

1328 """The current item of the loop.""" 

1329 if self._in_thread_context() and hasattr(self._thread_local, 'item'): 

1330 return self._thread_local.item 

1331 return self._item 

1332 

1333 @item.setter 

1334 def item(self, value: Any) -> None: 

1335 if self._in_thread_context(): 

1336 self._thread_local.item = value 

1337 else: 

1338 self._item = value 

1339 

1340 @property 

1341 def i_item(self) -> int: 

1342 """The enumeration of the current item being processed.""" 

1343 if self._in_thread_context() and hasattr(self._thread_local, 'i_item'): 

1344 return cast(int, self._thread_local.i_item) 

1345 return self._i_item 

1346 

1347 @i_item.setter 

1348 def i_item(self, value: int) -> None: 

1349 if self._in_thread_context(): 

1350 self._thread_local.i_item = value 

1351 else: 

1352 self._i_item = value 

1353 

1354 @property 

1355 def n_item(self) -> int | None: 

1356 """The total number of items to be processed or None for an undefined loop""" 

1357 if self._in_thread_context() and hasattr(self._thread_local, 'n_item'): 

1358 return cast(int | None, self._thread_local.n_item) 

1359 return self._n_item 

1360 

1361 @n_item.setter 

1362 def n_item(self, value: int | None) -> None: 

1363 if self._in_thread_context(): 

1364 self._thread_local.n_item = value 

1365 else: 

1366 self._n_item = value 

1367 

1368 @property 

1369 def looping_status(self) -> LoopingStatus: 

1370 """The looping status for the current thread context.""" 

1371 if self._in_thread_context(): 

1372 value = getattr(self._thread_local, 'looping_status', LoopingStatus.Continue) 

1373 else: 

1374 value = self._looping_status 

1375 if hasattr(self, 'on_looping_status_get'): 

1376 self.on_looping_status_get(value) 

1377 return value 

1378 

1379 @looping_status.setter 

1380 def looping_status(self, value: LoopingStatus) -> None: 

1381 if self._in_thread_context(): 

1382 current = getattr(self._thread_local, 'looping_status', LoopingStatus.Continue) 

1383 self._thread_local.looping_status = value 

1384 else: 

1385 current = self._looping_status 

1386 self._looping_status = value 

1387 if current != value: 

1388 if hasattr(self, 'on_looping_status_change'): 

1389 self.on_looping_status_change(current, value) 

1390 else: 

1391 if hasattr(self, 'on_looping_status_set'): 1391 ↛ exitline 1391 didn't return from function 'looping_status' because the condition on line 1391 was always true

1392 self.on_looping_status_set(value) 

1393 

1394 @property 

1395 def unique_name(self) -> str: 

1396 """Returns the unique name for the processor.""" 

1397 return f'{self.name}_{self.unique_id}' 

1398 

1399 @property 

1400 def replica_name(self) -> str: 

1401 """ 

1402 Returns the replica aware name of the processor. 

1403 

1404 If no replica_id is specified, then return the pure name, otherwise join the two string using the '#' symbol. 

1405 

1406 .. versionadded:: v2.0.0 

1407 

1408 :return: The replica aware name of the processor. 

1409 :rtype: str 

1410 """ 

1411 if self.replica_id is None: 

1412 return self.name 

1413 else: 

1414 return self.name + '#' + self.replica_id 

1415 

1416 @property 

1417 def local_resource_acquisition(self) -> bool: 

1418 """ 

1419 Checks if resources should be acquired locally. 

1420 

1421 When the processor is executed in stand-alone mode, it is responsible to acquire and release its own external 

1422 resources, but when it is executed from a ProcessorList, then is a good practice to share and distribute 

1423 resources among the whole processor list. In this case, resources should not be acquired locally by the 

1424 single processor, but from the parent execution context. 

1425 

1426 :return: True if resources are to be acquired locally by the processor. False, otherwise. 

1427 :rtype: bool 

1428 """ 

1429 return self._resource_acquisition 

1430 

1431 @local_resource_acquisition.setter 

1432 def local_resource_acquisition(self, flag: bool) -> None: 

1433 self._resource_acquisition = flag 

1434 

1435 @property 

1436 def database(self) -> peewee.Database: 

1437 """ 

1438 Returns the database instance 

1439 

1440 :return: A database object. 

1441 :raises MissingDatabase: If the database connection has not been established. 

1442 """ 

1443 if self._database is None: 

1444 raise MissingDatabase('Database connection not initialized') 

1445 return self._database 

1446 

1447 def execute(self) -> None: 

1448 """Execute the processor tasks. 

1449 

1450 This method works as a dispatcher, reassigning the call to a more specific execution implementation depending 

1451 on the :attr:`~mafw.processor.Processor.loop_type`. 

1452 """ 

1453 dispatcher: dict[LoopType, Callable[[], None]] = { 

1454 LoopType.SingleLoop: self._execute_single, 

1455 LoopType.ForLoop: self._execute_for_loop, 

1456 LoopType.ParallelForLoop: self._execute_for_loop, 

1457 LoopType.ParallelForLoopWithQueue: self._execute_for_loop, 

1458 LoopType.WhileLoop: self._execute_while_loop, 

1459 } 

1460 dispatcher[self.loop_type]() 

1461 

1462 @staticmethod 

1463 def _compute_default_max_workers() -> int: 

1464 """ 

1465 Helper to compute the default number of workers for parallel execution. 

1466 

1467 Returns min(32, cpu_count + 4). 

1468 """ 

1469 return min(32, (os.cpu_count() or 1) + 4) 

1470 

    def _execute_single(self) -> None:
        """Execute the processor in single mode.

        **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
        appropriate implementation depending on the processor LoopType.
        """
        # The exit stack guarantees that all acquired resources are released when execution ends.
        with contextlib.ExitStack() as self._resource_stack:
            self.acquire_resources()
            # Record the wall-clock start before the user start() hook runs.
            self._wall_clock_start = time.perf_counter()
            self.start()
            self.processor_status = ProcessorStatus.Run
            self.process()
            self.finish()

1484 

    def _execute_for_loop(self) -> None:
        """Executes the processor within a for loop.

        **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
        appropriate implementation depending on the processor LoopType.
        """

        with contextlib.ExitStack() as self._resource_stack:
            self.acquire_resources()
            # we cannot use a Timer context here to measure the whole duration because it spans
            # over different methods. Instead we directly use a performance clock.
            self._wall_clock_start = time.perf_counter()
            self.start()

            # get the input item list and filter it
            item_list = self.get_items()

            # get the total number of items.
            self.n_item = len(item_list)

            # turn the processor status to run
            self.processor_status = ProcessorStatus.Run

            # create a new task in the progress bar interface
            self._user_interface.create_task(self.replica_name, self.description, completed=0, total=self.n_item)

            # verify if we can use parallel for loop. If not, switch back to a serial for loop.
            if (
                self.loop_type in (LoopType.ParallelForLoop, LoopType.ParallelForLoopWithQueue)
                and not is_free_threading()
            ):
                warnings.warn(
                    'Parallel for-loop requires free-threading; falling back to serial for loop.',
                    stacklevel=2,
                )
                self.loop_type = LoopType.ForLoop

            # dispatch to the loop implementation matching the (possibly downgraded) loop type.
            if self.loop_type == LoopType.ParallelForLoopWithQueue:
                self._process_parallel_for_loop_with_queue(item_list)
            elif self.loop_type == LoopType.ParallelForLoop:
                self._process_parallel_for_loop(item_list)
            else:
                self._process_for_loop(item_list)

            # make sure the progress bar reports full completion before finishing.
            self._user_interface.update_task(self.replica_name, completed=self.n_item, total=self.n_item)

            self.finish()

1532 

1533 def _build_loop_item(self) -> LoopItem: 

1534 """ 

1535 Build a LoopItem payload for the current loop context. 

1536 

1537 :return: The LoopItem payload. 

1538 :rtype: mafw.models.LoopItem 

1539 """ 

1540 return LoopItem(self.i_item, int(self.n_item or 0), self.item) 

1541 

1542 def _build_loop_result(self, payload: Any, duration: float) -> LoopResult: 

1543 """ 

1544 Build a LoopResult payload for the current loop context. 

1545 

1546 :param payload: The optional payload returned by process. 

1547 :type payload: Any 

1548 :param duration: Wall-clock duration of the item processing. 

1549 :type duration: float 

1550 :return: The LoopResult payload. 

1551 :rtype: mafw.models.LoopResult 

1552 """ 

1553 return LoopResult(self.i_item, int(self.n_item or 0), self.looping_status, payload, duration) 

1554 

1555 def _payload_annotation_matches(self, annotation: Any) -> bool: 

1556 """ 

1557 Check whether an annotation matches LoopItem or LoopResult. 

1558 

1559 :param annotation: The annotation to inspect. 

1560 :type annotation: Any 

1561 :return: True if the annotation matches LoopItem or LoopResult. 

1562 :rtype: bool 

1563 """ 

1564 if annotation in (LoopItem, LoopResult): 

1565 return True 

1566 origin = get_origin(annotation) 

1567 if origin is Union: 

1568 return any(arg in (LoopItem, LoopResult) for arg in get_args(annotation)) 

1569 return False 

1570 

    def _call_with_optional_payload(self, func: Callable[..., Any], payload: Any) -> Any:
        """
        Invoke a processor hook with an optional payload if the signature allows it.

        :param func: The callable to invoke.
        :type func: Callable
        :param payload: The payload to pass if supported.
        :type payload: Any
        :return: The callable return value.
        :rtype: Any
        """
        signature = inspect.signature(func)
        parameters = list(signature.parameters.values())
        # Drop the 'self' parameter when inspecting an unbound function.
        if parameters and parameters[0].name == 'self':
            parameters = parameters[1:]
        # No parameters at all: the hook cannot take a payload.
        if not parameters:
            return func()
        # A *args / **kwargs signature can always accept the payload.
        if any(param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD) for param in parameters):
            return func(payload)
        first = parameters[0]
        if first.kind in (first.POSITIONAL_ONLY, first.POSITIONAL_OR_KEYWORD):
            # Pass the payload when the first parameter name suggests it is expected...
            if first.name in ('item', 'loop_item', 'result', 'loop_result', 'payload'):
                return func(payload)
            # ...or when its annotation refers to LoopItem/LoopResult.
            if self._payload_annotation_matches(first.annotation):
                return func(payload)
        return func()

1597 

1598 def _process_for_loop(self, item_list: Collection[Any]) -> None: 

1599 """ 

1600 Process items with the standard serial for-loop. 

1601 

1602 :param item_list: The list of items to process. 

1603 :type item_list: Collection[Any] 

1604 """ 

1605 for self.i_item, self.item in enumerate(item_list): 

1606 self.looping_status = LoopingStatus.Continue 

1607 

1608 self.format_progress_message() 

1609 self._user_interface.display_progress_message(self.progress_message, self.i_item, self.n_item, 0.1) 

1610 

1611 loop_item = self._build_loop_item() 

1612 with Timer(suppress_message=True) as timer: 

1613 payload = self._call_with_optional_payload(self.process, loop_item) 

1614 self._process_durations.append(timer.duration) 

1615 

1616 loop_result = self._build_loop_result(payload, timer.duration) 

1617 

1618 if self.looping_status == LoopingStatus.Continue: 

1619 self._call_with_optional_payload(self.accept_item, loop_result) 

1620 elif self.looping_status == LoopingStatus.Skip: 

1621 self._call_with_optional_payload(self.skip_item, loop_result) 

1622 else: # Abort or Quit 

1623 break 

1624 

1625 self._user_interface.update_task(self.replica_name, increment=1) 

1626 

    def _process_parallel_for_loop(self, item_list: Collection[Any]) -> None:
        """
        Process items in parallel using a thread pool.

        Items are submitted to a :class:`~concurrent.futures.ThreadPoolExecutor` in a sliding window of at most
        ``max_workers`` in-flight futures. Each worker runs :meth:`process` plus the accept/skip hook inside a
        thread-local loop context; the main thread only performs UI bookkeeping on completed futures. A shared
        event implements cooperative cancellation when a worker requests ``Abort`` or ``Quit``.

        :param item_list: The list of items to process.
        :type item_list: Collection[Any]
        """
        max_workers = self.max_workers
        # Cooperative cancellation: the event tells everybody to stop, the lock protects abort_status,
        # which records *which* terminal status (Abort wins over Quit) must be reported at the end.
        abort_event = threading.Event()
        abort_lock = threading.Lock()
        abort_status: LoopingStatus | None = None

        def _set_abort(status: LoopingStatus) -> None:
            # Record the terminal status; Abort is sticky and overrides an earlier Quit.
            nonlocal abort_status
            with abort_lock:
                if abort_status == LoopingStatus.Abort:
                    return
                if status == LoopingStatus.Abort:
                    abort_status = LoopingStatus.Abort
                elif abort_status is None:
                    abort_status = LoopingStatus.Quit
                abort_event.set()

        def _worker(i_item: int, item: Any) -> tuple[int, Any, LoopingStatus, Any, float]:
            # Runs in a pool thread. The loop context makes i_item/n_item/item thread-local for this worker.
            with self._thread_loop_context(i_item, int(self.n_item or 0), item):
                self.looping_status = LoopingStatus.Continue
                loop_item = self._build_loop_item()
                with Timer(suppress_message=True) as timer:
                    payload = self._call_with_optional_payload(self.process, loop_item)
                status = self.looping_status

                if status in (LoopingStatus.Abort, LoopingStatus.Quit):
                    _set_abort(status)
                    return i_item, item, status, payload, timer.duration

                # Another worker already requested termination: skip post-processing for this item.
                if abort_event.is_set():
                    return i_item, item, status, payload, timer.duration

                loop_result = self._build_loop_result(payload, timer.duration)
                if status == LoopingStatus.Continue:
                    self._call_with_optional_payload(self.accept_item, loop_result)
                elif status == LoopingStatus.Skip:
                    self._call_with_optional_payload(self.skip_item, loop_result)

                return i_item, item, status, payload, timer.duration

        pending: set[Future[tuple[int, Any, LoopingStatus, Any, float]]] = set()
        items_iter = iter(enumerate(item_list))

        def _submit_next() -> bool:
            # Pull the next item (if any) and schedule it; returns False when the iterator is exhausted.
            try:
                idx, itm = next(items_iter)
            except StopIteration:
                return False
            fut: Future[tuple[int, Any, LoopingStatus, Any, float]] = executor.submit(_worker, idx, itm)
            pending.add(fut)
            return True

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Prime the window with up to max_workers futures.
            while len(pending) < max_workers and _submit_next():
                pass

            while pending:
                done, _ = wait(pending, return_when=FIRST_COMPLETED)
                for fut in done:
                    pending.remove(fut)
                    # fut.result() re-raises any exception thrown inside the worker.
                    i_item, item, status, _payload, duration = fut.result()
                    self._process_durations.append(duration)
                    self.item = item
                    self.i_item = i_item
                    self.format_progress_message()
                    self._user_interface.display_progress_message(self.progress_message, i_item, self.n_item, 0.1)
                    self._user_interface.update_task(self.replica_name, increment=1)

                    # On abort we stop submitting new work but still drain already-scheduled futures.
                    if abort_event.is_set():
                        continue

                    # Refill the window to keep max_workers items in flight.
                    while len(pending) < max_workers and _submit_next():
                        pass

        # Read without the lock: all workers have joined when the executor context exits,
        # so no concurrent writer remains.
        if abort_status is not None:
            self.looping_status = abort_status

1709 

    def _process_parallel_for_loop_with_queue(self, item_list: Collection[Any]) -> None:
        """
        Process items in parallel using a producer/consumer queue.

        Items are processed in worker threads, while a dedicated consumer thread handles the post-processing hooks
        (:meth:`consumer_start`, :meth:`consumer_process`, :meth:`consumer_finish`).

        :param item_list: The list of items to process.
        :type item_list: Collection[Any]
        """
        # The idea is the following: we have a single consumer thread that is constantly trying to pull
        # items out of a shared queue, and we have a pool of producer threads that are putting items in the queue as
        # long as there is space in it (back-pressure).
        # If the queue is full, a producer thread blocks on put() until the consumer makes room.
        # When there are no more items to process, the main thread pushes a sentinel object into the queue
        # marking the end of the processing, so that the consumer thread can be gracefully terminated.
        max_workers = self.max_workers
        # Bounded queue: workers block on put() when the consumer lags behind (back-pressure).
        result_queue: queue.Queue[object] = queue.Queue(maxsize=self.queue_size)
        abort_event = threading.Event()
        abort_lock = threading.Lock()
        abort_status: LoopingStatus | None = None
        # Unique marker; identity comparison (is) cannot collide with user payloads.
        sentinel = object()

        def _set_abort(status: LoopingStatus) -> None:
            # Record the terminal status; Abort is sticky and overrides an earlier Quit.
            nonlocal abort_status
            with abort_lock:
                if abort_status == LoopingStatus.Abort:
                    return
                if status == LoopingStatus.Abort:
                    abort_status = LoopingStatus.Abort
                elif abort_status is None:
                    abort_status = LoopingStatus.Quit
                abort_event.set()

        def _get_abort_status() -> LoopingStatus | None:
            # Locked read so the consumer sees a consistent value while workers may still be writing.
            with abort_lock:
                return abort_status

        def _worker(batch_items: list[tuple[int, Any]]) -> None:
            # Process a batch of items and push the (LoopItem, LoopResult) pairs to the consumer queue.
            batch_results: list[tuple[LoopItem, LoopResult]] = []
            for i_item, item in batch_items:
                if abort_event.is_set():
                    break
                with self._thread_loop_context(i_item, int(self.n_item or 0), item):
                    self.looping_status = LoopingStatus.Continue
                    loop_item = self._build_loop_item()
                    with Timer(suppress_message=True) as timer:
                        payload = self._call_with_optional_payload(self.process, loop_item)
                    status = self.looping_status
                    if status in (LoopingStatus.Abort, LoopingStatus.Quit):
                        _set_abort(status)
                    # The result is queued even on Abort/Quit so the consumer can account for the item.
                    loop_result = self._build_loop_result(payload, timer.duration)
                    batch_results.append((loop_item, loop_result))
                if status in (LoopingStatus.Abort, LoopingStatus.Quit):
                    break
            if batch_results:
                result_queue.put(batch_results)

        def _consumer() -> None:
            # Single consumer thread: drains the queue until the sentinel arrives, running the
            # post-processing hooks serially so the user code does not need to be thread-safe.
            self.consumer_start()
            try:
                while True:
                    queued = result_queue.get()
                    if queued is sentinel:
                        break
                    batch_results = cast(list[tuple[LoopItem, LoopResult]], queued)
                    for loop_item, loop_result in batch_results:
                        with self._thread_loop_context(loop_item.i_item, loop_item.n_item, loop_item.payload):
                            self.looping_status = loop_result.looping_status
                            self.format_progress_message()
                            self._user_interface.display_progress_message(
                                self.progress_message, loop_item.i_item, self.n_item, 0.1
                            )
                            self._process_durations.append(loop_result.duration)
                            self._user_interface.update_task(self.replica_name, increment=1)

                            # Once a termination was requested, keep draining for the statistics
                            # but stop invoking the user-facing consumer hook.
                            if _get_abort_status() is None and loop_result.looping_status in (
                                LoopingStatus.Continue,
                                LoopingStatus.Skip,
                            ):
                                self._call_with_optional_payload(self.consumer_process, loop_result)
            finally:
                self.consumer_finish()

        pending: set[Future[None]] = set()
        items_iter = iter(enumerate(item_list))

        def _submit_next() -> bool:
            # Build the next batch of up to queue_batch_size items and schedule it;
            # returns False when there is nothing left to submit or termination was requested.
            if abort_event.is_set():
                return False
            batch_items: list[tuple[int, Any]] = []
            for _ in range(self.queue_batch_size):
                try:
                    idx, itm = next(items_iter)
                except StopIteration:
                    break
                batch_items.append((idx, itm))
            if not batch_items:
                return False
            fut: Future[None] = executor.submit(_worker, batch_items)
            pending.add(fut)
            return True

        consumer_thread = threading.Thread(target=_consumer, name=f'{self.name}-consumer')
        consumer_thread.start()

        try:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Prime the window with up to max_workers batches.
                while len(pending) < max_workers and _submit_next():
                    pass

                while pending:
                    done, _ = wait(pending, return_when=FIRST_COMPLETED)
                    for fut in done:
                        pending.remove(fut)
                        # Re-raise worker exceptions in the main thread.
                        fut.result()

                    if abort_event.is_set():
                        continue

                    while len(pending) < max_workers and _submit_next():
                        pass
        finally:
            # Always unblock and join the consumer, even when a worker raised.
            result_queue.put(sentinel)
            consumer_thread.join()

        # Safe unlocked read: workers and consumer have terminated at this point.
        if abort_status is not None:
            self.looping_status = abort_status

1838 

    def _execute_while_loop(self) -> None:
        """Executes the processor within a while loop.

        **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the
        appropriate implementation depending on the processor LoopType.
        """
        # it is a while loop, so a priori we don't know how many iterations we will have, nevertheless, we
        # can have a progress bar with 'total' set to None, so that it goes in the so-called indeterminate
        # progress. See https://rich.readthedocs.io/en/stable/progress.html#indeterminate-progress
        # we initialise n_item outside the loop, because it is possible that the user has a way to define n_item
        # and he can do it within the loop.
        self.n_item = None
        with contextlib.ExitStack() as self._resource_stack:
            self.acquire_resources()
            self._wall_clock_start = time.perf_counter()
            self.start()

            # turn the processor status to run
            self.processor_status = ProcessorStatus.Run

            self._user_interface.create_task(self.replica_name, self.description, completed=0, total=self.n_item)

            # we are ready to start the looping. For statistics, we can count the iterations.
            # NOTE(review): i_item is not incremented inside this loop; presumably while_condition() or
            # process() in the subclass advances it — confirm, otherwise the statistics stay at 0.
            self.i_item = 0
            while self.while_condition():
                # set the looping status to Continue. The user may want to change it in the process method.
                self.looping_status = LoopingStatus.Continue

                # send a message to the user interface
                self.format_progress_message()
                self._user_interface.display_progress_message(
                    self.progress_message, self.i_item, self.n_item, frequency=0.1
                )

                # wrap the execution in a timer to measure how long it took for statistical reasons.
                with Timer(suppress_message=True) as timer:
                    self.process()
                self._process_durations.append(timer.duration)

                # modify the loop depending on the looping status
                if self.looping_status == LoopingStatus.Continue:
                    self.accept_item()
                elif self.looping_status == LoopingStatus.Skip:
                    self.skip_item()
                else:  # equiv to if self.looping_status in [LoopingStatus.Abort, LoopingStatus.Quit]:
                    break

                # update the progress bar. if self.n_item is still None, then the progress bar will show
                # indeterminate progress.
                self._user_interface.update_task(self.replica_name, self.i_item + 1, 1, self.n_item)

            # now that the loop is finished, we know how many elements we processed
            if self.n_item is None:
                self.n_item = self.i_item
            self._user_interface.update_task(self.replica_name, completed=self.n_item, total=self.n_item)

            self.finish()

1896 

    def acquire_resources(self) -> None:
        """
        Acquires resources and add them to the resource stack.

        The whole body of the :meth:`execute` method is within a context structure. The idea is that if any part of
        the code inside should throw an exception that breaking the execution, we want to be sure that all stateful
        resources are properly closed.

        Since the number of resources may vary, the variable number of nested `with` statements has been replaced by
        an `ExitStack <https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack>`_. Resources,
        like open files, timers, db connections, need to be added to the resource stacks in this method.

        In the case a processor is being executed within a :class:`~mafw.processor.ProcessorList`, then some resources might be shared, and
        for this reason they are not added to the stack. This selection can be done via the private
        :attr:`local_resource_acquisition`. This is normally True, meaning that the processor will handle its resources
        independently, but when the processor is executed from a :class:`~mafw.processor.ProcessorList`, this flag is automatically turned to
        False.

        If the user wants to add additional resources, he has to overload this method calling the super to preserve
        the original resources. If he wants to have shared resources among different processors executed from inside
        a processor list, he has to overload the :class:`~mafw.processor.ProcessorList` class as well.
        """
        # Both the timer and the user interface will be added to the processor resource stack only if the processor is
        # set to acquire its own resources.
        # The timer and the user interface have in-built enter and exit method.
        if self._resource_acquisition:
            self.timer = self._resource_stack.enter_context(Timer(**self._timer_parameters))
            self._resource_stack.enter_context(self._user_interface)

        # For the database it is a bit different.
        if self._database is None and self._database_conf is None:
            # no database, nor configuration.
            # we cannot do anything
            pass
        elif self._database is None and self._database_conf is not None:
            # no db, but we got a configuration.
            # we can make a db.
            # This processor will try to make a valid connection, and in case it succeeds, it will add the database to
            # the resource stack.
            # The database has an enter method, but it is to generate transaction.
            # We will add the database.close via the callback method.
            if 'DBConfiguration' in self._database_conf:
                conf = self._database_conf['DBConfiguration']  # type1: nested configuration group
            else:
                conf = self._database_conf  # type2: flat configuration

            # guess the database type from the URL
            protocol = extract_protocol(conf.get('URL'))

            # build the connection parameter
            # in case of sqlite, we add the pragmas group as well
            connection_parameters = {}
            if protocol == 'sqlite':
                connection_parameters['pragmas'] = conf.get('pragmas', {})
            for key, value in conf.items():
                if key not in ['URL', 'pragmas']:
                    connection_parameters[key] = value

            self._database = connect(conf.get('URL'), **connection_parameters)  # type: ignore # peewee is not returning a DB
            # NOTE(review): the close callback is registered *before* connect() succeeds, unlike
            # ProcessorList.acquire_resources which registers it after — confirm this asymmetry is intended.
            self._resource_stack.callback(self._database.close)
            try:
                self._database.connect()
            except peewee.OperationalError as e:
                log.critical('Unable to connect to %s', self._database_conf.get('URL'))
                raise e
            database_proxy.initialize(self._database)
            if self.create_standard_tables:
                standard_tables = mafw_model_register.get_standard_tables()
                self.database.create_tables(standard_tables)
                for table in standard_tables:
                    table.init()

        else:  # equivalent to: if self._database is not None:
            # we got a database, so very likely we are inside a processor list
            # the connection has been already set and the initialisation as well.
            # nothing else to do here.
            # do not put the database in the exit stack. who create it has also to close it.
            pass

1975 

1976 def start(self) -> None: 

1977 """ 

1978 Start method. 

1979 

1980 The user can overload this method, including all steps that should be performed at the beginning of the 

1981 operation. 

1982 

1983 If the user decides to overload it, it should include a call to the super method. 

1984 """ 

1985 self._mark_super_call('start') 

1986 self.processor_status = ProcessorStatus.Start 

1987 self._remove_orphan_files() 

1988 

1989 def get_items(self) -> Collection[Any]: 

1990 """ 

1991 Returns the item collections for the processor loop. 

1992 

1993 This method must be overloaded for the processor to work. Generally, this is getting a list of rows from the 

1994 database, or a list of files from the disk to be processed. 

1995 

1996 :return: A collection of items for the loop 

1997 :rtype: Collection[Any] 

1998 """ 

1999 return [] 

2000 

2001 def while_condition(self) -> bool: 

2002 """ 

2003 Return the while condition 

2004 

2005 :return: True if the while loop has to continue, false otherwise. 

2006 :rtype: bool 

2007 """ 

2008 return False 

2009 

2010 def process(self) -> None: 

2011 """ 

2012 Processes the current item. 

2013 

2014 This is the core of the Processor, where the user has to define the calculations required. 

2015 

2016 In parallel for loops, the method can optionally accept a :class:`~mafw.models.loop_payloads.LoopItem` 

2017 parameter if the user prefers not to rely on thread-local access to :attr:`.Processor.item`, :attr:`.Processor.i_item` and 

2018 :attr:`.Processor.n_item`. 

2019 """ 

2020 pass 

2021 

2022 def accept_item(self) -> None: 

2023 """ 

2024 Does post process actions on a successfully processed item. 

2025 

2026 Within the :meth:`process`, the user left the looping status to Continue, so it means that everything looks 

2027 good and this is the right place to perform database updates or file savings. 

2028 

2029 .. seealso: 

2030 Have a look at :meth:`skip_item` for what to do in case something went wrong. 

2031 

2032 In parallel for loops, the method can optionally accept a :class:`~mafw.models.loop_payloads.LoopResult` 

2033 parameter for direct access to the processed payload and looping status. 

2034 """ 

2035 pass 

2036 

2037 def skip_item(self) -> None: 

2038 """ 

2039 Does post process actions on a *NOT* successfully processed item. 

2040 

2041 Within the :meth:`process`, the user set the looping status to Skip, so it means that something went wrong 

2042 and here corrective actions can be taken if needed. 

2043 

2044 .. seealso: 

2045 Have a look at :meth:`accept_item` for what to do in case everything was OK. 

2046 

2047 In parallel for loops, the method can optionally accept a :class:`~mafw.models.loop_payloads.LoopResult` 

2048 parameter for direct access to the processed payload and looping status. 

2049 """ 

2050 pass 

2051 

2052 def consumer_start(self) -> None: 

2053 """ 

2054 Executes once in the consumer thread for the parallel queue loop. 

2055 

2056 This hook mirrors :meth:`start` but is only used with the queue-based parallel loop. 

2057 """ 

2058 pass 

2059 

2060 def consumer_process(self, loop_result: LoopResult | None = None) -> None: 

2061 """ 

2062 Handle processed items in the consumer thread for the parallel queue loop. 

2063 

2064 By default this dispatches to :meth:`accept_item` or :meth:`skip_item` based on the looping status. The method 

2065 can optionally accept a :class:`~mafw.models.loop_payloads.LoopResult` payload. 

2066 """ 

2067 if loop_result is None: 

2068 loop_result = self._build_loop_result(None, 0.0) 

2069 if self.looping_status == LoopingStatus.Continue: 

2070 self._call_with_optional_payload(self.accept_item, loop_result) 

2071 elif self.looping_status == LoopingStatus.Skip: 2071 ↛ exitline 2071 didn't return from function 'consumer_process' because the condition on line 2071 was always true

2072 self._call_with_optional_payload(self.skip_item, loop_result) 

2073 

2074 def consumer_finish(self) -> None: 

2075 """ 

2076 Executes once in the consumer thread after the queue has been drained. 

2077 """ 

2078 pass 

2079 

2080 def finish(self) -> None: 

2081 """ 

2082 Concludes the execution. 

2083 

2084 The user can reimplement this method if there are some conclusive tasks that must be achieved. 

2085 Always include a call to super(). 

2086 """ 

2087 self._mark_super_call('finish') 

2088 self.processor_status = ProcessorStatus.Finish 

2089 if self.looping_status == LoopingStatus.Abort: 

2090 self.processor_exit_status = ProcessorExitStatus.Aborted 

2091 self.print_process_statistics() 

2092 

2093 def print_process_statistics(self) -> None: 

2094 """ 

2095 Print the process statistics. 

2096 

2097 A utility method to display the fastest, the slowest and the average timing required to process on a single 

2098 item. This is particularly useful when the looping processor is part of a ProcessorList. 

2099 """ 

2100 if len(self._process_durations): 

2101 log.info('[cyan] Processed %s items.' % len(self._process_durations)) 

2102 log.info( 

2103 '[cyan] Fastest item process duration: %s ' 

2104 % pretty_format_duration(min(self._process_durations), n_digits=3) 

2105 ) 

2106 log.info( 

2107 '[cyan] Slowest item process duration: %s ' 

2108 % pretty_format_duration(max(self._process_durations), n_digits=3) 

2109 ) 

2110 log.info( 

2111 '[cyan] Average item process duration: %s ' 

2112 % pretty_format_duration((sum(self._process_durations) / len(self._process_durations)), n_digits=3) 

2113 ) 

2114 if self._wall_clock_start is not None: 

2115 total_duration = time.perf_counter() - self._wall_clock_start 

2116 else: 

2117 total_duration = sum(self._process_durations) 

2118 log.info('[cyan] Total process duration: %s' % pretty_format_duration(total_duration, n_digits=3)) 

2119 

2120 def _remove_orphan_files(self) -> None: 

2121 """ 

2122 Remove orphan files. 

2123 

2124 If a connection to the database is available, then the OrphanFile standard table is queried for all its entries, 

2125 and all the files are then removed. 

2126 

2127 The user can turn off this behaviour by switching the :attr:`~mafw.processor.Processor.remove_orphan_files` to False. 

2128 

2129 """ 

2130 if self._database is None or self.remove_orphan_files is False: 

2131 # no database connection or no wish to remove orphan files, it does not make sense to continue 

2132 return 

2133 

2134 try: 

2135 OrphanFile = cast(MAFwBaseModel, mafw_model_register.get_model('OrphanFile')) 

2136 except KeyError: 

2137 log.warning('OrphanFile table not found in DB. Please verify database integrity') 

2138 return 

2139 

2140 if TYPE_CHECKING: 

2141 assert hasattr(OrphanFile, '_meta') 

2142 

2143 orphan_files = OrphanFile.select().execute() # type: ignore[no-untyped-call] 

2144 if len(orphan_files) != 0: 

2145 msg = f'[yellow]Pruning orphan files ({sum(len(f.filenames) for f in orphan_files)})...' 

2146 log.info(msg) 

2147 for orphan in orphan_files: 

2148 # filenames is a list of files: 

2149 for f in orphan.filenames: 

2150 f.unlink(missing_ok=True) 

2151 

2152 OrphanFile.delete().execute() # type: ignore[no-untyped-call] 

2153 

2154 

2155class ProcessorList(list[Union['Processor', 'ProcessorList']]): 

2156 """ 

2157 A list like collection of processors. 

2158 

2159 ProcessorList is a subclass of list containing only Processor subclasses or other ProcessorList. 

2160 

2161 An attempt to add an element that is not a Processor or a ProcessorList will raise a TypeError. 

2162 

2163 Along with an iterable of processors, a new processor list can be built using the following parameters. 

2164 """ 

2165 

    def __init__(
        self,
        *args: Processor | ProcessorList,
        name: str | None = None,
        description: str | None = None,
        timer: Timer | None = None,
        timer_params: dict[str, Any] | None = None,
        user_interface: UserInterfaceBase | None = None,
        database: Database | None = None,
        database_conf: dict[str, Any] | None = None,
        create_standard_tables: bool = True,
    ) -> None:
        """
        Constructor parameters:

        :param name: The name of the processor list. Defaults to ProcessorList.
        :type name: str, Optional
        :param description: An optional short description. Default to ProcessorList.
        :type description: str, Optional
        :param timer: The timer object. If None is provided, a new one will be created. Defaults to None.
        :type timer: Timer, Optional
        :param timer_params: A dictionary of parameter to build the timer object. Defaults to None.
        :type timer_params: dict, Optional
        :param user_interface: A user interface. Defaults to None
        :type user_interface: UserInterfaceBase, Optional
        :param database: A database instance. Defaults to None.
        :type database: Database, Optional
        :param database_conf: Configuration for the database. Default to None.
        :type database_conf: dict, Optional
        :param create_standard_tables: Whether or not to create the standard tables. Defaults to True.
        :type create_standard_tables: bool, Optional
        """

        # validate_items takes a tuple of processors, that's why we don't unpack args.
        super().__init__(self.validate_items(args))
        self._name = name or self.__class__.__name__
        self.description = description or self._name

        self.timer = timer
        self.timer_params = timer_params or {}
        # Fall back to a console interface when the caller provides none.
        self._user_interface = user_interface or ConsoleInterface()

        # Declared here, bound in execute() via the with statement.
        self._resource_stack: contextlib.ExitStack
        self._processor_exit_status: ProcessorExitStatus = ProcessorExitStatus.Successful
        self.nested_list = False
        """
        Boolean flag to identify that this list is actually inside another list.

        Similarly to the local resource flag for the :class:`.Processor`, this flag prevents the user interface to be
        added to the resource stack.
        """

        # database stuff
        self._database: peewee.Database | None = database
        self._database_conf: dict[str, Any] | None = validate_database_conf(database_conf)
        self.create_standard_tables = create_standard_tables
        """The boolean flag to proceed or skip with standard table creation and initialisation"""

2223 

    def __setitem__(  # type: ignore[override]
        self,
        __index: SupportsIndex,
        __object: Processor | ProcessorList,
    ) -> None:
        """Replaces the element at the given index with the validated processor (or nested list)."""
        super().__setitem__(__index, self.validate_item(__object))

2230 

2231 def insert(self, __index: SupportsIndex, __object: Processor | ProcessorList) -> None: 

2232 """Adds a new processor at the specified index.""" 

2233 super().insert(__index, self.validate_item(__object)) 

2234 

2235 def append(self, __object: Processor | ProcessorList) -> None: 

2236 """Appends a new processor at the end of the list.""" 

2237 super().append(self.validate_item(__object)) 

2238 

2239 def extend(self, __iterable: Iterable[Processor | ProcessorList]) -> None: 

2240 """Extends the processor list with a list of processors.""" 

2241 if isinstance(__iterable, type(self)): 

2242 super().extend(__iterable) 

2243 else: 

2244 super().extend([self.validate_item(item) for item in __iterable]) 

2245 

2246 @staticmethod 

2247 def validate_item(item: Processor | ProcessorList) -> Processor | ProcessorList: 

2248 """Validates the item being added.""" 

2249 if isinstance(item, Processor): 

2250 item.local_resource_acquisition = False 

2251 return item 

2252 elif isinstance(item, ProcessorList): 

2253 item.timer_params = dict(suppress_message=True) 

2254 item.nested_list = True 

2255 return item 

2256 else: 

2257 raise TypeError(f'Expected Processor or ProcessorList, got {type(item).__name__}') 

2258 

2259 @staticmethod 

2260 def validate_items(items: tuple[Processor | ProcessorList, ...] = ()) -> tuple[Processor | ProcessorList, ...]: 

2261 """Validates a tuple of items being added.""" 

2262 if not items: 

2263 return tuple() 

2264 return tuple([ProcessorList.validate_item(item) for item in items if item is not None]) 

2265 

2266 @property 

2267 def name(self) -> str: 

2268 """ 

2269 The name of the processor list 

2270 

2271 :return: The name of the processor list 

2272 :rtype: str 

2273 """ 

2274 return self._name 

2275 

2276 @name.setter 

2277 def name(self, name: str) -> None: 

2278 self._name = name 

2279 

2280 @property 

2281 def processor_exit_status(self) -> ProcessorExitStatus: 

2282 """ 

2283 The processor exit status. 

2284 

2285 It refers to the whole processor list execution. 

2286 """ 

2287 return self._processor_exit_status 

2288 

2289 @processor_exit_status.setter 

2290 def processor_exit_status(self, status: ProcessorExitStatus) -> None: 

2291 self._processor_exit_status = status 

2292 

2293 @property 

2294 def database(self) -> peewee.Database: 

2295 """ 

2296 Returns the database instance 

2297 

2298 :return: A database instance 

2299 :raises MissingDatabase: if a database connection is missing. 

2300 """ 

2301 if self._database is None: 

2302 raise MissingDatabase('Database connection not initialized') 

2303 return self._database 

2304 

2305 def execute(self) -> ProcessorExitStatus: 

2306 """ 

2307 Execute the list of processors. 

2308 

2309 Similarly to the :class:`Processor`, ProcessorList can be executed. In simple words, the execute 

2310 method of each processor in the list is called exactly in the same sequence as they were added. 

2311 """ 

2312 with contextlib.ExitStack() as self._resource_stack: 

2313 self.acquire_resources() 

2314 self._user_interface.create_task(self.name, self.description, completed=0, increment=0, total=len(self)) 

2315 for i, item in enumerate(self): 

2316 if isinstance(item, Processor): 

2317 log.info('[bold]Executing [red]%s[/red] processor[/bold]' % item.replica_name) 

2318 else: 

2319 log.info('[bold]Executing [blue]%s[/blue] processor list[/bold]' % item.name) 

2320 self.distribute_resources(item) 

2321 item.execute() 

2322 self._user_interface.update_task(self.name, increment=1) # i+1, 1, len(self)) 

2323 self._processor_exit_status = item.processor_exit_status 

2324 if self._processor_exit_status == ProcessorExitStatus.Aborted: 

2325 msg = 'Processor %s caused the processor list to abort' % item.name 

2326 log.error(msg) 

2327 raise AbortProcessorException(msg) 

2328 self._user_interface.update_task(self.name, completed=len(self), total=len(self)) 

2329 return self._processor_exit_status 

2330 

2331 def acquire_resources(self) -> None: 

2332 """Acquires external resources.""" 

2333 # The strategy is similar to the one for processor. if we do get resources already active (not None) then we use 

2334 # them, otherwise, we create them and we add them to the resource stack. 

2335 if self.timer is None: 

2336 self.timer = self._resource_stack.enter_context(Timer(**self.timer_params)) 

2337 # The user interface is very likely already initialised by the runner. 

2338 # But if this is a nested list, then we must not push the user interface in the stack 

2339 # otherwise the user interface context (progress for rich) will be stopped at the end 

2340 # of the nested list. 

2341 if not self.nested_list: 

2342 self._resource_stack.enter_context(self._user_interface) 

2343 if self._database is None and self._database_conf is None: 

2344 # no database, nor configuration. 

2345 # we cannot do anything 

2346 pass 

2347 elif self._database is None and self._database_conf is not None: 

2348 # no db, but we got a configuration. 

2349 # we can make a db 

2350 if 'DBConfiguration' in self._database_conf: 

2351 conf = self._database_conf['DBConfiguration'] # type1 

2352 else: 

2353 conf = self._database_conf # type2 

2354 

2355 # guess the database type from the URL 

2356 protocol = extract_protocol(conf.get('URL')) 

2357 

2358 # build the connection parameter 

2359 # in case of sqlite, we add the pragmas group as well 

2360 connection_parameters = {} 

2361 if protocol == 'sqlite': 

2362 connection_parameters['pragmas'] = conf.get('pragmas', {}) 

2363 for key, value in conf.items(): 

2364 if key not in ['URL', 'pragmas']: 

2365 connection_parameters[key] = value 

2366 

2367 self._database = connect(conf.get('URL'), **connection_parameters) # type: ignore # peewee is not returning a DB 

2368 try: 

2369 self._database.connect() 

2370 self._resource_stack.callback(self._database.close) 

2371 except peewee.OperationalError as e: 

2372 log.critical('Unable to connect to %s', self._database_conf.get('URL')) 

2373 raise e 

2374 database_proxy.initialize(self._database) 

2375 if self.create_standard_tables: 

2376 standard_tables = mafw_model_register.get_standard_tables() 

2377 self.database.create_tables(standard_tables) 

2378 for table in standard_tables: 

2379 table.init() 

2380 else: # equiv to if self._database is not None: 

2381 # we got a database, so very likely we are inside a processor list 

2382 # the connection has been already set and the initialisation as well. 

2383 # nothing else to do here. 

2384 pass 

2385 

2386 def distribute_resources(self, processor: Processor | Self) -> None: 

2387 """Distributes the external resources to the items in the list.""" 

2388 processor.timer = self.timer 

2389 processor._user_interface = self._user_interface 

2390 processor._database = self._database