Coverage for src / mafw / scripts / mafw_exe.py: 97%

277 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 16:10 +0000

1# Copyright 2025–2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5The execution framework. 

6 

7This module provides the run functionality to the whole library. 

8 

9It is heavily relying on ``click`` for the generation of commands, options, and arguments. 

10 

11.. click:: mafw.scripts.mafw_exe:cli 

12 :prog: mafw 

13 :nested: full 

14 

15""" 

16 

17import datetime 

18import logging 

19import pathlib 

20import shutil 

21import sys 

22import warnings 

23from enum import IntEnum 

24from typing import TYPE_CHECKING, Any 

25 

26import click 

27from click import ClickException 

28from pwiz import DATABASE_MAP, make_introspector # type: ignore[import-untyped] 

29from rich import print as rprint 

30from rich import traceback 

31from rich.align import Align 

32from rich.console import Console 

33from rich.logging import RichHandler 

34from rich.prompt import Prompt 

35from rich.rule import Rule 

36from rich.table import Table 

37from rich_pyfiglet import RichFiglet 

38 

39from mafw.__about__ import __version__ 

40from mafw.db.db_configurations import db_scheme, default_conf 

41from mafw.db.db_wizard import dump_models 

42from mafw.enumerators import ProcessorExitStatus 

43from mafw.lazy_import import LazyImportProcessor 

44from mafw.mafw_errors import AbortProcessorException 

45from mafw.plugin_manager import get_plugin_manager 

46from mafw.runner import MAFwApplication 

47from mafw.tools.parallel import is_free_threading 

48from mafw.tools.toml_tools import generate_steering_file 

49 

# Modules listed here are passed to rich's traceback ``suppress`` option,
# so that click internals do not clutter the rendered tracebacks.
suppress = [click]
traceback.install(show_locals=True, suppress=suppress)

# Mapping between the command line log level names and the numeric logging levels.
LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL,
}

# Shared click context settings: accept -h as an alias of --help.
CONTEXT_SETTINGS = {'help_option_names': ['-h', '--help']}

_warnings_captured = False

# The banner object rendered by print_banner.
welcome_message = RichFiglet(
    'MAFw',
    colors=['#ff0000', 'magenta1', 'blue3'],
    horizontal=True,
    # font='banner4',
    remove_blank_lines=True,
    border='ROUNDED',
    border_color='#ff0000',
)

67 

68 

def print_banner(ctx: click.Context) -> None:
    """
    Show the welcome banner, at most once per invocation.

    The banner is printed only when the selected user interface is ``rich`` and the
    ``--no-banner`` flag was not given.

    .. note::

        When Group.invoke() runs, ctx.obj has not been filled yet, whereas ctx.params
        already holds the parsed options of the group. For this reason the options are
        read from the params of the root context.

    :param ctx: The click context of the group being invoked.
    """
    # nothing to show while click is parsing resiliently (e.g., --help)
    if getattr(ctx, 'resilient_parsing', False):
        return

    # climb up to the top-most context: nested groups share its parsed params
    top = ctx
    while top.parent is not None:
        top = top.parent

    options = getattr(top, 'params', {}) or {}
    selected_ui = options.get('ui')

    if options.get('no_banner'):
        return
    if not selected_ui:  # pragma: no cover
        # without any information about the UI, stay on the safe side and print nothing
        return
    if str(selected_ui).lower() != 'rich':
        return

    # the flag stored in the root context meta guarantees a single print
    if not top.meta.get('_banner_printed', False):
        Console().print(welcome_message)
        top.meta['_banner_printed'] = True

110 

111 

112def custom_formatwarning( 

113 message: Warning | str, category: type[Warning], filename: str, lineno: int, line: str | None = None 

114) -> str: 

115 """Return the pure message of the warning.""" 

116 return str(message) 

117 

118 

119def is_bugged_version() -> bool: 

120 """Check if the Python version has a known bug. 

121 

122 This function checks if the current Python version is affected by a specific bug 

123 present in Python 3.14.0 through 3.14.3. The bug relates to warning capture issues 

124 when running in free-threading mode. 

125 

126 .. note:: 

127 

128 This is a temporary check to work around a bug in Python up to 3.14.3. 

129 See https://github.com/python/cpython/pull/146374 for more details. 

130 

131 :return: True if the Python version is affected by the bug, False otherwise. 

132 :rtype: bool 

133 """ 

134 version = sys.version_info 

135 return version >= (3, 14) and version.micro <= 3 

136 

137 

# Warnings are rendered with their bare message only.
warnings.formatwarning = custom_formatwarning

# TODO: python bug
# There is a bug in the free-threading build of python 3.14 up to 3.14.3,
# see https://github.com/python/cpython/pull/146374
# The bug has been fixed upstream, but until 3.14.4 is released we cannot
# redirect the warnings to the logging system when working with an affected
# free-threading interpreter.
# Warning capture is disabled only when BOTH conditions hold: the interpreter
# version is affected and it is a free-threading build.
if is_bugged_version() and is_free_threading():
    logging.captureWarnings(False)
else:
    logging.captureWarnings(True)

# get the root logger
log = logging.getLogger()

153 

154 

class ReturnValue(IntEnum):
    """Exit codes returned by the commands of the script."""

    #: No error
    OK = 0

    #: Generic error
    Error = 1

163 

164 

def logger_setup(level: str, ui: str, tracebacks: bool) -> None:
    """Configure the root logger.

    The level of the root logger is taken from the command line option, and one handler is attached
    to it: a RichHandler for the ``rich`` user interface, a plain StreamHandler in all other cases.

    The `tracebacks` flag matters only for the RichHandler. Exception tracebacks are handy while
    debugging, but they may confuse final users, so the flag is normally False and gets switched on
    together with the debug option.

    :param level: Logging level as a string (case-insensitive).
    :type level: str
    :param ui: User interface as a string ('rich' or 'console', case-insensitive).
    :type ui: str
    :param tracebacks: Enable/disable the logging of exception tracebacks.
    :type tracebacks: bool
    """
    level_key, ui_key = level.lower(), ui.lower()
    log.setLevel(LEVELS[level_key])

    handler: logging.Handler
    if ui_key != 'rich':
        # plain console: time stamp and level are part of the record format
        record_format = '%(asctime)s - %(levelname)s - %(message)s'
        handler = logging.StreamHandler()
    else:
        # rich takes care of time and level columns, only the message is formatted
        record_format = '%(message)s'
        handler = RichHandler(
            rich_tracebacks=tracebacks, markup=True, show_path=False, log_time_format='%Y%m%d-%H:%M:%S'
        )

    handler.setFormatter(logging.Formatter(record_format))
    log.addHandler(handler)

200 

201 

def display_exception(exception: Exception, show_traceback: bool = False) -> None:
    """
    Report an exception through the root logger.

    A critical message is always emitted first. With ``show_traceback`` enabled the exception is
    then logged including its traceback; otherwise only a one-line ``ClassName: message`` summary
    is logged and the user is invited to switch on the debug option to get the full output.

    :param exception: The exception to be displayed and logged.
    :type exception: Exception
    :param show_traceback: Flag indicating whether to show detailed traceback information. Defaults to False
    :type show_traceback: bool
    """
    if not show_traceback:
        log.critical('A critical error occurred. Set option -D to get traceback output')
        summary = '%s: %s' % (exception.__class__.__name__, exception)
        # exc_info is switched off on purpose: only the summary line must appear
        log.exception(summary, exc_info=False, stack_info=False, stacklevel=1)
        return

    log.critical('A critical error occurred')
    log.exception(exception)

224 

225 

class MAFwGroup(click.Group):
    """Custom Click Group for MAFw runner.

    It implements two main features:

        1. Support commands abbreviation. Instead of providing the whole command,
           the user can use whatever abbreviation instead as long as it is unique.
           So for example, instead of `mafw list`, the user can provide `mafw l` and the result
           will be the same.
        2. Implements the cascading of return values among different command levels.
    """

    def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None:
        """
        Return a command.

        Given a context and a command name as passed from the CLI, the click.Command is returned.
        This method overloads the basic one allowing to use command abbreviations.

        If more than one match is found, then an error is raised.

        If no matches are found, then click will handle this case as in the standard situation.

        :param ctx: The click context
        :param cmd_name: The command name as provided from the CLI
        :return: The corresponding command or None if no command is found.
        """
        # an exact match always wins over abbreviations
        rv = super().get_command(ctx, cmd_name)

        if rv is not None:
            return rv

        matches = [x for x in self.list_commands(ctx) if x.startswith(cmd_name)]

        if not matches:
            return None

        if len(matches) == 1:
            return click.Group.get_command(self, ctx, matches[0])

        # ambiguous abbreviation: ctx.fail raises a click usage error
        ctx.fail(f'Too many matches: {", ".join(sorted(matches))}')

    def resolve_command(
        self, ctx: click.Context, args: list[str]
    ) -> tuple[str | None, click.Command | None, list[str]]:
        """
        Resolve the command and always report its full name.

        The base implementation returns the name as typed by the user, which may be an
        abbreviation. Here the name of the resolved command is returned instead.

        :param ctx: The click context
        :param args: The command line arguments still to be processed
        :return: The full command name, the command and the remaining arguments. Name and
            command are None when the base implementation did not resolve any command.
        """
        _, cmd, args = super().resolve_command(ctx, args)
        if cmd is None:
            # The base class may hand back no command instead of failing (the return type
            # allows it, e.g. while click parses resiliently): do not dereference it.
            return None, cmd, args
        return cmd.name, cmd, args

    def invoke(self, ctx: click.Context) -> Any:
        """Invoke the command.

        This override method is just wrapping the base invoke call in a try / except block.

        In the case of a ClickException, then this is shown and its exit code is passed to the sys.exit call. In
        case of a SystemExit or click.exceptions.Exit, then this is simply re-raised, so that Click can handle it as
        in normal circumstances. In all other cases, the exception is caught and the sys.exit is called with the
        :attr:`.ReturnValue.Error`.

        :param ctx: The click context
        :return: The return value of the invoked command
        """
        print_banner(ctx)

        try:
            return super().invoke(ctx)

        except ClickException as e:
            e.show()
            sys.exit(e.exit_code)
        except (SystemExit, click.exceptions.Exit):
            # Re-raise SystemExit to maintain Click's normal behavior
            raise
        except Exception:
            # For any other exception, exit with error code
            sys.exit(ReturnValue.Error)

    def main(self, *args: Any, **kwargs: Any) -> None:  # type: ignore[override]
        """Override main to handle return values properly.

        The base ``main`` is always executed with ``standalone_mode=False`` so that the value
        returned by the command can be converted into the process exit code. This method
        always terminates by raising SystemExit.

        :param args: Positional arguments forwarded to the base ``main``.
        :param kwargs: Keyword arguments forwarded to the base ``main``. A ``standalone_mode``
            entry, if present, is discarded because the mode is forced to False.
        """
        # Passing standalone_mode twice would raise a TypeError that the generic handler
        # below would silently turn into an error exit code.
        kwargs.pop('standalone_mode', None)

        try:
            # Get the result from the command
            rv = super().main(*args, standalone_mode=False, **kwargs)  # type: ignore[call-overload]

            # If the command returned a ReturnValue or an integer, use it as exit code
            if isinstance(rv, (ReturnValue, int)):
                sys.exit(rv)
            else:
                # Default to success if no return value or unknown return value
                sys.exit(ReturnValue.OK)

        except ClickException as e:
            e.show()
            sys.exit(e.exit_code)
        except SystemExit:
            # Re-raise SystemExit (this includes calls to sys.exit())
            raise
        except Exception:
            # For any unhandled exception, exit with error code
            sys.exit(ReturnValue.Error)

327 

328 

@click.group(invoke_without_command=True, context_settings=CONTEXT_SETTINGS, name='mafw', cls=MAFwGroup)
@click.pass_context
@click.option(
    '--log-level',
    type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False),
    show_default=True,
    default='info',
    help='Log level',
)
@click.option(
    '--ui',
    type=click.Choice(['console', 'rich'], case_sensitive=False),
    default='rich',
    help='The user interface',
    show_default=True,
)
@click.option('-D', '--debug', is_flag=True, default=False, help='Show debug information about errors')
@click.option('--no-banner', is_flag=True, default=False, help='Disable the welcome banner')
@click.version_option(__version__, '-v', '--version')
def cli(ctx: click.core.Context, log_level: str, ui: str, debug: bool, no_banner: bool) -> None:
    """
    The Modular Analysis Framework execution.

    This is the command line interface where you can configure and launch your analysis tasks.

    More information on our documentation page.
    \f

    The global options are stored in the context object, so that every sub-command can read them,
    and the root logger is configured accordingly. When no sub-command is given, a short hint
    about the help option is printed.

    :param ctx: The click context.
    :type ctx: click.core.Context
    :param log_level: The logging level as a string. Choice from debug, info, warning, error and critical.
    :type log_level: str
    :param ui: The user interface as a string. Choice from console and rich.
    :type ui: str
    :param debug: Flag to show debug information about exception.
    :type debug: bool
    :param no_banner: Flag to disable the welcome banner.
    :type no_banner: bool
    """
    ctx.ensure_object(dict)
    # make the global options available to the sub-commands
    ctx.obj = dict(log_level=log_level, ui=ui, debug=debug, no_banner=no_banner)
    logger_setup(log_level, ui, debug)

    if ctx.invoked_subcommand is not None:
        return

    rprint('Use --help to get a quick help on the mafw command.')

374 

375 

@cli.command(name='list')
@click.pass_obj
def list_processors(obj: dict[str, Any]) -> ReturnValue:
    """Display the list of available processors.

    This command will retrieve all available processors via the plugin manager. Both internal and external processors
    will be listed if the ext-plugins option is passed.
    \f

    The processors are shown in a table with their name, package and module. A processor is counted
    as internal when its top-level package is ``mafw``, and as external otherwise.

    :param obj: The context object being passed from the main command.
    :type obj: dict
    :return: :attr:`.ReturnValue.OK` if the table was printed, :attr:`.ReturnValue.Error` if an exception occurred.
    """
    try:
        plugin_manager = get_plugin_manager()
        plugins = plugin_manager.load_plugins({'processors'})
        available_processors = plugins.processor_list
        print('\n')
        table = Table(
            title='Available processors',
            header_style='orange3',
            expand=True,
            title_style='italic red',
        )
        table.add_column('Processor Name', justify='left', style='cyan')
        table.add_column('Package Name', justify='left', style='cyan')
        table.add_column('Module', justify='left', style='cyan')
        mafw_processors = 0
        other_processors = 0
        for processor in available_processors:
            if isinstance(processor, LazyImportProcessor):
                qualified_name = processor.plugin_qualname
                name = processor.plugin_name
            else:
                qualified_name = processor.__module__
                name = processor.__name__
            # partition, unlike split + unpacking, also copes with a processor living in a
            # top-level module (no dot in the name): the module part is then left empty
            package, _, module = qualified_name.partition('.')
            table.add_row(name, package, module)
            if package == 'mafw':
                mafw_processors += 1
            else:
                other_processors += 1
        table.caption = f'Total processors = {len(available_processors)}, internal = {mafw_processors}, external = {other_processors}'
        table.caption_style = 'italic green'
        console = Console()
        console.print(Align.center(table))
        return ReturnValue.OK
    except Exception as e:
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error

422 

423 

@cli.command(name='steering')
@click.pass_obj
@click.option('--show/--no-show', default=False, help='Display the generated steering file on console')
@click.option('--ext-plugins/--no-ext-plugin', default=True, help='Load external plugins')
@click.option('--open-editor/--no-open-editor', default=False, help='Open the file in your editor.')
@click.option(
    '--db-engine',
    type=click.Choice(['sqlite', 'mysql', 'postgresql'], case_sensitive=False),
    help='Select a DB engine',
    default='sqlite',
)
@click.option('--db-url', type=str, default=':memory:', help='URL to the DB')
@click.argument('steering-file', type=click.Path())
def generate_steering(
    obj: dict[str, Any],
    show: bool,
    ext_plugins: bool,
    open_editor: bool,
    steering_file: pathlib.Path,
    db_engine: str,
    db_url: str,
) -> ReturnValue:
    """Generates a steering file with the default parameters of all available processors.

    STEERING_FILE A path to the steering file to execute.

    The user must modify the generated steering file to ensure it can be executed using the run command.
    \f

    :param obj: The context object being passed from the main command.
    :type obj: dict
    :param show: Display the steering file in the console after the generation. Defaults to False.
    :type show: bool
    :param ext_plugins: Extend the search for processor to external libraries.
        NOTE(review): the value is accepted but not used in the function body - confirm whether this is intended.
    :type ext_plugins: bool
    :param open_editor: Open a text editor after the generation to allow direct editing.
    :type open_editor: bool
    :param steering_file: The steering file path.
    :type steering_file: Path
    :param db_engine: The name of the db engine.
    :type db_engine: str
    :param db_url: The URL of the database.
    :type db_url: str
    :return: :attr:`.ReturnValue.OK` if the file was generated, :attr:`.ReturnValue.Error` if an exception occurred.
    """
    try:
        plugin_manager = get_plugin_manager()
        plugins = plugin_manager.load_plugins({'processors'})
        available_processors = plugins.processor_list
        # db_engine is already sure to be in the default conf because the Choice is assuring it.
        # Work on a copy: writing the URL straight into default_conf[db_engine] would alter the
        # shared module-level defaults for every later user within the same process.
        database_conf = default_conf[db_engine].copy()
        database_conf['URL'] = db_scheme[db_engine] + db_url
        generate_steering_file(steering_file, available_processors, database_conf)

        if show:
            console = Console()
            with open(steering_file) as fp:
                text = fp.read()
            with console.pager():
                console.print(text, highlight=True)
            console.print(Rule())

        if open_editor:
            click.edit(filename=str(steering_file))
        else:
            rprint(f'A generic steering file has been saved in [blue underline]{steering_file}[/blue underline].')
            rprint('Open it in your favourite text editor, change the processors_to_run list and save it.')
            rprint('')
            rprint(f'To execute it launch: [blue]mafw run {steering_file}[/blue].')

        return ReturnValue.OK

    except Exception as e:
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error

498 

499 

@cli.command()
@click.pass_obj
@click.argument('steering-file', type=click.Path())
def run(obj: dict[str, Any], steering_file: click.Path) -> ReturnValue:
    """Runs a steering file.

    STEERING_FILE A path to the steering file to execute.

    \f

    A MAFwApplication is built from the steering file and executed. The processor exit status is
    translated into the script return value.

    :param obj: The context object being passed from the main command.
    :type obj: dict
    :param steering_file: The path to the steering file to be executed.
    :type steering_file: Path
    :return: :attr:`.ReturnValue.OK` when the exit status is Successful, :attr:`.ReturnValue.Error` in all
        other cases, including when an exception is raised.
    """
    try:
        outcome = MAFwApplication(steering_file).run()  # type: ignore
        return ReturnValue.OK if outcome == ProcessorExitStatus.Successful else ReturnValue.Error
    except AbortProcessorException:
        # an aborted processor is reported as an error without displaying the exception here
        return ReturnValue.Error
    except Exception as e:
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error

529 

530 

@cli.group
@click.pass_context
def db(ctx: click.core.Context) -> None:
    """
    Advanced database commands.

    The db group of commands offers a set of useful database operations. Invoke the help option of each command for
    more details.
    \f

    The function body is intentionally empty: it only acts as the container to which the
    database sub-commands are attached.

    :param ctx: The click context.
    :type ctx: click.core.Context
    """

544 

545 

@db.command(name='wizard')
@click.pass_context
@click.option(
    '-o',
    '--output-file',
    type=click.Path(),
    default=pathlib.Path.cwd() / pathlib.Path('my_model.py'),
    help='The name of the output file with the reflected model.',
)
@click.option('-s', '--schema', type=str, help='The name of the DB schema')
@click.option(
    '-t', '--tables', type=str, multiple=True, help='Generate model for selected tables. Multiple option possible.'
)
@click.option('--overwrite/--no-overwrite', default=True, help='Overwrite output file if already exists.')
@click.option('--preserve-order/--no-preserve-order', default=True, help='Preserve column order.')
@click.option('--with-views/--without-views', default=False, help='Include also database views.')
@click.option('--ignore-unknown/--no-ignore-unknown', default=False, help='Ignore unknown fields.')
@click.option('--snake-case/--no-snake-case', default=True, help='Use snake case for table and field names.')
@click.option('--host', type=str, help='Hostname for the DB server.')
@click.option('-p', '--port', type=int, help='Port number for the DB server.')
@click.option('-u', '--user', '--username', type=str, help='Username for the connection to the DB server.')
@click.option('--password', prompt=True, prompt_required=False, hide_input=True, help='Insert password when prompted')
@click.option('-e', '--engine', type=click.Choice(sorted(DATABASE_MAP)), help='The DB engine')
@click.argument('database', type=str)
def wizard(
    ctx: click.core.Context,
    overwrite: bool,
    tables: tuple[str, ...] | None,
    preserve_order: bool,
    with_views: bool,
    ignore_unknown: bool,
    snake_case: bool,
    output_file: click.Path | pathlib.Path | str,
    host: str,
    port: int,
    user: str,
    password: str,
    engine: str,
    schema: str,
    database: str,
) -> ReturnValue:
    """
    Reflect an existing DB into a python module.

    mafw db wizard [Options] Database

    Database Name of the Database to be reflected.

    About connection options (user / host / port):

    That information will be used only in case you are trying to access a network database (MySQL or PostgreSQL). In
    case of Sqlite, the parameters will be discarded.

    About passwords:

    If you need to specify a password to connect to the DB server, just add --password in the command line without
    typing your password as clear text. You will be prompted to insert the password with hidden characters at the start
    of the processor.

    About engines:

    The full list of supported engines is provided in the option below. If you do not specify any
    engine and the database is actually an existing filename, then engine is set to Sqlite, otherwise to postgresql.

    \f

    :param database: The name of the database.
    :type database: str
    :param schema: The database schema to be reflected.
    :type schema: str
    :param engine: The database engine. A selection of possible values is provided in the script help.
    :type engine: str
    :param password: The password for the DB connection. Not used in case of Sqlite.
    :type password: str
    :param user: The username for the DB connection. Not used in case of Sqlite.
    :type user: str
    :param port: The port number of the database server. Not used in case of Sqlite.
    :type port: int
    :param host: The database hostname. Not used in case of Sqlite.
    :type host: str
    :param output_file: The filename for the output python module.
    :type output_file: click.Path | pathlib.Path | str
    :param snake_case: Flag to select snake_case convention for table and field names, or all small letter formatting.
    :type snake_case: bool
    :param ignore_unknown: Flag to ignore unknown fields. If False, an unknown field will be labelled with UnknownField.
    :type ignore_unknown: bool
    :param with_views: Flag to include views in the reflected elements.
    :type with_views: bool
    :param preserve_order: Flag to select if table fields should be reflected in the original order (True) or in
        alphabetical order (False)
    :type preserve_order: bool
    :param tables: A tuple containing a selection of table names to be reflected.
    :type tables: tuple[str, ...]
    :param overwrite: Flag to overwrite the output file if exists. If False and the output file already exists, the
        user can decide what to do.
    :type overwrite: bool
    :param ctx: The click context, that includes the original object with global options.
    :type ctx: click.core.Context
    :return: The script return value
    """
    obj = ctx.obj

    if isinstance(output_file, (str, click.Path)):
        output_file = pathlib.Path(str(output_file))

    # if not overwrite, check if the file exists
    if not overwrite and output_file.exists():
        answer = Prompt.ask(
            f'A module ({output_file.name}) already exists. Do you want to overwrite, cancel or backup?',
            case_sensitive=False,
            choices=['o', 'c', 'b'],
            show_choices=True,
            show_default=True,
            default='b',
        )
        if answer == 'c':
            return ReturnValue.OK
        elif answer == 'b':
            # keep a time-stamped copy of the existing module next to the original one
            bck_filename = output_file.parent / pathlib.Path(
                output_file.stem + f'_{datetime.datetime.now():%Y%m%dT%H%M%S}' + output_file.suffix
            )

            shutil.copy(output_file, bck_filename)
        # with answer 'o' the existing file is simply overwritten below

    # an empty selection is passed on as None
    if tables == ():
        tables = None

    if engine is None:
        engine = 'sqlite' if pathlib.Path(database).exists() else 'postgresql'

    # prepare the connection options
    if engine in ['sqlite', 'sqlite3']:
        # for sqlite the connection options are limited to the schema
        candidates: dict[str, Any] = {'schema': schema}
    else:
        candidates = {'host': host, 'port': port, 'user': user, 'schema': schema, 'password': password}

    # only the options actually provided are forwarded to the introspector
    connection_options: dict[str, Any] = {key: value for key, value in candidates.items() if value}

    try:
        introspector = make_introspector(engine, database, **connection_options)
    except Exception as e:
        msg = f'[red]Problem generating an introspector instance of {database}.'
        # tell the user which step failed before showing the exception itself
        log.critical(msg)
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error

    try:
        # the output is a python module: write it as UTF-8 whatever the locale encoding is
        with open(output_file, 'w', encoding='utf-8') as out_file:
            dump_models(
                out_file,
                introspector,
                tables,
                preserve_order=preserve_order,
                include_views=with_views,
                ignore_unknown=ignore_unknown,
                snake_case=snake_case,
            )
    except Exception as e:
        display_exception(e, obj['debug'])
        return ReturnValue.Error

    msg = f'[green]Database {database} successfully reflected in {output_file.name}'
    log.info(msg)
    return ReturnValue.OK

715 

716 

if __name__ == '__main__':
    # Script entry point. Call the custom main method of the group, which converts the
    # value returned by the executed command into the process exit code.
    cli.main()