# Copyright 2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""
Helpers for interacting with the GitLab generic package registry.
This tool module is meant to serve development tools like the doc_versioning.py and
the release_mgt.py scripts.
Should you need to use it for other generic MAFw related tasks, you might find limitations.
The module centralizes the small HTTP and naming helpers used by the release
and documentation maintenance scripts. This keeps registry interactions
consistent while allowing each command group to define its own user-facing
policy, such as whether uploads should skip or replace pre-existing files.
:author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
"""
from __future__ import annotations
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Final, Literal, MutableMapping, cast
try:
import requests
from packaging.version import InvalidVersion, Version
except (ImportError, ModuleNotFoundError) as e:
raise ImportError(
'The "requests" and "packaging" packages are required for GitLab operations. '
'This usually means you are running outside the MAFw development environment. '
'Install MAFw with the optional [dev] feature.'
) from e
_PYLOCK_RE: Final[re.Pattern[str]] = re.compile(r'^pylock\.py(?P<python_version>3\.\d+)_ref\.toml$')
"""Pattern for MAFw reference dependency lock files."""
_MAFW_VERSION_RE: Final[re.Pattern[str]] = re.compile(r'^(?:v)?(?P<version>\d+\.\d+\.\d+)$')
"""Pattern for accepted MAFw version overrides."""
[docs]
@dataclass(frozen=True, slots=True)
class GitlabAPIConfiguration:
"""Configuration needed to communicate with the GitLab API.
:param api_url: Base GitLab API v4 URL.
:type api_url: str
:param on_ci: Whether the process runs on GitLab CI.
:type on_ci: bool
:param project_id: GitLab project numeric ID.
:type project_id: int
:param token: Authentication token value.
:type token: str
:param token_type: Token kind used for authentication.
:type token_type: Literal['job_token', 'api_token']
"""
api_url: str
on_ci: bool
project_id: int
token: str
token_type: Literal['job_token', 'api_token']
[docs]
def build_gitlab_api_configuration(
api_url: str | None, project_id: int | None, token: str | None
) -> GitlabAPIConfiguration:
"""Build a GitLab API configuration from provided values and environment context.
:param api_url: Optional override for the GitLab API URL.
:type api_url: str | None
:param project_id: Optional override for the GitLab project ID.
:type project_id: int | None
:param token: Optional override for the authentication token.
:type token: str | None
:return: The validated GitLab API configuration.
:rtype: GitlabAPIConfiguration
:raises ValueError: If any required configuration value is missing.
"""
if api_url is None:
api_url = os.environ.get('CI_API_V4_URL')
if project_id is None:
project_id_env = os.environ.get('CI_PROJECT_ID')
project_id = int(project_id_env) if project_id_env else None
if token is None:
token = os.environ.get('CI_JOB_TOKEN')
missing: list[str] = []
if not api_url:
missing.append('api_url')
if project_id is None:
missing.append('project_id')
if not token:
missing.append('token')
if missing or api_url is None or project_id is None or token is None:
raise ValueError(f'Missing GitLab configuration values: {", ".join(missing)}')
return GitlabAPIConfiguration(
api_url=api_url,
on_ci=bool(os.environ.get('CI')),
project_id=project_id,
token=token,
token_type='job_token' if bool(os.environ.get('CI')) else 'api_token',
)
[docs]
def parse_pylock_reference_filename(file_name: str) -> tuple[str, str] | None:
"""Parse a reference dependency filename into the Python version and normalized file name.
:param file_name: The filename to parse.
:type file_name: str
:return: A tuple of (python_version, filename) if matched, else None.
:rtype: tuple[str, str] | None
"""
base = Path(file_name).name
match = _PYLOCK_RE.fullmatch(base)
if match is None:
return None
python_version = match.group('python_version')
return python_version, base
[docs]
def normalize_dependency_registry_item(item: str) -> tuple[str, str]:
"""Normalize a registry item into a Python version and dependency file name.
:param item: A filename or a version string.
:type item: str
:return: A tuple of (python_version, reference_filename).
:rtype: tuple[str, str]
:raises ValueError: If the item is invalid or unsupported.
"""
item = item.strip()
parsed = parse_pylock_reference_filename(item)
if parsed is not None:
return parsed
try:
version = Version(item)
except InvalidVersion as exc:
raise ValueError(f'Invalid version or lock filename: {item}') from exc
if version.is_prerelease or version.is_devrelease:
raise ValueError(f'Pre-release/dev versions are not supported here: {item}')
python_version = f'{version.major}.{version.minor}'
return python_version, f'pylock.py{python_version}_ref.toml'
[docs]
def normalize_mafw_version(version_text: str) -> str:
"""Normalize a MAFw version override and strip an optional leading ``v``.
:param version_text: The version string to normalize.
:type version_text: str
:return: The normalized version string.
:rtype: str
:raises ValueError: If the version string is invalid.
"""
match = _MAFW_VERSION_RE.fullmatch(version_text.strip())
if match is None:
raise ValueError(f'Invalid MAFw version: {version_text}')
return match.group('version')
[docs]
def iter_local_pylock_reference_files(directory: Path) -> list[tuple[str, Path]]:
"""List local reference dependency files in a directory.
:param directory: The directory to search for reference files.
:type directory: Path
:return: A list of (python_version, file_path) tuples, sorted by version.
:rtype: list[tuple[str, Path]]
"""
directory = Path(directory).resolve()
if not directory.exists():
return []
items: list[tuple[str, Path]] = []
for fp in directory.iterdir():
if not fp.is_file():
continue
parsed = parse_pylock_reference_filename(fp.name)
if parsed is None:
continue
python_version, _ = parsed
items.append((python_version, fp))
items.sort(key=lambda item: tuple(int(part) for part in item[0].split('.')))
return items
[docs]
def list_generic_packages(api_config: GitlabAPIConfiguration, package_name: str) -> list[dict[str, Any]]:
"""List generic packages for a given name in the GitLab Package Registry.
:param api_config: The GitLab API configuration.
:type api_config: GitlabAPIConfiguration
:param package_name: The name of the package to list.
:type package_name: str
:return: A list of package dictionaries from the API.
:rtype: list[dict[str, Any]]
:raises RuntimeError: If the API request fails.
"""
url = f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages'
headers = build_gitlab_auth_headers(api_config)
out: list[dict[str, Any]] = []
page = 1
while True:
resp = requests.get(
url,
headers=headers,
params=cast(
dict[str, str | int],
{
'package_type': 'generic',
'package_name': package_name,
'per_page': 100,
'page': page,
'order_by': 'version',
'sort': 'asc',
},
),
timeout=60.0,
)
if not (200 <= resp.status_code < 300):
body_preview = (resp.text or '')[:500].replace('\n', ' ')
raise RuntimeError(f'GitLab list packages failed ({resp.status_code}): {body_preview}')
data = resp.json()
if not data:
break
for pkg in data:
if pkg.get('package_type') == 'generic' and pkg.get('name') == package_name:
out.append(pkg)
if len(data) < 100:
break
page += 1
return out
[docs]
def resolve_package_ids_by_version(api_config: GitlabAPIConfiguration, package_name: str) -> dict[str, int]:
"""Resolve package IDs for a package name by version.
:param api_config: The GitLab API configuration.
:type api_config: GitlabAPIConfiguration
:param package_name: The name of the package.
:type package_name: str
:return: A mapping of version strings to package IDs.
:rtype: dict[str, int]
"""
mapping: dict[str, int] = {}
for pkg in list_generic_packages(api_config, package_name):
version = pkg.get('version')
pkg_id = pkg.get('id')
if isinstance(version, str) and isinstance(pkg_id, int):
mapping[version] = pkg_id
return mapping
[docs]
def upload_generic_file(
api_config: GitlabAPIConfiguration,
package_name: str,
package_version: str,
file_path: Path,
*,
replace_existing: bool = False,
timeout_s: float = 60.0,
) -> bool:
"""Upload a file to the GitLab Generic Package Registry.
:param api_config: The GitLab API configuration.
:type api_config: GitlabAPIConfiguration
:param package_name: The name of the generic package.
:type package_name: str
:param package_version: The version of the generic package.
:type package_version: str
:param file_path: The local path to the file to upload.
:type file_path: Path
:param replace_existing: Whether to overwrite if the file already exists in the registry.
:type replace_existing: bool
:param timeout_s: Request timeout in seconds.
:type timeout_s: float
:return: True if the file was uploaded, False if skipped because it already exists.
:rtype: bool
:raises FileNotFoundError: If the file_path does not exist.
:raises RuntimeError: If the API request fails.
"""
file_path = Path(file_path).resolve()
if not file_path.exists():
raise FileNotFoundError(f'File not found: {file_path}')
url = (
f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages/generic/'
f'{package_name}/{package_version}/{file_path.name}'
)
headers = build_gitlab_auth_headers(api_config)
if not replace_existing:
head_resp = requests.head(url, headers=headers, timeout=timeout_s)
if head_resp.status_code == 200:
return False
if head_resp.status_code != 404:
body_preview = (head_resp.text or '')[:500].replace('\n', ' ')
raise RuntimeError(f'GitLab existence check failed ({head_resp.status_code}): {body_preview}')
with file_path.open('rb') as fp:
put_headers = dict(headers)
put_headers['Content-Type'] = 'application/toml'
resp = requests.put(url, headers=put_headers, data=fp, timeout=timeout_s)
if not (200 <= resp.status_code < 300):
body_preview = (resp.text or '')[:500].replace('\n', ' ')
raise RuntimeError(f'GitLab upload failed ({resp.status_code}): {body_preview}')
return True
[docs]
def download_generic_file(
api_config: GitlabAPIConfiguration,
package_name: str,
package_version: str,
file_name: str,
download_dir: Path,
*,
timeout_s: float = 60.0,
) -> Path | None:
"""Download a file from the GitLab Generic Package Registry.
:param api_config: The GitLab API configuration.
:type api_config: GitlabAPIConfiguration
:param package_name: The name of the generic package.
:type package_name: str
:param package_version: The version of the generic package.
:type package_version: str
:param file_name: The name of the file to download.
:type file_name: str
:param download_dir: The local directory where to save the file.
:type download_dir: Path
:param timeout_s: Request timeout in seconds.
:type timeout_s: float
:return: The Path to the downloaded file, or None if not found.
:rtype: Path | None
:raises RuntimeError: If the API request fails.
"""
download_dir = Path(download_dir).resolve()
download_dir.mkdir(parents=True, exist_ok=True)
url = (
f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages/generic/'
f'{package_name}/{package_version}/{file_name}'
)
headers = build_gitlab_auth_headers(api_config)
head_resp = requests.head(url, headers=headers, timeout=timeout_s)
if head_resp.status_code == 404:
return None
if head_resp.status_code != 200:
body_preview = (head_resp.text or '')[:200].replace('\n', ' ')
raise RuntimeError(f'GitLab existence check failed ({head_resp.status_code}): {body_preview}')
dest = download_dir / file_name
resp = requests.get(url, headers=headers, stream=True, timeout=timeout_s)
if not (200 <= resp.status_code < 300):
body_preview = (resp.text or '')[:200].replace('\n', ' ')
raise RuntimeError(f'GitLab download failed ({resp.status_code}): {body_preview}')
with dest.open('wb') as fp:
for chunk in resp.iter_content(chunk_size=1024 * 1024):
if chunk:
fp.write(chunk)
return dest
[docs]
def delete_generic_package_version(
api_config: GitlabAPIConfiguration, package_id: int, package_name: str, package_version: str
) -> bool:
"""Delete a package version from the GitLab Package Registry.
:param api_config: The GitLab API configuration.
:type api_config: GitlabAPIConfiguration
:param package_id: The numeric ID of the package version.
:type package_id: int
:param package_name: The name of the package (for logging/error reporting).
:type package_name: str
:param package_version: The version of the package (for logging/error reporting).
:type package_version: str
:return: True if the package was successfully deleted, False if not found or forbidden.
:rtype: bool
:raises RuntimeError: If the API request fails.
"""
url = f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages/{package_id}'
resp = requests.delete(url, headers=build_gitlab_auth_headers(api_config), timeout=60.0)
if resp.status_code == 204:
return True
if resp.status_code in (403, 404):
return False
body_preview = (resp.text or '')[:500].replace('\n', ' ')
raise RuntimeError(
f'GitLab delete failed for {package_name}/{package_version} (id={package_id}): {resp.status_code} {body_preview}'
)