# Copyright 2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""
GitLab Generic Package Registry operations.
This module centralizes the HTTP helpers for uploading, downloading, listing,
and deleting files in the GitLab Generic Package Registry.
:author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import Any, Final, cast
from mafw.devtools import ensure_devtools_available
ensure_devtools_available()
import requests # noqa: E402
from packaging.version import InvalidVersion, Version # noqa: E402
from mafw.devtools.gitlab.api import ( # noqa: E402
GitlabAPIConfiguration,
build_gitlab_auth_headers,
)
# Accepts names of the form ``pylock.py3.<minor>_ref.toml``. Only Python 3.x
# versions match; the named group ``python_version`` captures the
# ``3.<minor>`` part.
_PYLOCK_RE: Final[re.Pattern[str]] = re.compile(r'^pylock\.py(?P<python_version>3\.\d+)_ref\.toml$')
"""Pattern for MAFw reference dependency lock files."""
# Exactly three dot-separated numeric components, optionally prefixed with a
# lowercase ``v``. The prefix sits outside the ``version`` group, so reading
# that group yields the bare ``X.Y.Z`` string.
_MAFW_VERSION_RE: Final[re.Pattern[str]] = re.compile(r'^(?:v)?(?P<version>\d+\.\d+\.\d+)$')
"""Pattern for accepted MAFw version overrides."""
# Keys are lowercase suffixes including the leading dot; ``_infer_content_type``
# looks them up with ``file_path.suffix.lower()``.
_CONTENT_TYPE_MAP: Final[dict[str, str]] = {
'.toml': 'application/toml',
'.zip': 'application/zip',
'.json': 'application/json',
'.tar': 'application/x-tar',
# ``Path.suffix`` only returns the last extension, so ``archive.tar.gz``
# resolves through this entry rather than the ``.tar`` one.
'.gz': 'application/gzip',
}
"""Mapping of file extensions to Content-Type values for registry uploads."""
def parse_pylock_reference_filename(file_name: str) -> tuple[str, str] | None:
    """Split a reference lock file name into its Python version and bare name.

    Directory components of ``file_name`` are discarded before matching, so
    both plain names and full paths are accepted.

    :param file_name: File name or path to inspect.
    :type file_name: str
    :return: ``(python_version, base_name)`` when the base name matches the
        reference lock file pattern, otherwise ``None``.
    :rtype: tuple[str, str] | None
    """
    base_name = Path(file_name).name
    if (found := _PYLOCK_RE.fullmatch(base_name)) is not None:
        return found.group('python_version'), base_name
    return None
def normalize_dependency_registry_item(item: str) -> tuple[str, str]:
    """Turn a lock file name or a version string into a canonical registry entry.

    The input is first tried as a reference lock file name. If that fails it
    is parsed as a version, of which only the ``major.minor`` part is kept to
    build the reference file name.

    :param item: A reference lock file name (or path) or a version string.
    :type item: str
    :return: A tuple of (python_version, reference_filename).
    :rtype: tuple[str, str]
    :raises ValueError: If ``item`` is neither a reference lock file name nor
        a valid version, or if it is a pre-release/dev version.
    """
    item = item.strip()

    # Fast path: the caller already handed over a reference lock file name.
    if (from_filename := parse_pylock_reference_filename(item)) is not None:
        return from_filename

    try:
        requested = Version(item)
    except InvalidVersion as exc:
        raise ValueError(f'Invalid version or lock filename: {item}') from exc

    unsupported = requested.is_prerelease or requested.is_devrelease
    if unsupported:
        raise ValueError(f'Pre-release/dev versions are not supported here: {item}')

    python_version = f'{requested.major}.{requested.minor}'
    reference_name = f'pylock.py{python_version}_ref.toml'
    return python_version, reference_name
def normalize_mafw_version(version_text: str) -> str:
    """Validate a MAFw version override and return it without a leading ``v``.

    Surrounding whitespace is ignored for matching. The error message reports
    the text exactly as it was passed in.

    :param version_text: The version string to normalize.
    :type version_text: str
    :return: The version captured by the ``version`` group of the pattern.
    :rtype: str
    :raises ValueError: If the text does not match the accepted version pattern.
    """
    cleaned = version_text.strip()
    found = _MAFW_VERSION_RE.fullmatch(cleaned)
    if found is not None:
        return found.group('version')
    raise ValueError(f'Invalid MAFw version: {version_text}')
def iter_local_pylock_reference_files(directory: Path) -> list[tuple[str, Path]]:
    """List local reference dependency files in a directory.

    Only regular files directly inside ``directory`` are considered (no
    recursion), and only those whose name is accepted by
    :func:`parse_pylock_reference_filename`.

    :param directory: The directory to search for reference files.
    :type directory: Path
    :return: A list of (python_version, file_path) tuples, sorted numerically
        by version. Empty if ``directory`` is missing or is not a directory.
    :rtype: list[tuple[str, Path]]
    """
    directory = Path(directory).resolve()
    # is_dir() rather than exists(): a path that points at a regular file has
    # nothing to iterate and would make iterdir() raise NotADirectoryError.
    if not directory.is_dir():
        return []

    items: list[tuple[str, Path]] = []
    for fp in directory.iterdir():
        if not fp.is_file():
            continue
        parsed = parse_pylock_reference_filename(fp.name)
        if parsed is not None:
            items.append((parsed[0], fp))

    # Compare versions as integer tuples so that 3.10 sorts after 3.9.
    items.sort(key=lambda item: tuple(int(part) for part in item[0].split('.')))
    return items
def list_generic_packages(
    api_config: GitlabAPIConfiguration,
    package_name: str,
    *,
    timeout_s: float = 60.0,
) -> list[dict[str, Any]]:
    """List generic packages for a given name in the GitLab Package Registry.

    All result pages are fetched, ordered by ascending version. Each returned
    entry is additionally filtered client-side so that only generic packages
    whose ``name`` equals ``package_name`` exactly are kept.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_name: The name of the package to list.
    :type package_name: str
    :param timeout_s: Timeout in seconds applied to each page request.
    :type timeout_s: float
    :return: A list of package dictionaries from the API.
    :rtype: list[dict[str, Any]]
    :raises RuntimeError: If the API request fails.
    """
    per_page = 100
    url = f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages'
    headers = build_gitlab_auth_headers(api_config)
    out: list[dict[str, Any]] = []
    page = 1
    while True:
        resp = requests.get(
            url,
            headers=headers,
            params=cast(
                dict[str, str | int],
                {
                    'package_type': 'generic',
                    'package_name': package_name,
                    'per_page': per_page,
                    'page': page,
                    'order_by': 'version',
                    'sort': 'asc',
                },
            ),
            timeout=timeout_s,
        )
        if not (200 <= resp.status_code < 300):
            body_preview = (resp.text or '')[:500].replace('\n', ' ')
            raise RuntimeError(f'GitLab list packages failed ({resp.status_code}): {body_preview}')
        data = resp.json()
        if not data:
            break
        # NOTE(review): the exact-name re-check is presumably needed because
        # the server-side ``package_name`` filter is not an exact match --
        # confirm against the GitLab Packages API documentation.
        out.extend(
            pkg for pkg in data if pkg.get('package_type') == 'generic' and pkg.get('name') == package_name
        )
        # A short page is the last one; stop without issuing another request.
        if len(data) < per_page:
            break
        page += 1
    return out
def resolve_package_ids_by_version(api_config: GitlabAPIConfiguration, package_name: str) -> dict[str, int]:
    """Build a ``version -> package id`` lookup for one generic package name.

    Entries whose ``version`` is not a string or whose ``id`` is not an
    integer are skipped. If a version occurs more than once, the last entry
    returned by the listing wins.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_name: The name of the package.
    :type package_name: str
    :return: A mapping of version strings to package IDs.
    :rtype: dict[str, int]
    """
    packages = list_generic_packages(api_config, package_name)
    candidates = ((pkg.get('version'), pkg.get('id')) for pkg in packages)
    return {
        version: pkg_id
        for version, pkg_id in candidates
        if isinstance(version, str) and isinstance(pkg_id, int)
    }
def _infer_content_type(file_path: Path) -> str:
    """Pick the Content-Type for a file from its last extension.

    The lookup is case-insensitive. Extensions without an entry in the
    content-type table fall back to ``application/octet-stream``.

    :param file_path: Path to the file.
    :type file_path: Path
    :return: A MIME type string.
    :rtype: str
    """
    extension = file_path.suffix.lower()
    if extension in _CONTENT_TYPE_MAP:
        return _CONTENT_TYPE_MAP[extension]
    return 'application/octet-stream'
def upload_generic_file(
    api_config: GitlabAPIConfiguration,
    package_name: str,
    package_version: str,
    file_path: Path,
    *,
    replace_existing: bool = False,
    package_id: int | None = None,
    timeout_s: float = 60.0,
) -> bool:
    """Publish a local file to the GitLab Generic Package Registry.

    When the package version already exists, its files are inspected first.
    A file with the same name causes the upload to be skipped unless
    ``replace_existing`` is set, in which case every same-named file is
    deleted before the new one is sent.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_name: The name of the generic package.
    :type package_name: str
    :param package_version: The version of the generic package.
    :type package_version: str
    :param file_path: The local path to the file to upload.
    :type file_path: Path
    :param replace_existing: Whether to overwrite if the file already exists.
    :type replace_existing: bool
    :param package_id: Numeric ID of the package version, if already known.
        When omitted it is looked up from ``package_name`` and ``package_version``.
    :type package_id: int | None
    :param timeout_s: Request timeout in seconds.
    :type timeout_s: float
    :return: True if the file was uploaded, False if skipped.
    :rtype: bool
    :raises FileNotFoundError: If the file_path does not exist.
    :raises RuntimeError: If the API request fails.
    """
    file_path = Path(file_path).resolve()
    if not file_path.exists():
        raise FileNotFoundError(f'File not found: {file_path}')

    # Resolve the package version only when the caller did not supply its id.
    if package_id is None:
        package_id = resolve_package_ids_by_version(api_config, package_name).get(package_version)

    # A known id means the version already exists and may hold a same-named file.
    if package_id is not None:
        remote_files = list_generic_package_files(api_config, package_id, timeout_s=timeout_s)
        duplicates = [
            entry
            for entry in remote_files
            if entry.get('file_name') == file_path.name and isinstance(entry.get('id'), int)
        ]
        if duplicates and not replace_existing:
            return False
        for entry in duplicates:
            delete_generic_package_file(api_config, package_id, entry['id'], timeout_s=timeout_s)

    url = (
        f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages/generic/'
        f'{package_name}/{package_version}/{file_path.name}'
    )
    # Copy the auth headers so the Content-Type is added to a private dict.
    put_headers = dict(build_gitlab_auth_headers(api_config))
    put_headers['Content-Type'] = _infer_content_type(file_path)
    with file_path.open('rb') as fp:
        resp = requests.put(url, headers=put_headers, data=fp, timeout=timeout_s)

    if resp.status_code < 200 or resp.status_code >= 300:
        body_preview = (resp.text or '')[:500].replace('\n', ' ')
        raise RuntimeError(f'GitLab upload failed ({resp.status_code}): {body_preview}')
    return True
def download_generic_file(
    api_config: GitlabAPIConfiguration,
    package_name: str,
    package_version: str,
    file_name: str,
    download_dir: Path,
    *,
    timeout_s: float = 60.0,
) -> Path | None:
    """Download a file from the GitLab Generic Package Registry.

    ``download_dir`` is created (including parents) before the registry is
    contacted. A ``HEAD`` request checks that the file exists; only then is
    the content streamed to ``download_dir / file_name`` in 1 MiB chunks.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_name: The name of the generic package.
    :type package_name: str
    :param package_version: The version of the generic package.
    :type package_version: str
    :param file_name: The name of the file to download.
    :type file_name: str
    :param download_dir: The local directory where to save the file.
    :type download_dir: Path
    :param timeout_s: Request timeout in seconds.
    :type timeout_s: float
    :return: The Path to the downloaded file, or None if the existence check
        answers 404.
    :rtype: Path | None
    :raises RuntimeError: If the existence check returns anything other than
        200/404, or if the download request is not successful.
    """
    download_dir = Path(download_dir).resolve()
    download_dir.mkdir(parents=True, exist_ok=True)
    url = (
        f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages/generic/'
        f'{package_name}/{package_version}/{file_name}'
    )
    headers = build_gitlab_auth_headers(api_config)

    head_resp = requests.head(url, headers=headers, timeout=timeout_s)
    if head_resp.status_code == 404:
        return None
    if head_resp.status_code != 200:
        body_preview = (head_resp.text or '')[:200].replace('\n', ' ')
        raise RuntimeError(f'GitLab existence check failed ({head_resp.status_code}): {body_preview}')

    dest = download_dir / file_name
    # A streamed response keeps its connection checked out until the body is
    # fully consumed or the response is closed. The context manager guarantees
    # the release even when the status check or the disk write raises.
    with requests.get(url, headers=headers, stream=True, timeout=timeout_s) as resp:
        if not (200 <= resp.status_code < 300):
            body_preview = (resp.text or '')[:200].replace('\n', ' ')
            raise RuntimeError(f'GitLab download failed ({resp.status_code}): {body_preview}')
        with dest.open('wb') as fp:
            for chunk in resp.iter_content(chunk_size=1024 * 1024):
                if chunk:
                    fp.write(chunk)
    return dest
def delete_generic_package_version(
    api_config: GitlabAPIConfiguration,
    package_id: int,
    package_name: str,
    package_version: str,
    *,
    timeout_s: float = 60.0,
) -> bool:
    """Delete a package version from the GitLab Package Registry.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_id: The numeric ID of the package version.
    :type package_id: int
    :param package_name: The name of the package (for error reporting).
    :type package_name: str
    :param package_version: The version of the package (for error reporting).
    :type package_version: str
    :param timeout_s: Request timeout in seconds.
    :type timeout_s: float
    :return: True if the API answers 204, False if it answers 403 or 404.
    :rtype: bool
    :raises RuntimeError: If the API answers with any other status code.
    """
    url = f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages/{package_id}'
    resp = requests.delete(url, headers=build_gitlab_auth_headers(api_config), timeout=timeout_s)
    if resp.status_code == 204:
        return True
    # Forbidden / not found are reported to the caller as "nothing deleted"
    # rather than as an error.
    if resp.status_code in (403, 404):
        return False
    body_preview = (resp.text or '')[:500].replace('\n', ' ')
    raise RuntimeError(
        f'GitLab delete failed for {package_name}/{package_version} (id={package_id}): {resp.status_code} {body_preview}'
    )
def list_generic_package_files(
    api_config: GitlabAPIConfiguration,
    package_id: int,
    *,
    timeout_s: float = 60.0,
) -> list[dict[str, Any]]:
    """Collect the metadata of every file attached to one package version.

    Pages of 100 entries are requested until an empty or short page is
    returned.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_id: The numeric ID of the package version.
    :type package_id: int
    :param timeout_s: Request timeout in seconds, applied to each page request.
    :type timeout_s: float
    :return: A list of file metadata dictionaries.
    :rtype: list[dict[str, Any]]
    :raises RuntimeError: If the API request fails.
    """
    page_size = 100
    url = f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages/{package_id}/package_files'
    headers = build_gitlab_auth_headers(api_config)
    collected: list[dict[str, Any]] = []
    page_number = 1
    while True:
        resp = requests.get(
            url,
            headers=headers,
            params={'per_page': page_size, 'page': page_number},
            timeout=timeout_s,
        )
        if resp.status_code < 200 or resp.status_code >= 300:
            body_preview = (resp.text or '')[:500].replace('\n', ' ')
            raise RuntimeError(f'GitLab list package files failed ({resp.status_code}): {body_preview}')
        batch = resp.json()
        if batch:
            collected.extend(batch)
        # An empty or short page means there is nothing further to fetch.
        if not batch or len(batch) < page_size:
            return collected
        page_number += 1
def delete_generic_package_file(
    api_config: GitlabAPIConfiguration,
    package_id: int,
    file_id: int,
    *,
    timeout_s: float = 60.0,
) -> bool:
    """Remove one file from a package version.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_id: The numeric ID of the package version.
    :type package_id: int
    :param file_id: The numeric ID of the package file to delete.
    :type file_id: int
    :param timeout_s: Request timeout in seconds.
    :type timeout_s: float
    :return: True if the API answers 204, False if it answers 403 or 404.
    :rtype: bool
    :raises RuntimeError: If the API answers with any other status code.
    """
    auth_headers = build_gitlab_auth_headers(api_config)
    endpoint = (
        f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}'
        f'/packages/{package_id}/package_files/{file_id}'
    )
    resp = requests.delete(endpoint, headers=auth_headers, timeout=timeout_s)

    status = resp.status_code
    if status == 204:
        return True
    if status == 403 or status == 404:
        return False

    body_preview = (resp.text or '')[:500].replace('\n', ' ')
    raise RuntimeError(
        f'GitLab delete file failed (package_id={package_id}, file_id={file_id}): {resp.status_code} {body_preview}'
    )