mafw.processor
Module implements the basic Processor class, the ProcessorList and all helper classes to achieve the core functionality of the MAFw.
Module Attributes
ParameterType – Generic variable type for the ActiveParameter and PassiveParameter.
F – Type variable for generic callable with any return value.
Functions
ensure_parameter_registration – Decorator to ensure that before calling func the processor parameters have been registered.
validate_database_conf – Validates the database configuration.
Classes
ActiveParameter – The public interface to the processor parameter.
PassiveParameter – A helper class to store processor parameter value and metadata.
Processor – The basic processor.
ProcessorList – A list like collection of processors.
ProcessorMeta – A metaclass to implement the post-init method.
- class mafw.processor.ActiveParameter(name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = '')[source]
Bases: Generic[ParameterType]
The public interface to the processor parameter.
The behaviour of a Processor can be customised by using processor parameters. The value of these parameters can be set either via a configuration file or directly when creating the class.
To benefit from this facility, the user has to add an ActiveParameter instance to the body of the Processor subclass, in this way:
    from pathlib import Path

    from mafw.processor import ActiveParameter, Processor

    class MyProcessor(Processor):
        # this is the input folder (a raw string cannot end with a single backslash)
        input_folder = ActiveParameter('input_folder', Path('C:\\'), help_doc='This is where to look for input files')

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # change the input folder to something else
            self.input_folder = Path(r'D:\data')
            # get the value of the parameter
            print(self.input_folder)
The ActiveParameter is a descriptor: when you create one of them, a lot of work is done behind the scenes.
In simple words, a processor parameter is made of two objects: a public interface, where the user can easily access the value of the parameter, and a private interface, where all other information (default, documentation…) is also stored.
The user does not have to take care of all of this. When a new ActiveParameter instance is added to the class as in the code snippet above, the private interface is automatically created and will stay in the class instance until the end of the class lifetime.
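The split between a public descriptor and a private storage object can be pictured with a minimal standalone sketch. It does not reproduce MAFw's implementation: the names PublicParam, PrivateParam, Host and the _params dictionary are hypothetical and only illustrate how a descriptor can lazily create one private counterpart per instance.

```python
from typing import Any


class PrivateParam:
    """Hypothetical private side: holds value, default and help text."""

    def __init__(self, name: str, default: Any = None, help_doc: str = '') -> None:
        self.name = name
        self.default = default
        self.help_doc = help_doc
        self.value = default


class PublicParam:
    """Hypothetical public side: a descriptor exposing only the value."""

    def __init__(self, name: str, default: Any = None, help_doc: str = '') -> None:
        self.name = name
        self.default = default
        self.help_doc = help_doc

    def _private(self, instance: Any) -> PrivateParam:
        # Create the per-instance private object on first access.
        store = instance.__dict__.setdefault('_params', {})
        if self.name not in store:
            store[self.name] = PrivateParam(self.name, self.default, self.help_doc)
        return store[self.name]

    def __get__(self, instance: Any, owner: Any = None) -> Any:
        if instance is None:
            return self  # accessed on the class: return the descriptor itself
        return self._private(instance).value

    def __set__(self, instance: Any, value: Any) -> None:
        self._private(instance).value = value


class Host:
    threshold = PublicParam('threshold', default=5, help_doc='An example parameter')


a, b = Host(), Host()
a.threshold = 10  # only the private object belonging to `a` is changed
```

In this sketch each instance keeps its own private objects, so changing the value on one instance leaves the others at their default.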
To access the private interface, the user can use the Processor.get_parameter() method using the parameter name as a key.
The user can assign almost any name to an ActiveParameter. Only a few names are invalid because they are used for other purposes; they are listed in reserved_names. Should the user inadvertently use a reserved name, a ProcessorParameterError is raised.
See also
The private counterpart in the PassiveParameter.
An explanation on how processor parameters work and should be used is given in Understanding processor parameters.
The list of reserved names.
Constructor parameters:
- Parameters:
name (str) – The name of the parameter.
value (ParameterType, Optional) – The initial value of the parameter. Defaults to None.
default (ParameterType, Optional) – The default value of the parameter, to be used when value is not set. Defaults to None.
help_doc (str, Optional) – An explanatory text describing the parameter.
- _validate_name(proposed_name: str) str[source]
Validate that the proposed parameter name is not in the list of forbidden names.
This private method checks if the provided name is allowed for use as a processor parameter. Names that are listed in reserved_names cannot be used as parameter names.
- Parameters:
proposed_name (str) – The name to be validated for use as a processor parameter.
- Returns:
The validated name if it passes the forbidden names check.
- Return type:
str
- Raises:
ProcessorParameterError – If the proposed name is in the list of forbidden names.
- reserved_names: list[str] = ['__logic__', '__filter__', '__new_only__', '__inheritance__']
A list of names that cannot be used as processor parameter names.
__logic__
__filter__
__new_only__
__inheritance__
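The check described for _validate_name() amounts to a membership test against this list. The following standalone sketch assumes nothing about MAFw beyond the four reserved names shown above; ParameterNameError is a hypothetical stand-in for ProcessorParameterError.

```python
RESERVED_NAMES = ['__logic__', '__filter__', '__new_only__', '__inheritance__']


class ParameterNameError(ValueError):
    """Hypothetical stand-in for ProcessorParameterError."""


def validate_name(proposed_name: str) -> str:
    # Reject reserved names, otherwise hand the name back unchanged.
    if proposed_name in RESERVED_NAMES:
        raise ParameterNameError(f'{proposed_name!r} is a reserved name')
    return proposed_name
```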
- class mafw.processor.F
Type variable for generic callable with any return value.
alias of TypeVar('F', bound=Callable[[…], Any])
- class mafw.processor.ParameterType
Generic variable type for the ActiveParameter and PassiveParameter.
alias of TypeVar('ParameterType')
- class mafw.processor.PassiveParameter(name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = '')[source]
Bases: Generic[ParameterType]
A helper class to store processor parameter value and metadata.
This class is the private interface used by the ActiveParameter descriptor to store its value and metadata.
When a new ActiveParameter is added to a class, an instance of a PassiveParameter is added to the processor parameter register.
See also
An explanation on how processor parameters work and should be used is given in Understanding processor parameters.
Changed in version v2.0.0: User should only use ActiveParameter and never manually instantiate PassiveParameter.
Constructor parameters:
- Parameters:
name (str) – The name of the parameter. It must be a valid python identifier.
value (ParameterType, Optional) – The set value of the parameter. If None, then the default value will be used. Defaults to None.
default (ParameterType, Optional) – The default value for the parameter. It is used if the value is not provided. Defaults to None.
help_doc (str, Optional) – A brief explanation of the parameter.
- Raises:
ProcessorParameterError – If neither value nor default is provided, or if name is not a valid identifier.
- property is_optional: bool
Property to check if the parameter is optional.
- Returns:
True if the parameter is optional
- Return type:
bool
- property is_set: bool
Property to check if the value has been set.
It is useful for optional parameters to see whether the current value is the default one or was set by the user.
- property value: ParameterType
Gets the parameter value.
- Returns:
The parameter value.
- Return type:
ParameterType
- Raises:
ProcessorParameterError – if both value and default were not defined.
- class mafw.processor.Processor(*args: Any, **kwargs: Any)[source]
Bases: object
The basic processor.
A very comprehensive description of what a Processor does and how it works is available at Processor: The core of MAFw.
Constructor parameters
- Parameters:
name (str, Optional) – The name of the processor. If None is provided, the class name is used instead. Defaults to None.
description (str, Optional) – A short description of the processor task. Defaults to the processor name.
config (dict, Optional) – A configuration dictionary for this processor. Defaults to None.
looper (LoopType, Optional) – Enumerator to define the looping type. Defaults to LoopType.ForLoop
user_interface (UserInterfaceBase, Optional) – A user interface instance to be used by the processor to interact with the user.
timer (Timer, Optional) – A timer object to measure process duration.
timer_params (dict, Optional) – Parameters for the timer object.
database (Database, Optional) – A database instance. Defaults to None.
database_conf (dict, Optional) – Configuration for the database. Defaults to None.
remove_orphan_files (bool, Optional) – Boolean flag to remove files on disc without a reference to the database. See Standard tables and _remove_orphan_files(). Defaults to True.
replica_id (str, Optional) – The replica identifier for the current processor.
create_standard_tables (bool, Optional) – Boolean flag to create std tables on disk. Defaults to True
kwargs – Keyword arguments that can be used to set processor parameters.
- _check_method_overload() None[source]
Check if the user overloaded the required methods.
Depending on the loop type, the user must overload different methods. This method performs the check and emits a warning if the required methods are not overloaded.
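One common way to detect an overloaded method is to compare the attribute found on the subclass with the one defined on the base class. The sketch below is a standalone illustration under that assumption; Base, check_overload and the two subclasses are hypothetical and are not part of MAFw.

```python
import warnings


class Base:
    def process(self) -> None:
        pass

    def check_overload(self, method_name: str) -> bool:
        # A method is overloaded when the subclass attribute is a different
        # function object from the one defined on Base.
        overloaded = getattr(type(self), method_name) is not getattr(Base, method_name)
        if not overloaded:
            warnings.warn(f'{method_name} is not overloaded', stacklevel=2)
        return overloaded


class WithProcess(Base):
    def process(self) -> None:
        print('working')


class WithoutProcess(Base):
    pass
```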
- _check_method_super() None[source]
Check if some specific methods are calling their super.
For some specific methods (for example: start and finish), the user should always call their super method. This method verifies that the user implementation of these methods includes a super call; otherwise a warning is emitted to inform the user about the problem and the possible misbehaviour of the processor.
The list of methods to be verified is stored in the private class attribute _methods_to_be_checked_for_super as a list of tuples, each made of the name of the method to be verified and the base class for comparison. The base class is required because Processor subclasses may extend this list with methods that are not present in the base Processor. See, for example, patch_data_frame(), which is required to have a super call but is not present in the base Processor.
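How the super call is detected is not stated here. As one possible approach, the standalone sketch below inspects the names referenced by the compiled method. It is only a heuristic (it cannot tell which method the super call targets), and calls_super, Base, Good and Bad are hypothetical names.

```python
def calls_super(func) -> bool:
    # Heuristic: a zero-argument super() call makes the compiler record the
    # global name 'super' in the function's code object.
    return 'super' in func.__code__.co_names


class Base:
    def start(self) -> None:
        self.started = True


class Good(Base):
    def start(self) -> None:
        super().start()  # the base implementation is preserved


class Bad(Base):
    def start(self) -> None:
        self.started = False  # the base implementation is silently dropped
```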
- _execute_for_loop() None[source]
Executes the processor within a for loop.
Private method. Do not overload or invoke it directly. The execute() method will call the appropriate implementation depending on the processor LoopType.
- _execute_single() None[source]
Execute the processor in single mode.
Private method. Do not overload or invoke it directly. The execute() method will call the appropriate implementation depending on the processor LoopType.
- _execute_while_loop() None[source]
Executes the processor within a while loop.
Private method. Do not overload or invoke it directly. The execute() method will call the appropriate implementation depending on the processor LoopType.
- _load_parameter_configuration() None[source]
Load processor parameter configuration from the internal configuration dictionary.
This method processes the processor’s configuration dictionary to set parameter values. It handles two configuration formats:
Nested format: {'ProcessorName': {'param1': value1, ...}}
Flat format: {'param1': value1, ...}
The method also handles filter configurations by collecting filter table names and deferring their initialisation until after the global filter has been processed.
Changed in version v2.0.0: For the nested format (option 1), the configuration found under the processor name is combined with the one found under the replica name.
- Raises:
ProcessorParameterError – If a parameter in the configuration is not registered.
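The difference between the two formats can be sketched with a small standalone function. It is a simplification, not MAFw's code: extract_parameters is a hypothetical name, and the filter handling, replica names and error reporting described above are left out.

```python
from typing import Any


def extract_parameters(config: dict[str, Any], processor_name: str) -> dict[str, Any]:
    section = config.get(processor_name)
    if isinstance(section, dict):
        # Nested format: the parameters live under the processor name.
        return dict(section)
    # Flat format: the dictionary already is the parameter mapping.
    return dict(config)


nested = {'MyProcessor': {'param1': 5, 'input_table': 'my_table'}}
flat = {'param1': 5, 'input_table': 'my_table'}
```

Both inputs resolve to the same parameter mapping, which is why either format can be passed to a processor.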
- _override_defaults() None[source]
Override default parameter values with values from new_defaults.
This private method iterates through the new_defaults dictionary and updates the corresponding processor parameters with new values. Only parameters that exist in both new_defaults and _processor_parameters are updated.
Added in version v2.0.0.
- _overrule_kws_parameters() None[source]
Override processor parameters with values from keyword arguments.
This method applies parameter values passed as keyword arguments during processor initialisation. It ensures that the parameter types match the expected types before setting the values.
- _register_parameters() None[source]
Register processor parameters defined as ActiveParameter instances in the class.
This private method scans the class definition for any ActiveParameter instances and creates corresponding PassiveParameter instances to store the actual parameter values and metadata. It ensures that all processor parameters are properly initialised and available for configuration through the processor’s configuration system.
The method checks for duplicate parameter names and raises a ProcessorParameterError if duplicates are detected. It also sets the internal flag _parameter_registered to True once registration is complete.
Note
This method is automatically called during processor initialisation and should not be called directly by users.
See also
Processor, Processor._override_defaults(), Processor._load_parameter_configuration(), Processor._overrule_kws_parameters()
Changed in version v2.0.0: Only ActiveParameter are now registered. The use of PassiveParameter is only meant to store the value and metadata of the active counterpart.
- _remove_orphan_files() None[source]
Remove orphan files.
If a connection to the database is available, then the OrphanFile standard table is queried for all its entries, and all the files are then removed.
The user can turn off this behaviour by switching remove_orphan_files to False.
- _reset_parameters() None[source]
Reset processor parameters to their initial state.
This method clears all currently registered processor parameters and triggers a fresh registration process. It’s useful when parameter configurations need to be reinitialized or when parameters have been modified and need to be reset.
See also
- accept_item() None[source]
Does post process actions on a successfully processed item.
Within process(), the user left the looping status set to Continue, which means that everything looks good and this is the right place to perform database updates or file savings.
- acquire_resources() None[source]
Acquires resources and add them to the resource stack.
The whole body of the execute() method is within a context structure. The idea is that if any part of the code inside throws an exception that breaks the execution, all stateful resources are still properly closed.
Since the number of resources may vary, the variable number of nested with statements has been replaced by an ExitStack. Resources, like open files, timers and db connections, need to be added to the resource stack in this method.
When a processor is executed within a ProcessorList, some resources might be shared, and for this reason they are not added to the stack. This selection is made via the private local_resource_acquisition flag. It is normally True, meaning that the processor handles its resources independently, but when the processor is executed from a ProcessorList, the flag is automatically set to False.
If the user wants to add additional resources, they have to overload this method, calling the super to preserve the original resources. If they want to share resources among different processors executed from inside a processor list, they have to overload the ProcessorList class as well.
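The role of the ExitStack can be illustrated with the standard library alone. In this sketch the resource() context manager, the run() function and the events list are hypothetical; only contextlib.ExitStack and its enter_context() method are real APIs. Resources are released in reverse order of acquisition, even if the body raises.

```python
from contextlib import ExitStack, contextmanager

events: list[str] = []


@contextmanager
def resource(name: str):
    events.append(f'open {name}')
    try:
        yield name
    finally:
        events.append(f'close {name}')


def run(local_resource_acquisition: bool = True) -> None:
    with ExitStack() as stack:
        if local_resource_acquisition:
            # Only a stand-alone run owns (and later releases) its resources.
            stack.enter_context(resource('timer'))
            stack.enter_context(resource('database'))
        events.append('execute')


run()
```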
- delete_parameter(name: str) None[source]
Deletes a processor parameter.
- Parameters:
name (str) – The name of the parameter to be deleted.
- Raises:
ProcessorParameterError – If a parameter with name is not registered.
- dump_parameter_configuration(option: int = 1) dict[str, Any][source]
Dumps the processor parameter values in a dictionary.
The snippet below explains the meaning of option.
    # option 1
    conf_dict1 = {
        'Processor': {'param1': 5, 'input_table': 'my_table'}
    }
    # option 2
    conf_dict2 = {'param1': 5, 'input_table': 'my_table'}
In the case of option 1, the replica aware name (replica_name()) will be used as a key for the configuration dictionary.
Changed in version v2.0.0: With option 1, using replica_name() instead of name as key of the configuration dictionary.
- Parameters:
option (int, Optional) – Select the dictionary style. Defaults to 1.
- Returns:
A parameter configuration dictionary.
- Return type:
dict
- execute() None[source]
Execute the processor tasks.
This method works as a dispatcher, reassigning the call to a more specific execution implementation depending on the loop_type.
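A dispatcher of this kind can be sketched as a mapping from loop type to implementation. The Loop enumeration and the Worker class below are hypothetical stand-ins for LoopType and Processor; the member names and return values are assumptions made only for illustration.

```python
from enum import Enum, auto


class Loop(Enum):
    # Hypothetical stand-in for LoopType.
    SINGLE = auto()
    FOR = auto()
    WHILE = auto()


class Worker:
    def __init__(self, loop: Loop) -> None:
        self.loop = loop

    def execute(self) -> str:
        # Map each loop type to its private implementation.
        dispatch = {
            Loop.SINGLE: self._execute_single,
            Loop.FOR: self._execute_for_loop,
            Loop.WHILE: self._execute_while_loop,
        }
        return dispatch[self.loop]()

    def _execute_single(self) -> str:
        return 'single'

    def _execute_for_loop(self) -> str:
        return 'for'

    def _execute_while_loop(self) -> str:
        return 'while'
```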
- finish() None[source]
Concludes the execution.
The user can reimplement this method if there are some conclusive tasks that must be achieved. Always include a call to super().
- format_progress_message() None[source]
Customizes the progress message with information about the current item.
The user can overload this method in order to modify the message being displayed during the process loop with information about the current item.
The user can access the current value, its position in the looping cycle and the total number of items using Processor.item, Processor.i_item and Processor.n_item.
- get_filter(model_name: str) ModelFilter[source]
Returns a registered ModelFilter via the model name.
If a filter for the provided model_name does not exist, a KeyError is raised.
- Parameters:
model_name (str) – The model name for which the filter will be returned.
- Returns:
The registered filter
- Return type:
ModelFilter
- Raises:
KeyError – If a filter with the given name is not found.
- get_items() Collection[Any][source]
Returns the item collections for the processor loop.
This method must be overloaded for the processor to work. Generally, this is getting a list of rows from the database, or a list of files from the disk to be processed.
- Returns:
A collection of items for the loop
- Return type:
Collection[Any]
- get_parameter(name: str) PassiveParameter[ParameterType][source]
Gets the processor parameter named name.
- Parameters:
name (str) – The name of the parameter.
- Returns:
The processor parameter
- Return type:
PassiveParameter[ParameterType]
- Raises:
ProcessorParameterError – If a parameter with name is not registered.
- get_parameters() dict[str, PassiveParameter[ParameterType]][source]
Returns the full dictionary of registered parameters for this processor.
Useful when dumping the parameter specification in a configuration file, for example.
- Returns:
The dictionary with the registered parameters.
- Return type:
dict[str, PassiveParameter[ParameterType]]
- initialise_parameters() None[source]
Initialises processor parameters by registering them and applying various configuration sources.
This method orchestrates the parameter initialisation process by performing the following steps in order:
1. Registers processor parameters defined as ActiveParameter instances
2. Overrides default parameter values with any configured overrides
3. Loads parameter configuration from the processor’s configuration dictionary
4. Applies keyword arguments as parameter overrides
The method ensures that all processor parameters are properly configured before the processor execution begins. It is automatically called during processor initialisation and should not typically be called directly by users.
See also
_register_parameters(), _override_defaults(), _load_parameter_configuration(), _overrule_kws_parameters()
Added in version v2.0.0.
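The practical consequence of this ordering is a precedence rule: each later source overwrites the earlier ones. The standalone sketch below models the four steps with plain dictionaries. The resolve function is hypothetical, and silently skipping unknown names is a simplification of this sketch (the configuration step of the real processor raises ProcessorParameterError for unregistered parameters).

```python
from typing import Any


def resolve(class_defaults: dict[str, Any], new_defaults: dict[str, Any],
            config: dict[str, Any], kwargs: dict[str, Any]) -> dict[str, Any]:
    params = dict(class_defaults)  # step 1: registration
    for layer in (new_defaults, config, kwargs):  # steps 2 to 4, in order
        for key, value in layer.items():
            if key in params:  # simplification: skip unknown names
                params[key] = value
    return params


result = resolve(
    {'threshold': 1, 'folder': 'in'},   # values declared on the class
    {'threshold': 2},                   # overridden defaults
    {'threshold': 3, 'folder': 'cfg'},  # configuration dictionary
    {'folder': 'kw'},                   # keyword arguments win last
)
```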
- on_looping_status_set(status: LoopingStatus) None[source]
Callback invoked when the looping status is set.
The user can overload this method as needed.
- Parameters:
status (LoopingStatus) – The set looping status.
- on_processor_status_change(old_status: ProcessorStatus, new_status: ProcessorStatus) None[source]
Callback invoked when the processor status is changed.
- Parameters:
old_status (ProcessorStatus) – The old processor status.
new_status (ProcessorStatus) – The new processor status.
- print_process_statistics() None[source]
Print the process statistics.
A utility method to display the fastest, the slowest and the average time required to process a single item. This is particularly useful when the looping processor is part of a ProcessorList.
- process() None[source]
Processes the current item.
This is the core of the Processor, where the user has to define the calculations required.
- set_parameter_value(name: str, value: ParameterType) None[source]
Sets the value of a processor parameter.
- Parameters:
name (str) – The name of the parameter to be set.
value (ParameterType) – The value to be assigned to the parameter.
- Raises:
ProcessorParameterError – If a parameter with name is not registered.
- skip_item() None[source]
Does post process actions on a NOT successfully processed item.
Within process(), the user set the looping status to Skip, which means that something went wrong; corrective actions can be taken here if needed.
- start() None[source]
Start method.
The user can overload this method, including all steps that should be performed at the beginning of the operation.
If the user decides to overload it, it should include a call to the super method.
- validate_configuration() None[source]
Validate the configuration provided via the processor parameters.
Method to be implemented by subclasses if a configuration validation is needed.
The method should silently check for the proper configuration. If the configuration is not valid, an InvalidConfigurationError must be raised.
Added in version v2.0.0.
- while_condition() bool[source]
Return the while condition.
- Returns:
True if the while loop has to continue, False otherwise.
- Return type:
bool
- _config: dict[str, Any]
A dictionary containing the processor configuration object.
This dictionary is populated with configuration parameters (always type 2) during the _load_parameter_configuration() method.
The original value of the configuration dictionary that is passed to the constructor is stored in _orig_config.
Changed in version v2.0.0: Now it is an empty dictionary until the _load_parameter_configuration() is called.
- _ids = count(0)
A counter for all processor instances
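A class-level itertools.count yields one new integer per instance. The Item class in this standalone sketch is hypothetical; it only shows how such a counter can feed a unique_id attribute.

```python
from itertools import count


class Item:
    _ids = count(0)  # shared by all instances of the class

    def __init__(self) -> None:
        # Each new instance draws the next integer from the class-level counter.
        self.unique_id = next(self._ids)


first, second = Item(), Item()
```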
- _methods_to_be_checked_for_super
List of methods to be checked for super inclusion.
It is a list of tuples, where the first element is the name of the method to be checked and the second is the base class to be compared against.
- _orig_config
A copy of the original configuration dictionary.
Added in version v2.0.0.
- _parameter_registered
A boolean flag to confirm successful parameter registration.
- _processor_parameters: dict[str, PassiveParameter[ParameterType]]
A dictionary to store all the processor parameter instances.
The name of the parameter is used as a key, while the value is an instance of PassiveParameter.
- create_standard_tables
The boolean flag to proceed or skip with standard table creation and initialisation
- property database: Database
Returns the database instance
- Returns:
A database object.
- Raises:
MissingDatabase – If the database connection has not been established.
- description
A short description of the processor task.
- filter_register: ProcessorFilter
The DB filter register of the Processor.
- property i_item: int
The enumeration of the current item being processed.
- item: Any
The current item of the loop.
- property local_resource_acquisition: bool
Checks if resources should be acquired locally.
When the processor is executed in stand-alone mode, it is responsible for acquiring and releasing its own external resources, but when it is executed from a ProcessorList, it is good practice to share and distribute resources among the whole processor list. In this case, resources should not be acquired locally by the single processor, but from the parent execution context.
- Returns:
True if resources are to be acquired locally by the processor. False, otherwise.
- Return type:
bool
- loop_type: LoopType
The loop type.
The value of this parameter can also be changed by the execution_workflow() decorator factory.
See LoopType for more details.
- looping_status
Looping modifier
- property n_item: int | None
The total number of items to be processed or None for an undefined loop
- name
The name of the processor.
- new_defaults: dict[str, Any] = {}
A dictionary containing the default values of the parameters to be overridden.
Added in version v2.0.0.
- processor_exit_status
Processor exit status
- processor_status
Processor execution status
- progress_message: str = 'Processor is working'
Message displayed to show the progress.
It can be customized with information about the current item in the loop by overloading format_progress_message().
- remove_orphan_files: bool
The flag to remove or protect the orphan files. Defaults to True
- replica_id
The replica identifier specified in the constructor
Added in version v2.0.0.
- property replica_name: str
Returns the replica aware name of the processor.
If no replica_id is specified, the plain name is returned; otherwise the two strings are joined using the ‘#’ symbol.
Added in version v2.0.0.
- Returns:
The replica aware name of the processor.
- Return type:
str
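The naming rule can be written as a small standalone function. This sketch assumes that a missing replica identifier is represented by None; the real property may treat other empty values in the same way.

```python
def replica_name(name: str, replica_id=None) -> str:
    # Without a replica identifier the plain name is returned unchanged.
    if replica_id is None:
        return name
    # Otherwise name and identifier are joined with the '#' symbol.
    return f'{name}#{replica_id}'
```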
- unique_id
A unique identifier representing how many instances of Processor have been created.
- property unique_name: str
Returns the unique name for the processor.
- class mafw.processor.ProcessorList(*args: Processor | ProcessorList, name: str | None = None, description: str | None = None, timer: Timer | None = None, timer_params: dict[str, Any] | None = None, user_interface: UserInterfaceBase | None = None, database: Database | None = None, database_conf: dict[str, Any] | None = None, create_standard_tables: bool = True)[source]
Bases: list[Processor | ProcessorList]
A list like collection of processors.
ProcessorList is a subclass of list containing only Processor subclasses or other ProcessorList.
An attempt to add an element that is not a Processor or a ProcessorList will raise a TypeError.
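A list subclass can enforce such a constraint by validating every item before delegating to the built-in list methods. The standalone sketch below uses the hypothetical names Step and StepList in place of Processor and ProcessorList and is not MAFw's implementation.

```python
class Step:
    """Hypothetical stand-in for a Processor."""


class StepList(list):
    @staticmethod
    def validate_item(item):
        # Accept only steps or nested step lists.
        if not isinstance(item, (Step, StepList)):
            raise TypeError(f'Expected Step or StepList, got {type(item).__name__}')
        return item

    def __init__(self, *items) -> None:
        super().__init__([self.validate_item(i) for i in items])

    def append(self, item) -> None:
        super().append(self.validate_item(item))

    def extend(self, items) -> None:
        # Validate everything first so a failure leaves the list untouched.
        super().extend([self.validate_item(i) for i in items])

    def insert(self, index, item) -> None:
        super().insert(index, self.validate_item(item))


steps = StepList(Step(), StepList(Step()))
```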
Along with an iterable of processors, a new processor list can be built using the following parameters.
Constructor parameters:
- Parameters:
name (str, Optional) – The name of the processor list. Defaults to ProcessorList.
description (str, Optional) – An optional short description. Defaults to ProcessorList.
timer (Timer, Optional) – The timer object. If None is provided, a new one will be created. Defaults to None.
timer_params (dict, Optional) – A dictionary of parameters to build the timer object. Defaults to None.
user_interface (UserInterfaceBase, Optional) – A user interface. Defaults to None.
database (Database, Optional) – A database instance. Defaults to None.
database_conf (dict, Optional) – Configuration for the database. Defaults to None.
create_standard_tables (bool, Optional) – Whether or not to create the standard tables. Defaults to True.
- static validate_item(item: Processor | ProcessorList) Processor | ProcessorList[source]
Validates the item being added.
- static validate_items(items: tuple[Processor | ProcessorList, ...] = ()) tuple[Processor | ProcessorList, ...][source]
Validates a tuple of items being added.
- append(_ProcessorList__object: Processor | ProcessorList) None[source]
Appends a new processor at the end of the list.
- distribute_resources(processor: Processor | Self) None[source]
Distributes the external resources to the items in the list.
- execute() ProcessorExitStatus[source]
Execute the list of processors.
Like a Processor, a ProcessorList can be executed. In simple words, the execute method of each processor in the list is called in exactly the same sequence as they were added.
- extend(_ProcessorList__iterable: Iterable[Processor | ProcessorList]) None[source]
Extends the processor list with a list of processors.
- insert(_ProcessorList__index: SupportsIndex, _ProcessorList__object: Processor | ProcessorList) None[source]
Adds a new processor at the specified index.
- create_standard_tables
The boolean flag to proceed or skip with standard table creation and initialisation
- property database: Database
Returns the database instance
- Returns:
A database instance
- Raises:
MissingDatabase – if a database connection is missing.
- property name: str
The name of the processor list
- Returns:
The name of the processor list
- Return type:
str
- nested_list
Boolean flag to identify that this list is actually inside another list.
Similarly to the local resource flag for the Processor, this flag prevents the user interface from being added to the resource stack.
- property processor_exit_status: ProcessorExitStatus
The processor exit status.
It refers to the whole processor list execution.
- class mafw.processor.ProcessorMeta[source]
Bases: type
A metaclass to implement the post-init method.
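A post-init hook is typically obtained by overriding __call__ on the metaclass, so that extra code runs after the whole __init__ chain of a subclass has completed. The sketch below is a generic illustration of that technique; PostInitMeta, the __post_init__ hook name and the Base and Child classes are assumptions, not the actual ProcessorMeta code.

```python
class PostInitMeta(type):
    def __call__(cls, *args, **kwargs):
        # type.__call__ runs __new__ and __init__ of the class being created.
        instance = super().__call__(*args, **kwargs)
        # Run the hook once the whole __init__ chain has completed.
        instance.__post_init__()
        return instance


class Base(metaclass=PostInitMeta):
    def __init__(self) -> None:
        self.log = ['base init']

    def __post_init__(self) -> None:
        self.log.append('post init')


class Child(Base):
    def __init__(self) -> None:
        super().__init__()
        self.log.append('child init')


child = Child()
```

The hook runs last even though Child extends __init__, which is what makes the technique useful for work that must follow subclass initialisation.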
- mafw.processor.ensure_parameter_registration(func: F) F[source]
Decorator to ensure that before calling func the processor parameters have been registered.
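A decorator of this kind usually wraps the method and checks a flag first. The standalone sketch below assumes that the decorator triggers the registration when it has not happened yet; whether the real decorator registers or raises is not stated here. The names ensure_registration and Holder are hypothetical.

```python
import functools
from typing import Any, Callable, TypeVar

F = TypeVar('F', bound=Callable[..., Any])


def ensure_registration(func: F) -> F:
    @functools.wraps(func)
    def wrapper(self, *args: Any, **kwargs: Any) -> Any:
        # Register lazily if it has not happened yet, then call through.
        if not self._parameter_registered:
            self._register_parameters()
        return func(self, *args, **kwargs)

    return wrapper  # type: ignore[return-value]


class Holder:
    def __init__(self) -> None:
        self._parameter_registered = False
        self._parameters = {}

    def _register_parameters(self) -> None:
        self._parameters = {'threshold': 5}
        self._parameter_registered = True

    @ensure_registration
    def get_parameter(self, name: str) -> Any:
        return self._parameters[name]
```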
- mafw.processor.validate_database_conf(database_conf: dict[str, Any] | None = None) dict[str, Any] | None[source]
Validates the database configuration.
- Parameters:
database_conf (dict, Optional) – The input database configuration. Defaults to None.
- Returns:
Either the validated database configuration or None if it is invalid.
- Return type:
dict, None