Database: your new buddy!
As a scientist you are used to work with many software tools and often you need to write your own programs because the ones available do not really match your needs. Databases are not so common among scientists, we do not really understand why, but nevertheless their strength is beyond question.
The demonstration of the power and usefulness of a database assisted analysis framework will become clear and evident during our tutorial where we will build a small analytical experiment from scratch one step at the time.
For the time being, let us concentrate a bit on the technical aspect without delving too deeply into the specifics.
Database: one name, many implementations
Database is really a generic term and, from a technical point of view, one should try to be more specific and define better what kind of database we are talking about. We are not just referring to the brand or the producing software house: there are indeed different database architectures, but the one best suited for our application is the relational database, where each entity can be related directly to one or more other entities. You can read about relational databases on wikipedia for example, and if you find it too complicated, have a look at this shorter and easier version.
The software market is full of relational databases, from the very common MySQL to the very famous ORACLE, passing through the open source PostgreSQL to finish with SQLite the simplest and most used database in the world. As you may have guessed from their names, they are all sharing the same query language (SQL: structured query language), making it rather simple to have an abstract interface, that is to say a layer in between the user and the actual database that allows your code to work in the same way irrespectively of the specific implementation.
Peewee: a simple and small ORM
Of course there are also lots of different abstract interfaces, some more performing than the others. We have selected:link:peewee, because it is lightweight, easy to understand and to use, and it works with several different implementations.
peewee is a ORM (promised this is the last acronym for this page!), it is to say an object relational mappers, or in simpler words a smart way to connect the tables in your database with python classes in your code. Have a look at this interesting article for a more detailed explanation.
peewee offers at least three different backends: SQLite, MySQL and PostgreSQL. If the size of your project is small to medium and the analysis is mainly performed on a single computer, then we recommend SQLite: the entire database will be living in a single file on your disc, eliminating the need for IT experts to set up a database server. If you are aiming for a much bigger project with distributed computing power, then the other two choices are probably equally good and your local IT helpdesk may suggest you what is best for your configuration and the available IT infrastructure. As you see, MAFw is always offering you a tailored solution!
Now, take a short break from this page, move to the peewee documentation and read the Quickstart section before coming back here.
Database drivers
If you have installed MAFw via pip without specifying the all-db optional features, then your python environment is very likely missing the python drivers to connect to MySQL and PostgreSQL. This is not a bug, but more a feature, because MAFw gives you the freedom to select the database implementation that fits your needs. Sqlite is natively supported by python, so you do not need to install anything extra, but if you want to use MySQL, PostgreSQL or any other DB supported by peewee than it is your responsibility to install in your environment the proper driver. Here you can find a list of DB driver compatible with peewee.
If you want, you can install MAFw adding the all-db optional feature and in this way the standard MySQL and PostgreSQL drivers will also be installed.
One class for each table
As mentioned before, the goal of an ORM is to establish a link between a database table and a python class. You can use the class to retrieve existing rows or to add new ones, and as always, you do not need take care of the boring parts, like establishing the connection, creating the table and so on, because this is the task of MAFw and we do it gladly for you!
Let us have a look together to the following example. We want a processor that lists recursively all files starting from a given directory and adds the filenames and the file hash digests to a table in the database.
Let us start with some imports
1import datetime
2from pathlib import Path
3
4from peewee import DateTimeField, IntegerField, TextField
5
6import mafw.processor
7from mafw.db.db_configurations import db_scheme, default_conf
8from mafw.db.db_model import MAFwBaseModel
9from mafw.decorators import database_required
10from mafw.tools.file_tools import file_checksum
The crucial one is at line 8, where we import MAFwBaseModel that is the base model for all the tables we want to handle with MAFw. Your tables must inherit from that one, if you want the Processor to take care of handling the link between the class and the table in the database.
At line 4, we import some classes from peewee, that will define the columns in our model class and consequently in our DB table.
Now let us create the model class for our table of files.
1class File(MAFwBaseModel):
2 """The Model class representing the table in the database"""
3
4 filename = TextField(primary_key=True)
5 digest = TextField()
6 creation_date = DateTimeField()
7 file_size = IntegerField()
Note
A much better implementation of a similar class will be given later on demonstrating the power of custom defined fields.
As you can see the class definition is extremely simple. We define a class attribute for each of the columns we want to have in the table and we can choose the field type from a long list of available ones or we can even easily implement our owns. The role of a field is to adapt the type of the column from the native python type to the native database type and vice versa.
Our table will have just four columns, but you can have as many as you like. We will have one text field with the full filename, another text containing the hexadecimal hashlib digest of the file, the creation date for which we will use a datetime field, and finally a file_size field of integer type. We will be using the filename column as a primary key, because there cannot be two files with the same filename. On the contrary, there might be two identical files having the same hash but different filenames. According to many good database experts, using a not numeric primary key is not a good practice, but for our small example it is very practical.
If you do not specify any primary key, the ORM will add an additional number auto-incrementing column for this purpose. If you want to specify multiple primary keys, this is what you should do. If you want to create a model without a primary key, here is what you need to do.
The ORM will define the actual name of the table in the database and the column names. You do not need to worry about this!
What is the name of my table?
We have created a Model class so far, that is the link between a python object and a database table. But what is the name of the table in the database? And the name of the columns?
All those information belong to the so called model metadata and are stored in a inner class of the MAFwBaseModel. Peewee uses a standard way to name tables and columns and as long as your are happy with this you do not have to worry about.
In general, MAFw is following this convention: model classes are written with CamelCase, while database tables are in snake_case. In other words, if you have a model class named RawData this will be linked to a database table named raw_data.
If you want to customize this naming convention, you can assign a specific name to a field as explained here and for the table name take a look at this page
And now comes the processor that will be doing the real work, it is to say, filling the File table.
1@database_required
2class FillFileTableProcessor(mafw.processor.Processor):
3 """Processor to fill a table with the content of a directory"""
4
5 root_folder: Path = mafw.processor.ActiveParameter(
6 'root_folder', default=Path.cwd(), help_doc='The root folder for the file listing'
7 )
8
9 def __init__(self, *args, **kwargs):
10 """
11 Constructor parameter:
12
13 :param root_folder: ActiveParameter corresponding to the directory from
14 where to start the recursive search
15 :type root_folder: Path, Optional
16 """
17 super().__init__(*args, **kwargs)
18 self.data: list[dict] = []
19
20 def format_progress_message(self):
21 self.progress_message = f'Upserting {self.item.name}'
22
23 def start(self):
24 """Starts the execution.
25
26 Be sure that the table corresponding to the File model exists.
27 It it does already exists, it is not a problem.
28 """
29 super().start()
30 self.database.create_tables((File,))
31
32 def get_items(self) -> list[Path]:
33 """Retrieves the list of files.
34
35 Insert or update the files from the root folder to the database
36
37 :return: The list of full filename
38 :rtype: list[Path]
39 """
40 file_list = []
41 if self.root_folder.is_file():
42 return [self.root_folder]
43 elif self.root_folder.is_dir():
44 for f in self.root_folder.glob('**/*'):
45 if f.is_file():
46 file_list.append(f)
47 else: # root_file is a glob
48 for f in self.root_folder.parent.glob(self.root_folder.name):
49 file_list.append(f)
50 return file_list
51
52 def process(self):
53 """Add all information to the data list"""
54 self.data.append(
55 dict(
56 filename=str(self.item),
57 digest=file_checksum(self.item),
58 file_size=self.item.stat().st_size,
59 creation_date=datetime.datetime.fromtimestamp(self.item.stat().st_mtime),
60 )
61 )
62
63 def finish(self):
64 """Transfers all the data to the File table via an atomic transaction."""
65 with self.database.atomic():
66 File.insert_many(self.data).on_conflict_replace().execute()
67 super().finish()
The first thing to notice is at line 1, where we used the decorator database_required(). The use of this decorator is actually not compulsory, its goal is to raise an exception if the user tries to execute the processor without having a properly initialized database.
At line 30, in the start method we ask the database to create the table corresponding to our File model. If the table already exists, then nothing will happen.
In the process method we will store all the information we have collected from the files into a list and we interact with the database only in the finish method. At line 65, we use a context manager to create an atomic transaction and then, at line 66, we insert in the File all our entries and in case a row with the same primary key exists, then it is replaced.
We could have used several different insert approaches, here below are few examples:
# create an instance of File with all fields initialized
new_file = File(filename=str(self.item),
digest=file_checksum(self.item),
file_size=self.item.stat().st_size,
creation_date=datetime.datetime.fromtimestamp(self.item.stat().st_mtime))
new_file.save() # new_file is now stored in the database
# create an instance of File and add the fields later
new_file = File()
new_file.filename = str(self.item)
new_file.digest = file_checksum(self.item)
new_file.file_size = self.item.stat().st_size
new_file.creation_data = datetime.datetime.fromtimestamp(self.item.stat().st_mtime)
new_file.save()
# create and insert directly
new_file = File.create(filename=str(self.item),
digest=file_checksum(self.item),
file_size=self.item.stat().st_size,
creation_date=datetime.datetime.fromtimestamp(self.item.stat().st_mtime))
To choice of approach to follow depends on various factor. Keep in mind that peewee operates by default in auto commit mode, meaning that for each database interaction, it creates a transaction to do the operation and it closes afterwards.
To be more performant from the database point of view, especially when you have several operations that can be grouped together, you can create an atomic transaction where the ORM will open one transaction only to perform all the required operations.
What we have done in the finish method is actually known as an upsert. It means that we will be inserting new items or updating them if they exist already.
Ready, go!
We have prepared the code, now we can try to run it. We can do it directly from a script
if __name__ == '__main__':
database_conf = default_conf['sqlite']
database_conf['URL'] = db_scheme['sqlite'] + str( Path.cwd() / Path('test.db'))
db_proc = FillFileTableProcessor(root_folder =r'C:\Users\bulghao\Documents\autorad-analysis\EdgeTrimming',
database_conf=database_conf)
db_proc.execute()
or in a more elegant way we can use the mafw app to run, but first we need to generate the proper steering file.
c:\> mafw steering db-processor.toml
A generic steering file has been saved in db-processor.toml.
Open it in your favourite text editor, change the processors_to_run list and save it.
To execute it launch: mafw run db-processor.toml.
1# MAFw steering file generated on 2024-11-24 22:13:38.248423
2
3# uncomment the line below and insert the processors you want to run from the available processor list
4processors_to_run = ["FillFileTableProcessor"]
5
6# customise the name of the analysis
7analysis_name = "mafw analysis"
8analysis_description = "Using the DB"
9available_processors = ["AccumulatorProcessor", "GaussAdder", "ModifyLoopProcessor", "FillFileTableProcessor", "PrimeFinder"]
10
11[DBConfiguration]
12URL = "sqlite:///file-db.db" # Change the protocol depending on the DB type. Update this file to the path of your DB.
13
14[DBConfiguration.pragmas] # Leave these default values, unless you know what you are doing!
15journal_mode = "wal"
16cache_size = -64000
17foreign_keys = 1
18synchronous = 0
19
20[FillFileTableProcessor] # Processor to fill a table with the content of a directory
21root_folder = 'C:\Users\bulghao\PycharmProjects\mafw' # The root folder for the file listing
22
23[UserInterface] # Specify UI options
24interface = "rich" # Default "rich", backup "console"
If you look at the steering file, you will notice that there is a DBConfiguration section, where we define the most important variable, it is to say the DB URL. This is not only specifying where the database can be found, but also the actual implementation of the database. In this case, it will be a sqlite database located in the file file-db.db inside the current directory.
There is also an additional sub table, named pragmas, containing advanced options for the sqlite DB. Unless you really know what you are doing, otherwise, you should leave them as they are.
In the following Configuring other types of databases, we will cover the case you want to use another DB implementation different from SQLite.
In the FillFileTableProcessor you can find the standard configuration of its processor parameters.
Now we are really ready to run our first DB processor and with a bit of luck, you should get your DB created and filled.
How to check the content of a DB?
There are several tools serving this purpose. One of those is dbeaver that works with all kind of databases offering an open source community version that you can download and install.
Configuring other types of databases
In the previous example, we have seen how to configure a simple SQLite database. For this database, you just need to indicate in the URL field the path on the local disc where the database file is stored.
SQLite does not require any user name nor password and there are no other fields to be provided. Nevertheless, it is worth adding the previously mentioned pragmas section to assure the best functionality of peewee.
In the case of MySQL and PostgreSQL, the URL should point to the server where the database is running. This could be the localhost but also any other network destination. Along with the server destination, you need also to specify the port, the database name, the user name and the password to establish the connection.
Of course, it is not a good idea to write your database password as plain text in a steering file that might be shared among colleagues or even worse included in a Git repository. To avoid this security issue, it is recommended to follow some other authentication approach.
Both MySQL and PostgreSQL offers the possibility to store the password in a separate file, that, at least in linux, should have a very limited access right. Have a look at the exemplary steering files with the corresponding password files here below.
[DBConfiguration]
URL = "sqlite:///file-db.db" # change the filename to the absolute path of the db
[DBConfiguration.pragmas] # Leave these default values, unless you know what you are doing!
journal_mode = "wal"
cache_size = -64000
foreign_keys = 1
synchronous = 0
[DBConfiguration]
# Update the database server and the database name to reflect your configuration
URL = "postgresql://database-server:5432/database-name"
# change it to your username
user = 'username'
# if you want, you can leave the pragmas section from the SQLite default configuration because it
# want be used.
Instruction on how to create a PostgreSQL password file are provided here. This is an example:
database-server:5432:database-name:username:password
[DBConfiguration]
# Update the database server and the database name to reflect your configuration
URL = "mysql://database-server:3306/database-name"
# update to specify your username
user = 'username'
# update to specify the password file
read_default_file = '~/.my.cnf'
# if you want, you can leave the pragmas section from the SQLite default configuration because it
# want be used.
Instruction on how to create a MySQL password file are provided here. This is an example:
[client]
user=username
password=password
host=database-server
Triggers: when the database works on its own
In the next paragraphs we will spend a few minutes understanding the roles of Triggers. Those are database entities performing some actions in response of specific events. You can have, for example, a trigger that is inserting a row in TableB whenever a row is inserted in TableA. If you are not really familiar with triggers, this is a brief introduction.
Triggers are very handy for many applications, and in our tutorial we will see an interesting case, but they tend to struggle with ORM in general. In fact, no ORM system is natively supporting triggers. The reason is very simple. In an ORM, the application (the python code, if you wish) is the main actor and the database is just playing the role of the passive buddy. From the point of view of an ORM based application, if you want to have a trigger, then just write the needed lines of python code to have the actions performed in the other tables. It makes totally sense, you have only one active player and it simplifies the debugging because if something is going wrong, it can only be a fault of the application.
The standard implementation of trigger-like functions with ORM is to use signals, where you can have callbacks called before and after high level ORM APIs calls to the underlying database. Signals are good, but they are not free from disadvantages: at a first glance, they look like a neat solution, but as soon as the number of callbacks is growing, it may become difficult to follow a linear path in the application debugging. Second, if you do a change in the database from another application, like the dbeaver browser, then none of your codified triggers will be executed. Moreover in the case of Peewee, signals work only on Model instances, so all bulk inserts and updates are excluded.
Having triggers in the database would assure that irrespectively of the source of the change, they will always be executed, but as mentioned above, the user will have to be more careful in the debug because also the database is now playing an active role.
We let you decide what is the best solution. If you want to follow the pure ORM approach, then all models inheriting from MAFwBaseModel have the possibility to use signals. If you want to have triggers, you can also do so. An example for both approaches is shown here below.
The signal approach
As mentioned above, the signal approach is the favourite one if you plan to make all changes to the database only via your python code. If you are considering making changes also from other applications, then you should better use the trigger approach.
Another limitation is that only model instances emit signals. Everytime you use a classmethod of a Model, then no signals will be emitted.
The signal dispatching pattern functionality is achieved by linking the signal emitted by a sender in some specific circumstances to a handler that is receiving this signal and performing some additional operations (not necessarily database operations).
Every model class has five different signals:
pre_save: emitted just before that a model instance is saved;
post_save: emitted just after the saving of a model instance in the DB;
pre_delete: emitted just before deleting a model instance in the DB;
post_delete: emitted just after deleting a model instance from the DB;
pre_init: emitted just after the init method of the class is invoked. Note that the pre is actually a post in the case of init.
Let us try to understand how this works with the next example.
1class MyTable(MAFwBaseModel):
2 id_ = AutoField(primary_key=True)
3 integer = IntegerField()
4 float_num = FloatField()
5
6class TargetTable(MAFwBaseModel):
7 id_ = ForeignKeyField(MyTable, on_delete='cascade', primary_key=True, backref='half')
8 another_float_num = FloatField()
9
10 @post_save(sender=MyTable, name='my_table_after_save_handler')
11 def post_save_of_my_table(sender: type(MAFwBaseModel), instance: MAFwBaseModel, created: bool):
12 """
13 Handler for the post save signal.
14
15 The post_save decorator is taking care of making the connection.
16 The sender specified in the decorator argument is assuring that only signals generated from MyClass will be
17 dispatched to this handler.
18
19 The name in the decorator is optional and can be use if we want to disconnect the signal from the handler.
20
21 :param sender: The Model class sending this signal.
22 :type sender: type(Model)
23 :param instance: The actual instance sending the signal.
24 :type instance: Model
25 :param created: Bool flag if the instance has been created.
26 :type created: bool
27 """
28 TargetTable.insert({'id__id': instance.id, 'another_float_num': instance.float_num / 2}).execute()
29
30database: Database = SqliteDatabase(':memory:', pragmas=default_conf['sqlite']['pragmas'])
31database.connect()
32database_proxy.initialize(database)
33database.create_tables([MyTable, TargetTable], safe=True)
34
35MyTable.delete().execute()
36TargetTable.delete().execute()
37
38# insert a single row in MyTable with the save method.
39my_table = MyTable()
40my_table.integer = 20
41my_table.float_num = 32.16
42my_table.save()
43# after the save query is done, the signal mechanism will call the
44# post_save_trigger_of_my_table and perform an insert on the target
45# table as well.
46assert MyTable.select().count() == 1
47assert TargetTable.select().count() == 1
48
49# add some bulk data to MyTable
50data = []
51for i in range(100):
52 data.append(dict(integer=random.randint(i, 10 * i), float_num=random.gauss(i, 2 * i)))
53MyTable.insert_many(data).execute()
54# this is done via the Model class and not via a concrete instance of the Model, so no signals will be emitted.
55
56assert MyTable.select().count() == 101
57assert TargetTable.select().count() == 1
We created two tables linked via a foreign key. The goal is that everytime we fill in a row in MyTable, a row is
also added to TargetTable with the same id but where the value of another_float_num is just half of the original
float_num. The example is stupid, but it is good enough for our demonstration.
The signal part is coded in the lines from 11 to 28 (mainly doc strings). We use the post_save decorator to connect
MyTable to the post_save_of_my_table function where an insert in the TargetTable will be made.
The code is rather simple to follow. Just to be sure, we empty the two tables, then we create an instance of the MyTable model, to set the integer and the float_num column. When we save the new row, the post_save signal of MyTable is emitted and the handler is reacting by creating an entry in the TargetTable as well. In fact the number of rows of both tables are equal to 1.
What happens later is to demonstrate the weak point of signals. At line 53, we insert several rows via a
insert_many. It must be noted that the insert_many is a classmethod applied directly to the model class.
The consequence is that the signal handler will not be invoked and no extra rows will be added to the TargetTable.
The trigger approach
In order to use a trigger you need to create it. This is an entity that lives in the database, so you would need the database itself to create it.
MAFw is providing a Trigger class that helps you in creating the required SQL query that needed to be issued in order to create the trigger. Once it is created it will operate continuously.
If you have a look at the CREATE TRIGGER SQL command you will see that it starts with the definition of when the trigger is entering into play (BEFORE/AFTER) and which operation (INSERT/DELETE/UPDATE) of which table. Then there is a section enclosed by the BEGIN and END keywords, where you can have as many SQL queries as you like.
The same structure is reproduced in the Trigger class. In the constructor, we will pass the arguments related to the configuration of the trigger itself. Then you can add as many SQL statement as you wish.
1from mafw.db.db_model import Trigger
2
3new_trigger = Trigger('trigger_after_update', (TriggerWhen.After,
4 TriggerAction.Update), 'my_table', safe=False, for_each_row=False)
5new_trigger.add_sql('INSERT INTO another_table (col1, col2) VALUES (1, 2)')
6new_trigger.add_sql('INSERT INTO third_table (col1, col2) VALUES (2, 3)'))
7new_trigger.create()
1CREATE TRIGGER trigger_after_update
2AFTER UPDATE ON my_table
3
4BEGIN
5 INSERT INTO another_table (col1, col2) VALUES (1, 2);
6 INSERT INTO third_table (col1, col2) VALUES (2, 3);
7END;
Now let us have a look at how you can use this, following one of our test benches.
Standalone triggers
1def test_manually_created_trigger():
2 class MyTable(MAFwBaseModel):
3 id_ = AutoField(primary_key=True)
4 integer = IntegerField()
5 float_num = FloatField()
6
7 class TargetTable(MAFwBaseModel):
8 id_ = ForeignKeyField(MyTable, on_delete='cascade', primary_key=True, backref='half')
9 half_float_num = FloatField()
10
11 database: Database = SqliteDatabase(':memory:', pragmas=default_conf['sqlite']['pragmas'])
12 database.connect()
13 database_proxy.initialize(database)
14 database.create_tables([MyTable, TargetTable], safe=True)
15
16 MyTable.delete().execute()
17 TargetTable.delete().execute()
18
19 # manually create a trigger
20 trig = Trigger('mytable_after_insert', (TriggerWhen.After, TriggerAction.Insert), MyTable, safe=True)
21 trig.add_sql('INSERT INTO target_table (id__id, half_float_num) VALUES (NEW.id_, NEW.float_num / 2)')
22 database.execute_sql(trig.create())
23
24 # add some data for testing to the first table
25 data = []
26 for i in range(100):
27 data.append(dict(integer=random.randint(i, 10 * i), float_num=random.gauss(i, 2 * i)))
28 MyTable.insert_many(data).execute()
29
30 # check that the target table got the right entries
31 for row in MyTable.select(MyTable.float_num, TargetTable.half_float_num).join(TargetTable).namedtuples():
32 assert row.float_num == 2 * row.half_float_num
33
34 assert MyTable.select().count() == TargetTable.select().count()
In lines 20 - 22, we create a trigger and we ask the database to execute the generated SQL statement.
We insert 100 rows using the insert many class method and the trigger is doing its job in the background filling the other table. We can check that the values in the two tables are matching our expectations.
The drawback of this approach is that you may have triggers created all around your code, making your code a bit messy.
Model embedded triggers
An alternative approach is to define the trigger within the Model class, allowing it to be created simultaneously with model table. This is demonstrated in the code example below.
1# the trigger is directly defined in the class.
2class MyTable(MAFwBaseModel):
3 id_ = AutoField(primary_key=True)
4 integer = IntegerField()
5 float_num = FloatField()
6
7 @classmethod
8 def triggers(cls):
9 return [
10 Trigger('mytable_after_insert', (TriggerWhen.After, TriggerAction.Insert), cls, safe=True).add_sql(
11 'INSERT INTO target_table (id__id, half_float_num) VALUES (NEW.id_, NEW.float_num / 2)'
12 )
13 ]
14
15class TargetTable(MAFwBaseModel):
16 id_ = ForeignKeyField(MyTable, on_delete='cascade', primary_key=True, backref='half')
17 half_float_num = FloatField()
18
19database: Database = SqliteDatabase(':memory:', pragmas=default_conf['sqlite']['pragmas'])
20database.connect()
21database_proxy.initialize(database)
22database.create_tables([MyTable, TargetTable], safe=True)
This approach is much cleaner. The Trigger is stored directly in the Model (lines 8 - 13). In the specific case, the triggers method returned one trigger only, but you can return as many as you like. When the tables are created (line 22), all the triggers will also be created.
In the example above, you have written the SQL statement directly, but nobody is preventing you to use peewee queries for this purpose. See below, how exactly the same trigger might be re-written, using an insert statement:
1class MyTable(MAFwBaseModel):
2 id_ = AutoField(primary_key=True)
3 integer = IntegerField()
4 float_num = FloatField()
5
6 @classmethod
7 def triggers(cls):
8 trigger = Trigger('mytable_after_insert', (TriggerWhen.After, TriggerAction.Insert), cls, safe=True)
9 sql = TargetTable.insert(id_=SQL('NEW.id_'), half_float_num=SQL('NEW.float_num/2'))
10 trigger.add_sql(sql)
11 return [trigger]
12
13 class Meta:
14 depends_on = [TargetTable]
The key point here is at line 9, where the actual insert statement is generated by peewee (just for your information, you have generated the statement, but you have not execute it) and added to the existing trigger.
In the last two highlighted lines, we are overloading the Meta class, specifying that MyTable, depends on TargetTable, so that when the create_tables is issued, they are built in the right order. This is not necessary if you follow the previous approach because the trigger will be very likely executed only after that the tables have been created.
Warning
Even though starting from MAFw release v1.1.0, triggers are now properly generated for the three main database backends, its use has been deeply tested only with SQLite. For this reason, we (MAFw developers) encourage the user community to work also with other DBs and, in case, submit bugs or feature request.
Disabling triggers
Not all database implementations provide the same option to temporarily disable one or more triggers. In order to cope with this limitation, MAFw is providing a general solution that is always working independently of the concrete implementation of the database.
The standard SQL trigger definition allows to have one or more WHEN clauses [1], meaning that the firing of a trigger script might be limited to the case in which some other external conditions are met.
In order to achieve that, we use one of our standard tables, that are automatically created in every MAFw database.
This is the TriggerStatus table as you can see it in the snippet below:
class TriggerStatus(StandardTable):
"""A Model for the trigger status"""
trigger_type_id = AutoField(primary_key=True, help_text='Primary key')
trigger_type = TextField(
help_text='You can use it to specify the type (DELETE/INSERT/UPDATE) or the name of a specific trigger'
)
status = BooleanField(default=True, help_text='False (0) = disable / True (1) = enable')
# noinspection PyProtectedMember
@classmethod
def init(cls) -> None:
"""Resets all triggers to enable when the database connection is opened."""
data = [
dict(trigger_type_id=1, trigger_type='DELETE', status=True),
dict(trigger_type_id=2, trigger_type='INSERT', status=True),
dict(trigger_type_id=3, trigger_type='UPDATE', status=True),
dict(trigger_type_id=4, trigger_type='DELETE_FILES', status=True),
]
# this is used just to make mypy happy
# cls and meta_cls are exactly the same thing
meta_cls = cast(PeeweeModelWithMeta, cls)
db_proxy = meta_cls._meta.database
if isinstance(db_proxy, peewee.DatabaseProxy):
db = cast(peewee.Database, db_proxy.obj)
else:
db = cast(peewee.Database, db_proxy)
if isinstance(db, peewee.PostgresqlDatabase):
cls.insert_many(data).on_conflict(
'update', conflict_target=[cls.trigger_type_id], update={cls.status: True}
).execute()
else:
cls.insert_many(data).on_conflict_replace().execute()
You can use the trigger_type column to specify a generic family of triggers (DELETE/INSERT/UPDATE) or the name of a specific trigger. By default a trigger is active (status = 1), but you can easily disable it by changing its status to 0.
To use this functionality, the Trigger definition should include a WHEN clause as described in this modified model definition.
class MyTable(MAFwBaseModel):
id_ = AutoField(primary_key=True)
integer = IntegerField()
float_num = FloatField()
@classmethod
def triggers(cls):
return [
Trigger('mytable_after_insert', (TriggerWhen.After, TriggerAction.Insert), cls, safe=True)
.add_sql('INSERT INTO target_table (id__id, half_float_num) VALUES (NEW.id_, NEW.float_num / 2)')
.add_when('1 == (SELECT status FROM trigger_status WHERE trigger_type == "INSERT")')
]
To facilitate the temporary disabling of a specific trigger family, MAFw provides a special class
TriggerDisabler that can be easily used as a context manager in your code. This is an ultra simplified
snippet.
with TriggerDisabler(trigger_type_id = 1):
# do something without triggering the execution of any trigger of type 1
# in case of exceptions thrown within the block, the context manager is restoring
# the trigger status to 1.
Triggers on different databases
We have seen that Peewee provides an abstract interface that allows interaction with various SQL databases like MySQL, PostgreSQL, and SQLite.
This abstraction simplifies database operations by enabling the same codebase to work across different database backends, thanks to the common SQL language they all support. However, while these databases share SQL as their query language, they differ in how they handle certain features, such as triggers. Each database has its own peculiarities and syntax for defining and managing triggers, which can lead to inconsistencies when using a single approach across all databases.
To address this challenge, the MAFw introduced the TriggerDialect abstract class and three specific
implementations for the main databases. Relying on the use of the TriggerDialect class, a syntactically correct SQL
statement for the creation or removal of triggers is generated. But, MAFw cannot read the mind of the user (yet!) and
given the very different behaviour of the databases, the operation of the triggers will be different.
Have a look at the table below for an illustrative comparison on how triggers are handled by the different databases.
Feature |
MySQL |
PostgreSQL |
SQLite |
|---|---|---|---|
Trigger Event |
|
|
|
Trigger Time |
|
|
|
Activation |
Row-level only |
Row-level and statement-level |
Row-level and statement-level |
Implementation |
BEGIN-END block with SQL statements (supports non-standard SQL like SET statements) |
Functions written in PL/pgSQL, PL/Perl, PL/Python, etc. |
BEGIN-END block with SQL statements |
Trigger Per Event |
Multiple triggers allowed ordered by creation time |
Multiple triggers allowed ordered alphabetically by default, can be specified |
Multiple triggers allowed but unspecified execution order |
Privileges required |
TRIGGER privilege on the table and SUPER or SYSTEM_VARIABLES_ADMIN for DEFINER |
CREATE TRIGGER privilege on schema and TRIGGER privilege on table |
No specific privilege model |
Foreign Key Cascading |
Cascaded foreign key actions do not activate triggers |
Triggers are activated by cascaded foreign key actions |
Triggers are activated by cascaded foreign key actions |
Disabled/Enabled Trigger |
Yes, using ALTER TABLE … DISABLE/ENABLE TRIGGER |
Yes, using ALTER TABLE … DISABLE/ENABLE TRIGGER |
No direct mechanism to disable |
PostgreSQL offers the most comprehensive trigger functionality, with built-in support for both row-level and statement-level triggers, INSTEAD OF triggers for views, and the widest range of programming languages for implementation. Its trigger functions can be written in any supported procedural language, providing considerable flexibility.
MySQL implements triggers using SQL statements within BEGIN-END blocks and only supports row-level triggers. It allows non-standard SQL statements like SET within trigger bodies, making it somewhat more flexible for certain operations. A critical limitation is that MySQL triggers are not activated by cascaded foreign key actions, unlike the other databases. This is a strong limiting factor and the user should consider it when designing the database model to store their data. In this case, it might be convenient to not rely at all on the cascading operations, but to have dedicated triggers for this purpose.
SQLite provides more trigger capabilities than it might initially appear. While its implementation is simpler than PostgreSQL’s, it supports both row-level and statement-level triggers (statement-level being the default if FOR EACH ROW is not specified). Like PostgreSQL, SQLite triggers are activated by cascaded foreign key actions, which creates an important behavioral difference compared to MySQL.
When designing database applications that may need to work across different database systems, these implementation differences can lead to subtle bugs, especially around foreign key cascading behavior. MySQL applications that rely on triggers not firing during cascaded operations might behave differently when migrated to PostgreSQL or SQLite. Similarly, applications that depend on statement-level triggers will need to be redesigned when moving from PostgreSQL or SQLite to MySQL.
All so said, even though MAFw provides a way to handle triggers creation and removal in the same way across all the databases, the user who wants to move from one DB implementation to the other should carefully review the content of the trigger body to ensure that the resulting behavior is what is expected.
Standard tables
In the previous section, we discussed a workaround implemented by MAFw to address the limitations of database backends that cannot temporarily disable trigger execution. This is achieved querying a table where the status of a specific trigger or a family of triggers can be toggled from active to inactive and vice-versa.
This TriggerStatus model is one of the so-called MAFw standard tables, it is to say models that will be silently created by the execution of a Processor. In other words, as soon as you connect to a database using the MAFw infrastructure, all those tables will be created so that all processors can benefit from them.
Along with the TriggerStatus, there are two other relevant standard tables: the OrphanFile and the PlotterOutput.
OrphanFile: the house of files without a rowThis table can be profitably used in conjunction with Triggers. The user can define a trigger fired when a row in a table is deleted. The trigger will then insert all file references contained in the deleted row into the OrphanFile model.
The next time a processor (it does not matter which one) having access to the database is executed, it will query the full list of files from the
OrphanFileand remove them.This procedure is needed to avoid having files on your disc without a reference in the database. It is kind of a complementary cleaning up with respect to
another functionyou will discover in a moment.Additional details about this function are provided directly in the
API.
PlotterOutput: where all figures go.
Plottersare specialProcessorsubclasses with the goal of generating a graphical representation of some data you have produced in a previous step.The output of a plotter is in many cases one or more figure files and instead of having to define a specific table to store just one line, MAFw is providing a common table for all plotters where they can store the reference to their output files linked to the plotter name.
It is very useful because it allows the user to skip the execution of a plotter if its output file already exists on disc.
Triggers are again very relevant, because when a change is made in the data used to generate a plotter output, then the corresponding rows in this table should also be removed, in order to force the regeneration of the output figures with the updated data.
The role of those tables is to support the functionality of the general infrastructure and not of a single processor. If your processor needs a specific table, then it is its responsibility to create it. If all your processors need to have access to certain tables, then you may consider having them added to the standard tables.
In this case, you can follow the same plugin approach you used for sharing your processors.
First, you need to create the model classes, possibly inheriting from StandardTable and then you need to export it in the plugin.py module.
Here below is an example:
from peewee import AutoField, TextField
from mafw.db.std_tables import StandardTable
class AnotherExtraTable(StandardTable):
main_id = AutoField(primary_key=True)
interesting_field = TextField()
@classmethod
def init(cls)
# implement here some optional initialisation code.
# it will be performed everytime the connection to the database is opened from MAFw.
pass
import mafw
import my_package.my_std_table
@mafw.mafw_hookimpl
def register_standard_tables() -> list:
return [my_package.my_std_table.AnotherExtraTable]
# your standard pyproject goes here
# be sure to add this entry point
[project.entry-points.'mafw']
my_package = 'my_package.plugins'
Note
Keep in mind, that your extra standard tables will be added only when your processor will be executed by the mafw executable, that means that if you write your own main script to execute a processor or a processor list, this will not load the plugin imported tables.
Custom fields
We have seen in a previous section that there are plenty of field types for you to build up your model classes and that it is also possible to add additional ones. We have made a few for you that are very useful from the point of view of MAFw. The full list is available here.
The role of the database in MAFw is to support the input / output operation. You do not need to worry about specifying filenames or paths. Simply instruct the database to retrieve a list of items, and it will automatically provide the various processors with the necessary file paths for analysis.
With this in mind, we have created a FileNameField, that is the evolution of a text field accepting a Path object as a python type and converting it into a string for database storage. On top of FileNameField, we have made FileNameListField that can contain a list of filenames. This second one is more appropriate when your processor is generating a group of files as output. The filenames are stored in the database as a ‘;’ separated string, and they are seen by the python application as a list of Path objects.
Similarly, we have also a FileChecksumField to store the string of hexadecimal characters corresponding to the checksum of a file (or a list of files). From the python side, you can assign either the checksum directly, as generated for example by file_checksum() or the path to the file, and the field will calculate the checksum automatically.
The FileNameField and FileNameListField accept an additional argument in their constructor, called checksum_field. If you set it to the name of a FileChecksumField in the same table, then you do not even have to set the value of the checksum field because this will be automatically calculated when the row is saved.
With these custom fields in mind, our initial definition of a File table, can be re-factored as follows:
1from peewee import AutoField
2
3from mafw.db.db_model import MAFwBaseModel
4from mafw.db.fields import FileNameField, FileChecksumField
5
6class File(MAFwBaseModel):
7 file_id = AutoField(primary_key=True, help_text='The primary key')
8 file_name = FileNameField(checksum_field='file_digest', help_text='The full filename')
9 file_digest = FileChecksumField(help_text='The hex digest of the file')
Pay attention at the definition of the file_name field. The FileNameField constructor takes an optional parameter checksum_field that is actually pointing to the variable of the FileChecksumField.
You can use the two custom fields as normal, for example you can do:
1new_file = File()
2# you can assign a Path object.
3new_file.file_name = Path('/path/to/some/file')
4# the real checksum will be calculated automatically.
5# this next line is totally optional, you can leave it out and it will work in the same way.
6new_file.file_digest = Path('/path/to/some/file')
The super power of these two custom fields is that you can remove useless rows from the database, just issuing one command.
Removing widow rows
Due to its I/O support, the database content should always remain aligned with the files on your disc. If you have a row in your database pointing to a missing file, this may cause troubles, because sooner or later, you will try to access this missing file causing an application crash.
In MAFw nomenclature, those rows are called widows, following a similar concept in typesetting, because they are a fully valid database entry, but their data counter part on disc disappeared.
To avoid any problem with widow rows, MAFw is supplying a function that the processor can invoke in the start method on the Model classes used as input:
class MyProcessor(Processor):
def start():
super().start()
remove_widow_db_rows(InputData)
The remove_widow_db_rows() will check that all the FileNameField fields in the table are pointing to existing files on disc. If not, then the row is removed from the database.
The function is not automatically called by any of the Processor super methods. It is up to the user to decide if and when to use it. Its recommended use is in the overload of the start() method or as a first action in the get_items() in the case of a for loop workflow, so that you are sure to re-generate the rows that have been removed.
Pruning orphan files
The opposite situation is when you have a file on disc that is not linked to an entry in the database anymore. This situation could be even more perilous than the previous one and may occur more frequently than you realize. The consequences of this mismatch can be severe, imagine that during the testing / development phase of your Processor you generate an output figure saved on disc. You then realize that the plot is wrong and you fix the bug and update the DB, but for some reasons you have forgotten to delete the figure file from the disc. Afterwards, while looking for the processor output, you find this file and believe it is a valid result and you use it for your publication. In order to prevent this to happen, you just have to follow some simple rules, and then the reliable machinery of MAFw will do the rest.
The key point is to use a specific trigger in every table that has a file name field. This trigger has to react before any delete query on such a table and inserting all FileNameFields or FileNameListFields in the OrphanFile table. You will see an example of such a trigger in the next paragraphs. This standard tables will be queried by the next processor being executed and during the start super method, all files in the Orphan table will be removed from the disc.
Let us try to understand this better with a step-by-step example. For simplicity, we have removed the import statements from the code snippet, but it should not be too difficult to understand the code anyway.
We begin with the declaration of our input model:
class File(MAFwBaseModel):
file_id = AutoField(primary_key=True, help_text='primary key')
file_name = FileNameField(checksum_field='check_sum', help_text='the file name')
check_sum = FileChecksumField(help_text='checksum')
@classmethod
def triggers(cls) -> list[Trigger]:
file_delete_file = Trigger(
'file_delete_file',
(TriggerWhen.Before, TriggerAction.Delete),
source_table=cls,
safe=True,
for_each_row=True,
)
file_delete_file.add_when('1 == (SELECT status FROM trigger_status WHERE trigger_type = "DELETE_FILES")')
file_delete_file.add_sql(OrphanFile.insert(filenames=SQL('OLD.file_name'), checksum=SQL('OLD.file_name')))
return [file_delete_file]
class Meta:
depends_on = [OrphanFile]
Here you see the trigger definition: it is a before delete type and when triggered it is adding the filename field to the OrphanFile table. It is important to notice that this trigger has a when condition and will only be executed when the trigger type DELETE_FILES is enabled. This is necessary for the pruning mechanism to work, just copy this line in your trigger definition.
And now let us define some fake processors. First we import some files into our model, then we remove some rows from the file table and finally other two processors, doing nothing but useful to demonstrate the effect of the orphan removal.
@database_required
class FileImporter(Processor):
input_folder = ActiveParameter('input_folder', default=Path.cwd(), help_doc='From where to import')
def __init__(self, *args, **kwargs):
super().__init__(*args, looper=LoopType.SingleLoop, **kwargs)
self.n_files: int = -1
def start(self):
super().start()
self.database.create_tables([File])
File.delete().execute()
def process(self):
data = [(f, f) for f in self.input_folder.glob('**/*dat')]
File.insert_many(data, fields=['file_name', 'check_sum']).execute()
self.n_files = len(data)
def finish(self):
super().finish()
if File.select().count() != self.n_files:
self.processor_exit_status = ProcessorExitStatus.Failed
@database_required
class RowRemover(Processor):
n_rows = ActiveParameter('n_rows', default=0, help_doc='How many rows to be removed')
def __init__(self, *args, **kwargs):
super().__init__(*args, looper=LoopType.SingleLoop, **kwargs)
self.n_initial = 0
def start(self):
super().start()
self.database.create_tables([File])
def process(self):
self.n_initial = File.select().count()
query = File.select().order_by(fn.Random()).limit(self.n_rows).execute()
ids = [q.file_id for q in query]
File.delete().where(File.file_id.in_(ids)).execute()
def finish(self):
super().finish()
if File.select().count() != self.n_initial - self.n_rows or OrphanFile.select().count() != self.n_rows:
self.processor_exit_status = ProcessorExitStatus.Failed
@orphan_protector
@database_required
class OrphanProtector(Processor):
def __init__(self, *args, **kwargs):
super().__init__(looper=LoopType.SingleLoop, *args, **kwargs)
self.n_orphan = 0
def start(self):
self.n_orphan = OrphanFile.select().count()
super().start()
def finish(self):
super().finish()
if OrphanFile.select().count() != self.n_orphan:
self.processor_exit_status = ProcessorExitStatus.Failed
@single_loop
class LazyProcessor(Processor):
def finish(self):
super().finish()
if OrphanFile.select().count() != 0:
self.processor_exit_status = ProcessorExitStatus.Failed
The FileImporter [2] is very simple, it reads all dat files in a directory and loads them in the File model along with their checksum. Just to be sure we empty the File model in the start and in the finish we check that the number of rows in File is the same as the number of files in the folder.
The RowRemover is getting an integer number of rows to be removed. Even though the File model is already created, it is a good practice to repeat the statement again in the start method. Then we select some random rows from File and we delete them. At this point, we have created some orphan files on disc without related rows in the DB. Finally (in the finish method), we verify that the number of remaining rows in the database aligns with our expectations. Additionally, we ensure that the trigger functioned correctly, resulting in the appropriate rows being added to the OrphanFile model.
The OrphanProtector does even less than the others. But if you look carefully, you will see that along with the database_required() there is also the orphan_protector() decorator. This will prevent the processor to perform the check on the OrphanFile model and deleting the unrelated files.
In the start method, we record the number of orphan files in the OrphanFile model and we confirm that they are still there in the finish. Since the actual removal of the orphan files happens in the processor start method, we need to count the number of orphans before calling the super start.
The LazyProcessor is responsible to check that there are no rows left in the OrphanFile, meaning that the removal was successful.
And now let us put everything together and run it.
db_conf = default_conf['sqlite']
db_conf['URL'] = 'sqlite:///:memory:'
plist = ProcessorList(name='Orphan test', description='dealing with orphan files', database_conf=db_conf)
importer = FileImporter(input_folder=tmp_path)
remover = RowRemover(n_rows=n_delete)
protector = OrphanProtector()
lazy = LazyProcessor()
plist.extend([importer, remover, protector, lazy])
plist.execute()
assert importer.processor_exit_status == ProcessorExitStatus.Successful
assert remover.processor_exit_status == ProcessorExitStatus.Successful
assert protector.processor_exit_status == ProcessorExitStatus.Successful
assert lazy.processor_exit_status == ProcessorExitStatus.Successful
In practice, the only thing you have to take care of is to add a dedicated trigger to each of your tables having at least a file field and then the rest will be automatically performed by MAFw.
Warning
You should be very careful if your processor is removing rows from the target table (where you should be storing the processor’s results). This might be the case of a processor that wants to reset the status of your analysis to a previous step, for example. In this case, as soon as ProcessorA removes the rows from the model, the trigger will inserts all FileNameFields in the OrphanFile model in order to be deleted. This is a lazy operation and will be performed by the following processor to be executed either in the same pipeline or in the next. When ProcessorA will have finished its work, the target table will be repopulated and the same will happen to the folders on the disc. Now the next processor will empty the orphan file model and possibly remove the freshly generated files.
You have two solutions for this problem: either you block the execution of the trigger when deleting the rows (you can use the TriggerDisabler for this purpose), in this way the rows in the model will be removed, but not the files from disc with the risks we have already mentioned. The second possibility is to force the processor to immediately take care of the orphan file pruning. This is the suggested procedure and you only need to include a call to the _remove_orphan_files() soon after the delete query.
Keeping the entries updated
One aspect is that the file exists; another is that the file content remains unchanged. You may have replaced an input file with a newer one and the database will not know it. If your processors are only executed on items for which there is still no output generated, then this replaced file may go unnoticed causing issues to your analysis.
For this reason, we are strongly recommending to always add a checksum field for each file field in your table. Calculating a checksum is just a matter of a split second on modern CPU while the time for the debugging your analysis code is for sure longer.
The function verify_checksum() takes a Model as argument and will verify that all checksums are still valid. In other words, for each FileNameField (or FileNameListField) with a link to a checksum field in the table, the function will compare the actual digest with the stored one. If it is different, then the DB row will be removed.
Also this function is not automatically called by any processor super methods. It is ultimately the user’s responsibility to decide whether to proceed, bearing in mind that working with long tables and large files may result in delays in processor execution.
The implementation is very similar to the previous one, just change the function name. Keep in mind that the verify_checksum() will implicitly check for the existence of files and warn you if some items are missing, so you can avoid the remove_widow_db_rows(), if you perform the checksum verification.
class MyProcessor(Processor):
def start():
super().start()
verify_checksum(InputData)
Filters: let us do only what is needed to be done!
As mentioned already several times, the main role of the database is to support the processor execution in providing the input items and in storing the output products. Not everything can be efficiently stored in a database, for example large chunk of binary data are better saved to the disc, in this case, the database will know the path where these data are stored.
One advantage of the database is that you can apply selection rules and you do not have to process the whole dataset if you do not need it. To help you in this, MAFw is offering you a ready-to-use solution, the Filter class. This is an object that you can configure via the steering file allowing you to run the processors only over the items passing the criteria you set.
How to configure a filter
In a steering file, there is a table for each processor where you can set the configurable active and passive parameters. To this table, you can add a sub-table named Filter, containing other tables, one for each input Model. This is how it will look like:
[GlobalFilter]
new_only = true
[MyProcessor]
param1 = 15
[MyProcessor.Filter.InputTable1]
resolution_value = 25
[MyProcessor.Filter.InputTable2]
sample_name = 'sample_1'
In the example above, MyProcessor has two *.Filter.* tables, one for InputTable1 and one for InputTable2. When the steering file will be parsed, the processor constructor will automatically generate two filters: for InputerTable1 it will put a condition that the resolution field must be 25 and for InputTable2, the sample_name column should be ‘sample_1’. If MyProcessor is using other tables for generating the input items, you could add them in the same way.
There is also an additional table named GlobalFilter where you can specify conditions that will be applied by default to all processors being executed. If the same field is specified in both the GlobalFilter and the Processor/Model specific one, the one in the Processor/Model will overrule the global one.
For the global filter, we can specify a magic boolean keyword new_only == true [3]. This will allow to implement a different input sequence considering only items for which an output does not exist.
Explicit operation configuration
In the previous example we have seen how to select one specific field to be exactly equal to a given value, but maybe our goal is to select an interval, or performing a bitwise logical operation. The filter system also supports explicit operation specification, allowing you to define the exact logical operation to be applied. Here is an example:
[MyProcessor.Filter.InputTable1]
# Traditional format (automatic operation based on value type)
resolution_value = 25
sample_name = 'sample_*'
# Explicit operation format
flags = { op = "BIT_AND", value = 5 }
score = { op = ">=", value = 75.0 }
category = { op = "IN", value = ["A", "B", "C"] }
date_range = { op = "BETWEEN", value = ["2024-01-01", "2024-12-31"] }
active_field = { op = "IS_NOT_NULL", value = 'null' } # value can be whatever you want
The supported operations include:
Operation |
Description |
Example |
|---|---|---|
== |
Equal |
Field == 42 |
!= |
Not equal |
Field != 42 |
< |
Less than |
Field < 100 |
<= |
Less than/equal |
Field <= 100 |
> |
Greater than |
Field > 0 |
> |
Greater than/equal |
Field >= 10 |
GLOB |
Pattern matching |
Field GLOB ‘test*’ |
LIKE |
SQL LIKE |
Field LIKE ‘test%’ |
REGEXP |
Regular expression |
Field REGEXP ‘^[A-Z]’ |
IN |
Value in list |
Field IN [1, 2, 3] |
NOT_IN |
Value not in list |
Field NOT_IN [1, 2] |
BETWEEN |
Value between |
Field BETWEEN [1, 10] |
BIT_AND |
Bitwise AND |
Field & 5 != 0 |
BIT_OR |
Bitwise OR |
Field | 7 != 0 |
IS_NULL |
Field is NULL |
Field IS NULL |
IS_NOT_NULL |
Field is not NULL |
Field IS NOT NULL |
Note
The default sqlite library provides only an abstract definition of the regular expression matching. In simple words, it means that the user needs to implement the user function regexp(), or using any sqlite extensions that implements it.
In summary, if you are using the vanilla sqlite, you cannot use the REGEXP operator in your filter and you need to reformulate your filtering condition using a combination of other string matching tools.
How to use a filter
Let us put in practice what we have seen so far. The filter native playground is in the implementation of the get_items() method.
Let us assume that our Processor, named AdvProcessor, is using three models to obtain the item lists. Everything is nicely described in the ERD below. The three models are interconnected via foreign key relations. There is a fourth model, that is where the output data will be saved.
Fig. 2 The ERD of the AdvProcessor.
The real core of our database is the image table, where our data are first introduced, the other two on the right, are kind of helper tables storing references to the samples and to the resolution of our images. The processed_image table is where the output of our AdvProcessor will be stored.
To realize this database with our ORM we need to code the corresponding model classes as follows:
1class Sample(MAFwBaseModel):
2 sample_id = AutoField(primary_key=True, help_text='The sample id primary key')
3 sample_name = TextField(help_text='The sample name')
4
5class Resolution(MAFwBaseModel):
6 resolution_id = AutoField(primary_key=True, help_text='The resolution id primary key')
7 resolution_value = FloatField(help_text='The resolution in µm')
8
9class Image(MAFwBaseModel):
10 image_id = AutoField(primary_key=True, help_text='The image id primary key')
11 filename = FileNameField(help_text='The filename of the image', checksum_field='checksum')
12 checksum = FileChecksumField(help_text='The checksum of the input file')
13 experiment = IntegerField(default=1,
14 help_text='A flag for selection of the experiment. Flags are bitwise combinable')
15 category = TextField(
16 help_text='A text string to describe the image category. Accepted values are: "STANDARD", "SUT", "REFERENCE"')
17 sample = ForeignKeyField(
18 Sample, Sample.sample_id, on_delete='CASCADE', backref='sample', column_name='sample_id'
19 )
20 resolution = ForeignKeyField(
21 Resolution, Resolution.resolution_id, on_delete='CASCADE', backref='resolution', column_name='resolution_id'
22 )
23
24class ProcessedImage(MAFwBaseModel):
25 image = ForeignKeyField(
26 Image,
27 Image.image_id,
28 primary_key=True,
29 column_name='image_id',
30 backref='raw',
31 help_text='The image id, foreign key and primary',
32 on_delete='CASCADE',
33 )
34 value = FloatField(default=0)
By now, you should be an expert in ORM and everything there should be absolutely clear, otherwise, take your chance now to go back to the previous sections or to the peewee documentation to find an explanation. Note how the Image class is making use of our FileNameField and FileChecksumField. We added also a bit of help text to each field, in order to make even more evident what they are.
Particularly interesting is the experiment field in the Image model. This is a binary flag and can be very useful to assign one file (an image in this case) to one or more experiments. For example, imagine you have three different experiments in your data analysis; you assign to the first experiment the label 1 (binary: 0b1), to the second the label 2 (0b10) and to the third the label 4 (0b100). Now, if you want an image to be used only for experiment 1, you set the experiment column to 1; similarly if you want an image to be part of experiment 1 and 3, then you set its experiment column to 1 + 4 = 5 (b101). In fact, if you bit-wise AND, this image with the label of the experiments (5 BIT_AND 1 = 5 BIT_AND 4 = True) you will get a True value.
For each foreign key field, we have specified a backref field, so that you could get access to the related models. Pay attention also at the highlighted lines, where we define foreign key fields to other tables. Peewee follows Django style references, so actually the field object is named with the noun of the object you are referring to. This will allow the following:
image.sample # Resolve the related object returning a Sample instance, it costs an additional query
image.sample_id # Return the corresponding Sample's id number
The primary source of input is the Image; however, you may wish to process only images that meet specific criteria, such as belonging to a particular sample or being captured at a certain resolution. Unfortunately, this information is not explicitly included in the Image model. Only the resolution_id and the sample_id are included in the image table: those primary keys are excellent for a computer, but for a human being it is better to use sample names and resolution values. The solution is to use a join query in order to have all fields available and then we will be able to apply the configurable filters from the TOML steering file to limit the selection to what we want. We have three input models, so we may have up to three filters defined in the configuration file, along with the GlobalFilter. The processor, during its construction, will build the filters with the configuration information and store them in the processor filter register.
Let us have a look at a possible implementation of the get_items() for our advanced processor.
1def get_items(self):
2
3 # first of all, let us be sure that the tables exist
4 # the order is irrelevant, the ORM will find the best creation strategy.
5 # if the table already exists, nothing will happen.
6 self.database.create_tables([Sample, Resolution, Image, ProcessedImage])
7
8 # if you want to remove widow rows from the output table or verify the checksum do it now!
9 remove_widow_rows([Image, ProcessedImage])
10
11 # bind all input table filters
12 # this will establish a physical connection between the filter and
13 # the corresponding ORM model.
14 # the order of the model in the list is irrelevant.
15 # list all input models.
16 self.filter_register.bind_all([Sample, Resolution, Image])
17
18 # let us get a list of all already processed items.
19 # since the item in the output table are stored using the same primary key,
20 # this will simplify the discrimination between new and already processed items.
21 existing_entries = ProcessedImage.select(ProcessedImage.image_id).execute()
22
23 # did we select new_only in the global filter?
24 # if yes, prepare an additional condition in which we specify that the
25 # Image.image_id should not be among the image_id of the ProcessedImage.
26 # if no, then just accept everything.
27 if self.filter_register.new_only:
28 existing = ~Image.image_id.in_([i.image_id for i in existing_entries])
29 else:
30 existing = True
31
32 # finally let us make the query.
33 query = (Image.select(Image, Sample, Resolution)
34 .join(Sample, on=(Image.sample_id == Sample.sample_id), attr='s')
35 .switch(Image)
36 .join(Resolution, on=(Image.resolution_id == Resolution.resolution_id), attr='r')
37 .where(self.filter_register.filter_all())
38 .where(existing)
39 )
40 return query
1[GlobalFilter]
2new_only = true
3
4[AdvProcessor.Filter.Sample]
5sample_name = 'Sample_0000[123]'
6
7[AdvProcessor.Filter.Resolution]
8resolution_value = 50
9
10[AdvProcessor.Filter.Image]
11filename = '\*file_0[12345]0\*'
1[GlobalFilter]
2new_only = true
3
4[AdvProcessor.Filter.Sample]
5# Simplified format
6sample_name = 'Sample_0000[123]'
7
8[AdvProcessor.Filter.Resolution]
9# Explicit operation with comparison
10resolution_value = { op = ">=", value = 25.0 }
11
12[AdvProcessor.Filter.Image]
13# Mix of simplified and explicit operations
14filename = '\*file_0[12345]0\*'
15experiment = { op = "BIT_AND", value = 5 }
16category = { op = "IN", value = ["STANDARD", "REFERENCE"] }
The comments in the code make it rather self explanatory, but let us have a look together at the key points.
At line 16, we are binding the filters in the filter register with the model classes. The filter register contains all the filters that were found in the configuration file, and since their presence in the steering file is optional, you may have decided not to include all model specific filters. In such a case, the
FilterRegister()will create an empty filter for the missing models using the global filter as default values.At line 21, we query the output table to get the list of already existing output. Since the input and output images are sharing the same primary key, this will make the identification of new items very easy. If the DB structure was not built in this way, you will have to work out a solution to find which are the new items.
The list of items is finally retrieved in the lines 33 to 39. We select not only from Image, but we join also Sample and Resolution. In the join statement we specify the target Model and the relation to align the two models. Theoretically, if the two models are linked by a foreign key relation, then the
onattribute could be skipped, but we like when everything is clearly stated. We will come back to theattrin a second.Lines 37 and 38 is where the filter gets into action, exactly in the where clause. First we ask at the
filter_registerto generate the filter statement for each of the filters it is holding and then we select according to the existence criteria.
For the simplified configuration example (click on the “TOML (simplified)” tab), the filter_all condition will be equivalent of writing:
(Sample.sample_name % 'Sample_0000[123]') &
(Resolution.resolution_value == 50) &
(Image.filename % '\*file_0[12345]0\*')
where the % operator [4] is the SQL GLOB.
For the enhanced configuration example (click on the “TOML (Enhanced)” tab), the filter_all condition will be equivalent of writing:
(Sample.sample_name % 'Sample_0000[123]') &
(Resolution.resolution_value >= 25.0) &
(Image.filename % '\*file_0[12345]0\*') &
(Image.experiment.bin_and(5) != 0) &
(Image.category.in_(["STANDARD", "REFERENCE"]))
This demonstrates how you can achieve precise filtering control using both simple and explicit function definition.
In the last line, we return the query, that is an iterable object, so actually our looping processor will assign to the item attribute the value of one selected and filtered row after the other. The item will then be a modified version of the Image class. You will be able to access the image table columns directly using the dot notation, and, at the same time, you can access the two joined tables via the two attr fields, that you have specified in the join statement.
So for example, this will be a valid syntax:
print(self.item.filename.name) # print the file name (without the path) of the image
print(self.item.s.sample_name) # print the sample name, look at the .s in between
print(self.item.r.resolution_value) # print the value of the resolution. look at the .r in between
Avoiding the N+1 problem
Since the Image table has foreign key dependency from Sample and Resolution, you could theoretically access the referenced row via the backref attribute, but this will trigger an additional query every time. The impact of those additional queries can be excessive especially if the tables are large. This is the so-called N+1 problem in ORM. On the contrary, if you use the s and r attributes, no additional queries will be performed.
New only flag and processor parameters
In the previous paragraph we have seen a very useful feature of MAFw and the filters: it allows to go through the whole pipelines, but processing only the items for each the output is missing. This is normally achieved comparing the primary keys in the input and output tables.
Let’s now consider another common scenario. You have implemented a Processor that is responsible to perform a gaussian filtering of your data and for this you have defined gauss_sigma as an ActiveParameter. You run the pipeline for the first time and your GaussFilter is doing its job using gauss_sigma = 3, but then you realize that 3 is probably too much and you want to lower it down to say 2. You change the steering file and you re-run it and very likely nothing will happen. The reason is that if you have implemented the new_only filter in the get_items() as shown before, the output table is already containing all the filtered items from the input table.
A trick to force the re-processing is to delete the item manually from the database or one of its output files (if any and if you have included the remove_widow_db_rows() or verify_checksum(), but this is not really comfortable. The most elegant solution is to include a column in the output table to store the value of gauss_sigma and then adding a where condition in the query looking for the existing items.
Look at this query snippet example:
def get_items(self):
# add here all your checks, table creations, filter bindings
if self.filter_register.new_only:
query = (
Image.select(OutputTable.input_id)
.where(OutputTable.gauss_sigma == self.gauss_sigma)
)
existing = ~InputTable.input_id.in_([row.input_id for row in query])
else:
existing = True
items = (InputTable.select(InputTable)
.where(self.filter_register.filter_all())
.where(existing)
)
return items
This approach is very handy, because it allows to link the entries in the database with the parameters set in the steering file, but it must be used with care, because changing a parameter will trigger the reprocessing of all entries while you might be thinking that this will only apply to the added items only.
Multi-primary key columns
Special attention is required when you need to have a primary key that is spanning over two or more columns of your model. So far we have seen how we can identify one column in the model as the primary key and now we will see what to do if you want to use more than one column as primary key and, even more important, how you can use this composite primary key as a foreign key in another model.
To describe this topic, we will make use of an example that you can also find in the examples modules of MAFw named multi_primary.
Let us start with the model definition.
1class Sample(MAFwBaseModel):
2 sample_id = AutoField(primary_key=True, help_text='The sample id primary key')
3 sample_name = TextField(help_text='The sample name')
4
5class Resolution(MAFwBaseModel):
6 resolution_id = AutoField(primary_key=True, help_text='The resolution id primary key')
7 resolution_value = FloatField(help_text='The resolution in µm')
8
9class Image(MAFwBaseModel):
10 image_id = AutoField(primary_key=True, help_text='The image id primary key')
11 filename = FileNameField(help_text='The filename of the image', checksum_field='checksum')
12 checksum = FileChecksumField()
13 sample = ForeignKeyField(Sample, on_delete='CASCADE', backref='+', lazy_load=False, column_name='sample_id')
14 resolution = ForeignKeyField(
15 Resolution, on_delete='CASCADE', backref='+', lazy_load=False, column_name='resolution_id'
16 )
17
18class ProcessedImage(MAFwBaseModel):
19 image = ForeignKeyField(
20 Image,
21 primary_key=True,
22 backref='+',
23 help_text='The image id, foreign key and primary',
24 on_delete='CASCADE',
25 lazy_load=False,
26 column_name='image_id',
27 )
28 value = FloatField(default=0)
29
30class CalibrationMethod(MAFwBaseModel):
31 method_id = AutoField(primary_key=True, help_text='The primary key for the calculation method')
32 multiplier = FloatField(default=1.0, help_text='The multiplication factor of this method')
33
34class CalibratedImage(MAFwBaseModel):
35 image = ForeignKeyField(
36 ProcessedImage,
37 on_delete='CASCADE',
38 help_text='The reference to the processed image',
39 backref='+',
40 lazy_load=False,
41 column_name='image_id',
42 )
43 method = ForeignKeyField(
44 CalibrationMethod,
45 on_delete='CASCADE',
46 help_text='The reference to the calibration method',
47 backref='+',
48 lazy_load=False,
49 column_name='method_id',
50 )
51 calibrated_value = FloatField(default=0.0, help_text='The calibrated value')
52
53 @property
54 def primary_key(self) -> Iterable:
55 return self.image_id, self.method_id
56
57 class Meta:
58 primary_key = CompositeKey('image', 'method')
59
60class ColoredImage(MAFwBaseModel):
61 image_id = IntegerField(help_text='The reference to the processed image. Combined FK with method_id')
62 method_id = IntegerField(help_text='The reference to the calibration method. Combined FK with method_id')
63 red = FloatField(default=0, help_text='Fake red. Only for testing')
64 green = FloatField(default=0, help_text='Fake green. Only for testing')
65 blue = FloatField(default=0, help_text='Fake blue. Only for testing')
66
67 @property
68 def primary_key(self) -> Iterable:
69 return self.image_id, self.method_id
70
71 class Meta:
72 constraints = [
73 SQL(
74 'FOREIGN KEY (image_id, method_id) REFERENCES '
75 'calibrated_image(image_id, method_id) ON DELETE CASCADE'
76 )
77 ]
78 primary_key = CompositeKey('image_id', 'method_id')
79
As always, one single picture can convey more than a thousand lines of code. Here below the ERDs of Image and of CalibratedImage.
Fig. 3 The ERD of the Image Model
Fig. 4 The ERD of the CalibratedImage Model
In the diagrams, the fields with bold font represent primary keys, also highlighted by the separation line, while the arrow are the standard relation.
As in the examples above, we have images of different samples acquired with different resolutions entering the Image model. We use those lines to make some calculations and we obtain the rows in the ProcessedImage model. These two tables are in 1 to 1 relation and this relation is enforced with a delete cascade, meaning that if we delete an element in the Image model, the corresponding one in the ProcessedImage will also be deleted.
The CalibrationMethod model contains different sets of calibration constants to bring each row from the ProcessedImage model to the CalibratedImage one. It is natural to assume that the image_id and the method_id are the best candidates to be a combined primary key.
To achieve this, in the CalibratedImage model, we need to add (line 57-58) an overload of the Meta class, where we specify our CompositeKey. Pay attention to an important detail: the CompositeKey constructor takes the name of the fields and not the name of the columns, that in the case of foreign keys differ of ‘_id’. Optionally we can also define a primary_key property (line 53-55) to quickly retrieve the values of our keys.
From the application point of view, we want all the processed images to be calibrated with all possible calibration methods, that means we need to make a cross join as described below:
1with database.atomic() as txn:
2 cross_join = (
3 ProcessedImage.select(ProcessedImage, CalibrationMethod).join(CalibrationMethod, JOIN.CROSS).execute()
4 )
5 for row in cross_join:
6 calibrated_image = CalibratedImage()
7 calibrated_image.image_id = row.image_id
8 calibrated_image.method_id = row.method_id
9 calibrated_image.calibrated_value = row.value * row.multiplier
10 calibrated_image.save(force_insert=True)
Up to this point we have seen what we have to do to specify a composite primary key, we cannot use the AutoField or the primary_key parameter, but we need to go through the Meta class in the way shown in the example.
The next step is to have another table (ColoredImage in our imaginary case) that is in relation with CalibratedImage. We would need to have again a composite primary key that is also a composite foreign key. Peewee does not support composite foreign keys, but we can use the workaround shown at lines 72-77. Along with the CompositeKey definition, we need to add a Constraint as well using the SQL function to convert a string into a valid SQL statement. This time, since we are using low level SQL directives, we have to use the column names (additional ‘_id’) instead of the field name.
And in a similar way we can insert items in the ColoredImage model.
1with database.atomic() as txn:
2 full_list = CalibratedImage.select().execute()
3
4 for row in full_list:
5 colored_image = ColoredImage(image_id=row.image_id, method_id=row.method_id)
6 colored_image.red = row.calibrated_value / 3
7 colored_image.green = row.calibrated_value / 3
8 colored_image.blue = row.calibrated_value / 3
9 colored_image.save(force_insert=True)
Now, with all the tables linked to each other, try to delete one from a table, and guess what will happen to all other tables.
This tutorial might be a bit more complex than the examples we have seen so far, but we believed you have appreciated the power of such a relational tool.
Advanced new_only filtering with multiple column primary keys
Before moving on to the next section, we would like to show you another implementation tip where multiple column primary keys and new_only filtering work nice together. In the previous section How to use a filter we have seen how one can implement the get_items method in order to process items in the input table that are not yet present in the output target table. That implementation was rather easy and straightforward because both input and output tables were sharing the same primary key. Now let’s see how we can handle the situation where the input table is the cross join of two tables with different primary keys (image_id from ProcessedImage and method_id from CalibrationMethod) and the output table CalibratedImage has the combined primary key (image_id, method_id).
The idea is to use field combinations and for this you can rely on some helper functions defined in the db_tools module. Let’s have a look at a possible implementation.
1def get_items(self):
2
3 # as always be sure that the tables are properly created
4 self.database.create_tables([ProcessedImage, CalibrationMethod,
5 CalibratedImage, Sample, Resolution, Image])
6
7 # bind all input table filters
8 self.filter_register.bind_all([Sample, Resolution, Image,
9 ProcessedImage, CalibrationMethod])
10
11 # check if we want to process only new items
12 if self.filter_register.new_only:
13 # get a combined primary key because the output table (CalibratedImage) has
14 # a multicolumn primary key
15 existing_combo = combine_pk(CalibratedImage,
16 alias_name = 'combo',
17 join_str = ' x ')
18 # execute a select on the output table for the combined field
19 existing_entries = CalibratedImage.select(existing_combo).execute()
20 # to find the existing entries in the input model, I need to combine the fields
21 # and compare them to the list of combined pks from the target model.
22 # Pay attention to two things:
23 # 1. In the where conditions you cannot have an alias, so the
24 # combine_fields is not aliased.
25 # 2. The argument of the in_ must be a python list and not the pure select.
26 existing = ~combine_fields(
27 [ProcessedImage.image_id,
28 CalibrationMethod.method_id],
29 join_str = ' x '
30 ).in_([entry.combo for entry in existing_entries])
31 else:
32 existing = True
33
34 query = (
35 ProcessedImage.select(ProcessedImage, Image, Sample,
36 Resolution, CalibrationMethod)
37 .join_from(ProcessedImage, Image, attr='_image')
38 .join_from(Image, Sample, on=(Image.sample_id == Sample.sample_id), attr='_sample')
39 .join_from(Image, Resolution, on=(Image.resolution_id == Resolution.resolution_id), attr='_resolution')
40 .join_from(ProcessedImage, CalibrationMethod, on=True, attr='_calibration_method')
41 .where(existing)
42 .where(self.filter_register.filter_all())
43 )
44 return query
The code snippet is rather self-explanatory and well commented. First we create all tables, no worries nothing bad will happen if they already exist. Then we bind our filter register to all input tables. The next step is to check for existing entries. The lines 15-17 are the SQL equivalent of:
SELECT image_id | ' x ' | method_id FROM calibrated_image
so it will return one single column of text type where each row is something like 1 x 1, 2 x 1….. You can change the joining string to whatever you like most, the only requirement is that it must be the same as the one used at line 29.
The next step is to build a query of all possible combinations of the two fields ProcessedImage.image_id and CalibrationMethod.method_id. This is obtained using the combine_fields() function. Pay attention to two small details:
The output of
combine_fields()is meant to be used in the where condition and peewee does not supported aliased expression in where conditions.The in_ operator is expecting a python list as argument, so you need to transform the existing_entries query is a list with the combinations.
Importing an existing DB
The last section of this long chapter on database will show you how to deal with an existing DB. It is possible that before you have adopted MAFw for your analysis tasks, you were already employing a relational database to store your dataset and results. So far we have seen how to create tables in a database starting from an object oriented description (a model) in a python library. But what do we have to do if the database already exists? Can we create the classes starting from a database? This process goes under the name of reflection and it is the subject of this section.
The reflection of tables in python classes cannot be performed automatically at 100% by definition. A typical case is the use of application specific fields. Consider, for example, the FileNameField that we have discussed earlier. This field corresponds to a Path object when you look at it from the application point of view, but the actual path is saved as a text field in the concrete database implementation. If you now read the metadata of this table from the database point of view, you will see that the field will contain a text variable and thus the reflected class will not have any FileNameField.
Let us try to understand the process looking at the picture below. If we create the model in python, then we can assign special field descriptors to the table columns, but their concrete implementation in the database must be done using types that are available in the database itself. So when we perform the reverse process, we get only a good approximation of the initial definition.
Fig. 5 This is the model implementation as you would code it making use of the specific field definitions.
Fig. 6 During the actual implementation of the model as a database table, python column definitions will be translated into database types.
Fig. 7 The reflection process will translate the database implementation in a generic model implementation, not necessarily including all the specific field definition.
Nevertheless the process is rather efficient and can generate an excellent starting point that we can use to customize the model classes to make them more useful in our application.
From a practical point of view, you just have to open a console and type the command mafw db wizard --help to get some help on the tool and also read its documentation. You need to provide the name of the database and how to connect to it, in the case of Sqlite DB, it is enough to provide the filename, and you have to specify the name of the output python file that will contain all the model classes. This module is ready to go, you could theoretically import it into your project and use it, but it is strongly recommended to accurately check that everything is really the way you want it to be.
The reflection process is absolutely safe for your existing database, so it is worth to give it a try!
What’s next
Congratulations! You reached the end of the most difficult chapter in this tutorial. It is difficult because as a scientist you might not be used to deal with databases everyday, but their power is incredible, isn’t it?
The next chapter is about the library of advanced processors that MAFw is sharing to simplify your job even further. In particular, we are sure you will like a lot our plotting processors!
Footnotes