Source code for mafw.devtools.gitlab.registry

#  Copyright 2026 European Union
#  Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
#  SPDX-License-Identifier: EUPL-1.2
"""
GitLab Generic Package Registry operations.

This module centralizes the HTTP helpers for uploading, downloading, listing,
and deleting files in the GitLab Generic Package Registry.

:author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
"""

from __future__ import annotations

import re
from pathlib import Path
from typing import Any, Final, cast

from mafw.devtools import ensure_devtools_available

ensure_devtools_available()

import requests  # noqa: E402
from packaging.version import InvalidVersion, Version  # noqa: E402

from mafw.devtools.gitlab.api import (  # noqa: E402
    GitlabAPIConfiguration,
    build_gitlab_auth_headers,
)

_PYLOCK_RE: Final[re.Pattern[str]] = re.compile(r'^pylock\.py(?P<python_version>3\.\d+)_ref\.toml$')
"""Pattern for MAFw reference dependency lock files."""

_MAFW_VERSION_RE: Final[re.Pattern[str]] = re.compile(r'^(?:v)?(?P<version>\d+\.\d+\.\d+)$')
"""Pattern for accepted MAFw version overrides."""

_CONTENT_TYPE_MAP: Final[dict[str, str]] = {
    '.toml': 'application/toml',
    '.zip': 'application/zip',
    '.json': 'application/json',
    '.tar': 'application/x-tar',
    '.gz': 'application/gzip',
}
"""Mapping of file extensions to Content-Type values for registry uploads."""


def parse_pylock_reference_filename(file_name: str) -> tuple[str, str] | None:
    """Parse a reference dependency filename into the Python version and normalized file name.

    :param file_name: The filename to parse.
    :type file_name: str
    :return: A tuple of (python_version, filename) if matched, else None.
    :rtype: tuple[str, str] | None
    """
    base = Path(file_name).name
    match = _PYLOCK_RE.fullmatch(base)
    if match is None:
        return None
    python_version = match.group('python_version')
    return python_version, base


def normalize_dependency_registry_item(item: str) -> tuple[str, str]:
    """Normalize a registry item into a Python version and dependency file name.

    :param item: A filename or a version string.
    :type item: str
    :return: A tuple of (python_version, reference_filename).
    :rtype: tuple[str, str]
    :raises ValueError: If the item is invalid or unsupported.
    """
    item = item.strip()
    parsed = parse_pylock_reference_filename(item)
    if parsed is not None:
        return parsed

    try:
        version = Version(item)
    except InvalidVersion as exc:
        raise ValueError(f'Invalid version or lock filename: {item}') from exc

    if version.is_prerelease or version.is_devrelease:
        raise ValueError(f'Pre-release/dev versions are not supported here: {item}')

    python_version = f'{version.major}.{version.minor}'
    return python_version, f'pylock.py{python_version}_ref.toml'


def normalize_mafw_version(version_text: str) -> str:
    """Normalize a MAFw version override and strip an optional leading ``v``.

    :param version_text: The version string to normalize.
    :type version_text: str
    :return: The normalized version string.
    :rtype: str
    :raises ValueError: If the version string is invalid.
    """
    match = _MAFW_VERSION_RE.fullmatch(version_text.strip())
    if match is None:
        raise ValueError(f'Invalid MAFw version: {version_text}')
    return match.group('version')


def iter_local_pylock_reference_files(directory: Path) -> list[tuple[str, Path]]:
    """List local reference dependency files in a directory.

    :param directory: The directory to search for reference files.
    :type directory: Path
    :return: A list of (python_version, file_path) tuples, sorted by version.
    :rtype: list[tuple[str, Path]]
    """
    directory = Path(directory).resolve()
    if not directory.exists():
        return []
    items: list[tuple[str, Path]] = []
    for fp in directory.iterdir():
        if not fp.is_file():
            continue
        parsed = parse_pylock_reference_filename(fp.name)
        if parsed is None:
            continue
        python_version, _ = parsed
        items.append((python_version, fp))
    items.sort(key=lambda item: tuple(int(part) for part in item[0].split('.')))
    return items


def list_generic_packages(api_config: GitlabAPIConfiguration, package_name: str) -> list[dict[str, Any]]:
    """List generic packages for a given name in the GitLab Package Registry.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_name: The name of the package to list.
    :type package_name: str
    :return: A list of package dictionaries from the API.
    :rtype: list[dict[str, Any]]
    :raises RuntimeError: If the API request fails.
    """
    url = f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages'
    headers = build_gitlab_auth_headers(api_config)
    out: list[dict[str, Any]] = []
    page = 1
    while True:
        resp = requests.get(
            url,
            headers=headers,
            params=cast(
                dict[str, str | int],
                {
                    'package_type': 'generic',
                    'package_name': package_name,
                    'per_page': 100,
                    'page': page,
                    'order_by': 'version',
                    'sort': 'asc',
                },
            ),
            timeout=60.0,
        )
        if not (200 <= resp.status_code < 300):
            body_preview = (resp.text or '')[:500].replace('\n', ' ')
            raise RuntimeError(f'GitLab list packages failed ({resp.status_code}): {body_preview}')
        data = resp.json()
        if not data:
            break
        for pkg in data:
            if pkg.get('package_type') == 'generic' and pkg.get('name') == package_name:
                out.append(pkg)
        if len(data) < 100:
            break
        page += 1
    return out


def resolve_package_ids_by_version(api_config: GitlabAPIConfiguration, package_name: str) -> dict[str, int]:
    """Resolve package IDs for a package name by version.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_name: The name of the package.
    :type package_name: str
    :return: A mapping of version strings to package IDs.
    :rtype: dict[str, int]
    """
    mapping: dict[str, int] = {}
    for pkg in list_generic_packages(api_config, package_name):
        version = pkg.get('version')
        pkg_id = pkg.get('id')
        if isinstance(version, str) and isinstance(pkg_id, int):
            mapping[version] = pkg_id
    return mapping


[docs] def _infer_content_type(file_path: Path) -> str: """Infer a Content-Type from the file extension. :param file_path: Path to the file. :type file_path: Path :return: A MIME type string. :rtype: str """ return _CONTENT_TYPE_MAP.get(file_path.suffix.lower(), 'application/octet-stream')
def upload_generic_file( api_config: GitlabAPIConfiguration, package_name: str, package_version: str, file_path: Path, *, replace_existing: bool = False, package_id: int | None = None, timeout_s: float = 60.0, ) -> bool: """Upload a file to the GitLab Generic Package Registry. :param api_config: The GitLab API configuration. :type api_config: GitlabAPIConfiguration :param package_name: The name of the generic package. :type package_name: str :param package_version: The version of the generic package. :type package_version: str :param file_path: The local path to the file to upload. :type file_path: Path :param replace_existing: Whether to overwrite if the file already exists. :type replace_existing: bool :param package_id: Optional numeric package ID to avoid extra API calls. :type package_id: int | None :param timeout_s: Request timeout in seconds. :type timeout_s: float :return: True if the file was uploaded, False if skipped. :rtype: bool :raises FileNotFoundError: If the file_path does not exist. :raises RuntimeError: If the API request fails. """ file_path = Path(file_path).resolve() if not file_path.exists(): raise FileNotFoundError(f'File not found: {file_path}') if package_id is None: mapping = resolve_package_ids_by_version(api_config, package_name) package_id = mapping.get(package_version) if package_id is not None: existing_files = list_generic_package_files(api_config, package_id, timeout_s=timeout_s) matching = [f for f in existing_files if f.get('file_name') == file_path.name and isinstance(f.get('id'), int)] if matching: if not replace_existing: return False for f in matching: delete_generic_package_file(api_config, package_id, f['id'], timeout_s=timeout_s) url = ( f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages/generic/' f'{package_name}/{package_version}/{file_path.name}' ) headers = build_gitlab_auth_headers(api_config) with file_path.open('rb') as fp: put_headers = dict(headers) put_headers['Content-Type'] = _infer_content_type(file_path) resp = requests.put(url, headers=put_headers, data=fp, timeout=timeout_s) if not (200 <= resp.status_code < 300): body_preview = (resp.text or '')[:500].replace('\n', ' ') raise RuntimeError(f'GitLab upload failed ({resp.status_code}): {body_preview}') return True def download_generic_file( api_config: GitlabAPIConfiguration, package_name: str, package_version: str, file_name: str, download_dir: Path, *, timeout_s: float = 60.0, ) -> Path | None: """Download a file from the GitLab Generic Package Registry. :param api_config: The GitLab API configuration. :type api_config: GitlabAPIConfiguration :param package_name: The name of the generic package. :type package_name: str :param package_version: The version of the generic package. :type package_version: str :param file_name: The name of the file to download. :type file_name: str :param download_dir: The local directory where to save the file. :type download_dir: Path :param timeout_s: Request timeout in seconds. :type timeout_s: float :return: The Path to the downloaded file, or None if not found. :rtype: Path | None :raises RuntimeError: If the API request fails. """ download_dir = Path(download_dir).resolve() download_dir.mkdir(parents=True, exist_ok=True) url = ( f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages/generic/' f'{package_name}/{package_version}/{file_name}' ) headers = build_gitlab_auth_headers(api_config) head_resp = requests.head(url, headers=headers, timeout=timeout_s) if head_resp.status_code == 404: return None if head_resp.status_code != 200: body_preview = (head_resp.text or '')[:200].replace('\n', ' ') raise RuntimeError(f'GitLab existence check failed ({head_resp.status_code}): {body_preview}') dest = download_dir / file_name resp = requests.get(url, headers=headers, stream=True, timeout=timeout_s) if not (200 <= resp.status_code < 300): body_preview = (resp.text or '')[:200].replace('\n', ' ') raise RuntimeError(f'GitLab download failed ({resp.status_code}): {body_preview}') with dest.open('wb') as fp: for chunk in resp.iter_content(chunk_size=1024 * 1024): if chunk: fp.write(chunk) return dest def delete_generic_package_version( api_config: GitlabAPIConfiguration, package_id: int, package_name: str, package_version: str ) -> bool: """Delete a package version from the GitLab Package Registry. :param api_config: The GitLab API configuration. :type api_config: GitlabAPIConfiguration :param package_id: The numeric ID of the package version. :type package_id: int :param package_name: The name of the package (for error reporting). :type package_name: str :param package_version: The version of the package (for error reporting). :type package_version: str :return: True if deleted, False if not found or forbidden. :rtype: bool :raises RuntimeError: If the API request fails. """ url = f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages/{package_id}' resp = requests.delete(url, headers=build_gitlab_auth_headers(api_config), timeout=60.0) if resp.status_code == 204: return True if resp.status_code in (403, 404): return False body_preview = (resp.text or '')[:500].replace('\n', ' ') raise RuntimeError( f'GitLab delete failed for {package_name}/{package_version} (id={package_id}): {resp.status_code} {body_preview}' ) def list_generic_package_files( api_config: GitlabAPIConfiguration, package_id: int, *, timeout_s: float = 60.0, ) -> list[dict[str, Any]]: """List files belonging to a specific package version. :param api_config: The GitLab API configuration. :type api_config: GitlabAPIConfiguration :param package_id: The numeric ID of the package version. :type package_id: int :param timeout_s: Request timeout in seconds. :type timeout_s: float :return: A list of file metadata dictionaries. :rtype: list[dict[str, Any]] :raises RuntimeError: If the API request fails. """ url = f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages/{package_id}/package_files' headers = build_gitlab_auth_headers(api_config) out: list[dict[str, Any]] = [] page = 1 while True: resp = requests.get( url, headers=headers, params={'per_page': 100, 'page': page}, timeout=timeout_s, ) if not (200 <= resp.status_code < 300): body_preview = (resp.text or '')[:500].replace('\n', ' ') raise RuntimeError(f'GitLab list package files failed ({resp.status_code}): {body_preview}') data = resp.json() if not data: break out.extend(data) if len(data) < 100: break page += 1 return out def delete_generic_package_file( api_config: GitlabAPIConfiguration, package_id: int, file_id: int, *, timeout_s: float = 60.0, ) -> bool: """Delete a single file from a package version. :param api_config: The GitLab API configuration. :type api_config: GitlabAPIConfiguration :param package_id: The numeric ID of the package version. :type package_id: int :param file_id: The numeric ID of the package file to delete. :type file_id: int :param timeout_s: Request timeout in seconds. :type timeout_s: float :return: True if deleted, False if not found or forbidden. :rtype: bool :raises RuntimeError: If the API request fails. """ url = ( f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}' f'/packages/{package_id}/package_files/{file_id}' ) resp = requests.delete(url, headers=build_gitlab_auth_headers(api_config), timeout=timeout_s) if resp.status_code == 204: return True if resp.status_code in (403, 404): return False body_preview = (resp.text or '')[:500].replace('\n', ' ') raise RuntimeError( f'GitLab delete file failed (package_id={package_id}, file_id={file_id}): {resp.status_code} {body_preview}' )