# Copyright 2025 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""
The module allows to generate a DB model structure starting from an existing DB.
It is strongly based on the :link:`peewee` playhouse module `pwiz`.
"""
import datetime
from typing import Any, TextIO
from playhouse.reflection import Introspector
from mafw import __version__ as mafw_version
[docs]
class UnknownField:
"""Placeholder class for an Unknown Field."""
def __init__(self, *_: Any, **__: Any) -> None:
pass
MODULE_DOCSTRING = """r\"\"\"
The module provides an automatically generated ORM model from the introspection of {database_name}.
Even though the model might be working, all specific peewee and MAFw fields could not be reconstructed and thus the
user should carefully analyse the content of this file before including into their project.
File generated by MAFw db_wizard on {date}
MAFw version: {mafw_version}
\"\"\"
"""
HEADER = """
# ruff: noqa: F405
from typing import Any
from peewee import * # noqa: F403
from mafw.db.db_model import MAFwBaseModel
"""
UNKNOWN_FIELD = """\
class UnknownField(object):
def __init__(self, *_: Any, **__: Any) -> None:
pass
"""
[docs]
def dump_models(
output_file: TextIO,
introspector: Introspector,
tables: list[str] | tuple[str, ...] | None = None,
preserve_order: bool = True,
include_views: bool = False,
ignore_unknown: bool = False,
snake_case: bool = True,
) -> None:
"""Dumps all the ORM models in the output file.
This function will write to the output stream a fully functional python module with all the models that the
introspector class can access. The user has the possibility to pre-select a subset of tables to be dumped and
also to optionally include views.
.. seealso::
This function is the core of the `db wizard <mafw.scripts.mafw_exe.html#mafw-db-wizard>`_.
:param output_file: The output file. It must be open in text/write mode.
:type output_file: TextIO
:param introspector: A database introspector from the reflection module.
:type introspector: Introspector
:param tables: A list of table names to be dumped. If None, then all tables in the DB will be dumped. Defaults to None.
:type tables: list | None, Optional
:param preserve_order: Preserve the order of the columns in the model. Defaults to True.
:type preserve_order: bool, Optional
:param include_views: Include saved views to be dumped. Views can be obtained joining tables. Defaults to False.
:type include_views: bool, Optional
:param ignore_unknown: Ignore unknown fields. If True, then an UnknownField type will be used as a placeholder. Defaults to False.
:type ignore_unknown: bool, Optional
:param snake_case: Use snake case for column and table name. Defaults to True.
:type snake_case: bool, Optional
"""
def dump_intro(database_name: str) -> None:
"""Dumps the module intro to the output file.
:param database_name: The name of the database.
:type database_name: str
"""
output_file.write(
MODULE_DOCSTRING.format(
database_name=database_name, date=datetime.datetime.now(), mafw_version=mafw_version
)
)
output_file.write(HEADER)
if not ignore_unknown:
output_file.write(UNKNOWN_FIELD)
def dump_table(table: str, seen: set[str], cycle_ref: list[str] | None = None) -> None:
"""Dumps a table to the output file.
:param table: The name of the table being dumped.
:type table: str
:param seen: A set containing all tables already dumped.
:type seen: set
:param cycle_ref: A list containing cycle reference table. Defaults to None.
:type cycle_ref: list, Optional
"""
cycle_ref = cycle_ref or []
foreign_keys = database.foreign_keys[table]
for foreign_key in foreign_keys:
dest = foreign_key.dest_table
# In the event the destination table has already been pushed
# for printing, then we have a reference cycle.
if dest in cycle_ref and table not in cycle_ref:
output_file.write('# Possible reference cycle: %s\n' % dest)
# If this is not a self-referential foreign key, and we have
# not already processed the destination table, do so now.
if dest not in seen and dest not in cycle_ref:
seen.add(dest)
if dest != table:
dump_table(dest, seen, cycle_ref + [table])
output_file.write('class %s(MAFwBaseModel):\n' % database.model_names[table])
columns = database.columns[table].items()
if not preserve_order:
columns = sorted(columns)
primary_keys = database.primary_keys[table]
for name, column in columns:
skip = all(
[
name in primary_keys,
name == 'id',
len(primary_keys) == 1,
column.field_class in introspector.pk_classes,
]
)
if skip:
continue
if column.primary_key and len(primary_keys) > 1:
# If we have a CompositeKey, then we do not want to explicitly
# mark the columns as being primary keys.
column.primary_key = False
is_unknown = column.field_class is UnknownField
if is_unknown and ignore_unknown:
disp = '%s - %s' % (column.name, column.raw_column_type or '?')
output_file.write(' # %s\n' % disp)
else:
output_file.write(' %s\n' % column.get_field())
output_file.write('\n')
output_file.write(' class Meta:\n')
output_file.write(" table_name = '%s'\n" % table)
multi_column_indexes = database.multi_column_indexes(table)
if multi_column_indexes:
output_file.write(' indexes = (\n')
for fields, unique in sorted(multi_column_indexes):
output_file.write(
' ((%s), %s),\n'
% (
', '.join("'%s'" % field for field in fields),
unique,
)
)
output_file.write(' )\n')
if introspector.schema:
output_file.write(" schema = '%s'\n" % introspector.schema)
if len(primary_keys) > 1:
pk_field_names = sorted([field.name for col, field in columns if col in primary_keys])
pk_list = ', '.join("'%s'" % pk for pk in pk_field_names)
output_file.write(' primary_key = CompositeKey(%s)\n' % pk_list)
elif not primary_keys:
output_file.write(' primary_key = False\n')
output_file.write('\n')
seen.add(table)
database = introspector.introspect(table_names=tables, include_views=include_views, snake_case=snake_case) # type: ignore
dump_intro(introspector.get_database_name()) # type: ignore
already_seen: set[str] = set()
for table_name in sorted(database.model_names.keys()):
if table_name not in already_seen:
if not tables or table_name in tables:
dump_table(table_name, already_seen)