Source code for mafw.devtools.dependencies.freeze

#  Copyright 2026 European Union
#  Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
#  SPDX-License-Identifier: EUPL-1.2
"""
Dependency freezing and unfreezing utilities for MAFw.

This module provides functions for adding and removing computed upper-bound
constraints in ``pyproject.toml`` dependency declarations. It is used by
the release workflow to pin dependencies during release and unpin them
afterwards.
"""

from __future__ import annotations

from typing import Any, Callable, Final

import tomlkit

from mafw.devtools import ensure_devtools_available

ensure_devtools_available()

from packaging.requirements import Requirement  # noqa: E402
from packaging.specifiers import Specifier, SpecifierSet  # noqa: E402
from packaging.version import Version  # noqa: E402

from mafw.devtools import DevtoolsError  # noqa: E402
from mafw.devtools.dependencies.compile import (  # noqa: E402
    PYPROJECT_FILE,
    collect_compiled_dependency_versions,
    load_pyproject_doc,
    project_python_versions_from_doc,
)
from mafw.devtools.documentation.requirements import REQUIREMENTS_GROUPS  # noqa: E402
from mafw.tools.shell_tools import CONSOLE  # noqa: E402
from mafw.tools.shell_tools import run as cmd  # noqa: E402

_FROZEN_OPERATORS: Final[set[str]] = {'<', '<=', '~=', '==', '==='}
"""Operators that already constrain the maximum compatible version and should not be auto-frozen."""


def compute_upper_bound(lower_bound: str) -> str:
    """
    Compute an upper bound for a dependency based on PEP 440 compatible-release philosophy.

    The rule implemented here is purposely conservative and mirrors the intent of compatible
    release clauses while remaining explicit:

    - For major-versioned releases (``X.*`` with ``X > 0``), freeze to ``<(X + 1)``.
    - For ``0.*`` releases, freeze to ``<0.(minor + 1)`` (rolling compatibility during pre-1.0).

    :param lower_bound: Version string used as the starting point for the freeze rule.
    :type lower_bound: str
    :return: Upper-bound version string without operator.
    :rtype: str
    :raises DevtoolsError: If the version cannot be parsed.
    """
    try:
        version = Version(lower_bound)
    except Exception as exc:  # pragma: no cover
        raise DevtoolsError(f'Unable to parse version "{lower_bound}" while computing dependency upper bound.') from exc

    if version.major > 0:
        return str(version.major + 1)
    return f'0.{version.minor + 1}'


[docs] def format_requirement(requirement: Requirement) -> str: """ Serialize a packaging requirement object back to a PEP 508 compatible string. :param requirement: Parsed requirement instance. :type requirement: Requirement :return: PEP 508 requirement string. :rtype: str """ name = str(requirement.name) extras = sorted(str(extra) for extra in getattr(requirement, 'extras', set()) or set()) if extras: name = f'{name}[{",".join(extras)}]' if requirement.url: rendered = f'{name} @ {requirement.url}' else: spec = str(requirement.specifier).strip() rendered = f'{name}{spec}' if spec else name if requirement.marker: rendered = f'{rendered} ; {requirement.marker}' return rendered
[docs] def iter_specifiers(requirement: Requirement) -> list[Specifier]: """ Return a concrete list of specifiers for the given requirement. :param requirement: Parsed requirement instance. :type requirement: Requirement :return: List of specifier objects. :rtype: list[Specifier] """ return list(requirement.specifier) if requirement.specifier else []
[docs] def has_frozen_upper_bound(requirement: Requirement) -> bool: """ Determine whether a requirement already contains an upper bound constraint. :param requirement: Parsed requirement instance. :type requirement: Requirement :return: ``True`` if the requirement is already frozen. :rtype: bool """ return any(getattr(spec, 'operator', '') in _FROZEN_OPERATORS for spec in iter_specifiers(requirement))
[docs] def highest_lower_bound(requirement: Requirement) -> str | None: """ Extract the highest lower-bound version from ``>=`` and ``>`` specifiers. :param requirement: Parsed requirement instance. :type requirement: Requirement :return: Highest lower bound version string, or ``None`` if missing. :rtype: str | None :raises DevtoolsError: If version parsing fails. """ best: tuple[Version, str] | None = None for spec in iter_specifiers(requirement): if spec.operator not in {'>=', '>'}: continue try: parsed = Version(spec.version) except Exception as exc: # pragma: no cover raise DevtoolsError( f'Unable to parse lower bound "{spec.version}" in requirement "{requirement}".' ) from exc if best is None or parsed > best[0]: best = (parsed, spec.version) return best[1] if best else None
def freeze_requirement(requirement_text: str, *, resolved_version: str | None = None) -> tuple[str, list[str]]: """ Add a computed upper bound to a requirement string, if eligible. The function is intentionally conservative: - URL-based requirements are skipped (cannot be version constrained). - Requirements already containing ``<``, ``<=``, ``~=``, ``==`` or ``===`` are skipped. - Requirements without a lower bound emit a warning and are left unchanged. - When a resolved version is provided, the upper bound is computed from that version instead of the declared lower bound. :param requirement_text: Raw PEP 508 requirement string. :type requirement_text: str :return: Updated requirement text plus warnings. :rtype: tuple[str, list[str]] :raises DevtoolsError: If parsing fails. """ warnings: list[str] = [] stripped = requirement_text.strip() if not stripped: return requirement_text, warnings try: requirement = Requirement(stripped) except Exception as exc: raise DevtoolsError(f'Unable to parse dependency requirement "{requirement_text}".') from exc if requirement.url: warnings.append(f'Skipping URL requirement (cannot freeze): {requirement_text}') return requirement_text, warnings if has_frozen_upper_bound(requirement): return requirement_text, warnings lower = highest_lower_bound(requirement) if lower is None: warnings.append(f'Dependency has no lower bound and will not be frozen: {requirement_text}') return requirement_text, warnings upper = compute_upper_bound(resolved_version or lower) combined = SpecifierSet(f'{requirement.specifier},<{upper}' if str(requirement.specifier).strip() else f'<{upper}') requirement.specifier = combined return format_requirement(requirement), warnings def unfreeze_requirement(requirement_text: str) -> tuple[str, list[str]]: """ Remove a computed upper bound from a requirement string, if it matches the computed rule. Only the auto-generated upper bound ``<upper`` is removed; existing manual upper bounds are preserved. :param requirement_text: Raw PEP 508 requirement string. :type requirement_text: str :return: Updated requirement text plus warnings. :rtype: tuple[str, list[str]] :raises DevtoolsError: If parsing fails. """ warnings: list[str] = [] stripped = requirement_text.strip() if not stripped: return requirement_text, warnings try: requirement = Requirement(stripped) except Exception as exc: raise DevtoolsError(f'Unable to parse dependency requirement "{requirement_text}".') from exc if requirement.url: return requirement_text, warnings # Do not try to unfreeze already pinned/compatible requirements, or requirements that already # had an upper bound before freezing (we cannot safely distinguish manual bounds). specifiers = iter_specifiers(requirement) if any(spec.operator in {'~=', '==', '==='} for spec in specifiers): return requirement_text, warnings lower = highest_lower_bound(requirement) if lower is None: return requirement_text, warnings if any(spec.operator == '<=' for spec in specifiers): return requirement_text, warnings remaining: list[Specifier] = [] removed = False for spec in specifiers: if spec.operator == '<': removed = True continue remaining.append(spec) if not removed: return requirement_text, warnings requirement.specifier = SpecifierSet(','.join(str(spec) for spec in remaining)) return format_requirement(requirement), warnings
[docs] def update_dependency_list( dependencies: Any, *, transformer: Callable[[str], tuple[str, list[str]]], warnings: list[str], context: str, ) -> None: """ Update a TOML list of dependency strings in-place. :param dependencies: TOML array that contains dependency strings. :type dependencies: Any :param transformer: Callable applied to each dependency string. :type transformer: Any :param warnings: List of warnings to append to. :type warnings: list[str] :param context: Human-readable location for warnings. :type context: str """ for idx, item in enumerate(list(dependencies)): if not isinstance(item, str): warnings.append(f'Skipping non-string dependency entry in {context}: {item!r}') continue updated, item_warnings = transformer(item) warnings.extend(item_warnings) dependencies[idx] = updated
def freeze_pyproject_toml( toml_text: str, *, resolved_versions: dict[str, Version] | None = None, doc: tomlkit.TOMLDocument | None = None, ) -> tuple[str, list[str]]: """ Freeze dependencies in a ``pyproject.toml`` payload by adding upper bounds. The TOML structure is preserved via tomlkit; only dependency strings may be normalized. When ``resolved_versions`` is provided, the function uses those compiled versions as the basis for upper-bound computation. Otherwise the declared lower bounds are used as a fallback. :param toml_text: Raw TOML file content. :type toml_text: str :param resolved_versions: Optional mapping of dependency names to resolved versions. :type resolved_versions: dict[str, Version] | None :param doc: Optional pre-parsed TOML document to reuse when available. :type doc: tomlkit.TOMLDocument | None :return: Updated TOML plus warnings. :rtype: tuple[str, list[str]] :raises DevtoolsError: If TOML parsing fails. """ warnings: list[str] = [] if doc is None: doc = load_pyproject_doc(toml_text) project = doc.get('project') if project is None: raise DevtoolsError(f'Missing [project] table in {PYPROJECT_FILE}.') def freeze_for_requirement(requirement_text: str) -> tuple[str, list[str]]: try: requirement = Requirement(requirement_text.strip()) except Exception as exc: raise DevtoolsError(f'Unable to parse dependency requirement "{requirement_text}".') from exc resolved_version = None if resolved_versions is not None: resolved = resolved_versions.get(requirement.name.lower()) if resolved is not None: resolved_version = str(resolved) return freeze_requirement(requirement_text, resolved_version=resolved_version) dependencies = project.get('dependencies') if dependencies is not None: update_dependency_list( dependencies, transformer=freeze_for_requirement, warnings=warnings, context='project.dependencies', ) optional = project.get('optional-dependencies') if optional is not None: for group, group_deps in optional.items(): update_dependency_list( group_deps, transformer=freeze_for_requirement, warnings=warnings, context=f'project.optional-dependencies.{group}', ) return tomlkit.dumps(doc), warnings def unfreeze_pyproject_toml( toml_text: str, *, baseline_toml_text: str | None = None, doc: tomlkit.TOMLDocument | None = None, baseline_doc: tomlkit.TOMLDocument | None = None, ) -> tuple[str, list[str]]: """ Unfreeze dependencies in a ``pyproject.toml`` payload by removing computed upper bounds. Only upper bounds matching the computed rule are removed; existing manual constraints remain. :param toml_text: Raw TOML file content. :type toml_text: str :param baseline_toml_text: Optional original TOML text captured before freezing. When provided, unfreezing is computed against the baseline to avoid altering dependencies that were already frozen before release. :type baseline_toml_text: str | None :param doc: Optional pre-parsed TOML document to reuse when available. :type doc: tomlkit.TOMLDocument | None :param baseline_doc: Optional pre-parsed baseline TOML document to reuse when available. :type baseline_doc: tomlkit.TOMLDocument | None :return: Updated TOML plus warnings. :rtype: tuple[str, list[str]] :raises DevtoolsError: If TOML parsing fails. """ warnings: list[str] = [] if doc is None: doc = load_pyproject_doc(toml_text) if baseline_toml_text is not None: if baseline_doc is None: baseline_doc = load_pyproject_doc(baseline_toml_text) current_project = doc.get('project') baseline_project = baseline_doc.get('project') if current_project is None or baseline_project is None: raise DevtoolsError(f'Missing [project] table in {PYPROJECT_FILE}.') current_project['dependencies'] = baseline_project.get('dependencies', current_project.get('dependencies')) current_optional = current_project.get('optional-dependencies') baseline_optional = baseline_project.get('optional-dependencies') if current_optional is not None and baseline_optional is not None: for group in current_optional: if group in baseline_optional: current_optional[group] = baseline_optional[group] return tomlkit.dumps(doc), warnings project = doc.get('project') if project is None: raise DevtoolsError(f'Missing [project] table in {PYPROJECT_FILE}.') dependencies = project.get('dependencies') if dependencies is not None: update_dependency_list( dependencies, transformer=unfreeze_requirement, warnings=warnings, context='project.dependencies', ) optional = project.get('optional-dependencies') if optional is not None: for group, group_deps in optional.items(): update_dependency_list( group_deps, transformer=unfreeze_requirement, warnings=warnings, context=f'project.optional-dependencies.{group}', ) return tomlkit.dumps(doc), warnings
[docs] def summarize_freeze_changes(before: str, after: str) -> str: """ Build a short summary for dry-run output. :param before: Original TOML content. :type before: str :param after: Updated TOML content. :type after: str :return: Human-readable summary line. :rtype: str """ if before == after: return 'No dependency constraints would be updated.' before_lines = before.splitlines() after_lines = after.splitlines() return f'pyproject.toml would be updated ({len(before_lines)} -> {len(after_lines)} lines).'
def freeze_dependencies(*, dry_run: bool) -> str: # pragma: no cover """ Freeze dependencies by adding computed upper bounds in ``pyproject.toml``. :param dry_run: Whether command execution is disabled. :type dry_run: bool :return: Original ``pyproject.toml`` content captured before freeze. :rtype: str """ CONSOLE.print('Freezing dependencies (adding upper bounds) ...') before = PYPROJECT_FILE.read_text(encoding='utf-8') doc = load_pyproject_doc(before) supported_python_versions = project_python_versions_from_doc(doc) resolved_versions = collect_compiled_dependency_versions(supported_python_versions) after, warnings = freeze_pyproject_toml(before, resolved_versions=resolved_versions, doc=doc) for warning in warnings: CONSOLE.print(f'WARNING: {warning}') if dry_run: CONSOLE.print(summarize_freeze_changes(before, after)) return before if before != after: PYPROJECT_FILE.write_text(after, encoding='utf-8') return before def unfreeze_dependencies(original_pyproject_toml: str, *, dry_run: bool) -> None: # pragma: no cover """ Unfreeze dependencies by removing computed upper bounds in ``pyproject.toml``. :param dry_run: Whether command execution is disabled. :type dry_run: bool """ CONSOLE.print('Unfreezing dependencies (removing computed upper bounds) ...') before = PYPROJECT_FILE.read_text(encoding='utf-8') doc = load_pyproject_doc(before) baseline_doc = load_pyproject_doc(original_pyproject_toml) after, warnings = unfreeze_pyproject_toml( before, baseline_toml_text=original_pyproject_toml, doc=doc, baseline_doc=baseline_doc, ) for warning in warnings: CONSOLE.print(f'WARNING: {warning}') if dry_run: CONSOLE.print(summarize_freeze_changes(before, after)) return if before != after: PYPROJECT_FILE.write_text(after, encoding='utf-8') def update_requirements_and_readme(*, dry_run: bool) -> None: # pragma: no cover """ Update the requirements RST files and README.rst. :param dry_run: Whether command execution is disabled. :type dry_run: bool """ CONSOLE.print('Updating requirements and README.rst...') cmd(['hatch', 'run', 'dev:multidoc', 'requirements', '--update-readme', *REQUIREMENTS_GROUPS], dry_run=dry_run)