Coverage for src / mafw / devtools / gitlab / registry.py: 99%

167 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-06-28 13:34 +0000

1# Copyright 2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5GitLab Generic Package Registry operations. 

6 

7This module centralizes the HTTP helpers for uploading, downloading, listing, 

8and deleting files in the GitLab Generic Package Registry. 

9 

10:author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

11""" 

12 

13from __future__ import annotations 

14 

15import re 

16from pathlib import Path 

17from typing import Any, Final, cast 

18 

19from mafw.devtools import ensure_devtools_available 

20 

21ensure_devtools_available() 

22 

23import requests # noqa: E402 

24from packaging.version import InvalidVersion, Version # noqa: E402 

25 

26from mafw.devtools.gitlab.api import ( # noqa: E402 

27 GitlabAPIConfiguration, 

28 build_gitlab_auth_headers, 

29) 

30 

_PYLOCK_RE: Final[re.Pattern[str]] = re.compile(
    r'^pylock\.py(?P<python_version>3\.\d+)_ref\.toml$',
)
"""Regular expression recognising MAFw reference dependency lock file names.

The ``python_version`` group captures the ``3.<minor>`` part of the name.
"""

_MAFW_VERSION_RE: Final[re.Pattern[str]] = re.compile(
    r'^(?:v)?(?P<version>\d+\.\d+\.\d+)$',
)
"""Regular expression for MAFw version overrides (``X.Y.Z`` with an optional leading ``v``)."""

_CONTENT_TYPE_MAP: Final[dict[str, str]] = {
    '.toml': 'application/toml',
    '.zip': 'application/zip',
    '.json': 'application/json',
    '.tar': 'application/x-tar',
    '.gz': 'application/gzip',
}
"""Lookup table from lower-case file suffix to the Content-Type sent on registry uploads."""

45 

46 

def parse_pylock_reference_filename(file_name: str) -> tuple[str, str] | None:
    """Split a reference lock file name into its Python version and bare file name.

    Only the last path component of ``file_name`` is matched, so a full path
    may be passed as well as a plain file name.

    :param file_name: The file name (or path) to inspect.
    :type file_name: str
    :return: ``(python_version, base_name)`` when the base name matches the
        reference lock file pattern, otherwise ``None``.
    :rtype: tuple[str, str] | None
    """
    bare_name = Path(file_name).name
    found = _PYLOCK_RE.fullmatch(bare_name)
    return None if found is None else (found.group('python_version'), bare_name)

61 

62 

def normalize_dependency_registry_item(item: str) -> tuple[str, str]:
    """Turn a registry item into a ``(python_version, lock_file_name)`` pair.

    The item is first tried as a reference lock file name. If that fails it is
    parsed as a version string, and the lock file name is derived from its
    ``major.minor`` components.

    :param item: A reference lock file name or a version string; surrounding
        whitespace is ignored.
    :type item: str
    :return: A tuple of (python_version, reference_filename).
    :rtype: tuple[str, str]
    :raises ValueError: If the item is neither a lock file name nor a valid
        version, or if it is a pre-release/dev version.
    """
    candidate = item.strip()

    # A recognised lock file name already carries everything we need.
    from_file_name = parse_pylock_reference_filename(candidate)
    if from_file_name is not None:
        return from_file_name

    try:
        parsed_version = Version(candidate)
    except InvalidVersion as exc:
        raise ValueError(f'Invalid version or lock filename: {candidate}') from exc

    unsupported = parsed_version.is_prerelease or parsed_version.is_devrelease
    if unsupported:
        raise ValueError(f'Pre-release/dev versions are not supported here: {candidate}')

    major_minor = f'{parsed_version.major}.{parsed_version.minor}'
    return major_minor, f'pylock.py{major_minor}_ref.toml'

87 

88 

def normalize_mafw_version(version_text: str) -> str:
    """Validate a MAFw version override and return it without a leading ``v``.

    :param version_text: The version string to check; surrounding whitespace
        is ignored for matching.
    :type version_text: str
    :return: The bare ``X.Y.Z`` version string.
    :rtype: str
    :raises ValueError: If the text does not match the accepted version pattern.
    """
    found = _MAFW_VERSION_RE.fullmatch(version_text.strip())
    if found is not None:
        return found.group('version')
    # The message deliberately echoes the caller's text as given (unstripped).
    raise ValueError(f'Invalid MAFw version: {version_text}')

102 

103 

def iter_local_pylock_reference_files(directory: Path) -> list[tuple[str, Path]]:
    """List local reference dependency files in a directory.

    Only regular files directly inside ``directory`` whose name matches the
    reference lock file pattern are returned; sub-directories are not
    searched.

    :param directory: The directory to search for reference files.
    :type directory: Path
    :return: A list of (python_version, file_path) tuples, sorted numerically
        by version. Empty when ``directory`` is missing or is not a directory.
    :rtype: list[tuple[str, Path]]
    """
    directory = Path(directory).resolve()
    # ``is_dir`` rather than ``exists``: a path that points at a regular file
    # would otherwise reach ``iterdir`` and raise NotADirectoryError.
    if not directory.is_dir():
        return []

    items: list[tuple[str, Path]] = []
    for fp in directory.iterdir():
        if not fp.is_file():
            continue
        parsed = parse_pylock_reference_filename(fp.name)
        if parsed is None:
            continue
        python_version, _ = parsed
        items.append((python_version, fp))

    # Compare versions numerically so that '3.10' sorts after '3.9'.
    items.sort(key=lambda item: tuple(int(part) for part in item[0].split('.')))
    return items

126 

127 

def list_generic_packages(
    api_config: GitlabAPIConfiguration,
    package_name: str,
    *,
    timeout_s: float = 60.0,
) -> list[dict[str, Any]]:
    """List generic packages for a given name in the GitLab Package Registry.

    The project packages endpoint is read page by page until an empty or
    short page is returned. Entries are re-checked client-side so that only
    packages whose ``package_type`` is ``generic`` and whose ``name`` equals
    ``package_name`` exactly are kept.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_name: The name of the package to list.
    :type package_name: str
    :param timeout_s: Per-request timeout in seconds.
    :type timeout_s: float
    :return: A list of package dictionaries from the API.
    :rtype: list[dict[str, Any]]
    :raises RuntimeError: If the API request fails.
    """
    url = f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages'
    headers = build_gitlab_auth_headers(api_config)
    # Single source of truth for both the request parameter and the
    # "short page means last page" check below.
    page_size = 100
    out: list[dict[str, Any]] = []
    page = 1
    while True:
        resp = requests.get(
            url,
            headers=headers,
            params=cast(
                dict[str, str | int],
                {
                    'package_type': 'generic',
                    'package_name': package_name,
                    'per_page': page_size,
                    'page': page,
                    'order_by': 'version',
                    'sort': 'asc',
                },
            ),
            timeout=timeout_s,
        )
        if not (200 <= resp.status_code < 300):
            # Keep the error readable: one line, bounded length.
            body_preview = (resp.text or '')[:500].replace('\n', ' ')
            raise RuntimeError(f'GitLab list packages failed ({resp.status_code}): {body_preview}')
        data = resp.json()
        if not data:
            break
        for pkg in data:
            if pkg.get('package_type') == 'generic' and pkg.get('name') == package_name:
                out.append(pkg)
        if len(data) < page_size:
            break
        page += 1
    return out

173 

174 

def resolve_package_ids_by_version(api_config: GitlabAPIConfiguration, package_name: str) -> dict[str, int]:
    """Build a version-to-ID lookup for all registry entries of a package.

    Entries whose ``version`` is not a string or whose ``id`` is not an
    integer are skipped. If the same version appears more than once, the
    last entry returned by the listing wins.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_name: The name of the package.
    :type package_name: str
    :return: A mapping of version strings to package IDs.
    :rtype: dict[str, int]
    """
    ids_by_version: dict[str, int] = {}
    for entry in list_generic_packages(api_config, package_name):
        entry_version, entry_id = entry.get('version'), entry.get('id')
        if not isinstance(entry_version, str) or not isinstance(entry_id, int):
            continue
        ids_by_version[entry_version] = entry_id
    return ids_by_version

192 

193 

def _infer_content_type(file_path: Path) -> str:
    """Pick the Content-Type for a file based on its (case-insensitive) suffix.

    :param file_path: Path to the file.
    :type file_path: Path
    :return: The mapped MIME type, or ``application/octet-stream`` when the
        suffix is not in the content-type table.
    :rtype: str
    """
    extension = file_path.suffix.lower()
    if extension in _CONTENT_TYPE_MAP:
        return _CONTENT_TYPE_MAP[extension]
    return 'application/octet-stream'

203 

204 

def upload_generic_file(
    api_config: GitlabAPIConfiguration,
    package_name: str,
    package_version: str,
    file_path: Path,
    *,
    replace_existing: bool = False,
    package_id: int | None = None,
    timeout_s: float = 60.0,
) -> bool:
    """Publish a local file to the GitLab Generic Package Registry.

    When the package version already exists (its ID is either given or
    resolved by listing the packages), the files attached to it are checked
    for one with the same name. Such a file causes the upload to be skipped,
    unless ``replace_existing`` is set, in which case every matching file is
    deleted before the new upload.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_name: The name of the generic package.
    :type package_name: str
    :param package_version: The version of the generic package.
    :type package_version: str
    :param file_path: The local path to the file to upload.
    :type file_path: Path
    :param replace_existing: Whether to overwrite if the file already exists.
    :type replace_existing: bool
    :param package_id: Optional numeric package ID to avoid extra API calls.
    :type package_id: int | None
    :param timeout_s: Request timeout in seconds.
    :type timeout_s: float
    :return: True if the file was uploaded, False if skipped.
    :rtype: bool
    :raises FileNotFoundError: If the file_path does not exist.
    :raises RuntimeError: If the API request fails.
    """
    file_path = Path(file_path).resolve()
    if not file_path.exists():
        raise FileNotFoundError(f'File not found: {file_path}')

    # Without an explicit ID, look the package version up in the registry;
    # the result stays None when that version has not been published yet.
    if package_id is None:
        package_id = resolve_package_ids_by_version(api_config, package_name).get(package_version)

    if package_id is not None:
        same_name_files = [
            entry
            for entry in list_generic_package_files(api_config, package_id, timeout_s=timeout_s)
            if entry.get('file_name') == file_path.name and isinstance(entry.get('id'), int)
        ]
        if same_name_files and not replace_existing:
            return False
        for entry in same_name_files:
            delete_generic_package_file(api_config, package_id, entry['id'], timeout_s=timeout_s)

    url = (
        f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages/generic/'
        f'{package_name}/{package_version}/{file_path.name}'
    )
    # Copy the auth headers so the Content-Type is added to a private dict.
    put_headers = dict(build_gitlab_auth_headers(api_config))
    put_headers['Content-Type'] = _infer_content_type(file_path)
    with file_path.open('rb') as stream:
        resp = requests.put(url, headers=put_headers, data=stream, timeout=timeout_s)

    if 200 <= resp.status_code < 300:
        return True
    body_preview = (resp.text or '')[:500].replace('\n', ' ')
    raise RuntimeError(f'GitLab upload failed ({resp.status_code}): {body_preview}')

266 

267 

def download_generic_file(
    api_config: GitlabAPIConfiguration,
    package_name: str,
    package_version: str,
    file_name: str,
    download_dir: Path,
    *,
    timeout_s: float = 60.0,
) -> Path | None:
    """Download a file from the GitLab Generic Package Registry.

    A ``HEAD`` request is issued first so that a missing file can be reported
    as ``None`` without starting a download. The file body is then streamed
    to ``download_dir / file_name`` in 1 MiB chunks. ``download_dir`` is
    created (including parents) before any request is made.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_name: The name of the generic package.
    :type package_name: str
    :param package_version: The version of the generic package.
    :type package_version: str
    :param file_name: The name of the file to download.
    :type file_name: str
    :param download_dir: The local directory where to save the file.
    :type download_dir: Path
    :param timeout_s: Request timeout in seconds.
    :type timeout_s: float
    :return: The Path to the downloaded file, or None if not found.
    :rtype: Path | None
    :raises RuntimeError: If the API request fails.
    """
    download_dir = Path(download_dir).resolve()
    download_dir.mkdir(parents=True, exist_ok=True)

    url = (
        f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages/generic/'
        f'{package_name}/{package_version}/{file_name}'
    )
    headers = build_gitlab_auth_headers(api_config)

    # Existence probe: 404 is the only status reported as "not found".
    head_resp = requests.head(url, headers=headers, timeout=timeout_s)
    if head_resp.status_code == 404:
        return None
    if head_resp.status_code != 200:
        body_preview = (head_resp.text or '')[:200].replace('\n', ' ')
        raise RuntimeError(f'GitLab existence check failed ({head_resp.status_code}): {body_preview}')

    dest = download_dir / file_name
    resp = requests.get(url, headers=headers, stream=True, timeout=timeout_s)
    try:
        if not (200 <= resp.status_code < 300):
            body_preview = (resp.text or '')[:200].replace('\n', ' ')
            raise RuntimeError(f'GitLab download failed ({resp.status_code}): {body_preview}')

        with dest.open('wb') as fp:
            for chunk in resp.iter_content(chunk_size=1024 * 1024):
                # Skip empty chunks (e.g. keep-alive data).
                if chunk:
                    fp.write(chunk)
    finally:
        # A streamed response holds its connection until the body is fully
        # consumed or the response is closed; close it explicitly so the
        # connection is released on the error paths as well.
        resp.close()
    return dest

321 

322 

def delete_generic_package_version(
    api_config: GitlabAPIConfiguration,
    package_id: int,
    package_name: str,
    package_version: str,
    *,
    timeout_s: float = 60.0,
) -> bool:
    """Delete a package version from the GitLab Package Registry.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_id: The numeric ID of the package version.
    :type package_id: int
    :param package_name: The name of the package (for error reporting).
    :type package_name: str
    :param package_version: The version of the package (for error reporting).
    :type package_version: str
    :param timeout_s: Request timeout in seconds.
    :type timeout_s: float
    :return: True if deleted (HTTP 204), False if not found or forbidden
        (HTTP 404 or 403).
    :rtype: bool
    :raises RuntimeError: If the API answers with any other status code.
    """
    url = f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages/{package_id}'
    resp = requests.delete(url, headers=build_gitlab_auth_headers(api_config), timeout=timeout_s)
    if resp.status_code == 204:
        return True
    # Forbidden / not found are reported to the caller as "nothing deleted"
    # instead of being raised.
    if resp.status_code in (403, 404):
        return False
    body_preview = (resp.text or '')[:500].replace('\n', ' ')
    raise RuntimeError(
        f'GitLab delete failed for {package_name}/{package_version} (id={package_id}): {resp.status_code} {body_preview}'
    )

350 

351 

def list_generic_package_files(
    api_config: GitlabAPIConfiguration,
    package_id: int,
    *,
    timeout_s: float = 60.0,
) -> list[dict[str, Any]]:
    """Collect the metadata of every file attached to a package version.

    The ``package_files`` endpoint is read 100 entries at a time; paging
    stops at the first empty or short page.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_id: The numeric ID of the package version.
    :type package_id: int
    :param timeout_s: Request timeout in seconds.
    :type timeout_s: float
    :return: A list of file metadata dictionaries.
    :rtype: list[dict[str, Any]]
    :raises RuntimeError: If the API request fails.
    """
    endpoint = f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}/packages/{package_id}/package_files'
    auth_headers = build_gitlab_auth_headers(api_config)
    collected: list[dict[str, Any]] = []
    page_no = 1
    while True:
        reply = requests.get(
            endpoint,
            headers=auth_headers,
            params={'per_page': 100, 'page': page_no},
            timeout=timeout_s,
        )
        status = reply.status_code
        if status < 200 or status >= 300:
            preview = (reply.text or '')[:500].replace('\n', ' ')
            raise RuntimeError(f'GitLab list package files failed ({status}): {preview}')

        batch = reply.json()
        # An empty payload ends the listing before anything is appended.
        if not batch:
            return collected
        collected.extend(batch)
        # A page shorter than the page size is the last one.
        if len(batch) < 100:
            return collected
        page_no += 1

392 

393 

def delete_generic_package_file(
    api_config: GitlabAPIConfiguration,
    package_id: int,
    file_id: int,
    *,
    timeout_s: float = 60.0,
) -> bool:
    """Remove one file from a package version in the registry.

    :param api_config: The GitLab API configuration.
    :type api_config: GitlabAPIConfiguration
    :param package_id: The numeric ID of the package version.
    :type package_id: int
    :param file_id: The numeric ID of the package file to delete.
    :type file_id: int
    :param timeout_s: Request timeout in seconds.
    :type timeout_s: float
    :return: True if deleted (HTTP 204), False if not found or forbidden
        (HTTP 404 or 403).
    :rtype: bool
    :raises RuntimeError: If the API answers with any other status code.
    """
    endpoint = (
        f'{api_config.api_url.rstrip("/")}/projects/{api_config.project_id}'
        f'/packages/{package_id}/package_files/{file_id}'
    )
    reply = requests.delete(endpoint, headers=build_gitlab_auth_headers(api_config), timeout=timeout_s)
    status = reply.status_code
    if status == 204:
        return True
    if status == 403 or status == 404:
        return False
    preview = (reply.text or '')[:500].replace('\n', ' ')
    raise RuntimeError(
        f'GitLab delete file failed (package_id={package_id}, file_id={file_id}): {status} {preview}'
    )

427 )