Coverage for src / mafw / tools / file_tools.py: 100%
80 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
1# Copyright 2025 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""The module provides utilities for handling file, filename, hashing and so on."""
6import hashlib
7import warnings
8from collections.abc import Sequence
9from pathlib import Path
10from typing import cast
12from peewee import Model
14import mafw.db.fields
15from mafw.db.db_types import PeeweeModelWithMeta
16from mafw.mafw_errors import ModelError
19def file_checksum(filenames: str | Path | Sequence[str | Path], buf_size: int = 65536) -> str:
20 """
21 Generates the hexadecimal digest of a file or a list of files.
23 The digest is calculated using the sha256 algorithm.
25 :param filenames: The filename or the list of filenames for digest calculations.
26 :type filenames: str, Path, list
27 :param buf_size: The buffer size in bytes for reading the input files. Defaults to 64kB.
28 :type buf_size: int, Optional
29 :return: The hexadecimal digest.
30 :rtype: str
31 """
32 if isinstance(filenames, (str, Path)):
33 filenames = [filenames]
35 hasher = hashlib.sha256()
37 for filename in filenames:
38 with open(filename, 'rb') as file:
39 while True:
40 data = file.read(buf_size)
41 if not data:
42 break
43 hasher.update(data)
45 return hasher.hexdigest()
# noinspection PyUnresolvedReferences
def remove_widow_db_rows(models: list[Model | type[Model]] | Model | type[Model]) -> None:
    """Removes widow rows from a database table.

    According to MAFw architecture, the Database is mainly providing I/O support to the various processors.

    This means that the processor retrieves a list of items from a database table for processing and subsequently
    updates a result table with the newly generated outputs.

    Very often the input and output data are not stored directly in the database, but rather in files saved on the
    disc. In this case, the database is just providing a valid path where the input (or output) data can be found.

    From this point of view, a **widow row** is a database entry in which the file referenced by the FileNameField
    has been deleted. A typical example is the following: the user wants a certain processor to regenerate a given
    result stored inside an output file. Instead of setting up a complex filter so that the processor receives only
    this element to process, the user can delete the actual output file and ask the processor to process all new items.

    The provided ``models`` can be either a list or a single element, representing either an instance of a DB model
    or a model class. If a model class is provided, then a select over all its entries is performed.

    The function will look at all fields of :class:`~mafw.db.fields.FileNameField` and
    :class:`~mafw.db.fields.FileNameListField` and check if it corresponds to an existing path or list of paths. If not,
    then the corresponding row is removed from the DB table. The row is removed at most once: as soon as one field
    points to a missing file, the remaining fields of that row are not inspected.

    :param models: A list or a single Model instance or Model class for widow rows removal.
    :type models: list[Model | type(Model)] | Model | type(Model)
    :raises TypeError: if ``models`` is not of the right type.
    """
    from mafw.db.std_tables import TriggerDisabler

    def _as_paths(value: str | Sequence[str | Path]) -> list[Path]:
        """Internal function turning the value of a filename list field into a list of paths"""
        # Depending on how the instance was obtained, the value may be a ';'-joined string or
        # already a sequence of paths; both forms are accepted.
        # NOTE(review): confirm the actual return type of FileNameListField.python_value.
        if isinstance(value, str):
            return [Path(p) for p in value.split(';')]
        return [Path(p) for p in value]

    # noinspection PyUnresolvedReferences,PyProtectedMember
    def _is_widow(r: Model) -> bool:
        """Internal function telling whether a single instance references at least one missing file"""
        # this is just to make mypy happy
        # r0 and r are exactly the same thing!
        r0 = cast(PeeweeModelWithMeta, r)
        for k, f in r0._meta.fields.items():
            # since FileNameListField is a subclass of FileNameField, we first have to check for
            # the case of a list and then of a simple field.
            if isinstance(f, mafw.db.fields.FileNameListField):
                # all() on a generator stops at the first missing file
                if not all(file.exists() for file in _as_paths(getattr(r, k))):
                    return True
            elif isinstance(f, mafw.db.fields.FileNameField):
                if not getattr(r, k).exists():
                    return True
        return False

    def _check_row(r: Model) -> None:
        """Internal function performing the check and removal on a single instance"""
        # The decision is taken once for the whole row, so that a row with several filename
        # fields pointing to missing files is deleted only one time.
        if _is_widow(r):
            r.delete_instance()

    if isinstance(models, (Model, type(Model))):
        models = [models]

    # NOTE(review): trigger_type_id=4 presumably identifies the delete triggers - confirm against TriggerDisabler.
    with TriggerDisabler(trigger_type_id=4):
        for m in models:
            if isinstance(m, Model):
                _check_row(m)
            elif isinstance(m, type(Model)):
                # this is just to make mypy happy
                # m0 and m are exactly the same thing!
                m0 = cast(PeeweeModelWithMeta, m)
                for row in m0.select().execute():
                    _check_row(row)
            else:
                raise TypeError('models must be list[Model | type(Model)] | Model | type(Model)')
# noinspection PyUnresolvedReferences
def verify_checksum(models: list[Model | type[Model]] | Model | type[Model]) -> None:
    """
    Verifies the goodness of FileChecksumField.

    If in a model there is a FileChecksumField, this must be connected to a FileNameField or a FileNameListField in
    the same model. The goal of this function is to recalculate the checksum of the FileNameField / FileNameListField
    and compare it with the actual stored value. If the newly calculated value differs from the stored one, the
    corresponding row in the model will be removed, as it is no longer valid.

    If a file is missing, then the checksum check is not performed, but the row is removed right away.

    As soon as one field of a row is found to be invalid, the remaining fields of that row are skipped and the row
    is removed exactly once, so that no checksum is recalculated for a row that is going to be deleted anyway.

    This function can be CPU and I/O intensive and last a lot, so use it with care, especially when dealing with long
    tables and large files.

    :param models: A list or a single Model instance or Model class for checksum verification.
    :type models: list[Model | type(Model)] | Model | type(Model)
    :raises TypeError: if ``models`` is not of the right type.
    :raises mafw.mafw_errors.ModelError: if a filename field is referring to a checksum field that does not exist
        in the model.
    """
    from mafw.db.std_tables import TriggerDisabler

    def _as_paths(value: str | Sequence[str | Path]) -> list[Path]:
        """Internal function turning the value of a filename list field into a list of paths"""
        # Depending on how the instance was obtained, the value may be a ';'-joined string or
        # already a sequence of paths; iterating a plain string would yield single characters.
        # NOTE(review): confirm the actual return type of FileNameListField.python_value.
        if isinstance(value, str):
            return [Path(p) for p in value.split(';')]
        return [Path(p) for p in value]

    # noinspection PyUnresolvedReferences,PyProtectedMember
    def _is_invalid(r: Model) -> bool:
        """Internal function telling whether a single instance has a missing file or a wrong checksum"""
        for k, f in r._meta.fields.items():  # type: ignore[attr-defined] # it looks like it is a problem with peewee
            if not isinstance(f, (mafw.db.fields.FileNameField, mafw.db.fields.FileNameListField)):
                continue

            # f is a filename field or a filename list field
            # this might be linked to a Checksum field
            if f.checksum_field is None:
                continue

            if not hasattr(r, f.checksum_field):
                raise ModelError(
                    f'FileNameField {k} is referring to {f.checksum_field}, but Model '
                    f'{r.__class__.__name__} has not such field'
                )

            target: Path | list[Path]
            if isinstance(f, mafw.db.fields.FileNameListField):
                files = _as_paths(getattr(r, k))
                # a generator lets all() stop at the first missing file
                if not all(file.exists() for file in files):
                    warnings.warn('A file is missing from the list, removing the whole row from the DB.')
                    return True
                target = files
            else:  # isinstance(f, FileNameField)
                file = getattr(r, k)
                if not file.exists():
                    warnings.warn(f'{str(file)} does not exist, removing the corresponding row from the DB')
                    return True
                target = file

            # all referenced files exist: compare the fresh digest with the stored one
            if file_checksum(target) != getattr(r, f.checksum_field):
                return True

        return False

    def _check_row(r: Model) -> None:
        """Internal function performing the check and removal on a single instance"""
        if _is_invalid(r):
            r.delete_instance()

    if isinstance(models, (Model, type(Model))):
        models = [models]

    # NOTE(review): trigger_type_id=4 presumably identifies the delete triggers - confirm against TriggerDisabler.
    with TriggerDisabler(trigger_type_id=4):
        for m in models:
            if isinstance(m, Model):
                _check_row(m)
            elif isinstance(m, type(Model)):
                for row in m.select().execute():  # type: ignore[no-untyped-call] # problem with peewee
                    _check_row(row)
            else:
                raise TypeError('models must be list[Model | type(Model)] | Model | type(Model)')