Source code for mafw.scripts.mafw_exe

#  Copyright 2025–2026 European Union
#  Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
#  SPDX-License-Identifier: EUPL-1.2
"""
The execution framework.

This module provides the run functionality to the whole library.

It is heavily relying on ``click`` for the generation of commands, options, and arguments.

.. click:: mafw.scripts.mafw_exe:cli
    :prog: mafw
    :nested: full

"""

import datetime
import logging
import pathlib
import shutil
import sys
import warnings
from enum import IntEnum
from typing import Any

import click
from click.exceptions import ClickException
from pwiz import DATABASE_MAP, make_introspector  # type: ignore[import-untyped]
from rich import print as rprint
from rich import traceback
from rich.align import Align
from rich.logging import RichHandler
from rich.prompt import Prompt
from rich.rule import Rule
from rich.table import Table
from rich_pyfiglet import RichFiglet

from mafw.__about__ import __version__
from mafw.db.db_configurations import db_scheme, default_conf
from mafw.db.db_wizard import dump_models
from mafw.enumerators import ProcessorExitStatus
from mafw.lazy_import import LazyImportProcessor
from mafw.mafw_errors import AbortProcessorException
from mafw.plugin_manager import get_plugin_manager
from mafw.runner import MAFwApplication
from mafw.tools.click_extensions import (
    AbbreviateGroup,
    check_ci_completion_guard,
    completion_script_path,
    completion_source_script,
    install_completion,
    is_script_already_installed,
    resolve_completion_shell,
    uninstall_completion_files,
)
from mafw.tools.parallel import is_free_threading
from mafw.tools.shell_tools import CONSOLE
from mafw.tools.toml_tools import generate_steering_file

suppress = [click]
traceback.install(show_locals=True, suppress=suppress)


[docs] class MAFwGroup(AbbreviateGroup): """Click group with abbreviation and MAFw-specific exit handling.""" group_class: type[click.Group] = AbbreviateGroup
[docs] def parse_args(self, ctx: click.Context, args: list[str]) -> list[str]: """Parse arguments and record the selected root command before invoke.""" rv = super().parse_args(ctx, args) if ( ctx.parent is None ): # this is assuring that we store only the first command corresponding to the root context ctx.meta['_mafw_invoked_subcommand'] = self._detect_command_name(args) return rv
[docs] def _detect_command_name(self, args: list[str]) -> str | None: """Return the first subcommand token while skipping root option values.""" # This is probably an overkill. After the default parsing the first element in args should be the first # (abbreviated or fully typed) command. All global options will be removed from args. # so potentially we could return self.get_command(ctx, args[0]), # I will leave it like this because it might be useful in the future. option_params = {opt for param in self.params for opt in getattr(param, 'opts', [])} expects_value = { opt for param in self.params if not getattr(param, 'is_flag', False) for opt in getattr(param, 'opts', []) } skip_next = False for token in args: if skip_next: skip_next = False continue if token in option_params: if token in expects_value: skip_next = True continue if token.startswith('-'): continue selected = self.get_command(click.Context(self), token) return selected.name if selected is not None else token return None
[docs] def invoke(self, ctx: click.Context) -> Any: """Invoke the command and normalize Click exceptions to MAFw exits.""" print_banner(ctx) try: return super().invoke(ctx) except ClickException as exc: exc.show() sys.exit(exc.exit_code) except (SystemExit, click.exceptions.Exit): raise except Exception: sys.exit(ReturnValue.Error)
[docs] def main(self, *args: Any, **kwargs: Any) -> None: # type: ignore[override] """Run the CLI and convert returned statuses to process exit codes.""" try: rv = super().main(*args, standalone_mode=False, **kwargs) # type: ignore[call-overload] if isinstance(rv, (ReturnValue, int)): sys.exit(rv) sys.exit(ReturnValue.OK) except ClickException as exc: exc.show() sys.exit(exc.exit_code) except SystemExit: raise except Exception: sys.exit(ReturnValue.Error)
LEVELS = {'debug': 10, 'info': 20, 'warning': 30, 'error': 40, 'critical': 50} CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) _warnings_captured = False welcome_message = RichFiglet( 'MAFw', colors=['#ff0000', 'magenta1', 'blue3'], horizontal=True, # font='banner4', remove_blank_lines=True, border='ROUNDED', border_color='#ff0000', )
[docs] def custom_formatwarning( message: Warning | str, category: type[Warning], filename: str, lineno: int, line: str | None = None ) -> str: """Return the pure message of the warning.""" return str(message)
[docs] def is_bugged_warning_capture_version() -> bool: """Check if the Python version has the warning capture bug in free-threading mode. Python 3.14.0 through 3.14.3 (free-threading builds only) have a bug where ``logging.captureWarnings(True)`` can crash or misbehave. This function returns True when the current interpreter is affected. .. note:: See https://github.com/python/cpython/pull/146374 for details. .. todo:: Remove this workaround when Python 3.14 is no longer a supported version (expected late 2030). :return: True if the interpreter is affected by the warning capture bug, False otherwise. :rtype: bool """ version = sys.version_info return version.major == 3 and version.minor == 14 and version.micro <= 3 and is_free_threading()
warnings.formatwarning = custom_formatwarning # Disable warning capture on bugged free-threading builds (3.14.0–3.14.3). if is_bugged_warning_capture_version(): logging.captureWarnings(False) else: logging.captureWarnings(True) # get the root logger log = logging.getLogger()
[docs] class ReturnValue(IntEnum): """Enumerator to handle the script return value.""" OK = 0 """No error""" Error = 1 """Generic error"""
[docs] def logger_setup(level: str, ui: str, tracebacks: bool) -> None: """Set up the logger. This function is actually configuring the root logger level from the command line options and it attaches either a RichHandler or a StreamHandler depending on the user interface type. The `tracebacks` flag is used only by the RichHandler. Printing the tracebacks is rather useful when debugging the code, but it could be detrimental for final users. In normal circumstances, tracebacks is set to False, and is turned on when the debug flag is activated. :param level: Logging level as a string. :type level: str :param ui: User interface as a string ('rich' or 'console'). :type ui: str :param tracebacks: Enable/disable the logging of exception tracebacks. """ level = level.lower() ui = ui.lower() log.setLevel(LEVELS[level]) handler: logging.Handler if ui == 'rich': fs = '%(message)s' handler = RichHandler( rich_tracebacks=tracebacks, markup=True, show_path=False, log_time_format='%Y%m%d-%H:%M:%S' ) else: fs = '%(asctime)s - %(levelname)s - %(message)s' handler = logging.StreamHandler() formatter = logging.Formatter(fs) handler.setFormatter(formatter) log.addHandler(handler)
# _ensure_warning_capture()
[docs] def display_exception(exception: Exception, show_traceback: bool = False) -> None: """ Display exception information with optional debug details. This function logs exception information at the critical level. When show_traceback is enabled, it logs the full exception including traceback information. Otherwise, it logs a simplified message directing users to enable debug mode for more details. :param exception: The exception to be displayed and logged. :type exception: Exception :param show_traceback: Flag indicating whether to show detailed traceback information. Defaults to False :type show_traceback: bool """ if show_traceback: log.critical('A critical error occurred') log.exception(exception) else: log.critical('A critical error occurred. Set option -D to get traceback output') log.exception( '%s: %s' % (exception.__class__.__name__, exception), exc_info=False, stack_info=False, stacklevel=1 )
@click.group(invoke_without_command=True, context_settings=CONTEXT_SETTINGS, name='mafw', cls=MAFwGroup) @click.pass_context @click.option( '--log-level', type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False), show_default=True, default='info', help='Log level', ) @click.option( '--ui', type=click.Choice(['console', 'rich'], case_sensitive=False), default='rich', help='The user interface', show_default=True, ) @click.option('-D', '--debug', is_flag=True, default=False, help='Show debug information about errors') @click.option('--no-banner', is_flag=True, default=False, help='Disable the welcome banner') @click.version_option(__version__, '-v', '--version') def cli(ctx: click.core.Context, log_level: str, ui: str, debug: bool, no_banner: bool) -> None: """ The Modular Analysis Framework execution. This is the command line interface where you can configure and launch your analysis tasks. More information on our documentation page. \f :param ctx: The click context. :type ctx: click.core.Context :param log_level: The logging level as a string. Choice from debug, info, warning, error and critical. :type log_level: str :param ui: The user interface as a string. Choice from console and rich. :type ui: str :param debug: Flag to show debug information about exception. :type debug: bool :param no_banner: Flag to disable the welcome banner. :type no_banner: bool """ ctx.ensure_object(dict) ctx.obj = {'log_level': log_level, 'ui': ui, 'debug': debug, 'no_banner': no_banner} logger_setup(log_level, ui, debug) if ctx.invoked_subcommand is None: rprint('Use --help to get a quick help on the mafw command.') @cli.group(name='completion') @click.pass_context def completion(ctx: click.Context) -> None: """ Manage shell completion for the ``mafw`` command. The completion workflow installs the Click-generated shell code into the active virtual environment, updates the activation script so completion is loaded automatically, and exposes a ``show`` helper for direct evaluation. \f .. versionadded:: v2.2 :param ctx: The click context. :type ctx: click.core.Context """ ctx.ensure_object(dict) ctx.obj['tool_name'] = 'mafw' @completion.command(name='install') @click.option( '-s', '--shell', type=click.Choice(['auto', 'bash', 'zsh', 'fish'], case_sensitive=False), default='auto', show_default=True, help='Target shell for completion installation', ) @click.option('-F', '--force', is_flag=True, default=False, help='Reinstall completion even if already loaded') @click.pass_context def completion_install(ctx: click.Context, shell: str, force: bool) -> None: """ Install the ``mafw`` shell completion script. When ``--shell`` is omitted, the command guesses the shell from ``$SHELL``. The generated Click completion script is stored in the active virtual environment and the activation script is updated so that future shell sessions load completion automatically. \f .. versionadded:: v2.2 :param ctx: The click context. :type ctx: click.core.Context :param shell: Target shell selector. :type shell: str :param force: Reinstall completion even if already loaded. :type force: bool """ check_ci_completion_guard() tool_name = ctx.obj['tool_name'] resolved_shell = resolve_completion_shell(shell) script_path = completion_script_path(tool_name, resolved_shell) if is_script_already_installed(tool_name, resolved_shell) and not force: raise click.ClickException( f'MAFw completion is already installed in {script_path}. Use --force to reinstall it.' ) install_completion(tool_name, resolved_shell, force, script_path) rprint(f'Completion script installed in [blue underline]{script_path}[/blue underline].') rprint('Exit and re-enter the virtual environment to activate shell completion.') @completion.command(name='uninstall') @click.pass_context def completion_uninstall(ctx: click.Context) -> None: """ Remove the installed ``mafw`` shell completion files. This command removes generated completion files from ``share/mafw`` and strips the marker block from the activation scripts. .. versionadded:: v2.2 :param ctx: The click context. :type ctx: click.core.Context """ tool_name = ctx.obj['tool_name'] uninstall_completion_files(tool_name, None) rprint(f'Shell completion for {tool_name} has been removed from the active virtual environment.') @completion.command(name='show') @click.option( '-s', '--shell', type=click.Choice(['auto', 'bash', 'zsh', 'fish'], case_sensitive=False), default='auto', show_default=True, help='Target shell for completion output', ) @click.pass_context def completion_show(ctx: click.Context, shell: str) -> None: """ Display the Click completion script on standard output. The output is intentionally clean so it can be used with ``eval``: .. code-block:: console eval "$(mafw completion show)" .. versionadded:: v2.2 :param ctx: The click context. :type ctx: click.core.Context :param shell: Target shell selector. :type shell: str """ check_ci_completion_guard() tool_name = ctx.obj['tool_name'] resolved_shell = resolve_completion_shell(shell) click.echo(completion_source_script(tool_name, resolved_shell), nl=True) @cli.command(name='list') @click.pass_obj def list_processors(obj: dict[str, Any]) -> ReturnValue: """Display the list of available processors. This command will retrieve all available processors via the plugin manager. Both internal and external processors will be listed if the ext-plugins option is passed. \f """ try: plugin_manager = get_plugin_manager() plugins = plugin_manager.load_plugins({'processors'}) available_processors = plugins.processor_list print('\n') table = Table( title='Available processors', header_style='orange3', expand=True, title_style='italic red', ) table.add_column('Processor Name', justify='left', style='cyan') table.add_column('Package Name', justify='left', style='cyan') table.add_column('Module', justify='left', style='cyan') mafw_processors = 0 other_processors = 0 for processor in available_processors: if isinstance(processor, LazyImportProcessor): package, module = processor.plugin_qualname.split('.', 1) name = processor.plugin_name else: package, module = processor.__module__.split('.', 1) name = processor.__name__ table.add_row(name, package, module) if package == 'mafw': mafw_processors += 1 else: other_processors += 1 table.caption = f'Total processors = {len(available_processors)}, internal = {mafw_processors}, external = {other_processors}' table.caption_style = 'italic green' console = CONSOLE console.print(Align.center(table)) return ReturnValue.OK except Exception as e: display_exception(e, show_traceback=obj['debug']) return ReturnValue.Error @cli.command(name='steering') @click.pass_obj @click.option('--show/--no-show', default=False, help='Display the generated steering file on console') @click.option('--ext-plugins/--no-ext-plugin', default=True, help='Load external plugins') @click.option('--open-editor/--no-open-editor', default=False, help='Open the file in your editor.') @click.option( '--db-engine', type=click.Choice(['sqlite', 'mysql', 'postgresql'], case_sensitive=False), help='Select a DB engine', default='sqlite', ) @click.option('--db-url', type=str, default=':memory:', help='URL to the DB') @click.argument('steering-file', type=click.Path()) def generate_steering( obj: dict[str, Any], show: bool, ext_plugins: bool, open_editor: bool, steering_file: pathlib.Path, db_engine: str, db_url: str, ) -> ReturnValue: """Generates a steering file with the default parameters of all available processors. STEERING_FILE A path to the steering file to execute. The user must modify the generated steering file to ensure it can be executed using the run command. \f :param obj: The context object being passed from the main command. :type obj: dict :param show: Display the steering file in the console after the generation. Defaults to False. :type show: bool :param ext_plugins: Extend the search for processor to external libraries. :type ext_plugins: bool :param open_editor: Open a text editor after the generation to allow direct editing. :type open_editor: bool :param steering_file: The steering file path. :type steering_file: Path :param db_engine: The name of the db engine. :type db_engine: str :param db_url: The URL of the database. :type db_url: str """ try: plugin_manager = get_plugin_manager() plugins = plugin_manager.load_plugins({'processors'}) available_processors = plugins.processor_list # db_engine is already sure to be in the default conf because the Choice is assuring it. database_conf = default_conf[db_engine] database_conf['URL'] = db_scheme[db_engine] + db_url generate_steering_file(steering_file, available_processors, database_conf) if show: console = CONSOLE with open(steering_file) as fp: text = fp.read() with console.pager(): console.print(text, highlight=True) console.print(Rule()) if open_editor: click.edit(filename=str(steering_file)) else: rprint(f'A generic steering file has been saved in [blue underline]{steering_file}[/blue underline].') rprint('Open it in your favourite text editor, change the processors_to_run list and save it.') rprint('') rprint(f'To execute it launch: [blue]mafw run {steering_file}[/blue].') return ReturnValue.OK except Exception as e: display_exception(e, show_traceback=obj['debug']) return ReturnValue.Error @cli.command() @click.pass_obj @click.argument('steering-file', type=click.Path()) def run(obj: dict[str, Any], steering_file: click.Path) -> ReturnValue: """Runs a steering file. STEERING_FILE A path to the steering file to execute. \f :param obj: The context object being passed from the main command. :type obj: dict :param steering_file: The path to the output steering file. :type steering_file: Path """ try: app = MAFwApplication(steering_file) # type: ignore pes = app.run() if pes == ProcessorExitStatus.Successful: rv = ReturnValue.OK else: rv = ReturnValue.Error return rv except AbortProcessorException: return ReturnValue.Error except Exception as e: display_exception(e, show_traceback=obj['debug']) return ReturnValue.Error @cli.group @click.pass_context def db(ctx: click.core.Context) -> None: """ Advanced database commands. The db group of commands offers a set of useful database operations. Invoke the help option of each command for more details. \f :param ctx: The click context. :type ctx: click.core.Context """ @db.command(name='wizard') @click.pass_context @click.option( '-o', '--output-file', type=click.Path(), default=pathlib.Path.cwd() / pathlib.Path('my_model.py'), help='The name of the output file with the reflected model.', ) @click.option('-s', '--schema', type=str, help='The name of the DB schema') @click.option( '-t', '--tables', type=str, multiple=True, help='Generate model for selected tables. Multiple option possible.' ) @click.option('--overwrite/--no-overwrite', default=True, help='Overwrite output file if already exists.') @click.option('--preserve-order/--no-preserve-order', default=True, help='Preserve column order.') @click.option('--with-views/--without-views', default=False, help='Include also database views.') @click.option('--ignore-unknown/--no-ignore-unknown', default=False, help='Ignore unknown fields.') @click.option('--snake-case/--no-snake-case', default=True, help='Use snake case for table and field names.') @click.option('--host', type=str, help='Hostname for the DB server.') @click.option('-p', '--port', type=int, help='Port number for the DB server.') @click.option('-u', '--user', '--username', type=str, help='Username for the connection to the DB server.') @click.option('--password', prompt=True, prompt_required=False, hide_input=True, help='Insert password when prompted') @click.option('-e', '--engine', type=click.Choice(sorted(DATABASE_MAP)), help='The DB engine') @click.argument('database', type=str) def wizard( ctx: click.core.Context, overwrite: bool, tables: tuple[str, ...] | None, preserve_order: bool, with_views: bool, ignore_unknown: bool, snake_case: bool, output_file: click.Path | pathlib.Path | str, host: str, port: int, user: str, password: str, engine: str, schema: str, database: str, ) -> ReturnValue: """ Reflect an existing DB into a python module. mafw db wizard [Options] Database Database Name of the Database to be reflected. About connection options (user / host / port): That information will be used only in case you are trying to access a network database (MySQL or PostgreSQL). In case of Sqlite, the parameters will be discarded. About passwords: If you need to specify a password to connect to the DB server, just add --password in the command line without typing your password as clear text. You will be prompted to insert the password with hidden characters at the start of the processor. About engines: The full list of supported engines is provided in the option below. If you do not specify any engine and the database is actually an existing filename, then engine is set to Sqlite, otherwise to postgresql. \f :param database: The name of the database. :type database: str :param schema: The database schema to be reflected. :type schema: str :param engine: The database engine. A selection of possible values is provided in the script help. :type engine: str :param password: The password for the DB connection. Not used in case of Sqlite. :type password: str :param user: The username for the DB connection. Not used in case of Sqlite. :type user: str :param port: The port number of the database server. Not used in case of Sqlite. :type port: int :param host: The database hostname. Not used in case of Sqlite. :type host: str :param output_file: The filename for the output python module. :type output_file: click.Path | pathlib.Path | str :param snake_case: Flag to select snake_case convention for table and field names, or all small letter formatting. :type snake_case: bool :param ignore_unknown: Flag to ignore unknown fields. If False, an unknown field will be labelled with UnknownField. :type ignore_unknown: bool :param with_views: Flag to include views in the reflected elements. :type with_views: bool :param preserve_order: Flag to select if table fields should be reflected in the original order (True) or in alphabetical order (False) :type preserve_order: bool :param tables: A tuple containing a selection of table names to be reflected. :type tables: tuple[str, ...] :param overwrite: Flag to overwrite the output file if exists. If False and the output file already exists, the user can decide what to do. :type overwrite: bool :param ctx: The click context, that includes the original object with global options. :type ctx: click.core.Context :return: The script return value """ obj = ctx.obj if isinstance(output_file, (str, click.Path)): output_file = pathlib.Path(str(output_file)) # if not overwrite, check if the file exists if not overwrite and output_file.exists(): answer = Prompt.ask( f'A module ({output_file.name}) already exists. Do you want to overwrite, cancel or backup?', case_sensitive=False, choices=['o', 'c', 'b'], show_choices=True, show_default=True, default='b', ) if answer == 'c': return ReturnValue.OK elif answer == 'b': bck_filename = output_file.parent / pathlib.Path( output_file.stem + f'_{datetime.datetime.now():%Y%m%dT%H%M%S}' + output_file.suffix ) shutil.copy(output_file, bck_filename) if tables == (): tables = None if engine is None: engine = 'sqlite' if pathlib.Path(database).exists() else 'postgresql' # prepare the connection options if engine in ['sqlite', 'sqlite3']: # for sqlite the connection keys: list[str] = ['schema'] values: list[str | int] = [schema] else: keys = ['host', 'port', 'user', 'schema', 'password'] values = [host, port, user, schema, password] connection_options: dict[str, Any] = {} for k, v in zip(keys, values): if v: connection_options[k] = v try: introspector = make_introspector(engine, database, **connection_options) except Exception as e: msg = f'[red]Problem generating an introspector instance of {database}.' display_exception(e, show_traceback=obj['debug']) return ReturnValue.Error try: with open(output_file, 'tw') as out_file: dump_models( out_file, introspector, tables, preserve_order=preserve_order, include_views=with_views, ignore_unknown=ignore_unknown, snake_case=snake_case, ) except Exception as e: display_exception(e, obj['debug']) return ReturnValue.Error msg = f'[green]Database {database} successfully reflected in {output_file.name}' log.info(msg) return ReturnValue.OK if __name__ == '__main__': # Use the custom main method that handles exit codes cli.main()