Coverage for src / mafw / scripts / mafw_exe.py: 98%
270 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
1# Copyright 2025 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5The execution framework.
7This module provides the run functionality to the whole library.
9It is heavily relying on ``click`` for the generation of commands, options, and arguments.
11.. click:: mafw.scripts.mafw_exe:cli
12 :prog: mafw
13 :nested: full
15"""
17import datetime
18import logging
19import pathlib
20import shutil
21import sys
22import warnings
23from enum import IntEnum
24from typing import TYPE_CHECKING, Any
26import click
27from click import ClickException
28from pwiz import DATABASE_MAP, make_introspector # type: ignore[import-untyped]
29from rich import print as rprint
30from rich import traceback
31from rich.align import Align
32from rich.console import Console
33from rich.logging import RichHandler
34from rich.prompt import Prompt
35from rich.rule import Rule
36from rich.table import Table
37from rich_pyfiglet import RichFiglet
39from mafw.__about__ import __version__
40from mafw.db.db_configurations import db_scheme, default_conf
41from mafw.db.db_wizard import dump_models
42from mafw.enumerators import ProcessorExitStatus
43from mafw.lazy_import import LazyImportProcessor
44from mafw.mafw_errors import AbortProcessorException
45from mafw.plugin_manager import get_plugin_manager
46from mafw.runner import MAFwApplication
47from mafw.tools.toml_tools import generate_steering_file
49suppress = [click]
50traceback.install(show_locals=True, suppress=suppress)
52LEVELS = {'debug': 10, 'info': 20, 'warning': 30, 'error': 40, 'critical': 50}
53CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
55welcome_message = RichFiglet(
56 'MAFw',
57 colors=['#ff0000', 'magenta1', 'blue3'],
58 horizontal=True,
59 # font='banner4',
60 remove_blank_lines=True,
61 border='ROUNDED',
62 border_color='#ff0000',
63)
66def print_banner(ctx: click.Context) -> None:
67 """
68 Print the welcome banner only once, only for rich UI,
69 and only if not disabled.
71 .. note::
73 ctx.obj is not yet populated when Group.invoke() is called,
74 but ctx.params contains parsed options for the current group. We
75 therefore inspect the root context params.
76 """
77 # avoid printing while click is still doing resilient parsing (e.g., --help)
78 if getattr(ctx, 'resilient_parsing', False):
79 return
81 # find the root context (so nested groups share the same parsed params)
82 root = ctx
83 while root.parent is not None:
84 root = root.parent
86 # look for options in root.params first (these are already parsed)
87 params = getattr(root, 'params', {}) or {}
88 ui = params.get('ui')
89 no_banner = params.get('no_banner')
91 # Normalize and check
92 if no_banner:
93 return
94 if not ui: # pragma: no cover
95 # if nothing available, be conservative and don't print
96 return
97 if str(ui).lower() != 'rich':
98 return
100 # Print only once per process / invocation
101 if root.meta.get('_banner_printed', False):
102 return
104 console = Console()
105 console.print(welcome_message)
106 root.meta['_banner_printed'] = True
109def custom_formatwarning(
110 message: Warning | str, category: type[Warning], filename: str, lineno: int, line: str | None = None
111) -> str:
112 """Return the pure message of the warning."""
113 return str(message)
116warnings.formatwarning = custom_formatwarning
117logging.captureWarnings(True)
119log = logging.getLogger()
122class ReturnValue(IntEnum):
123 """Enumerator to handle the script return value."""
125 OK = 0
126 """No error"""
128 Error = 1
129 """Generic error"""
132def logger_setup(level: str, ui: str, tracebacks: bool) -> None:
133 """Set up the logger.
135 This function is actually configuring the root logger level from the command line options and it attaches either
136 a RichHandler or a StreamHandler depending on the user interface type.
138 The `tracebacks` flag is used only by the RichHandler. Printing the tracebacks is rather useful when debugging
139 the code, but it could be detrimental for final users. In normal circumstances, tracebacks is set to False,
140 and is turned on when the debug flag is activated.
142 :param level: Logging level as a string.
143 :type level: str
144 :param ui: User interface as a string ('rich' or 'console').
145 :type ui: str
146 :param tracebacks: Enable/disable the logging of exception tracebacks.
147 """
148 level = level.lower()
149 ui = ui.lower()
151 log.setLevel(LEVELS[level])
152 handler: logging.Handler
154 if ui == 'rich':
155 fs = '%(message)s'
156 handler = RichHandler(
157 rich_tracebacks=tracebacks, markup=True, show_path=False, log_time_format='%Y%m%d-%H:%M:%S'
158 )
159 else:
160 fs = '%(asctime)s - %(levelname)s - %(message)s'
161 handler = logging.StreamHandler()
163 formatter = logging.Formatter(fs)
164 handler.setFormatter(formatter)
165 log.addHandler(handler)
168def display_exception(exception: Exception, show_traceback: bool = False) -> None:
169 """
170 Display exception information with optional debug details.
172 This function logs exception information at the critical level. When show_traceback is enabled,
173 it logs the full exception including traceback information. Otherwise, it logs a simplified
174 message directing users to enable debug mode for more details.
176 :param exception: The exception to be displayed and logged.
177 :type exception: Exception
178 :param show_traceback: Flag indicating whether to show detailed traceback information. Defaults to False
179 :type show_traceback: bool
180 """
182 if show_traceback:
183 log.critical('A critical error occurred')
184 log.exception(exception)
185 else:
186 log.critical('A critical error occurred. Set option -D to get traceback output')
187 log.exception(
188 '%s: %s' % (exception.__class__.__name__, exception), exc_info=False, stack_info=False, stacklevel=1
189 )
192class MAFwGroup(click.Group):
193 """Custom Click Group for MAFw runner.
195 It implements two main features:
197 1. Support commands abbreviation. Instead of providing the whole command,
198 the user can use whatever abbreviation instead as long as it is unique.
199 So for example, instead of `mafw list`, the use can provide `mafw l` and the result
200 will be the same.
201 2. Implements the cascading of return values among different command levels.
202 """
204 def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None:
205 """
206 Return a command.
208 Given a context and a command name as passed from the CLI, the click.Command is returned.
209 This method overloads the basic one allowing to use command abbreviations.
211 If more than one match is found, then an error is raised.
213 If no matches are found, then click will handle this case as in the standard situation.
215 :param ctx: The click context
216 :param cmd_name: The command name as provided from the CLI
217 :return: The corresponding command or None if no command is found.
218 """
219 rv = super().get_command(ctx, cmd_name)
221 if rv is not None:
222 return rv
224 matches = [x for x in self.list_commands(ctx) if x.startswith(cmd_name)]
226 if not matches:
227 return None
229 if len(matches) == 1: 229 ↛ 232line 229 didn't jump to line 232 because the condition on line 229 was always true
230 return click.Group.get_command(self, ctx, matches[0])
232 ctx.fail(f'Too many matches: {", ".join(sorted(matches))}')
234 def resolve_command(
235 self, ctx: click.Context, args: list[str]
236 ) -> tuple[str | None, click.Command | None, list[str]]:
237 # always return the full command name
238 _, cmd, args = super().resolve_command(ctx, args)
239 if TYPE_CHECKING:
240 assert isinstance(cmd, click.Command)
241 return cmd.name, cmd, args
243 def invoke(self, ctx: click.Context) -> Any:
244 """Invoke the command.
246 This override method is just wrapping the base invoke call in a try / except block.
248 In the case of a ClickException, then this is shown and its exit code is used passed to the sys.exit call. In
249 case of a SystemExit or click.exceptions.Exit, then this is simply re-raised, so that Click can handle it as
250 in normal circumstances In all other cases, the exception is caught and the sys.exit is called with the
251 :attr:`.ReturnValue.Error`.
253 :param ctx: The click context
254 :return: The return value of the invoked command
255 """
256 print_banner(ctx)
258 try:
259 return super().invoke(ctx)
261 except ClickException as e:
262 e.show()
263 sys.exit(e.exit_code)
264 except (SystemExit, click.exceptions.Exit):
265 # Re-raise SystemExit to maintain Click's normal behavior
266 raise
267 except Exception:
268 # For any other exception, exit with error code
269 sys.exit(ReturnValue.Error)
271 def main(self, *args: Any, **kwargs: Any) -> None: # type: ignore[override]
272 """Override main to handle return values properly."""
273 try:
274 # Get the result from the command
275 rv = super().main(*args, standalone_mode=False, **kwargs) # type: ignore[call-overload]
277 # If the command returned a ReturnValue or an integer, use it as exit code
278 if isinstance(rv, (ReturnValue, int)):
279 sys.exit(rv)
280 else:
281 # Default to success if no return value or unknown return value
282 sys.exit(ReturnValue.OK)
284 except ClickException as e:
285 e.show()
286 sys.exit(e.exit_code)
287 except SystemExit:
288 # Re-raise SystemExit (this includes calls to sys.exit())
289 raise
290 except Exception:
291 # For any unhandled exception, exit with error code
292 sys.exit(ReturnValue.Error)
295@click.group(invoke_without_command=True, context_settings=CONTEXT_SETTINGS, name='mafw', cls=MAFwGroup)
296@click.pass_context
297@click.option(
298 '--log-level',
299 type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False),
300 show_default=True,
301 default='info',
302 help='Log level',
303)
304@click.option(
305 '--ui',
306 type=click.Choice(['console', 'rich'], case_sensitive=False),
307 default='rich',
308 help='The user interface',
309 show_default=True,
310)
311@click.option('-D', '--debug', is_flag=True, default=False, help='Show debug information about errors')
312@click.option('--no-banner', is_flag=True, default=False, help='Disable the welcome banner')
313@click.version_option(__version__, '-v', '--version')
314def cli(ctx: click.core.Context, log_level: str, ui: str, debug: bool, no_banner: bool) -> None:
315 """
316 The Modular Analysis Framework execution.
318 This is the command line interface where you can configure and launch your analysis tasks.
320 More information on our documentation page.
321 \f
323 :param ctx: The click context.
324 :type ctx: click.core.Context
325 :param log_level: The logging level as a string. Choice from debug, info, warning, error and critical.
326 :type log_level: str
327 :param ui: The user interface as a string. Choice from console and rich.
328 :type ui: str
329 :param debug: Flag to show debug information about exception.
330 :type debug: bool
331 :param no_banner: Flag to disable the welcome banner.
332 :type no_banner: bool
333 """
334 ctx.ensure_object(dict)
335 ctx.obj = {'log_level': log_level, 'ui': ui, 'debug': debug, 'no_banner': no_banner}
336 logger_setup(log_level, ui, debug)
338 if ctx.invoked_subcommand is None:
339 rprint('Use --help to get a quick help on the mafw command.')
342@cli.command(name='list')
343@click.pass_obj
344def list_processors(obj: dict[str, Any]) -> ReturnValue:
345 """Display the list of available processors.
347 This command will retrieve all available processors via the plugin manager. Both internal and external processors
348 will be listed if the ext-plugins option is passed.
349 \f
351 """
352 try:
353 plugin_manager = get_plugin_manager()
354 plugins = plugin_manager.load_plugins({'processors'})
355 available_processors = plugins.processor_list
356 print('\n')
357 table = Table(
358 title='Available processors',
359 header_style='orange3',
360 expand=True,
361 title_style='italic red',
362 )
363 table.add_column('Processor Name', justify='left', style='cyan')
364 table.add_column('Package Name', justify='left', style='cyan')
365 table.add_column('Module', justify='left', style='cyan')
366 mafw_processors = 0
367 other_processors = 0
368 for processor in available_processors:
369 if isinstance(processor, LazyImportProcessor):
370 package, module = processor.plugin_qualname.split('.', 1)
371 name = processor.plugin_name
372 else:
373 package, module = processor.__module__.split('.', 1)
374 name = processor.__name__
375 table.add_row(name, package, module)
376 if package == 'mafw':
377 mafw_processors += 1
378 else:
379 other_processors += 1
380 table.caption = f'Total processors = {len(available_processors)}, internal = {mafw_processors}, external = {other_processors}'
381 table.caption_style = 'italic green'
382 console = Console()
383 console.print(Align.center(table))
384 return ReturnValue.OK
385 except Exception as e:
386 display_exception(e, show_traceback=obj['debug'])
387 return ReturnValue.Error
390@cli.command(name='steering')
391@click.pass_obj
392@click.option('--show/--no-show', default=False, help='Display the generated steering file on console')
393@click.option('--ext-plugins/--no-ext-plugin', default=True, help='Load external plugins')
394@click.option('--open-editor/--no-open-editor', default=False, help='Open the file in your editor.')
395@click.option(
396 '--db-engine',
397 type=click.Choice(['sqlite', 'mysql', 'postgresql'], case_sensitive=False),
398 help='Select a DB engine',
399 default='sqlite',
400)
401@click.option('--db-url', type=str, default=':memory:', help='URL to the DB')
402@click.argument('steering-file', type=click.Path())
403def generate_steering(
404 obj: dict[str, Any],
405 show: bool,
406 ext_plugins: bool,
407 open_editor: bool,
408 steering_file: pathlib.Path,
409 db_engine: str,
410 db_url: str,
411) -> ReturnValue:
412 """Generates a steering file with the default parameters of all available processors.
414 STEERING_FILE A path to the steering file to execute.
416 The user must modify the generated steering file to ensure it can be executed using the run command.
417 \f
419 :param obj: The context object being passed from the main command.
420 :type obj: dict
421 :param show: Display the steering file in the console after the generation. Defaults to False.
422 :type show: bool
423 :param ext_plugins: Extend the search for processor to external libraries.
424 :type ext_plugins: bool
425 :param open_editor: Open a text editor after the generation to allow direct editing.
426 :type open_editor: bool
427 :param steering_file: The steering file path.
428 :type steering_file: Path
429 :param db_engine: The name of the db engine.
430 :type db_engine: str
431 :param db_url: The URL of the database.
432 :type db_url: str
433 """
434 try:
435 plugin_manager = get_plugin_manager()
436 plugins = plugin_manager.load_plugins({'processors'})
437 available_processors = plugins.processor_list
438 # db_engine is already sure to be in the default conf because the Choice is assuring it.
439 database_conf = default_conf[db_engine]
440 database_conf['URL'] = db_scheme[db_engine] + db_url
441 generate_steering_file(steering_file, available_processors, database_conf)
443 if show:
444 console = Console()
445 with open(steering_file) as fp:
446 text = fp.read()
447 with console.pager():
448 console.print(text, highlight=True)
449 console.print(Rule())
451 if open_editor:
452 click.edit(filename=str(steering_file))
453 else:
454 rprint(f'A generic steering file has been saved in [blue underline]{steering_file}[/blue underline].')
455 rprint('Open it in your favourite text editor, change the processors_to_run list and save it.')
456 rprint('')
457 rprint(f'To execute it launch: [blue]mafw run {steering_file}[/blue].')
459 return ReturnValue.OK
461 except Exception as e:
462 display_exception(e, show_traceback=obj['debug'])
463 return ReturnValue.Error
466@cli.command()
467@click.pass_obj
468@click.argument('steering-file', type=click.Path())
469def run(obj: dict[str, Any], steering_file: click.Path) -> ReturnValue:
470 """Runs a steering file.
472 STEERING_FILE A path to the steering file to execute.
474 \f
476 :param obj: The context object being passed from the main command.
477 :type obj: dict
478 :param steering_file: The path to the output steering file.
479 :type steering_file: Path
480 """
481 try:
482 app = MAFwApplication(steering_file) # type: ignore
483 pes = app.run()
484 if pes == ProcessorExitStatus.Successful:
485 rv = ReturnValue.OK
486 else:
487 rv = ReturnValue.Error
488 return rv
490 except AbortProcessorException:
491 return ReturnValue.Error
492 except Exception as e:
493 display_exception(e, show_traceback=obj['debug'])
494 return ReturnValue.Error
497@cli.group
498@click.pass_context
499def db(ctx: click.core.Context) -> None:
500 """
501 Advanced database commands.
503 The db group of commands offers a set of useful database operations. Invoke the help option of each command for
504 more details.
505 \f
507 :param ctx: The click context.
508 :type ctx: click.core.Context
509 """
512@db.command(name='wizard')
513@click.pass_context
514@click.option(
515 '-o',
516 '--output-file',
517 type=click.Path(),
518 default=pathlib.Path.cwd() / pathlib.Path('my_model.py'),
519 help='The name of the output file with the reflected model.',
520)
521@click.option('-s', '--schema', type=str, help='The name of the DB schema')
522@click.option(
523 '-t', '--tables', type=str, multiple=True, help='Generate model for selected tables. Multiple option possible.'
524)
525@click.option('--overwrite/--no-overwrite', default=True, help='Overwrite output file if already exists.')
526@click.option('--preserve-order/--no-preserve-order', default=True, help='Preserve column order.')
527@click.option('--with-views/--without-views', default=False, help='Include also database views.')
528@click.option('--ignore-unknown/--no-ignore-unknown', default=False, help='Ignore unknown fields.')
529@click.option('--snake-case/--no-snake-case', default=True, help='Use snake case for table and field names.')
530@click.option('--host', type=str, help='Hostname for the DB server.')
531@click.option('-p', '--port', type=int, help='Port number for the DB server.')
532@click.option('-u', '--user', '--username', type=str, help='Username for the connection to the DB server.')
533@click.option('--password', prompt=True, prompt_required=False, hide_input=True, help='Insert password when prompted')
534@click.option('-e', '--engine', type=click.Choice(sorted(DATABASE_MAP)), help='The DB engine')
535@click.argument('database', type=str)
536def wizard(
537 ctx: click.core.Context,
538 overwrite: bool,
539 tables: tuple[str, ...] | None,
540 preserve_order: bool,
541 with_views: bool,
542 ignore_unknown: bool,
543 snake_case: bool,
544 output_file: click.Path | pathlib.Path | str,
545 host: str,
546 port: int,
547 user: str,
548 password: str,
549 engine: str,
550 schema: str,
551 database: str,
552) -> ReturnValue:
553 """
554 Reflect an existing DB into a python module.
556 mafw db wizard [Options] Database
558 Database Name of the Database to be reflected.
560 About connection options (user / host / port):
562 That information will be used only in case you are trying to access a network database (MySQL or PostgreSQL). In
563 case of Sqlite, the parameters will be discarded.
565 About passwords:
567 If you need to specify a password to connect to the DB server, just add --password in the command line without
568 typing your password as clear text. You will be prompted to insert the password with hidden characters at the start
569 of the processor.
571 About engines:
573 The full list of supported engines is provided in the option below. If you do not specify any
574 engine and the database is actually an existing filename, then engine is set to Sqlite, otherwise to postgresql.
576 \f
578 :param database: The name of the database.
579 :type database: str
580 :param schema: The database schema to be reflected.
581 :type schema: str
582 :param engine: The database engine. A selection of possible values is provided in the script help.
583 :type engine: str
584 :param password: The password for the DB connection. Not used in case of Sqlite.
585 :type password: str
586 :param user: The username for the DB connection. Not used in case of Sqlite.
587 :type user: str
588 :param port: The port number of the database server. Not used in case of Sqlite.
589 :type port: int
590 :param host: The database hostname. Not used in case of Sqlite.
591 :type host: str
592 :param output_file: The filename for the output python module.
593 :type output_file: click.Path | pathlib.Path | str
594 :param snake_case: Flag to select snake_case convention for table and field names, or all small letter formatting.
595 :type snake_case: bool
596 :param ignore_unknown: Flag to ignore unknown fields. If False, an unknown field will be labelled with UnknownField.
597 :type ignore_unknown: bool
598 :param with_views: Flag to include views in the reflected elements.
599 :type with_views: bool
600 :param preserve_order: Flag to select if table fields should be reflected in the original order (True) or in
601 alphabetical order (False)
602 :type preserve_order: bool
603 :param tables: A tuple containing a selection of table names to be reflected.
604 :type tables: tuple[str, ...]
605 :param overwrite: Flag to overwrite the output file if exists. If False and the output file already exists, the
606 user can decide what to do.
607 :type overwrite: bool
608 :param ctx: The click context, that includes the original object with global options.
609 :type ctx: click.core.Context
610 :return: The script return value
611 """
612 obj = ctx.obj
614 if isinstance(output_file, (str, click.Path)): 614 ↛ 618line 614 didn't jump to line 618 because the condition on line 614 was always true
615 output_file = pathlib.Path(str(output_file))
617 # if not overwrite, check if the file exists
618 if not overwrite and output_file.exists():
619 answer = Prompt.ask(
620 f'A module ({output_file.name}) already exists. Do you want to overwrite, cancel or backup?',
621 case_sensitive=False,
622 choices=['o', 'c', 'b'],
623 show_choices=True,
624 show_default=True,
625 default='b',
626 )
627 if answer == 'c':
628 return ReturnValue.OK
629 elif answer == 'b': 629 ↛ 636line 629 didn't jump to line 636 because the condition on line 629 was always true
630 bck_filename = output_file.parent / pathlib.Path(
631 output_file.stem + f'_{datetime.datetime.now():%Y%m%dT%H%M%S}' + output_file.suffix
632 )
634 shutil.copy(output_file, bck_filename)
636 if tables == ():
637 tables = None
639 if engine is None:
640 engine = 'sqlite' if pathlib.Path(database).exists() else 'postgresql'
642 # prepare the connection options
643 if engine in ['sqlite', 'sqlite3']:
644 # for sqlite the connection
645 keys: list[str] = ['schema']
646 values: list[str | int] = [schema]
647 else:
648 keys = ['host', 'port', 'user', 'schema', 'password']
649 values = [host, port, user, schema, password]
651 connection_options: dict[str, Any] = {}
652 for k, v in zip(keys, values):
653 if v:
654 connection_options[k] = v
656 try:
657 introspector = make_introspector(engine, database, **connection_options)
658 except Exception as e:
659 msg = f'[red]Problem generating an introspector instance of {database}.'
660 display_exception(e, show_traceback=obj['debug'])
661 return ReturnValue.Error
663 try:
664 with open(output_file, 'tw') as out_file:
665 dump_models(
666 out_file,
667 introspector,
668 tables,
669 preserve_order=preserve_order,
670 include_views=with_views,
671 ignore_unknown=ignore_unknown,
672 snake_case=snake_case,
673 )
674 except Exception as e:
675 display_exception(e, obj['debug'])
676 return ReturnValue.Error
678 msg = f'[green]Database {database} successfully reflected in {output_file.name}'
679 log.info(msg)
680 return ReturnValue.OK
683if __name__ == '__main__':
684 # Use the custom main method that handles exit codes
685 cli.main()