Coverage for src / mafw / processor.py: 99%

578 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-09 09:08 +0000

1# Copyright 2025 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Module implements the basic Processor class, the ProcessorList and all helper classes to achieve the core 

6functionality of the MAFw. 

7""" 

8 

9from __future__ import annotations 

10 

11import contextlib 

12import inspect 

13import logging 

14import warnings 

15from collections.abc import Callable, Iterator 

16from copy import copy, deepcopy 

17from functools import wraps 

18from itertools import count 

19from typing import TYPE_CHECKING, Any, Collection, Generic, Iterable, Self, SupportsIndex, TypeVar, Union, cast 

20 

21import peewee 

22from peewee import Database 

23 

24# noinspection PyUnresolvedReferences 

25from playhouse.db_url import connect 

26 

27import mafw.db.db_filter 

28from mafw.active import Active 

29from mafw.db.db_model import MAFwBaseModel, database_proxy, mafw_model_register 

30from mafw.enumerators import LoopingStatus, LoopType, ProcessorExitStatus, ProcessorStatus 

31from mafw.mafw_errors import ( 

32 AbortProcessorException, 

33 MissingDatabase, 

34 MissingOverloadedMethod, 

35 MissingSuperCall, 

36 ProcessorParameterError, 

37) 

38from mafw.timer import Timer, pretty_format_duration 

39from mafw.tools.generics import deep_update 

40from mafw.tools.regexp import extract_protocol 

41from mafw.ui.abstract_user_interface import UserInterfaceBase 

42from mafw.ui.console_user_interface import ConsoleInterface 

43 

44log = logging.getLogger(__name__) 

45 

46ParameterType = TypeVar('ParameterType') 

47"""Generic variable type for the :class:`ActiveParameter` and :class:`PassiveParameter`.""" 

48 

49 

50def validate_database_conf(database_conf: dict[str, Any] | None = None) -> dict[str, Any] | None: 

51 """ 

52 Validates the database configuration. 

53 

54 :param database_conf: The input database configuration. Defaults to None. 

55 :type database_conf: dict, Optional 

56 :return: Either the validated database configuration or None if it is invalid. 

57 :rtype: dict, None 

58 """ 

59 if database_conf is None: 

60 return None 

61 

62 # dict is mutable, if I change it inside the function, I change it also outside. 

63 conf = database_conf.copy() 

64 

65 if 'DBConfiguration' in conf: 

66 # the database_conf is the steering file. Extract the DBConfiguration table 

67 conf = conf['DBConfiguration'] 

68 required_fields = ['URL'] 

69 if all([field in conf for field in required_fields]): 

70 return database_conf 

71 else: 

72 return None 

73 

74 

class PassiveParameter(Generic[ParameterType]):
    """
    Storage object for the value and the metadata of a processor parameter.

    It is the private counterpart of the :class:`ActiveParameter` descriptor, which relies on it to keep the
    parameter value together with its name and documentation.

    For each :class:`.ActiveParameter` declared in a processor class, an instance of PassiveParameter is stored in the
    processor parameter :attr:`register <.Processor._processor_parameters>`.

    .. seealso::

        An explanation on how processor parameters work and should be used is given in :ref:`Understanding processor
        parameters <parameters>`

    .. versionchanged:: v2.0.0

        User should only use :class:`ActiveParameter` and never manually instantiate :class:`PassiveParameter`.
    """

    def __init__(
        self, name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = ''
    ):
        """
        Constructor parameters:

        :param name: The parameter name. Only valid python identifiers are accepted.
        :type name: str
        :param value: The value explicitly assigned to the parameter. When None, the default is used instead.
            Defaults to None.
        :type value: ParameterType, Optional
        :param default: The fallback value, used only when no :attr:`value` is given. Defaults to None.
        :type default: ParameterType, Optional
        :param help_doc: A short description of the parameter.
        :type help_doc: str, Optional
        :raises ProcessorParameterError: if `name` is not a valid identifier or if both `value` and `default`
            are None.
        """
        if not name.isidentifier():
            raise ProcessorParameterError(f'{name} is not a valid python identifier.')

        self.name = name

        if value is None and default is None:
            raise ProcessorParameterError('Processor parameter cannot have both value and default value set to None')

        # an explicit value always wins over the default. A parameter living on its default is flagged as
        # optional and as not (yet) set by the user.
        explicitly_set = value is not None
        self._value: ParameterType = cast(ParameterType, value if explicitly_set else default)
        self._is_set = explicitly_set
        self._is_optional = not explicitly_set

        self.doc = help_doc

    def __rich_repr__(self) -> Iterator[Any]:
        # rich protocol: (label, value[, default]) tuples
        yield from (
            ('name', self.name),
            ('value', self.value, None),
            ('help_doc', self.doc, ''),
        )

    @property
    def is_set(self) -> bool:
        """
        Tells whether a value was explicitly assigned.

        Handy with optional parameters, to distinguish a value chosen by the user from the default one.
        """
        return self._is_set

    @property
    def value(self) -> ParameterType:
        """
        The current value of the parameter.

        :return: The stored value, being it the explicitly assigned one or the default.
        :rtype: ParameterType
        """
        return self._value

    @value.setter
    def value(self, value: ParameterType) -> None:
        """
        Assigns a new value and marks the parameter as set.

        :param value: The new value.
        :type value: ParameterType
        """
        self._is_set = True
        self._value = value

    @property
    def is_optional(self) -> bool:
        """
        Tells whether the parameter is optional, i.e. it was created from its default value only.

        :return: True if the parameter is optional
        :rtype: bool
        """
        return self._is_optional

    def __repr__(self) -> str:
        return f'{self.__class__.__name__}(name={self.name!r}, value={self.value!r}, doc={self.doc!r})'

181 

182 

F = TypeVar('F', bound=Callable[..., Any])
"""Type variable describing a generic callable, whatever its return value."""


def ensure_parameter_registration(func: F) -> F:
    """
    Decorator guaranteeing that the processor parameters are registered before `func` is executed.

    It is meant for :class:`Processor` methods: the first positional argument received by the decorated callable
    must be the processor instance.

    :param func: The processor method to be decorated.
    :return: The decorated method.
    :raises ProcessorParameterError: at call time, if the first positional argument is missing or is not a
        :class:`Processor`.
    """

    @wraps(func)
    def wrapper(*args: Processor, **kwargs: Any) -> F:
        # the processor instance (self) is expected as first positional argument
        processor = args[0] if args else None
        if not isinstance(processor, Processor):
            raise ProcessorParameterError(
                'Attempt to apply the ensure_parameter_registration to something different to a Processor subclass.'
            )
        if processor._parameter_registered is False:
            # late registration, performed only once thanks to the flag
            processor._register_parameters()
        return cast(F, func(*args, **kwargs))

    return cast(F, wrapper)

207 

208 

class ActiveParameter(Generic[ParameterType]):
    r"""
    The public interface to the processor parameter.

    Processor parameters are the way to customise the behaviour of a :class:`Processor`. Their values can be
    provided either via a configuration file or directly when the processor is created.

    To take advantage of this facility, an ActiveParameter instance has to be declared in the body of the Processor
    subclass, as in the following example:

    .. code-block::

        class MyProcessor(Processor):

            # this is the input folder
            input_folder = ActiveParameter('input_folder', Path(r'C:\data'), help_doc='This is where to look for input files')

            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)

                # change the input folder to something else
                self.input_folder = Path(r'D:\data')

                # get the value of the parameter
                print(self.input_folder)

    The ActiveParameter is a `descriptor <https://docs.python.org/3/glossary.html#term-descriptor>`_: reading and
    writing the attribute on a processor instance is transparently redirected to the parameter value.

    A processor parameter is actually made of two objects: the public interface (this class), offering an easy
    access to the parameter value, and a private interface (:class:`PassiveParameter`), where the value is stored
    together with the other information (default, documentation...).

    The user does not have to take care of the private interface: it is created by the processor and stored in its
    parameter register. It can be retrieved with the :meth:`Processor.get_parameter` method, using the parameter
    name as a key.

    Almost any name can be assigned to an ActiveParameter, with the exception of a few names used for other
    purposes. They are listed in :attr:`reserved_names` and the attempt to use one of them raises a
    :exc:`.ProcessorParameterError`.

    .. seealso::

        The private counter part in the :class:`PassiveParameter`.

        An explanation on how processor parameters work and should be used is given in :ref:`Understanding processor
        parameters <parameters>`

        The list of :attr:`reserved names <reserved_names>`.
    """

    reserved_names: list[str] = ['__logic__', '__filter__', '__new_only__', '__inheritance__']
    """A list of names that cannot be used as processor parameter names.

    - `__logic__`
    - `__filter__`
    - `__new_only__`
    - `__inheritance__`
    """

    def __init__(
        self, name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = ''
    ):
        """
        Constructor parameters:

        :param name: The name of the parameter.
        :type name: str
        :param value: The initial value of the parameter. Defaults to None.
        :type value: ParameterType, Optional
        :param default: The default value of the parameter, used when ``value`` is not set. Defaults to None.
        :type default: ParameterType, Optional
        :param help_doc: An explanatory text describing the parameter.
        :type help_doc: str, Optional
        :raises ProcessorParameterError: if ``name`` is one of the :attr:`reserved_names`.
        """
        # the external name is the key used in the processor parameter register
        self._external_name = self._validate_name(name)
        self._value = value
        self._default = default
        self._help_doc = help_doc

    def _validate_name(self, proposed_name: str) -> str:
        """
        Check the proposed parameter name against the list of reserved names.

        :param proposed_name: The name to be used for the processor parameter.
        :type proposed_name: str
        :return: The very same name, if it is not listed in :attr:`reserved_names`.
        :rtype: str
        :raises ProcessorParameterError: If the proposed name is a reserved one.
        """
        if proposed_name in self.reserved_names:
            raise ProcessorParameterError(f'Attempt to use a forbidden name ({proposed_name})')
        return proposed_name

    def __set_name__(self, owner: type[Processor], name: str) -> None:
        # invoked at class creation time: remember where and under which attribute name the descriptor lives
        self._owner = owner
        self.public_name = name
        self.private_name = f'param_{name}'

    def __get__(self, obj: Processor, obj_type: type[Processor]) -> ActiveParameter[ParameterType] | ParameterType:
        # access from the class (no instance): hand back the descriptor itself
        if obj is None:
            return self

        # access from an instance: the value is kept by the passive parameter stored in the processor register
        return obj._processor_parameters[self._external_name].value

    def __set__(self, obj: Processor, value: ParameterType) -> None:
        # delegate the storage to the instance-level passive parameter
        obj._processor_parameters[self._external_name].value = value

326 

327 

class ProcessorMeta(type):
    """
    Metaclass adding a post-initialisation hook to the instance creation.

    Whenever a class using this metaclass is instantiated, the standard construction takes place first and the
    ``__post_init__`` method of the newly created instance is invoked right afterwards.
    """

    def __call__(cls, *args: Any, **kwargs: Any) -> 'ProcessorMeta':
        # standard object construction (__new__ followed by __init__)
        instance = type.__call__(cls, *args, **kwargs)
        # the hook runs only when the whole __init__ chain is over
        instance.__post_init__()
        return cast(ProcessorMeta, instance)

335 

336 

337# noinspection PyProtectedMember 

338class Processor(metaclass=ProcessorMeta): 

339 """ 

340 The basic processor. 

341 

342 A very comprehensive description of what a Processor does and how it works is available at :ref:`doc_processor`. 

343 """ 

344 

345 processor_status = Active(ProcessorStatus.Unknown) 

346 """Processor execution status""" 

347 

348 looping_status = Active(LoopingStatus.Continue) 

349 """Looping modifier""" 

350 

351 progress_message: str = f'{__qualname__} is working' 

352 """Message displayed to show the progress.  

353  

354 It can be customized with information about the current item in the loop by overloading the  

355 :meth:`format_progress_message`.""" 

356 

357 _ids = count(0) 

358 """A counter for all processor instances""" 

359 

360 new_defaults: dict[str, Any] = {} 

361 """ 

362 A dictionary containing defaults value for the parameters to be overridden 

363  

364 .. versionadded:: v2.0.0 

365 """ 

366 

367 new_only_flag = 'new_only' 

368 

    def __init__(
        self,
        name: str | None = None,
        description: str | None = None,
        config: dict[str, Any] | None = None,
        looper: LoopType | str = LoopType.ForLoop,
        user_interface: UserInterfaceBase | None = None,
        timer: Timer | None = None,
        timer_params: dict[str, Any] | None = None,
        database: Database | None = None,
        database_conf: dict[str, Any] | None = None,
        remove_orphan_files: bool = True,
        replica_id: str | None = None,
        create_standard_tables: bool = True,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """
        Constructor parameters

        The constructor only stores its arguments and prepares the internal state; as last step it calls
        :meth:`initialise_parameters`.

        :param name: The name of the processor. If None (or empty), the class name is used instead. Defaults to None.
        :type name: str, Optional
        :param description: A short description of the processor task. Defaults to the processor name.
        :type description: str, Optional
        :param config: A configuration dictionary for this processor. A deep copy of it is stored. Defaults to None.
        :type config: dict, Optional
        :param looper: Enumerator to define the looping type. It is converted with ``LoopType(looper)``.
            Defaults to LoopType.ForLoop
        :type looper: LoopType, Optional
        :param user_interface: A user interface instance to be used by the processor to interact with the user.
            If None, a :class:`.ConsoleInterface` is created.
        :type user_interface: UserInterfaceBase, Optional
        :param timer: A timer object to measure process duration.
        :type timer: Timer, Optional
        :param timer_params: Parameters for the timer object. Defaults to an empty dictionary.
        :type timer_params: dict, Optional
        :param database: A database instance. Defaults to None.
        :type database: Database, Optional
        :param database_conf: Configuration for the database. It is passed through :func:`validate_database_conf`,
            so an invalid configuration is stored as None. Default to None.
        :type database_conf: dict, Optional
        :param remove_orphan_files: Boolean flag to remove files on disc without a reference to the database.
            See :ref:`std_tables` and :meth:`~mafw.processor.Processor._remove_orphan_files`. Defaults to True
        :type remove_orphan_files: bool, Optional
        :param replica_id: The replica identifier for the current processor.
        :type replica_id: str, Optional
        :param create_standard_tables: Boolean flag to create std tables on disk. Defaults to True
        :type create_standard_tables: bool, Optional
        :param args: Extra positional arguments. They are accepted by the signature, but not used in this method.
        :param kwargs: Keyword arguments that can be used to set processor parameters.
        """

        self.name = name or self.__class__.__name__
        """The name of the processor."""

        # the counter is a class attribute, so it is shared among all processor instances
        self.unique_id = next(self._ids)
        """A unique identifier representing how many instances of Processor has been created."""

        self.replica_id = replica_id
        """
        The replica identifier specified in the constructor

        .. versionadded:: v2.0.0
        """

        self.description = description or self.name
        """A short description of the processor task."""

        self.item: Any = None
        """The current item of the loop."""

        self.processor_exit_status = ProcessorExitStatus.Successful
        """Processor exit status"""

        self.loop_type: LoopType = LoopType(looper)
        """
        The loop type.

        The value of this parameter can also be changed by the :func:`~mafw.decorators.execution_workflow` decorator
        factory.

        See :class:`~mafw.enumerators.LoopType` for more details.
        """

        self.create_standard_tables = create_standard_tables
        """The boolean flag to proceed or skip with standard table creation and initialisation"""

        # private attributes
        self._config: dict[str, Any] = {}  # deepcopy(config) if config is not None else {}
        """
        A dictionary containing the processor configuration object.

        This dictionary is populated with configuration parameter (always type 2) during the
        :meth:`._load_parameter_configuration` method.

        The original value of the configuration dictionary that is passed to the constructor is stored in
        :attr:`._orig_config`.

        .. versionchanged:: v2.0.0
            Now it is an empty dictionary until the :meth:`._load_parameter_configuration` is called.

        """

        # deep copy: later changes to the caller's dictionary do not affect the processor
        self._orig_config = deepcopy(config) if config is not None else {}
        """
        A copy of the original configuration dictionary.

        .. versionadded:: v2.0.0
        """

        self._processor_parameters: dict[str, PassiveParameter[ParameterType]] = {}  # type: ignore
        """
        A dictionary to store all the processor parameter instances.

        The name of the parameter is used as a key, while for the value an instance of the
        :class:`.PassiveParameter` is used.
        """
        # wait for answer from SO
        self._parameter_registered = False
        """A boolean flag to confirm successful parameter registration."""
        # keyword arguments are kept to be applied on top of the parameter configuration
        self._kwargs = kwargs

        # loops attributes
        self._i_item: int = -1
        self._n_item: int | None = -1
        self._process_durations: list[float] = []

        # resource stack
        # only declared here (annotation without value): the stack object is not created in this method
        self._resource_stack: contextlib.ExitStack
        self._resource_acquisition: bool = True

        # processor timer
        self.timer: Timer | None = timer
        self._timer_parameters: dict[str, Any] = timer_params or {}

        # user interface
        if user_interface is None:
            self._user_interface: UserInterfaceBase = ConsoleInterface()
        else:
            self._user_interface = user_interface

        # database stuff
        self._database: peewee.Database | None = database
        self._database_conf: dict[str, Any] | None = validate_database_conf(database_conf)
        self.filter_register: mafw.db.db_filter.ProcessorFilter = mafw.db.db_filter.ProcessorFilter()
        """The DB filter register of the Processor."""
        self.remove_orphan_files: bool = remove_orphan_files
        """The flag to remove or protect the orphan files. Defaults to True"""

        # sub-classing stuff
        # todo:
        #   should we make it a class attribute instead of an instance attribute?
        self._methods_to_be_checked_for_super = [('start', Processor), ('finish', Processor)]
        """
        List of methods to be checked for super inclusion.

        It is a list of tuple, with the first element the name of the method to be checked and the second the base
        class to the be compared.
        """
        # must stay the last step: it relies on all the attributes assigned above
        self.initialise_parameters()

525 

526 def initialise_parameters(self) -> None: 

527 """ 

528 Initialises processor parameters by registering them and applying various configuration sources. 

529 

530 This method orchestrates the parameter initialisation process by performing the following steps in order: 

531 

532 #. Registers processor parameters defined as :class:`ActiveParameter` instances 

533 #. Overrides default parameter values with any configured overrides 

534 #. Loads parameter configuration from the processor's configuration dictionary 

535 #. Applies keyword arguments as parameter overrides 

536 

537 The method ensures that all processor parameters are properly configured before the processor 

538 execution begins. It is automatically called during processor initialisation and should not 

539 typically be called directly by users. 

540 

541 .. seealso:: 

542 :meth:`_register_parameters`, :meth:`_override_defaults`, 

543 :meth:`_load_parameter_configuration`, :meth:`_overrule_kws_parameters` 

544 

545 .. versionadded:: v2.0.0 

546 """ 

547 self._register_parameters() 

548 self._override_defaults() 

549 self._load_parameter_configuration() 

550 self._overrule_kws_parameters() 

551 

552 def __post_init__(self) -> None: 

553 """ 

554 Performs post-initialisation tasks for the processor. 

555 

556 This method is automatically called after the processor initialisation is complete. 

557 It performs validation checks on overloaded methods and sets the initial processor status. 

558 

559 .. seealso:: 

560 :meth:`validate_configuration`, :meth:`_check_method_overload`, :meth:`_check_method_super`, 

561 :attr:`~mafw.processor.Processor.processor_status` 

562 

563 .. versionchanged:: v2.0.0 

564 Moved the parameter initialisation to :meth:`initialise_parameters` and now executed as last step of the 

565 init method. 

566 

567 Added the validate configuration check. This method should silently check that configuration provided 

568 with the processor parameters is valid. If not, a :exc:`.ProcessorParameterError` is raised. 

569 """ 

570 self.validate_configuration() 

571 self._check_method_overload() 

572 self._check_method_super() 

573 self.processor_status = ProcessorStatus.Init 

574 

575 def _register_parameters(self) -> None: 

576 """ 

577 Register processor parameters defined as ActiveParameter instances in the class. 

578 

579 This private method scans the class definition for any :class:`.ActiveParameter` instances and creates 

580 corresponding :class:`.PassiveParameter` instances to store the actual parameter values and metadata. 

581 It ensures that all processor parameters are properly initialised and available for configuration 

582 through the processor's configuration system. 

583 

584 The method checks for duplicate parameter names and raises a :exc:`.ProcessorParameterError` if duplicates 

585 are detected. It also sets the internal flag :attr:`._parameter_registered` to True once registration is 

586 complete. 

587 

588 .. note:: 

589 This method is automatically called during processor initialisation and should not be called directly 

590 by users. 

591 

592 .. seealso:: 

593 :class:`.Processor`, :meth:`.Processor._override_defaults`, 

594 :meth:`.Processor._load_parameter_configuration`, :meth:`.Processor._overrule_kws_parameters` 

595 

596 .. versionchanged:: v2.0.0 

597 Only :class:`ActiveParameter` are not registered. The use of :class:`PassiveParameter` is only meant to 

598 store the value and metadata of the active counter part. 

599 """ 

600 if self._parameter_registered: 

601 return 

602 

603 for name in dir(self.__class__): 

604 attr = getattr(self.__class__, name) 

605 

606 if isinstance(attr, ActiveParameter): 

607 ext_name = attr._external_name 

608 if ext_name in self._processor_parameters: 

609 raise ProcessorParameterError(f'Duplicated parameter name ({ext_name}.') 

610 self._processor_parameters[ext_name] = PassiveParameter( 

611 ext_name, attr._value, attr._default, attr._help_doc 

612 ) 

613 

614 self._parameter_registered = True 

615 

616 def _override_defaults(self) -> None: 

617 """ 

618 Override default parameter values with values from :attr:`new_defaults`. 

619 

620 This private method iterates through the :attr:`new_defaults` dictionary and updates 

621 the corresponding processor parameters with new values. Only parameters that exist 

622 in both :attr:`new_defaults` and :attr:`_processor_parameters` are updated. 

623 

624 .. versionadded:: v2.0.0 

625 """ 

626 for key, value in self.new_defaults.items(): 

627 if key in self._processor_parameters: 

628 self._processor_parameters[key].value = value 

629 

630 def _reset_parameters(self) -> None: 

631 """ 

632 Reset processor parameters to their initial state. 

633 

634 This method clears all currently registered processor parameters and triggers 

635 a fresh registration process. It's useful when parameter configurations need 

636 to be reinitialized or when parameters have been modified and need to be reset. 

637 

638 .. seealso:: 

639 :meth:`_register_parameters`, :meth:`_register_parameters` 

640 """ 

641 self._processor_parameters = {} 

642 self._parameter_registered = False 

643 self._register_parameters() 

644 

    @ensure_parameter_registration
    def _load_parameter_configuration(self) -> None:
        """
        Load processor parameter configuration from the internal configuration dictionary.

        This method processes a shallow copy of the original configuration dictionary (:attr:`._orig_config`) to set
        parameter values. It handles two configuration formats:

            1. Nested format: ``{'ProcessorName': {'param1': value1, ...}}``
            2. Flat format: ``{'param1': value1, ...}``

        For the nested format, the table of the replica (if any) is combined with the one of the base processor,
        unless the replica table sets ``__inheritance__`` to False.

        Each configuration value matching a registered parameter is converted to the type of the current parameter
        value before being assigned. The special keys ``__filter__``, ``__logic__`` and ``__new_only__`` are used to
        configure the filter register. Any other key is ignored by this method.

        The method also handles filter configurations by collecting filter table names
        and deferring their initialisation until after the global filter has been processed.

        .. versionchanged:: v2.0.0
            For option 1 combining configuration from name and name_replica

        :raises ProcessorParameterError: If a parameter in the configuration is not registered. NOTE(review): keys
            not matching a registered parameter are skipped by the loop below, so this can only originate from the
            called helpers, which are not visible here -- to be confirmed.

        .. seealso::
            :meth:`mafw.db.db_filter.ModelFilter.from_conf`
        """
        # shallow copy: adding/replacing top-level keys below does not touch self._orig_config
        original_config = copy(self._orig_config)
        flt_list = []

        # by default the flag new_only is set to true
        # unless the user specify differently in the general section of the steering file
        self.filter_register.new_only = original_config.get(self.new_only_flag, True)

        # we need to check if the configuration object is of type 1 or type 2
        if any([name for name in [self.name, self.replica_name] if name in original_config]):
            # one of the two names (the base or the replica) must be present in case of option 1
            # we start from the base name. If not there, then take an empty dict
            option1_config_base = original_config.get(self.name, {})
            if self.name != self.replica_name:
                # if there is the replica name, then update the base configuration with the replica value
                # we get the replica configuration
                option1_config_replica = original_config.get(self.replica_name, {})

                # let's check if the user wants to have inheritance default
                # by default is True
                inheritance = option1_config_replica.get('__inheritance__', True)
                if inheritance:
                    # we update the base with the replica without changing the base
                    option1_config_update = deep_update(option1_config_base, option1_config_replica, copy_first=True)
                else:
                    # we do not use the base with the replica specific, we pass the replica as the updated
                    option1_config_update = option1_config_replica

                # we modify the type 1 original so that the table for the replica has the updated configuration
                # this is used for the filter configuration at the end.
                original_config[self.replica_name] = option1_config_update
            else:
                # there is not replica, so the update is equal to the base.
                option1_config_update = option1_config_base

            self._config = option1_config_update
        else:
            # for type 2 we are already good to go
            self._config = original_config

        for key, value in self._config.items():
            if key in self._processor_parameters:
                # cast the configuration value to the type of the value currently stored in the parameter
                type_: ParameterType = type(self.get_parameter(key).value)  # type: ignore # wait for answer from SO
                self.set_parameter_value(key, type_(value))  # type: ignore # no idea how to fix it, may be linked with above
            elif key == '__filter__':
                # we got a filter table!
                # it should contain one table for each model
                # we add all the names to a list for deferred initialisation
                flt_table = self._config[key]
                flt_list.extend([f'{self.replica_name}.__filter__.{model}' for model in flt_table])
            elif key == '__logic__':
                # we got a filter logic string
                # we store it in the filter register directly
                self.filter_register._logic = self._config[key]
            elif key == '__new_only__':
                # we got a new only boolean, we store it in the filter register
                self.filter_register.new_only = self._config[key]

        # only now, after the configuration file has been totally read, we can do the real filter initialisation.
        # This is to be sure that if there were a GlobalFilter table, this has been read.
        # The global filter region will be used as a starting point for the construction of a new filter (default
        # parameter in the from_conf class method).
        for flt_name in flt_list:
            # the model name is the last element of the dotted filter name
            model_name = flt_name.split('.')[-1]
            self.filter_register[model_name] = mafw.db.db_filter.ModelFilter.from_conf(flt_name, original_config)

731 

732 @ensure_parameter_registration 

733 def _overrule_kws_parameters(self) -> None: 

734 """ 

735 Override processor parameters with values from keyword arguments. 

736 

737 This method applies parameter values passed as keyword arguments during processor 

738 initialisation. It ensures that the parameter types match the expected types 

739 before setting the values. 

740 

741 .. seealso:: 

742 :meth:`_register_parameters`, :meth:`_load_parameter_configuration`, 

743 :meth:`set_parameter_value` 

744 """ 

745 for key, value in self._kwargs.items(): 

746 if key in self._processor_parameters: 

747 type_: ParameterType = type(self.get_parameter(key).value) # type: ignore # wait for answer from SO 

748 self.set_parameter_value(key, type_(value)) # type: ignore # no idea how to fix it, may be linked with above 

749 

750 def validate_configuration(self) -> None: 

751 """ 

752 Validate the configuration provided via the processor parameters. 

753 

754 Method to be implemented by subclasses if a configuration validation is needed. 

755 

756 The method should silently check for the proper configuration, if this is not obtained, 

757 then the :exc:`.InvalidConfigurationError` must be raised. 

758 

759 .. versionadded:: v2.0.0 

760 """ 

761 pass 

762 

763 def _check_method_overload(self) -> None: 

764 """ 

765 Check if the user overloaded the required methods. 

766 

767 Depending on the loop type, the user must overload different methods. 

768 This method is doing the check and if the required methods are not overloaded a warning is emitted. 

769 """ 

770 methods_dict: dict[LoopType, list[str]] = { 

771 LoopType.WhileLoop: ['while_condition'], 

772 LoopType.ForLoop: ['get_items'], 

773 } 

774 required_methods: list[str] = methods_dict.get(self.loop_type, []) 

775 for method in required_methods: 

776 if getattr(type(self), method) == getattr(Processor, method): 

777 warnings.warn( 

778 MissingOverloadedMethod( 

779 '%s was not overloaded. The process execution workflow might not work.' % method 

780 ) 

781 ) 

782 

783 def _check_method_super(self) -> None: 

784 """ 

785 Check if some specific methods are calling their super. 

786 

787 For some specific methods (for example: start and finish), the user should always call their super method. 

788 This method verifies that the user implementation of these methods is including a super call, otherwise a 

789 warning is emitted to inform the user about the problem and possible misbehaviour of the processor. 

790 

791 The list of methods to be verified is stored in a private class attribute 

792 :attr:`~._methods_to_be_checked_for_super` as a list of tuples, made by the name of the methods to be 

793 verified and the base class for comparison. The base class is required because Processor subclasses may be 

794 extending this list with methods that are not present in the base Processor. See, for example, the 

795 :meth:`~.GenericPlotter.patch_data_frame` that is required to have a super call, but it is not present in the 

796 base Processor. 

797 

798 """ 

799 for method, base in self._methods_to_be_checked_for_super: 

800 # first check if the user overloaded the method. 

801 if getattr(type(self), method) != getattr(base, method): 

802 # if the method is the start method, then it might be that the user decorated the class with the @database_required decorator, 

803 # so it looks different, but it is not 

804 if method == 'start': 

805 sub_start_src = inspect.getsource(getattr(type(self), method)) 

806 base_start_src = inspect.getsource(getattr(base, method)) 

807 if sub_start_src == base_start_src: 

808 # it is actually the same method even though the function object is different, then 

809 # there is no need to check for the super_call 

810 continue 

811 

812 # let's check if in the overloaded method there is super calls 

813 super_call = f'super().{method}' 

814 method_object = getattr(type(self), method) 

815 

816 # this is the overloaded method source code. 

817 method_source_code = inspect.getsource(method_object) 

818 # we split the whole code in lines 

819 code_lines = method_source_code.split('\n') 

820 # we remove all comments, because the user may have commented out the super 

821 code_lines = [line.strip() for line in code_lines if not line.strip().startswith('#')] 

822 # we rebuild the whole source code, without indentation and comments. 

823 method_source_code = '\n'.join(code_lines) 

824 

825 # check if the super call is in the source. if not then emit a warning 

826 if super_call not in method_source_code: 

827 warnings.warn( 

828 MissingSuperCall( 

829 'The overloaded %s is not invoking its super method. The processor might not work.' % method 

830 ) 

831 ) 

832 

833 @ensure_parameter_registration 

834 def dump_parameter_configuration(self, option: int = 1) -> dict[str, Any]: 

835 """ 

836 Dumps the processor parameter values in a dictionary. 

837 

838 The snippet below explains the meaning of `option`. 

839 

840 .. code-block:: python 

841 

842 # option 1 

843 conf_dict1 = { 

844 'Processor': {'param1': 5, 'input_table': 'my_table'} 

845 } 

846 

847 # option 2 

848 conf_dict2 = {'param1': 5, 'input_table': 'my_table'} 

849 

850 In the case of option 1, the replica aware name (:meth:`.replica_name`) will be used as a key for the 

851 configuration dictionary. 

852 

853 .. versionchanged:: v2.0.0 

854 With option 1, using :meth:`.replica_name` instead of :attr:`~.Processor.name` as key of the configuration 

855 dictionary. 

856 

857 :param option: Select the dictionary style. Defaults to 1. 

858 :type option: int, Optional 

859 :return: A parameter configuration dictionary. 

860 :rtype: dict 

861 """ 

862 inner_dict = {} 

863 for key, value in self._processor_parameters.items(): 

864 inner_dict[key] = value.value 

865 

866 if option == 1: 

867 outer_dict = {self.replica_name: inner_dict} 

868 elif option == 2: 

869 outer_dict = inner_dict 

870 else: 

871 log.warning('Unknown option %s. Using option 2' % option) 

872 outer_dict = inner_dict 

873 return outer_dict 

874 

875 @ensure_parameter_registration 

876 def get_parameter(self, name: str) -> PassiveParameter[ParameterType]: 

877 """ 

878 Gets the processor parameter named name. 

879 

880 :param name: The name of the parameter. 

881 :type name: str 

882 :return: The processor parameter 

883 :rtype: PassiveParameter 

884 :raises ProcessorParameterError: If a parameter with `name` is not registered. 

885 """ 

886 if name in self._processor_parameters: 

887 return self._processor_parameters[name] 

888 raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}') 

889 

890 @ensure_parameter_registration 

891 def get_parameters(self) -> dict[str, PassiveParameter[ParameterType]]: 

892 """ 

893 Returns the full dictionary of registered parameters for this processor. 

894 

895 Useful when dumping the parameter specification in a configuration file, for example. 

896 

897 :return: The dictionary with the registered parameters. 

898 :rtype: dict[str, PassiveParameter[ParameterType] 

899 """ 

900 return self._processor_parameters 

901 

902 @ensure_parameter_registration 

903 def delete_parameter(self, name: str) -> None: 

904 """ 

905 Deletes a processor parameter. 

906 

907 :param name: The name of the parameter to be deleted. 

908 :type name: str 

909 :raises ProcessorParameterError: If a parameter with `name` is not registered. 

910 """ 

911 if name in self._processor_parameters: 

912 del self._processor_parameters[name] 

913 else: 

914 raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}') 

915 

916 @ensure_parameter_registration 

917 def set_parameter_value(self, name: str, value: ParameterType) -> None: 

918 """ 

919 Sets the value of a processor parameter. 

920 

921 :param name: The name of the parameter to be deleted. 

922 :type name: str 

923 :param value: The value to be assigned to the parameter. 

924 :type value: ParameterType 

925 :raises ProcessorParameterError: If a parameter with `name` is not registered. 

926 """ 

927 if name in self._processor_parameters: 

928 self._processor_parameters[name].value = value 

929 else: 

930 raise ProcessorParameterError(f'No parameter ({name}) found for {self.name}') 

931 

932 def get_filter(self, model_name: str) -> mafw.db.db_filter.ModelFilter: 

933 """ 

934 Returns a registered :class:`~mafw.db.db_filter.ModelFilter` via the model name. 

935 

936 If a filter for the provided model_name does not exist, a KeyError is raised. 

937 

938 :param model_name: The model name for which the filter will be returned. 

939 :type model_name: str 

940 :return: The registered filter 

941 :rtype: mafw.db.db_filter.ModelFilter 

942 :raises: KeyError is a filter with the give name is not found. 

943 """ 

944 return self.filter_register[model_name] 

945 

946 def on_processor_status_change(self, old_status: ProcessorStatus, new_status: ProcessorStatus) -> None: 

947 """ 

948 Callback invoked when the processor status is changed. 

949 

950 :param old_status: The old processor status. 

951 :type old_status: ProcessorStatus 

952 :param new_status: The new processor status. 

953 :type new_status: ProcessorStatus 

954 """ 

955 self._user_interface.change_of_processor_status(self.name, old_status, new_status) 

956 

957 def on_looping_status_set(self, status: LoopingStatus) -> None: 

958 """ 

959 Call back invoked when the looping status is set. 

960 

961 The user can overload this method according to the needs. 

962 

963 :param status: The set looping status. 

964 :type status: LoopingStatus 

965 """ 

966 if status == LoopingStatus.Skip: 

967 log.warning('Skipping item %s' % self.i_item) 

968 elif status == LoopingStatus.Abort: 

969 log.error('Looping has been aborted') 

970 elif status == LoopingStatus.Quit: 

971 log.warning('Looping has been quit') 

972 

973 def format_progress_message(self) -> None: 

974 """Customizes the progress message with information about the current item. 

975 

976 The user can overload this method in order to modify the message being displayed during the process loop with 

977 information about the current item. 

978 

979 The user can access the current value, its position in the looping cycle and the total number of items using 

980 :attr:`.Processor.item`, :obj:`.Processor.i_item` and :obj:`.Processor.n_item`. 

981 """ 

982 pass 

983 

984 @property 

985 def i_item(self) -> int: 

986 """The enumeration of the current item being processed.""" 

987 return self._i_item 

988 

989 @i_item.setter 

990 def i_item(self, value: int) -> None: 

991 self._i_item = value 

992 

993 @property 

994 def n_item(self) -> int | None: 

995 """The total number of items to be processed or None for an undefined loop""" 

996 return self._n_item 

997 

998 @n_item.setter 

999 def n_item(self, value: int | None) -> None: 

1000 self._n_item = value 

1001 

1002 @property 

1003 def unique_name(self) -> str: 

1004 """Returns the unique name for the processor.""" 

1005 return f'{self.name}_{self.unique_id}' 

1006 

1007 @property 

1008 def replica_name(self) -> str: 

1009 """ 

1010 Returns the replica aware name of the processor. 

1011 

1012 If no replica_id is specified, then return the pure name, otherwise join the two string using the '#' symbol. 

1013 

1014 .. versionadded:: v2.0.0 

1015 

1016 :return: The replica aware name of the processor. 

1017 :rtype: str 

1018 """ 

1019 if self.replica_id is None: 

1020 return self.name 

1021 else: 

1022 return self.name + '#' + self.replica_id 

1023 

1024 @property 

1025 def local_resource_acquisition(self) -> bool: 

1026 """ 

1027 Checks if resources should be acquired locally. 

1028 

1029 When the processor is executed in stand-alone mode, it is responsible to acquire and release its own external 

1030 resources, but when it is executed from a ProcessorList, then is a good practice to share and distribute 

1031 resources among the whole processor list. In this case, resources should not be acquired locally by the 

1032 single processor, but from the parent execution context. 

1033 

1034 :return: True if resources are to be acquired locally by the processor. False, otherwise. 

1035 :rtype: bool 

1036 """ 

1037 return self._resource_acquisition 

1038 

1039 @local_resource_acquisition.setter 

1040 def local_resource_acquisition(self, flag: bool) -> None: 

1041 self._resource_acquisition = flag 

1042 

1043 @property 

1044 def database(self) -> peewee.Database: 

1045 """ 

1046 Returns the database instance 

1047 

1048 :return: A database object. 

1049 :raises MissingDatabase: If the database connection has not been established. 

1050 """ 

1051 if self._database is None: 

1052 raise MissingDatabase('Database connection not initialized') 

1053 return self._database 

1054 

1055 def execute(self) -> None: 

1056 """Execute the processor tasks. 

1057 

1058 This method works as a dispatcher, reassigning the call to a more specific execution implementation depending 

1059 on the :attr:`~mafw.processor.Processor.loop_type`. 

1060 """ 

1061 dispatcher: dict[LoopType, Callable[[], None]] = { 

1062 LoopType.SingleLoop: self._execute_single, 

1063 LoopType.ForLoop: self._execute_for_loop, 

1064 LoopType.WhileLoop: self._execute_while_loop, 

1065 } 

1066 dispatcher[self.loop_type]() 

1067 

1068 def _execute_single(self) -> None: 

1069 """Execute the processor in single mode. 

1070 

1071 **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the 

1072 appropriate implementation depending on the processor LoopType. 

1073 """ 

1074 with contextlib.ExitStack() as self._resource_stack: 

1075 self.acquire_resources() 

1076 self.start() 

1077 self.processor_status = ProcessorStatus.Run 

1078 self.process() 

1079 self.finish() 

1080 

1081 def _execute_for_loop(self) -> None: 

1082 """Executes the processor within a for loop. 

1083 

1084 **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the 

1085 appropriate implementation depending on the processor LoopType. 

1086 """ 

1087 

1088 with contextlib.ExitStack() as self._resource_stack: 

1089 self.acquire_resources() 

1090 self.start() 

1091 

1092 # get the input item list and filter it 

1093 item_list = self.get_items() 

1094 

1095 # get the total number of items. 

1096 self.n_item = len(item_list) 

1097 

1098 # turn the processor status to run 

1099 self.processor_status = ProcessorStatus.Run 

1100 

1101 # create a new task in the progress bar interface 

1102 self._user_interface.create_task(self.replica_name, self.description, completed=0, total=self.n_item) 

1103 

1104 # start the looping 

1105 for self.i_item, self.item in enumerate(item_list): 

1106 # set the looping status to Continue. The user may want to change it in the process. 

1107 self.looping_status = LoopingStatus.Continue 

1108 

1109 # send a message to the user interface 

1110 self.format_progress_message() 

1111 self._user_interface.display_progress_message(self.progress_message, self.i_item, self.n_item, 0.1) 

1112 

1113 # wrap the execution in a timer to measure how long it took for statistical reasons. 

1114 with Timer(suppress_message=True) as timer: 

1115 self.process() 

1116 self._process_durations.append(timer.duration) 

1117 

1118 # modify the loop depending on the looping status 

1119 if self.looping_status == LoopingStatus.Continue: 

1120 self.accept_item() 

1121 elif self.looping_status == LoopingStatus.Skip: 

1122 self.skip_item() 

1123 else: # equiv to if self.looping_status in [LoopingStatus.Abort, LoopingStatus.Quit]: 

1124 break 

1125 

1126 # update the progress bar 

1127 self._user_interface.update_task(self.replica_name, increment=1) 

1128 self._user_interface.update_task(self.replica_name, completed=self.n_item, total=self.n_item) 

1129 

1130 self.finish() 

1131 

1132 def _execute_while_loop(self) -> None: 

1133 """Executes the processor within a while loop. 

1134 

1135 **Private method**. Do not overload nor invoke it directly. The :meth:`execute` method will call the 

1136 appropriate implementation depending on the processor LoopType. 

1137 """ 

1138 # it is a while loop, so a priori we don't know how many iterations we will have, nevertheless, we 

1139 # can have a progress bar with 'total' set to None, so that it goes in the so-called indeterminate 

1140 # progress. See https://rich.readthedocs.io/en/stable/progress.html#indeterminate-progress 

1141 # we initialise n_item outside the loop, because it is possible that the user has a way to define n_item 

1142 # and he can do it within the loop. 

1143 self.n_item = None 

1144 with contextlib.ExitStack() as self._resource_stack: 

1145 self.acquire_resources() 

1146 self.start() 

1147 

1148 # turn the processor status to run 

1149 self.processor_status = ProcessorStatus.Run 

1150 

1151 self._user_interface.create_task(self.replica_name, self.description, completed=0, total=self.n_item) 

1152 

1153 # we are ready to start the looping. For statistics, we can count the iterations. 

1154 self.i_item = 0 

1155 while self.while_condition(): 

1156 # set the looping status to Continue. The user may want to change it in the process method. 

1157 self.looping_status = LoopingStatus.Continue 

1158 

1159 # send a message to the user interface 

1160 self.format_progress_message() 

1161 self._user_interface.display_progress_message( 

1162 self.progress_message, self.i_item, self.n_item, frequency=0.1 

1163 ) 

1164 

1165 # wrap the execution in a timer to measure how long it too for statistical reasons. 

1166 with Timer(suppress_message=True) as timer: 

1167 self.process() 

1168 self._process_durations.append(timer.duration) 

1169 

1170 # modify the loop depending on the looping status 

1171 if self.looping_status == LoopingStatus.Continue: 

1172 self.accept_item() 

1173 elif self.looping_status == LoopingStatus.Skip: 

1174 self.skip_item() 

1175 else: # equiv to if self.looping_status in [LoopingStatus.Abort, LoopingStatus.Quit]: 

1176 break 

1177 

1178 # update the progress bar. if self.n_item is still None, then the progress bar will show indeterminate 

1179 # progress. 

1180 self._user_interface.update_task(self.replica_name, self.i_item + 1, 1, self.n_item) 

1181 

1182 # now that the loop is finished, we know how many elements we processed 

1183 if self.n_item is None: 

1184 self.n_item = self.i_item 

1185 self._user_interface.update_task(self.replica_name, completed=self.n_item, total=self.n_item) 

1186 

1187 self.finish() 

1188 

1189 def acquire_resources(self) -> None: 

1190 """ 

1191 Acquires resources and add them to the resource stack. 

1192 

1193 The whole body of the :meth:`execute` method is within a context structure. The idea is that if any part of 

1194 the code inside should throw an exception that breaking the execution, we want to be sure that all stateful 

1195 resources are properly closed. 

1196 

1197 Since the number of resources may vary, the variable number of nested `with` statements has been replaced by 

1198 an `ExitStack <https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack>`_. Resources, 

1199 like open files, timers, db connections, need to be added to the resource stacks in this method. 

1200 

1201 In the case a processor is being executed within a :class:`~mafw.processor.ProcessorList`, then some resources might be shared, and 

1202 for this reason they are not added to the stack. This selection can be done via the private 

1203 :attr:`local_resource_acquisition`. This is normally True, meaning that the processor will handle its resources 

1204 independently, but when the processor is executed from a :class:`~mafw.processor.ProcessorList`, this flag is automatically turned to 

1205 False. 

1206 

1207 If the user wants to add additional resources, he has to overload this method calling the super to preserve 

1208 the original resources. If he wants to have shared resources among different processors executed from inside 

1209 a processor list, he has to overload the :class:`~mafw.processor.ProcessorList` class as well. 

1210 """ 

1211 # Both the timer and the user interface will be added to the processor resource stack only if the processor is 

1212 # set to acquire its own resources. 

1213 # The timer and the user interface have in-built enter and exit method. 

1214 if self._resource_acquisition: 

1215 self.timer = self._resource_stack.enter_context(Timer(**self._timer_parameters)) 

1216 self._resource_stack.enter_context(self._user_interface) 

1217 

1218 # For the database it is is a bit different. 

1219 if self._database is None and self._database_conf is None: 

1220 # no database, nor configuration. 

1221 # we cannot do anything 

1222 pass 

1223 elif self._database is None and self._database_conf is not None: 

1224 # no db, but we got a configuration. 

1225 # we can make a db. 

1226 # This processor will try to make a valid connection, and in case it succeeds, it will add the database to 

1227 # the resource stack. 

1228 # The database has an enter method, but it is to generate transaction. 

1229 # We will add the database.close via the callback method. 

1230 if 'DBConfiguration' in self._database_conf: 

1231 conf = self._database_conf['DBConfiguration'] # type1 

1232 else: 

1233 conf = self._database_conf # type2 

1234 

1235 # guess the database type from the URL 

1236 protocol = extract_protocol(conf.get('URL')) 

1237 

1238 # build the connection parameter 

1239 # in case of sqlite, we add the pragmas group as well 

1240 connection_parameters = {} 

1241 if protocol == 'sqlite': 

1242 connection_parameters['pragmas'] = conf.get('pragmas', {}) 

1243 for key, value in conf.items(): 

1244 if key not in ['URL', 'pragmas']: 

1245 connection_parameters[key] = value 

1246 

1247 self._database = connect(conf.get('URL'), **connection_parameters) # type: ignore # peewee is not returning a DB 

1248 self._resource_stack.callback(self._database.close) 

1249 try: 

1250 self._database.connect() 

1251 except peewee.OperationalError as e: 

1252 log.critical('Unable to connect to %s', self._database_conf.get('URL')) 

1253 raise e 

1254 database_proxy.initialize(self._database) 

1255 if self.create_standard_tables: 

1256 standard_tables = mafw_model_register.get_standard_tables() 

1257 self.database.create_tables(standard_tables) 

1258 for table in standard_tables: 

1259 table.init() 

1260 

1261 else: # equivalent to: if self._database is not None: 

1262 # we got a database, so very likely we are inside a processor list 

1263 # the connection has been already set and the initialisation as well. 

1264 # nothing else to do here. 

1265 # do not put the database in the exit stack. who create it has also to close it. 

1266 pass 

1267 

1268 def start(self) -> None: 

1269 """ 

1270 Start method. 

1271 

1272 The user can overload this method, including all steps that should be performed at the beginning of the 

1273 operation. 

1274 

1275 If the user decides to overload it, it should include a call to the super method. 

1276 """ 

1277 self.processor_status = ProcessorStatus.Start 

1278 self._remove_orphan_files() 

1279 

1280 def get_items(self) -> Collection[Any]: 

1281 """ 

1282 Returns the item collections for the processor loop. 

1283 

1284 This method must be overloaded for the processor to work. Generally, this is getting a list of rows from the 

1285 database, or a list of files from the disk to be processed. 

1286 

1287 :return: A collection of items for the loop 

1288 :rtype: Collection[Any] 

1289 """ 

1290 return [] 

1291 

1292 def while_condition(self) -> bool: 

1293 """ 

1294 Return the while condition 

1295 

1296 :return: True if the while loop has to continue, false otherwise. 

1297 :rtype: bool 

1298 """ 

1299 return False 

1300 

1301 def process(self) -> None: 

1302 """ 

1303 Processes the current item. 

1304 

1305 This is the core of the Processor, where the user has to define the calculations required. 

1306 """ 

1307 pass 

1308 

1309 def accept_item(self) -> None: 

1310 """ 

1311 Does post process actions on a successfully processed item. 

1312 

1313 Within the :meth:`process`, the user left the looping status to Continue, so it means that everything looks 

1314 good and this is the right place to perform database updates or file savings. 

1315 

1316 .. seealso: 

1317 Have a look at :meth:`skip_item` for what to do in case something went wrong. 

1318 """ 

1319 pass 

1320 

1321 def skip_item(self) -> None: 

1322 """ 

1323 Does post process actions on a *NOT* successfully processed item. 

1324 

1325 Within the :meth:`process`, the user set the looping status to Skip, so it means that something went wrong 

1326 and here corrective actions can be taken if needed. 

1327 

1328 .. seealso: 

1329 Have a look at :meth:`accept_item` for what to do in case everything was OK. 

1330 """ 

1331 pass 

1332 

1333 def finish(self) -> None: 

1334 """ 

1335 Concludes the execution. 

1336 

1337 The user can reimplement this method if there are some conclusive tasks that must be achieved. 

1338 Always include a call to super(). 

1339 """ 

1340 self.processor_status = ProcessorStatus.Finish 

1341 if self.looping_status == LoopingStatus.Abort: 

1342 self.processor_exit_status = ProcessorExitStatus.Aborted 

1343 self.print_process_statistics() 

1344 

1345 def print_process_statistics(self) -> None: 

1346 """ 

1347 Print the process statistics. 

1348 

1349 A utility method to display the fastest, the slowest and the average timing required to process on a single 

1350 item. This is particularly useful when the looping processor is part of a ProcessorList. 

1351 """ 

1352 if len(self._process_durations): 

1353 log.info('[cyan] Processed %s items.' % len(self._process_durations)) 

1354 log.info( 

1355 '[cyan] Fastest item process duration: %s ' 

1356 % pretty_format_duration(min(self._process_durations), n_digits=3) 

1357 ) 

1358 log.info( 

1359 '[cyan] Slowest item process duration: %s ' 

1360 % pretty_format_duration(max(self._process_durations), n_digits=3) 

1361 ) 

1362 log.info( 

1363 '[cyan] Average item process duration: %s ' 

1364 % pretty_format_duration((sum(self._process_durations) / len(self._process_durations)), n_digits=3) 

1365 ) 

1366 log.info( 

1367 '[cyan] Total process duration: %s' % pretty_format_duration(sum(self._process_durations), n_digits=3) 

1368 ) 

1369 

1370 def _remove_orphan_files(self) -> None: 

1371 """ 

1372 Remove orphan files. 

1373 

1374 If a connection to the database is available, then the OrphanFile standard table is queried for all its entries, 

1375 and all the files are then removed. 

1376 

1377 The user can turn off this behaviour by switching the :attr:`~mafw.processor.Processor.remove_orphan_files` to False. 

1378 

1379 """ 

1380 if self._database is None or self.remove_orphan_files is False: 

1381 # no database connection or no wish to remove orphan files, it does not make sense to continue 

1382 return 

1383 

1384 try: 

1385 OrphanFile = cast(MAFwBaseModel, mafw_model_register.get_model('OrphanFile')) 

1386 except KeyError: 

1387 log.warning('OrphanFile table not found in DB. Please verify database integrity') 

1388 return 

1389 

1390 if TYPE_CHECKING: 

1391 assert hasattr(OrphanFile, '_meta') 

1392 

1393 orphan_files = OrphanFile.select().execute() # type: ignore[no-untyped-call] 

1394 if len(orphan_files) != 0: 

1395 msg = f'[yellow]Pruning orphan files ({sum(len(f.filenames) for f in orphan_files)})...' 

1396 log.info(msg) 

1397 for orphan in orphan_files: 

1398 # filenames is a list of files: 

1399 for f in orphan.filenames: 

1400 f.unlink(missing_ok=True) 

1401 

1402 OrphanFile.delete().execute() # type: ignore[no-untyped-call] 

1403 

1404 

1405class ProcessorList(list[Union['Processor', 'ProcessorList']]): 

1406 """ 

1407 A list like collection of processors. 

1408 

1409 ProcessorList is a subclass of list containing only Processor subclasses or other ProcessorList. 

1410 

1411 An attempt to add an element that is not a Processor or a ProcessorList will raise a TypeError. 

1412 

1413 Along with an iterable of processors, a new processor list can be built using the following parameters. 

1414 """ 

1415 

1416 def __init__( 

1417 self, 

1418 *args: Processor | ProcessorList, 

1419 name: str | None = None, 

1420 description: str | None = None, 

1421 timer: Timer | None = None, 

1422 timer_params: dict[str, Any] | None = None, 

1423 user_interface: UserInterfaceBase | None = None, 

1424 database: Database | None = None, 

1425 database_conf: dict[str, Any] | None = None, 

1426 create_standard_tables: bool = True, 

1427 ): 

1428 """ 

1429 Constructor parameters: 

1430 

1431 :param name: The name of the processor list. Defaults to ProcessorList. 

1432 :type name: str, Optional 

1433 :param description: An optional short description. Default to ProcessorList. 

1434 :type description: str, Optional 

1435 :param timer: The timer object. If None is provided, a new one will be created. Defaults to None. 

1436 :type timer: Timer, Optional 

1437 :param timer_params: A dictionary of parameter to build the timer object. Defaults to None. 

1438 :type timer_params: dict, Optional 

1439 :param user_interface: A user interface. Defaults to None 

1440 :type user_interface: UserInterfaceBase, Optional 

1441 :param database: A database instance. Defaults to None. 

1442 :type database: Database, Optional 

1443 :param database_conf: Configuration for the database. Default to None. 

1444 :type database_conf: dict, Optional 

1445 :param create_standard_tables: Whether or not to create the standard tables. Defaults to True. 

1446 :type create_standard_tables: bool, Optional 

1447 """ 

1448 

1449 # validate_items takes a tuple of processors, that's why we don't unpack args. 

1450 super().__init__(self.validate_items(args)) 

1451 self._name = name or self.__class__.__name__ 

1452 self.description = description or self._name 

1453 

1454 self.timer = timer 

1455 self.timer_params = timer_params or {} 

1456 self._user_interface = user_interface or ConsoleInterface() 

1457 

1458 self._resource_stack: contextlib.ExitStack 

1459 self._processor_exit_status: ProcessorExitStatus = ProcessorExitStatus.Successful 

1460 self.nested_list = False 

1461 """ 

1462 Boolean flag to identify that this list is actually inside another list. 

1463  

1464 Similarly to the local resource flag for the :class:`.Processor`, this flag prevent the user interface to be  

1465 added to the resource stack. 

1466 """ 

1467 

1468 # database stuff 

1469 self._database: peewee.Database | None = database 

1470 self._database_conf: dict[str, Any] | None = validate_database_conf(database_conf) 

1471 self.create_standard_tables = create_standard_tables 

1472 """The boolean flag to proceed or skip with standard table creation and initialisation""" 

1473 

1474 def __setitem__( # type: ignore[override] 

1475 self, 

1476 __index: SupportsIndex, 

1477 __object: Processor | ProcessorList, 

1478 ) -> None: 

1479 super().__setitem__(__index, self.validate_item(__object)) 

1480 

1481 def insert(self, __index: SupportsIndex, __object: Processor | ProcessorList) -> None: 

1482 """Adds a new processor at the specified index.""" 

1483 super().insert(__index, self.validate_item(__object)) 

1484 

1485 def append(self, __object: Processor | ProcessorList) -> None: 

1486 """Appends a new processor at the end of the list.""" 

1487 super().append(self.validate_item(__object)) 

1488 

1489 def extend(self, __iterable: Iterable[Processor | ProcessorList]) -> None: 

1490 """Extends the processor list with a list of processors.""" 

1491 if isinstance(__iterable, type(self)): 

1492 super().extend(__iterable) 

1493 else: 

1494 super().extend([self.validate_item(item) for item in __iterable]) 

1495 

1496 @staticmethod 

1497 def validate_item(item: Processor | ProcessorList) -> Processor | ProcessorList: 

1498 """Validates the item being added.""" 

1499 if isinstance(item, Processor): 

1500 item.local_resource_acquisition = False 

1501 return item 

1502 elif isinstance(item, ProcessorList): 

1503 item.timer_params = dict(suppress_message=True) 

1504 item.nested_list = True 

1505 return item 

1506 else: 

1507 raise TypeError(f'Expected Processor or ProcessorList, got {type(item).__name__}') 

1508 

1509 @staticmethod 

1510 def validate_items(items: tuple[Processor | ProcessorList, ...] = ()) -> tuple[Processor | ProcessorList, ...]: 

1511 """Validates a tuple of items being added.""" 

1512 if not items: 

1513 return tuple() 

1514 return tuple([ProcessorList.validate_item(item) for item in items if item is not None]) 

1515 

1516 @property 

1517 def name(self) -> str: 

1518 """ 

1519 The name of the processor list 

1520 

1521 :return: The name of the processor list 

1522 :rtype: str 

1523 """ 

1524 return self._name 

1525 

1526 @name.setter 

1527 def name(self, name: str) -> None: 

1528 self._name = name 

1529 

1530 @property 

1531 def processor_exit_status(self) -> ProcessorExitStatus: 

1532 """ 

1533 The processor exit status. 

1534 

1535 It refers to the whole processor list execution. 

1536 """ 

1537 return self._processor_exit_status 

1538 

1539 @processor_exit_status.setter 

1540 def processor_exit_status(self, status: ProcessorExitStatus) -> None: 

1541 self._processor_exit_status = status 

1542 

@property
def database(self) -> peewee.Database:
    """
    The database instance used by the processor list.

    :return: The database instance.
    :raises MissingDatabase: if no database instance is available.
    """
    db = self._database
    if db is not None:
        return db
    raise MissingDatabase('Database connection not initialized')

1554 

def execute(self) -> ProcessorExitStatus:
    """
    Execute the list of processors.

    Similarly to the :class:`Processor`, ProcessorList can be executed. In simple words, the execute
    method of each processor in the list is called exactly in the same sequence as they were added.

    The external resources are acquired once for the whole list inside a resource stack and are distributed
    to each item right before its execution. After each item, the exit status of the list is updated with
    the one of the item just executed.

    :return: The exit status of the last executed item, or the stored status if the list is empty.
    :raises AbortProcessorException: if an item terminates with the Aborted exit status.
    """
    with contextlib.ExitStack() as self._resource_stack:
        self.acquire_resources()
        self._user_interface.create_task(self.name, self.description, completed=0, increment=0, total=len(self))
        for item in self:
            # lazy logging arguments: the message is formatted only if the record is actually emitted
            if isinstance(item, Processor):
                log.info('[bold]Executing [red]%s[/red] processor[/bold]', item.replica_name)
            else:
                log.info('[bold]Executing [blue]%s[/blue] processor list[/bold]', item.name)
            self.distribute_resources(item)
            item.execute()
            self._user_interface.update_task(self.name, increment=1)
            self._processor_exit_status = item.processor_exit_status
            if self._processor_exit_status == ProcessorExitStatus.Aborted:
                # the same text is used for the log and for the exception, so it is formatted once here
                msg = 'Processor %s caused the processor list to abort' % item.name
                log.error(msg)
                raise AbortProcessorException(msg)
        # make sure the task is displayed as fully completed
        self._user_interface.update_task(self.name, completed=len(self), total=len(self))
    return self._processor_exit_status

1580 

def acquire_resources(self) -> None:
    """
    Acquires the external resources.

    The strategy is similar to the one for the processor: if a resource is already active (not None), it is
    used as it is, otherwise it is created and added to the resource stack.

    * **Timer**: created from ``timer_params`` when missing.
    * **User interface**: entered in the resource stack unless this is a nested list.
    * **Database**: when there is no database but a configuration is available, a connection is opened using
      the configuration ``URL``, its closure is registered in the resource stack, the database proxy is
      initialised and, if ``create_standard_tables`` is set, the standard tables are created and initialised.

    :raises peewee.OperationalError: if the connection to the database cannot be established.
    """
    if self.timer is None:
        self.timer = self._resource_stack.enter_context(Timer(**self.timer_params))

    # The user interface is very likely already initialised by the runner.
    # But if this is a nested list, then we must not push the user interface in the stack
    # otherwise the user interface context (progress for rich) will be stopped at the end
    # of the nested list.
    if not self.nested_list:
        self._resource_stack.enter_context(self._user_interface)

    if self._database is not None:
        # we got a database, so very likely we are inside a processor list
        # the connection has been already set and the initialisation as well.
        # nothing else to do here.
        return

    if self._database_conf is None:
        # no database, nor configuration.
        # we cannot do anything
        return

    # no db, but we got a configuration.
    # we can make a db
    if 'DBConfiguration' in self._database_conf:
        conf = self._database_conf['DBConfiguration']  # type1
    else:
        conf = self._database_conf  # type2

    # guess the database type from the URL
    url = conf.get('URL')
    protocol = extract_protocol(url)

    # build the connection parameter
    # in case of sqlite, we add the pragmas group as well
    connection_parameters = {}
    if protocol == 'sqlite':
        connection_parameters['pragmas'] = conf.get('pragmas', {})
    connection_parameters.update({key: value for key, value in conf.items() if key not in ('URL', 'pragmas')})

    self._database = connect(url, **connection_parameters)  # type: ignore   # peewee is not returning a DB
    try:
        self._database.connect()
    except peewee.OperationalError:
        # report the URL from the effective configuration: with a type1 configuration the URL
        # is nested under 'DBConfiguration' and is not available at the top level.
        log.critical('Unable to connect to %s', url)
        raise
    self._resource_stack.callback(self._database.close)

    database_proxy.initialize(self._database)
    if self.create_standard_tables:
        standard_tables = mafw_model_register.get_standard_tables()
        self.database.create_tables(standard_tables)
        for table in standard_tables:
            table.init()

1635 

1636 def distribute_resources(self, processor: Processor | Self) -> None: 

1637 """Distributes the external resources to the items in the list.""" 

1638 processor.timer = self.timer 

1639 processor._user_interface = self._user_interface 

1640 processor._database = self._database