# Copyright 2025–2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""
The execution framework.
This module provides the run functionality to the whole library.
It is heavily relying on ``click`` for the generation of commands, options, and arguments.
.. click:: mafw.scripts.mafw_exe:cli
:prog: mafw
:nested: full
"""
import datetime
import logging
import pathlib
import shutil
import sys
import warnings
from enum import IntEnum
from typing import Any
import click
from click.exceptions import ClickException
from pwiz import DATABASE_MAP, make_introspector # type: ignore[import-untyped]
from rich import print as rprint
from rich import traceback
from rich.align import Align
from rich.logging import RichHandler
from rich.prompt import Prompt
from rich.rule import Rule
from rich.table import Table
from rich_pyfiglet import RichFiglet
from mafw.__about__ import __version__
from mafw.db.db_configurations import db_scheme, default_conf
from mafw.db.db_wizard import dump_models
from mafw.enumerators import ProcessorExitStatus
from mafw.lazy_import import LazyImportProcessor
from mafw.mafw_errors import AbortProcessorException
from mafw.plugin_manager import get_plugin_manager
from mafw.runner import MAFwApplication
from mafw.tools.click_extensions import (
AbbreviateGroup,
check_ci_completion_guard,
completion_script_path,
completion_source_script,
install_completion,
is_script_already_installed,
resolve_completion_shell,
uninstall_completion_files,
)
from mafw.tools.parallel import is_free_threading
from mafw.tools.shell_tools import CONSOLE
from mafw.tools.toml_tools import generate_steering_file
suppress = [click]
traceback.install(show_locals=True, suppress=suppress)
[docs]
class MAFwGroup(AbbreviateGroup):
"""Click group with abbreviation and MAFw-specific exit handling."""
group_class: type[click.Group] = AbbreviateGroup
[docs]
def parse_args(self, ctx: click.Context, args: list[str]) -> list[str]:
"""Parse arguments and record the selected root command before invoke."""
rv = super().parse_args(ctx, args)
if (
ctx.parent is None
): # this is assuring that we store only the first command corresponding to the root context
ctx.meta['_mafw_invoked_subcommand'] = self._detect_command_name(args)
return rv
[docs]
def _detect_command_name(self, args: list[str]) -> str | None:
"""Return the first subcommand token while skipping root option values."""
# This is probably an overkill. After the default parsing the first element in args should be the first
# (abbreviated or fully typed) command. All global options will be removed from args.
# so potentially we could return self.get_command(ctx, args[0]),
# I will leave it like this because it might be useful in the future.
option_params = {opt for param in self.params for opt in getattr(param, 'opts', [])}
expects_value = {
opt for param in self.params if not getattr(param, 'is_flag', False) for opt in getattr(param, 'opts', [])
}
skip_next = False
for token in args:
if skip_next:
skip_next = False
continue
if token in option_params:
if token in expects_value:
skip_next = True
continue
if token.startswith('-'):
continue
selected = self.get_command(click.Context(self), token)
return selected.name if selected is not None else token
return None
[docs]
def invoke(self, ctx: click.Context) -> Any:
"""Invoke the command and normalize Click exceptions to MAFw exits."""
print_banner(ctx)
try:
return super().invoke(ctx)
except ClickException as exc:
exc.show()
sys.exit(exc.exit_code)
except (SystemExit, click.exceptions.Exit):
raise
except Exception:
sys.exit(ReturnValue.Error)
[docs]
def main(self, *args: Any, **kwargs: Any) -> None: # type: ignore[override]
"""Run the CLI and convert returned statuses to process exit codes."""
try:
rv = super().main(*args, standalone_mode=False, **kwargs) # type: ignore[call-overload]
if isinstance(rv, (ReturnValue, int)):
sys.exit(rv)
sys.exit(ReturnValue.OK)
except ClickException as exc:
exc.show()
sys.exit(exc.exit_code)
except SystemExit:
raise
except Exception:
sys.exit(ReturnValue.Error)
LEVELS = {'debug': 10, 'info': 20, 'warning': 30, 'error': 40, 'critical': 50}
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
_warnings_captured = False
welcome_message = RichFiglet(
'MAFw',
colors=['#ff0000', 'magenta1', 'blue3'],
horizontal=True,
# font='banner4',
remove_blank_lines=True,
border='ROUNDED',
border_color='#ff0000',
)
[docs]
def print_banner(ctx: click.Context) -> None:
"""
Print the welcome banner only once, only for rich UI,
and only if not disabled.
.. note::
ctx.obj is not yet populated when Group.invoke() is called,
but ctx.params contains parsed options for the current group. We
therefore inspect the root context params.
"""
# avoid printing while click is still doing resilient parsing (e.g., --help)
if getattr(ctx, 'resilient_parsing', False):
return
root = ctx.find_root()
# Parse-time command resolution records the selected subcommand before
# invoke() emits any output, so completion can remain quiet.
# Possible improvements: allow selection of commands for which there must not be any banner printing.
if root.meta.get('_mafw_invoked_subcommand') == 'completion':
return
params = getattr(root, 'params', {}) or {}
ui = params.get('ui')
no_banner = params.get('no_banner')
# Normalize and check
if no_banner:
return
if not ui: # pragma: no cover
# if nothing available, be conservative and don't print
return
if str(ui).lower() != 'rich':
return
# Print only once per process / invocation
if root.meta.get('_banner_printed', False):
return
console = CONSOLE
console.print(welcome_message)
root.meta['_banner_printed'] = True
[docs]
def is_bugged_warning_capture_version() -> bool:
"""Check if the Python version has the warning capture bug in free-threading mode.
Python 3.14.0 through 3.14.3 (free-threading builds only) have a bug where
``logging.captureWarnings(True)`` can crash or misbehave. This function returns
True when the current interpreter is affected.
.. note::
See https://github.com/python/cpython/pull/146374 for details.
.. todo::
Remove this workaround when Python 3.14 is no longer a supported version
(expected late 2030).
:return: True if the interpreter is affected by the warning capture bug, False otherwise.
:rtype: bool
"""
version = sys.version_info
return version.major == 3 and version.minor == 14 and version.micro <= 3 and is_free_threading()
warnings.formatwarning = custom_formatwarning
# Disable warning capture on bugged free-threading builds (3.14.0–3.14.3).
if is_bugged_warning_capture_version():
logging.captureWarnings(False)
else:
logging.captureWarnings(True)
# get the root logger
log = logging.getLogger()
[docs]
class ReturnValue(IntEnum):
"""Enumerator to handle the script return value."""
OK = 0
"""No error"""
Error = 1
"""Generic error"""
[docs]
def logger_setup(level: str, ui: str, tracebacks: bool) -> None:
"""Set up the logger.
This function is actually configuring the root logger level from the command line options and it attaches either
a RichHandler or a StreamHandler depending on the user interface type.
The `tracebacks` flag is used only by the RichHandler. Printing the tracebacks is rather useful when debugging
the code, but it could be detrimental for final users. In normal circumstances, tracebacks is set to False,
and is turned on when the debug flag is activated.
:param level: Logging level as a string.
:type level: str
:param ui: User interface as a string ('rich' or 'console').
:type ui: str
:param tracebacks: Enable/disable the logging of exception tracebacks.
"""
level = level.lower()
ui = ui.lower()
log.setLevel(LEVELS[level])
handler: logging.Handler
if ui == 'rich':
fs = '%(message)s'
handler = RichHandler(
rich_tracebacks=tracebacks, markup=True, show_path=False, log_time_format='%Y%m%d-%H:%M:%S'
)
else:
fs = '%(asctime)s - %(levelname)s - %(message)s'
handler = logging.StreamHandler()
formatter = logging.Formatter(fs)
handler.setFormatter(formatter)
log.addHandler(handler)
# _ensure_warning_capture()
[docs]
def display_exception(exception: Exception, show_traceback: bool = False) -> None:
"""
Display exception information with optional debug details.
This function logs exception information at the critical level. When show_traceback is enabled,
it logs the full exception including traceback information. Otherwise, it logs a simplified
message directing users to enable debug mode for more details.
:param exception: The exception to be displayed and logged.
:type exception: Exception
:param show_traceback: Flag indicating whether to show detailed traceback information. Defaults to False
:type show_traceback: bool
"""
if show_traceback:
log.critical('A critical error occurred')
log.exception(exception)
else:
log.critical('A critical error occurred. Set option -D to get traceback output')
log.exception(
'%s: %s' % (exception.__class__.__name__, exception), exc_info=False, stack_info=False, stacklevel=1
)
@click.group(invoke_without_command=True, context_settings=CONTEXT_SETTINGS, name='mafw', cls=MAFwGroup)
@click.pass_context
@click.option(
'--log-level',
type=click.Choice(['debug', 'info', 'warning', 'error', 'critical'], case_sensitive=False),
show_default=True,
default='info',
help='Log level',
)
@click.option(
'--ui',
type=click.Choice(['console', 'rich'], case_sensitive=False),
default='rich',
help='The user interface',
show_default=True,
)
@click.option('-D', '--debug', is_flag=True, default=False, help='Show debug information about errors')
@click.option('--no-banner', is_flag=True, default=False, help='Disable the welcome banner')
@click.version_option(__version__, '-v', '--version')
def cli(ctx: click.core.Context, log_level: str, ui: str, debug: bool, no_banner: bool) -> None:
"""
The Modular Analysis Framework execution.
This is the command line interface where you can configure and launch your analysis tasks.
More information on our documentation page.
\f
:param ctx: The click context.
:type ctx: click.core.Context
:param log_level: The logging level as a string. Choice from debug, info, warning, error and critical.
:type log_level: str
:param ui: The user interface as a string. Choice from console and rich.
:type ui: str
:param debug: Flag to show debug information about exception.
:type debug: bool
:param no_banner: Flag to disable the welcome banner.
:type no_banner: bool
"""
ctx.ensure_object(dict)
ctx.obj = {'log_level': log_level, 'ui': ui, 'debug': debug, 'no_banner': no_banner}
logger_setup(log_level, ui, debug)
if ctx.invoked_subcommand is None:
rprint('Use --help to get a quick help on the mafw command.')
@cli.group(name='completion')
@click.pass_context
def completion(ctx: click.Context) -> None:
"""
Manage shell completion for the ``mafw`` command.
The completion workflow installs the Click-generated shell code into the
active virtual environment, updates the activation script so completion is
loaded automatically, and exposes a ``show`` helper for direct evaluation.
\f
.. versionadded:: v2.2
:param ctx: The click context.
:type ctx: click.core.Context
"""
ctx.ensure_object(dict)
ctx.obj['tool_name'] = 'mafw'
@completion.command(name='install')
@click.option(
'-s',
'--shell',
type=click.Choice(['auto', 'bash', 'zsh', 'fish'], case_sensitive=False),
default='auto',
show_default=True,
help='Target shell for completion installation',
)
@click.option('-F', '--force', is_flag=True, default=False, help='Reinstall completion even if already loaded')
@click.pass_context
def completion_install(ctx: click.Context, shell: str, force: bool) -> None:
"""
Install the ``mafw`` shell completion script.
When ``--shell`` is omitted, the command guesses the shell from ``$SHELL``.
The generated Click completion script is stored in the active virtual
environment and the activation script is updated so that future shell
sessions load completion automatically.
\f
.. versionadded:: v2.2
:param ctx: The click context.
:type ctx: click.core.Context
:param shell: Target shell selector.
:type shell: str
:param force: Reinstall completion even if already loaded.
:type force: bool
"""
check_ci_completion_guard()
tool_name = ctx.obj['tool_name']
resolved_shell = resolve_completion_shell(shell)
script_path = completion_script_path(tool_name, resolved_shell)
if is_script_already_installed(tool_name, resolved_shell) and not force:
raise click.ClickException(
f'MAFw completion is already installed in {script_path}. Use --force to reinstall it.'
)
install_completion(tool_name, resolved_shell, force, script_path)
rprint(f'Completion script installed in [blue underline]{script_path}[/blue underline].')
rprint('Exit and re-enter the virtual environment to activate shell completion.')
@completion.command(name='uninstall')
@click.pass_context
def completion_uninstall(ctx: click.Context) -> None:
"""
Remove the installed ``mafw`` shell completion files.
This command removes generated completion files from ``share/mafw`` and
strips the marker block from the activation scripts.
.. versionadded:: v2.2
:param ctx: The click context.
:type ctx: click.core.Context
"""
tool_name = ctx.obj['tool_name']
uninstall_completion_files(tool_name, None)
rprint(f'Shell completion for {tool_name} has been removed from the active virtual environment.')
@completion.command(name='show')
@click.option(
'-s',
'--shell',
type=click.Choice(['auto', 'bash', 'zsh', 'fish'], case_sensitive=False),
default='auto',
show_default=True,
help='Target shell for completion output',
)
@click.pass_context
def completion_show(ctx: click.Context, shell: str) -> None:
"""
Display the Click completion script on standard output.
The output is intentionally clean so it can be used with ``eval``:
.. code-block:: console
eval "$(mafw completion show)"
.. versionadded:: v2.2
:param ctx: The click context.
:type ctx: click.core.Context
:param shell: Target shell selector.
:type shell: str
"""
check_ci_completion_guard()
tool_name = ctx.obj['tool_name']
resolved_shell = resolve_completion_shell(shell)
click.echo(completion_source_script(tool_name, resolved_shell), nl=True)
@cli.command(name='list')
@click.pass_obj
def list_processors(obj: dict[str, Any]) -> ReturnValue:
"""Display the list of available processors.
This command will retrieve all available processors via the plugin manager. Both internal and external processors
will be listed if the ext-plugins option is passed.
\f
"""
try:
plugin_manager = get_plugin_manager()
plugins = plugin_manager.load_plugins({'processors'})
available_processors = plugins.processor_list
print('\n')
table = Table(
title='Available processors',
header_style='orange3',
expand=True,
title_style='italic red',
)
table.add_column('Processor Name', justify='left', style='cyan')
table.add_column('Package Name', justify='left', style='cyan')
table.add_column('Module', justify='left', style='cyan')
mafw_processors = 0
other_processors = 0
for processor in available_processors:
if isinstance(processor, LazyImportProcessor):
package, module = processor.plugin_qualname.split('.', 1)
name = processor.plugin_name
else:
package, module = processor.__module__.split('.', 1)
name = processor.__name__
table.add_row(name, package, module)
if package == 'mafw':
mafw_processors += 1
else:
other_processors += 1
table.caption = f'Total processors = {len(available_processors)}, internal = {mafw_processors}, external = {other_processors}'
table.caption_style = 'italic green'
console = CONSOLE
console.print(Align.center(table))
return ReturnValue.OK
except Exception as e:
display_exception(e, show_traceback=obj['debug'])
return ReturnValue.Error
@cli.command(name='steering')
@click.pass_obj
@click.option('--show/--no-show', default=False, help='Display the generated steering file on console')
@click.option('--ext-plugins/--no-ext-plugin', default=True, help='Load external plugins')
@click.option('--open-editor/--no-open-editor', default=False, help='Open the file in your editor.')
@click.option(
'--db-engine',
type=click.Choice(['sqlite', 'mysql', 'postgresql'], case_sensitive=False),
help='Select a DB engine',
default='sqlite',
)
@click.option('--db-url', type=str, default=':memory:', help='URL to the DB')
@click.argument('steering-file', type=click.Path())
def generate_steering(
obj: dict[str, Any],
show: bool,
ext_plugins: bool,
open_editor: bool,
steering_file: pathlib.Path,
db_engine: str,
db_url: str,
) -> ReturnValue:
"""Generates a steering file with the default parameters of all available processors.
STEERING_FILE A path to the steering file to execute.
The user must modify the generated steering file to ensure it can be executed using the run command.
\f
:param obj: The context object being passed from the main command.
:type obj: dict
:param show: Display the steering file in the console after the generation. Defaults to False.
:type show: bool
:param ext_plugins: Extend the search for processor to external libraries.
:type ext_plugins: bool
:param open_editor: Open a text editor after the generation to allow direct editing.
:type open_editor: bool
:param steering_file: The steering file path.
:type steering_file: Path
:param db_engine: The name of the db engine.
:type db_engine: str
:param db_url: The URL of the database.
:type db_url: str
"""
try:
plugin_manager = get_plugin_manager()
plugins = plugin_manager.load_plugins({'processors'})
available_processors = plugins.processor_list
# db_engine is already sure to be in the default conf because the Choice is assuring it.
database_conf = default_conf[db_engine]
database_conf['URL'] = db_scheme[db_engine] + db_url
generate_steering_file(steering_file, available_processors, database_conf)
if show:
console = CONSOLE
with open(steering_file) as fp:
text = fp.read()
with console.pager():
console.print(text, highlight=True)
console.print(Rule())
if open_editor:
click.edit(filename=str(steering_file))
else:
rprint(f'A generic steering file has been saved in [blue underline]{steering_file}[/blue underline].')
rprint('Open it in your favourite text editor, change the processors_to_run list and save it.')
rprint('')
rprint(f'To execute it launch: [blue]mafw run {steering_file}[/blue].')
return ReturnValue.OK
except Exception as e:
display_exception(e, show_traceback=obj['debug'])
return ReturnValue.Error
@cli.command()
@click.pass_obj
@click.argument('steering-file', type=click.Path())
def run(obj: dict[str, Any], steering_file: click.Path) -> ReturnValue:
"""Runs a steering file.
STEERING_FILE A path to the steering file to execute.
\f
:param obj: The context object being passed from the main command.
:type obj: dict
:param steering_file: The path to the output steering file.
:type steering_file: Path
"""
try:
app = MAFwApplication(steering_file) # type: ignore
pes = app.run()
if pes == ProcessorExitStatus.Successful:
rv = ReturnValue.OK
else:
rv = ReturnValue.Error
return rv
except AbortProcessorException:
return ReturnValue.Error
except Exception as e:
display_exception(e, show_traceback=obj['debug'])
return ReturnValue.Error
@cli.group
@click.pass_context
def db(ctx: click.core.Context) -> None:
"""
Advanced database commands.
The db group of commands offers a set of useful database operations. Invoke the help option of each command for
more details.
\f
:param ctx: The click context.
:type ctx: click.core.Context
"""
@db.command(name='wizard')
@click.pass_context
@click.option(
'-o',
'--output-file',
type=click.Path(),
default=pathlib.Path.cwd() / pathlib.Path('my_model.py'),
help='The name of the output file with the reflected model.',
)
@click.option('-s', '--schema', type=str, help='The name of the DB schema')
@click.option(
'-t', '--tables', type=str, multiple=True, help='Generate model for selected tables. Multiple option possible.'
)
@click.option('--overwrite/--no-overwrite', default=True, help='Overwrite output file if already exists.')
@click.option('--preserve-order/--no-preserve-order', default=True, help='Preserve column order.')
@click.option('--with-views/--without-views', default=False, help='Include also database views.')
@click.option('--ignore-unknown/--no-ignore-unknown', default=False, help='Ignore unknown fields.')
@click.option('--snake-case/--no-snake-case', default=True, help='Use snake case for table and field names.')
@click.option('--host', type=str, help='Hostname for the DB server.')
@click.option('-p', '--port', type=int, help='Port number for the DB server.')
@click.option('-u', '--user', '--username', type=str, help='Username for the connection to the DB server.')
@click.option('--password', prompt=True, prompt_required=False, hide_input=True, help='Insert password when prompted')
@click.option('-e', '--engine', type=click.Choice(sorted(DATABASE_MAP)), help='The DB engine')
@click.argument('database', type=str)
def wizard(
ctx: click.core.Context,
overwrite: bool,
tables: tuple[str, ...] | None,
preserve_order: bool,
with_views: bool,
ignore_unknown: bool,
snake_case: bool,
output_file: click.Path | pathlib.Path | str,
host: str,
port: int,
user: str,
password: str,
engine: str,
schema: str,
database: str,
) -> ReturnValue:
"""
Reflect an existing DB into a python module.
mafw db wizard [Options] Database
Database Name of the Database to be reflected.
About connection options (user / host / port):
That information will be used only in case you are trying to access a network database (MySQL or PostgreSQL). In
case of Sqlite, the parameters will be discarded.
About passwords:
If you need to specify a password to connect to the DB server, just add --password in the command line without
typing your password as clear text. You will be prompted to insert the password with hidden characters at the start
of the processor.
About engines:
The full list of supported engines is provided in the option below. If you do not specify any
engine and the database is actually an existing filename, then engine is set to Sqlite, otherwise to postgresql.
\f
:param database: The name of the database.
:type database: str
:param schema: The database schema to be reflected.
:type schema: str
:param engine: The database engine. A selection of possible values is provided in the script help.
:type engine: str
:param password: The password for the DB connection. Not used in case of Sqlite.
:type password: str
:param user: The username for the DB connection. Not used in case of Sqlite.
:type user: str
:param port: The port number of the database server. Not used in case of Sqlite.
:type port: int
:param host: The database hostname. Not used in case of Sqlite.
:type host: str
:param output_file: The filename for the output python module.
:type output_file: click.Path | pathlib.Path | str
:param snake_case: Flag to select snake_case convention for table and field names, or all small letter formatting.
:type snake_case: bool
:param ignore_unknown: Flag to ignore unknown fields. If False, an unknown field will be labelled with UnknownField.
:type ignore_unknown: bool
:param with_views: Flag to include views in the reflected elements.
:type with_views: bool
:param preserve_order: Flag to select if table fields should be reflected in the original order (True) or in
alphabetical order (False)
:type preserve_order: bool
:param tables: A tuple containing a selection of table names to be reflected.
:type tables: tuple[str, ...]
:param overwrite: Flag to overwrite the output file if exists. If False and the output file already exists, the
user can decide what to do.
:type overwrite: bool
:param ctx: The click context, that includes the original object with global options.
:type ctx: click.core.Context
:return: The script return value
"""
obj = ctx.obj
if isinstance(output_file, (str, click.Path)):
output_file = pathlib.Path(str(output_file))
# if not overwrite, check if the file exists
if not overwrite and output_file.exists():
answer = Prompt.ask(
f'A module ({output_file.name}) already exists. Do you want to overwrite, cancel or backup?',
case_sensitive=False,
choices=['o', 'c', 'b'],
show_choices=True,
show_default=True,
default='b',
)
if answer == 'c':
return ReturnValue.OK
elif answer == 'b':
bck_filename = output_file.parent / pathlib.Path(
output_file.stem + f'_{datetime.datetime.now():%Y%m%dT%H%M%S}' + output_file.suffix
)
shutil.copy(output_file, bck_filename)
if tables == ():
tables = None
if engine is None:
engine = 'sqlite' if pathlib.Path(database).exists() else 'postgresql'
# prepare the connection options
if engine in ['sqlite', 'sqlite3']:
# for sqlite the connection
keys: list[str] = ['schema']
values: list[str | int] = [schema]
else:
keys = ['host', 'port', 'user', 'schema', 'password']
values = [host, port, user, schema, password]
connection_options: dict[str, Any] = {}
for k, v in zip(keys, values):
if v:
connection_options[k] = v
try:
introspector = make_introspector(engine, database, **connection_options)
except Exception as e:
msg = f'[red]Problem generating an introspector instance of {database}.'
display_exception(e, show_traceback=obj['debug'])
return ReturnValue.Error
try:
with open(output_file, 'tw') as out_file:
dump_models(
out_file,
introspector,
tables,
preserve_order=preserve_order,
include_views=with_views,
ignore_unknown=ignore_unknown,
snake_case=snake_case,
)
except Exception as e:
display_exception(e, obj['debug'])
return ReturnValue.Error
msg = f'[green]Database {database} successfully reflected in {output_file.name}'
log.info(msg)
return ReturnValue.OK
if __name__ == '__main__':
# Use the custom main method that handles exit codes
cli.main()