ProcessorList: Combine your processors in one go
So far we have seen how a Processor can be coded to perform a simple task, with a certain degree of generality offered by its configurable parameters. But analytical tasks are normally rather complex, and coding a whole task in a single processor would go against MAFw's mantra of simplicity and code reusability.
To tackle a complex analytical task, MAFw proposes chaining multiple processors together. Each processor can start where the previous one stopped so that, as with a set of building blocks, scientists can assemble their analytical solution from simple pieces.
From a practical point of view, this is achieved via the ProcessorList, an extension of the basic Python list that can contain only instances of Processor subclasses or other ProcessorLists.
Once you have appended the processors in the order you want them to be executed, just call the execute() method of the list and it will take care of running all the processors.
As simple as that:
def run_simple_processor_list():
    """Simplest way to run several processors in a go."""
    from mafw.examples.sum_processor import AccumulatorProcessor, GaussAdder
    from mafw.processor import ProcessorList

    # create the list. name and description are optional
    new_list = ProcessorList(name='AddingProcessor', description='Summing up numbers')

    # append the processors. you can pass parameters to the processors in the standard way
    max_value = 120
    new_list.append(AccumulatorProcessor(last_value=max_value))
    new_list.append(GaussAdder(last_value=max_value))

    # execute the list. This will execute all the processors in the list
    new_list.execute()

    # you can access single processors in the list, in the standard way.
    # remember that the ProcessorList is actually a list!
    assert new_list[0].accumulated_value == new_list[1].sum_value
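To make the idea of a type-restricted, nestable list more concrete, here is a minimal sketch in plain Python. It does not use MAFw at all: ToyProcessor and ToyProcessorList are hypothetical names invented for this illustration, and the real ProcessorList may well be implemented differently. The sketch only shows how a list subclass can refuse foreign items and how executing an outer list can descend into a nested one.

```python
class ToyProcessor:
    """Hypothetical stand-in for a processor; this is not the MAFw class."""

    def __init__(self, name):
        self.name = name

    def execute(self):
        # report which processor ran, so the execution order can be inspected
        return [self.name]


class ToyProcessorList(list):
    """A list accepting only ToyProcessor or ToyProcessorList items (assumed design)."""

    @staticmethod
    def _check(item):
        if not isinstance(item, (ToyProcessor, ToyProcessorList)):
            raise TypeError(f'Cannot add {type(item).__name__} to the list')
        return item

    def append(self, item):
        super().append(self._check(item))

    def extend(self, items):
        # validate everything first, so a bad item leaves the list unchanged
        super().extend([self._check(item) for item in items])

    def execute(self):
        # run the items in order; nested lists run their own items in turn
        executed = []
        for item in self:
            executed.extend(item.execute())
        return executed


inner = ToyProcessorList()
inner.extend([ToyProcessor('b'), ToyProcessor('c')])

outer = ToyProcessorList()
outer.append(ToyProcessor('a'))
outer.append(inner)

order = outer.execute()
```

In this toy model, order lists the processors in the sequence they were appended, with the nested list expanded in place, and appending anything that is not a processor raises a TypeError.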
The ProcessorExitStatus
We have seen in a previous section that the user can modify the looping behavior of a processor by using the LoopingStatus enumerator. In a similar manner, the execution loop of a processor list can be modified by looking at the ProcessorExitStatus of each processor.
When one processor in the list finishes its task, the ProcessorList checks its exit status before moving to the next item. If a processor finishes with an Aborted status, the processor list raises an AbortProcessorException that interrupts the loop.
Let us have a look at the snippet below:
 1  def run_processor_list_with_loop_modifier():
 2      """Example of dealing with processors inside a processor list changing the loop structure.
 3
 4      In this example there are two processors, one that will run until the end and the other that will set the looping
 5      status to abort half way. The user can see what happens when the :class:`~mafw.processor.ProcessorList` is executed.
 6      """
 7      import time
 8
 9      from mafw.enumerators import LoopingStatus, ProcessorExitStatus, ProcessorStatus
10      from mafw.mafw_errors import AbortProcessorException
11      from mafw.processor import ActiveParameter, Processor, ProcessorList
12
13      class GoodProcessor(Processor):
14          n_loop = ActiveParameter('n_loop', default=100, help_doc='The n of the loop')
15          sleep_time = ActiveParameter('sleep_time', default=0.01, help_doc='So much work')
16
17          def get_items(self) -> list[int]:
18              return list(range(self.n_loop))
19
20          def process(self):
21              # pretend to do something, but actually sleep
22              time.sleep(self.sleep_time)
23
24          def finish(self):
25              super().finish()
26              print(f'{self.name} just finished with status: {self.processor_exit_status.name}')
27
28      class BadProcessor(Processor):
29          n_loop = ActiveParameter('n_loop', default=100, help_doc='The n of the loop')
30          sleep_time = ActiveParameter('sleep_time', default=0.01, help_doc='So much work')
31          im_bad = ActiveParameter('im_bad', default=50, help_doc='I will crash it!')
32
33          def get_items(self) -> list[int]:
34              return list(range(self.n_loop))
35
36          def process(self):
37              if self.item == self.im_bad:
38                  self.looping_status = LoopingStatus.Abort
39                  return
40              # let me do my job
41              time.sleep(self.sleep_time)
42
43          def finish(self):
44              super().finish()
45              print(f'{self.name} just finished with status: {self.processor_exit_status.name}')
46
47      proc_list = ProcessorList(name='with exception')
48      proc_list.extend([GoodProcessor(), BadProcessor(), GoodProcessor()])
49      try:
50          proc_list.execute()
51      except AbortProcessorException:
52          print('I know you were a bad guy')
53      assert proc_list.processor_exit_status == ProcessorExitStatus.Aborted
54      assert proc_list[0].processor_exit_status == ProcessorExitStatus.Successful
55      assert proc_list[1].processor_exit_status == ProcessorExitStatus.Aborted
56      assert proc_list[2].processor_status == ProcessorStatus.Init
We created two processors, a good one and a bad one. The good one does nothing useful, but it gets to the end of its job. The bad one also does nothing, but it gives up before reaching the end of the item list. In its process method, the bad processor sets the looping status to Abort, which breaks the for loop immediately and calls finish right away. The finish method, invoked here through super().finish(), checks whether the looping status was Abort and, in that case, sets the exit status of the processor to Aborted.
At line 47, we create a list, populate it with three elements (a good, a bad and another good processor) and execute it inside a try/except block. The first good processor finished properly, as you can see from the printout and from the fact that its exit status (line 54) is Successful. The second processor did not behave: the resulting AbortProcessorException was caught by the except clause, and its Aborted exit status is confirmed at line 55. The third processor was not even started, because the whole processor list was stopped in the middle of processor 2.
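The control flow described above can be sketched without MAFw in a few lines of plain Python. Everything here is hypothetical and invented for illustration (ToyExitStatus, ToyAbortError, ToyProcessor, ToyProcessorList); it is a simplified model of the behavior described in this section, not the library's actual implementation.

```python
from enum import Enum, auto


class ToyExitStatus(Enum):
    """Hypothetical counterpart of ProcessorExitStatus."""

    Successful = auto()
    Aborted = auto()


class ToyAbortError(Exception):
    """Hypothetical counterpart of AbortProcessorException."""


class ToyProcessor:
    def __init__(self, name, abort=False):
        self.name = name
        self.abort = abort
        self.exit_status = None  # None means the processor was never executed

    def execute(self):
        # a real processor would loop over its items; here we only set the outcome
        self.exit_status = ToyExitStatus.Aborted if self.abort else ToyExitStatus.Successful


class ToyProcessorList(list):
    exit_status = None

    def execute(self):
        for processor in self:
            processor.execute()
            # check the exit status before moving on to the next item
            if processor.exit_status is ToyExitStatus.Aborted:
                self.exit_status = ToyExitStatus.Aborted
                raise ToyAbortError(f'{processor.name} aborted')
        self.exit_status = ToyExitStatus.Successful


toy_list = ToyProcessorList(
    [ToyProcessor('good'), ToyProcessor('bad', abort=True), ToyProcessor('never run')]
)
try:
    toy_list.execute()
    caught = False
except ToyAbortError:
    caught = True
```

As in the MAFw example, the first toy processor ends as Successful, the second as Aborted, and the third is never executed because the exception leaves the loop early.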
Resources acquisition and distribution
While it may seem somewhat technical for this tutorial, it is worth highlighting an interesting implementation detail.
If you look at the constructor of the Processor class, you will notice that you can provide some resources, like the Timer and the UserInterface, even though we have never done so until now. The idea is that when you execute a single processor, it is fully responsible for creating the required resources by itself, using them during the execution and closing them when finishing.
As an example, consider the use of a database interface. The processor opens the connection, performs all the needed transactions and finally closes the connection. This approach is also very practical because it is much easier to keep expectations under control.
If you run a ProcessorList, you may want to move the responsibility of handling the resources from the single Processor to the outer ProcessorList.
This approach allows all processors to share the same resources efficiently, eliminating the need to repeatedly open and close the database connection each time the ProcessorList advances to the next item.
You do not have to care about this shift in responsibility: it is done automatically behind the scenes when you add a processor to the processor list.
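The difference between a processor owning its resources and a list owning them can be illustrated with a small plain-Python sketch. The names ToyConnection, ToyProcessor and ToyProcessorList are hypothetical and chosen only for this example. For brevity the toy list hands over the shared connection at execution time, whereas MAFw performs the shift when the processor is added to the list; the internal mechanism of the library is not reproduced here.

```python
class ToyConnection:
    """Hypothetical resource that counts how many times it has been opened."""

    opened = 0

    def __init__(self):
        ToyConnection.opened += 1
        self.is_open = True

    def close(self):
        self.is_open = False


class ToyProcessor:
    def __init__(self, connection=None):
        self.connection = connection

    def execute(self):
        # a standalone processor creates and closes its own connection
        owns_connection = self.connection is None
        if owns_connection:
            self.connection = ToyConnection()
        # placeholder for real work that needs an open connection
        self.used_open_connection = self.connection.is_open
        if owns_connection:
            self.connection.close()
            self.connection = None


class ToyProcessorList(list):
    def execute(self):
        # the list owns one connection and shares it with every processor
        shared = ToyConnection()
        try:
            for processor in self:
                processor.connection = shared
                processor.execute()
        finally:
            shared.close()
            for processor in self:
                processor.connection = None


# two standalone runs: each processor opens its own connection
ToyProcessor().execute()
ToyProcessor().execute()
opened_standalone = ToyConnection.opened

# the same two processors in a list: a single shared connection
ToyConnection.opened = 0
ToyProcessorList([ToyProcessor(), ToyProcessor()]).execute()
opened_in_list = ToyConnection.opened
```

Comparing opened_standalone with opened_in_list shows the saving: the standalone processors open one connection each, while the list opens a single connection for all of them.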
What’s next
In this part we have seen how we can chain the execution of any number of processors all sharing the same resources. Moreover, we have seen how we can change the looping among different processors using the exit status.
Now it is time to move forward and see how you can add your own processor library!