Source code for mafw.tools.click_extensions

#  Copyright 2025–2026 European Union
#  Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
#  SPDX-License-Identifier: EUPL-1.2
"""
Reusable Click group classes and shell completion tools for MAFw command-line interfaces.

This module centralizes the command abbreviation behavior used by the
``mafw`` executable and the development tools so nested command groups can
inherit the same resolution policy without repeating the ``cls=...``
configuration. It also provides common helper functions for shell completion
installation and management across all MAFw CLI tools.

.. versionadded:: 2.2

Authors
-------
Bulgheroni Antonio <antonio.bulgheroni@ec.europa.eu>
"""

from __future__ import annotations

import collections.abc as cabc
import os
import pathlib
import re
import sys
import warnings
from typing import TYPE_CHECKING, Any, Dict, Optional

import click

from mafw.tools.shell_tools import CONSOLE, run_stdout

if TYPE_CHECKING:
    pass


[docs] class DeprecatedOption(click.Option): """A Click Option subclass that emits a DeprecationWarning when explicitly used. Use this class as ``cls=DeprecatedOption`` in a ``@click.option`` decorator to mark an option as deprecated. The warning is only emitted when the user explicitly provides the option on the command line; default-value resolution does **not** trigger the warning. The resolved value is passed through to the command callback unchanged. :param deprecated_message: Warning text emitted when the option is explicitly provided on the command line. :type deprecated_message: str .. versionadded:: 2.2 """ def __init__(self, *args: Any, deprecated_message: str = '', **kwargs: Any) -> None: self.deprecated_message = deprecated_message super().__init__(*args, **kwargs) if self.help: self.help = f'(DEPRECATED) {self.help}'
[docs] def consume_value( self, ctx: click.Context, opts: cabc.Mapping[str, click.Parameter] ) -> tuple[Any, click.core.ParameterSource]: """Intercept value consumption to detect explicit CLI usage. :param ctx: Current Click context. :type ctx: click.Context :param opts: Parsed option tokens from the command line. :type opts: Mapping[str, click.Parameter] :return: Tuple of (value, source) as returned by the parent implementation. :rtype: tuple[Any, click.core.ParameterSource] """ value, source = super().consume_value(ctx, opts) if source == click.core.ParameterSource.COMMANDLINE: warnings.warn(self.deprecated_message, DeprecationWarning, stacklevel=1) return value, source
[docs] class AbbreviateGroup(click.Group): """Click group that resolves unique command prefixes and catches DevtoolsError.""" group_class: type[click.Group] = click.Group
[docs] def invoke(self, ctx: click.Context) -> Any: """Invoke the group, catching DevtoolsError and converting to ClickException.""" from mafw.devtools import DevtoolsError try: return super().invoke(ctx) except DevtoolsError as exc: raise click.ClickException(str(exc)) from exc
[docs] def get_command(self, ctx: click.Context, cmd_name: str) -> click.Command | None: """Return a command by exact name or a unique abbreviation.""" # let's give it a try to the cmd_name. if the user provided the full command, # then we might be lucky and get it working right away, otherwise we will have to try # to get the command using the abbreviations. rv = super().get_command(ctx, cmd_name) if rv is not None: return rv matches = [name for name in self.list_commands(ctx) if name.startswith(cmd_name)] if not matches: return None if len(matches) == 1: return click.Group.get_command(self, ctx, matches[0]) ctx.fail(f'Too many matches: {", ".join(sorted(matches))}')
[docs] def resolve_command( self, ctx: click.Context, args: list[str] ) -> tuple[str | None, click.Command | None, list[str]]: """Return the canonical command name for abbreviated commands.""" _, cmd, args = super().resolve_command(ctx, args) if TYPE_CHECKING: assert isinstance(cmd, click.Command) return cmd.name if cmd is not None else None, cmd, args
AbbreviateGroup.group_class = AbbreviateGroup COMPLETION_SHELLS: Dict[str, str] = {'bash': 'bash_source', 'zsh': 'zsh_source', 'fish': 'fish_source'}
[docs] def check_ci_completion_guard() -> None: """ Check if the current environment is a CI environment. If the ``CI`` environment variable is set, the function prints an informational message and exits the process with code 0. """ if os.environ.get('CI'): CONSOLE.print('This command is not compatible with the CI environment.') sys.exit(0)
[docs] def completion_shell_from_env(shell_path: Optional[str]) -> str: """ Resolve a completion shell from ``$SHELL``. The resolver supports the shells handled by Click completion generation: ``bash``, ``zsh``, and ``fish``. :param shell_path: The raw shell path as exposed by the environment. :type shell_path: str | None :return: The normalized shell name. :rtype: str :raises click.ClickException: If the shell cannot be determined or is unsupported. """ if not shell_path: raise click.ClickException('Unable to infer a shell from $SHELL. Supported shells: bash, fish, zsh.') shell = pathlib.Path(shell_path).name if shell not in COMPLETION_SHELLS: supported = ', '.join(sorted(COMPLETION_SHELLS)) raise click.ClickException(f'Unsupported shell "{shell}". Supported shells: {supported}.') return shell
[docs] def resolve_completion_shell(shell: str) -> str: """ Normalize the requested completion shell. :param shell: Shell selector from the CLI. :type shell: str :return: The resolved supported shell name. :rtype: str :raises click.ClickException: If the shell is unsupported. """ if shell == 'auto': return completion_shell_from_env(os.environ.get('SHELL')) if shell not in COMPLETION_SHELLS: supported = ', '.join(['auto', *sorted(COMPLETION_SHELLS)]) raise click.ClickException(f'Unsupported shell "{shell}". Supported shells: {supported}.') return shell
[docs] def _virtualenv_root() -> pathlib.Path: """ Return the active virtual environment root. :return: Active virtual environment path. :rtype: pathlib.Path :raises click.ClickException: If ``VIRTUAL_ENV`` is missing. """ virtual_env = os.environ.get('VIRTUAL_ENV') if not virtual_env: raise click.ClickException('VIRTUAL_ENV is not set. Activate a virtual environment first.') return pathlib.Path(virtual_env)
[docs] def completion_script_path(tool_name: str, shell: str) -> pathlib.Path: """ Build the completion script path inside the active virtual environment. :param tool_name: The name of the tool (e.g., 'mafw', 'multiversion-doc', 'release-mgt'). :type tool_name: str :param shell: Resolved shell name. :type shell: str :return: Target completion script path. :rtype: pathlib.Path """ suffix = {'bash': '.bash', 'zsh': '.zsh', 'fish': '.fish'}[shell] return _virtualenv_root() / 'share' / 'mafw' / f'{tool_name}_completion{suffix}'
[docs] def is_script_already_installed(tool_name: str, shell: str) -> bool: """ Check if the completion script for the tool is already installed. :param tool_name: The name of the tool. :type tool_name: str :param shell: Resolved shell name. :type shell: str :return: True if the completion script exists, False otherwise. :rtype: bool """ return completion_script_path(tool_name, shell).exists()
[docs] def _activation_script_path(shell: str) -> pathlib.Path: """ Build the activation script path for a shell. :param shell: Resolved shell name. :type shell: str :return: Target activation script path. :rtype: pathlib.Path """ if shell == 'fish': return _virtualenv_root() / 'bin' / 'activate.fish' return _virtualenv_root() / 'bin' / 'activate'
[docs] def completion_source_script(tool_name: str, shell: str) -> str: """ Generate the Click completion source script for the requested shell. :param tool_name: The name of the tool. :type tool_name: str :param shell: Resolved shell name. :type shell: str :return: Completion script content. :rtype: str """ completion_env = COMPLETION_SHELLS[shell] # Use underscores and uppercase for the environment variable as Click expects env_var = f'_{tool_name.upper().replace("-", "_")}_COMPLETE' return run_stdout([tool_name], env={env_var: completion_env}, quiet=True)
[docs] def _completion_marker_block(shell: str) -> str: """ Build the activation block appended to the environment activation script. This block executes all files in `$VIRTUAL_ENV/share/mafw/*_completion.<ext>`. :param shell: Resolved shell name. :type shell: str :return: Marker block to append to the activation file. :rtype: str """ if shell == 'fish': return ( '# >>> MAFw completion >>>\n' 'for file in "$VIRTUAL_ENV/share/mafw/"*_completion.fish\n' ' if test -f "$file"\n' ' source "$file"\n' ' end\n' 'end\n' '# <<< MAFw completion <<<\n' ) return ( '# >>> MAFw completion >>>\n' 'case "$SHELL" in\n' ' *zsh*) ext="zsh" ;;\n' ' *) ext="bash" ;;\n' 'esac\n' 'for file in "$VIRTUAL_ENV/share/mafw/"*_completion."$ext"; do\n' ' if [ -f "$file" ]; then\n' ' . "$file"\n' ' fi\n' 'done\n' '# <<< MAFw completion <<<\n' )
[docs] def _strip_completion_marker_block(content: str) -> str: """ Remove the completion block delimited by the MAFw markers. :param content: Activation script content. :type content: str :return: Content without the MAFw completion block. :rtype: str """ pattern = re.compile(r'\n?# >>> MAFw completion >>>\n.*?\n# <<< MAFw completion <<<\n?', re.S) return pattern.sub('\n', content)
[docs] def uninstall_completion_files(tool_name: str, shell: Optional[str] = None) -> None: """ Remove installed completion files and activation hooks. :param tool_name: The name of the tool. :type tool_name: str :param shell: Optional shell selector. When omitted, all completion files for the tool are removed. :type shell: str | None """ virtual_env = _virtualenv_root() share_dir = virtual_env / 'share' / 'mafw' if share_dir.exists(): if shell is None: targets = list(share_dir.glob(f'{tool_name}_completion.*')) else: targets = [completion_script_path(tool_name, shell)] for path in targets: if path.exists(): path.unlink() # Check if any other completion files remain remaining = list(share_dir.glob('*_completion.*')) if share_dir.exists() else [] if not remaining: for activation_path in (virtual_env / 'bin' / 'activate', virtual_env / 'bin' / 'activate.fish'): if not activation_path.exists(): continue content = activation_path.read_text(encoding='utf-8') updated = _strip_completion_marker_block(content) if updated != content: activation_path.write_text(updated.lstrip('\n'), encoding='utf-8')
[docs] def install_completion(tool_name: str, shell: str, force: bool, script_path: pathlib.Path) -> pathlib.Path: """ Install Click completion for the requested shell. :param tool_name: The name of the tool. :type tool_name: str :param shell: Resolved shell name. :type shell: str :param force: Reinstall even if completion is already loaded. :type force: bool :param script_path: The target path for the completion script. :type script_path: pathlib.Path :return: Installed completion script path. :rtype: pathlib.Path """ if force: uninstall_completion_files(tool_name, shell) script_path.parent.mkdir(parents=True, exist_ok=True) activation_path = _activation_script_path(shell) script_text = completion_source_script(tool_name, shell) script_path.write_text(script_text, encoding='utf-8') activation_path.parent.mkdir(parents=True, exist_ok=True) activation_content = activation_path.read_text(encoding='utf-8') if activation_path.exists() else '' # Add marker block if not present if '# >>> MAFw completion >>>' not in activation_content: activation_content = activation_content.rstrip('\n') + '\n\n' + _completion_marker_block(shell) activation_path.write_text(activation_content, encoding='utf-8') return script_path