Coverage for src / mafw / tools / file_tools.py: 100%

80 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-09 09:08 +0000

1# Copyright 2025 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4"""The module provides utilities for handling files, filenames, hashing and so on."""

5 

6import hashlib 

7import warnings 

8from collections.abc import Sequence 

9from pathlib import Path 

10from typing import cast 

11 

12from peewee import Model 

13 

14import mafw.db.fields 

15from mafw.db.db_types import PeeweeModelWithMeta 

16from mafw.mafw_errors import ModelError 

17 

18 

19def file_checksum(filenames: str | Path | Sequence[str | Path], buf_size: int = 65536) -> str: 

20 """ 

21 Generates the hexadecimal digest of a file or a list of files. 

22 

23 The digest is calculated using the sha256 algorithm. 

24 

25 :param filenames: The filename or the list of filenames for digest calculations. 

26 :type filenames: str, Path, list 

27 :param buf_size: The buffer size in bytes for reading the input files. Defaults to 64kB. 

28 :type buf_size: int, Optional 

29 :return: The hexadecimal digest. 

30 :rtype: str 

31 """ 

32 if isinstance(filenames, (str, Path)): 

33 filenames = [filenames] 

34 

35 hasher = hashlib.sha256() 

36 

37 for filename in filenames: 

38 with open(filename, 'rb') as file: 

39 while True: 

40 data = file.read(buf_size) 

41 if not data: 

42 break 

43 hasher.update(data) 

44 

45 return hasher.hexdigest() 

46 

47 

48# noinspection PyUnresolvedReferences 

def remove_widow_db_rows(models: list[Model | type[Model]] | Model | type[Model]) -> None:
    """Removes widow rows from a database table.

    According to MAFw architecture, the Database is mainly providing I/O support to the various processors.

    This means that the processor retrieves a list of items from a database table for processing and subsequently
    updates a result table with the newly generated outputs.

    Very often the input and output data are not stored directly in the database, but rather in files saved on the
    disk. In this case, the database is just providing a valid path where the input (or output) data can be found.

    From this point of view, a **widow row** is a database entry in which the file referenced by the FilenameField
    has been deleted. A typical example is the following: the user wants a certain processor to regenerate a given
    result stored inside an output file. Instead of setting up a complex filter so that the processor receives only
    this element to process, the user can delete the actual output file and ask the processor to process all new items.

    The provided ``models`` can be either a list or a single element, representing either an instance of a DB model
    or a model class. If a model class is provided, then a select over all its entries is performed.

    The function will look at all fields of :class:`~mafw.db.fields.FileNameField` and
    :class:`~mafw.db.fields.FileNameListField` and check if it corresponds to an existing path or list of paths. If not,
    then the corresponding row is removed from the DB table. The row is removed at the first missing file: the
    remaining fields of that row are not inspected.

    The whole operation is executed inside a ``TriggerDisabler(trigger_type_id=4)`` context.

    :param models: A list or a single Model instance or Model class for widow rows removal.
    :type models: list[Model | type(Model)] | Model | type(Model)
    :raises TypeError: if ``models`` is not of the right type.
    """
    from mafw.db.std_tables import TriggerDisabler

    def _as_paths(value: str | Sequence[str | Path]) -> list[Path]:
        """Internal function converting the value of a filename list attribute into a list of paths"""
        # The attribute may hold the ';' separated string or an already split sequence of paths:
        # both forms are accepted.
        items = value.split(';') if isinstance(value, str) else value
        return [Path(item) for item in items]

    # noinspection PyUnresolvedReferences,PyProtectedMember
    def _check_row(r: Model) -> None:
        """Internal function performing the check and removal on a single instance"""
        # this is just to make mypy happy
        # r0 and r are exactly the same thing!
        r0 = cast(PeeweeModelWithMeta, r)
        for k, f in r0._meta.fields.items():
            # since FileNameListField is a subclass of FileNameField, we first have to check for
            # the case of a list and then of a simple field.
            if isinstance(f, mafw.db.fields.FileNameListField):
                missing = not all(file.exists() for file in _as_paths(getattr(r, k)))
            elif isinstance(f, mafw.db.fields.FileNameField):
                missing = not getattr(r, k).exists()
            else:
                continue

            if missing:
                # one missing file is enough: remove the row and stop here, so that the
                # same row is not deleted again because of another filename field.
                r.delete_instance()
                return

    if isinstance(models, (Model, type(Model))):
        models = [models]

    with TriggerDisabler(trigger_type_id=4):
        for m in models:
            if isinstance(m, Model):
                _check_row(m)
            elif isinstance(m, type(Model)):
                # this is just to make mypy happy
                # m0 and m are exactly the same thing!
                m0 = cast(PeeweeModelWithMeta, m)
                for row in m0.select().execute():
                    _check_row(row)
            else:
                raise TypeError('models must be list[Model | type(Model)] | Model | type(Model)')

112 

113 

114# noinspection PyUnresolvedReferences 

def verify_checksum(models: list[Model | type[Model]] | Model | type[Model]) -> None:
    """
    Verifies the goodness of FileChecksumField.

    If in a model there is a FileChecksumField, this must be connected to a FileNameField or a FileNameListField in
    the same model. The goal of this function is to recalculate the checksum of the FileNameField / FileNameListField
    and compare it with the actual stored value. If the newly calculated value differs from the stored one, the
    corresponding row in the model will be removed, as it is no longer valid.

    If a file is missing, then the checksum check is not performed, but the row is removed right away and a warning
    is issued.

    As soon as a row is removed, its remaining fields are not inspected any further.

    The whole operation is executed inside a ``TriggerDisabler(trigger_type_id=4)`` context.

    This function can be CPU and I/O intensive and last a lot, so use it with care, especially when dealing with long
    tables and large files.

    :param models: A list or a single Model instance or Model class for checksum verification.
    :type models: list[Model | type(Model)] | Model | type(Model)
    :raises TypeError: if ``models`` is not of the right type.
    :raises mafw.mafw_errors.ModelError: if a FileNameField is referring to a checksum field that does not exist in
        the model.
    """
    from mafw.db.std_tables import TriggerDisabler

    def _as_paths(value: str | Sequence[str | Path]) -> list[Path]:
        """Internal function converting the value of a filename list attribute into a list of paths"""
        # The attribute may hold the ';' separated string or an already split sequence of paths:
        # both forms are accepted.
        items = value.split(';') if isinstance(value, str) else value
        return [Path(item) for item in items]

    # noinspection PyUnresolvedReferences,PyProtectedMember
    def _check_row(r: Model) -> None:
        """Internal function performing the verification and the possible removal of a single instance"""
        for k, f in r._meta.fields.items():  # type: ignore[attr-defined] # it looks like it is a problem with peewee
            if not isinstance(f, (mafw.db.fields.FileNameField, mafw.db.fields.FileNameListField)):
                continue

            # f is a filename field or a filename list field
            # this might be linked to a Checksum field
            if f.checksum_field is None:
                continue

            if not hasattr(r, f.checksum_field):
                raise ModelError(
                    f'FileNameField {k} is referring to {f.checksum_field}, but Model '
                    f'{r.__class__.__name__} has not such field'
                )

            target: Path | list[Path]
            if isinstance(f, mafw.db.fields.FileNameListField):
                files = _as_paths(getattr(r, k))
                if not all(file.exists() for file in files):
                    r.delete_instance()
                    warnings.warn('A file is missing from the list, removing the whole row from the DB.')
                    # the row is gone, nothing else to verify
                    return
                target = files
            else:  # isinstance(f, FileNameField)
                file = getattr(r, k)
                if not file.exists():
                    warnings.warn(f'{str(file)} does not exist, removing the corresponding row from the DB')
                    r.delete_instance()
                    # the row is gone, nothing else to verify
                    return
                target = file

            # all files are available: compare the fresh digest with the stored one
            if file_checksum(target) != getattr(r, f.checksum_field):
                r.delete_instance()
                return

    if isinstance(models, (Model, type(Model))):
        models = [models]

    with TriggerDisabler(trigger_type_id=4):
        for m in models:
            if isinstance(m, Model):
                _check_row(m)
            elif isinstance(m, type(Model)):
                for row in m.select().execute():  # type: ignore[no-untyped-call] # problem with peewee
                    _check_row(row)
            else:
                raise TypeError('models must be list[Model | type(Model)] | Model | type(Model)')