Coverage for src / mafw / devtools / dependencies / freeze.py: 83%

169 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-06-28 13:34 +0000

1# Copyright 2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Dependency freezing and unfreezing utilities for MAFw. 

6 

7This module provides functions for adding and removing computed upper-bound 

8constraints in ``pyproject.toml`` dependency declarations. It is used by 

9the release workflow to pin dependencies during release and unpin them 

10afterwards. 

11""" 

12 

13from __future__ import annotations 

14 

15from typing import Any, Callable, Final 

16 

17import tomlkit 

18 

19from mafw.devtools import ensure_devtools_available 

20 

21ensure_devtools_available() 

22 

23from packaging.requirements import Requirement # noqa: E402 

24from packaging.specifiers import Specifier, SpecifierSet # noqa: E402 

25from packaging.version import Version # noqa: E402 

26 

27from mafw.devtools import DevtoolsError # noqa: E402 

28from mafw.devtools.dependencies.compile import ( # noqa: E402 

29 PYPROJECT_FILE, 

30 collect_compiled_dependency_versions, 

31 load_pyproject_doc, 

32 project_python_versions_from_doc, 

33) 

34from mafw.devtools.documentation.requirements import REQUIREMENTS_GROUPS # noqa: E402 

35from mafw.tools.shell_tools import CONSOLE # noqa: E402 

36from mafw.tools.shell_tools import run as cmd # noqa: E402 

37 

38_FROZEN_OPERATORS: Final[set[str]] = {'<', '<=', '~=', '==', '==='} 

39"""Operators that already constrain the maximum compatible version and should not be auto-frozen.""" 

40 

41 

42def compute_upper_bound(lower_bound: str) -> str: 

43 """ 

44 Compute an upper bound for a dependency based on PEP 440 compatible-release philosophy. 

45 

46 The rule implemented here is purposely conservative and mirrors the intent of compatible 

47 release clauses while remaining explicit: 

48 

49 - For major-versioned releases (``X.*`` with ``X > 0``), freeze to ``<(X + 1)``. 

50 - For ``0.*`` releases, freeze to ``<0.(minor + 1)`` (rolling compatibility during pre-1.0). 

51 

52 :param lower_bound: Version string used as the starting point for the freeze rule. 

53 :type lower_bound: str 

54 :return: Upper-bound version string without operator. 

55 :rtype: str 

56 :raises DevtoolsError: If the version cannot be parsed. 

57 """ 

58 try: 

59 version = Version(lower_bound) 

60 except Exception as exc: # pragma: no cover 

61 raise DevtoolsError(f'Unable to parse version "{lower_bound}" while computing dependency upper bound.') from exc 

62 

63 if version.major > 0: 

64 return str(version.major + 1) 

65 return f'0.{version.minor + 1}' 

66 

67 

68def format_requirement(requirement: Requirement) -> str: 

69 """ 

70 Serialize a packaging requirement object back to a PEP 508 compatible string. 

71 

72 :param requirement: Parsed requirement instance. 

73 :type requirement: Requirement 

74 :return: PEP 508 requirement string. 

75 :rtype: str 

76 """ 

77 name = str(requirement.name) 

78 extras = sorted(str(extra) for extra in getattr(requirement, 'extras', set()) or set()) 

79 if extras: 

80 name = f'{name}[{",".join(extras)}]' 

81 

82 if requirement.url: 82 ↛ 83line 82 didn't jump to line 83 because the condition on line 82 was never true

83 rendered = f'{name} @ {requirement.url}' 

84 else: 

85 spec = str(requirement.specifier).strip() 

86 rendered = f'{name}{spec}' if spec else name 

87 

88 if requirement.marker: 

89 rendered = f'{rendered} ; {requirement.marker}' 

90 return rendered 

91 

92 

93def iter_specifiers(requirement: Requirement) -> list[Specifier]: 

94 """ 

95 Return a concrete list of specifiers for the given requirement. 

96 

97 :param requirement: Parsed requirement instance. 

98 :type requirement: Requirement 

99 :return: List of specifier objects. 

100 :rtype: list[Specifier] 

101 """ 

102 return list(requirement.specifier) if requirement.specifier else [] 

103 

104 

105def has_frozen_upper_bound(requirement: Requirement) -> bool: 

106 """ 

107 Determine whether a requirement already contains an upper bound constraint. 

108 

109 :param requirement: Parsed requirement instance. 

110 :type requirement: Requirement 

111 :return: ``True`` if the requirement is already frozen. 

112 :rtype: bool 

113 """ 

114 return any(getattr(spec, 'operator', '') in _FROZEN_OPERATORS for spec in iter_specifiers(requirement)) 

115 

116 

117def highest_lower_bound(requirement: Requirement) -> str | None: 

118 """ 

119 Extract the highest lower-bound version from ``>=`` and ``>`` specifiers. 

120 

121 :param requirement: Parsed requirement instance. 

122 :type requirement: Requirement 

123 :return: Highest lower bound version string, or ``None`` if missing. 

124 :rtype: str | None 

125 :raises DevtoolsError: If version parsing fails. 

126 """ 

127 best: tuple[Version, str] | None = None 

128 for spec in iter_specifiers(requirement): 

129 if spec.operator not in {'>=', '>'}: 

130 continue 

131 try: 

132 parsed = Version(spec.version) 

133 except Exception as exc: # pragma: no cover 

134 raise DevtoolsError( 

135 f'Unable to parse lower bound "{spec.version}" in requirement "{requirement}".' 

136 ) from exc 

137 if best is None or parsed > best[0]: 137 ↛ 128line 137 didn't jump to line 128 because the condition on line 137 was always true

138 best = (parsed, spec.version) 

139 return best[1] if best else None 

140 

141 

142def freeze_requirement(requirement_text: str, *, resolved_version: str | None = None) -> tuple[str, list[str]]: 

143 """ 

144 Add a computed upper bound to a requirement string, if eligible. 

145 

146 The function is intentionally conservative: 

147 

148 - URL-based requirements are skipped (cannot be version constrained). 

149 - Requirements already containing ``<``, ``<=``, ``~=``, ``==`` or ``===`` are skipped. 

150 - Requirements without a lower bound emit a warning and are left unchanged. 

151 - When a resolved version is provided, the upper bound is computed from that 

152 version instead of the declared lower bound. 

153 

154 :param requirement_text: Raw PEP 508 requirement string. 

155 :type requirement_text: str 

156 :return: Updated requirement text plus warnings. 

157 :rtype: tuple[str, list[str]] 

158 :raises DevtoolsError: If parsing fails. 

159 """ 

160 warnings: list[str] = [] 

161 stripped = requirement_text.strip() 

162 if not stripped: 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true

163 return requirement_text, warnings 

164 

165 try: 

166 requirement = Requirement(stripped) 

167 except Exception as exc: 

168 raise DevtoolsError(f'Unable to parse dependency requirement "{requirement_text}".') from exc 

169 

170 if requirement.url: 

171 warnings.append(f'Skipping URL requirement (cannot freeze): {requirement_text}') 

172 return requirement_text, warnings 

173 

174 if has_frozen_upper_bound(requirement): 

175 return requirement_text, warnings 

176 

177 lower = highest_lower_bound(requirement) 

178 if lower is None: 

179 warnings.append(f'Dependency has no lower bound and will not be frozen: {requirement_text}') 

180 return requirement_text, warnings 

181 

182 upper = compute_upper_bound(resolved_version or lower) 

183 combined = SpecifierSet(f'{requirement.specifier},<{upper}' if str(requirement.specifier).strip() else f'<{upper}') 

184 requirement.specifier = combined 

185 return format_requirement(requirement), warnings 

186 

187 

188def unfreeze_requirement(requirement_text: str) -> tuple[str, list[str]]: 

189 """ 

190 Remove a computed upper bound from a requirement string, if it matches the computed rule. 

191 

192 Only the auto-generated upper bound ``<upper`` is removed; existing manual upper bounds are preserved. 

193 

194 :param requirement_text: Raw PEP 508 requirement string. 

195 :type requirement_text: str 

196 :return: Updated requirement text plus warnings. 

197 :rtype: tuple[str, list[str]] 

198 :raises DevtoolsError: If parsing fails. 

199 """ 

200 warnings: list[str] = [] 

201 stripped = requirement_text.strip() 

202 if not stripped: 202 ↛ 203line 202 didn't jump to line 203 because the condition on line 202 was never true

203 return requirement_text, warnings 

204 

205 try: 

206 requirement = Requirement(stripped) 

207 except Exception as exc: 

208 raise DevtoolsError(f'Unable to parse dependency requirement "{requirement_text}".') from exc 

209 

210 if requirement.url: 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true

211 return requirement_text, warnings 

212 

213 # Do not try to unfreeze already pinned/compatible requirements, or requirements that already 

214 # had an upper bound before freezing (we cannot safely distinguish manual bounds). 

215 specifiers = iter_specifiers(requirement) 

216 if any(spec.operator in {'~=', '==', '==='} for spec in specifiers): 

217 return requirement_text, warnings 

218 lower = highest_lower_bound(requirement) 

219 if lower is None: 219 ↛ 220line 219 didn't jump to line 220 because the condition on line 219 was never true

220 return requirement_text, warnings 

221 

222 if any(spec.operator == '<=' for spec in specifiers): 222 ↛ 223line 222 didn't jump to line 223 because the condition on line 222 was never true

223 return requirement_text, warnings 

224 

225 remaining: list[Specifier] = [] 

226 removed = False 

227 for spec in specifiers: 

228 if spec.operator == '<': 

229 removed = True 

230 continue 

231 remaining.append(spec) 

232 

233 if not removed: 233 ↛ 234line 233 didn't jump to line 234 because the condition on line 233 was never true

234 return requirement_text, warnings 

235 

236 requirement.specifier = SpecifierSet(','.join(str(spec) for spec in remaining)) 

237 return format_requirement(requirement), warnings 

238 

239 

240def update_dependency_list( 

241 dependencies: Any, 

242 *, 

243 transformer: Callable[[str], tuple[str, list[str]]], 

244 warnings: list[str], 

245 context: str, 

246) -> None: 

247 """ 

248 Update a TOML list of dependency strings in-place. 

249 

250 :param dependencies: TOML array that contains dependency strings. 

251 :type dependencies: Any 

252 :param transformer: Callable applied to each dependency string. 

253 :type transformer: Any 

254 :param warnings: List of warnings to append to. 

255 :type warnings: list[str] 

256 :param context: Human-readable location for warnings. 

257 :type context: str 

258 """ 

259 for idx, item in enumerate(list(dependencies)): 

260 if not isinstance(item, str): 260 ↛ 261line 260 didn't jump to line 261 because the condition on line 260 was never true

261 warnings.append(f'Skipping non-string dependency entry in {context}: {item!r}') 

262 continue 

263 updated, item_warnings = transformer(item) 

264 warnings.extend(item_warnings) 

265 dependencies[idx] = updated 

266 

267 

268def freeze_pyproject_toml( 

269 toml_text: str, 

270 *, 

271 resolved_versions: dict[str, Version] | None = None, 

272 doc: tomlkit.TOMLDocument | None = None, 

273) -> tuple[str, list[str]]: 

274 """ 

275 Freeze dependencies in a ``pyproject.toml`` payload by adding upper bounds. 

276 

277 The TOML structure is preserved via tomlkit; only dependency strings may be normalized. 

278 

279 When ``resolved_versions`` is provided, the function uses those compiled 

280 versions as the basis for upper-bound computation. Otherwise the declared 

281 lower bounds are used as a fallback. 

282 

283 :param toml_text: Raw TOML file content. 

284 :type toml_text: str 

285 :param resolved_versions: Optional mapping of dependency names to resolved versions. 

286 :type resolved_versions: dict[str, Version] | None 

287 :param doc: Optional pre-parsed TOML document to reuse when available. 

288 :type doc: tomlkit.TOMLDocument | None 

289 :return: Updated TOML plus warnings. 

290 :rtype: tuple[str, list[str]] 

291 :raises DevtoolsError: If TOML parsing fails. 

292 """ 

293 warnings: list[str] = [] 

294 if doc is None: 

295 doc = load_pyproject_doc(toml_text) 

296 

297 project = doc.get('project') 

298 if project is None: 298 ↛ 299line 298 didn't jump to line 299 because the condition on line 298 was never true

299 raise DevtoolsError(f'Missing [project] table in {PYPROJECT_FILE}.') 

300 

301 def freeze_for_requirement(requirement_text: str) -> tuple[str, list[str]]: 

302 try: 

303 requirement = Requirement(requirement_text.strip()) 

304 except Exception as exc: 

305 raise DevtoolsError(f'Unable to parse dependency requirement "{requirement_text}".') from exc 

306 

307 resolved_version = None 

308 if resolved_versions is not None: 308 ↛ 312line 308 didn't jump to line 312 because the condition on line 308 was always true

309 resolved = resolved_versions.get(requirement.name.lower()) 

310 if resolved is not None: 

311 resolved_version = str(resolved) 

312 return freeze_requirement(requirement_text, resolved_version=resolved_version) 

313 

314 dependencies = project.get('dependencies') 

315 if dependencies is not None: 315 ↛ 323line 315 didn't jump to line 323 because the condition on line 315 was always true

316 update_dependency_list( 

317 dependencies, 

318 transformer=freeze_for_requirement, 

319 warnings=warnings, 

320 context='project.dependencies', 

321 ) 

322 

323 optional = project.get('optional-dependencies') 

324 if optional is not None: 

325 for group, group_deps in optional.items(): 

326 update_dependency_list( 

327 group_deps, 

328 transformer=freeze_for_requirement, 

329 warnings=warnings, 

330 context=f'project.optional-dependencies.{group}', 

331 ) 

332 

333 return tomlkit.dumps(doc), warnings 

334 

335 

336def unfreeze_pyproject_toml( 

337 toml_text: str, 

338 *, 

339 baseline_toml_text: str | None = None, 

340 doc: tomlkit.TOMLDocument | None = None, 

341 baseline_doc: tomlkit.TOMLDocument | None = None, 

342) -> tuple[str, list[str]]: 

343 """ 

344 Unfreeze dependencies in a ``pyproject.toml`` payload by removing computed upper bounds. 

345 

346 Only upper bounds matching the computed rule are removed; existing manual constraints remain. 

347 

348 :param toml_text: Raw TOML file content. 

349 :type toml_text: str 

350 :param baseline_toml_text: Optional original TOML text captured before freezing. 

351 When provided, unfreezing is computed against the baseline to avoid 

352 altering dependencies that were already frozen before release. 

353 :type baseline_toml_text: str | None 

354 :param doc: Optional pre-parsed TOML document to reuse when available. 

355 :type doc: tomlkit.TOMLDocument | None 

356 :param baseline_doc: Optional pre-parsed baseline TOML document to reuse when available. 

357 :type baseline_doc: tomlkit.TOMLDocument | None 

358 :return: Updated TOML plus warnings. 

359 :rtype: tuple[str, list[str]] 

360 :raises DevtoolsError: If TOML parsing fails. 

361 """ 

362 warnings: list[str] = [] 

363 if doc is None: 

364 doc = load_pyproject_doc(toml_text) 

365 if baseline_toml_text is not None: 

366 if baseline_doc is None: 

367 baseline_doc = load_pyproject_doc(baseline_toml_text) 

368 

369 current_project = doc.get('project') 

370 baseline_project = baseline_doc.get('project') 

371 if current_project is None or baseline_project is None: 371 ↛ 372line 371 didn't jump to line 372 because the condition on line 371 was never true

372 raise DevtoolsError(f'Missing [project] table in {PYPROJECT_FILE}.') 

373 

374 current_project['dependencies'] = baseline_project.get('dependencies', current_project.get('dependencies')) 

375 current_optional = current_project.get('optional-dependencies') 

376 baseline_optional = baseline_project.get('optional-dependencies') 

377 if current_optional is not None and baseline_optional is not None: 

378 for group in current_optional: 

379 if group in baseline_optional: 379 ↛ 378line 379 didn't jump to line 378 because the condition on line 379 was always true

380 current_optional[group] = baseline_optional[group] 

381 return tomlkit.dumps(doc), warnings 

382 

383 project = doc.get('project') 

384 if project is None: 384 ↛ 385line 384 didn't jump to line 385 because the condition on line 384 was never true

385 raise DevtoolsError(f'Missing [project] table in {PYPROJECT_FILE}.') 

386 

387 dependencies = project.get('dependencies') 

388 if dependencies is not None: 388 ↛ 396line 388 didn't jump to line 396 because the condition on line 388 was always true

389 update_dependency_list( 

390 dependencies, 

391 transformer=unfreeze_requirement, 

392 warnings=warnings, 

393 context='project.dependencies', 

394 ) 

395 

396 optional = project.get('optional-dependencies') 

397 if optional is not None: 397 ↛ 398line 397 didn't jump to line 398 because the condition on line 397 was never true

398 for group, group_deps in optional.items(): 

399 update_dependency_list( 

400 group_deps, 

401 transformer=unfreeze_requirement, 

402 warnings=warnings, 

403 context=f'project.optional-dependencies.{group}', 

404 ) 

405 

406 return tomlkit.dumps(doc), warnings 

407 

408 

409def summarize_freeze_changes(before: str, after: str) -> str: 

410 """ 

411 Build a short summary for dry-run output. 

412 

413 :param before: Original TOML content. 

414 :type before: str 

415 :param after: Updated TOML content. 

416 :type after: str 

417 :return: Human-readable summary line. 

418 :rtype: str 

419 """ 

420 if before == after: 420 ↛ 422line 420 didn't jump to line 422 because the condition on line 420 was always true

421 return 'No dependency constraints would be updated.' 

422 before_lines = before.splitlines() 

423 after_lines = after.splitlines() 

424 return f'pyproject.toml would be updated ({len(before_lines)} -> {len(after_lines)} lines).' 

425 

426 

427def freeze_dependencies(*, dry_run: bool) -> str: # pragma: no cover 

428 """ 

429 Freeze dependencies by adding computed upper bounds in ``pyproject.toml``. 

430 

431 :param dry_run: Whether command execution is disabled. 

432 :type dry_run: bool 

433 :return: Original ``pyproject.toml`` content captured before freeze. 

434 :rtype: str 

435 """ 

436 CONSOLE.print('Freezing dependencies (adding upper bounds) ...') 

437 before = PYPROJECT_FILE.read_text(encoding='utf-8') 

438 doc = load_pyproject_doc(before) 

439 supported_python_versions = project_python_versions_from_doc(doc) 

440 resolved_versions = collect_compiled_dependency_versions(supported_python_versions) 

441 after, warnings = freeze_pyproject_toml(before, resolved_versions=resolved_versions, doc=doc) 

442 for warning in warnings: 

443 CONSOLE.print(f'WARNING: {warning}') 

444 

445 if dry_run: 

446 CONSOLE.print(summarize_freeze_changes(before, after)) 

447 return before 

448 if before != after: 

449 PYPROJECT_FILE.write_text(after, encoding='utf-8') 

450 return before 

451 

452 

453def unfreeze_dependencies(original_pyproject_toml: str, *, dry_run: bool) -> None: # pragma: no cover 

454 """ 

455 Unfreeze dependencies by removing computed upper bounds in ``pyproject.toml``. 

456 

457 :param dry_run: Whether command execution is disabled. 

458 :type dry_run: bool 

459 """ 

460 CONSOLE.print('Unfreezing dependencies (removing computed upper bounds) ...') 

461 before = PYPROJECT_FILE.read_text(encoding='utf-8') 

462 doc = load_pyproject_doc(before) 

463 baseline_doc = load_pyproject_doc(original_pyproject_toml) 

464 after, warnings = unfreeze_pyproject_toml( 

465 before, 

466 baseline_toml_text=original_pyproject_toml, 

467 doc=doc, 

468 baseline_doc=baseline_doc, 

469 ) 

470 for warning in warnings: 

471 CONSOLE.print(f'WARNING: {warning}') 

472 

473 if dry_run: 

474 CONSOLE.print(summarize_freeze_changes(before, after)) 

475 return 

476 if before != after: 

477 PYPROJECT_FILE.write_text(after, encoding='utf-8') 

478 

479 

480def update_requirements_and_readme(*, dry_run: bool) -> None: # pragma: no cover 

481 """ 

482 Update the requirements RST files and README.rst. 

483 

484 :param dry_run: Whether command execution is disabled. 

485 :type dry_run: bool 

486 """ 

487 CONSOLE.print('Updating requirements and README.rst...') 

488 cmd(['hatch', 'run', 'dev:multidoc', 'requirements', '--update-readme', *REQUIREMENTS_GROUPS], dry_run=dry_run)