Coverage for src / mafw / db / db_wizard.py: 100%

69 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-09 09:08 +0000

1# Copyright 2025 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5The module allows to generate a DB model structure starting from an existing DB. 

6 

7It is strongly based on the :link:`peewee` playhouse module `pwiz`. 

8""" 

9 

10import datetime 

11from typing import Any, TextIO 

12 

13from playhouse.reflection import Introspector 

14 

15from mafw import __version__ as mafw_version 

16 

17 

18class UnknownField: 

19 """Placeholder class for an Unknown Field.""" 

20 

21 def __init__(self, *_: Any, **__: Any) -> None: 

22 pass 

23 

24 

25MODULE_DOCSTRING = """r\"\"\" 

26The module provides an automatically generated ORM model from the introspection of {database_name}. 

27 

28Even though the model might be working, all specific peewee and MAFw fields could not be reconstructed and thus the  

29user should carefully analyse the content of this file before including into their project. 

30 

31File generated by MAFw db_wizard on {date} 

32MAFw version: {mafw_version} 

33 

34\"\"\" 

35""" 

36 

37HEADER = """ 

38# ruff: noqa: F405 

39 

40from typing import Any 

41 

42from peewee import * # noqa: F403 

43 

44from mafw.db.db_model import MAFwBaseModel 

45 

46 

47""" 

48 

49UNKNOWN_FIELD = """\ 

50class UnknownField(object): 

51 def __init__(self, *_: Any, **__: Any) -> None:  

52 pass 

53 

54""" 

55 

56 

57def dump_models( 

58 output_file: TextIO, 

59 introspector: Introspector, 

60 tables: list[str] | tuple[str, ...] | None = None, 

61 preserve_order: bool = True, 

62 include_views: bool = False, 

63 ignore_unknown: bool = False, 

64 snake_case: bool = True, 

65) -> None: 

66 """Dumps all the ORM models in the output file. 

67 

68 This function will write to the output stream a fully functional python module with all the models that the 

69 introspector class can access. The user has the possibility to pre-select a subset of tables to be dumped and 

70 also to optionally include views. 

71 

72 .. seealso:: 

73 

74 This function is the core of the `db wizard <mafw.scripts.mafw_exe.html#mafw-db-wizard>`_. 

75 

76 :param output_file: The output file. It must be open in text/write mode. 

77 :type output_file: TextIO 

78 :param introspector: A database introspector from the reflection module. 

79 :type introspector: Introspector 

80 :param tables: A list of table names to be dumped. If None, then all tables in the DB will be dumped. Defaults to None. 

81 :type tables: list | None, Optional 

82 :param preserve_order: Preserve the order of the columns in the model. Defaults to True. 

83 :type preserve_order: bool, Optional 

84 :param include_views: Include saved views to be dumped. Views can be obtained joining tables. Defaults to False. 

85 :type include_views: bool, Optional 

86 :param ignore_unknown: Ignore unknown fields. If True, then an UnknownField type will be used as a placeholder. Defaults to False. 

87 :type ignore_unknown: bool, Optional 

88 :param snake_case: Use snake case for column and table name. Defaults to True. 

89 :type snake_case: bool, Optional 

90 """ 

91 

92 def dump_intro(database_name: str) -> None: 

93 """Dumps the module intro to the output file. 

94 

95 :param database_name: The name of the database. 

96 :type database_name: str 

97 """ 

98 output_file.write( 

99 MODULE_DOCSTRING.format( 

100 database_name=database_name, date=datetime.datetime.now(), mafw_version=mafw_version 

101 ) 

102 ) 

103 output_file.write(HEADER) 

104 

105 if not ignore_unknown: 

106 output_file.write(UNKNOWN_FIELD) 

107 

108 def dump_table(table: str, seen: set[str], cycle_ref: list[str] | None = None) -> None: 

109 """Dumps a table to the output file. 

110 

111 :param table: The name of the table being dumped. 

112 :type table: str 

113 :param seen: A set containing all tables already dumped. 

114 :type seen: set 

115 :param cycle_ref: A list containing cycle reference table. Defaults to None. 

116 :type cycle_ref: list, Optional 

117 """ 

118 

119 cycle_ref = cycle_ref or [] 

120 foreign_keys = database.foreign_keys[table] 

121 for foreign_key in foreign_keys: 

122 dest = foreign_key.dest_table 

123 

124 # In the event the destination table has already been pushed 

125 # for printing, then we have a reference cycle. 

126 if dest in cycle_ref and table not in cycle_ref: 

127 output_file.write('# Possible reference cycle: %s\n' % dest) 

128 

129 # If this is not a self-referential foreign key, and we have 

130 # not already processed the destination table, do so now. 

131 if dest not in seen and dest not in cycle_ref: 

132 seen.add(dest) 

133 if dest != table: 

134 dump_table(dest, seen, cycle_ref + [table]) 

135 

136 output_file.write('class %s(MAFwBaseModel):\n' % database.model_names[table]) 

137 columns = database.columns[table].items() 

138 if not preserve_order: 

139 columns = sorted(columns) 

140 primary_keys = database.primary_keys[table] 

141 for name, column in columns: 

142 skip = all( 

143 [ 

144 name in primary_keys, 

145 name == 'id', 

146 len(primary_keys) == 1, 

147 column.field_class in introspector.pk_classes, 

148 ] 

149 ) 

150 if skip: 

151 continue 

152 if column.primary_key and len(primary_keys) > 1: 

153 # If we have a CompositeKey, then we do not want to explicitly 

154 # mark the columns as being primary keys. 

155 column.primary_key = False 

156 

157 is_unknown = column.field_class is UnknownField 

158 if is_unknown and ignore_unknown: 

159 disp = '%s - %s' % (column.name, column.raw_column_type or '?') 

160 output_file.write(' # %s\n' % disp) 

161 else: 

162 output_file.write(' %s\n' % column.get_field()) 

163 

164 output_file.write('\n') 

165 output_file.write(' class Meta:\n') 

166 output_file.write(" table_name = '%s'\n" % table) 

167 multi_column_indexes = database.multi_column_indexes(table) 

168 if multi_column_indexes: 

169 output_file.write(' indexes = (\n') 

170 for fields, unique in sorted(multi_column_indexes): 

171 output_file.write( 

172 ' ((%s), %s),\n' 

173 % ( 

174 ', '.join("'%s'" % field for field in fields), 

175 unique, 

176 ) 

177 ) 

178 output_file.write(' )\n') 

179 

180 if introspector.schema: 

181 output_file.write(" schema = '%s'\n" % introspector.schema) 

182 if len(primary_keys) > 1: 

183 pk_field_names = sorted([field.name for col, field in columns if col in primary_keys]) 

184 pk_list = ', '.join("'%s'" % pk for pk in pk_field_names) 

185 output_file.write(' primary_key = CompositeKey(%s)\n' % pk_list) 

186 elif not primary_keys: 

187 output_file.write(' primary_key = False\n') 

188 output_file.write('\n') 

189 

190 seen.add(table) 

191 

192 database = introspector.introspect(table_names=tables, include_views=include_views, snake_case=snake_case) # type: ignore 

193 dump_intro(introspector.get_database_name()) # type: ignore 

194 

195 already_seen: set[str] = set() 

196 for table_name in sorted(database.model_names.keys()): 

197 if table_name not in already_seen: 

198 if not tables or table_name in tables: 

199 dump_table(table_name, already_seen)