Coverage for src / mafw / db / db_wizard.py: 100%
69 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
1# Copyright 2025 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5The module allows to generate a DB model structure starting from an existing DB.
7It is strongly based on the :link:`peewee` playhouse module `pwiz`.
8"""
10import datetime
11from typing import Any, TextIO
13from playhouse.reflection import Introspector
15from mafw import __version__ as mafw_version
18class UnknownField:
19 """Placeholder class for an Unknown Field."""
21 def __init__(self, *_: Any, **__: Any) -> None:
22 pass
25MODULE_DOCSTRING = """r\"\"\"
26The module provides an automatically generated ORM model from the introspection of {database_name}.
28Even though the model might be working, all specific peewee and MAFw fields could not be reconstructed and thus the
29user should carefully analyse the content of this file before including into their project.
31File generated by MAFw db_wizard on {date}
32MAFw version: {mafw_version}
34\"\"\"
35"""
37HEADER = """
38# ruff: noqa: F405
40from typing import Any
42from peewee import * # noqa: F403
44from mafw.db.db_model import MAFwBaseModel
47"""
49UNKNOWN_FIELD = """\
50class UnknownField(object):
51 def __init__(self, *_: Any, **__: Any) -> None:
52 pass
54"""
57def dump_models(
58 output_file: TextIO,
59 introspector: Introspector,
60 tables: list[str] | tuple[str, ...] | None = None,
61 preserve_order: bool = True,
62 include_views: bool = False,
63 ignore_unknown: bool = False,
64 snake_case: bool = True,
65) -> None:
66 """Dumps all the ORM models in the output file.
68 This function will write to the output stream a fully functional python module with all the models that the
69 introspector class can access. The user has the possibility to pre-select a subset of tables to be dumped and
70 also to optionally include views.
72 .. seealso::
74 This function is the core of the `db wizard <mafw.scripts.mafw_exe.html#mafw-db-wizard>`_.
76 :param output_file: The output file. It must be open in text/write mode.
77 :type output_file: TextIO
78 :param introspector: A database introspector from the reflection module.
79 :type introspector: Introspector
80 :param tables: A list of table names to be dumped. If None, then all tables in the DB will be dumped. Defaults to None.
81 :type tables: list | None, Optional
82 :param preserve_order: Preserve the order of the columns in the model. Defaults to True.
83 :type preserve_order: bool, Optional
84 :param include_views: Include saved views to be dumped. Views can be obtained joining tables. Defaults to False.
85 :type include_views: bool, Optional
86 :param ignore_unknown: Ignore unknown fields. If True, then an UnknownField type will be used as a placeholder. Defaults to False.
87 :type ignore_unknown: bool, Optional
88 :param snake_case: Use snake case for column and table name. Defaults to True.
89 :type snake_case: bool, Optional
90 """
92 def dump_intro(database_name: str) -> None:
93 """Dumps the module intro to the output file.
95 :param database_name: The name of the database.
96 :type database_name: str
97 """
98 output_file.write(
99 MODULE_DOCSTRING.format(
100 database_name=database_name, date=datetime.datetime.now(), mafw_version=mafw_version
101 )
102 )
103 output_file.write(HEADER)
105 if not ignore_unknown:
106 output_file.write(UNKNOWN_FIELD)
108 def dump_table(table: str, seen: set[str], cycle_ref: list[str] | None = None) -> None:
109 """Dumps a table to the output file.
111 :param table: The name of the table being dumped.
112 :type table: str
113 :param seen: A set containing all tables already dumped.
114 :type seen: set
115 :param cycle_ref: A list containing cycle reference table. Defaults to None.
116 :type cycle_ref: list, Optional
117 """
119 cycle_ref = cycle_ref or []
120 foreign_keys = database.foreign_keys[table]
121 for foreign_key in foreign_keys:
122 dest = foreign_key.dest_table
124 # In the event the destination table has already been pushed
125 # for printing, then we have a reference cycle.
126 if dest in cycle_ref and table not in cycle_ref:
127 output_file.write('# Possible reference cycle: %s\n' % dest)
129 # If this is not a self-referential foreign key, and we have
130 # not already processed the destination table, do so now.
131 if dest not in seen and dest not in cycle_ref:
132 seen.add(dest)
133 if dest != table:
134 dump_table(dest, seen, cycle_ref + [table])
136 output_file.write('class %s(MAFwBaseModel):\n' % database.model_names[table])
137 columns = database.columns[table].items()
138 if not preserve_order:
139 columns = sorted(columns)
140 primary_keys = database.primary_keys[table]
141 for name, column in columns:
142 skip = all(
143 [
144 name in primary_keys,
145 name == 'id',
146 len(primary_keys) == 1,
147 column.field_class in introspector.pk_classes,
148 ]
149 )
150 if skip:
151 continue
152 if column.primary_key and len(primary_keys) > 1:
153 # If we have a CompositeKey, then we do not want to explicitly
154 # mark the columns as being primary keys.
155 column.primary_key = False
157 is_unknown = column.field_class is UnknownField
158 if is_unknown and ignore_unknown:
159 disp = '%s - %s' % (column.name, column.raw_column_type or '?')
160 output_file.write(' # %s\n' % disp)
161 else:
162 output_file.write(' %s\n' % column.get_field())
164 output_file.write('\n')
165 output_file.write(' class Meta:\n')
166 output_file.write(" table_name = '%s'\n" % table)
167 multi_column_indexes = database.multi_column_indexes(table)
168 if multi_column_indexes:
169 output_file.write(' indexes = (\n')
170 for fields, unique in sorted(multi_column_indexes):
171 output_file.write(
172 ' ((%s), %s),\n'
173 % (
174 ', '.join("'%s'" % field for field in fields),
175 unique,
176 )
177 )
178 output_file.write(' )\n')
180 if introspector.schema:
181 output_file.write(" schema = '%s'\n" % introspector.schema)
182 if len(primary_keys) > 1:
183 pk_field_names = sorted([field.name for col, field in columns if col in primary_keys])
184 pk_list = ', '.join("'%s'" % pk for pk in pk_field_names)
185 output_file.write(' primary_key = CompositeKey(%s)\n' % pk_list)
186 elif not primary_keys:
187 output_file.write(' primary_key = False\n')
188 output_file.write('\n')
190 seen.add(table)
192 database = introspector.introspect(table_names=tables, include_views=include_views, snake_case=snake_case) # type: ignore
193 dump_intro(introspector.get_database_name()) # type: ignore
195 already_seen: set[str] = set()
196 for table_name in sorted(database.model_names.keys()):
197 if table_name not in already_seen:
198 if not tables or table_name in tables:
199 dump_table(table_name, already_seen)