mafw.processor
Module implements the basic Processor class, the ProcessorList and all helper classes to achieve the core functionality of the MAFw.
Module Attributes
ParameterType – Generic variable type for the ActiveParameter and PassiveParameter.
F – Type variable for generic callable with any return value.
Functions
ensure_parameter_registration – Decorator to ensure that before calling func the processor parameters have been registered.
validate_database_conf – Validates the database configuration.
Classes
ActiveParameter – The public interface to the processor parameter.
PassiveParameter – A processor parameter that can be registered and configured.
Processor – The basic processor.
ProcessorList – A list-like collection of processors.
ProcessorMeta – A metaclass to implement the post-init method.
- class mafw.processor.ActiveParameter(name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = '')[source]
Bases: Generic[ParameterType]
The public interface to the processor parameter.
The behaviour of a Processor can be customized by using processor parameters. The value of these parameters can be set either via a configuration file or directly when creating the class.
To benefit from this facility, the user has to add an ActiveParameter instance as a class attribute of the Processor subclass, in this way:
    from pathlib import Path

    from mafw.processor import ActiveParameter, Processor


    class MyProcessor(Processor):
        # this is the input folder
        # (a raw string cannot end with a single backslash, so it is escaped here)
        input_folder = ActiveParameter('input_folder', Path('C:\\'), help_doc='This is where to look for input files')

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)

            # change the input folder to something else
            self.input_folder = Path(r'D:\data')

            # get the value of the parameter
            print(self.input_folder)
The ActiveParameter is a descriptor, which means that when you create one of them, a lot of work is done behind the scenes.
In simple words, a processor parameter is made of two objects: a public interface where the user can easily access the value of the parameter, and a private interface where all other information (default, documentation…) is also stored.
The user does not have to take care of any of this. When a new ActiveParameter instance is added to the class as in the code snippet above, the private interface is automatically created and stays in the class instance until the end of its lifetime.
To access the private interface, the user can use the Processor.get_parameter() method using the parameter name as a key.
See also
The private counterpart in the PassiveParameter.
Constructor parameters:
- Parameters:
name (str) – The name of the parameter.
value (ParameterType, Optional) – The initial value of the parameter. Defaults to None.
default (ParameterType, Optional) – The default value of the parameter, to be used when value is not set. Defaults to None.
help_doc (str, Optional) – An explanatory text describing the parameter.
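The public/private split described above can be illustrated with a plain Python descriptor. This is a minimal sketch under stated assumptions, not MAFw's actual implementation: the names PublicParam, PrivateParam and the `_params` storage attribute are hypothetical and only show how a descriptor can create a per-instance private record on first access.

```python
from __future__ import annotations

from typing import Any, Generic, TypeVar

T = TypeVar("T")


class PrivateParam(Generic[T]):
    """Hypothetical private record: value, default and help text."""

    def __init__(self, name: str, value: T | None, default: T | None, help_doc: str) -> None:
        self.name = name
        self.value = value
        self.default = default
        self.help_doc = help_doc


class PublicParam(Generic[T]):
    """Hypothetical descriptor acting as the public interface."""

    def __init__(self, name: str, value: T | None = None,
                 default: T | None = None, help_doc: str = "") -> None:
        self._spec = (name, value, default, help_doc)

    def _private(self, obj: Any) -> PrivateParam[T]:
        # Create the per-instance private record the first time it is needed.
        store = obj.__dict__.setdefault("_params", {})
        name = self._spec[0]
        if name not in store:
            store[name] = PrivateParam(*self._spec)
        return store[name]

    def __get__(self, obj: Any, objtype: type | None = None) -> Any:
        if obj is None:  # accessed on the class, not on an instance
            return self
        private = self._private(obj)
        return private.value if private.value is not None else private.default

    def __set__(self, obj: Any, value: T) -> None:
        self._private(obj).value = value


class Demo:
    threshold = PublicParam("threshold", default=5, help_doc="A numeric cut")


a, b = Demo(), Demo()
a.threshold = 10
print(a.threshold, b.threshold)  # 10 5
```

Each instance keeps its own private record, so setting the value on one instance leaves the other at its default, while the help text stays reachable through the private object.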
- class mafw.processor.F
Type variable for generic callable with any return value.
alias of TypeVar(‘F’, bound=Callable[[…], Any])
- class mafw.processor.ParameterType
Generic variable type for the ActiveParameter and PassiveParameter.
alias of TypeVar(‘ParameterType’)
- class mafw.processor.PassiveParameter(name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = '')[source]
Bases: Generic[ParameterType]
A processor parameter that can be registered and configured.
For Processors to perform their analytical task, it may be necessary to have some configurable parameters, like a DB input table or the output folder or a numeric parameter.
The name of the parameter must be unique within the Processor scope and a valid python identifier.
When defined as a PassiveParameter, an instance variable can have a default value if the user did not provide one, and it is much easier to configure via a configuration file.
A parameter for which only a default value is provided is automatically considered optional.
If neither the value nor the default value is provided, an exception is raised.
This class works behind the scenes, which is why it is named passive. The user will very likely add class instances of ActiveParameter, which are publicly exposed in the processor class namespace, and a PassiveParameter will automatically be added to the class. To access this passive parameter, the user can call Processor.get_parameter() using the name as key. The value of the passive parameter is always accessible through the corresponding ActiveParameter.
See also
An explanation of how processor parameters work and should be used is given in Understanding processor parameters.
Constructor parameters:
- Parameters:
name (str) – The name of the parameter. It must be a valid python identifier.
value (ParameterType, Optional) – The set value of the parameter. If None, then the default value will be used. Defaults to None.
default (ParameterType, Optional) – The default value for the parameter. It is used if the value is not provided. Defaults to None.
help_doc (str, Optional) – A brief explanation of the parameter.
- Raises:
ProcessorParameterError – if neither value nor default is provided or if name is not a valid identifier.
- property is_optional: bool
Property to check if the parameter is optional.
- Returns:
True if the parameter is optional
- Return type:
bool
- property is_set: bool
Property to check if the value has been set.
It is useful for optional parameters, to see whether the current value is the default one or was set by the user.
- property value: ParameterType
Gets the parameter value.
- Returns:
The parameter value.
- Return type:
ParameterType
- Raises:
ProcessorParameterError – if both value and default were not defined.
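The rules stated for PassiveParameter (valid identifier, value falling back to default, error when neither is given) can be sketched in a few lines. This is an illustration under assumptions, not the library code: SketchParameter and ParamError are hypothetical stand-ins, and treating a parameter as optional when it was built with a default only is one reading of the description above.

```python
from __future__ import annotations

from typing import Generic, TypeVar

T = TypeVar("T")


class ParamError(Exception):
    """Hypothetical stand-in for ProcessorParameterError."""


class SketchParameter(Generic[T]):
    def __init__(self, name: str, value: T | None = None,
                 default: T | None = None, help_doc: str = "") -> None:
        if not name.isidentifier():
            raise ParamError(f"{name!r} is not a valid Python identifier")
        if value is None and default is None:
            raise ParamError("either value or default must be provided")
        self.name = name
        self.help_doc = help_doc
        self._value = value
        self._default = default
        # Assumption: optional means "only a default was provided".
        self._optional = value is None and default is not None

    @property
    def is_optional(self) -> bool:
        return self._optional

    @property
    def is_set(self) -> bool:
        return self._value is not None

    @property
    def value(self) -> T | None:
        # Fall back to the default when no explicit value was set.
        return self._value if self.is_set else self._default

    @value.setter
    def value(self, new_value: T) -> None:
        self._value = new_value


n_bins = SketchParameter("n_bins", default=10, help_doc="Number of bins")
print(n_bins.value, n_bins.is_set, n_bins.is_optional)  # 10 False True
n_bins.value = 20
print(n_bins.value, n_bins.is_set)  # 20 True
```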
- class mafw.processor.Processor(*args: Any, **kwargs: Any)[source]
Bases: object
The basic processor.
A very comprehensive description of what a Processor does and how it works is available at Processor: The core of MAFw.
Constructor parameters
- Parameters:
name (str, Optional) – The name of the processor. If None is provided, the class name is used instead. Defaults to None.
description (str, Optional) – A short description of the processor task. Defaults to the processor name.
config (dict, Optional) – A configuration dictionary for this processor. Defaults to None.
looper (LoopType, Optional) – Enumerator to define the looping type. Defaults to LoopType.ForLoop
user_interface (UserInterfaceBase, Optional) – A user interface instance to be used by the processor to interact with the user.
timer (Timer, Optional) – A timer object to measure process duration.
timer_params (dict, Optional) – Parameters for the timer object.
database (Database, Optional) – A database instance. Defaults to None.
database_conf (dict, Optional) – Configuration for the database. Defaults to None.
remove_orphan_files (bool, Optional) – Boolean flag to remove files on disc without a reference to the database. See Standard tables and _remove_orphan_files(). Defaults to True.
kwargs – Keyword arguments that can be used to set processor parameters.
- _check_method_overload() None[source]
Check if the user overloaded the required methods.
Depending on the loop type, the user must overload different methods. This method performs the check and emits a warning if the required methods are not overloaded.
- _check_method_super() None[source]
Check if some specific methods are calling their super.
For some specific methods (for example: start and finish), the user should always call their super method. This method verifies that the user implementation of these methods is including a super call, otherwise a warning is emitted to inform the user about the problem and possible misbehaviour of the processor.
The list of methods to be verified is stored in a private class attribute _methods_to_be_checked_for_super as a list of tuples, each made of the name of the method to be verified and the base class for comparison. The base class is required because Processor subclasses may extend this list with methods that are not present in the base Processor. See, for example, patch_data_frame(), which is required to have a super call but is not present in the base Processor.
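One way such a check could work is sketched below. This is an assumption for illustration only; the source does not say how MAFw detects the super call. The heuristic used here looks for the name `super` among the names referenced by the overriding method's code object, and the helper names calls_super and check_super_calls are hypothetical.

```python
import warnings


def calls_super(cls: type, method_name: str, base: type) -> bool:
    """Heuristic: True if cls does not override the method or its code mentions super."""
    method = getattr(cls, method_name)
    if method is getattr(base, method_name, None):
        return True  # inherited unchanged, nothing to verify
    return "super" in method.__code__.co_names


def check_super_calls(cls: type, checks: list[tuple[str, type]]) -> list[str]:
    """Warn about, and return, the methods overridden without a super call."""
    missing = [name for name, base in checks if not calls_super(cls, name, base)]
    for name in missing:
        warnings.warn(f"{cls.__name__}.{name} does not call super().{name}()")
    return missing


class Base:
    def start(self) -> None:
        self.started = True

    def finish(self) -> None:
        self.finished = True


class Good(Base):
    def start(self) -> None:
        super().start()


class Bad(Base):
    def start(self) -> None:
        self.started = False  # forgets the super call


# Same shape as described above: (method name, base class for comparison).
checks = [("start", Base), ("finish", Base)]
print(check_super_calls(Good, checks))  # []
print(check_super_calls(Bad, checks))   # ['start']
```

The base class in each tuple lets the helper tell an inherited method from an overridden one, which is why a subclass-specific method can be listed together with the class that first defines it.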
- _execute_for_loop() None[source]
Executes the processor within a for loop.
Private method. Do not overload nor invoke it directly. The execute() method will call the appropriate implementation depending on the processor LoopType.
- _execute_single() None[source]
Execute the processor in single mode.
Private method. Do not overload nor invoke it directly. The execute() method will call the appropriate implementation depending on the processor LoopType.
- _execute_while_loop() None[source]
Executes the processor within a while loop.
Private method. Do not overload nor invoke it directly. The execute() method will call the appropriate implementation depending on the processor LoopType.
- _remove_orphan_files() None[source]
Remove orphan files.
If a connection to the database is available, then the OrphanFile standard table is queried for all its entries, and all the files are then removed.
The user can turn off this behaviour by switching remove_orphan_files to False.
- accept_item() None[source]
Does post process actions on a successfully processed item.
Within process(), the user left the looping status set to Continue, which means that everything looks good and this is the right place to perform database updates or save files.
- acquire_resources() None[source]
Acquires resources and adds them to the resource stack.
The whole body of the execute() method is within a context structure. The idea is that if any part of the code inside throws an exception that breaks the execution, all stateful resources are still properly closed.
Since the number of resources may vary, the variable number of nested with statements has been replaced by an ExitStack. Resources, like open files, timers and db connections, need to be added to the resource stack in this method.
When a processor is executed within a ProcessorList, some resources might be shared, and for this reason they are not added to the stack. This selection is made via the private local_resource_acquisition flag. It is normally True, meaning that the processor handles its resources independently, but when the processor is executed from a ProcessorList, the flag is automatically turned to False.
To add additional resources, the user has to overload this method, calling the super to preserve the original resources. To share resources among different processors executed from inside a processor list, the user has to overload the ProcessorList class as well.
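The ExitStack pattern described above can be shown with the standard library alone. The following is a sketch, not the MAFw code: SketchProcessor, the resource() helper and the events list are hypothetical, and only the use of contextlib.ExitStack and a local-acquisition flag mirrors the text.

```python
from contextlib import ExitStack, contextmanager

events: list[str] = []


@contextmanager
def resource(name: str):
    """Hypothetical resource that records when it is opened and closed."""
    events.append(f"open {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")


class SketchProcessor:
    def __init__(self, local_resource_acquisition: bool = True) -> None:
        self.local_resource_acquisition = local_resource_acquisition
        self._stack = ExitStack()

    def acquire_resources(self) -> None:
        # Only acquire when the processor manages its own resources.
        if self.local_resource_acquisition:
            self.timer = self._stack.enter_context(resource("timer"))
            self.db = self._stack.enter_context(resource("database"))

    def execute(self) -> None:
        # Everything entered on the stack is closed when the block exits,
        # even if the body raises.
        with self._stack:
            self.acquire_resources()
            raise RuntimeError("something went wrong while processing")


try:
    SketchProcessor().execute()
except RuntimeError:
    pass

print(events)
# ['open timer', 'open database', 'close database', 'close timer']
```

Resources are released in reverse order of acquisition, and a subclass that overrides acquire_resources() and calls the super implementation first would simply push more entries onto the same stack.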
- delete_parameter(name: str) None[source]
Deletes a processor parameter.
- Parameters:
name (str) – The name of the parameter to be deleted.
- Raises:
ProcessorParameterError – If a parameter with name is not registered.
- dump_parameter_configuration(option: int = 1) dict[str, Any][source]
Dumps the processor parameter values in a dictionary.
The snippet below explains the meaning of option.
    # option 1
    conf_dict1 = {
        'Processor': {'param1': 5, 'input_table': 'my_table'}
    }

    # option 2
    conf_dict2 = {'param1': 5, 'input_table': 'my_table'}
- Parameters:
option (int, Optional) – Select the dictionary style. Defaults to 1.
- Returns:
A parameter configuration dictionary.
- Return type:
dict
- execute() None[source]
Execute the processor tasks.
This method works as a dispatcher, reassigning the call to a more specific execution implementation depending on the loop_type.
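The dispatching idea can be sketched as a lookup from loop type to private implementation. This is an illustration only: SketchLoopType and its member names are assumptions (the source only names LoopType.ForLoop), and the real execute() does more than record a string.

```python
from enum import Enum, auto


class SketchLoopType(Enum):
    """Hypothetical stand-in for LoopType; member names are assumptions."""

    SINGLE = auto()
    FOR = auto()
    WHILE = auto()


class SketchProcessor:
    def __init__(self, loop_type: SketchLoopType = SketchLoopType.FOR) -> None:
        self.loop_type = loop_type
        self.trace: list[str] = []

    def execute(self) -> None:
        # Map each loop type to its private implementation and call it.
        dispatch = {
            SketchLoopType.SINGLE: self._execute_single,
            SketchLoopType.FOR: self._execute_for_loop,
            SketchLoopType.WHILE: self._execute_while_loop,
        }
        dispatch[self.loop_type]()

    def _execute_single(self) -> None:
        self.trace.append("single")

    def _execute_for_loop(self) -> None:
        self.trace.append("for")

    def _execute_while_loop(self) -> None:
        self.trace.append("while")


proc = SketchProcessor(SketchLoopType.WHILE)
proc.execute()
print(proc.trace)  # ['while']
```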
- finish() None[source]
Concludes the execution.
The user can reimplement this method if there are some conclusive tasks that must be achieved. Always include a call to super().
- format_progress_message() None[source]
Customizes the progress message with information about the current item.
The user can overload this method in order to modify the message being displayed during the process loop with information about the current item.
The user can access the current value, its position in the looping cycle and the total number of items using Processor.item, Processor.i_item and Processor.n_item.
- get_filter(model_name: str) Filter[source]
Returns a registered Filter via the model name.
If a filter for the provided model_name does not exist, a KeyError is raised.
- Parameters:
model_name (str) – The model name for which the filter will be returned.
- Returns:
The registered filter
- Return type:
Filter
- Raises:
KeyError – if a filter with the given name is not found.
- get_items() Collection[Any][source]
Returns the item collections for the processor loop.
This method must be overloaded for the processor to work. Generally, this is getting a list of rows from the database, or a list of files from the disk to be processed.
- Returns:
A collection of items for the loop
- Return type:
Collection[Any]
- get_parameter(name: str) PassiveParameter[ParameterType][source]
Gets the processor parameter named name.
- Parameters:
name (str) – The name of the parameter.
- Returns:
The processor parameter
- Return type:
PassiveParameter[ParameterType]
- Raises:
ProcessorParameterError – If a parameter with name is not registered.
- get_parameters() dict[str, PassiveParameter[ParameterType]][source]
Returns the full dictionary of registered parameters for this processor.
Useful when dumping the parameter specification in a configuration file, for example.
- Returns:
The dictionary with the registered parameters.
- Return type:
dict[str, PassiveParameter[ParameterType]]
- on_looping_status_set(status: LoopingStatus) None[source]
Callback invoked when the looping status is set.
The user can overload this method as needed.
- Parameters:
status (LoopingStatus) – The set looping status.
- on_processor_status_change(old_status: ProcessorStatus, new_status: ProcessorStatus) None[source]
Callback invoked when the processor status is changed.
- Parameters:
old_status (ProcessorStatus) – The old processor status.
new_status (ProcessorStatus) – The new processor status.
- print_process_statistics() None[source]
Print the process statistics.
A utility method to display the fastest, the slowest and the average time required to process a single item. This is particularly useful when the looping processor is part of a ProcessorList.
- process() None[source]
Processes the current item.
This is the core of the Processor, where the user has to define the calculations required.
- set_parameter_value(name: str, value: ParameterType) None[source]
Sets the value of a processor parameter.
- Parameters:
name (str) – The name of the parameter to be set.
value (ParameterType) – The value to be assigned to the parameter.
- Raises:
ProcessorParameterError – If a parameter with name is not registered.
- skip_item() None[source]
Does post process actions on a NOT successfully processed item.
Within process(), the user set the looping status to Skip, which means that something went wrong, and corrective actions can be taken here if needed.
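How process(), accept_item() and skip_item() fit together inside a for loop can be sketched as follows. This is an assumed control flow for illustration, not the library's loop: the Status enum stands in for LoopingStatus (only its Continue and Skip values are named in the text), and EvenOnly and run() are hypothetical.

```python
from enum import Enum, auto


class Status(Enum):
    """Hypothetical stand-in for LoopingStatus."""

    Continue = auto()
    Skip = auto()


class EvenOnly:
    """Accepts even numbers and skips odd ones."""

    def __init__(self, items: list[int]) -> None:
        self.items = items
        self.accepted: list[int] = []
        self.skipped: list[int] = []

    def process(self) -> None:
        # Flag the item as not successfully processed.
        if self.item % 2:
            self.looping_status = Status.Skip

    def accept_item(self) -> None:
        self.accepted.append(self.item)

    def skip_item(self) -> None:
        self.skipped.append(self.item)

    def run(self) -> None:
        for i_item, item in enumerate(self.items):
            self.i_item, self.item = i_item, item
            # Assumption: the status is reset before each item is processed.
            self.looping_status = Status.Continue
            self.process()
            if self.looping_status is Status.Continue:
                self.accept_item()
            else:
                self.skip_item()


loop = EvenOnly([1, 2, 3, 4])
loop.run()
print(loop.accepted, loop.skipped)  # [2, 4] [1, 3]
```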
- start() None[source]
Start method.
The user can overload this method, including all steps that should be performed at the beginning of the operation.
If the user decides to overload it, it should include a call to the super method.
- while_condition() bool[source]
Return the while condition
- Returns:
True if the while loop has to continue, false otherwise.
- Return type:
bool
- _ids = count(0)
A counter for all processor instances
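A class-level itertools.count is a common way to hand out one increasing identifier per instance. The sketch below uses a hypothetical Worker class and only illustrates the mechanism suggested by `_ids = count(0)`.

```python
from itertools import count


class Worker:
    # One counter shared by every instance of the class (and its subclasses).
    _ids = count(0)

    def __init__(self) -> None:
        # Each new instance takes the next number from the shared counter.
        self.unique_id = next(self._ids)


first, second = Worker(), Worker()
print(first.unique_id, second.unique_id)  # 0 1
```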
- _methods_to_be_checked_for_super
List of methods to be checked for super inclusion.
It is a list of tuples, where the first element is the name of the method to be checked and the second is the base class to be compared against.
- property database: Database
Returns the database instance
- Returns:
A database object.
- Raises:
MissingDatabase – If the database connection has not been established.
- description
A short description of the processor task.
- filter_register: FilterRegister
The DB filter register of the Processor.
- property i_item: int
The enumeration of the current item being processed.
- item: Any
The current item of the loop.
- property local_resource_acquisition: bool
Checks if resources should be acquired locally.
When the processor is executed in stand-alone mode, it is responsible for acquiring and releasing its own external resources, but when it is executed from a ProcessorList, it is good practice to share and distribute resources among the whole processor list. In this case, resources should not be acquired locally by the single processor, but from the parent execution context.
- Returns:
True if resources are to be acquired locally by the processor. False, otherwise.
- Return type:
bool
- loop_type: LoopType
The loop type.
The value of this parameter can also be changed by the execution_workflow() decorator factory.
See LoopType for more details.
- looping_status
Looping modifier
- property n_item: int | None
The total number of items to be processed or None for an undefined loop
- name
The name of the processor.
- processor_exit_status
Processor exit status
- processor_status
Processor execution status
- progress_message: str = 'Processor is working'
Message displayed to show the progress.
It can be customized with information about the current item in the loop by overloading format_progress_message().
- remove_orphan_files: bool
The flag to remove or protect the orphan files. Defaults to True
- unique_id
A unique identifier representing how many instances of Processor have been created.
- property unique_name: str
Returns the unique name for the processor.
- class mafw.processor.ProcessorList(*args: Processor | ProcessorList, name: str | None = None, description: str | None = None, timer: Timer | None = None, timer_params: dict[str, Any] | None = None, user_interface: UserInterfaceBase | None = None, database: Database | None = None, database_conf: dict[str, Any] | None = None)[source]
Bases: list[Processor | ProcessorList]
A list-like collection of processors.
ProcessorList is a subclass of list containing only Processor subclasses or other ProcessorList.
An attempt to add an element that is not a Processor or a ProcessorList will raise a TypeError.
Along with an iterable of processors, a new processor list can be built using the following parameters.
Constructor parameters:
- Parameters:
name (str, Optional) – The name of the processor list. Defaults to ProcessorList.
description (str, Optional) – An optional short description. Defaults to ProcessorList.
timer (Timer, Optional) – The timer object. If None is provided, a new one will be created. Defaults to None.
timer_params (dict, Optional) – A dictionary of parameter to build the timer object. Defaults to None.
user_interface (UserInterfaceBase, Optional) – A user interface. Defaults to None
database (Database, Optional) – A database instance. Defaults to None.
database_conf (dict, Optional) – Configuration for the database. Defaults to None.
- static validate_item(item: Processor | ProcessorList) Processor | ProcessorList[source]
Validates the item being added.
- static validate_items(items: tuple[Processor | ProcessorList, ...] = ()) tuple[Processor | ProcessorList, ...][source]
Validates a tuple of items being added.
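A list subclass that rejects foreign items with a TypeError can be sketched as follows. Step and StepList are hypothetical stand-ins for Processor and ProcessorList; the sketch only shows the validate-before-store idea, not the actual implementation.

```python
from typing import Any, Iterable


class Step:
    """Hypothetical stand-in for a Processor."""

    def __init__(self, name: str) -> None:
        self.name = name


class StepList(list):
    """Accepts only Step instances or other StepList instances."""

    @staticmethod
    def validate_item(item: Any) -> Any:
        if not isinstance(item, (Step, StepList)):
            raise TypeError(f"Expected Step or StepList, got {type(item).__name__}")
        return item

    def __init__(self, *args: Any) -> None:
        super().__init__([self.validate_item(arg) for arg in args])

    def append(self, item: Any) -> None:
        super().append(self.validate_item(item))

    def extend(self, items: Iterable[Any]) -> None:
        # Validate everything first so a bad item leaves the list untouched.
        super().extend([self.validate_item(item) for item in items])

    def insert(self, index: int, item: Any) -> None:
        super().insert(index, self.validate_item(item))


steps = StepList(Step("read"), Step("fit"))
steps.append(StepList(Step("plot")))  # nesting is allowed

try:
    steps.append("not a step")
    rejected = False
except TypeError:
    rejected = True

print(len(steps), rejected)  # 3 True
```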
- append(_ProcessorList__object: Processor | ProcessorList) None[source]
Appends a new processor at the end of the list.
- distribute_resources(processor: Processor | Self) None[source]
Distributes the external resources to the items in the list.
- execute() ProcessorExitStatus[source]
Execute the list of processors.
Similarly to the Processor, a ProcessorList can be executed. In simple words, the execute method of each processor in the list is called in exactly the same sequence as they were added.
- extend(_ProcessorList__iterable: Iterable[Processor | ProcessorList]) None[source]
Extends the processor list with a list of processors.
- insert(_ProcessorList__index: SupportsIndex, _ProcessorList__object: Processor | ProcessorList) None[source]
Adds a new processor at the specified index.
- property database: Database
Returns the database instance
- Returns:
A database instance
- Raises:
MissingDatabase – if a database connection is missing.
- property name: str
The name of the processor list
- Returns:
The name of the processor list
- Return type:
str
- property processor_exit_status: ProcessorExitStatus
The processor exit status.
It refers to the whole processor list execution.
- class mafw.processor.ProcessorMeta[source]
Bases: type
A metaclass to implement the post-init method.
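A post-init hook is typically implemented by overriding `__call__` on the metaclass, so that something runs after the whole `__init__` chain has finished. The sketch below assumes a hook named `__post_init__`; that name and the PostInitMeta class are hypothetical and are not taken from the MAFw source.

```python
from typing import Any


class PostInitMeta(type):
    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        # type.__call__ runs __new__ and the full __init__ chain.
        obj = super().__call__(*args, **kwargs)
        post_init = getattr(obj, "__post_init__", None)
        if callable(post_init):
            post_init()  # runs once, after every __init__ has finished
        return obj


class Base(metaclass=PostInitMeta):
    def __init__(self) -> None:
        self.log = ["base init"]

    def __post_init__(self) -> None:
        self.log.append("post init")


class Child(Base):
    def __init__(self) -> None:
        super().__init__()
        self.log.append("child init")


child = Child()
print(child.log)  # ['base init', 'child init', 'post init']
```

The hook runs after the subclass constructor, which a call placed at the end of the base `__init__` could not guarantee.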
- mafw.processor.ensure_parameter_registration(func: F) F[source]
Decorator to ensure that before calling func the processor parameters have been registered.
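A decorator of this kind, typed with a bound TypeVar like F so that the decorated function keeps its signature for type checkers, can be sketched as below. The names ensure_registered, Holder, `_registered` and `_register` are hypothetical; how MAFw tracks registration is not described in this page.

```python
import functools
from typing import Any, Callable, TypeVar, cast

F = TypeVar("F", bound=Callable[..., Any])


def ensure_registered(func: F) -> F:
    """Run the (hypothetical) registration step before func if still pending."""

    @functools.wraps(func)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        if not self._registered:
            self._register()
        return func(self, *args, **kwargs)

    return cast(F, wrapper)


class Holder:
    def __init__(self) -> None:
        self._registered = False
        self._params: dict[str, Any] = {}

    def _register(self) -> None:
        self._params = {"threshold": 5}
        self._registered = True

    @ensure_registered
    def get(self, name: str) -> Any:
        return self._params[name]


holder = Holder()
print(holder.get("threshold"))  # 5
```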
- mafw.processor.validate_database_conf(database_conf: dict[str, Any] | None = None) dict[str, Any] | None[source]
Validates the database configuration.
- Parameters:
database_conf (dict, Optional) – The input database configuration. Defaults to None.
- Returns:
Either the validated database configuration or None if it is invalid.
- Return type:
dict, None