# Copyright 2025–2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""
Reusable Click group classes and shell completion tools for MAFw command-line interfaces.
This module centralizes the command abbreviation behavior used by the
``mafw`` executable and the development tools so nested command groups can
inherit the same resolution policy without repeating the ``cls=...``
configuration. It also provides common helper functions for shell completion
installation and management across all MAFw CLI tools.
.. versionadded:: 2.2
Authors
-------
Bulgheroni Antonio <antonio.bulgheroni@ec.europa.eu>
"""
from __future__ import annotations
import collections.abc as cabc
import os
import pathlib
import re
import sys
import warnings
from typing import TYPE_CHECKING, Any, Dict, Optional
import click
from mafw.tools.shell_tools import CONSOLE, run_stdout
if TYPE_CHECKING:
pass
[docs]
class DeprecatedOption(click.Option):
    """Click option that warns when the user passes it explicitly.

    Attach it with ``cls=DeprecatedOption`` in a ``@click.option`` decorator.
    A ``DeprecationWarning`` is emitted through :mod:`warnings` only when the
    value originates from the command line; values resolved from any other
    source pass through silently. The value handed on to the command callback
    is never altered. When the option has help text, it is prefixed with
    ``(DEPRECATED)``.

    :param deprecated_message: Text of the warning emitted on explicit use.
    :type deprecated_message: str

    .. versionadded:: 2.2
    """

    def __init__(self, *args: Any, deprecated_message: str = '', **kwargs: Any) -> None:
        self.deprecated_message = deprecated_message
        super().__init__(*args, **kwargs)
        if not self.help:
            # Nothing to decorate when the option carries no help text.
            return
        self.help = f'(DEPRECATED) {self.help}'

    def consume_value(
        self, ctx: click.Context, opts: cabc.Mapping[str, click.Parameter]
    ) -> tuple[Any, click.core.ParameterSource]:
        """Resolve the option value and warn if it came from the command line.

        :param ctx: Current Click context.
        :type ctx: click.Context
        :param opts: Parsed option tokens from the command line.
        :type opts: Mapping[str, click.Parameter]
        :return: The ``(value, source)`` pair produced by the parent implementation.
        :rtype: tuple[Any, click.core.ParameterSource]
        """
        resolved_value, origin = super().consume_value(ctx, opts)
        if origin == click.core.ParameterSource.COMMANDLINE:
            # NOTE(review): Python's default warning filters hide DeprecationWarning
            # outside ``__main__``; confirm the CLI enables it if users must see this.
            warnings.warn(self.deprecated_message, DeprecationWarning, stacklevel=1)
        return resolved_value, origin
[docs]
class AbbreviateGroup(click.Group):
    """Click group that accepts unique command-name prefixes.

    Commands are looked up by exact name first and then by unambiguous
    prefix. ``DevtoolsError`` exceptions raised while the group is invoked
    are re-raised as :class:`click.ClickException`.
    """

    # Click attribute naming the default class for groups created from this one;
    # the class body can only reference plain ``click.Group`` at this point.
    group_class: type[click.Group] = click.Group

    def invoke(self, ctx: click.Context) -> Any:
        """Run the group and translate ``DevtoolsError`` into ``ClickException``.

        :param ctx: Current Click context.
        :type ctx: click.Context
        :return: Whatever the parent ``invoke`` returns.
        :rtype: Any
        :raises click.ClickException: If a ``DevtoolsError`` is raised during invocation.
        """
        # Imported at call time rather than at module import.
        from mafw.devtools import DevtoolsError

        try:
            outcome = super().invoke(ctx)
        except DevtoolsError as exc:
            raise click.ClickException(str(exc)) from exc
        return outcome

    def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None:
        """Look up a command by its full name or by a unique prefix.

        :param ctx: Current Click context.
        :type ctx: click.Context
        :param cmd_name: Name, or leading part of a name, typed by the user.
        :type cmd_name: str
        :return: The matching command, or ``None`` when nothing matches.
        :rtype: click.Command | None
        """
        # An exact hit wins outright; prefixes are only considered afterwards.
        exact = super().get_command(ctx, cmd_name)
        if exact is not None:
            return exact

        candidates = [name for name in self.list_commands(ctx) if name.startswith(cmd_name)]
        if len(candidates) == 1:
            return click.Group.get_command(self, ctx, candidates[0])
        if candidates:
            # Ambiguous prefix: ``ctx.fail`` aborts with a usage error.
            listing = ', '.join(sorted(candidates))
            ctx.fail(f'Too many matches: {listing}')
        return None

    def resolve_command(
        self, ctx: click.Context, args: list[str]
    ) -> tuple[str | None, click.Command | None, list[str]]:
        """Resolve the command and report its canonical (full) name.

        :param ctx: Current Click context.
        :type ctx: click.Context
        :param args: Remaining command-line arguments.
        :type args: list[str]
        :return: ``(canonical name, command, remaining arguments)``.
        :rtype: tuple[str | None, click.Command | None, list[str]]
        """
        # The name returned by the parent is discarded in favour of the command's own name.
        _typed_name, command, remaining = super().resolve_command(ctx, args)
        if TYPE_CHECKING:
            assert isinstance(command, click.Command)
        canonical = None if command is None else command.name
        return canonical, command, remaining
# Re-point ``group_class`` at AbbreviateGroup itself. This has to happen after the
# class statement because the name is not bound while the class body executes.
# Per the module docstring, this lets nested groups share the abbreviation policy.
AbbreviateGroup.group_class = AbbreviateGroup
# Supported shells mapped to the value placed in the tool's ``_<TOOL>_COMPLETE``
# environment variable when generating the completion script.
COMPLETION_SHELLS: Dict[str, str] = {
    'bash': 'bash_source',
    'zsh': 'zsh_source',
    'fish': 'fish_source',
}
[docs]
def check_ci_completion_guard() -> None:
    """
    Stop the process early when running under continuous integration.

    When the ``CI`` environment variable holds a non-empty value, an
    informational message is printed and the process exits with code 0.
    Otherwise the function returns without doing anything.
    """
    running_in_ci = bool(os.environ.get('CI'))
    if not running_in_ci:
        return
    CONSOLE.print('This command is not compatible with the CI environment.')
    sys.exit(0)
[docs]
def completion_shell_from_env(shell_path: Optional[str]) -> str:
    """
    Resolve a completion shell from ``$SHELL``.

    The final path component of ``shell_path`` is taken as the shell name and
    must be one of the keys of ``COMPLETION_SHELLS``.

    :param shell_path: The raw shell path as exposed by the environment.
    :type shell_path: str | None
    :return: The normalized shell name.
    :rtype: str
    :raises click.ClickException: If the shell cannot be determined or is unsupported.
    """
    # Both error messages derive the list from the mapping, so adding a shell
    # to COMPLETION_SHELLS cannot leave a stale hard-coded list behind.
    supported = ', '.join(sorted(COMPLETION_SHELLS))
    if not shell_path:
        raise click.ClickException(f'Unable to infer a shell from $SHELL. Supported shells: {supported}.')
    shell = pathlib.Path(shell_path).name
    if shell not in COMPLETION_SHELLS:
        raise click.ClickException(f'Unsupported shell "{shell}". Supported shells: {supported}.')
    return shell
[docs]
def resolve_completion_shell(shell: str) -> str:
    """
    Turn the shell selector given on the CLI into a supported shell name.

    The special value ``auto`` defers to ``$SHELL``; any other value must be
    a key of ``COMPLETION_SHELLS``.

    :param shell: Shell selector from the CLI.
    :type shell: str
    :return: The resolved supported shell name.
    :rtype: str
    :raises click.ClickException: If the shell is unsupported.
    """
    if shell == 'auto':
        return completion_shell_from_env(os.environ.get('SHELL'))
    if shell in COMPLETION_SHELLS:
        return shell
    choices = ['auto']
    choices.extend(sorted(COMPLETION_SHELLS))
    supported = ', '.join(choices)
    raise click.ClickException(f'Unsupported shell "{shell}". Supported shells: {supported}.')
[docs]
def _virtualenv_root() -> pathlib.Path:
    """
    Locate the active virtual environment through ``VIRTUAL_ENV``.

    :return: Active virtual environment path.
    :rtype: pathlib.Path
    :raises click.ClickException: If ``VIRTUAL_ENV`` is unset or empty.
    """
    root = os.environ.get('VIRTUAL_ENV')
    if root:
        return pathlib.Path(root)
    raise click.ClickException('VIRTUAL_ENV is not set. Activate a virtual environment first.')
[docs]
def completion_script_path(tool_name: str, shell: str) -> pathlib.Path:
    """
    Compute where a tool's completion script lives in the active virtual environment.

    The result is ``<VIRTUAL_ENV>/share/mafw/<tool_name>_completion.<ext>``.

    :param tool_name: The name of the tool (e.g., 'mafw', 'multiversion-doc', 'release-mgt').
    :type tool_name: str
    :param shell: Resolved shell name.
    :type shell: str
    :return: Target completion script path.
    :rtype: pathlib.Path
    """
    extensions = {'bash': '.bash', 'zsh': '.zsh', 'fish': '.fish'}
    # Looked up before touching the environment, so an unknown shell raises KeyError first.
    extension = extensions[shell]
    target_dir = _virtualenv_root() / 'share' / 'mafw'
    return target_dir / f'{tool_name}_completion{extension}'
[docs]
def is_script_already_installed(tool_name: str, shell: str) -> bool:
    """
    Tell whether the tool's completion script is present on disk.

    :param tool_name: The name of the tool.
    :type tool_name: str
    :param shell: Resolved shell name.
    :type shell: str
    :return: True if the completion script exists, False otherwise.
    :rtype: bool
    """
    script = completion_script_path(tool_name, shell)
    return script.exists()
[docs]
def _activation_script_path(shell: str) -> pathlib.Path:
    """
    Return the virtual environment activation script used for a shell.

    ``fish`` maps to ``bin/activate.fish``; every other shell maps to
    ``bin/activate``.

    :param shell: Resolved shell name.
    :type shell: str
    :return: Target activation script path.
    :rtype: pathlib.Path
    """
    script_name = 'activate.fish' if shell == 'fish' else 'activate'
    return _virtualenv_root() / 'bin' / script_name
[docs]
def completion_source_script(tool_name: str, shell: str) -> str:
    """
    Produce the Click completion script for a tool and shell.

    The tool is run through ``run_stdout`` with its ``_<TOOL>_COMPLETE``
    variable set to the value from ``COMPLETION_SHELLS``, and the captured
    output is returned.

    :param tool_name: The name of the tool.
    :type tool_name: str
    :param shell: Resolved shell name.
    :type shell: str
    :return: Completion script content.
    :rtype: str
    """
    instruction = COMPLETION_SHELLS[shell]
    # Click expects the variable name upper-cased, with dashes turned into underscores.
    normalized = tool_name.upper().replace('-', '_')
    variable = f'_{normalized}_COMPLETE'
    return run_stdout([tool_name], env={variable: instruction}, quiet=True)
[docs]
def _completion_marker_block(shell: str) -> str:
    """
    Return the shell snippet appended to the environment activation script.

    The snippet is wrapped in the MAFw start/end marker comments and sources
    every ``$VIRTUAL_ENV/share/mafw/*_completion.<ext>`` file. ``fish`` gets
    fish syntax; any other shell gets a POSIX-style snippet that picks the
    ``zsh`` or ``bash`` extension from ``$SHELL``.

    :param shell: Resolved shell name.
    :type shell: str
    :return: Marker block to append to the activation file.
    :rtype: str
    """
    opening = '# >>> MAFw completion >>>\n'
    closing = '# <<< MAFw completion <<<\n'
    if shell == 'fish':
        loop = (
            'for file in "$VIRTUAL_ENV/share/mafw/"*_completion.fish\n'
            ' if test -f "$file"\n'
            ' source "$file"\n'
            ' end\n'
            'end\n'
        )
    else:
        loop = (
            'case "$SHELL" in\n'
            ' *zsh*) ext="zsh" ;;\n'
            ' *) ext="bash" ;;\n'
            'esac\n'
            'for file in "$VIRTUAL_ENV/share/mafw/"*_completion."$ext"; do\n'
            ' if [ -f "$file" ]; then\n'
            ' . "$file"\n'
            ' fi\n'
            'done\n'
        )
    return opening + loop + closing
[docs]
def _strip_completion_marker_block(content: str) -> str:
    """
    Drop every MAFw completion block from activation script text.

    Each span running from the start marker to the end marker, together with
    one optional newline on either side, is replaced by a single newline.

    :param content: Activation script content.
    :type content: str
    :return: Content without the MAFw completion block.
    :rtype: str
    """
    return re.sub(
        r'\n?# >>> MAFw completion >>>\n.*?\n# <<< MAFw completion <<<\n?',
        '\n',
        content,
        flags=re.DOTALL,  # the block body spans several lines
    )
[docs]
def uninstall_completion_files(tool_name: str, shell: Optional[str] = None) -> None:
    """
    Delete a tool's completion scripts and, when none are left, the activation hooks.

    Scripts are removed from ``<VIRTUAL_ENV>/share/mafw``. If no
    ``*_completion.*`` file of any tool remains afterwards, the MAFw marker
    block is stripped from ``bin/activate`` and ``bin/activate.fish``.

    :param tool_name: The name of the tool.
    :type tool_name: str
    :param shell: Optional shell selector. When omitted, all completion files for the tool are removed.
    :type shell: str | None
    """
    venv_root = _virtualenv_root()
    completion_dir = venv_root / 'share' / 'mafw'

    if completion_dir.exists():
        if shell is not None:
            candidates = [completion_script_path(tool_name, shell)]
        else:
            candidates = list(completion_dir.glob(f'{tool_name}_completion.*'))
        for script in candidates:
            if script.exists():
                script.unlink()

    # The hook is left alone while any completion script is still installed.
    leftovers = list(completion_dir.glob('*_completion.*')) if completion_dir.exists() else []
    if leftovers:
        return

    bin_dir = venv_root / 'bin'
    for hook_name in ('activate', 'activate.fish'):
        hook = bin_dir / hook_name
        if not hook.exists():
            continue
        original = hook.read_text(encoding='utf-8')
        stripped = _strip_completion_marker_block(original)
        if stripped != original:
            # Rewrite only when a block was actually removed; leading newlines are trimmed.
            hook.write_text(stripped.lstrip('\n'), encoding='utf-8')
[docs]
def install_completion(tool_name: str, shell: str, force: bool, script_path: pathlib.Path) -> pathlib.Path:
    """
    Write the completion script and hook it into the activation script.

    The generated Click completion script is written to ``script_path``. The
    MAFw marker block is appended to the shell's activation script unless the
    start marker is already present there.

    :param tool_name: The name of the tool.
    :type tool_name: str
    :param shell: Resolved shell name.
    :type shell: str
    :param force: When true, the tool's existing completion files for this shell are uninstalled first.
    :type force: bool
    :param script_path: The target path for the completion script.
    :type script_path: pathlib.Path
    :return: Installed completion script path.
    :rtype: pathlib.Path
    """
    if force:
        uninstall_completion_files(tool_name, shell)

    script_path.parent.mkdir(parents=True, exist_ok=True)
    hook = _activation_script_path(shell)
    script_path.write_text(completion_source_script(tool_name, shell), encoding='utf-8')

    hook.parent.mkdir(parents=True, exist_ok=True)
    current = hook.read_text(encoding='utf-8') if hook.exists() else ''
    if '# >>> MAFw completion >>>' in current:
        # The hook is already installed; the activation script stays untouched.
        return script_path

    # Exactly one blank line separates the existing content from the marker block.
    updated = current.rstrip('\n') + '\n\n' + _completion_marker_block(shell)
    hook.write_text(updated, encoding='utf-8')
    return script_path