Coverage for src / mafw / scripts / mafw_exe.py: 94%
321 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-28 13:34 +0000
1# Copyright 2025–2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5The execution framework.
7This module provides the run functionality to the whole library.
9It is heavily relying on ``click`` for the generation of commands, options, and arguments.
11.. click:: mafw.scripts.mafw_exe:cli
12 :prog: mafw
13 :nested: full
15"""
17import datetime
18import logging
19import pathlib
20import shutil
21import sys
22import warnings
23from enum import IntEnum
24from typing import Any
26import click
27from click.exceptions import ClickException
28from pwiz import DATABASE_MAP, make_introspector # type: ignore[import-untyped]
29from rich import print as rprint
30from rich import traceback
31from rich.align import Align
32from rich.logging import RichHandler
33from rich.prompt import Prompt
34from rich.rule import Rule
35from rich.table import Table
36from rich_pyfiglet import RichFiglet
38from mafw.__about__ import __version__
39from mafw.db.db_configurations import db_scheme, default_conf
40from mafw.db.db_wizard import dump_models
41from mafw.enumerators import ProcessorExitStatus
42from mafw.lazy_import import LazyImportProcessor
43from mafw.mafw_errors import AbortProcessorException
44from mafw.plugin_manager import get_plugin_manager
45from mafw.runner import MAFwApplication
46from mafw.tools.click_extensions import (
47 AbbreviateGroup,
48 check_ci_completion_guard,
49 completion_script_path,
50 completion_source_script,
51 install_completion,
52 is_script_already_installed,
53 resolve_completion_shell,
54 uninstall_completion_files,
55)
56from mafw.tools.parallel import is_free_threading
57from mafw.tools.shell_tools import CONSOLE
58from mafw.tools.toml_tools import generate_steering_file
# Modules handed to rich's traceback handler through its ``suppress`` argument.
suppress = [click]
# Install the rich traceback handler for the whole process, with local variables displayed.
traceback.install(show_locals=True, suppress=suppress)
class MAFwGroup(AbbreviateGroup):
    """Abbreviation-aware click group that maps command outcomes onto MAFw process exit codes."""

    group_class: type[click.Group] = AbbreviateGroup

    def parse_args(self, ctx: click.Context, args: list[str]) -> list[str]:
        """Parse the arguments and remember the selected root command before the invocation.

        The detected command name is stored in ``ctx.meta`` under the ``_mafw_invoked_subcommand`` key.

        :param ctx: The click context being parsed.
        :type ctx: click.Context
        :param args: The command line tokens.
        :type args: list[str]
        :return: The value returned by the parent implementation.
        :rtype: list[str]
        """
        remaining = super().parse_args(ctx, args)
        # Only the root context has no parent, so nested groups never overwrite the recorded command.
        is_root_context = ctx.parent is None
        if is_root_context:
            ctx.meta['_mafw_invoked_subcommand'] = self._detect_command_name(args)
        return remaining

    def _detect_command_name(self, args: list[str]) -> str | None:
        """Return the name of the first subcommand found in ``args``.

        Options declared on this group are skipped, together with the value following them when they are not
        flags. Any other token starting with a dash is ignored as well.

        :param args: The tokens to be scanned.
        :type args: list[str]
        :return: The resolved command name, the raw token if no command matches it, or None if no candidate token
            is found.
        :rtype: str | None
        """
        known_options: set[str] = set()
        options_with_value: set[str] = set()
        for param in self.params:
            names = getattr(param, 'opts', [])
            known_options.update(names)
            if not getattr(param, 'is_flag', False):
                options_with_value.update(names)

        value_pending = False
        for token in args:
            if value_pending:
                # this token is the value of the previous option
                value_pending = False
            elif token in known_options:
                value_pending = token in options_with_value
            elif not token.startswith('-'):
                command = self.get_command(click.Context(self), token)
                return token if command is None else command.name
        return None

    def invoke(self, ctx: click.Context) -> Any:
        """Print the banner, invoke the command and turn failures into process exits.

        Click exceptions are shown and exit with their own exit code, exit requests are propagated untouched and
        any other exception terminates the process with :attr:`ReturnValue.Error`.

        :param ctx: The click context.
        :type ctx: click.Context
        :return: The value returned by the invoked command.
        """
        print_banner(ctx)

        try:
            outcome = super().invoke(ctx)
        except ClickException as click_error:
            click_error.show()
            sys.exit(click_error.exit_code)
        except (SystemExit, click.exceptions.Exit):
            raise
        except Exception:
            sys.exit(ReturnValue.Error)
        return outcome

    def main(self, *args: Any, **kwargs: Any) -> None:  # type: ignore[override]
        """Run the command line interface and exit with the status returned by the command.

        The parent ``main`` is executed with ``standalone_mode=False`` so that the command return value is
        available here. Integer return values become the exit code; anything else exits with
        :attr:`ReturnValue.OK`.
        """
        try:
            result = super().main(*args, standalone_mode=False, **kwargs)  # type: ignore[call-overload]
            exit_code = result if isinstance(result, (ReturnValue, int)) else ReturnValue.OK
            sys.exit(exit_code)
        except ClickException as click_error:
            click_error.show()
            sys.exit(click_error.exit_code)
        except SystemExit:
            raise
        except Exception:
            sys.exit(ReturnValue.Error)
# Numeric logging level for each value accepted by the --log-level option.
LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL,
}
# Settings passed to the root click group: -h is accepted next to --help.
CONTEXT_SETTINGS = {'help_option_names': ['-h', '--help']}

_warnings_captured = False
# Welcome banner built once at import time; print_banner() writes it to the console.
welcome_message = RichFiglet(
    'MAFw',
    colors=['#ff0000', 'magenta1', 'blue3'],
    horizontal=True,
    # font='banner4',
    remove_blank_lines=True,
    border='ROUNDED',
    border_color='#ff0000',
)
def print_banner(ctx: click.Context) -> None:
    """
    Print the welcome banner at most once per invocation.

    The banner is skipped during resilient parsing, when the selected root command is ``completion``, when the
    ``no_banner`` option is set and whenever the user interface is not ``rich``.

    .. note::

        ctx.obj is not yet populated when Group.invoke() is called,
        but ctx.params contains parsed options for the current group. We
        therefore inspect the root context params.

    :param ctx: The click context.
    :type ctx: click.Context
    """
    # stay quiet while click is doing resilient parsing (e.g., --help)
    if getattr(ctx, 'resilient_parsing', False):
        return

    root = ctx.find_root()

    # The selected command is recorded at parse time, before invoke() emits any output,
    # so that the completion command can remain quiet.
    if root.meta.get('_mafw_invoked_subcommand') == 'completion':
        return

    root_params = getattr(root, 'params', {}) or {}
    selected_ui = root_params.get('ui')
    banner_disabled = root_params.get('no_banner')

    # Without a known user interface be conservative and do not print anything.
    banner_wanted = not banner_disabled and bool(selected_ui) and str(selected_ui).lower() == 'rich'
    if not banner_wanted or root.meta.get('_banner_printed', False):
        return

    CONSOLE.print(welcome_message)
    # remember that the banner was shown, so that it is printed only once
    root.meta['_banner_printed'] = True
194def custom_formatwarning(
195 message: Warning | str, category: type[Warning], filename: str, lineno: int, line: str | None = None
196) -> str:
197 """Return the pure message of the warning."""
198 return str(message)
def is_bugged_warning_capture_version() -> bool:
    """Tell whether the running interpreter is affected by the free-threading warning capture bug.

    Python 3.14.0 through 3.14.3 (free-threading builds only) have a bug where
    ``logging.captureWarnings(True)`` can crash or misbehave.

    .. note::

        See https://github.com/python/cpython/pull/146374 for details.

    .. todo::

        Remove this workaround when Python 3.14 is no longer a supported version
        (expected late 2030).

    :return: True if the interpreter is affected by the warning capture bug, False otherwise.
    :rtype: bool
    """
    major, minor, micro = sys.version_info[:3]
    if (major, minor) != (3, 14) or micro > 3:
        # not one of the affected releases: the build type does not matter
        return False
    return is_free_threading()
# Warnings are rendered with their bare message only.
warnings.formatwarning = custom_formatwarning
# Route warnings through the logging system, except on the bugged free-threading builds (3.14.0–3.14.3).
logging.captureWarnings(not is_bugged_warning_capture_version())

# the root logger, shared by the whole script
log = logging.getLogger()
class ReturnValue(IntEnum):
    """Exit codes returned by the script commands."""

    OK = 0
    """Successful execution"""

    Error = 1
    """Generic failure"""
def logger_setup(level: str, ui: str, tracebacks: bool) -> None:
    """Configure the root logger from the command line options.

    The root logger level is set from ``level`` and a new handler is attached to it: a RichHandler for the
    ``rich`` user interface, a plain StreamHandler otherwise.

    The `tracebacks` flag is used only by the RichHandler. In normal circumstances it is False and it is turned
    on by the debug flag.

    :param level: Logging level as a string.
    :type level: str
    :param ui: User interface as a string ('rich' or 'console').
    :type ui: str
    :param tracebacks: Enable/disable the logging of exception tracebacks.
    :type tracebacks: bool
    """
    level_name, ui_name = level.lower(), ui.lower()
    log.setLevel(LEVELS[level_name])

    handler: logging.Handler
    if ui_name != 'rich':
        handler = logging.StreamHandler()
        format_string = '%(asctime)s - %(levelname)s - %(message)s'
    else:
        # the rich handler displays time and level by itself, only the message is needed
        handler = RichHandler(
            rich_tracebacks=tracebacks, markup=True, show_path=False, log_time_format='%Y%m%d-%H:%M:%S'
        )
        format_string = '%(message)s'

    handler.setFormatter(logging.Formatter(format_string))
    log.addHandler(handler)
def display_exception(exception: Exception, show_traceback: bool = False) -> None:
    """
    Log an exception, with or without its traceback.

    A critical message is always logged first. With ``show_traceback`` the exception is then logged through
    ``log.exception`` with its default traceback handling. Otherwise only the exception class name and message
    are logged, and the critical message suggests the -D option.

    :param exception: The exception to be displayed and logged.
    :type exception: Exception
    :param show_traceback: Flag indicating whether to show detailed traceback information. Defaults to False
    :type show_traceback: bool
    """
    if not show_traceback:
        log.critical('A critical error occurred. Set option -D to get traceback output')
        summary = '%s: %s' % (exception.__class__.__name__, exception)
        # exc_info is explicitly disabled: only the one-line summary is wanted here
        log.exception(summary, exc_info=False, stack_info=False, stacklevel=1)
        return

    log.critical('A critical error occurred')
    log.exception(exception)
@click.group(invoke_without_command=True, context_settings=CONTEXT_SETTINGS, name='mafw', cls=MAFwGroup)
@click.pass_context
@click.option(
    '--log-level',
    type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False),
    show_default=True,
    default='info',
    help='Log level',
)
@click.option(
    '--ui',
    type=click.Choice(['console', 'rich'], case_sensitive=False),
    default='rich',
    help='The user interface',
    show_default=True,
)
@click.option('-D', '--debug', is_flag=True, default=False, help='Show debug information about errors')
@click.option('--no-banner', is_flag=True, default=False, help='Disable the welcome banner')
@click.version_option(__version__, '-v', '--version')
def cli(ctx: click.core.Context, log_level: str, ui: str, debug: bool, no_banner: bool) -> None:
    """
    The Modular Analysis Framework execution.

    This is the command line interface where you can configure and launch your analysis tasks.

    More information on our documentation page.
    \f

    The global options are stored as a dictionary in the context object, so that every sub-command can access
    them, and the logger is configured accordingly.

    :param ctx: The click context.
    :type ctx: click.core.Context
    :param log_level: The logging level as a string. Choice from debug, info, warning, error and critical.
    :type log_level: str
    :param ui: The user interface as a string. Choice from console and rich.
    :type ui: str
    :param debug: Flag to show debug information about exception.
    :type debug: bool
    :param no_banner: Flag to disable the welcome banner.
    :type no_banner: bool
    """
    ctx.ensure_object(dict)
    ctx.obj = dict(log_level=log_level, ui=ui, debug=debug, no_banner=no_banner)
    logger_setup(log_level, ui, debug)

    if ctx.invoked_subcommand is not None:
        return
    # no sub-command was given: point the user to the help
    rprint('Use --help to get a quick help on the mafw command.')
@cli.group(name='completion')
@click.pass_context
def completion(ctx: click.Context) -> None:
    """
    Manage shell completion for the ``mafw`` command.

    The completion workflow installs the Click-generated shell code into the
    active virtual environment, updates the activation script so completion is
    loaded automatically, and exposes a ``show`` helper for direct evaluation.

    \f

    .. versionadded:: v2.2

    The tool name is stored in the context object for the completion sub-commands.

    :param ctx: The click context.
    :type ctx: click.core.Context
    """
    ctx.ensure_object(dict)
    ctx.obj.update(tool_name='mafw')
@completion.command(name='install')
@click.option(
    '-s',
    '--shell',
    type=click.Choice(['auto', 'bash', 'zsh', 'fish'], case_sensitive=False),
    default='auto',
    show_default=True,
    help='Target shell for completion installation',
)
@click.option('-F', '--force', is_flag=True, default=False, help='Reinstall completion even if already loaded')
@click.pass_context
def completion_install(ctx: click.Context, shell: str, force: bool) -> None:
    """
    Install the ``mafw`` shell completion script.

    When ``--shell`` is omitted, the command guesses the shell from ``$SHELL``.
    The generated Click completion script is stored in the active virtual
    environment and the activation script is updated so that future shell
    sessions load completion automatically.

    \f

    .. versionadded:: v2.2

    :param ctx: The click context.
    :type ctx: click.core.Context
    :param shell: Target shell selector.
    :type shell: str
    :param force: Reinstall completion even if already loaded.
    :type force: bool
    :raises click.ClickException: If the completion is already installed and ``force`` is not set.
    """
    check_ci_completion_guard()
    tool = ctx.obj['tool_name']
    target_shell = resolve_completion_shell(shell)
    destination = completion_script_path(tool, target_shell)

    already_installed = is_script_already_installed(tool, target_shell)
    if already_installed and not force:
        message = f'MAFw completion is already installed in {destination}. Use --force to reinstall it.'
        raise click.ClickException(message)

    install_completion(tool, target_shell, force, destination)
    rprint(f'Completion script installed in [blue underline]{destination}[/blue underline].')
    rprint('Exit and re-enter the virtual environment to activate shell completion.')
@completion.command(name='uninstall')
@click.pass_context
def completion_uninstall(ctx: click.Context) -> None:
    """
    Remove the installed ``mafw`` shell completion files.

    This command removes generated completion files from ``share/mafw`` and
    strips the marker block from the activation scripts.

    \f

    .. versionadded:: v2.2

    The form feed above truncates the text shown by ``--help``, so the fields below are kept out of the
    command line help, as for the other commands of this module.

    :param ctx: The click context.
    :type ctx: click.core.Context
    """
    tool_name = ctx.obj['tool_name']
    # None is passed as second argument in place of a specific shell
    uninstall_completion_files(tool_name, None)
    rprint(f'Shell completion for {tool_name} has been removed from the active virtual environment.')
@completion.command(name='show')
@click.option(
    '-s',
    '--shell',
    type=click.Choice(['auto', 'bash', 'zsh', 'fish'], case_sensitive=False),
    default='auto',
    show_default=True,
    help='Target shell for completion output',
)
@click.pass_context
def completion_show(ctx: click.Context, shell: str) -> None:
    """
    Display the Click completion script on standard output.

    The output is intentionally clean so it can be used with ``eval``:

    .. code-block:: console

        eval "$(mafw completion show)"

    \f

    .. versionadded:: v2.2

    The form feed above truncates the text shown by ``--help``, so the fields below are kept out of the
    command line help, as for the other commands of this module.

    :param ctx: The click context.
    :type ctx: click.core.Context
    :param shell: Target shell selector.
    :type shell: str
    """
    check_ci_completion_guard()
    tool_name = ctx.obj['tool_name']
    resolved_shell = resolve_completion_shell(shell)
    # click.echo is used instead of rich printing to keep the output free from any markup
    click.echo(completion_source_script(tool_name, resolved_shell), nl=True)
@cli.command(name='list')
@click.pass_obj
def list_processors(obj: dict[str, Any]) -> ReturnValue:
    """Display the list of available processors.

    This command will retrieve all available processors via the plugin manager. Both internal and external processors
    will be listed if the ext-plugins option is passed.
    \f

    The processors are displayed in a table with their name, package and module. A processor whose module
    name has no dot, e.g. one defined in a single-file module, is listed with an empty module column.

    :param obj: The context object being passed from the main command.
    :type obj: dict
    :return: :attr:`ReturnValue.OK` if the table was printed, :attr:`ReturnValue.Error` otherwise.
    :rtype: ReturnValue
    """
    try:
        plugin_manager = get_plugin_manager()
        plugins = plugin_manager.load_plugins({'processors'})
        available_processors = plugins.processor_list
        print('\n')
        table = Table(
            title='Available processors',
            header_style='orange3',
            expand=True,
            title_style='italic red',
        )
        table.add_column('Processor Name', justify='left', style='cyan')
        table.add_column('Package Name', justify='left', style='cyan')
        table.add_column('Module', justify='left', style='cyan')
        mafw_processors = 0
        other_processors = 0
        for processor in available_processors:
            if isinstance(processor, LazyImportProcessor):
                qualified_name = processor.plugin_qualname
                name = processor.plugin_name
            else:
                qualified_name = processor.__module__
                name = processor.__name__
            # partition never fails: without a dot the whole name is the package and the module is empty
            package, _, module = qualified_name.partition('.')
            table.add_row(name, package, module)
            if package == 'mafw':
                mafw_processors += 1
            else:
                other_processors += 1
        table.caption = f'Total processors = {len(available_processors)}, internal = {mafw_processors}, external = {other_processors}'
        table.caption_style = 'italic green'
        console = CONSOLE
        console.print(Align.center(table))
        return ReturnValue.OK
    except Exception as e:
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error
@cli.command(name='steering')
@click.pass_obj
@click.option('--show/--no-show', default=False, help='Display the generated steering file on console')
@click.option('--ext-plugins/--no-ext-plugin', default=True, help='Load external plugins')
@click.option('--open-editor/--no-open-editor', default=False, help='Open the file in your editor.')
@click.option(
    '--db-engine',
    type=click.Choice(['sqlite', 'mysql', 'postgresql'], case_sensitive=False),
    help='Select a DB engine',
    default='sqlite',
)
@click.option('--db-url', type=str, default=':memory:', help='URL to the DB')
@click.argument('steering-file', type=click.Path())
def generate_steering(
    obj: dict[str, Any],
    show: bool,
    ext_plugins: bool,
    open_editor: bool,
    steering_file: pathlib.Path,
    db_engine: str,
    db_url: str,
) -> ReturnValue:
    """Generates a steering file with the default parameters of all available processors.

    STEERING_FILE A path where the generated steering file is saved.

    The user must modify the generated steering file to ensure it can be executed using the run command.
    \f

    :param obj: The context object being passed from the main command.
    :type obj: dict
    :param show: Display the steering file in the console after the generation. Defaults to False.
    :type show: bool
    :param ext_plugins: Extend the search for processor to external libraries. Currently not read by this function.
    :type ext_plugins: bool
    :param open_editor: Open a text editor after the generation to allow direct editing.
    :type open_editor: bool
    :param steering_file: The steering file path.
    :type steering_file: Path
    :param db_engine: The name of the db engine.
    :type db_engine: str
    :param db_url: The URL of the database.
    :type db_url: str
    :return: :attr:`ReturnValue.OK` if the file was generated, :attr:`ReturnValue.Error` otherwise.
    :rtype: ReturnValue
    """
    import copy

    try:
        plugin_manager = get_plugin_manager()
        plugins = plugin_manager.load_plugins({'processors'})
        available_processors = plugins.processor_list
        # db_engine is already sure to be in the default conf because the Choice is assuring it.
        # A private copy is used, so that the module-level default configuration is left untouched.
        database_conf = copy.deepcopy(default_conf[db_engine])
        database_conf['URL'] = db_scheme[db_engine] + db_url
        generate_steering_file(steering_file, available_processors, database_conf)

        if show:
            console = CONSOLE
            with open(steering_file) as fp:
                text = fp.read()
            with console.pager():
                console.print(text, highlight=True)
                console.print(Rule())

        if open_editor:
            click.edit(filename=str(steering_file))
        else:
            rprint(f'A generic steering file has been saved in [blue underline]{steering_file}[/blue underline].')
            rprint('Open it in your favourite text editor, change the processors_to_run list and save it.')
            rprint('')
            rprint(f'To execute it launch: [blue]mafw run {steering_file}[/blue].')

        return ReturnValue.OK

    except Exception as e:
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error
@cli.command()
@click.pass_obj
@click.argument('steering-file', type=click.Path())
def run(obj: dict[str, Any], steering_file: click.Path) -> ReturnValue:
    """Runs a steering file.

    STEERING_FILE A path to the steering file to execute.

    \f

    :param obj: The context object being passed from the main command.
    :type obj: dict
    :param steering_file: The path to the steering file to be executed.
    :type steering_file: Path
    :return: :attr:`ReturnValue.OK` if the application ends with a successful exit status,
        :attr:`ReturnValue.Error` otherwise.
    :rtype: ReturnValue
    """
    try:
        exit_status = MAFwApplication(steering_file).run()  # type: ignore
        succeeded = exit_status == ProcessorExitStatus.Successful
        return ReturnValue.OK if succeeded else ReturnValue.Error
    except AbortProcessorException:
        # an aborted processor is a failure, but nothing is displayed here
        return ReturnValue.Error
    except Exception as e:
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error
@cli.group
@click.pass_context
def db(ctx: click.core.Context) -> None:
    """
    Advanced database commands.

    The db group of commands offers a set of useful database operations. Invoke the help option of each command for
    more details.
    \f

    The callback has no body: the group only serves as a container for its sub-commands.

    :param ctx: The click context.
    :type ctx: click.core.Context
    """
@db.command(name='wizard')
@click.pass_context
@click.option(
    '-o',
    '--output-file',
    type=click.Path(),
    default=pathlib.Path.cwd() / pathlib.Path('my_model.py'),
    help='The name of the output file with the reflected model.',
)
@click.option('-s', '--schema', type=str, help='The name of the DB schema')
@click.option(
    '-t', '--tables', type=str, multiple=True, help='Generate model for selected tables. Multiple option possible.'
)
@click.option('--overwrite/--no-overwrite', default=True, help='Overwrite output file if already exists.')
@click.option('--preserve-order/--no-preserve-order', default=True, help='Preserve column order.')
@click.option('--with-views/--without-views', default=False, help='Include also database views.')
@click.option('--ignore-unknown/--no-ignore-unknown', default=False, help='Ignore unknown fields.')
@click.option('--snake-case/--no-snake-case', default=True, help='Use snake case for table and field names.')
@click.option('--host', type=str, help='Hostname for the DB server.')
@click.option('-p', '--port', type=int, help='Port number for the DB server.')
@click.option('-u', '--user', '--username', type=str, help='Username for the connection to the DB server.')
@click.option('--password', prompt=True, prompt_required=False, hide_input=True, help='Insert password when prompted')
@click.option('-e', '--engine', type=click.Choice(sorted(DATABASE_MAP)), help='The DB engine')
@click.argument('database', type=str)
def wizard(
    ctx: click.core.Context,
    overwrite: bool,
    tables: tuple[str, ...] | None,
    preserve_order: bool,
    with_views: bool,
    ignore_unknown: bool,
    snake_case: bool,
    output_file: click.Path | pathlib.Path | str,
    host: str,
    port: int,
    user: str,
    password: str,
    engine: str,
    schema: str,
    database: str,
) -> ReturnValue:
    """
    Reflect an existing DB into a python module.

    mafw db wizard [Options] Database

    Database Name of the Database to be reflected.

    About connection options (user / host / port):

    That information will be used only in case you are trying to access a network database (MySQL or PostgreSQL). In
    case of Sqlite, the parameters will be discarded.

    About passwords:

    If you need to specify a password to connect to the DB server, just add --password in the command line without
    typing your password as clear text. You will be prompted to insert the password with hidden characters at the start
    of the processor.

    About engines:

    The full list of supported engines is provided in the option below. If you do not specify any
    engine and the database is actually an existing filename, then engine is set to Sqlite, otherwise to postgresql.

    \f

    :param database: The name of the database.
    :type database: str
    :param schema: The database schema to be reflected.
    :type schema: str
    :param engine: The database engine. A selection of possible values is provided in the script help.
    :type engine: str
    :param password: The password for the DB connection. Not used in case of Sqlite.
    :type password: str
    :param user: The username for the DB connection. Not used in case of Sqlite.
    :type user: str
    :param port: The port number of the database server. Not used in case of Sqlite.
    :type port: int
    :param host: The database hostname. Not used in case of Sqlite.
    :type host: str
    :param output_file: The filename for the output python module.
    :type output_file: click.Path | pathlib.Path | str
    :param snake_case: Flag to select snake_case convention for table and field names, or all small letter formatting.
    :type snake_case: bool
    :param ignore_unknown: Flag to ignore unknown fields. If False, an unknown field will be labelled with UnknownField.
    :type ignore_unknown: bool
    :param with_views: Flag to include views in the reflected elements.
    :type with_views: bool
    :param preserve_order: Flag to select if table fields should be reflected in the original order (True) or in
        alphabetical order (False)
    :type preserve_order: bool
    :param tables: A tuple containing a selection of table names to be reflected.
    :type tables: tuple[str, ...]
    :param overwrite: Flag to overwrite the output file if exists. If False and the output file already exists, the
        user can decide what to do.
    :type overwrite: bool
    :param ctx: The click context, that includes the original object with global options.
    :type ctx: click.core.Context
    :return: The script return value
    :rtype: ReturnValue
    """
    obj = ctx.obj

    if isinstance(output_file, (str, click.Path)):
        output_file = pathlib.Path(str(output_file))

    # if not overwrite, check if the file exists
    if not overwrite and output_file.exists():
        answer = Prompt.ask(
            f'A module ({output_file.name}) already exists. Do you want to overwrite, cancel or backup?',
            case_sensitive=False,
            choices=['o', 'c', 'b'],
            show_choices=True,
            show_default=True,
            default='b',
        )
        if answer == 'c':
            return ReturnValue.OK
        elif answer == 'b':
            # keep a timestamped copy of the existing module next to it before overwriting
            bck_filename = output_file.parent / pathlib.Path(
                output_file.stem + f'_{datetime.datetime.now():%Y%m%dT%H%M%S}' + output_file.suffix
            )

            shutil.copy(output_file, bck_filename)

    # an empty selection is passed on as None
    if tables == ():
        tables = None

    if engine is None:
        engine = 'sqlite' if pathlib.Path(database).exists() else 'postgresql'

    # prepare the connection options
    if engine in ['sqlite', 'sqlite3']:
        # for sqlite only the schema is considered
        keys: list[str] = ['schema']
        values: list[str | int] = [schema]
    else:
        keys = ['host', 'port', 'user', 'schema', 'password']
        values = [host, port, user, schema, password]

    # only the options having a truthy value are passed to the introspector
    connection_options: dict[str, Any] = {k: v for k, v in zip(keys, values) if v}

    try:
        introspector = make_introspector(engine, database, **connection_options)
    except Exception as e:
        msg = f'[red]Problem generating an introspector instance of {database}.'
        # emit the context message: it tells the user which database could not be introspected
        log.error(msg)
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error

    try:
        with open(output_file, 'tw') as out_file:
            dump_models(
                out_file,
                introspector,
                tables,
                preserve_order=preserve_order,
                include_views=with_views,
                ignore_unknown=ignore_unknown,
                snake_case=snake_case,
            )
    except Exception as e:
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error

    msg = f'[green]Database {database} successfully reflected in {output_file.name}'
    log.info(msg)
    return ReturnValue.OK
if __name__ == '__main__':
    # cli is a MAFwGroup: its custom main() converts the command return value into the process exit code.
    cli.main()