mafw.processor

Module implements the basic Processor class, the ProcessorList and all helper classes to achieve the core functionality of the MAFw.

Module Attributes

ParameterType

Generic variable type for the ActiveParameter and PassiveParameter.

F

Type variable for generic callable with any return value.

Functions

ensure_parameter_registration(func)

Decorator to ensure that before calling func the processor parameters have been registered.

validate_database_conf([database_conf])

Validates the database configuration.

Classes

ActiveParameter(name[, value, default, help_doc])

The public interface to the processor parameter.

PassiveParameter(name[, value, default, ...])

A processor parameter that can be registered and configured.

Processor(*args, **kwargs)

The basic processor.

ProcessorList(*args[, name, description, ...])

A list like collection of processors.

ProcessorMeta

A metaclass to implement the post-init method.

class mafw.processor.ActiveParameter(name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = '')[source]

Bases: Generic[ParameterType]

The public interface to the processor parameter.

The behaviour of a Processor can be customized by using processor parameters. The value of these parameters can be either set via a configuration file or directly when creating the class.

If the user wants to benefit from this facility, they have to add an ActiveParameter instance to the Processor subclass as a class attribute, in this way:

from pathlib import Path

from mafw.processor import ActiveParameter, Processor


class MyProcessor(Processor):

    # this is the input folder (a raw string cannot end with a single backslash)
    input_folder = ActiveParameter('input_folder', Path('C:\\'), help_doc='This is where to look for input files')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # change the input folder to something else
        self.input_folder = Path(r'D:\data')

        # get the value of the parameter
        print(self.input_folder)

The ActiveParameter is a descriptor, which means that when you create one, a lot of work is done behind the scenes.

In simple words, a processor parameter is made of two objects: a public interface where the user can easily access the value of the parameter and a private interface where all other information (default, documentation…) is also stored.

The user does not have to take care of all of this. When a new ActiveParameter instance is added to the class as in the code snippet above, the private interface is automatically created and will stay in the class instance until the end of its lifetime.

To access the private interface, the user can use the Processor.get_parameter() method using the parameter name as a key.
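The split between a public and a private interface can be illustrated with a minimal, self-contained sketch of the descriptor pattern. This is not the MAFw implementation: SketchParameter, SketchActive and Demo are hypothetical names, and the way the private object is stored in the instance dictionary is an assumption made only for illustration.

```python
from typing import Any


class SketchParameter:
    """Hypothetical private object: holds value, default and help text."""

    def __init__(self, name: str, value: Any = None, default: Any = None, help_doc: str = '') -> None:
        self.name = name
        self.value = value if value is not None else default
        self.default = default
        self.help_doc = help_doc


class SketchActive:
    """Hypothetical public descriptor: exposes only the parameter value."""

    def __init__(self, name: str, value: Any = None, default: Any = None, help_doc: str = '') -> None:
        self._args = (name, value, default, help_doc)
        self._private_name = f'_param_{name}'

    def _private(self, obj: Any) -> SketchParameter:
        # Lazily create one private object per owner instance.
        if self._private_name not in obj.__dict__:
            obj.__dict__[self._private_name] = SketchParameter(*self._args)
        return obj.__dict__[self._private_name]

    def __get__(self, obj: Any, objtype: Any = None) -> Any:
        if obj is None:
            return self  # accessed on the class itself
        return self._private(obj).value

    def __set__(self, obj: Any, value: Any) -> None:
        self._private(obj).value = value


class Demo:
    threshold = SketchActive('threshold', default=5, help_doc='A numeric threshold')


a, b = Demo(), Demo()
a.threshold = 10
print(a.threshold, b.threshold)  # each instance keeps its own value
```

In this sketch the attribute access `a.threshold` only ever returns the value, while the default and the help text remain reachable through the private object, which mirrors the role that Processor.get_parameter() plays in the text above.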

See also

The private counterpart, the PassiveParameter.

Constructor parameters:

Parameters:
  • name (str) – The name of the parameter.

  • value (ParameterType, Optional) – The initial value of the parameter. Defaults to None.

  • default (ParameterType, Optional) – The default value of the parameter, to be used when value is not set. Defaults to None.

  • help_doc (str, Optional) – An explanatory text describing the parameter.

class mafw.processor.F

Type variable for generic callable with any return value.

alias of TypeVar('F', bound=Callable[[...], Any])

class mafw.processor.ParameterType

Generic variable type for the ActiveParameter and PassiveParameter.

alias of TypeVar('ParameterType')

class mafw.processor.PassiveParameter(name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = '')[source]

Bases: Generic[ParameterType]

A processor parameter that can be registered and configured.

For Processors to perform their analytical task, it may be necessary to have some configurable parameters, like a DB input table or the output folder or a numeric parameter.

The name of the parameter must be unique within the Processor scope and a valid python identifier.

When defined as a PassiveParameter, an instance variable can have a default value if the user did not provide one, and it is much easier to configure via a configuration file.

A parameter for which only a default value is provided is automatically considered optional.

If neither the value nor the default value is provided, an exception is raised.

This class works behind the scenes, which is why it is named passive. The user will very likely add instances of ActiveParameter, which are publicly exposed in the processor class namespace, and a PassiveParameter will automatically be added to the class. To access this passive parameter, the user can call Processor.get_parameter() using the name as key. The value of the passive parameter is always accessible through the corresponding ActiveParameter.

See also

An explanation of how processor parameters work and should be used is given in Understanding processor parameters.

Constructor parameters:

Parameters:
  • name (str) – The name of the parameter. It must be a valid python identifier.

  • value (ParameterType, Optional) – The set value of the parameter. If None, then the default value will be used. Defaults to None.

  • default (ParameterType, Optional) – The default value for the parameter. It is used if the value is not provided. Defaults to None.

  • help_doc (str, Optional) – A brief explanation of the parameter.

Raises:

ProcessorParameterError – if both value and default are not provided or if name is not a valid identifier.

property is_optional: bool

Property to check if the parameter is optional.

Returns:

True if the parameter is optional

Return type:

bool

property is_set: bool

Property to check if the value has been set.

It is useful for optional parameters to see whether the current value is the default one or was set by the user.

property value: ParameterType

Gets the parameter value.

Returns:

The parameter value.

Return type:

ParameterType

Raises:

ProcessorParameterError – if both value and default were not defined.
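The rules described for PassiveParameter (valid identifier, fallback to the default, optional when only a default is given) can be sketched as follows. This is a simplified illustration under stated assumptions, not the MAFw source: SketchPassive and ParameterError are hypothetical stand-ins for PassiveParameter and ProcessorParameterError.

```python
from typing import Generic, Optional, TypeVar

T = TypeVar('T')


class ParameterError(Exception):
    """Hypothetical stand-in for ProcessorParameterError."""


class SketchPassive(Generic[T]):
    def __init__(self, name: str, value: Optional[T] = None, default: Optional[T] = None) -> None:
        if not name.isidentifier():
            raise ParameterError(f'{name!r} is not a valid python identifier')
        if value is None and default is None:
            raise ParameterError('Either value or default must be provided')
        self.name = name
        self._value = value
        self._default = default
        # Assumption: optional means that only a default was provided.
        self._is_optional = value is None and default is not None

    @property
    def is_optional(self) -> bool:
        return self._is_optional

    @property
    def is_set(self) -> bool:
        return self._value is not None

    @property
    def value(self) -> Optional[T]:
        # Fall back to the default when no explicit value was set.
        return self._value if self._value is not None else self._default

    @value.setter
    def value(self, new_value: T) -> None:
        self._value = new_value


p = SketchPassive('n_bins', default=100)
print(p.is_optional, p.is_set, p.value)
p.value = 50
print(p.is_optional, p.is_set, p.value)
```

The sketch keeps the value and the default separate, so that is_set can tell whether the current value comes from the user or from the default.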

class mafw.processor.Processor(*args: Any, **kwargs: Any)[source]

Bases: object

The basic processor.

A very comprehensive description of what a Processor does and how it works is available at Processor: The core of MAFw.

Constructor parameters:

Parameters:
  • name (str, Optional) – The name of the processor. If None is provided, the class name is used instead. Defaults to None.

  • description (str, Optional) – A short description of the processor task. Defaults to the processor name.

  • config (dict, Optional) – A configuration dictionary for this processor. Defaults to None.

  • looper (LoopType, Optional) – Enumerator to define the looping type. Defaults to LoopType.ForLoop

  • user_interface (UserInterfaceBase, Optional) – A user interface instance to be used by the processor to interact with the user.

  • timer (Timer, Optional) – A timer object to measure process duration.

  • timer_params (dict, Optional) – Parameters for the timer object.

  • database (Database, Optional) – A database instance. Defaults to None.

  • database_conf (dict, Optional) – Configuration for the database. Defaults to None.

  • remove_orphan_files (bool, Optional) – Boolean flag to remove files on disc without a reference to the database. See Standard tables and _remove_orphan_files(). Defaults to True

  • kwargs – Keyword arguments that can be used to set processor parameters.

_check_method_overload() None[source]

Check if the user overloaded the required methods.

Depending on the loop type, the user must overload different methods. This method performs the check and emits a warning if the required methods are not overloaded.

_check_method_super() None[source]

Check if some specific methods are calling their super.

For some specific methods (for example: start and finish), the user should always call their super method. This method verifies that the user implementation of these methods includes a super call; otherwise a warning is emitted to inform the user about the problem and the possible misbehaviour of the processor.

The list of methods to be verified is stored in the private class attribute _methods_to_be_checked_for_super as a list of tuples, each made of the name of the method to be verified and the base class for comparison. The base class is required because Processor subclasses may extend this list with methods that are not present in the base Processor. See, for example, patch_data_frame(), which is required to have a super call but is not present in the base Processor.
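One possible way to perform such a check is sketched below. It is only a heuristic written for illustration and is not claimed to be how MAFw does it: the helper names calls_super and check_method_super, and the classes Base, Good and Bad, are hypothetical. The heuristic relies on the fact that a function referencing super has that name in its code object's co_names.

```python
import warnings


def calls_super(method) -> bool:
    """Heuristic: a function that references ``super`` lists it among its names."""
    return 'super' in method.__code__.co_names


class Base:
    def start(self) -> None:
        pass

    def finish(self) -> None:
        pass


class Good(Base):
    def start(self) -> None:
        super().start()


class Bad(Base):
    def start(self) -> None:
        pass  # forgot to call super().start()


def check_method_super(cls: type, methods: list[tuple[str, type]]) -> list[str]:
    """Return the names of the overridden methods that never reference super."""
    missing = []
    for name, base in methods:
        sub_method = getattr(cls, name)
        if sub_method is getattr(base, name):
            continue  # not overridden, nothing to check
        if not calls_super(sub_method):
            warnings.warn(f'{cls.__name__}.{name} does not call its super method')
            missing.append(name)
    return missing


to_check = [('start', Base), ('finish', Base)]
print(check_method_super(Good, to_check))
```

The list of (method name, base class) tuples plays the same role as _methods_to_be_checked_for_super: the base class tells the check which implementation the override should be compared with.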

_execute_for_loop() None[source]

Executes the processor within a for loop.

Private method. Do not overload nor invoke it directly. The execute() method will call the appropriate implementation depending on the processor LoopType.

_execute_single() None[source]

Execute the processor in single mode.

Private method. Do not overload nor invoke it directly. The execute() method will call the appropriate implementation depending on the processor LoopType.

_execute_while_loop() None[source]

Executes the processor within a while loop.

Private method. Do not overload nor invoke it directly. The execute() method will call the appropriate implementation depending on the processor LoopType.

_remove_orphan_files() None[source]

Remove orphan files.

If a connection to the database is available, then the OrphanFile standard table is queried for all its entries, and all the files are then removed.

The user can turn off this behaviour by switching the remove_orphan_files to False.

accept_item() None[source]

Does post process actions on a successfully processed item.

Within process(), the user left the looping status set to Continue, which means that everything looks good and this is the right place to perform database updates or to save files.

acquire_resources() None[source]

Acquires resources and adds them to the resource stack.

The whole body of the execute() method is within a context structure. The idea is that if any part of the code inside throws an exception that breaks the execution, all stateful resources are still properly closed.

Since the number of resources may vary, the variable number of nested with statements has been replaced by an ExitStack. Resources, like open files, timers and db connections, need to be added to the resource stack in this method.

In the case a processor is being executed within a ProcessorList, then some resources might be shared, and for this reason they are not added to the stack. This selection can be done via the private local_resource_acquisition. This is normally True, meaning that the processor will handle its resources independently, but when the processor is executed from a ProcessorList, this flag is automatically turned to False.

If the user wants to add additional resources, they have to overload this method, calling the super method to preserve the original resources. If they want to share resources among different processors executed from inside a processor list, they have to overload the ProcessorList class as well.
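The ExitStack idea can be shown with the standard library alone. The snippet below is a generic sketch, not MAFw code: the resource() context manager and the run() function are hypothetical, and the resource names are placeholders.

```python
from contextlib import ExitStack, contextmanager

events: list[str] = []


@contextmanager
def resource(name: str):
    """Hypothetical resource that records when it is opened and closed."""
    events.append(f'open {name}')
    try:
        yield name
    finally:
        events.append(f'close {name}')


def run(resource_names: list[str], fail: bool = False) -> None:
    with ExitStack() as stack:
        # Any number of resources can be registered on the same stack.
        for name in resource_names:
            stack.enter_context(resource(name))
        if fail:
            raise RuntimeError('processing failed')


try:
    run(['timer', 'database'], fail=True)
except RuntimeError:
    pass

print(events)
# ['open timer', 'open database', 'close database', 'close timer']
```

Even though the body raises, every resource entered on the stack is closed, in reverse order of acquisition.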

delete_parameter(name: str) None[source]

Deletes a processor parameter.

Parameters:

name (str) – The name of the parameter to be deleted.

Raises:

ProcessorParameterError – If a parameter with name is not registered.

dump_parameter_configuration(option: int = 1) dict[str, Any][source]

Dumps the processor parameter values in a dictionary.

The snippet below explains the meaning of option.

# option 1
conf_dict1 = {
    'Processor': {'param1': 5, 'input_table': 'my_table'}
}

# option 2
conf_dict2 = {'param1': 5, 'input_table': 'my_table'}

Parameters:

option (int, Optional) – Select the dictionary style. Defaults to 1.

Returns:

A parameter configuration dictionary.

Return type:

dict

execute() None[source]

Execute the processor tasks.

This method works as a dispatcher, reassigning the call to a more specific execution implementation depending on the loop_type.
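The dispatching behaviour can be sketched with a small stand-alone class. This is an illustration under assumptions, not the MAFw implementation: SketchLoopType and SketchProcessor are hypothetical, and the enum member names other than ForLoop are guesses made for the example.

```python
from enum import Enum


class SketchLoopType(Enum):
    """Hypothetical stand-in for LoopType; only ForLoop is named in the text."""

    SingleLoop = 'single'
    ForLoop = 'for'
    WhileLoop = 'while'


class SketchProcessor:
    def __init__(self, loop_type: SketchLoopType = SketchLoopType.ForLoop) -> None:
        self.loop_type = loop_type
        self.log: list[str] = []

    def get_items(self) -> list[str]:
        return ['a', 'b', 'c']

    def while_condition(self) -> bool:
        return len(self.log) < 2

    def process(self, item: object = None) -> None:
        self.log.append(f'processed {item}')

    def _execute_single(self) -> None:
        self.process()

    def _execute_for_loop(self) -> None:
        for item in self.get_items():
            self.process(item)

    def _execute_while_loop(self) -> None:
        while self.while_condition():
            self.process()

    def execute(self) -> None:
        # Pick the implementation that matches the configured loop type.
        dispatch = {
            SketchLoopType.SingleLoop: self._execute_single,
            SketchLoopType.ForLoop: self._execute_for_loop,
            SketchLoopType.WhileLoop: self._execute_while_loop,
        }
        dispatch[self.loop_type]()


p = SketchProcessor(SketchLoopType.ForLoop)
p.execute()
print(p.log)
```

Callers only ever invoke execute(); the private implementations stay an internal detail, which is why the text asks not to call or overload them directly.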

finish() None[source]

Concludes the execution.

The user can reimplement this method if there are some concluding tasks that must be performed. Always include a call to super().

format_progress_message() None[source]

Customizes the progress message with information about the current item.

The user can overload this method in order to modify the message being displayed during the process loop with information about the current item.

The user can access the current value, its position in the looping cycle and the total number of items using Processor.item, Processor.i_item and Processor.n_item.

get_filter(model_name: str) Filter[source]

Returns a registered Filter via the model name.

If a filter for the provided model_name does not exist, a KeyError is raised.

Parameters:

model_name (str) – The model name for which the filter will be returned.

Returns:

The registered filter

Return type:

mafw.db.db_filter.Filter

Raises:

KeyError – If a filter with the given name is not found.

get_items() Collection[Any][source]

Returns the collection of items for the processor loop.

This method must be overloaded for the processor to work. Generally, this is getting a list of rows from the database, or a list of files from the disk to be processed.

Returns:

A collection of items for the loop

Return type:

Collection[Any]

get_parameter(name: str) PassiveParameter[ParameterType][source]

Gets the processor parameter named name.

Parameters:

name (str) – The name of the parameter.

Returns:

The processor parameter

Return type:

PassiveParameter

Raises:

ProcessorParameterError – If a parameter with name is not registered.

get_parameters() dict[str, PassiveParameter[ParameterType]][source]

Returns the full dictionary of registered parameters for this processor.

Useful when dumping the parameter specification in a configuration file, for example.

Returns:

The dictionary with the registered parameters.

Return type:

dict[str, PassiveParameter[ParameterType]]

on_looping_status_set(status: LoopingStatus) None[source]

Callback invoked when the looping status is set.

The user can overload this method as needed.

Parameters:

status (LoopingStatus) – The set looping status.

on_processor_status_change(old_status: ProcessorStatus, new_status: ProcessorStatus) None[source]

Callback invoked when the processor status is changed.

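Parameters:
  • old_status (ProcessorStatus) – The processor status before the change.

  • new_status (ProcessorStatus) – The processor status after the change.

print_process_statistics() None[source]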

Print the process statistics.

A utility method to display the fastest, the slowest and the average time required to process a single item. This is particularly useful when the looping processor is part of a ProcessorList.

process() None[source]

Processes the current item.

This is the core of the Processor, where the user has to define the calculations required.

set_parameter_value(name: str, value: ParameterType) None[source]

Sets the value of a processor parameter.

Parameters:
  • name (str) – The name of the parameter to be set.

  • value (ParameterType) – The value to be assigned to the parameter.

Raises:

ProcessorParameterError – If a parameter with name is not registered.

skip_item() None[source]

Performs post-processing actions on an item that was NOT successfully processed.

Within the process(), the user set the looping status to Skip, so it means that something went wrong and here corrective actions can be taken if needed.
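The interplay between process(), accept_item() and skip_item() can be sketched with a toy loop. This is a simplified model written for illustration, not the MAFw execution loop: SketchLoopingStatus and EvenOnly are hypothetical, and only the two states named in the text (Continue and Skip) are modelled.

```python
from enum import Enum, auto


class SketchLoopingStatus(Enum):
    """Hypothetical stand-in for LoopingStatus, limited to two states."""

    Continue = auto()
    Skip = auto()


class EvenOnly:
    """Toy processor that accepts even numbers and skips odd ones."""

    def __init__(self, items: list[int]) -> None:
        self.items = items
        self.item: int = 0
        self.accepted: list[int] = []
        self.skipped: list[int] = []
        self.looping_status = SketchLoopingStatus.Continue

    def process(self) -> None:
        if self.item % 2:
            # Something is wrong with this item: ask the loop to skip it.
            self.looping_status = SketchLoopingStatus.Skip

    def accept_item(self) -> None:
        self.accepted.append(self.item)

    def skip_item(self) -> None:
        self.skipped.append(self.item)

    def run(self) -> None:
        for self.item in self.items:
            # The status is reset to Continue before each item.
            self.looping_status = SketchLoopingStatus.Continue
            self.process()
            if self.looping_status is SketchLoopingStatus.Continue:
                self.accept_item()
            else:
                self.skip_item()


proc = EvenOnly([1, 2, 3, 4])
proc.run()
print(proc.accepted, proc.skipped)  # [2, 4] [1, 3]
```

Keeping the bookkeeping in accept_item() and skip_item() leaves process() free to contain only the calculation and the decision about the item.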

start() None[source]

Start method.

The user can overload this method, including all steps that should be performed at the beginning of the operation.

If the user decides to overload it, the new implementation should include a call to the super method.

while_condition() bool[source]

Returns the while condition.

Returns:

True if the while loop has to continue, False otherwise.

Return type:

bool

_ids = count(0)

A counter for all processor instances

_methods_to_be_checked_for_super

List of methods to be checked for super inclusion.

It is a list of tuples, where the first element is the name of the method to be checked and the second is the base class to be compared with.

property database: Database

Returns the database instance

Returns:

A database object.

Raises:

MissingDatabase – If the database connection has not been established.

description

A short description of the processor task.

filter_register: FilterRegister

The DB filter register of the Processor.

property i_item: int

The enumeration of the current item being processed.

item: Any

The current item of the loop.

property local_resource_acquisition: bool

Checks if resources should be acquired locally.

When the processor is executed in stand-alone mode, it is responsible for acquiring and releasing its own external resources, but when it is executed from a ProcessorList, it is good practice to share and distribute resources among the whole processor list. In this case, resources should not be acquired locally by the single processor, but from the parent execution context.

Returns:

True if resources are to be acquired locally by the processor. False, otherwise.

Return type:

bool

loop_type: LoopType

The loop type.

The value of this parameter can also be changed by the execution_workflow() decorator factory.

See LoopType for more details.

looping_status

Looping modifier

property n_item: int | None

The total number of items to be processed or None for an undefined loop

name

The name of the processor.

processor_exit_status

Processor exit status

processor_status

Processor execution status

progress_message: str = 'Processor is working'

Message displayed to show the progress.

It can be customized with information about the current item in the loop by overloading the format_progress_message().

remove_orphan_files: bool

The flag to remove or protect the orphan files. Defaults to True

unique_id

A unique identifier representing how many instances of Processor have been created.

property unique_name: str

Returns the unique name for the processor.

class mafw.processor.ProcessorList(*args: Processor | ProcessorList, name: str | None = None, description: str | None = None, timer: Timer | None = None, timer_params: dict[str, Any] | None = None, user_interface: UserInterfaceBase | None = None, database: Database | None = None, database_conf: dict[str, Any] | None = None)[source]

Bases: list[Processor | ProcessorList]

A list like collection of processors.

ProcessorList is a subclass of list containing only instances of Processor subclasses or other ProcessorList instances.

An attempt to add an element that is not a Processor or a ProcessorList will raise a TypeError.

Along with an iterable of processors, a new processor list can be built using the following parameters.

Constructor parameters:

Parameters:
  • name (str, Optional) – The name of the processor list. Defaults to ProcessorList.

  • description (str, Optional) – An optional short description. Defaults to ProcessorList.

  • timer (Timer, Optional) – The timer object. If None is provided, a new one will be created. Defaults to None.

  • timer_params (dict, Optional) – A dictionary of parameters to build the timer object. Defaults to None.

  • user_interface (UserInterfaceBase, Optional) – A user interface. Defaults to None

  • database (Database, Optional) – A database instance. Defaults to None.

  • database_conf (dict, Optional) – Configuration for the database. Defaults to None.

static validate_item(item: Processor | ProcessorList) Processor | ProcessorList[source]

Validates the item being added.

static validate_items(items: tuple[Processor | ProcessorList, ...] = ()) tuple[Processor | ProcessorList, ...][source]

Validates a tuple of items being added.
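A list subclass that validates its elements, as described for ProcessorList, could look like the sketch below. It is a generic illustration and not the MAFw source: Step and StepList are hypothetical stand-ins for Processor and ProcessorList.

```python
from typing import Iterable, SupportsIndex, Union


class Step:
    """Hypothetical stand-in for Processor."""

    def __init__(self, name: str) -> None:
        self.name = name


class StepList(list):
    """Hypothetical stand-in for ProcessorList: accepts only Step or StepList."""

    @staticmethod
    def validate_item(item: object) -> Union['Step', 'StepList']:
        if not isinstance(item, (Step, StepList)):
            raise TypeError(f'Expected Step or StepList, got {type(item).__name__}')
        return item

    def __init__(self, *args: object) -> None:
        # Validate everything before the list is populated.
        super().__init__([self.validate_item(a) for a in args])

    def append(self, item: object) -> None:
        super().append(self.validate_item(item))

    def extend(self, items: Iterable[object]) -> None:
        # Build the validated list first, so a bad item leaves the list unchanged.
        super().extend([self.validate_item(i) for i in items])

    def insert(self, index: SupportsIndex, item: object) -> None:
        super().insert(index, self.validate_item(item))


steps = StepList(Step('load'), Step('analyse'))
steps.append(StepList(Step('plot')))  # nested lists are allowed

try:
    steps.append('not a step')
except TypeError as error:
    print(error)
```

Routing append, extend and insert through the same static validator keeps the type guarantee in one place.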

acquire_resources() None[source]

Acquires external resources.

append(_ProcessorList__object: Processor | ProcessorList) None[source]

Appends a new processor at the end of the list.

distribute_resources(processor: Processor | Self) None[source]

Distributes the external resources to the items in the list.

execute() ProcessorExitStatus[source]

Execute the list of processors.

Similarly to the Processor, a ProcessorList can be executed. In simple words, the execute method of each processor in the list is called in exactly the same order in which the processors were added.

extend(_ProcessorList__iterable: Iterable[Processor | ProcessorList]) None[source]

Extends the processor list with a list of processors.

insert(_ProcessorList__index: SupportsIndex, _ProcessorList__object: Processor | ProcessorList) None[source]

Adds a new processor at the specified index.

property database: Database

Returns the database instance

Returns:

A database instance

Raises:

MissingDatabase – if a database connection is missing.

property name: str

The name of the processor list

Returns:

The name of the processor list

Return type:

str

property processor_exit_status: ProcessorExitStatus

The processor exit status.

It refers to the whole processor list execution.

class mafw.processor.ProcessorMeta[source]

Bases: type

A metaclass to implement the post-init method.
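A common way to obtain a post-init hook with a metaclass is to override the metaclass __call__. The sketch below shows this general technique; it is not taken from the MAFw source, and the names PostInitMeta, Base, Child and the hook name __post_init__ are assumptions made for the example.

```python
class PostInitMeta(type):
    """Hypothetical metaclass that calls a hook after __init__ has completed."""

    def __call__(cls, *args, **kwargs):
        # type.__call__ runs __new__ and the full __init__ chain.
        instance = super().__call__(*args, **kwargs)
        post_init = getattr(instance, '__post_init__', None)
        if callable(post_init):
            post_init()
        return instance


class Base(metaclass=PostInitMeta):
    def __init__(self) -> None:
        self.steps = ['base init']

    def __post_init__(self) -> None:
        self.steps.append('post init')


class Child(Base):
    def __init__(self) -> None:
        super().__init__()
        self.steps.append('child init')


print(Child().steps)
# ['base init', 'child init', 'post init']
```

The hook runs only after the most derived __init__ has finished, so it can rely on attributes defined by any subclass constructor.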

mafw.processor.ensure_parameter_registration(func: F) F[source]

Decorator to ensure that before calling func the processor parameters have been registered.
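The general shape of such a decorator, typed with a TypeVar bound to Callable like F above, is sketched here. What registration actually involves in MAFw is not described in this page, so the attributes _registered and _register_parameters(), as well as the names ensure_registered and Holder, are hypothetical placeholders.

```python
from functools import wraps
from typing import Any, Callable, TypeVar, cast

F = TypeVar('F', bound=Callable[..., Any])


def ensure_registered(func: F) -> F:
    """Run the (hypothetical) registration step before calling func."""

    @wraps(func)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        if not self._registered:
            self._register_parameters()
        return func(self, *args, **kwargs)

    # cast keeps the decorated method's signature visible to type checkers.
    return cast(F, wrapper)


class Holder:
    def __init__(self) -> None:
        self._registered = False
        self._parameters: dict[str, Any] = {}

    def _register_parameters(self) -> None:
        self._parameters['threshold'] = 5
        self._registered = True

    @ensure_registered
    def get_parameter(self, name: str) -> Any:
        return self._parameters[name]


holder = Holder()
print(holder.get_parameter('threshold'))  # 5
```

Because the wrapper checks the flag on every call, registration happens lazily and only once, no matter which decorated method is called first.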

mafw.processor.validate_database_conf(database_conf: dict[str, Any] | None = None) dict[str, Any] | None[source]

Validates the database configuration.

Parameters:

database_conf (dict, Optional) – The input database configuration. Defaults to None.

Returns:

Either the validated database configuration or None if it is invalid.

Return type:

dict, None