Coverage for src / mafw / scripts / mafw_exe.py: 94%

321 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-06-28 13:34 +0000

1# Copyright 2025–2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5The execution framework. 

6 

7This module provides the run functionality to the whole library. 

8 

9It is heavily relying on ``click`` for the generation of commands, options, and arguments. 

10 

11.. click:: mafw.scripts.mafw_exe:cli 

12 :prog: mafw 

13 :nested: full 

14 

15""" 

16 

17import datetime 

18import logging 

19import pathlib 

20import shutil 

21import sys 

22import warnings 

23from enum import IntEnum 

24from typing import Any 

25 

26import click 

27from click.exceptions import ClickException 

28from pwiz import DATABASE_MAP, make_introspector # type: ignore[import-untyped] 

29from rich import print as rprint 

30from rich import traceback 

31from rich.align import Align 

32from rich.logging import RichHandler 

33from rich.prompt import Prompt 

34from rich.rule import Rule 

35from rich.table import Table 

36from rich_pyfiglet import RichFiglet 

37 

38from mafw.__about__ import __version__ 

39from mafw.db.db_configurations import db_scheme, default_conf 

40from mafw.db.db_wizard import dump_models 

41from mafw.enumerators import ProcessorExitStatus 

42from mafw.lazy_import import LazyImportProcessor 

43from mafw.mafw_errors import AbortProcessorException 

44from mafw.plugin_manager import get_plugin_manager 

45from mafw.runner import MAFwApplication 

46from mafw.tools.click_extensions import ( 

47 AbbreviateGroup, 

48 check_ci_completion_guard, 

49 completion_script_path, 

50 completion_source_script, 

51 install_completion, 

52 is_script_already_installed, 

53 resolve_completion_shell, 

54 uninstall_completion_files, 

55) 

56from mafw.tools.parallel import is_free_threading 

57from mafw.tools.shell_tools import CONSOLE 

58from mafw.tools.toml_tools import generate_steering_file 

59 

60suppress = [click] 

61traceback.install(show_locals=True, suppress=suppress) 

62 

63 

class MAFwGroup(AbbreviateGroup):
    """Abbreviation-aware click group that maps command outcomes to process exit codes."""

    group_class: type[click.Group] = AbbreviateGroup

    def parse_args(self, ctx: click.Context, args: list[str]) -> list[str]:
        """Run the standard parsing, then remember which root-level command was requested.

        :param ctx: The click context being populated.
        :type ctx: click.Context
        :param args: The command line tokens to be parsed.
        :type args: list[str]
        :return: Whatever the parent implementation returns.
        :rtype: list[str]
        """
        # The parent parser must run first: detection is performed on ``args`` as left by it.
        remaining = super().parse_args(ctx, args)
        is_root_context = ctx.parent is None
        if is_root_context:
            # Only the root context stores the name, so nested groups never overwrite it.
            ctx.meta['_mafw_invoked_subcommand'] = self._detect_command_name(args)
        return remaining

    def _detect_command_name(self, args: list[str]) -> str | None:
        """Find the first token in ``args`` that is a command rather than an option or an option value.

        Tokens matching an option declared on this group are skipped, together with the following
        token when the option is not a flag. Any other token starting with ``-`` is skipped as well.

        :param args: The command line tokens to scan.
        :type args: list[str]
        :return: The resolved command name, the raw token if it cannot be resolved, or None when no
            candidate token exists.
        :rtype: str | None
        """
        all_option_names: set[str] = set()
        value_option_names: set[str] = set()
        for param in self.params:
            names = getattr(param, 'opts', [])
            all_option_names.update(names)
            if not getattr(param, 'is_flag', False):
                value_option_names.update(names)

        pending_value = False
        for token in args:
            if pending_value:
                # This token is the value of the previous option.
                pending_value = False
            elif token in all_option_names:
                pending_value = token in value_option_names
            elif not token.startswith('-'):
                command = self.get_command(click.Context(self), token)
                return token if command is None else command.name
        return None

    def invoke(self, ctx: click.Context) -> Any:
        """Print the banner, run the command and translate failures into process exits.

        :param ctx: The click context of the invocation.
        :type ctx: click.Context
        :return: The value returned by the invoked command.
        :rtype: Any
        """
        print_banner(ctx)

        try:
            outcome = super().invoke(ctx)
        except ClickException as click_error:
            # Click errors carry their own message and exit code.
            click_error.show()
            sys.exit(click_error.exit_code)
        except (SystemExit, click.exceptions.Exit):
            # Explicit exit requests are propagated untouched.
            raise
        except Exception:
            sys.exit(ReturnValue.Error)
        return outcome

    def main(self, *args: Any, **kwargs: Any) -> None:  # type: ignore[override]
        """Execute the command line interface and always terminate through ``sys.exit``.

        The parent ``main`` is called with ``standalone_mode=False`` so that the value returned by
        the command is available here and can be used as exit code.
        """
        try:
            rv = super().main(*args, standalone_mode=False, **kwargs)  # type: ignore[call-overload]
            exit_status = rv if isinstance(rv, (ReturnValue, int)) else ReturnValue.OK
            sys.exit(exit_status)
        except ClickException as click_error:
            click_error.show()
            sys.exit(click_error.exit_code)
        except SystemExit:
            raise
        except Exception:
            sys.exit(ReturnValue.Error)

131 

132 

# Mapping between the --log-level choices and the numeric levels of the logging module.
LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL,
}
# Shared click settings: accept -h as an alias of --help.
CONTEXT_SETTINGS = {'help_option_names': ['-h', '--help']}

_warnings_captured = False

# Banner rendered by print_banner() when the rich user interface is selected.
welcome_message = RichFiglet(
    'MAFw',
    colors=['#ff0000', 'magenta1', 'blue3'],
    horizontal=True,
    # font='banner4',
    remove_blank_lines=True,
    border='ROUNDED',
    border_color='#ff0000',
)

147 

148 

def print_banner(ctx: click.Context) -> None:
    """
    Show the MAFw welcome banner at most once per invocation.

    Nothing is printed when any of the following holds:

    * click is in resilient parsing mode;
    * the root command recorded at parse time is ``completion``;
    * the ``--no-banner`` flag is set;
    * the selected user interface is missing or is not ``rich``;
    * the banner has already been printed for this root context.

    .. note::

        The options are read from the params of the root context because ``ctx.obj`` has not been
        filled in yet when ``Group.invoke()`` runs.

    :param ctx: The click context of the current invocation.
    :type ctx: click.Context
    """
    if getattr(ctx, 'resilient_parsing', False):
        return

    root = ctx.find_root()

    # The completion command must keep its output clean, so it never gets a banner.
    # Possible improvement: make the list of banner-free commands configurable.
    if root.meta.get('_mafw_invoked_subcommand') == 'completion':
        return

    root_params = getattr(root, 'params', {}) or {}
    banner_disabled = bool(root_params.get('no_banner'))
    selected_ui = root_params.get('ui')
    # With no UI information available we stay on the safe side and do not print.
    wants_rich_ui = bool(selected_ui) and str(selected_ui).lower() == 'rich'
    already_printed = root.meta.get('_banner_printed', False)

    if banner_disabled or not wants_rich_ui or already_printed:
        return

    CONSOLE.print(welcome_message)
    root.meta['_banner_printed'] = True

192 

193 

194def custom_formatwarning( 

195 message: Warning | str, category: type[Warning], filename: str, lineno: int, line: str | None = None 

196) -> str: 

197 """Return the pure message of the warning.""" 

198 return str(message) 

199 

200 

def is_bugged_warning_capture_version() -> bool:
    """Tell whether the running interpreter suffers from the warning capture bug.

    The affected interpreters are the free-threading builds of Python 3.14.0 up to 3.14.3, where
    ``logging.captureWarnings(True)`` can crash or misbehave.

    .. note::

        See https://github.com/python/cpython/pull/146374 for details.

    .. todo::

        Remove this workaround when Python 3.14 is no longer a supported version
        (expected late 2030).

    :return: True if the interpreter is affected by the warning capture bug, False otherwise.
    :rtype: bool
    """
    running = sys.version_info
    if running.major != 3:
        return False
    if running.minor != 14:
        return False
    if running.micro > 3:
        return False
    # Only now the build flavour matters.
    return is_free_threading()

222 

223 

# Warnings are rendered as their bare message.
warnings.formatwarning = custom_formatwarning
# Route warnings through logging, except on the bugged free-threading builds (3.14.0–3.14.3)
# where the capture is explicitly switched off.
logging.captureWarnings(not is_bugged_warning_capture_version())

# The whole script logs through the root logger.
log = logging.getLogger()

233 

234 

class ReturnValue(IntEnum):
    """Exit codes returned by the script to the calling shell."""

    #: No error
    OK = 0

    #: Generic error
    Error = 1

243 

244 

def logger_setup(level: str, ui: str, tracebacks: bool) -> None:
    """Configure the root logger according to the command line options.

    The root logger level is taken from ``level`` and a new handler is attached to it: a
    ``RichHandler`` for the ``rich`` user interface, a plain ``StreamHandler`` for anything else.

    ``tracebacks`` only affects the ``RichHandler``. Rich tracebacks help while debugging, but they
    can be overwhelming for final users, so the caller normally enables them only together with
    the debug flag.

    :param level: Logging level as a string (case-insensitive key of ``LEVELS``).
    :type level: str
    :param ui: User interface as a string ('rich' or 'console').
    :type ui: str
    :param tracebacks: Enable/disable the logging of exception tracebacks.
    :type tracebacks: bool
    """
    level_key, ui_key = level.lower(), ui.lower()

    log.setLevel(LEVELS[level_key])

    handler: logging.Handler
    if ui_key == 'rich':
        # RichHandler renders time and level on its own, the format only needs the message.
        message_format = '%(message)s'
        handler = RichHandler(
            rich_tracebacks=tracebacks,
            markup=True,
            show_path=False,
            log_time_format='%Y%m%d-%H:%M:%S',
        )
    else:
        message_format = '%(asctime)s - %(levelname)s - %(message)s'
        handler = logging.StreamHandler()

    handler.setFormatter(logging.Formatter(message_format))
    log.addHandler(handler)

280 

281 

def display_exception(exception: Exception, show_traceback: bool = False) -> None:
    """
    Report an exception through the root logger.

    A critical message is always logged first. With ``show_traceback`` enabled the exception is
    then passed to ``log.exception`` as is; otherwise only a ``ClassName: message`` summary is
    logged, with the exception info explicitly disabled, and the critical message invites the user
    to turn on the debug option.

    :param exception: The exception to be displayed and logged.
    :type exception: Exception
    :param show_traceback: Flag indicating whether to show detailed traceback information. Defaults to False
    :type show_traceback: bool
    """
    if show_traceback:
        log.critical('A critical error occurred')
        log.exception(exception)
        return

    log.critical('A critical error occurred. Set option -D to get traceback output')
    exception_kind = exception.__class__.__name__
    summary = '%s: %s' % (exception_kind, exception)
    log.exception(summary, exc_info=False, stack_info=False, stacklevel=1)

304 

305 

@click.group(invoke_without_command=True, context_settings=CONTEXT_SETTINGS, name='mafw', cls=MAFwGroup)
@click.pass_context
@click.option(
    '--log-level',
    type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False),
    show_default=True,
    default='info',
    help='Log level',
)
@click.option(
    '--ui',
    type=click.Choice(['console', 'rich'], case_sensitive=False),
    default='rich',
    help='The user interface',
    show_default=True,
)
@click.option('-D', '--debug', is_flag=True, default=False, help='Show debug information about errors')
@click.option('--no-banner', is_flag=True, default=False, help='Disable the welcome banner')
@click.version_option(__version__, '-v', '--version')
def cli(ctx: click.core.Context, log_level: str, ui: str, debug: bool, no_banner: bool) -> None:
    """
    The Modular Analysis Framework execution.

    This is the command line interface where you can configure and launch your analysis tasks.

    More information on our documentation page.
    \f

    Root command: it stores the global options in the context object, so that every sub-command
    can read them, and configures the logging.

    :param ctx: The click context.
    :type ctx: click.core.Context
    :param log_level: The logging level as a string. Choice from debug, info, warning, error and critical.
    :type log_level: str
    :param ui: The user interface as a string. Choice from console and rich.
    :type ui: str
    :param debug: Flag to show debug information about exception.
    :type debug: bool
    :param no_banner: Flag to disable the welcome banner.
    :type no_banner: bool
    """
    ctx.ensure_object(dict)
    global_options = dict(log_level=log_level, ui=ui, debug=debug, no_banner=no_banner)
    ctx.obj = global_options
    logger_setup(log_level, ui, debug)

    # Called without any sub-command: point the user to the help.
    no_subcommand = ctx.invoked_subcommand is None
    if no_subcommand:
        rprint('Use --help to get a quick help on the mafw command.')

351 

352 

@cli.group(name='completion')
@click.pass_context
def completion(ctx: click.Context) -> None:
    """
    Manage shell completion for the ``mafw`` command.

    The completion workflow installs the Click-generated shell code into the
    active virtual environment, updates the activation script so completion is
    loaded automatically, and exposes a ``show`` helper for direct evaluation.

    \f

    The group callback records the tool name in the context object for its sub-commands.

    .. versionadded:: v2.2

    :param ctx: The click context.
    :type ctx: click.core.Context
    """
    tool = 'mafw'
    ctx.ensure_object(dict)
    ctx.obj['tool_name'] = tool

372 

373 

@completion.command(name='install')
@click.option(
    '-s',
    '--shell',
    type=click.Choice(['auto', 'bash', 'zsh', 'fish'], case_sensitive=False),
    default='auto',
    show_default=True,
    help='Target shell for completion installation',
)
@click.option('-F', '--force', is_flag=True, default=False, help='Reinstall completion even if already loaded')
@click.pass_context
def completion_install(ctx: click.Context, shell: str, force: bool) -> None:
    """
    Install the ``mafw`` shell completion script.

    When ``--shell`` is omitted, the command guesses the shell from ``$SHELL``.
    The generated Click completion script is stored in the active virtual
    environment and the activation script is updated so that future shell
    sessions load completion automatically.

    \f

    .. versionadded:: v2.2

    :param ctx: The click context.
    :type ctx: click.core.Context
    :param shell: Target shell selector.
    :type shell: str
    :param force: Reinstall completion even if already loaded.
    :type force: bool
    :raises click.ClickException: If the completion is already installed and ``force`` is not set.
    """
    check_ci_completion_guard()
    tool = ctx.obj['tool_name']
    target_shell = resolve_completion_shell(shell)
    destination = completion_script_path(tool, target_shell)

    already_installed = is_script_already_installed(tool, target_shell)
    if already_installed and not force:
        raise click.ClickException(
            f'MAFw completion is already installed in {destination}. Use --force to reinstall it.'
        )

    install_completion(tool, target_shell, force, destination)
    for line in (
        f'Completion script installed in [blue underline]{destination}[/blue underline].',
        'Exit and re-enter the virtual environment to activate shell completion.',
    ):
        rprint(line)

418 

419 

@completion.command(name='uninstall')
@click.pass_context
def completion_uninstall(ctx: click.Context) -> None:
    """
    Remove the installed ``mafw`` shell completion files.

    This command removes generated completion files from ``share/mafw`` and
    strips the marker block from the activation scripts.

    .. versionadded:: v2.2

    :param ctx: The click context.
    :type ctx: click.core.Context
    """
    tool = ctx.obj['tool_name']
    uninstall_completion_files(tool, None)
    confirmation = f'Shell completion for {tool} has been removed from the active virtual environment.'
    rprint(confirmation)

437 

438 

@completion.command(name='show')
@click.option(
    '-s',
    '--shell',
    type=click.Choice(['auto', 'bash', 'zsh', 'fish'], case_sensitive=False),
    default='auto',
    show_default=True,
    help='Target shell for completion output',
)
@click.pass_context
def completion_show(ctx: click.Context, shell: str) -> None:
    """
    Display the Click completion script on standard output.

    The output is intentionally clean so it can be used with ``eval``:

    .. code-block:: console

        eval "$(mafw completion show)"

    .. versionadded:: v2.2

    :param ctx: The click context.
    :type ctx: click.core.Context
    :param shell: Target shell selector.
    :type shell: str
    """
    check_ci_completion_guard()
    tool = ctx.obj['tool_name']
    target_shell = resolve_completion_shell(shell)
    script_text = completion_source_script(tool, target_shell)
    click.echo(script_text, nl=True)

470 

471 

@cli.command(name='list')
@click.pass_obj
def list_processors(obj: dict[str, Any]) -> ReturnValue:
    """Display the list of available processors.

    This command will retrieve all available processors via the plugin manager. Both internal and external processors
    will be listed if the ext-plugins option is passed.
    \f

    The processors are shown in a table with their name, the package they belong to (the part of
    the qualified module name before the first dot) and the remaining module path. Processors whose
    package is ``mafw`` are counted as internal, all the others as external.

    :param obj: The context object being passed from the main command. Only the ``debug`` key is read.
    :type obj: dict
    :return: ``ReturnValue.OK`` on success, ``ReturnValue.Error`` if any exception is raised.
    :rtype: ReturnValue
    """
    try:
        plugin_manager = get_plugin_manager()
        plugins = plugin_manager.load_plugins({'processors'})
        available_processors = plugins.processor_list
        print('\n')
        table = Table(
            title='Available processors',
            header_style='orange3',
            expand=True,
            title_style='italic red',
        )
        table.add_column('Processor Name', justify='left', style='cyan')
        table.add_column('Package Name', justify='left', style='cyan')
        table.add_column('Module', justify='left', style='cyan')
        mafw_processors = 0
        other_processors = 0
        for processor in available_processors:
            if isinstance(processor, LazyImportProcessor):
                qualified_name = processor.plugin_qualname
                name = processor.plugin_name
            else:
                qualified_name = processor.__module__
                name = processor.__name__
            # partition() never fails: a processor living in a top-level module (no dot in the
            # qualified name) yields an empty module column instead of aborting the whole listing.
            package, _, module = qualified_name.partition('.')
            table.add_row(name, package, module)
            if package == 'mafw':
                mafw_processors += 1
            else:
                other_processors += 1
        table.caption = f'Total processors = {len(available_processors)}, internal = {mafw_processors}, external = {other_processors}'
        table.caption_style = 'italic green'
        CONSOLE.print(Align.center(table))
        return ReturnValue.OK
    except Exception as e:
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error

518 

519 

@cli.command(name='steering')
@click.pass_obj
@click.option('--show/--no-show', default=False, help='Display the generated steering file on console')
@click.option('--ext-plugins/--no-ext-plugin', default=True, help='Load external plugins')
@click.option('--open-editor/--no-open-editor', default=False, help='Open the file in your editor.')
@click.option(
    '--db-engine',
    type=click.Choice(['sqlite', 'mysql', 'postgresql'], case_sensitive=False),
    help='Select a DB engine',
    default='sqlite',
)
@click.option('--db-url', type=str, default=':memory:', help='URL to the DB')
@click.argument('steering-file', type=click.Path())
def generate_steering(
    obj: dict[str, Any],
    show: bool,
    ext_plugins: bool,
    open_editor: bool,
    steering_file: pathlib.Path,
    db_engine: str,
    db_url: str,
) -> ReturnValue:
    """Generates a steering file with the default parameters of all available processors.

    STEERING_FILE A path to the steering file to execute.

    The user must modify the generated steering file to ensure it can be executed using the run command.
    \f

    :param obj: The context object being passed from the main command.
    :type obj: dict
    :param show: Display the steering file in the console after the generation. Defaults to False.
    :type show: bool
    :param ext_plugins: Extend the search for processor to external libraries.
        NOTE(review): the value is accepted but not consulted in this function body.
    :type ext_plugins: bool
    :param open_editor: Open a text editor after the generation to allow direct editing.
    :type open_editor: bool
    :param steering_file: The steering file path.
    :type steering_file: Path
    :param db_engine: The name of the db engine.
    :type db_engine: str
    :param db_url: The URL of the database.
    :type db_url: str
    :return: ``ReturnValue.OK`` on success, ``ReturnValue.Error`` if any exception is raised.
    :rtype: ReturnValue
    """
    try:
        plugin_manager = get_plugin_manager()
        plugins = plugin_manager.load_plugins({'processors'})
        available_processors = plugins.processor_list
        # db_engine is already sure to be in the default conf because the Choice is assuring it.
        # Work on a copy: default_conf is shared module-level state and writing the URL straight
        # into it would leak this invocation's URL into any later use within the same process.
        database_conf = dict(default_conf[db_engine])
        database_conf['URL'] = db_scheme[db_engine] + db_url
        generate_steering_file(steering_file, available_processors, database_conf)

        if show:
            console = CONSOLE
            with open(steering_file) as fp:
                text = fp.read()
            with console.pager():
                console.print(text, highlight=True)
            console.print(Rule())

        if open_editor:
            click.edit(filename=str(steering_file))
        else:
            rprint(f'A generic steering file has been saved in [blue underline]{steering_file}[/blue underline].')
            rprint('Open it in your favourite text editor, change the processors_to_run list and save it.')
            rprint('')
            rprint(f'To execute it launch: [blue]mafw run {steering_file}[/blue].')

        return ReturnValue.OK

    except Exception as e:
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error

594 

595 

@cli.command()
@click.pass_obj
@click.argument('steering-file', type=click.Path())
def run(obj: dict[str, Any], steering_file: click.Path) -> ReturnValue:
    """Runs a steering file.

    STEERING_FILE A path to the steering file to execute.

    \f

    The steering file is handed over to a ``MAFwApplication`` and the processor exit status is
    converted into the script return value.

    :param obj: The context object being passed from the main command.
    :type obj: dict
    :param steering_file: The path to the steering file to be executed.
    :type steering_file: Path
    :return: ``ReturnValue.OK`` only if the application reports a successful exit status.
    :rtype: ReturnValue
    """
    try:
        application = MAFwApplication(steering_file)  # type: ignore
        exit_status = application.run()
        succeeded = exit_status == ProcessorExitStatus.Successful
        return ReturnValue.OK if succeeded else ReturnValue.Error
    except AbortProcessorException:
        # An abort is reported as an error without any additional exception display.
        return ReturnValue.Error
    except Exception as error:
        display_exception(error, show_traceback=obj['debug'])
        return ReturnValue.Error

625 

626 

@cli.group()
@click.pass_context
def db(ctx: click.core.Context) -> None:
    """
    Advanced database commands.

    The db group of commands offers a set of useful database operations. Invoke the help option of each command for
    more details.
    \f

    The group callback itself performs no action: it only acts as container for the database
    sub-commands.

    :param ctx: The click context.
    :type ctx: click.core.Context
    """

640 

641 

@db.command(name='wizard')
@click.pass_context
@click.option(
    '-o',
    '--output-file',
    type=click.Path(),
    default=pathlib.Path.cwd() / pathlib.Path('my_model.py'),
    help='The name of the output file with the reflected model.',
)
@click.option('-s', '--schema', type=str, help='The name of the DB schema')
@click.option(
    '-t', '--tables', type=str, multiple=True, help='Generate model for selected tables. Multiple option possible.'
)
@click.option('--overwrite/--no-overwrite', default=True, help='Overwrite output file if already exists.')
@click.option('--preserve-order/--no-preserve-order', default=True, help='Preserve column order.')
@click.option('--with-views/--without-views', default=False, help='Include also database views.')
@click.option('--ignore-unknown/--no-ignore-unknown', default=False, help='Ignore unknown fields.')
@click.option('--snake-case/--no-snake-case', default=True, help='Use snake case for table and field names.')
@click.option('--host', type=str, help='Hostname for the DB server.')
@click.option('-p', '--port', type=int, help='Port number for the DB server.')
@click.option('-u', '--user', '--username', type=str, help='Username for the connection to the DB server.')
@click.option('--password', prompt=True, prompt_required=False, hide_input=True, help='Insert password when prompted')
@click.option('-e', '--engine', type=click.Choice(sorted(DATABASE_MAP)), help='The DB engine')
@click.argument('database', type=str)
def wizard(
    ctx: click.core.Context,
    overwrite: bool,
    tables: tuple[str, ...] | None,
    preserve_order: bool,
    with_views: bool,
    ignore_unknown: bool,
    snake_case: bool,
    output_file: click.Path | pathlib.Path | str,
    host: str,
    port: int,
    user: str,
    password: str,
    engine: str,
    schema: str,
    database: str,
) -> ReturnValue:
    """
    Reflect an existing DB into a python module.

    mafw db wizard [Options] Database

    Database Name of the Database to be reflected.

    About connection options (user / host / port):

    That information will be used only in case you are trying to access a network database (MySQL or PostgreSQL). In
    case of Sqlite, the parameters will be discarded.

    About passwords:

    If you need to specify a password to connect to the DB server, just add --password in the command line without
    typing your password as clear text. You will be prompted to insert the password with hidden characters at the start
    of the processor.

    About engines:

    The full list of supported engines is provided in the option below. If you do not specify any
    engine and the database is actually an existing filename, then engine is set to Sqlite, otherwise to postgresql.

    \f

    :param database: The name of the database.
    :type database: str
    :param schema: The database schema to be reflected.
    :type schema: str
    :param engine: The database engine. A selection of possible values is provided in the script help.
    :type engine: str
    :param password: The password for the DB connection. Not used in case of Sqlite.
    :type password: str
    :param user: The username for the DB connection. Not used in case of Sqlite.
    :type user: str
    :param port: The port number of the database server. Not used in case of Sqlite.
    :type port: int
    :param host: The database hostname. Not used in case of Sqlite.
    :type host: str
    :param output_file: The filename for the output python module.
    :type output_file: click.Path | pathlib.Path | str
    :param snake_case: Flag to select snake_case convention for table and field names, or all small letter formatting.
    :type snake_case: bool
    :param ignore_unknown: Flag to ignore unknown fields. If False, an unknown field will be labelled with UnknownField.
    :type ignore_unknown: bool
    :param with_views: Flag to include views in the reflected elements.
    :type with_views: bool
    :param preserve_order: Flag to select if table fields should be reflected in the original order (True) or in
        alphabetical order (False)
    :type preserve_order: bool
    :param tables: A tuple containing a selection of table names to be reflected.
    :type tables: tuple[str, ...]
    :param overwrite: Flag to overwrite the output file if exists. If False and the output file already exists, the
        user can decide what to do.
    :type overwrite: bool
    :param ctx: The click context, that includes the original object with global options.
    :type ctx: click.core.Context
    :return: The script return value
    :rtype: ReturnValue
    """
    obj = ctx.obj

    if isinstance(output_file, (str, click.Path)):
        output_file = pathlib.Path(str(output_file))

    # if not overwrite, check if the file exists
    if not overwrite and output_file.exists():
        answer = Prompt.ask(
            f'A module ({output_file.name}) already exists. Do you want to overwrite, cancel or backup?',
            case_sensitive=False,
            choices=['o', 'c', 'b'],
            show_choices=True,
            show_default=True,
            default='b',
        )
        if answer == 'c':
            return ReturnValue.OK
        elif answer == 'b':
            # keep a time-stamped copy next to the original before overwriting it
            bck_filename = output_file.parent / pathlib.Path(
                output_file.stem + f'_{datetime.datetime.now():%Y%m%dT%H%M%S}' + output_file.suffix
            )

            shutil.copy(output_file, bck_filename)

    # an empty selection means that all tables have to be reflected
    if tables == ():
        tables = None

    if engine is None:
        engine = 'sqlite' if pathlib.Path(database).exists() else 'postgresql'

    # prepare the connection options: sqlite only takes the schema, the network engines take
    # the full set of credentials
    candidate_options: dict[str, Any]
    if engine in ['sqlite', 'sqlite3']:
        candidate_options = {'schema': schema}
    else:
        candidate_options = {'host': host, 'port': port, 'user': user, 'schema': schema, 'password': password}

    # only the options that were actually provided are forwarded to the introspector
    connection_options: dict[str, Any] = {key: value for key, value in candidate_options.items() if value}

    try:
        introspector = make_introspector(engine, database, **connection_options)
    except Exception as e:
        # the message was previously built but never emitted: log it so that the user knows which
        # step failed before the exception details are displayed
        msg = f'[red]Problem generating an introspector instance of {database}.'
        log.error(msg)
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error

    try:
        # Python source files are UTF-8 by default, so the generated module is written with an
        # explicit encoding instead of the platform locale one.
        with open(output_file, 'tw', encoding='utf-8') as out_file:
            dump_models(
                out_file,
                introspector,
                tables,
                preserve_order=preserve_order,
                include_views=with_views,
                ignore_unknown=ignore_unknown,
                snake_case=snake_case,
            )
    except Exception as e:
        display_exception(e, obj['debug'])
        return ReturnValue.Error

    msg = f'[green]Database {database} successfully reflected in {output_file.name}'
    log.info(msg)
    return ReturnValue.OK

811 

812 

if __name__ == '__main__':
    # Script entry point. Go through the group's custom main() so that the value returned by the
    # invoked command is converted into the process exit code.
    cli.main()