Coverage for src / mafw / scripts / mafw_exe.py: 97%
277 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
1# Copyright 2025–2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5The execution framework.
7This module provides the run functionality to the whole library.
9It is heavily relying on ``click`` for the generation of commands, options, and arguments.
11.. click:: mafw.scripts.mafw_exe:cli
12 :prog: mafw
13 :nested: full
15"""
17import datetime
18import logging
19import pathlib
20import shutil
21import sys
22import warnings
23from enum import IntEnum
24from typing import TYPE_CHECKING, Any
26import click
27from click import ClickException
28from pwiz import DATABASE_MAP, make_introspector # type: ignore[import-untyped]
29from rich import print as rprint
30from rich import traceback
31from rich.align import Align
32from rich.console import Console
33from rich.logging import RichHandler
34from rich.prompt import Prompt
35from rich.rule import Rule
36from rich.table import Table
37from rich_pyfiglet import RichFiglet
39from mafw.__about__ import __version__
40from mafw.db.db_configurations import db_scheme, default_conf
41from mafw.db.db_wizard import dump_models
42from mafw.enumerators import ProcessorExitStatus
43from mafw.lazy_import import LazyImportProcessor
44from mafw.mafw_errors import AbortProcessorException
45from mafw.plugin_manager import get_plugin_manager
46from mafw.runner import MAFwApplication
47from mafw.tools.parallel import is_free_threading
48from mafw.tools.toml_tools import generate_steering_file
# Modules whose frames are suppressed in the tracebacks rendered by rich.
suppress = [click]
traceback.install(show_locals=True, suppress=suppress)

# Mapping between the log level names accepted on the command line and the numeric logging levels.
LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL,
}
# Both -h and --help display the help page.
CONTEXT_SETTINGS = {'help_option_names': ['-h', '--help']}

_warnings_captured = False

# The welcome banner shown by print_banner.
welcome_message = RichFiglet(
    'MAFw',
    colors=['#ff0000', 'magenta1', 'blue3'],
    horizontal=True,
    # font='banner4',
    remove_blank_lines=True,
    border='ROUNDED',
    border_color='#ff0000',
)
def print_banner(ctx: click.Context) -> None:
    """
    Show the welcome banner, at most once per invocation.

    The banner is printed only if the selected user interface is ``rich`` and the
    ``no_banner`` option is not set.

    .. note::

        When Group.invoke() is called, ctx.obj is not yet populated, while
        ctx.params already contains the parsed options of the current group.
        For this reason the options are read from the params of the root context.

    :param ctx: The click context of the group being invoked.
    :type ctx: click.Context
    """
    # nothing has to be printed while click is doing resilient parsing
    if getattr(ctx, 'resilient_parsing', False):
        return

    # climb up to the root context, whose parsed params are shared by all nested groups
    root_ctx = ctx
    while root_ctx.parent is not None:
        root_ctx = root_ctx.parent

    options = getattr(root_ctx, 'params', {}) or {}
    selected_ui = options.get('ui')

    # be conservative: without an explicit rich interface, the banner is not printed
    banner_disabled = bool(options.get('no_banner'))
    rich_selected = bool(selected_ui) and str(selected_ui).lower() == 'rich'
    if banner_disabled or not rich_selected:
        return

    # the flag stored in the root meta guarantees a single printout
    if root_ctx.meta.get('_banner_printed', False):
        return

    Console().print(welcome_message)
    root_ctx.meta['_banner_printed'] = True
112def custom_formatwarning(
113 message: Warning | str, category: type[Warning], filename: str, lineno: int, line: str | None = None
114) -> str:
115 """Return the pure message of the warning."""
116 return str(message)
119def is_bugged_version() -> bool:
120 """Check if the Python version has a known bug.
122 This function checks if the current Python version is affected by a specific bug
123 present in Python 3.14.0 through 3.14.3. The bug relates to warning capture issues
124 when running in free-threading mode.
126 .. note::
128 This is a temporary check to work around a bug in Python up to 3.14.3.
129 See https://github.com/python/cpython/pull/146374 for more details.
131 :return: True if the Python version is affected by the bug, False otherwise.
132 :rtype: bool
133 """
134 version = sys.version_info
135 return version >= (3, 14) and version.micro <= 3
# warnings are displayed showing their message only
warnings.formatwarning = custom_formatwarning

# TODO: python bug
# There is a bug in the free-threading version of python 3.14 up to 3.14.3
# (see https://github.com/python/cpython/pull/146374).
# The bug has been fixed, but until 3.14.4 is released we cannot redirect
# the warnings to the logging when working with the free-threading version.
#
# is_bugged_version() returns a boolean and must be used as it is: comparing it
# with a number would always be true, disabling the capture on every free-threading build.
if is_bugged_version() and is_free_threading():
    logging.captureWarnings(False)
else:
    logging.captureWarnings(True)

# get the root logger
log = logging.getLogger()
class ReturnValue(IntEnum):
    """Integer codes returned by the commands and used as exit code of the script."""

    OK = 0
    """Successful execution"""

    Error = 1
    """Execution terminated with a generic error"""
def logger_setup(level: str, ui: str, tracebacks: bool) -> None:
    """Configure the root logger.

    The level of the root logger is set according to the command line option and a new handler is attached to it:
    a RichHandler when the user interface is 'rich', a plain StreamHandler in all other cases.

    The `tracebacks` flag is relevant for the RichHandler only. Tracebacks are very helpful while debugging,
    but they could be confusing for final users, that is why they are normally switched off and turned on
    only when the debug flag is activated.

    :param level: Logging level as a string. It must be one of the keys of LEVELS, case insensitive.
    :type level: str
    :param ui: User interface as a string ('rich' or 'console'), case insensitive.
    :type ui: str
    :param tracebacks: Enable/disable the logging of exception tracebacks.
    :type tracebacks: bool
    """
    level_name, ui_name = level.lower(), ui.lower()
    log.setLevel(LEVELS[level_name])

    handler: logging.Handler
    if ui_name != 'rich':
        handler = logging.StreamHandler()
        message_format = '%(asctime)s - %(levelname)s - %(message)s'
    else:
        # time and level are rendered by the RichHandler itself
        handler = RichHandler(
            rich_tracebacks=tracebacks, markup=True, show_path=False, log_time_format='%Y%m%d-%H:%M:%S'
        )
        message_format = '%(message)s'

    handler.setFormatter(logging.Formatter(message_format))
    log.addHandler(handler)
def display_exception(exception: Exception, show_traceback: bool = False) -> None:
    """
    Report an exception via the root logger.

    A message is always logged at the critical level. Then, if show_traceback is True, the exception is logged
    along with its traceback, otherwise only the exception class name and its message are logged and the user
    is invited to switch on the debug option to get more details.

    :param exception: The exception to be displayed and logged.
    :type exception: Exception
    :param show_traceback: Flag indicating whether to show detailed traceback information. Defaults to False
    :type show_traceback: bool
    """
    if not show_traceback:
        log.critical('A critical error occurred. Set option -D to get traceback output')
        summary = ': '.join((exception.__class__.__name__, str(exception)))
        # exc_info is switched off to get the error message without the traceback
        log.exception(summary, exc_info=False, stack_info=False, stacklevel=1)
        return

    log.critical('A critical error occurred')
    log.exception(exception)
class MAFwGroup(click.Group):
    """Custom Click Group for MAFw runner.

    It implements two main features:

        1. Support commands abbreviation. Instead of providing the whole command,
           the user can use whatever abbreviation instead as long as it is unique.
           So for example, instead of `mafw list`, the user can provide `mafw l` and the result
           will be the same.
        2. Implements the cascading of return values among different command levels.
    """

    def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None:
        """
        Return a command.

        Given a context and a command name as passed from the CLI, the click.Command is returned.
        This method overloads the basic one allowing to use command abbreviations.

        If more than one match is found, then an error is raised.

        If no matches are found, then click will handle this case as in the standard situation.

        :param ctx: The click context
        :param cmd_name: The command name as provided from the CLI
        :return: The corresponding command or None if no command is found.
        """
        # an exact match always wins over the abbreviations
        exact = super().get_command(ctx, cmd_name)
        if exact is not None:
            return exact

        matches = [name for name in self.list_commands(ctx) if name.startswith(cmd_name)]
        if not matches:
            return None
        if len(matches) == 1:
            return click.Group.get_command(self, ctx, matches[0])

        # ambiguous abbreviation: ctx.fail raises a usage error
        ctx.fail(f'Too many matches: {", ".join(sorted(matches))}')

    def resolve_command(
        self, ctx: click.Context, args: list[str]
    ) -> tuple[str | None, click.Command | None, list[str]]:
        """
        Resolve the command always returning its full name.

        The base implementation returns the name as typed by the user, that might be an abbreviation.

        :param ctx: The click context
        :param args: The list of command line arguments still to be processed.
        :return: A tuple with the full command name, the command and the remaining arguments. Name and command are
            None if the base implementation could not resolve the command.
        """
        _, cmd, args = super().resolve_command(ctx, args)
        if cmd is None:
            # the base implementation may return no command instead of failing (resilient parsing):
            # there is no name to be retrieved
            return None, None, args
        return cmd.name, cmd, args

    def invoke(self, ctx: click.Context) -> Any:
        """Invoke the command.

        This override method is just wrapping the base invoke call in a try / except block.

        In the case of a ClickException, then this is shown and its exit code is passed to the sys.exit call. In
        case of a SystemExit or click.exceptions.Exit, then this is simply re-raised, so that Click can handle it as
        in normal circumstances. In all other cases, the exception is caught and the sys.exit is called with the
        :attr:`.ReturnValue.Error`.

        :param ctx: The click context
        :return: The return value of the invoked command
        """
        print_banner(ctx)

        try:
            return super().invoke(ctx)
        except ClickException as e:
            e.show()
            sys.exit(e.exit_code)
        except (SystemExit, click.exceptions.Exit):
            # Re-raise SystemExit to maintain Click's normal behavior
            raise
        except Exception:
            # For any other exception, exit with error code.
            # NOTE(review): the exception is not reported here, only the exit code is set.
            sys.exit(ReturnValue.Error)

    def main(self, *args: Any, **kwargs: Any) -> None:  # type: ignore[override]
        """Override main to handle return values properly.

        The base main is always executed with standalone_mode set to False, in order to get back the return value
        of the invoked command and to use it as exit code. A standalone_mode value provided by the caller is
        discarded.

        :param args: Positional arguments passed to the base main.
        :param kwargs: Keyword arguments passed to the base main.
        """
        # standalone_mode is set explicitly in the call below: a value provided by the caller
        # would end up in a duplicated keyword argument error.
        kwargs.pop('standalone_mode', None)

        try:
            # Get the result from the command
            rv = super().main(*args, standalone_mode=False, **kwargs)  # type: ignore[call-overload]

            # If the command returned a ReturnValue or an integer, use it as exit code
            if isinstance(rv, (ReturnValue, int)):
                sys.exit(rv)
            else:
                # Default to success if no return value or unknown return value
                sys.exit(ReturnValue.OK)

        except ClickException as e:
            e.show()
            sys.exit(e.exit_code)
        except SystemExit:
            # Re-raise SystemExit (this includes calls to sys.exit())
            raise
        except Exception:
            # For any unhandled exception, exit with error code
            sys.exit(ReturnValue.Error)
@click.group(invoke_without_command=True, context_settings=CONTEXT_SETTINGS, name='mafw', cls=MAFwGroup)
@click.pass_context
@click.option(
    '--log-level',
    type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False),
    show_default=True,
    default='info',
    help='Log level',
)
@click.option(
    '--ui',
    type=click.Choice(['console', 'rich'], case_sensitive=False),
    default='rich',
    help='The user interface',
    show_default=True,
)
@click.option('-D', '--debug', is_flag=True, default=False, help='Show debug information about errors')
@click.option('--no-banner', is_flag=True, default=False, help='Disable the welcome banner')
@click.version_option(__version__, '-v', '--version')
def cli(ctx: click.core.Context, log_level: str, ui: str, debug: bool, no_banner: bool) -> None:
    """
    The Modular Analysis Framework execution.

    This is the command line interface where you can configure and launch your analysis tasks.

    More information on our documentation page.
    \f

    The global options are stored in the context object, so that they are available to all sub-commands, and
    the logger is configured accordingly.

    :param ctx: The click context.
    :type ctx: click.core.Context
    :param log_level: The logging level as a string. Choice from debug, info, warning, error and critical.
    :type log_level: str
    :param ui: The user interface as a string. Choice from console and rich.
    :type ui: str
    :param debug: Flag to show debug information about exception.
    :type debug: bool
    :param no_banner: Flag to disable the welcome banner.
    :type no_banner: bool
    """
    ctx.ensure_object(dict)
    # share the global options with the sub-commands
    ctx.obj = dict(log_level=log_level, ui=ui, debug=debug, no_banner=no_banner)
    logger_setup(log_level, ui, debug)

    if ctx.invoked_subcommand is not None:
        return

    # no sub-command provided: give a hint to the user
    rprint('Use --help to get a quick help on the mafw command.')
@cli.command(name='list')
@click.pass_obj
def list_processors(obj: dict[str, Any]) -> ReturnValue:
    """Display the list of available processors.

    This command will retrieve all available processors via the plugin manager. Both internal and external processors
    will be listed if the ext-plugins option is passed.
    \f

    The processors are displayed in a table along with the package and the module they belong to. A processor
    is counted as internal if its package is `mafw`, as external otherwise.

    :param obj: The context object being passed from the main command.
    :type obj: dict
    :return: :attr:`.ReturnValue.OK` if the list was displayed, :attr:`.ReturnValue.Error` in case of exceptions.
    :rtype: ReturnValue
    """
    try:
        plugin_manager = get_plugin_manager()
        plugins = plugin_manager.load_plugins({'processors'})
        available_processors = plugins.processor_list
        print('\n')
        table = Table(
            title='Available processors',
            header_style='orange3',
            expand=True,
            title_style='italic red',
        )
        table.add_column('Processor Name', justify='left', style='cyan')
        table.add_column('Package Name', justify='left', style='cyan')
        table.add_column('Module', justify='left', style='cyan')
        mafw_processors = 0
        other_processors = 0
        for processor in available_processors:
            if isinstance(processor, LazyImportProcessor):
                qualified_module = processor.plugin_qualname
                name = processor.plugin_name
            else:
                qualified_module = processor.__module__
                name = processor.__name__
            # partition, differently from split, also works when there is no dot in the module name,
            # as for a processor defined in a top level module. In that case the module column is left empty.
            package, _, module = qualified_module.partition('.')
            table.add_row(name, package, module)
            if package == 'mafw':
                mafw_processors += 1
            else:
                other_processors += 1
        table.caption = f'Total processors = {len(available_processors)}, internal = {mafw_processors}, external = {other_processors}'
        table.caption_style = 'italic green'
        console = Console()
        console.print(Align.center(table))
        return ReturnValue.OK
    except Exception as e:
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error
@cli.command(name='steering')
@click.pass_obj
@click.option('--show/--no-show', default=False, help='Display the generated steering file on console')
@click.option('--ext-plugins/--no-ext-plugin', default=True, help='Load external plugins')
@click.option('--open-editor/--no-open-editor', default=False, help='Open the file in your editor.')
@click.option(
    '--db-engine',
    type=click.Choice(['sqlite', 'mysql', 'postgresql'], case_sensitive=False),
    help='Select a DB engine',
    default='sqlite',
)
@click.option('--db-url', type=str, default=':memory:', help='URL to the DB')
@click.argument('steering-file', type=click.Path())
def generate_steering(
    obj: dict[str, Any],
    show: bool,
    ext_plugins: bool,
    open_editor: bool,
    steering_file: pathlib.Path,
    db_engine: str,
    db_url: str,
) -> ReturnValue:
    """Generates a steering file with the default parameters of all available processors.

    STEERING_FILE A path to the steering file to execute.

    The user must modify the generated steering file to ensure it can be executed using the run command.
    \f

    :param obj: The context object being passed from the main command.
    :type obj: dict
    :param show: Display the steering file in the console after the generation. Defaults to False.
    :type show: bool
    :param ext_plugins: Extend the search for processor to external libraries.
        NOTE(review): the value of this flag is currently not used by this function.
    :type ext_plugins: bool
    :param open_editor: Open a text editor after the generation to allow direct editing.
    :type open_editor: bool
    :param steering_file: The steering file path.
    :type steering_file: Path
    :param db_engine: The name of the db engine.
    :type db_engine: str
    :param db_url: The URL of the database.
    :type db_url: str
    :return: :attr:`.ReturnValue.OK` if the file was generated, :attr:`.ReturnValue.Error` in case of exceptions.
    :rtype: ReturnValue
    """
    import copy

    try:
        plugin_manager = get_plugin_manager()
        plugins = plugin_manager.load_plugins({'processors'})
        available_processors = plugins.processor_list
        # db_engine is already sure to be in the default conf because the Choice is assuring it.
        # Work on a copy: the URL must not leak into the module level default configuration.
        database_conf = copy.deepcopy(default_conf[db_engine])
        database_conf['URL'] = db_scheme[db_engine] + db_url
        generate_steering_file(steering_file, available_processors, database_conf)

        if show:
            console = Console()
            with open(steering_file) as fp:
                text = fp.read()
            with console.pager():
                console.print(text, highlight=True)
            console.print(Rule())

        if open_editor:
            click.edit(filename=str(steering_file))
        else:
            rprint(f'A generic steering file has been saved in [blue underline]{steering_file}[/blue underline].')
            rprint('Open it in your favourite text editor, change the processors_to_run list and save it.')
            rprint('')
            rprint(f'To execute it launch: [blue]mafw run {steering_file}[/blue].')

        return ReturnValue.OK

    except Exception as e:
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error
@cli.command()
@click.pass_obj
@click.argument('steering-file', type=click.Path())
def run(obj: dict[str, Any], steering_file: click.Path) -> ReturnValue:
    """Runs a steering file.

    STEERING_FILE A path to the steering file to execute.

    \f

    The return value is :attr:`.ReturnValue.OK` only if the application exit status is successful.

    :param obj: The context object being passed from the main command.
    :type obj: dict
    :param steering_file: The path to the steering file to be executed.
    :type steering_file: Path
    :return: The script return value.
    :rtype: ReturnValue
    """
    try:
        application = MAFwApplication(steering_file)  # type: ignore
        exit_status = application.run()
        return ReturnValue.OK if exit_status == ProcessorExitStatus.Successful else ReturnValue.Error
    except AbortProcessorException:
        # an aborted processor is turned into an error code without displaying the exception
        return ReturnValue.Error
    except Exception as e:
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error
@cli.group()
@click.pass_context
def db(ctx: click.core.Context) -> None:
    """
    Advanced database commands.

    The db group of commands offers a set of useful database operations. Invoke the help option of each command for
    more details.
    \f

    The group callback performs no action, it is only the container of the db commands.

    :param ctx: The click context.
    :type ctx: click.core.Context
    """
@db.command(name='wizard')
@click.pass_context
@click.option(
    '-o',
    '--output-file',
    type=click.Path(),
    default=pathlib.Path.cwd() / pathlib.Path('my_model.py'),
    help='The name of the output file with the reflected model.',
)
@click.option('-s', '--schema', type=str, help='The name of the DB schema')
@click.option(
    '-t', '--tables', type=str, multiple=True, help='Generate model for selected tables. Multiple option possible.'
)
@click.option('--overwrite/--no-overwrite', default=True, help='Overwrite output file if already exists.')
@click.option('--preserve-order/--no-preserve-order', default=True, help='Preserve column order.')
@click.option('--with-views/--without-views', default=False, help='Include also database views.')
@click.option('--ignore-unknown/--no-ignore-unknown', default=False, help='Ignore unknown fields.')
@click.option('--snake-case/--no-snake-case', default=True, help='Use snake case for table and field names.')
@click.option('--host', type=str, help='Hostname for the DB server.')
@click.option('-p', '--port', type=int, help='Port number for the DB server.')
@click.option('-u', '--user', '--username', type=str, help='Username for the connection to the DB server.')
@click.option('--password', prompt=True, prompt_required=False, hide_input=True, help='Insert password when prompted')
@click.option('-e', '--engine', type=click.Choice(sorted(DATABASE_MAP)), help='The DB engine')
@click.argument('database', type=str)
def wizard(
    ctx: click.core.Context,
    overwrite: bool,
    tables: tuple[str, ...] | None,
    preserve_order: bool,
    with_views: bool,
    ignore_unknown: bool,
    snake_case: bool,
    output_file: click.Path | pathlib.Path | str,
    host: str,
    port: int,
    user: str,
    password: str,
    engine: str,
    schema: str,
    database: str,
) -> ReturnValue:
    """
    Reflect an existing DB into a python module.

    mafw db wizard [Options] Database

    Database Name of the Database to be reflected.

    About connection options (user / host / port):

    That information will be used only in case you are trying to access a network database (MySQL or PostgreSQL). In
    case of Sqlite, the parameters will be discarded.

    About passwords:

    If you need to specify a password to connect to the DB server, just add --password in the command line without
    typing your password as clear text. You will be prompted to insert the password with hidden characters at the start
    of the processor.

    About engines:

    The full list of supported engines is provided in the option below. If you do not specify any
    engine and the database is actually an existing filename, then engine is set to Sqlite, otherwise to postgresql.

    \f

    :param database: The name of the database.
    :type database: str
    :param schema: The database schema to be reflected.
    :type schema: str
    :param engine: The database engine. A selection of possible values is provided in the script help.
    :type engine: str
    :param password: The password for the DB connection. Not used in case of Sqlite.
    :type password: str
    :param user: The username for the DB connection. Not used in case of Sqlite.
    :type user: str
    :param port: The port number of the database server. Not used in case of Sqlite.
    :type port: int
    :param host: The database hostname. Not used in case of Sqlite.
    :type host: str
    :param output_file: The filename for the output python module.
    :type output_file: click.Path | pathlib.Path | str
    :param snake_case: Flag to select snake_case convention for table and field names, or all small letter formatting.
    :type snake_case: bool
    :param ignore_unknown: Flag to ignore unknown fields. If False, an unknown field will be labelled with UnknownField.
    :type ignore_unknown: bool
    :param with_views: Flag to include views in the reflected elements.
    :type with_views: bool
    :param preserve_order: Flag to select if table fields should be reflected in the original order (True) or in
        alphabetical order (False)
    :type preserve_order: bool
    :param tables: A tuple containing a selection of table names to be reflected.
    :type tables: tuple[str, ...]
    :param overwrite: Flag to overwrite the output file if exists. If False and the output file already exists, the
        user can decide what to do.
    :type overwrite: bool
    :param ctx: The click context, that includes the original object with global options.
    :type ctx: click.core.Context
    :return: The script return value
    """
    obj = ctx.obj

    if isinstance(output_file, (str, click.Path)):
        output_file = pathlib.Path(str(output_file))

    # if not overwrite, check if the file exists
    if not overwrite and output_file.exists():
        answer = Prompt.ask(
            f'A module ({output_file.name}) already exists. Do you want to overwrite, cancel or backup?',
            case_sensitive=False,
            choices=['o', 'c', 'b'],
            show_choices=True,
            show_default=True,
            default='b',
        )
        if answer == 'c':
            return ReturnValue.OK
        elif answer == 'b':
            # keep a time-stamped copy of the existing module before overwriting it
            bck_filename = output_file.parent / pathlib.Path(
                output_file.stem + f'_{datetime.datetime.now():%Y%m%dT%H%M%S}' + output_file.suffix
            )

            shutil.copy(output_file, bck_filename)

    if tables == ():
        tables = None

    if engine is None:
        engine = 'sqlite' if pathlib.Path(database).exists() else 'postgresql'

    # prepare the connection options
    if engine in ['sqlite', 'sqlite3']:
        # for sqlite the connection parameters (host, port, user and password) are discarded
        keys: list[str] = ['schema']
        values: list[str | int] = [schema]
    else:
        keys = ['host', 'port', 'user', 'schema', 'password']
        values = [host, port, user, schema, password]

    # only the options with a value are passed to the introspector
    connection_options: dict[str, Any] = {}
    for k, v in zip(keys, values):
        if v:
            connection_options[k] = v

    try:
        introspector = make_introspector(engine, database, **connection_options)
    except Exception as e:
        msg = f'[red]Problem generating an introspector instance of {database}.'
        # tell the user which step failed before displaying the exception
        log.critical(msg)
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error

    try:
        # the output is a python module, whose default source encoding is UTF-8
        with open(output_file, 'tw', encoding='utf-8') as out_file:
            dump_models(
                out_file,
                introspector,
                tables,
                preserve_order=preserve_order,
                include_views=with_views,
                ignore_unknown=ignore_unknown,
                snake_case=snake_case,
            )
    except Exception as e:
        display_exception(e, show_traceback=obj['debug'])
        return ReturnValue.Error

    msg = f'[green]Database {database} successfully reflected in {output_file.name}'
    log.info(msg)
    return ReturnValue.OK
if __name__ == '__main__':
    # Use the custom main method of the group (instead of calling cli() directly),
    # because it converts the return value of the commands into the process exit code.
    cli.main()