mafw.processor

Module implements the basic Processor class, the ProcessorList and all helper classes to achieve the core functionality of the MAFw.

Module Attributes

ParameterType

Generic variable type for the ActiveParameter and PassiveParameter.

F

Type variable for generic callable with any return value.

Functions

ensure_parameter_registration(func)

Decorator to ensure that before calling func the processor parameters have been registered.

validate_database_conf([database_conf])

Validates the database configuration.

Classes

ActiveParameter(name[, value, default, help_doc])

The public interface to the processor parameter.

PassiveParameter(name[, value, default, ...])

A helper class to store processor parameter value and metadata.

Processor(*args, **kwargs)

The basic processor.

ProcessorList(*args[, name, description, ...])

A list like collection of processors.

ProcessorMeta

A metaclass to implement the post-init method.

class mafw.processor.ActiveParameter(name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = '')[source]

Bases: Generic[ParameterType]

The public interface to the processor parameter.

The behaviour of a Processor can be customised by using processor parameters. The value of these parameters can be either set via a configuration file or directly when creating the class.

To benefit from this facility, the user has to add an ActiveParameter instance as a class attribute of the Processor subclass, in this way:

from pathlib import Path

from mafw.processor import ActiveParameter, Processor


class MyProcessor(Processor):

    # this is the input folder
    # (a raw string cannot end with a single backslash, so a forward slash is used)
    input_folder = ActiveParameter('input_folder', Path('C:/'), help_doc='This is where to look for input files')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # change the input folder to something else
        self.input_folder = Path(r'D:\data')

        # get the value of the parameter
        print(self.input_folder)

The ActiveParameter is a descriptor, which means that when you create one, a lot of work is done behind the scenes.

In simple words, a processor parameter is made of two objects: a public interface, through which the user can easily access the value of the parameter, and a private interface, where all other information (default, documentation…) is stored.

The user does not have to take care of all of this. When a new ActiveParameter instance is added to the class as in the code snippet above, the private interface is automatically created and will stay in the class instance until the end of the class lifetime.

To access the private interface, the user can use the Processor.get_parameter() method using the parameter name as a key.
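The split between the public and the private interface can be illustrated with a minimal, self-contained sketch. It is not the MAFw implementation: SketchParameter, SketchProcessor and the _store dictionary are hypothetical stand-ins that only show how a descriptor can forward attribute access to per-instance private storage.

```python
from typing import Any, Optional


class SketchParameter:
    """Hypothetical stand-in for a descriptor-based parameter (not the MAFw class)."""

    def __init__(self, name: str, default: Any = None, help_doc: str = '') -> None:
        self.name = name
        self.default = default
        self.help_doc = help_doc

    def __get__(self, obj: Any, objtype: Optional[type] = None) -> Any:
        if obj is None:
            # Accessed on the class: return the descriptor itself.
            return self
        # Accessed on an instance: read from the per-instance private store.
        return obj._store.get(self.name, self.default)

    def __set__(self, obj: Any, value: Any) -> None:
        # Assignment on the instance is redirected to the private store.
        obj._store[self.name] = value


class SketchProcessor:
    threshold = SketchParameter('threshold', default=10, help_doc='An example parameter')

    def __init__(self) -> None:
        # Private storage, one dictionary per instance (an assumption of this sketch).
        self._store = {}


p1, p2 = SketchProcessor(), SketchProcessor()
p1.threshold = 42
print(p1.threshold, p2.threshold)
```

Because the value lives in the instance and not in the descriptor, changing the parameter on one processor leaves the other untouched.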

The user can assign almost any name to an ActiveParameter. Only a few names are invalid, because they are used for other purposes; they are listed in reserved_names. Should the user inadvertently use a reserved name, a ProcessorParameterError is raised.

See also

The private counterpart in the PassiveParameter.

An explanation on how processor parameters work and should be used is given in Understanding processor parameters

The list of reserved names.

Constructor parameters:

Parameters:
  • name (str) – The name of the parameter.

  • value (ParameterType, Optional) – The initial value of the parameter. Defaults to None.

  • default (ParameterType, Optional) – The default value of the parameter, to be used when value is not set. Defaults to None.

  • help_doc (str, Optional) – An explanatory text describing the parameter.

_validate_name(proposed_name: str) str[source]

Validate that the proposed parameter name is not in the list of forbidden names.

This private method checks if the provided name is allowed for use as a processor parameter. Names that are listed in reserved_names cannot be used as parameter names.

Parameters:

proposed_name (str) – The name to be validated for use as a processor parameter.

Returns:

The validated name if it passes the forbidden names check.

Return type:

str

Raises:

ProcessorParameterError – If the proposed name is in the list of forbidden names.
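The check described above can be sketched in a few lines. This is only an illustration under stated assumptions: ParameterNameError is a hypothetical stand-in for ProcessorParameterError, and validate_name is a free function here, whereas the documented version is a private method of ActiveParameter.

```python
# The reserved names are copied from the documented reserved_names attribute.
RESERVED_NAMES = ['__logic__', '__filter__', '__new_only__', '__inheritance__']


class ParameterNameError(Exception):
    """Hypothetical stand-in for ProcessorParameterError."""


def validate_name(proposed_name: str) -> str:
    """Return the name unchanged, or raise if it is a reserved name."""
    if proposed_name in RESERVED_NAMES:
        raise ParameterNameError(f'{proposed_name!r} is a reserved name')
    return proposed_name


print(validate_name('input_folder'))
```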

reserved_names: list[str] = ['__logic__', '__filter__', '__new_only__', '__inheritance__']

A list of names that cannot be used as processor parameter names.

  • __logic__

  • __filter__

  • __new_only__

  • __inheritance__

class mafw.processor.F

Type variable for generic callable with any return value.

alias of TypeVar('F', bound=Callable[[...], Any])

class mafw.processor.ParameterType

Generic variable type for the ActiveParameter and PassiveParameter.

alias of TypeVar('ParameterType')

class mafw.processor.PassiveParameter(name: str, value: ParameterType | None = None, default: ParameterType | None = None, help_doc: str = '')[source]

Bases: Generic[ParameterType]

A helper class to store processor parameter value and metadata.

This class is the private interface used by the ActiveParameter descriptor to store its value and metadata.

When a new ActiveParameter is added to a class, an instance of a PassiveParameter is added to the processor parameter register.

See also

An explanation on how processor parameters work and should be used is given in Understanding processor parameters

Changed in version v2.0.0: User should only use ActiveParameter and never manually instantiate PassiveParameter.

Constructor parameters:

Parameters:
  • name (str) – The name of the parameter. It must be a valid python identifier.

  • value (ParameterType, Optional) – The set value of the parameter. If None, then the default value will be used. Defaults to None.

  • default (ParameterType, Optional) – The default value for the parameter. It is used if the value is not provided. Defaults to None.

  • help_doc (str, Optional) – A brief explanation of the parameter.

Raises:

ProcessorParameterError – If neither value nor default is provided, or if name is not a valid identifier.

property is_optional: bool

Property to check if the parameter is optional.

Returns:

True if the parameter is optional

Return type:

bool

property is_set: bool

Property to check if the value has been set.

It is useful for optional parameters, to see whether the current value is the default one or whether the user set it.

property value: ParameterType

Gets the parameter value.

Returns:

The parameter value.

Return type:

ParameterType

Raises:

ProcessorParameterError – If neither value nor default was defined.
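The interplay between value, default, is_set and is_optional can be sketched as follows. This is a simplified stand-in, not the MAFw class: SketchPassiveParameter and SketchParameterError are hypothetical names, and the sketch assumes that "optional" means "a default was provided", which the documentation above does not state explicitly.

```python
from typing import Any


class SketchParameterError(Exception):
    """Hypothetical stand-in for ProcessorParameterError."""


class SketchPassiveParameter:
    """Simplified container for a parameter value and its metadata."""

    def __init__(self, name: str, value: Any = None, default: Any = None, help_doc: str = '') -> None:
        if not name.isidentifier():
            raise SketchParameterError(f'{name!r} is not a valid python identifier')
        if value is None and default is None:
            raise SketchParameterError('Either value or default must be provided')
        self.name = name
        self._value = value
        self._default = default
        self.help_doc = help_doc

    @property
    def is_set(self) -> bool:
        # True when the user provided an explicit value.
        return self._value is not None

    @property
    def is_optional(self) -> bool:
        # Assumption of this sketch: a parameter with a default is optional.
        return self._default is not None

    @property
    def value(self) -> Any:
        # Fall back to the default when no explicit value was set.
        return self._value if self.is_set else self._default


param = SketchPassiveParameter('threshold', default=10)
print(param.value, param.is_set, param.is_optional)
```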

class mafw.processor.Processor(*args: Any, **kwargs: Any)[source]

Bases: object

The basic processor.

A very comprehensive description of what a Processor does and how it works is available at Processor: The core of MAFw.

Constructor parameters

Parameters:
  • name (str, Optional) – The name of the processor. If None is provided, the class name is used instead. Defaults to None.

  • description (str, Optional) – A short description of the processor task. Defaults to the processor name.

  • config (dict, Optional) – A configuration dictionary for this processor. Defaults to None.

  • looper (LoopType, Optional) – Enumerator to define the looping type. Defaults to LoopType.ForLoop

  • user_interface (UserInterfaceBase, Optional) – A user interface instance to be used by the processor to interact with the user.

  • timer (Timer, Optional) – A timer object to measure process duration.

  • timer_params (dict, Optional) – Parameters for the timer object.

  • database (Database, Optional) – A database instance. Defaults to None.

  • database_conf (dict, Optional) – Configuration for the database. Defaults to None.

  • remove_orphan_files (bool, Optional) – Boolean flag to remove files on disc without a reference to the database. See Standard tables and _remove_orphan_files(). Defaults to True

  • replica_id (str, Optional) – The replica identifier for the current processor.

  • create_standard_tables (bool, Optional) – Boolean flag to create the standard tables on disk. Defaults to True.

  • kwargs – Keyword arguments that can be used to set processor parameters.

_check_method_overload() None[source]

Check if the user overloaded the required methods.

Depending on the loop type, the user must overload different methods. This method is doing the check and if the required methods are not overloaded a warning is emitted.
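One common way to detect whether a method was overloaded is to compare the attribute found on the subclass with the one defined on the base class. The sketch below illustrates that technique only; it is an assumption that MAFw works this way, and SketchBase, missing_overloads and the two subclasses are hypothetical names.

```python
import warnings


class SketchBase:
    def process(self) -> None:
        """Default implementation, meant to be overloaded."""

    def get_items(self) -> list:
        return []

    def missing_overloads(self, required: list) -> list:
        """Warn about, and return, the required methods that were not overloaded."""
        missing = []
        for method_name in required:
            # A method is not overloaded when the subclass still exposes the base function.
            if getattr(type(self), method_name) is getattr(SketchBase, method_name):
                warnings.warn(f'{type(self).__name__} does not overload {method_name}()')
                missing.append(method_name)
        return missing


class Complete(SketchBase):
    def process(self) -> None:
        pass

    def get_items(self) -> list:
        return [1, 2, 3]


class Incomplete(SketchBase):
    def process(self) -> None:
        pass
```

In this sketch the list of required methods is passed explicitly; in a real processor it would depend on the loop type.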

_check_method_super() None[source]

Check if some specific methods are calling their super.

For some specific methods (for example: start and finish), the user should always call their super method. This method verifies that the user implementation of these methods includes a super call; otherwise a warning is emitted to inform the user about the problem and the possible misbehaviour of the processor.

The list of methods to be verified is stored in the private class attribute _methods_to_be_checked_for_super as a list of tuples, each made of the name of the method to be verified and the base class for comparison. The base class is required because Processor subclasses may extend this list with methods that are not present in the base Processor. See, for example, patch_data_frame(), which is required to have a super call but is not present in the base Processor.

_execute_for_loop() None[source]

Executes the processor within a for loop.

Private method. Do not overload or invoke it directly. The execute() method will call the appropriate implementation depending on the processor LoopType.

_execute_single() None[source]

Execute the processor in single mode.

Private method. Do not overload or invoke it directly. The execute() method will call the appropriate implementation depending on the processor LoopType.

_execute_while_loop() None[source]

Executes the processor within a while loop.

Private method. Do not overload or invoke it directly. The execute() method will call the appropriate implementation depending on the processor LoopType.

_load_parameter_configuration() None[source]

Load processor parameter configuration from the internal configuration dictionary.

This method processes the processor’s configuration dictionary to set parameter values. It handles two configuration formats:

  1. Nested format: {'ProcessorName': {'param1': value1, ...}}

  2. Flat format: {'param1': value1, ...}

The method also handles filter configurations by collecting filter table names and deferring their initialisation until after the global filter has been processed.

Changed in version v2.0.0: For option 1, the configurations from name and name_replica are combined.

Raises:

ProcessorParameterError – If a parameter in the configuration is not registered.
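The two accepted formats can be told apart by checking whether the processor name is a key of the dictionary. The helper below is a minimal sketch of that idea; extract_parameters is a hypothetical name, and the sketch deliberately leaves out the replica merging, the filter handling and the check for unregistered parameters described above.

```python
from typing import Any


def extract_parameters(config: dict, processor_name: str) -> dict:
    """Return a flat parameter dictionary from either configuration format."""
    section: Any = config.get(processor_name)
    if isinstance(section, dict):
        # Nested format: the parameters live under the processor name.
        return dict(section)
    # Flat format: the dictionary already maps parameter names to values.
    return dict(config)


nested = {'MyProcessor': {'param1': 5, 'input_table': 'my_table'}}
flat = {'param1': 5, 'input_table': 'my_table'}
print(extract_parameters(nested, 'MyProcessor') == extract_parameters(flat, 'MyProcessor'))
```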

_override_defaults() None[source]

Override default parameter values with values from new_defaults.

This private method iterates through the new_defaults dictionary and updates the corresponding processor parameters with new values. Only parameters that exist in both new_defaults and _processor_parameters are updated.

Added in version v2.0.0.

_overrule_kws_parameters() None[source]

Override processor parameters with values from keyword arguments.

This method applies parameter values passed as keyword arguments during processor initialisation. It ensures that the parameter types match the expected types before setting the values.

_register_parameters() None[source]

Register processor parameters defined as ActiveParameter instances in the class.

This private method scans the class definition for any ActiveParameter instances and creates corresponding PassiveParameter instances to store the actual parameter values and metadata. It ensures that all processor parameters are properly initialised and available for configuration through the processor’s configuration system.

The method checks for duplicate parameter names and raises a ProcessorParameterError if duplicates are detected. It also sets the internal flag _parameter_registered to True once registration is complete.

Note

This method is automatically called during processor initialisation and should not be called directly by users.

Changed in version v2.0.0: Only ActiveParameter instances are now registered. The PassiveParameter is only meant to store the value and metadata of its active counterpart.

_remove_orphan_files() None[source]

Remove orphan files.

If a connection to the database is available, then the OrphanFile standard table is queried for all its entries, and all the files are then removed.

The user can turn off this behaviour by setting remove_orphan_files to False.

_reset_parameters() None[source]

Reset processor parameters to their initial state.

This method clears all currently registered processor parameters and triggers a fresh registration process. It’s useful when parameter configurations need to be reinitialized or when parameters have been modified and need to be reset.

accept_item() None[source]

Performs post-processing actions on a successfully processed item.

Within the process(), the user left the looping status to Continue, so it means that everything looks good and this is the right place to perform database updates or file savings.

acquire_resources() None[source]

Acquires resources and adds them to the resource stack.

The whole body of the execute() method is within a context structure. The idea is that if any part of the code inside throws an exception that breaks the execution, we want to be sure that all stateful resources are properly closed.

Since the number of resources may vary, the variable number of nested with statements has been replaced by an ExitStack. Resources, like open files, timers and database connections, need to be added to the resource stack in this method.

When a processor is executed within a ProcessorList, some resources might be shared, and for this reason they are not added to the stack. This selection can be done via the private local_resource_acquisition. This is normally True, meaning that the processor will handle its resources independently, but when the processor is executed from a ProcessorList, this flag is automatically turned to False.

If the user wants to add additional resources, they have to overload this method, calling the super to preserve the original resources. If they want to share resources among different processors executed from inside a processor list, they have to overload the ProcessorList class as well.
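The benefit of an ExitStack over nested with statements can be seen in a small stand-alone sketch. The resource() context manager and the run() function are hypothetical; only contextlib.ExitStack and its enter_context() method are standard library features.

```python
from contextlib import ExitStack, contextmanager

events = []


@contextmanager
def resource(name: str):
    """Hypothetical resource that records when it is opened and closed."""
    events.append(f'open {name}')
    try:
        yield name
    finally:
        events.append(f'close {name}')


def run(resource_names: list, fail: bool = False) -> None:
    # A variable number of resources is handled by a single with statement.
    with ExitStack() as stack:
        for name in resource_names:
            stack.enter_context(resource(name))
        if fail:
            raise RuntimeError('processing failed')


try:
    run(['timer', 'database'], fail=True)
except RuntimeError:
    pass

print(events)
```

Even though the body raised, every resource is closed, in the reverse order of acquisition.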

delete_parameter(name: str) None[source]

Deletes a processor parameter.

Parameters:

name (str) – The name of the parameter to be deleted.

Raises:

ProcessorParameterError – If a parameter with name is not registered.

dump_parameter_configuration(option: int = 1) dict[str, Any][source]

Dumps the processor parameter values in a dictionary.

The snippet below explains the meaning of option.

# option 1
conf_dict1 = {
    'Processor': {'param1': 5, 'input_table': 'my_table'}
}

# option 2
conf_dict2 = {'param1': 5, 'input_table': 'my_table'}

In the case of option 1, the replica aware name (replica_name()) will be used as a key for the configuration dictionary.

Changed in version v2.0.0: With option 1, using replica_name() instead of name as key of the configuration dictionary.

Parameters:

option (int, Optional) – Select the dictionary style. Defaults to 1.

Returns:

A parameter configuration dictionary.

Return type:

dict

execute() None[source]

Execute the processor tasks.

This method works as a dispatcher, reassigning the call to a more specific execution implementation depending on the loop_type.
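A dispatcher of this kind can be sketched with an enumeration and a dictionary of bound methods. Everything below is illustrative: SketchLoopType mimics LoopType, but apart from ForLoop its member names are assumptions, and the private methods return strings only to make the dispatch visible.

```python
from enum import Enum, auto


class SketchLoopType(Enum):
    """Stand-in for LoopType; only ForLoop is a name confirmed by the documentation."""

    SingleLoop = auto()
    ForLoop = auto()
    WhileLoop = auto()


class SketchProcessor:
    def __init__(self, loop_type: SketchLoopType = SketchLoopType.ForLoop) -> None:
        self.loop_type = loop_type

    def _execute_single(self) -> str:
        return 'single'

    def _execute_for_loop(self) -> str:
        return 'for loop'

    def _execute_while_loop(self) -> str:
        return 'while loop'

    def execute(self) -> str:
        # Map each loop type to the matching private implementation.
        dispatch = {
            SketchLoopType.SingleLoop: self._execute_single,
            SketchLoopType.ForLoop: self._execute_for_loop,
            SketchLoopType.WhileLoop: self._execute_while_loop,
        }
        return dispatch[self.loop_type]()


print(SketchProcessor(SketchLoopType.WhileLoop).execute())
```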

finish() None[source]

Concludes the execution.

The user can reimplement this method if there are some conclusive tasks that must be achieved. Always include a call to super().

format_progress_message() None[source]

Customizes the progress message with information about the current item.

The user can overload this method in order to modify the message being displayed during the process loop with information about the current item.

The user can access the current value, its position in the looping cycle and the total number of items using Processor.item, Processor.i_item and Processor.n_item.
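As an illustration, the sketch below overloads format_progress_message() in a simplified stand-in class. SketchProcessor and its run() method are hypothetical and far simpler than the real Processor; only the attribute names item, i_item, n_item and progress_message come from the documentation.

```python
class SketchProcessor:
    """Simplified stand-in that loops over items and collects progress messages."""

    progress_message = 'Processor is working'

    def __init__(self, items: list) -> None:
        self.items = list(items)
        self.item = None
        self.i_item = -1
        self.n_item = len(self.items)

    def format_progress_message(self) -> None:
        """Default implementation: leave the message unchanged."""

    def run(self) -> list:
        messages = []
        for index, item in enumerate(self.items):
            self.i_item, self.item = index, item
            self.format_progress_message()
            messages.append(self.progress_message)
        return messages


class FileProcessor(SketchProcessor):
    def format_progress_message(self) -> None:
        # Include the current item and its position in the loop.
        self.progress_message = f'Processing {self.item} ({self.i_item + 1}/{self.n_item})'


print(FileProcessor(['a.dat', 'b.dat']).run())
```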

get_filter(model_name: str) ModelFilter[source]

Returns a registered ModelFilter via the model name.

If a filter for the provided model_name does not exist, a KeyError is raised.

Parameters:

model_name (str) – The model name for which the filter will be returned.

Returns:

The registered filter

Return type:

mafw.db.db_filter.ModelFilter

Raises:

KeyError – If a filter with the given name is not found.

get_items() Collection[Any][source]

Returns the collection of items for the processor loop.

This method must be overloaded for the processor to work. Generally, this is getting a list of rows from the database, or a list of files from the disk to be processed.

Returns:

A collection of items for the loop

Return type:

Collection[Any]

get_parameter(name: str) PassiveParameter[ParameterType][source]

Gets the processor parameter named name.

Parameters:

name (str) – The name of the parameter.

Returns:

The processor parameter

Return type:

PassiveParameter

Raises:

ProcessorParameterError – If a parameter with name is not registered.

get_parameters() dict[str, PassiveParameter[ParameterType]][source]

Returns the full dictionary of registered parameters for this processor.

Useful when dumping the parameter specification in a configuration file, for example.

Returns:

The dictionary with the registered parameters.

Return type:

dict[str, PassiveParameter[ParameterType]]

initialise_parameters() None[source]

Initialises processor parameters by registering them and applying various configuration sources.

This method orchestrates the parameter initialisation process by performing the following steps in order:

  1. Registers processor parameters defined as ActiveParameter instances

  2. Overrides default parameter values with any configured overrides

  3. Loads parameter configuration from the processor’s configuration dictionary

  4. Applies keyword arguments as parameter overrides

The method ensures that all processor parameters are properly configured before the processor execution begins. It is automatically called during processor initialisation and should not typically be called directly by users.
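The order of the steps above implies a precedence among the sources: later steps overwrite earlier ones. A minimal sketch of that precedence, using plain dictionaries and a hypothetical resolve_parameters() helper, could look like this. Unlike the documented _load_parameter_configuration(), the sketch silently ignores names that are not registered.

```python
def resolve_parameters(class_defaults: dict, new_defaults: dict, config: dict, kwargs: dict) -> dict:
    """Apply the sources in order, so that later sources win over earlier ones."""
    # Step 1: the registered parameters start from their class-level defaults.
    resolved = dict(class_defaults)
    # Steps 2 to 4: default overrides, then configuration, then keyword arguments.
    for source in (new_defaults, config, kwargs):
        for name, value in source.items():
            if name in resolved:  # only registered parameters are updated
                resolved[name] = value
    return resolved


result = resolve_parameters(
    class_defaults={'threshold': 1, 'input_table': 'raw', 'verbose': False},
    new_defaults={'threshold': 2, 'input_table': 'clean'},
    config={'threshold': 3},
    kwargs={'verbose': True, 'unknown': 0},
)
print(result)
```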

Added in version v2.0.0.

on_looping_status_set(status: LoopingStatus) None[source]

Callback invoked when the looping status is set.

The user can overload this method as needed.

Parameters:

status (LoopingStatus) – The set looping status.

on_processor_status_change(old_status: ProcessorStatus, new_status: ProcessorStatus) None[source]

Callback invoked when the processor status is changed.

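Parameters:
  • old_status (ProcessorStatus) – The processor status before the change.

  • new_status (ProcessorStatus) – The processor status after the change.
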
print_process_statistics() None[source]

Print the process statistics.

A utility method to display the fastest, the slowest and the average time required to process a single item. This is particularly useful when the looping processor is part of a ProcessorList.

process() None[source]

Processes the current item.

This is the core of the Processor, where the user has to define the calculations required.

set_parameter_value(name: str, value: ParameterType) None[source]

Sets the value of a processor parameter.

Parameters:
  • name (str) – The name of the parameter to be set.

  • value (ParameterType) – The value to be assigned to the parameter.

Raises:

ProcessorParameterError – If a parameter with name is not registered.

skip_item() None[source]

Performs post-processing actions on an item that was not successfully processed.

Within the process(), the user set the looping status to Skip, so it means that something went wrong and here corrective actions can be taken if needed.

start() None[source]

Start method.

The user can overload this method, including all steps that should be performed at the beginning of the operation.

If the user decides to overload it, the new implementation should include a call to the super method.

validate_configuration() None[source]

Validate the configuration provided via the processor parameters.

Method to be implemented by subclasses if a configuration validation is needed.

The method should silently check the configuration; if it is not valid, an InvalidConfigurationError must be raised.

Added in version v2.0.0.

while_condition() bool[source]

Returns the while condition.

Returns:

True if the while loop has to continue, False otherwise.

Return type:

bool

_config: dict[str, Any]

A dictionary containing the processor configuration object.

This dictionary is populated with the configuration parameters (always in the flat format, option 2) by the _load_parameter_configuration() method.

The original value of the configuration dictionary that is passed to the constructor is stored in _orig_config.

Changed in version v2.0.0: Now it is an empty dictionary until the _load_parameter_configuration() is called.

_ids = count(0)

A counter for all processor instances

_methods_to_be_checked_for_super

List of methods to be checked for super inclusion.

It is a list of tuples, where the first element is the name of the method to be checked and the second is the base class to be compared with.

_orig_config

A copy of the original configuration dictionary.

Added in version v2.0.0.

_parameter_registered

A boolean flag to confirm successful parameter registration.

_processor_parameters: dict[str, PassiveParameter[ParameterType]]

A dictionary to store all the processor parameter instances.

The name of the parameter is used as a key, while for the value an instance of the PassiveParameter is used.

create_standard_tables

The boolean flag to proceed or skip with standard table creation and initialisation

property database: Database

Returns the database instance

Returns:

A database object.

Raises:

MissingDatabase – If the database connection has not been established.

description

A short description of the processor task.

filter_register: ProcessorFilter

The DB filter register of the Processor.

property i_item: int

The enumeration of the current item being processed.

item: Any

The current item of the loop.

property local_resource_acquisition: bool

Checks if resources should be acquired locally.

When the processor is executed in stand-alone mode, it is responsible for acquiring and releasing its own external resources, but when it is executed from a ProcessorList, it is good practice to share and distribute resources among the whole processor list. In this case, resources should not be acquired locally by the single processor, but by the parent execution context.

Returns:

True if resources are to be acquired locally by the processor. False, otherwise.

Return type:

bool

loop_type: LoopType

The loop type.

The value of this parameter can also be changed by the execution_workflow() decorator factory.

See LoopType for more details.

looping_status

Looping modifier

property n_item: int | None

The total number of items to be processed or None for an undefined loop

name

The name of the processor.

new_defaults: dict[str, Any] = {}

A dictionary containing the new default values of the parameters to be overridden.

Added in version v2.0.0.

processor_exit_status

Processor exit status

processor_status

Processor execution status

progress_message: str = 'Processor is working'

Message displayed to show the progress.

It can be customized with information about the current item in the loop by overloading the format_progress_message().

remove_orphan_files: bool

The flag to remove or protect the orphan files. Defaults to True

replica_id

The replica identifier specified in the constructor

Added in version v2.0.0.

property replica_name: str

Returns the replica aware name of the processor.

If no replica_id is specified, the plain name is returned; otherwise the two strings are joined using the '#' symbol.

Added in version v2.0.0.

Returns:

The replica aware name of the processor.

Return type:

str
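The rule can be sketched as a plain function. The helper name and the order of the two parts (name first, then replica_id) are assumptions of this sketch; the documentation only states that the two strings are joined with '#'.

```python
from typing import Optional


def build_replica_name(name: str, replica_id: Optional[str] = None) -> str:
    """Return the plain name, or the name joined to the replica id with '#'."""
    if replica_id is None:
        return name
    # Assumed order: processor name first, replica identifier second.
    return f'{name}#{replica_id}'


print(build_replica_name('MyProcessor'), build_replica_name('MyProcessor', '1'))
```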

unique_id

A unique identifier representing how many instances of Processor have been created.

property unique_name: str

Returns the unique name for the processor.

class mafw.processor.ProcessorList(*args: Processor | ProcessorList, name: str | None = None, description: str | None = None, timer: Timer | None = None, timer_params: dict[str, Any] | None = None, user_interface: UserInterfaceBase | None = None, database: Database | None = None, database_conf: dict[str, Any] | None = None, create_standard_tables: bool = True)[source]

Bases: list[Processor | ProcessorList]

A list like collection of processors.

ProcessorList is a subclass of list containing only Processor subclasses or other ProcessorList.

An attempt to add an element that is not a Processor or a ProcessorList will raise a TypeError.
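The behaviour of a validating list can be sketched with a small list subclass. SketchProcessor and SketchProcessorList are hypothetical stand-ins: the sketch validates items only on construction and in append(), and its execute() records names in a log instead of running real processors.

```python
class SketchProcessor:
    def __init__(self, name: str) -> None:
        self.name = name

    def execute(self, log: list) -> None:
        log.append(self.name)


class SketchProcessorList(list):
    """A list that accepts only processors or other processor lists."""

    def __init__(self, *items) -> None:
        super().__init__(self.validate_item(item) for item in items)

    @staticmethod
    def validate_item(item):
        if not isinstance(item, (SketchProcessor, SketchProcessorList)):
            raise TypeError(f'Expected a processor or a processor list, got {type(item).__name__}')
        return item

    def append(self, item) -> None:
        super().append(self.validate_item(item))

    def execute(self, log: list) -> None:
        # Items are executed in the same order in which they were added.
        for item in self:
            item.execute(log)


inner = SketchProcessorList(SketchProcessor('b'), SketchProcessor('c'))
outer = SketchProcessorList(SketchProcessor('a'), inner)
outer.append(SketchProcessor('d'))
execution_log = []
outer.execute(execution_log)
print(execution_log)
```

A complete implementation would guard extend() and insert() in the same way, as the method list of ProcessorList suggests.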

Along with an iterable of processors, a new processor list can be built using the following parameters.

Constructor parameters:

Parameters:
  • name (str, Optional) – The name of the processor list. Defaults to ProcessorList.

  • description (str, Optional) – An optional short description. Defaults to ProcessorList.

  • timer (Timer, Optional) – The timer object. If None is provided, a new one will be created. Defaults to None.

  • timer_params (dict, Optional) – A dictionary of parameters to build the timer object. Defaults to None.

  • user_interface (UserInterfaceBase, Optional) – A user interface. Defaults to None

  • database (Database, Optional) – A database instance. Defaults to None.

  • database_conf (dict, Optional) – Configuration for the database. Defaults to None.

  • create_standard_tables (bool, Optional) – Whether or not to create the standard tables. Defaults to True.

static validate_item(item: Processor | ProcessorList) Processor | ProcessorList[source]

Validates the item being added.

static validate_items(items: tuple[Processor | ProcessorList, ...] = ()) tuple[Processor | ProcessorList, ...][source]

Validates a tuple of items being added.

acquire_resources() None[source]

Acquires external resources.

append(_ProcessorList__object: Processor | ProcessorList) None[source]

Appends a new processor at the end of the list.

distribute_resources(processor: Processor | Self) None[source]

Distributes the external resources to the items in the list.

execute() ProcessorExitStatus[source]

Execute the list of processors.

Similarly to the Processor, ProcessorList can be executed. In simple words, the execute method of each processor in the list is called exactly in the same sequence as they were added.

extend(_ProcessorList__iterable: Iterable[Processor | ProcessorList]) None[source]

Extends the processor list with a list of processors.

insert(_ProcessorList__index: SupportsIndex, _ProcessorList__object: Processor | ProcessorList) None[source]

Adds a new processor at the specified index.

create_standard_tables

The boolean flag to proceed or skip with standard table creation and initialisation

property database: Database

Returns the database instance

Returns:

A database instance

Raises:

MissingDatabase – if a database connection is missing.

property name: str

The name of the processor list

Returns:

The name of the processor list

Return type:

str

nested_list

Boolean flag to identify that this list is actually inside another list.

Similarly to the local resource flag for the Processor, this flag prevents the user interface from being added to the resource stack.

property processor_exit_status: ProcessorExitStatus

The processor exit status.

It refers to the whole processor list execution.

class mafw.processor.ProcessorMeta[source]

Bases: type

A metaclass to implement the post-init method.
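A post-init hook is usually obtained by overriding __call__ in the metaclass, so that extra code runs after __init__ has completed for the whole class hierarchy. The sketch below shows the general technique; PostInitMeta and the __post_init__ hook name are assumptions, not details taken from ProcessorMeta.

```python
class PostInitMeta(type):
    """Metaclass that calls a __post_init__ hook once construction is complete."""

    def __call__(cls, *args, **kwargs):
        # type.__call__ runs __new__ and the full __init__ chain.
        instance = super().__call__(*args, **kwargs)
        post_init = getattr(instance, '__post_init__', None)
        if callable(post_init):
            post_init()
        return instance


class Base(metaclass=PostInitMeta):
    def __init__(self) -> None:
        self.calls = ['init']

    def __post_init__(self) -> None:
        self.calls.append('post_init')


class Child(Base):
    def __init__(self) -> None:
        super().__init__()
        self.calls.append('child_init')


print(Child().calls)
```

The hook runs after the subclass __init__ has finished, even though the subclass never mentions it.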

mafw.processor.ensure_parameter_registration(func: F) F[source]

Decorator to ensure that before calling func the processor parameters have been registered.
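A decorator of this kind can be sketched as follows. The sketch assumes that a missing registration is repaired by triggering it on the fly; the real decorator may behave differently, for example by raising an error. ensure_registration and SketchProcessor are hypothetical names, while _parameter_registered and _register_parameters mirror attributes documented above.

```python
import functools
from typing import Any, Callable, TypeVar, cast

F = TypeVar('F', bound=Callable[..., Any])


def ensure_registration(func: F) -> F:
    """Run the parameter registration before func, if it has not happened yet."""

    @functools.wraps(func)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        if not self._parameter_registered:
            # Assumption of this sketch: register on demand instead of failing.
            self._register_parameters()
        return func(self, *args, **kwargs)

    return cast(F, wrapper)


class SketchProcessor:
    def __init__(self) -> None:
        self._parameter_registered = False
        self._processor_parameters = {}

    def _register_parameters(self) -> None:
        self._processor_parameters = {'threshold': 10}
        self._parameter_registered = True

    @ensure_registration
    def get_parameter(self, name: str) -> Any:
        return self._processor_parameters[name]


print(SketchProcessor().get_parameter('threshold'))
```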

mafw.processor.validate_database_conf(database_conf: dict[str, Any] | None = None) dict[str, Any] | None[source]

Validates the database configuration.

Parameters:

database_conf (dict, Optional) – The input database configuration. Defaults to None.

Returns:

Either the validated database configuration or None if it is invalid.

Return type:

dict, None