# Copyright 2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""
Dependency freezing and unfreezing utilities for MAFw.
This module provides functions for adding and removing computed upper-bound
constraints in ``pyproject.toml`` dependency declarations. It is used by
the release workflow to pin dependencies during release and unpin them
afterwards.
"""
from __future__ import annotations
from typing import Any, Callable, Final
import tomlkit
from mafw.devtools import ensure_devtools_available
ensure_devtools_available()
from packaging.requirements import Requirement # noqa: E402
from packaging.specifiers import Specifier, SpecifierSet # noqa: E402
from packaging.version import Version # noqa: E402
from mafw.devtools import DevtoolsError # noqa: E402
from mafw.devtools.dependencies.compile import ( # noqa: E402
PYPROJECT_FILE,
collect_compiled_dependency_versions,
load_pyproject_doc,
project_python_versions_from_doc,
)
from mafw.devtools.documentation.requirements import REQUIREMENTS_GROUPS # noqa: E402
from mafw.tools.shell_tools import CONSOLE # noqa: E402
from mafw.tools.shell_tools import run as cmd # noqa: E402
# A requirement carrying any of these operators is reported as already frozen by
# has_frozen_upper_bound(), so freeze_requirement() leaves it untouched.
_FROZEN_OPERATORS: Final[set[str]] = {'<', '<=', '~=', '==', '==='}
"""Operators that already constrain the maximum compatible version and should not be auto-frozen."""
def compute_upper_bound(lower_bound: str) -> str:
    """
    Compute an upper bound for a dependency based on PEP 440 compatible-release philosophy.

    The rule implemented here is purposely conservative and mirrors the intent of compatible
    release clauses while remaining explicit:

    - For major-versioned releases (``X.*`` with ``X > 0``), freeze to ``<(X + 1)``.
    - For ``0.*`` releases, freeze to ``<0.(minor + 1)`` (rolling compatibility during pre-1.0).
    - A non-zero epoch of the input version is carried over to the bound (``1!2.3`` gives ``1!3``).

    :param lower_bound: Version string used as the starting point for the freeze rule.
    :type lower_bound: str
    :return: Upper-bound version string without operator.
    :rtype: str
    :raises DevtoolsError: If the version cannot be parsed.
    """
    try:
        version = Version(lower_bound)
    except Exception as exc:  # pragma: no cover
        raise DevtoolsError(f'Unable to parse version "{lower_bound}" while computing dependency upper bound.') from exc

    # Every version of a higher epoch sorts above all versions of a lower epoch, so a bound
    # without the epoch (e.g. ">=1!2.0,<3") could never be satisfied. Keep the epoch prefix.
    epoch_prefix = f'{version.epoch}!' if version.epoch else ''

    if version.major > 0:
        return f'{epoch_prefix}{version.major + 1}'
    # Pre-1.0 releases: the minor number is treated as the compatibility boundary.
    return f'{epoch_prefix}0.{version.minor + 1}'
[docs]
def iter_specifiers(requirement: Requirement) -> list[Specifier]:
    """
    Materialize the specifiers of a requirement as a plain list.

    :param requirement: Parsed requirement instance.
    :type requirement: Requirement
    :return: The requirement's specifiers, or an empty list when it declares none.
    :rtype: list[Specifier]
    """
    specifier_set = requirement.specifier
    if not specifier_set:
        return []
    return list(specifier_set)
[docs]
def has_frozen_upper_bound(requirement: Requirement) -> bool:
    """
    Tell whether a requirement is already constrained by a "frozen" operator.

    A requirement counts as frozen as soon as one of its specifiers uses an operator
    listed in ``_FROZEN_OPERATORS``.

    :param requirement: Parsed requirement instance.
    :type requirement: Requirement
    :return: ``True`` if at least one specifier uses a frozen operator, ``False`` otherwise.
    :rtype: bool
    """
    for spec in iter_specifiers(requirement):
        # getattr keeps the check tolerant of specifier objects lacking an ``operator``.
        if getattr(spec, 'operator', '') in _FROZEN_OPERATORS:
            return True
    return False
[docs]
def highest_lower_bound(requirement: Requirement) -> str | None:
    """
    Find the largest version used in a ``>=`` or ``>`` specifier of the requirement.

    Versions are compared as parsed :class:`Version` objects, but the returned value is
    the version text exactly as it appears in the winning specifier.

    :param requirement: Parsed requirement instance.
    :type requirement: Requirement
    :return: Highest lower bound version string, or ``None`` if no lower bound is declared.
    :rtype: str | None
    :raises DevtoolsError: If a lower-bound version cannot be parsed.
    """
    winner_version: Version | None = None
    winner_text: str | None = None

    for spec in iter_specifiers(requirement):
        if spec.operator in ('>=', '>'):
            try:
                candidate = Version(spec.version)
            except Exception as exc:  # pragma: no cover
                raise DevtoolsError(
                    f'Unable to parse lower bound "{spec.version}" in requirement "{requirement}".'
                ) from exc
            # Strict comparison: on a tie the first specifier encountered is kept.
            if winner_version is None or candidate > winner_version:
                winner_version, winner_text = candidate, spec.version

    return winner_text
def freeze_requirement(requirement_text: str, *, resolved_version: str | None = None) -> tuple[str, list[str]]:
    """
    Append a computed ``<upper`` constraint to a requirement string when it is eligible.

    The requirement is returned unchanged in the following cases:

    - The text is empty or whitespace only.
    - It is a URL-based requirement (a warning is emitted).
    - It already contains ``<``, ``<=``, ``~=``, ``==`` or ``===``.
    - It declares no ``>=`` / ``>`` lower bound (a warning is emitted).

    Otherwise the upper bound is computed from ``resolved_version`` when one is given,
    and from the highest declared lower bound when it is not.

    :param requirement_text: Raw PEP 508 requirement string.
    :type requirement_text: str
    :param resolved_version: Optional version to derive the upper bound from instead of
        the declared lower bound.
    :type resolved_version: str | None
    :return: Updated requirement text plus warnings.
    :rtype: tuple[str, list[str]]
    :raises DevtoolsError: If parsing fails.
    """
    notes: list[str] = []

    candidate = requirement_text.strip()
    if not candidate:
        return requirement_text, notes

    try:
        parsed = Requirement(candidate)
    except Exception as exc:
        raise DevtoolsError(f'Unable to parse dependency requirement "{requirement_text}".') from exc

    if parsed.url:
        notes.append(f'Skipping URL requirement (cannot freeze): {requirement_text}')
        return requirement_text, notes

    if has_frozen_upper_bound(parsed):
        return requirement_text, notes

    declared_floor = highest_lower_bound(parsed)
    if declared_floor is None:
        notes.append(f'Dependency has no lower bound and will not be frozen: {requirement_text}')
        return requirement_text, notes

    ceiling = compute_upper_bound(resolved_version or declared_floor)

    existing = str(parsed.specifier)
    if existing.strip():
        merged = SpecifierSet(f'{existing},<{ceiling}')
    else:
        merged = SpecifierSet(f'<{ceiling}')
    parsed.specifier = merged

    return format_requirement(parsed), notes
def unfreeze_requirement(requirement_text: str) -> tuple[str, list[str]]:
    """
    Strip the strict ``<`` upper bound from a requirement string.

    The requirement is returned unchanged when any of the following holds:

    - The text is empty or whitespace only.
    - It is a URL-based requirement.
    - It contains a ``~=``, ``==`` or ``===`` specifier.
    - It declares no ``>=`` / ``>`` lower bound.
    - It contains a ``<=`` specifier.
    - It contains no ``<`` specifier at all.

    Otherwise every ``<`` specifier is dropped. The function does not compare the bound
    against the computed freeze rule, so a manually written ``<`` bound is removed as well.

    :param requirement_text: Raw PEP 508 requirement string.
    :type requirement_text: str
    :return: Updated requirement text plus warnings (the list is always empty here).
    :rtype: tuple[str, list[str]]
    :raises DevtoolsError: If parsing fails.
    """
    notes: list[str] = []

    candidate = requirement_text.strip()
    if not candidate:
        return requirement_text, notes

    try:
        parsed = Requirement(candidate)
    except Exception as exc:
        raise DevtoolsError(f'Unable to parse dependency requirement "{requirement_text}".') from exc

    if parsed.url:
        return requirement_text, notes

    specs = iter_specifiers(parsed)
    operators = {spec.operator for spec in specs}

    # Pinned / compatible-release requirements are never touched.
    if operators & {'~=', '==', '==='}:
        return requirement_text, notes

    if highest_lower_bound(parsed) is None:
        return requirement_text, notes

    # An inclusive upper bound is left alone, and without a strict one there is nothing to strip.
    if '<=' in operators or '<' not in operators:
        return requirement_text, notes

    kept = [spec for spec in specs if spec.operator != '<']
    parsed.specifier = SpecifierSet(','.join(str(spec) for spec in kept))
    return format_requirement(parsed), notes
[docs]
def update_dependency_list(
    dependencies: Any,
    *,
    transformer: Callable[[str], tuple[str, list[str]]],
    warnings: list[str],
    context: str,
) -> None:
    """
    Apply a transformer to every string entry of a dependency array, in place.

    Non-string entries are left where they are and reported through ``warnings``.

    :param dependencies: TOML array that contains dependency strings.
    :type dependencies: Any
    :param transformer: Callable applied to each dependency string; it returns the new
        string together with the warnings it produced.
    :type transformer: Callable[[str], tuple[str, list[str]]]
    :param warnings: List of warnings to append to.
    :type warnings: list[str]
    :param context: Human-readable location for warnings.
    :type context: str
    """
    # Walk a snapshot so that writing back into ``dependencies`` cannot disturb the iteration.
    for position, entry in enumerate(list(dependencies)):
        if isinstance(entry, str):
            replacement, emitted = transformer(entry)
            warnings.extend(emitted)
            dependencies[position] = replacement
        else:
            warnings.append(f'Skipping non-string dependency entry in {context}: {entry!r}')
def freeze_pyproject_toml(
    toml_text: str,
    *,
    resolved_versions: dict[str, Version] | None = None,
    doc: tomlkit.TOMLDocument | None = None,
) -> tuple[str, list[str]]:
    """
    Freeze dependencies in a ``pyproject.toml`` payload by adding upper bounds.

    Both ``project.dependencies`` and every group of ``project.optional-dependencies``
    are processed. The document is edited through tomlkit and serialized back with
    ``tomlkit.dumps``.

    When ``resolved_versions`` is provided, the function uses those compiled
    versions as the basis for upper-bound computation. Otherwise, or when a dependency
    is missing from the mapping, the declared lower bound is used as a fallback.

    :param toml_text: Raw TOML file content. Only parsed when ``doc`` is not given.
    :type toml_text: str
    :param resolved_versions: Optional mapping of dependency names to resolved versions.
        Names are looked up lower-cased first and then in their canonical (PEP 503) form.
    :type resolved_versions: dict[str, Version] | None
    :param doc: Optional pre-parsed TOML document to reuse when available. It is modified in place.
    :type doc: tomlkit.TOMLDocument | None
    :return: Updated TOML plus warnings.
    :rtype: tuple[str, list[str]]
    :raises DevtoolsError: If the ``[project]`` table is missing or a requirement cannot be parsed.
    """
    # Imported locally so this function stays self-contained; ``packaging`` is already a
    # dependency of this module.
    from packaging.utils import canonicalize_name

    warnings: list[str] = []
    if doc is None:
        doc = load_pyproject_doc(toml_text)

    project = doc.get('project')
    if project is None:
        raise DevtoolsError(f'Missing [project] table in {PYPROJECT_FILE}.')

    def freeze_for_requirement(requirement_text: str) -> tuple[str, list[str]]:
        stripped = requirement_text.strip()
        # Without a version mapping, or for a blank entry, there is no name to look up.
        # Delegating directly avoids a second parse and keeps blank entries a no-op,
        # exactly as freeze_requirement() treats them.
        if resolved_versions is None or not stripped:
            return freeze_requirement(requirement_text)

        try:
            requirement = Requirement(stripped)
        except Exception as exc:
            raise DevtoolsError(f'Unable to parse dependency requirement "{requirement_text}".') from exc

        # Historical lookup first (lower-cased name), then the canonical spelling so that
        # "typing_extensions" also finds a key stored as "typing-extensions".
        resolved = resolved_versions.get(requirement.name.lower())
        if resolved is None:
            resolved = resolved_versions.get(canonicalize_name(requirement.name))

        resolved_version = None if resolved is None else str(resolved)
        return freeze_requirement(requirement_text, resolved_version=resolved_version)

    dependencies = project.get('dependencies')
    if dependencies is not None:
        update_dependency_list(
            dependencies,
            transformer=freeze_for_requirement,
            warnings=warnings,
            context='project.dependencies',
        )

    optional = project.get('optional-dependencies')
    if optional is not None:
        for group, group_deps in optional.items():
            update_dependency_list(
                group_deps,
                transformer=freeze_for_requirement,
                warnings=warnings,
                context=f'project.optional-dependencies.{group}',
            )

    return tomlkit.dumps(doc), warnings
def unfreeze_pyproject_toml(
    toml_text: str,
    *,
    baseline_toml_text: str | None = None,
    doc: tomlkit.TOMLDocument | None = None,
    baseline_doc: tomlkit.TOMLDocument | None = None,
) -> tuple[str, list[str]]:
    """
    Unfreeze dependencies in a ``pyproject.toml`` payload.

    Two modes exist:

    - **Baseline mode** (a baseline text or document is supplied): ``project.dependencies``
      and every optional-dependency group present in both documents are restored verbatim
      from the baseline. Groups that exist only in the current document are left as they are.
    - **Heuristic mode** (no baseline): every dependency string is passed through
      :func:`unfreeze_requirement`.

    :param toml_text: Raw TOML file content. Only parsed when ``doc`` is not given.
    :type toml_text: str
    :param baseline_toml_text: Optional original TOML text captured before freezing.
        When provided, unfreezing is computed against the baseline to avoid
        altering dependencies that were already frozen before release.
    :type baseline_toml_text: str | None
    :param doc: Optional pre-parsed TOML document to reuse when available. It is modified in place.
    :type doc: tomlkit.TOMLDocument | None
    :param baseline_doc: Optional pre-parsed baseline TOML document. It takes precedence over
        ``baseline_toml_text`` and selects baseline mode even when that text is omitted.
    :type baseline_doc: tomlkit.TOMLDocument | None
    :return: Updated TOML plus warnings.
    :rtype: tuple[str, list[str]]
    :raises DevtoolsError: If a ``[project]`` table is missing or a requirement cannot be parsed.
    """
    warnings: list[str] = []
    if doc is None:
        doc = load_pyproject_doc(toml_text)
    if baseline_doc is None and baseline_toml_text is not None:
        baseline_doc = load_pyproject_doc(baseline_toml_text)

    project = doc.get('project')

    if baseline_doc is not None:
        baseline_project = baseline_doc.get('project')
        if project is None or baseline_project is None:
            raise DevtoolsError(f'Missing [project] table in {PYPROJECT_FILE}.')

        # Restore only when the baseline actually declares dependencies: TOML has no null
        # value, so writing ``None`` into the table must be avoided.
        baseline_dependencies = baseline_project.get('dependencies')
        if baseline_dependencies is not None:
            project['dependencies'] = baseline_dependencies

        current_optional = project.get('optional-dependencies')
        baseline_optional = baseline_project.get('optional-dependencies')
        if current_optional is not None and baseline_optional is not None:
            # Iterate over a snapshot of the keys because the table is written to in the loop.
            for group in list(current_optional):
                if group in baseline_optional:
                    current_optional[group] = baseline_optional[group]

        return tomlkit.dumps(doc), warnings

    if project is None:
        raise DevtoolsError(f'Missing [project] table in {PYPROJECT_FILE}.')

    dependencies = project.get('dependencies')
    if dependencies is not None:
        update_dependency_list(
            dependencies,
            transformer=unfreeze_requirement,
            warnings=warnings,
            context='project.dependencies',
        )

    optional = project.get('optional-dependencies')
    if optional is not None:
        for group, group_deps in optional.items():
            update_dependency_list(
                group_deps,
                transformer=unfreeze_requirement,
                warnings=warnings,
                context=f'project.optional-dependencies.{group}',
            )

    return tomlkit.dumps(doc), warnings
[docs]
def summarize_freeze_changes(before: str, after: str) -> str:
    """
    Produce the one-line dry-run summary comparing two TOML payloads.

    :param before: Original TOML content.
    :type before: str
    :param after: Updated TOML content.
    :type after: str
    :return: A fixed "nothing to do" message when both texts are equal, otherwise a
        message reporting the line count before and after.
    :rtype: str
    """
    if before != after:
        old_count = len(before.splitlines())
        new_count = len(after.splitlines())
        return f'pyproject.toml would be updated ({old_count} -> {new_count} lines).'
    return 'No dependency constraints would be updated.'
def freeze_dependencies(*, dry_run: bool) -> str:  # pragma: no cover
    """
    Freeze the dependencies declared in the project's ``pyproject.toml`` file.

    The file is read, frozen with the compiled dependency versions collected for the
    project's supported Python versions, and written back only when its content changed.
    Warnings are printed to the console.

    :param dry_run: When ``True`` the file is not written; a summary is printed instead.
    :type dry_run: bool
    :return: Original ``pyproject.toml`` content captured before freeze.
    :rtype: str
    """
    CONSOLE.print('Freezing dependencies (adding upper bounds) ...')

    original = PYPROJECT_FILE.read_text(encoding='utf-8')
    parsed_doc = load_pyproject_doc(original)
    python_versions = project_python_versions_from_doc(parsed_doc)
    frozen, notes = freeze_pyproject_toml(
        original,
        resolved_versions=collect_compiled_dependency_versions(python_versions),
        doc=parsed_doc,
    )

    for note in notes:
        CONSOLE.print(f'WARNING: {note}')

    if dry_run:
        CONSOLE.print(summarize_freeze_changes(original, frozen))
    elif original != frozen:
        PYPROJECT_FILE.write_text(frozen, encoding='utf-8')

    return original
def unfreeze_dependencies(original_pyproject_toml: str, *, dry_run: bool) -> None:  # pragma: no cover
    """
    Unfreeze the dependencies declared in the project's ``pyproject.toml`` file.

    The current file is unfrozen against the supplied baseline text and written back
    only when its content changed. Warnings are printed to the console.

    :param original_pyproject_toml: ``pyproject.toml`` content used as the baseline,
        e.g. the value returned by :func:`freeze_dependencies`.
    :type original_pyproject_toml: str
    :param dry_run: When ``True`` the file is not written; a summary is printed instead.
    :type dry_run: bool
    """
    CONSOLE.print('Unfreezing dependencies (removing computed upper bounds) ...')

    current = PYPROJECT_FILE.read_text(encoding='utf-8')
    current_doc = load_pyproject_doc(current)
    original_doc = load_pyproject_doc(original_pyproject_toml)
    restored, notes = unfreeze_pyproject_toml(
        current,
        baseline_toml_text=original_pyproject_toml,
        doc=current_doc,
        baseline_doc=original_doc,
    )

    for note in notes:
        CONSOLE.print(f'WARNING: {note}')

    if dry_run:
        CONSOLE.print(summarize_freeze_changes(current, restored))
    elif current != restored:
        PYPROJECT_FILE.write_text(restored, encoding='utf-8')
def update_requirements_and_readme(*, dry_run: bool) -> None:  # pragma: no cover
    """
    Regenerate the requirements RST files and README.rst through the hatch ``dev`` environment.

    :param dry_run: Forwarded to the command runner as its ``dry_run`` flag.
    :type dry_run: bool
    """
    CONSOLE.print('Updating requirements and README.rst...')

    command = ['hatch', 'run', 'dev:multidoc', 'requirements', '--update-readme']
    command.extend(REQUIREMENTS_GROUPS)
    cmd(command, dry_run=dry_run)