Coverage for src / mafw / devtools / dependencies / freeze.py: 83%
169 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-28 13:34 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-06-28 13:34 +0000
1# Copyright 2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Dependency freezing and unfreezing utilities for MAFw.
7This module provides functions for adding and removing computed upper-bound
8constraints in ``pyproject.toml`` dependency declarations. It is used by
9the release workflow to pin dependencies during release and unpin them
10afterwards.
11"""
13from __future__ import annotations
15from typing import Any, Callable, Final
17import tomlkit
19from mafw.devtools import ensure_devtools_available
21ensure_devtools_available()
23from packaging.requirements import Requirement # noqa: E402
24from packaging.specifiers import Specifier, SpecifierSet # noqa: E402
25from packaging.version import Version # noqa: E402
27from mafw.devtools import DevtoolsError # noqa: E402
28from mafw.devtools.dependencies.compile import ( # noqa: E402
29 PYPROJECT_FILE,
30 collect_compiled_dependency_versions,
31 load_pyproject_doc,
32 project_python_versions_from_doc,
33)
34from mafw.devtools.documentation.requirements import REQUIREMENTS_GROUPS # noqa: E402
35from mafw.tools.shell_tools import CONSOLE # noqa: E402
36from mafw.tools.shell_tools import run as cmd # noqa: E402
_FROZEN_OPERATORS: Final[set[str]] = {'==', '===', '~=', '<=', '<'}
"""Specifier operators that already pin or cap the accepted version; requirements using one are never auto-frozen."""
def compute_upper_bound(lower_bound: str) -> str:
    """
    Derive the exclusive upper bound used to freeze a dependency.

    The rule follows the spirit of PEP 440 compatible releases while staying explicit:

    - a release whose major component ``X`` is greater than zero is capped at ``X + 1``;
    - a ``0.*`` release is capped at ``0.(minor + 1)``.

    :param lower_bound: Version string the bound is derived from.
    :type lower_bound: str
    :return: Upper-bound version string, without any operator.
    :rtype: str
    :raises DevtoolsError: If the version cannot be parsed.
    """
    try:
        parsed = Version(lower_bound)
    except Exception as exc:  # pragma: no cover
        raise DevtoolsError(f'Unable to parse version "{lower_bound}" while computing dependency upper bound.') from exc

    major = parsed.major
    if major <= 0:
        # Pre-1.0 series: the minor component is the one that rolls.
        return f'0.{parsed.minor + 1}'
    return str(major + 1)
def format_requirement(requirement: Requirement) -> str:
    """
    Render a parsed requirement as a PEP 508 style string.

    The output is assembled as ``name[extras]``, followed by either ``@ url`` or the
    specifier text, followed by ``; marker`` when a marker is present. Extras are
    emitted in sorted order.

    :param requirement: Parsed requirement instance.
    :type requirement: Requirement
    :return: PEP 508 requirement string.
    :rtype: str
    """
    label = str(requirement.name)
    # ``extras`` may be absent or falsy; both cases are treated as "no extras".
    declared_extras = getattr(requirement, 'extras', set()) or set()
    extra_names = sorted(str(extra) for extra in declared_extras)
    if extra_names:
        label = f'{label}[{",".join(extra_names)}]'

    if requirement.url:
        text = f'{label} @ {requirement.url}'
    else:
        # An empty specifier contributes nothing, so plain concatenation is enough.
        text = label + str(requirement.specifier).strip()

    marker = requirement.marker
    return f'{text} ; {marker}' if marker else text
def iter_specifiers(requirement: Requirement) -> list[Specifier]:
    """
    Materialize the specifiers of a requirement as a plain list.

    :param requirement: Parsed requirement instance.
    :type requirement: Requirement
    :return: List of specifier objects; empty when the requirement has a falsy specifier set.
    :rtype: list[Specifier]
    """
    specifier_set = requirement.specifier
    if not specifier_set:
        return []
    return [*specifier_set]
def has_frozen_upper_bound(requirement: Requirement) -> bool:
    """
    Tell whether a requirement already uses one of the operators in ``_FROZEN_OPERATORS``.

    :param requirement: Parsed requirement instance.
    :type requirement: Requirement
    :return: ``True`` if the requirement is already frozen.
    :rtype: bool
    """
    for spec in iter_specifiers(requirement):
        # Specifiers lacking an ``operator`` attribute are treated as non-freezing.
        if getattr(spec, 'operator', '') in _FROZEN_OPERATORS:
            return True
    return False
def highest_lower_bound(requirement: Requirement) -> str | None:
    """
    Return the largest version found among the ``>=`` and ``>`` specifiers.

    The version is returned exactly as written in the specifier. When several
    specifiers parse to the same highest version, the first one encountered wins.

    :param requirement: Parsed requirement instance.
    :type requirement: Requirement
    :return: Highest lower bound version string, or ``None`` if missing.
    :rtype: str | None
    :raises DevtoolsError: If version parsing fails.
    """
    candidates: list[tuple[Version, str]] = []
    for spec in iter_specifiers(requirement):
        if spec.operator in {'>=', '>'}:
            raw = spec.version
            try:
                candidates.append((Version(raw), raw))
            except Exception as exc:  # pragma: no cover
                raise DevtoolsError(f'Unable to parse lower bound "{raw}" in requirement "{requirement}".') from exc

    if not candidates:
        return None
    # ``max`` keeps the first maximal element, matching a strict ``>`` comparison loop.
    return max(candidates, key=lambda pair: pair[0])[1]
def freeze_requirement(requirement_text: str, *, resolved_version: str | None = None) -> tuple[str, list[str]]:
    """
    Append a computed ``<upper`` bound to a requirement string when it is eligible.

    The requirement is returned unchanged in the following cases:

    - the text is blank;
    - it is a URL requirement (a warning is emitted);
    - it already uses one of ``<``, ``<=``, ``~=``, ``==`` or ``===``;
    - it declares no ``>=`` / ``>`` lower bound (a warning is emitted).

    Otherwise the upper bound is computed from ``resolved_version`` when given,
    falling back to the highest declared lower bound.

    :param requirement_text: Raw PEP 508 requirement string.
    :type requirement_text: str
    :param resolved_version: Optional version to derive the upper bound from instead of the declared lower bound.
    :type resolved_version: str | None
    :return: Updated requirement text plus warnings.
    :rtype: tuple[str, list[str]]
    :raises DevtoolsError: If parsing fails.
    """
    warnings: list[str] = []
    candidate = requirement_text.strip()
    if not candidate:
        return requirement_text, warnings

    try:
        requirement = Requirement(candidate)
    except Exception as exc:
        raise DevtoolsError(f'Unable to parse dependency requirement "{requirement_text}".') from exc

    if requirement.url:
        warnings.append(f'Skipping URL requirement (cannot freeze): {requirement_text}')
        return requirement_text, warnings

    if has_frozen_upper_bound(requirement):
        return requirement_text, warnings

    declared_lower = highest_lower_bound(requirement)
    if declared_lower is None:
        warnings.append(f'Dependency has no lower bound and will not be frozen: {requirement_text}')
        return requirement_text, warnings

    upper = compute_upper_bound(resolved_version or declared_lower)
    if str(requirement.specifier).strip():
        frozen_spec = f'{requirement.specifier},<{upper}'
    else:
        frozen_spec = f'<{upper}'
    requirement.specifier = SpecifierSet(frozen_spec)
    return format_requirement(requirement), warnings
def unfreeze_requirement(requirement_text: str) -> tuple[str, list[str]]:
    """
    Drop the strict ``<`` upper bound(s) from a requirement string.

    The requirement is returned unchanged when the text is blank, when it is a URL
    requirement, when it uses ``~=``, ``==`` or ``===``, when it has no ``>=`` / ``>``
    lower bound, when it uses ``<=``, or when it has no ``<`` specifier at all.

    In every other case all ``<`` specifiers are removed. The function does not
    check that a removed bound equals the value the freeze rule would compute.

    :param requirement_text: Raw PEP 508 requirement string.
    :type requirement_text: str
    :return: Updated requirement text plus warnings.
    :rtype: tuple[str, list[str]]
    :raises DevtoolsError: If parsing fails.
    """
    warnings: list[str] = []
    candidate = requirement_text.strip()
    if not candidate:
        return requirement_text, warnings

    try:
        requirement = Requirement(candidate)
    except Exception as exc:
        raise DevtoolsError(f'Unable to parse dependency requirement "{requirement_text}".') from exc

    if requirement.url:
        return requirement_text, warnings

    specifiers = iter_specifiers(requirement)
    operators = {spec.operator for spec in specifiers}

    # Pinned / compatible-release requirements are never produced by freezing, so leave them alone.
    if operators & {'~=', '==', '==='}:
        return requirement_text, warnings
    if highest_lower_bound(requirement) is None:
        return requirement_text, warnings
    # An inclusive upper bound cannot come from the freeze rule, which only emits ``<``.
    if '<=' in operators:
        return requirement_text, warnings

    kept = [spec for spec in specifiers if spec.operator != '<']
    if len(kept) == len(specifiers):
        # Nothing to strip.
        return requirement_text, warnings

    requirement.specifier = SpecifierSet(','.join(str(spec) for spec in kept))
    return format_requirement(requirement), warnings
def update_dependency_list(
    dependencies: Any,
    *,
    transformer: Callable[[str], tuple[str, list[str]]],
    warnings: list[str],
    context: str,
) -> None:
    """
    Apply a transformer to every string entry of a dependency list, in place.

    Non-string entries are left untouched and reported through ``warnings``.

    :param dependencies: TOML array that contains dependency strings.
    :type dependencies: Any
    :param transformer: Callable applied to each dependency string; returns the new text and its warnings.
    :type transformer: Callable[[str], tuple[str, list[str]]]
    :param warnings: List of warnings to append to.
    :type warnings: list[str]
    :param context: Human-readable location for warnings.
    :type context: str
    """
    # Walk a snapshot so that writing back into ``dependencies`` cannot disturb the iteration.
    for position, entry in enumerate(list(dependencies)):
        if isinstance(entry, str):
            replacement, entry_warnings = transformer(entry)
            warnings.extend(entry_warnings)
            dependencies[position] = replacement
        else:
            warnings.append(f'Skipping non-string dependency entry in {context}: {entry!r}')
def freeze_pyproject_toml(
    toml_text: str,
    *,
    resolved_versions: dict[str, Version] | None = None,
    doc: tomlkit.TOMLDocument | None = None,
) -> tuple[str, list[str]]:
    """
    Add upper bounds to the dependencies declared in a ``pyproject.toml`` payload.

    Both ``project.dependencies`` and every group of ``project.optional-dependencies``
    are processed. The document is edited through tomlkit and serialized back.

    When ``resolved_versions`` contains an entry for a dependency, that version drives
    the upper-bound computation; otherwise the declared lower bound is used.

    :param toml_text: Raw TOML file content.
    :type toml_text: str
    :param resolved_versions: Optional mapping of dependency names to resolved versions.
    :type resolved_versions: dict[str, Version] | None
    :param doc: Optional pre-parsed TOML document to reuse when available.
    :type doc: tomlkit.TOMLDocument | None
    :return: Updated TOML plus warnings.
    :rtype: tuple[str, list[str]]
    :raises DevtoolsError: If the ``[project]`` table is missing or a requirement cannot be parsed.
    """
    warnings: list[str] = []
    if doc is None:
        doc = load_pyproject_doc(toml_text)

    project = doc.get('project')
    if project is None:
        raise DevtoolsError(f'Missing [project] table in {PYPROJECT_FILE}.')

    def freeze_for_requirement(requirement_text: str) -> tuple[str, list[str]]:
        # Parse here only to learn the package name used for the resolved-version lookup.
        try:
            parsed = Requirement(requirement_text.strip())
        except Exception as exc:
            raise DevtoolsError(f'Unable to parse dependency requirement "{requirement_text}".') from exc

        # NOTE(review): the lookup uses the lower-cased name; presumably the mapping keys
        # are lower-cased too -- verify against collect_compiled_dependency_versions.
        match = None if resolved_versions is None else resolved_versions.get(parsed.name.lower())
        return freeze_requirement(
            requirement_text,
            resolved_version=None if match is None else str(match),
        )

    # Gather every (context, list) pair first, then process them in declaration order.
    targets: list[tuple[str, Any]] = []
    dependencies = project.get('dependencies')
    if dependencies is not None:
        targets.append(('project.dependencies', dependencies))
    optional = project.get('optional-dependencies')
    if optional is not None:
        targets.extend((f'project.optional-dependencies.{group}', group_deps) for group, group_deps in optional.items())

    for context, entries in targets:
        update_dependency_list(
            entries,
            transformer=freeze_for_requirement,
            warnings=warnings,
            context=context,
        )

    return tomlkit.dumps(doc), warnings
def unfreeze_pyproject_toml(
    toml_text: str,
    *,
    baseline_toml_text: str | None = None,
    doc: tomlkit.TOMLDocument | None = None,
    baseline_doc: tomlkit.TOMLDocument | None = None,
) -> tuple[str, list[str]]:
    """
    Unfreeze dependencies in a ``pyproject.toml`` payload.

    Two strategies are available:

    - **Baseline restore** -- used when a baseline is supplied, either as
      ``baseline_doc`` or as ``baseline_toml_text``. ``project.dependencies`` is
      replaced by the baseline list when the baseline declares one, and each
      optional-dependency group present in both documents is replaced by its
      baseline counterpart. Groups that exist only in the current document are
      left as they are.
    - **Heuristic** -- used when no baseline is supplied. Every dependency string is
      passed through :func:`unfreeze_requirement`.

    :param toml_text: Raw TOML file content.
    :type toml_text: str
    :param baseline_toml_text: Optional original TOML text captured before freezing.
        Parsed only when ``baseline_doc`` is not given.
    :type baseline_toml_text: str | None
    :param doc: Optional pre-parsed TOML document to reuse when available.
    :type doc: tomlkit.TOMLDocument | None
    :param baseline_doc: Optional pre-parsed baseline TOML document. It is honoured
        even when ``baseline_toml_text`` is omitted.
    :type baseline_doc: tomlkit.TOMLDocument | None
    :return: Updated TOML plus warnings.
    :rtype: tuple[str, list[str]]
    :raises DevtoolsError: If a ``[project]`` table is missing or a requirement cannot be parsed.
    """
    warnings: list[str] = []
    if doc is None:
        doc = load_pyproject_doc(toml_text)
    # A pre-parsed baseline must not be ignored just because its text was not passed along.
    if baseline_doc is None and baseline_toml_text is not None:
        baseline_doc = load_pyproject_doc(baseline_toml_text)

    project = doc.get('project')

    if baseline_doc is not None:
        baseline_project = baseline_doc.get('project')
        if project is None or baseline_project is None:
            raise DevtoolsError(f'Missing [project] table in {PYPROJECT_FILE}.')

        # Only overwrite when the baseline really declares dependencies; otherwise a
        # ``None`` would be written into the TOML table.
        baseline_dependencies = baseline_project.get('dependencies')
        if baseline_dependencies is not None:
            project['dependencies'] = baseline_dependencies

        current_optional = project.get('optional-dependencies')
        baseline_optional = baseline_project.get('optional-dependencies')
        if current_optional is not None and baseline_optional is not None:
            # Iterate over a snapshot of the keys because the table is written to in the loop.
            for group in list(current_optional):
                if group in baseline_optional:
                    current_optional[group] = baseline_optional[group]
        return tomlkit.dumps(doc), warnings

    if project is None:
        raise DevtoolsError(f'Missing [project] table in {PYPROJECT_FILE}.')

    dependencies = project.get('dependencies')
    if dependencies is not None:
        update_dependency_list(
            dependencies,
            transformer=unfreeze_requirement,
            warnings=warnings,
            context='project.dependencies',
        )

    optional = project.get('optional-dependencies')
    if optional is not None:
        for group, group_deps in optional.items():
            update_dependency_list(
                group_deps,
                transformer=unfreeze_requirement,
                warnings=warnings,
                context=f'project.optional-dependencies.{group}',
            )

    return tomlkit.dumps(doc), warnings
def summarize_freeze_changes(before: str, after: str) -> str:
    """
    Produce the one-line summary printed during a dry run.

    :param before: Original TOML content.
    :type before: str
    :param after: Updated TOML content.
    :type after: str
    :return: Human-readable summary line; reports the line counts when the contents differ.
    :rtype: str
    """
    if before != after:
        count_before = len(before.splitlines())
        count_after = len(after.splitlines())
        return f'pyproject.toml would be updated ({count_before} -> {count_after} lines).'
    return 'No dependency constraints would be updated.'
def freeze_dependencies(*, dry_run: bool) -> str:  # pragma: no cover
    """
    Freeze the dependencies declared in ``pyproject.toml`` by adding computed upper bounds.

    The file is read, the compiled dependency versions are collected for the Python
    versions declared in the document, and the frozen content is computed. Warnings are
    printed to the console. In dry-run mode only a summary is printed; otherwise the
    file is rewritten when its content changed.

    :param dry_run: Whether command execution is disabled.
    :type dry_run: bool
    :return: Original ``pyproject.toml`` content captured before freeze.
    :rtype: str
    """
    CONSOLE.print('Freezing dependencies (adding upper bounds) ...')
    original = PYPROJECT_FILE.read_text(encoding='utf-8')
    doc = load_pyproject_doc(original)
    resolved_versions = collect_compiled_dependency_versions(project_python_versions_from_doc(doc))
    frozen, warnings = freeze_pyproject_toml(original, resolved_versions=resolved_versions, doc=doc)
    for message in warnings:
        CONSOLE.print(f'WARNING: {message}')

    if dry_run:
        CONSOLE.print(summarize_freeze_changes(original, frozen))
    elif original != frozen:
        PYPROJECT_FILE.write_text(frozen, encoding='utf-8')
    return original
def unfreeze_dependencies(original_pyproject_toml: str, *, dry_run: bool) -> None:  # pragma: no cover
    """
    Unfreeze the dependencies declared in ``pyproject.toml``.

    The current file and the supplied baseline are both parsed and handed to
    :func:`unfreeze_pyproject_toml`. Warnings are printed to the console. In dry-run
    mode only a summary is printed; otherwise the file is rewritten when its content
    changed.

    :param original_pyproject_toml: ``pyproject.toml`` content used as the baseline.
    :type original_pyproject_toml: str
    :param dry_run: Whether command execution is disabled.
    :type dry_run: bool
    """
    CONSOLE.print('Unfreezing dependencies (removing computed upper bounds) ...')
    current = PYPROJECT_FILE.read_text(encoding='utf-8')
    restored, warnings = unfreeze_pyproject_toml(
        current,
        baseline_toml_text=original_pyproject_toml,
        doc=load_pyproject_doc(current),
        baseline_doc=load_pyproject_doc(original_pyproject_toml),
    )
    for message in warnings:
        CONSOLE.print(f'WARNING: {message}')

    if dry_run:
        CONSOLE.print(summarize_freeze_changes(current, restored))
    elif current != restored:
        PYPROJECT_FILE.write_text(restored, encoding='utf-8')
def update_requirements_and_readme(*, dry_run: bool) -> None:  # pragma: no cover
    """
    Regenerate the requirements RST files and ``README.rst``.

    Runs the ``dev:multidoc requirements --update-readme`` hatch command for every
    group listed in ``REQUIREMENTS_GROUPS``.

    :param dry_run: Whether command execution is disabled.
    :type dry_run: bool
    """
    CONSOLE.print('Updating requirements and README.rst...')
    command = ['hatch', 'run', 'dev:multidoc', 'requirements', '--update-readme']
    command.extend(REQUIREMENTS_GROUPS)
    cmd(command, dry_run=dry_run)