Processor: The core of MAFw
The Processor is responsible to carry out a specific analytical task, in other simple words, it takes some input data, it does some calculations or manipulations, and it produces some output data.
The input and output data can be of any type: a simple list of numbers, a structured data frame, a path to a file where data are stored, an output graph or a link to a web resource, a database table, and so on.
In ultra simplified words, a Processor does three sequential actions:
Prepare the conditions to operate (
start())Process the data (
process())Clean up what is left (
finish()).
Execution workflow
There are instances where tasks can be executed in a single operation across your entire dataset, such as generating a graph from the data. However, there are also situations where you need to iterate over a list of similar items, applying the same process repeatedly. Additionally, you may need to continue a process until a specific condition is met. The Processor can actually accomplish all those different execution workflows just by changing a variable: the loop type.
You will be presented a more detailed description of the different workflows in the following sections.
The single loop execution
The first family has a simplified execution workflow schematically shown here below:
The distinction of roles between the three methods is purely academic, one could implement all the preparation, the real calculation and the clean up in one method and the processor will work in the same way. The methods are anyhow preserved to offer a similar execution scheme also for the other looping scheme.
The for loop execution
In python a for loop is generally performed on a list of items and MAFw is actually following the same strategy. Here below is the schematic workflow.
As you can see, after having called the start(), the user must provide a list of items to be processed implementing the get_items(). This can be the list of files in a directory, or the rows in a DB table or even a list of simple numbers; whatever best suit the user’s needs. You will soon learn how to deal with database entries and how to filter them.
Now everything is ready to start the loop and call the process() as many times as the items in the input list. In the process implementation, the user can realize that something went wrong with a particular item and can modify what is executed next (accept_item() or skip_item()). See an example of such a possibility here.
At the end of the loop it is time to clean up everything, saving files, updating DBs and so one. This is again the task of the finish() method.
The while loop execution
From a programming point of view, the for loop and the while loop execution are rather similar. Between the execution of the start and the finish method, the process is repeated for a certain number of times until a certain condition is met.
In this case, there is no list of items to loop over, but a condition that should be checked. Thus the user has to overload the while_condition() method to return a boolean value: True if the loop has to continue or False if it has to stop and go to the finish.
How to switch from one loop type to another
It is extremely simple to switch from one execution scheme to another. The Processor class takes the argument loop_type, just change this value in the processor init and the rest will come automatically.
Remember that, by default a processor is set to work with a for loop workflow, and thus you have to implement the get_items(). If you switch to while loop, then you need to implement while_condition() for the system to work.
The last thing, you need to know is how to run a processor. First you create an instance of your processor, then you call the execute() method. There are other more practical ways, not involving any coding that we will discuss later on.
A comparison between the for loop and the while loop execution workflow is described in the example page.
Subclassing
The basic processor provided in the library does nothing specifically, it is only used as a skeleton for the execution. In order to perform some real analysis, the user has to subclass it and overload some methods. Having gained a clear understanding of the role of each steps in the execution workflow, read the list of methods that can be overloaded here below and then you are ready to see some simple Processor Examples.
List of methods to be overloaded
The process() method.
This is the central part of the processor, and it must contain all the calculations. If the processor is looping on a list of input items, the user can access the current item via the
itemattribute.
The start() method.
This is useful to prepare the condition to operate. For example, it is a good idea to open files and do all the preparatory work. In a looping processor, this method is called just before the cycle starts.
The finish() method.
This is useful to clean up after the work. For example, to save files, update DB tables and so on. In a looping processor this is performed just after the last iteration. Here is also the place where you can set the value of the
ProcessorExitStatus. This attribute is particularly relevant when several processors are executed in a daisy-chained manner. See more about this here and here.
Specifically for processors with a looping workflow, these methods need to be subclassed.
The get_items() method.
This is where the input collection of items is generated. Very likely this can be a list of files resulting from a glob or the rows of an input table or similar. This is required only for for_loop processors and must return an iterable object.
The while_condition() method.
This is the point at which the decision is made to either continue or terminate the loop. This is required only for while_loop processors and must return a boolean value.
The accept_item() and skip_item() methods (optional).
The execution of the for loop can be slightly modified using the
LoopingStatus(see this example). If the current iteration was successful, then the user can decide to perform some actions, otherwise if the current iteration was failing, then some other actions can be taken.
The format_progress_message() (optional).
During the process execution, your console may look frozen because your CPU is working out your analysis, thus it may be relevant to have every now and then an update on the progress. The
Processorwill automatically display regular messages via the logging system about the progress (more or less every 10% of the total number of items), but the message it is using is rather generic and does not contain any information about the current item.By overloading this method, you can include information about the current item and customize the content of the message. You can use
itemto refer to the current item being processed. Here below is an example:def format_progress_message(self): self.progress_message = f'{self.name} is processing {self.item}'Be aware that if your items are objects without a good __repr__ or string conversion, the output may be a little messy.
Customize and document you processor
You are a scientist, we know, the only text you like to write is the manuscript with your last research results. Nevertheless, documenting your code, in particular your processor is a very helpful approach. We strongly recommend to use docstring to give a description of what a processor does. If you do so, you will get a surprise when you will generate your first steering file.
It is also a very good practice to provide help/doc information to your parameters using the help_doc argument of the ActiveParameter and PassiveParameter. If you do so, your first steering file will be much more readable.
One more point, each Processor has a class attribute named description, this is a short string that is used by some user interfaces (like RichInterface) to make the progress bars more meaningful.
Processor parameters
One super power of MAFw is its capability to re-use code, that means less work, less bugs and more efficiency.
In order to boost code re-usability, one should implement Processor accomplishing one single task and possibly doing it with a lot of general freedom. If you have a processor to calculate the Threshold of B/W images and you have hard coded the threshold algorithm and tomorrow you decide to give a try to another algorithm, then you have to recode a processor that actually already exists.
The solution is to have processor parameters, kind of variable that can be changed in order to make your processor more general and more useful. A note of caution, if you opt for too many parameters, then it may become too difficult to configure your processor. As always, the optimal solution often lies in finding a balance.
You can have parameters implemented in the processor subclass as normal attributes, but then you would need to modify the code in order to change them and this is far from practical. You can have them as specific variables passed to the processor init, but then you would need to code the command line to pass this value or to implement a way to read a configuration file. MAFw has already done all this for you as long as you use the right way to declare processor parameters.
Let us start with a bit of code.
class MyProcessor(Processor):
"""
This is my wonderful processor.
It needs to know the folder where the input files are stored.
Let us put this in a processor parameter.
:param input_folder: The input folder.
"""
input_folder = ActiveParameter(name='input_folder', default=Path.cwd(),
help_doc='This is the input folder for the file')
def __init__(self, *args, **kwargs):
super().__init(*args, **kwargs)
# change the input folder to something else
self.input_folder = Path(r'D:\data')
# get the value of the parameter
print(self.input_folder)
Some important notes: input_folder is defined as a class attribute rather then as an instance attribute, it is to say it is outside the init method. This must always be the case for processor parameters. The fact that the variable and the parameter names are the same is not compulsory, but why do you want to make your life more complicated!
We have declared input_folder as an ActiveParameter, but if you put a break point in the code
execution and you inspect the type of self.input_folder you will see that it is a Path object. This is because
ActiveParameter is a descriptor,
a very pythonic way to have a double interface for one single variable.
Public ActiveParameter and private PassiveParameter
When you define input_folder, you use up to four parameters in the ActiveParameter init,
but if you try to access those attributes from input_folder you will get an error message, because class Path does not
have the attributes you are looking for. Where are then those values gone?
When you create an ActiveParameter, there is a lot of work behind the scene: first of all your
class is dynamically modified and an additional attribute named param_input_file is created and it is an instance of
PassiveParameter. This private attribute [1] is actually playing the role of the container,
storing the value of the parameter, plus a bunch of other interesting stuff (default, help…).
The set and get dunder methods of the input_file are overloaded to operate directly on the private interface, more or less in the same way like when you define property setter and getter, but here it is done all automatically.
If you want to access the private interface, you can still do it. And if you have forgotten the name that is automatically assigned (param_param_name), you can always use the processor get_parameter() using the parameter name as a key. Theoretically you can even set the value of the parameter using the private interface (set_parameter_value()), but this is equivalent to set public interface directly.
Let us summarize all this with an example:
from mafw.processor import Processor, ActiveParameter
class MyFavProcessor(Processor):
useful_param = ActiveParameter('useful_param', default=0, help_doc='Important parameter')
# create your processor, initialize the parameter with a keyword argument.
my_fav = MyFavProcessor(useful_param=10)
# print the value of useful_param in all possible ways
print(my_fav.useful_param)
print(my_fav.param_useful_param.value)
print(my_fav.get_parameter('useful_param').value)
# change the value of useful_param in all possible ways
my_fav.useful_param += 1
my_fav.param_useful_param.value += 1
my_fav.get_parameter('useful_param').value += 1
print(my_fav.useful_param)
# access other fields of the parameter
print(my_fav.get_parameter('useful_param').doc)
This is the output that will be generated:
10
10
10
13
Important parameter
Parameter configuration
We have seen how to add flexibility to a processor including parameters, but how do you configure the parameters?
You have probably noticed that for both ActiveParameter and PassiveParameter you have the possibility to pass a default value, that is a very good practice, especially for very advanced parameters that will remain untouched most of the time.
If you want to set a value for a parameter, the easiest way is via the processor __init__ method. The basic Processor accepts any number of keyword arguments that can be used exactly for this purpose. Just add a keyword argument named after the parameter and the processor will take care of the rest.
Have a look at the example below covering both the case of Active and Passive parameters:
1class MyProcessor(Processor):
2 active_param = ActiveParameter('active', default=0, help_doc='An active parameter')
3
4 def __init__(self, *args, **kwargs):
5 super().__init__(*args, **kwargs)
6 self.passive_param = PassiveParameter('passive', default='I am a string', help_doc='A string')
7
8my_p = MyProcessor(active=100, passive='a better string', looper='single')
9
10print(my_p.active_param) # we get 100
11assert my_p.active_param == 100
12
13print(my_p.passive_param.value) # we get 'a better string'
14assert my_p.passive_param.value == 'a better string'
Note that the best way is to avoid to explicitly include the parameter names in the init signature. They will be collected anyhow through keyword arguments and registered automatically.
The second approach is to use a configuration object, it is to say a dictionary containing all the parameters key and value pairs. This is particularly handy when using a configuration file. Exactly for this reason, the configuration object can have one of the two following structures. In both cases the configuration object has to be passed to the class using the keyword config
1@single_loop
2class ConfigProcessor(Processor):
3 def __init__(self, *args, **kwargs):
4 super().__init__(*args, **kwargs)
5 self.p1 = PassiveParameter('param1', default='value')
6 self.p2 = PassiveParameter('param2', default='another_value')
7
8cp = ConfigProcessor(config=dict(param1='new_value', param2='better_value', param3='do not exists'))
9assert cp.get_parameter('param1').value == 'new_value'
10assert cp.get_parameter('param2').value == 'better_value'
11dumped_config = cp.dump_parameter_configuration(option=2)
12assert dumped_config == dict(param1='new_value', param2='better_value')
13dumped_config = cp.dump_parameter_configuration(option=20)
14assert dumped_config == dict(param1='new_value', param2='better_value')
15
16config = {'ConfigProcessor': {'param1': '1st', 'param2': '2nd'}}
17cp = ConfigProcessor(config=config)
18assert cp.get_parameter('param1').value == '1st'
19assert cp.get_parameter('param2').value == '2nd'
20dumped_config = cp.dump_parameter_configuration()
21assert config == dumped_config
The limitation of ActiveParameter
With great power comes great responsibility. The use of ActiveParameters has a lot of advantages as we have seen, we can make our processor easily reusable and customizable. The configuration process is straightforward, whether using keyword arguments or a configuration file. But there is at least one disadvantage the user should be aware of and it is connected with the fact that all ActiveParameter must be declared as class attributes (read about the difference between class and instance attributes here). The consequence of this is that two instances of the same class will share the same value for all parameters.
Have a look at the code below:
1from mafw.processor import ActiveParameter, Processor
2
3@single_loop
4class MyProcess(Processor):
5 my_param = ActiveParameter('my_param', default=10)
6
7my_proc = MyProcess(my_param=12)
8print(my_proc.my_param) # we expect 12
9assert my_proc.my_param == 12
10
11second_proc = MyProcess(my_param=15)
12print(second_proc.my_param) # we expect 15
13assert second_proc.my_param == 15
14
15print(my_proc.my_param) # we would expect 12, but we get 15
16assert my_proc.my_param == 15
Having more than one instance of the same processor type in the same execution run is not really a common situation, but nevertheless there is a workaround. The user can move the definition of the parameter from the class to the instance (moving it inside the processor __init__ method), and using a PassiveParameter instead of an active one.
In this way, the parameter will remain bound to the instance (and not to the class), but the user will have to access attributes of the passive parameter using the dot notation. All other super-powers, like parameter registration and configuration, remain unchanged. See the following snippet for a demonstration.
1from mafw.processor import PassiveParameter, Processor
2
3class MyProcess(Processor):
4 def __init__(self, *args, **kwargs):
5 super().__init__(*args, looper='single', **kwargs)
6 self.my_param = PassiveParameter('my_param', default=10)
7
8my_proc = MyProcess(my_param=12)
9print(my_proc.my_param.value) # we expect 12
10assert my_proc.my_param.value == 12
11
12second_proc = MyProcess(my_param=15)
13assert second_proc.my_param.value == 15
14print(second_proc.my_param.value) # we expect 15
15
16print(my_proc.my_param.value) # we expect 12 and we get 12!
17assert my_proc.my_param.value == 12
The emphasized lines are showing the difference between the two approaches. There is no init method in the first snippet because the ActiveParameter is defined at the class level. In the second snippet, we moved the definition of my_param in the init method to make it an instance attribute, but we had to change from Active to PassiveParameter. To access the value of the passive parameter we need to use the value() method.
Saving parameter configuration
We have seen that we can configure processor using a dictionary with parameter / value pairs. This is very handy, because we can load toml file with the configuration to be used for all the processors we want to execute.
We don’t want you to write toml file hand, for this we have a function
dump_processor_parameters_to_toml() that will generate an output file with all the parameter
values.
But which value is stored? The default or the actual one? Good question!
Let us start from the basics: we have just seen that there are two types of parameters: the
ActiveParameter and the PassiveParameter with the former being class
attributes and the latter instance attributes.
This last detail (class or instance attributes) makes a lot of difference. If you change a class attribute, this impacts all instances of that class and of all its subclasses as well. If you change an instance attribute, this will only affect that specific instance.
Let us have a look at the following example.
from mafw.processor import ActiveParameter, PassiveParameter
class ActiveParameterProcessor(Processor):
"""A processor with one active parameter."""
active_param = ActiveParameter('active_param', default=-1,
help_doc='An active parameter with default value -1')
class AnotherActiveParameterProcessor(Processor):
"""Another processor with one active parameter."""
active_param = ActiveParameter('active_param', default=-1,
help_doc='An active parameter with default value -1')
class PassiveParameterProcessor(Processor):
"""A processor with one passive parameter."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.passive_param = PassiveParameter('passive_param', default=-1,
help_doc='A passive parameter with default value -1')
class AnotherPassiveParameterProcessor(Processor):
"""Another processor with one passive parameter."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.passive_param = PassiveParameter('passive_param', default=-1,
help_doc='A passive parameter with default value -1')
We have defined four different processors, two with active parameters and two with passive parameters.
Now let us have a look at what happens when we attempt to dump the configuration of these processors.
processor_list = [ActiveParameterProcessor, AnotherPassiveParameterProcessor,
PassiveParameterProcessor, AnotherActiveParameterProcessor]
dump_processor_parameters_to_toml(processor_list, 'test1.toml')
We have not created any instances of none of the four processors, and the processor_list consists of four classes
(no instance). When passed to the dump_processor_parameters_to_toml() function, an instance of each class
type is created and the parameter dictionary is retrieved. The creation of an instance is required because only
after __init__ all parameters are registered.
Inside the function, the instances are created without passing any parameters, so it means that for all parameters the dumped value will be the initial value of the parameter, if specified, or the default value if a value is not given in the parameter definition.
The output of the first example will be with no surprise, all four processors will have their parameters listed and connected to their default values.
[ActiveParameterProcessor] # A processor with one active parameter.
active_param = -1 # An active parameter with default value -1
[AnotherPassiveParameterProcessor] # Another processor with one passive parameter.
passive_param = -1 # A passive parameter with default value -1
[PassiveParameterProcessor] # A processor with one passive parameter.
passive_param = -1 # A passive parameter with default value -1
[AnotherActiveParameterProcessor] # Another processor with one active parameter.
active_param = -1 # An active parameter with default value -1
Now, let us take a step forward and add a bit more complexity to the situation. See the next snippet:
# create an instance of ActiveParameterProcessor with a specific value of the parameter
# but we will include the class in the processor list.
active_processor_instance = ActiveParameterProcessor(active_param=100)
# create an instance of AnotherActiveParameterProcessor with a specific value of the parameter
# and we will submit the instance
another_active_processor_instance = AnotherActiveParameterProcessor(active_param=101)
# create an instance of PassiveParameterProcessor with a specific value of the parameter
# but we will submit the class via the use of type.
passive_processor_instance = PassiveParameterProcessor(passive_param=102)
# create an instance of AnotherPassiveParameterProcessor with a specific value of the parameter
# and we will submit the instance.
another_passive_processor_instance = AnotherPassiveParameterProcessor(passive_param=103)
processor_list = [
ActiveParameterProcessor, # a class
another_active_processor_instance, # an instance
type(passive_processor_instance), # a class
another_passive_processor_instance # an instance
]
dump_processor_parameters_to_toml(processor_list, 'test2.toml')
This time we have a mixed list, the first item is a class, so the function will have to create an instance of this. The same is happening with the third element (the type function is actually turning back the instance into a class). For these two elements, the generated instances will use no constructor arguments. You might assume that the the default values are automatically written to the output file, but surprisingly, that is not the case. When you have created the active_processor_instance you set the value of active_param to 100 and since it is a class attribute, this will be applied to all instances of this class. In the case of the third one, since the parameter is passive, the default value will be actually dumped.
The second element is an instance, so it will not be regenerated inside the function, but since active_param is indeed a class attribute, the modified value will be stored in the file. The last one is an instance thus a new instance will not be generated internally and the actual value of the parameter will be stored.
Here is the produced configuration file:
[ActiveParameterProcessor] # A processor with one active parameter.
active_param = 100 # An active parameter with default value -1
[AnotherActiveParameterProcessor] # Another processor with one active parameter.
active_param = 101 # An active parameter with default value -1
[PassiveParameterProcessor] # A processor with one passive parameter.
passive_param = -1 # A passive parameter with default value -1
[AnotherPassiveParameterProcessor] # Another processor with one passive parameter.
passive_param = 103 # A passive parameter with default value -1
Ultimately, the values that will be output depends on the type of parameter used (Active / Passive) and in case of Passive parameters also on the fact that you pass an instance or a class.
In the end, this is not really important because the user should dump TOML configuration files as a sort of template to be checked, modified and adapted by the user for a specific run.
Parameter typing
When creating a Passive or Active parameter, you have the option to directly specify the parameter’s type using the typing template, but you can also do it, and probably in a simpler way assigning a reasonable default value. While this is not really important for numbers, it is extremely important if you want to interpret string as Path object.
If you declare the default value as a Path, for example Path.cwd(), then the string read from the configuration file will be automatically converted in a Path.
Note
If you intend to have a float parameter, use a decimal number, for example 0., as default, otherwise the interpreter will assume it is an integer and convert to int the parameter being read from the configuration file.
One more note about parameters. Theoretically speaking you could also have custom objects / classes as parameters, but this will become a problem when you will be loading the parameters from a TOML file. Actually two problems:
The TOML writer is not necessarily able to convert your custom type to a valid TOML type (number, string…). If your custom type has a relatively easy string representation then you can add an encoder to the TOML writer and teach it how to write your object. See for example the encoder for the Path object.
1def path_encoder(obj: Any) -> Item: 2 """Encoder for PathItem.""" 3 if isinstance(obj, PosixPath): 4 return PathItem.from_raw(str(obj), type_=StringType.SLB, escape=False) 5 elif isinstance(obj, WindowsPath): 6 return PathItem.from_raw(str(obj), type_=StringType.SLL, escape=False) 7 else: 8 raise ConvertErrorEven though you managed to write your class to the TOML steering file, you have now the problem of reading back the steering file information and build your custom type with that.
One way to overcome this limitation might be to write to the steering file the __repr__ of your custom class and at read back time to use eval to transform it back to your class. This below would be a more concrete implementation:
1import tomlkit
2from tomlkit.items import String, Item, ConvertError
3from tomlkit.toml_file import TOMLFile
4
5class MyClass:
6 """A custom class to be used as a processor parameter"""
7 def __init__(self, a, b):
8 self.a = a
9 self.b = b
10
11 def __repr__(self):
12 """IT MUST BE IMPLEMENTED"""
13 return f"{self.__class__.__name__}({self.a}, {self.b})"
14
15class MyClassItem(String):
16 """TOML item representing a MyClass"""
17 def unwrap(self) -> MyClass:
18 return MyClass(*super().unwrap())
19
20def my_class_encoder(obj: MyClassItem) -> Item:
21 """Encoder for MyClassItem."""
22 if isinstance(obj, MyClass):
23 # we write the class as a string using the class repr.
24 return MyClassItem.from_raw(repr(obj))
25 else:
26 raise ConvertError
27
28# register the encoder
29tomlkit.register_encoder(my_class_encoder)
30
31# ------------------
32# write to TOML file
33# ------------------
34
35my_class = MyClass(10,24)
36doc = tomlkit.document()
37doc.add('my_class', my_class)
38doc.add('simple_i', 15)
39
40with open('test.toml', 'w') as fd:
41 tomlkit.dump(doc, fd)
42
43# ------------------------
44# read back from TOML file
45# ------------------------
46
47doc = TOMLFile('test.toml').read()
48read_back_class = eval(doc['my_class'])
49try:
50 simple_i = eval(doc['simple_i'])
51except TypeError:
52 simple_i = doc['simple_i']
53
54assert isinstance(read_back_class, MyClass)
55assert read_back_class.a == my_class.a
56
57assert isinstance(simple_i, int)
58assert simple_i == 15
my_class = "MyClass(10, 24)"
simple_i = 15
This approach, even though possible, is rather risky, we all know how dangerous eval can be especially when using it directly with information coming from external files.
Furthermore, it is worth considering whether having a custom class as a parameter is a processor is truly necessary. Often, there are simpler and safer alternatives available.
What’s next
You reached the end of the first part. It means that by now you have understood what a processor is and how you can subclass the basic class to implement your analytical tasks, both with looping or single shot workflow. You learned a lot about parameters and how you can configure it.
The next section will be about chaining more processors one after the other using the ProcessorList.
Footnotes