# Copyright 2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""Application layer controller for steering configuration builds.
:Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
:Description: Keep GUI and execution logic isolated while managing SteeringBuilder state.
"""
from __future__ import annotations
from copy import deepcopy
from functools import wraps
from os import PathLike
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Concatenate, Iterable, Mapping, ParamSpec, TypeVar, cast
from mafw.mafw_errors import MAFwException, ValidationIssue
from mafw.plugin_manager import LoadedPlugins, MAFwPluginManager, get_plugin_manager
from mafw.steering.builder import SteeringBuilder, ValidationLevel
from mafw.steering.models import (
FilterConfig,
GroupConfig,
ParameterConfig,
ParameterSchemaStatus,
ParameterSource,
ProcessorConfig,
ProcessorSchemaStatus,
)
from mafw.steering_gui.models.pipeline import PipelineItem, ProcessorPipeline
from mafw.tools.generics import deep_update
from mafw.tools.regexp import extract_protocol, parse_processor_name
P = ParamSpec('P')
"""Type variable capturing the positional parameters of the wrapped method."""
R = TypeVar('R')
"""Type variable capturing the return type of the wrapped method."""
[docs]
def recorded_mutation(
    method: Callable[Concatenate['SteeringController', P], R],
) -> Callable[Concatenate['SteeringController', P], R]:
    """Wrap a mutating controller method with undo snapshotting and dirty tracking.

    The controller state is recorded *before* the wrapped call (for undo support);
    after a successful call the controller is flagged dirty. If the method raises,
    the dirty flag is left untouched.
    """

    @wraps(method)
    def _recorded(self: 'SteeringController', /, *args: P.args, **kwargs: P.kwargs) -> R:
        self._record_state()
        outcome = method(self, *args, **kwargs)
        self._mark_dirty()
        return outcome

    return _recorded
[docs]
class SteeringControllerError(MAFwException):
    """Raised when the controller is asked to perform an invalid operation.

    Examples from this module: undo/redo with an empty history stack, moving a
    pipeline entry to an out-of-range position, or referencing an unknown group.
    """
# TODO:
# Consider moving this function out of here to make it generally applicable and reusable by the execution framework.
[docs]
def build_database_connection_configuration(config: Mapping[str, Any]) -> dict[str, Any]:
    """Return a normalized configuration dict for connection checks.

    The returned dictionary always contains ``URL`` plus backend-specific keys derived from
    ``extract_protocol``.

    :raises ValueError: When neither ``url`` nor ``URL`` carries a usable value.
    """
    source = deepcopy(dict(config))
    # The backend key is controller bookkeeping, never part of the connection args.
    source.pop('backend', None)
    url = source.get('url') or source.get('URL')
    if not url:
        raise ValueError('Database URL is required to build the connection configuration.')
    backend = extract_protocol(str(url)) or ''
    result: dict[str, Any] = {'URL': url}
    if backend == 'sqlite':
        result['pragmas'] = deepcopy(source.get('pragmas') or {})
    elif backend in ('mysql', 'postgresql'):
        if 'user' in source:
            result['user'] = source['user']
        if backend == 'mysql' and 'read_default_file' in source:
            result['read_default_file'] = source['read_default_file']
    return result
[docs]
class DirtyStateSignal:
    """Lightweight notifier for controller dirty-state transitions."""

    def __init__(self) -> None:
        # Callbacks invoked with the new dirty flag on every emit, in subscription order.
        self._listeners: list[Callable[[bool], None]] = []

    def connect(self, callback: Callable[[bool], None]) -> None:
        """Subscribe to dirty-state change notifications."""
        self._listeners.append(callback)

    def emit(self, value: bool) -> None:
        """Notify every subscriber about the new dirty value."""
        # Iterate over a snapshot so a callback may safely connect new listeners.
        for subscriber in tuple(self._listeners):
            subscriber(value)
[docs]
class SteeringController:
"""Controller orchestrating SteeringBuilder without GUI or execution ties."""
_max_history: int = 50
def __init__(
    self,
    builder: SteeringBuilder | None = None,
    *,
    max_history: int | None = None,
    plugin_manager: MAFwPluginManager | None = None,
) -> None:
    """Initialize the controller.

    :param builder: Optional pre-populated builder; a fresh one is created when omitted.
    :param max_history: Optional cap overriding the class-level undo-history size.
    :param plugin_manager: Optional plugin manager; the global one is used when omitted.
    """
    self._builder = builder or SteeringBuilder()
    # Undo/redo stacks of deep-copied builder snapshots.
    self._history: list[SteeringBuilder] = []
    self._future: list[SteeringBuilder] = []
    self._dirty = False
    self._current_file: Path | None = None
    self.dirty_state_signal = DirtyStateSignal()
    self._validation_level: ValidationLevel | None = ValidationLevel.SEMANTIC
    self._plugin_manager = plugin_manager or get_plugin_manager()
    # Filled by load_plugins(); None until plugins have been loaded.
    self.plugins: LoadedPlugins | None = None
    if max_history is not None:
        self._max_history = max_history
    # Rebase point used by load(); kept isolated from later builder mutations.
    self._initial_snapshot: SteeringBuilder = deepcopy(self._builder)
[docs]
def load_plugins(self) -> LoadedPlugins:
    """Load the asynchronous plugin bundles and cache the result for GUI consumers.

    :return: Plugins container with available processors, database modules, and UIs
    :rtype: LoadedPlugins
    """
    bundles = self._plugin_manager.load_plugins(('processors', 'db_modules', 'ui'))
    self.plugins = bundles
    return bundles
[docs]
def available_processor_count(self) -> int | None:
    """Return how many processor plugins have been loaded (or ``None`` until available)."""
    if self.plugins is None:
        return None
    return len(self.plugins.processor_list)
[docs]
def available_processor_names(self) -> list[str]:
    """Return the available processor names from loaded plugins, sorted alphabetically."""
    bundle = self.plugins
    # Iterating the dict yields its keys, so no explicit .keys() call is needed.
    return sorted(bundle.processor_dict) if bundle is not None else []
[docs]
def available_db_module_count(self) -> int | None:
    """Return how many database modules have been loaded (or ``None`` until available)."""
    if self.plugins is None:
        return None
    return len(self.plugins.db_model_modules)
[docs]
def available_ui_count(self) -> int | None:
    """Return how many user interface plugins have been loaded (or ``None`` until available)."""
    if self.plugins is None:
        return None
    return len(self.plugins.ui_list)
def _record_state(self) -> None:
    """Push a snapshot of the current builder onto the undo stack and drop redo state."""
    self._push_history(self._clone_builder())
    # Any new mutation invalidates previously undone states.
    self._future.clear()
def _push_history(self, snapshot: SteeringBuilder) -> None:
    """Append *snapshot* to the undo stack, trimming the oldest entries beyond the cap.

    :param snapshot: Deep-copied builder state to preserve for undo.
    """
    self._history.append(snapshot)
    overflow = len(self._history) - self._max_history
    if overflow > 0:
        # One slice-delete instead of repeated pop(0): each pop(0) shifts the whole
        # list, making the original trimming loop quadratic in the overflow size.
        del self._history[:overflow]
def _clone_builder(self) -> SteeringBuilder:
    # Deep copy so history snapshots stay isolated from future builder mutations.
    return deepcopy(self._builder)
def _set_dirty_state(self, value: bool) -> None:
    """Update the dirty flag and broadcast only actual transitions."""
    if self._dirty != value:
        self._dirty = value
        self.dirty_state_signal.emit(value)
def _mark_dirty(self) -> None:
    """Mark the controller as holding unsaved changes and notify subscribers."""
    self._set_dirty_state(True)
[docs]
def _mark_clean(self) -> None:
    """Mark the controller as free of unsaved changes and notify subscribers."""
    self._set_dirty_state(False)
@property
def dirty(self) -> bool:
    """``True`` while the controller tracks unsaved changes."""
    return self._dirty
[docs]
def validation_level(self) -> ValidationLevel | None:
    """Return the validation level currently configured on the controller (``None`` = disabled)."""
    return self._validation_level
[docs]
def set_validation_level(self, level: ValidationLevel | None) -> None:
    """Keep track of the validation level preference without affecting builder data.

    A value of ``None`` disables the pre-save validation performed by save()/save_as().
    """
    self._validation_level = level
[docs]
@classmethod
def from_file(cls, path: Path | str) -> 'SteeringController':
    """Instantiate a controller from an existing steering file.

    The returned controller tracks *path* for later save() calls and starts clean.
    """
    instance = cls(SteeringBuilder.from_toml(path))
    instance._current_file = Path(path)
    instance._mark_clean()
    return instance
[docs]
def load(self, path: Path | str) -> None:
    """Load steering metadata from disk, replace the current builder, and rebase the reset snapshot."""
    # Record the pre-load state so the operation is undoable.
    self._record_state()
    builder = SteeringBuilder.from_toml(path)
    self._builder = builder
    self._initial_snapshot = deepcopy(builder)
    self._current_file = Path(path)
    # A freshly loaded file has no unsaved changes.
    self._mark_clean()
[docs]
def save(self, *, skip_validation: bool = False) -> None:
    """Persist the current steering state to the associated file path.

    :param skip_validation: When True, write without running the configured validation.
    :raises ValueError: When no file is associated with the controller yet.
    """
    target = self._require_current_file()
    # A validation level of None means validation is disabled entirely.
    if not skip_validation and self._validation_level is not None:
        self.ensure_valid(self._validation_level)
    self._write_to_path(target)
    self._mark_clean()
[docs]
def save_as(self, path: Path | str, *, skip_validation: bool = False) -> None:
    """Persist the current steering state to a new file path and track it.

    :param path: Destination file; becomes the controller's current file on success.
    :param skip_validation: When True, write without running the configured validation.
    """
    target = Path(path)
    # A validation level of None means validation is disabled entirely.
    if not skip_validation and self._validation_level is not None:
        self.ensure_valid(self._validation_level)
    self._write_to_path(target)
    self._current_file = target
    self._mark_clean()
[docs]
def current_file(self) -> Path | None:
    """Return the file currently associated with persistence operations (``None`` if unset)."""
    return self._current_file
[docs]
def _write_to_path(self, path: Path) -> None:
    """Serialize the builder to the provided path (no validation, no state changes)."""
    self._builder.write(path)
[docs]
def export_to_path(self, path: Path | str) -> None:
    """Serialize the builder to the provided path without validation.

    Unlike save_as(), the exported path does not become the controller's current file.
    """
    self._builder.write(Path(path))
[docs]
@recorded_mutation
def replace_builder(self, builder: SteeringBuilder) -> None:
    """Replace the current builder with the provided instance (undoable, marks dirty)."""
    self._builder = builder
def _require_current_file(self) -> Path:
    """Return the associated file path, raising when none has been set yet."""
    current = self._current_file
    if current is None:
        raise ValueError('No file associated with the controller yet.')
    return current
[docs]
def validate(self, validation_level: ValidationLevel = ValidationLevel.SEMANTIC) -> list[ValidationIssue]:
    """Run validation and return every detected issue (empty list when valid)."""
    return self._builder.validate(validation_level)
[docs]
def ensure_valid(self, validation_level: ValidationLevel = ValidationLevel.SEMANTIC) -> None:
    """Raise the first validation issue or return silently when the builder is valid."""
    # The loop body executes at most once: the first issue is raised immediately.
    for issue in self.validate(validation_level):
        raise issue
[docs]
def can_undo(self) -> bool:
    """Return whether the controller can revert to a previously recorded state."""
    return len(self._history) > 0
[docs]
def can_redo(self) -> bool:
    """Return whether a previously undone state can be re-applied."""
    return len(self._future) > 0
[docs]
def undo(self) -> None:
    """Revert the builder to the previous snapshot.

    :raises SteeringControllerError: When the undo stack is empty.
    """
    if not self._history:
        raise SteeringControllerError('No historical state to undo.')
    # Preserve the current state for redo before stepping back.
    self._future.append(self._clone_builder())
    self._builder = self._history.pop()
[docs]
def redo(self) -> None:
    """Re-apply the last undone snapshot.

    :raises SteeringControllerError: When the redo stack is empty.
    """
    if not self._future:
        raise SteeringControllerError('No future state to redo.')
    # Current state goes back onto the undo stack (respecting the size cap).
    self._push_history(self._clone_builder())
    self._builder = self._future.pop()
[docs]
def reset(self) -> None:
    """Restore the controller to its initial state and clear history."""
    self._builder = SteeringBuilder()
    self._initial_snapshot = deepcopy(self._builder)
    self._history.clear()
    self._future.clear()
    self._current_file = None
    self._validation_level = ValidationLevel.SEMANTIC
    self._mark_clean()
# Globals helpers
[docs]
def get_processors_to_run(self) -> list[str]:
    """Return a copy of the ordered processor references."""
    # Copy so callers cannot mutate the builder's run list in place.
    return list(self._builder.globals.processors_to_run)
[docs]
@recorded_mutation
def set_processors_to_run(self, processors: Iterable[str]) -> None:
    """Overwrite the processors_to_run list while keeping a copy of the iterable.

    :param processors: New ordered sequence of processor references.
    """
    self._builder.set_processors_to_run(processors)
[docs]
@recorded_mutation
def add_processor_to_run_list(self, base_name: str, replica: str | None = None) -> None:
    """Add a processor or replica to the execution list and ensure its section exists.

    :param base_name: Processor base name.
    :param replica: Optional replica identifier.
    """
    self._builder.add_processor(base_name, replica)
[docs]
@recorded_mutation
def add_processors_with_defaults(self, base_names: Iterable[str]) -> None:
    """Append processors to the run list and set their parameters to schema defaults.

    :param base_names: Base processor names; replica suffixes are generated on clashes.
    :raises SteeringControllerError: When plugins have not been loaded yet.
    """
    if self.plugins is None:
        raise SteeringControllerError('Plugins have not been loaded.')
    for base_name in base_names:
        # Generate a replica suffix when the base name is already scheduled.
        full_name, replica = self._unique_processor_name(base_name)
        self._builder.add_processor(base_name, replica)
        config = self._builder.get_processor_config(full_name)
        processor_cls = self.plugins.processor_dict.get(base_name)
        if processor_cls is None:
            # No plugin class available: keep the section but mark it unknown.
            config.processor_status = ProcessorSchemaStatus.UNKNOWN
            continue
        config.processor_status = ProcessorSchemaStatus.OK
        schema = processor_cls.processor_schema()
        for param_schema in schema.parameters:
            # Do not overwrite parameters the section already defines.
            if param_schema.name in config.parameters:
                continue
            default_value = self._normalize_schema_default(param_schema.default)
            config.parameters[param_schema.name] = ParameterConfig(
                name=param_schema.name,
                value=default_value,
                default=default_value,
                source=ParameterSource.DEFAULT,
                status=ParameterSchemaStatus.OK,
                help=param_schema.help,
                type=param_schema.annotation,
                active_override=True,
            )
[docs]
@recorded_mutation
def remove_pipeline_entries(self, names: Iterable[str]) -> None:
    """Remove pipeline entries from the top-level processors_to_run list.

    :param names: Entry names to remove, delegated one by one to the builder.
    """
    for entry in names:
        self._builder.remove_processor(entry)
[docs]
def _unique_processor_name(self, base_name: str) -> tuple[str, str | None]:
    """Return a unique processor name and replica identifier for the requested base.

    When *base_name* is not scheduled yet, it is returned unchanged with no replica;
    otherwise replica suffixes ``#1``, ``#2``, ... are probed until a free name is found.
    """
    scheduled = self.get_processors_to_run()
    if base_name not in scheduled:
        return base_name, None
    counter = 1
    candidate = f'{base_name}#{counter}'
    while candidate in scheduled:
        counter += 1
        candidate = f'{base_name}#{counter}'
    return candidate, str(counter)
[docs]
@staticmethod
def _normalize_schema_default(value: Any) -> Any:
    """Convert schema defaults into serializable values (path-like objects become strings)."""
    return str(value) if isinstance(value, PathLike) else value
[docs]
@recorded_mutation
def remove_processor_from_run_list(self, full_name: str) -> None:
    """Remove a processor reference from the execution order (undoable, marks dirty)."""
    self._builder.remove_processor(full_name)
[docs]
@recorded_mutation
def move_processor_in_run_list(self, full_name: str, position: int) -> None:
    """Move a scheduled processor to a concrete slot in the execution order.

    :raises SteeringControllerError: When the processor is not scheduled or the
        position lies outside the run list.
    """
    run_list = self._builder.globals.processors_to_run
    if full_name not in run_list:
        raise SteeringControllerError(f"Processor '{full_name}' is not scheduled to run.")
    if not 0 <= position < len(run_list):
        raise SteeringControllerError('Target position is out of range.')
    # Drop the first occurrence, then re-insert at the requested slot.
    run_list.pop(run_list.index(full_name))
    run_list.insert(position, full_name)
[docs]
def get_analysis_name(self) -> str | None:
    """Return the configured analysis_name without exposing the builder."""
    return self._builder.globals.analysis_name
[docs]
def set_analysis_name(self, value: str | None) -> None:
    """Update the top-level analysis_name setting.

    Unchanged values are a no-op, so no history entry or dirty flag is produced.
    """
    if self._builder.globals.analysis_name != value:
        self._record_state()
        self._builder.set_analysis_name(value)
        self._mark_dirty()
[docs]
def get_analysis_description(self) -> str | None:
    """Return the configured analysis_description without exposing the builder."""
    return self._builder.globals.analysis_description
[docs]
def set_analysis_description(self, value: str | None) -> None:
    """Update the top-level analysis_description setting.

    Unchanged values are a no-op, so no history entry or dirty flag is produced.
    """
    if self._builder.globals.analysis_description != value:
        self._record_state()
        self._builder.set_analysis_description(value)
        self._mark_dirty()
[docs]
def get_new_only(self) -> bool | None:
    """Return the current new_only flag (``None`` when unset)."""
    return self._builder.globals.new_only
[docs]
def set_new_only(self, value: bool | None) -> None:
    """Update the new_only flag.

    Unchanged values are a no-op, so no history entry or dirty flag is produced.
    """
    if self._builder.globals.new_only != value:
        self._record_state()
        self._builder.set_new_only(value)
        self._mark_dirty()
[docs]
def get_create_standard_tables(self) -> bool | None:
    """Return the create_standard_tables flag (``None`` when unset)."""
    return self._builder.globals.create_standard_tables
[docs]
def set_create_standard_tables(self, value: bool | None) -> None:
    """Update the create_standard_tables flag.

    Unchanged values are a no-op, so no history entry or dirty flag is produced.
    """
    if self._builder.globals.create_standard_tables != value:
        self._record_state()
        self._builder.set_create_standard_tables(value)
        self._mark_dirty()
[docs]
def get_globals(self) -> dict[str, Any]:
    """Return the globals metadata used by the UI as a plain dictionary."""
    settings = self._builder.globals
    return {
        'analysis_name': settings.analysis_name,
        'description': settings.analysis_description,
        'new_only': settings.new_only,
        'create_standard_tables': settings.create_standard_tables,
    }
# Database helpers
[docs]
def get_db_url(self) -> str | None:
    """Return the configured database URL (``None`` when unset)."""
    return self._builder.db_config.url
[docs]
@recorded_mutation
def set_db_url(self, value: str | None) -> None:
    """Update the DB URL while keeping builder semantics (undoable, marks dirty)."""
    self._builder.set_db_url(value)
[docs]
def get_db_pragmas(self) -> dict[str, Any]:
    """Return a copy of the DB pragmas map.

    A deep copy is returned so callers cannot mutate builder state through it.
    """
    return deepcopy(self._builder.db_config.pragmas)
[docs]
@recorded_mutation
def set_db_pragmas(self, pragmas: Mapping[str, Any]) -> None:
    """Replace the DB pragmas map with a copy of the provided mapping."""
    self._builder.set_db_pragmas(dict(pragmas))
[docs]
def get_db_attributes(self) -> dict[str, Any]:
    """Return a copy of the database attributes map.

    A deep copy is returned so callers cannot mutate builder state through it.
    """
    return deepcopy(self._builder.db_config.attributes)
[docs]
def is_db_configuration_enabled(self) -> bool:
    """Return whether the DBConfiguration section will appear in serialized output."""
    return self._builder.is_db_configuration_enabled()
[docs]
def _assemble_connection_source(self, overrides: dict[str, Any] | None = None) -> dict[str, Any]:
    """Combine controller-managed attributes with override input.

    Precedence: override pragmas win over stored pragmas; the URL is resolved from
    override ``url``, then override ``URL``, then the stored value. Remaining override
    keys (except ``pragmas`` and ``backend``) are layered on top.
    """
    combined: dict[str, Any] = dict(self.get_db_attributes())
    if overrides and 'pragmas' in overrides:
        combined['pragmas'] = deepcopy(overrides['pragmas'])
    else:
        combined['pragmas'] = deepcopy(self.get_db_pragmas())
    final_url = None
    if overrides:
        final_url = overrides.get('url') or overrides.get('URL') or None
    if final_url is None:
        final_url = self.get_db_url()
    if final_url:
        # Keep both spellings so downstream consumers find the URL either way.
        combined['url'] = final_url
        combined['URL'] = final_url
    if overrides:
        extras = {key: value for key, value in overrides.items() if key not in ('pragmas', 'backend')}
        combined.update(extras)
    return combined
[docs]
def build_connection_test_task(self, config: dict[str, Any] | None = None) -> Callable[[], bool]:
    """Return a callable that validates the current database settings via Peewee.

    :param config: Optional overrides supplied by the GUI connection tester.
    :return: Callable that returns True when the connection is verified; it raises on failure.
    :raises SteeringControllerError: When the configuration cannot be normalized
        (e.g. no database URL available).
    """
    connection_source = self._assemble_connection_source(config)
    try:
        connection_config = build_database_connection_configuration(connection_source)
    except ValueError as exc:
        raise SteeringControllerError(str(exc)) from exc

    def task() -> bool:
        # NOTE(review): imported lazily — presumably so the DB stack is only required
        # when the task actually runs, not at module import time. Confirm with callers.
        import peewee
        from playhouse.db_url import connect

        # Keeps the peewee import from being flagged as unused; connect() needs it installed.
        assert peewee is not None
        url = connection_config['URL']
        kwargs = {key: value for key, value in connection_config.items() if key != 'URL'}
        database = connect(url, **kwargs)  # type: ignore[no-untyped-call]
        database.connect()
        database.close()
        return True

    return task
[docs]
@recorded_mutation
def set_db_attribute(self, key: str, value: Any | None) -> None:
    """Update a single entry stored under DBConfiguration (undoable, marks dirty)."""
    self._builder.set_db_attribute(key, value)
[docs]
@recorded_mutation
def enable_db_configuration(self) -> None:
    """Enable the DBConfiguration section for serialization (undoable, marks dirty)."""
    self._builder.enable_db_configuration()
[docs]
@recorded_mutation
def disable_db_configuration(self) -> None:
    """Disable the DBConfiguration section so it is omitted when written (undoable, marks dirty)."""
    self._builder.disable_db_configuration()
# UI helpers
[docs]
def get_ui_interface(self) -> str:
    """Return the configured UI interface identifier."""
    return self._builder.ui_config.interface
[docs]
@recorded_mutation
def set_ui_interface(self, interface: str) -> None:
    """Update the selected UI interface (neutral placeholder, no GUI imports)."""
    self._builder.set_ui_interface(interface)
# Processor helpers
[docs]
def list_processors(self) -> list[str]:
    """Return the known processor section names from the builder."""
    return self._builder.list_processors()
[docs]
def list_groups(self) -> list[str]:
    """Return the declared group section names from the builder."""
    return self._builder.list_groups()
[docs]
def build_pipeline(self, *, progress_callback: Callable[[int], None] | None = None) -> ProcessorPipeline:
    """Build the in-memory pipeline tree for the steering GUI.

    :param progress_callback: Optional callable receiving a 0-100 completion percentage
        after each top-level run-list item has been built.
    :return: Pipeline tree mirroring processors_to_run, with group members as child nodes.
    """
    run_list = tuple(self.get_processors_to_run())
    groups = self._builder.groups
    processors = self._builder.processors

    def _resolve_processor_config(full_name: str) -> ProcessorConfig:
        """Merge base and replica sections for *full_name* according to inheritance rules."""
        base_name, replica = parse_processor_name(full_name)
        base_config = processors.get(base_name)
        replica_config = processors.get(full_name)
        if replica is None:
            # this is a processor without replica name
            if base_config is None:
                # We do not have a section for this processor.
                # Return an empty ProcessorConfig and let's hope that the Schema reconciliation
                # will fill the holes.
                return ProcessorConfig(name=full_name)
            # this is the standard case of a processor without replica name
            # all specified parameters are from CONFIG
            config = ProcessorConfig(
                name=base_config.name,
                parameters=deepcopy(base_config.parameters),
                filters=deepcopy(base_config.filters),
                new_only=base_config.new_only,
                inheritance=base_config.inheritance,
                has_filter_root=base_config.has_filter_root,
            )
            config.filter_logic = base_config.filter_logic
            return config
        if replica_config is None and base_config is None:
            # The processor has a replica name, but we did not get neither the replica
            # specific nor the base configuration.
            # Return an empty config and hope that the schema reconciliation will fix it
            return ProcessorConfig(name=full_name)
        if replica_config is None and base_config is not None:
            # The processor has a replica name and only a base config.
            # Return the processor config built using the base parameters and mark
            # all of them as inherited.
            config = ProcessorConfig(
                name=full_name,
                parameters=deepcopy(base_config.parameters),
                filters=deepcopy(base_config.filters),
                new_only=base_config.new_only,
                has_filter_root=base_config.has_filter_root,
            )
            config.filter_logic = (
                base_config.filter_logic
            )  # filter_logic is a property and it will automatically set the ast value
            for param in config.parameters.values():
                param.source = ParameterSource.INHERITED
            return config
        if replica_config is not None and replica_config.inheritance is False:
            # The processor has a replica name and a replica specific configuration, but
            # without inheritance. So we will use only the replica specific configuration
            # and set all parameters as config.
            config = ProcessorConfig(
                name=full_name,
                parameters=deepcopy(replica_config.parameters),
                filters=deepcopy(replica_config.filters),
                new_only=replica_config.new_only,
                inheritance=replica_config.inheritance,
                has_filter_root=replica_config.has_filter_root,
            )
            config.filter_logic = (
                replica_config.filter_logic
            )  # filter_logic is a property and it will automatically set the ast value
            for param in config.parameters.values():
                param.source = ParameterSource.CONFIG
            return config
        # This is the most complex case:
        # - The processor has a replica
        # - There are configuration for both the base and the replica
        # - The replica allows for normal inheritance (True)
        # The output is the base configuration updated with replica specific parameters.
        # ParameterSource is CONFIG for replica specific and INHERITED for base.
        base_parameters: dict[str, ParameterConfig] = {}
        base_filters: dict[str, list[FilterConfig]] = {}
        base_filter_logic: str | None = None
        base_new_only: bool | None = None
        base_has_filter_root = True
        if base_config is not None:
            base_parameters = deepcopy(base_config.parameters)
            for param in base_parameters.values():
                param.source = ParameterSource.INHERITED
            base_filters = deepcopy(base_config.filters)
            base_filter_logic = base_config.filter_logic
            base_new_only = base_config.new_only
            base_has_filter_root = base_config.has_filter_root
        if TYPE_CHECKING:
            assert replica_config is not None
        # Deep update parameters manually because it's a dict of objects
        merged_parameters = base_parameters
        for key, replica_param in replica_config.parameters.items():
            param_copy = deepcopy(replica_param)
            param_copy.source = ParameterSource.CONFIG
            merged_parameters[key] = param_copy
        merged_filters = deep_update(base_filters, replica_config.filters, copy_first=False)
        filter_logic = replica_config.filter_logic if replica_config.filter_logic is not None else base_filter_logic
        new_only = replica_config.new_only if replica_config.new_only is not None else base_new_only
        has_filter_root = replica_config.has_filter_root if replica_config.filters else base_has_filter_root
        config = ProcessorConfig(
            name=full_name,
            parameters=merged_parameters,
            filters=merged_filters,
            new_only=new_only,
            inheritance=replica_config.inheritance,
            has_filter_root=has_filter_root,
        )
        config.filter_logic = filter_logic  # filter_logic is a property and it will automatically set the ast value
        return config

    def _build_item(item: str, seen_groups: set[str], parent: PipelineItem | None) -> PipelineItem:
        """Build a tree node for *item*; *seen_groups* guards against recursive group expansion."""
        if item in groups:
            group = groups[item]
            node = PipelineItem(config=group, parent=parent)
            if item in seen_groups:
                # Group already expanded on this path: stop to avoid infinite recursion.
                return node
            child_nodes = [_build_item(sub_item, {*seen_groups, item}, node) for sub_item in group.processors]
            node.children = child_nodes
            return node
        config = _resolve_processor_config(item)
        # --- Schema Alignment Layer ---
        base_name, _ = parse_processor_name(item)
        processor_cls = self.plugins.processor_dict.get(base_name) if self.plugins else None
        config_param_names = set(config.parameters.keys())
        if processor_cls is None:
            # Processor not found in plugins
            config.processor_status = ProcessorSchemaStatus.UNKNOWN
            for param in config.parameters.values():
                param.status = ParameterSchemaStatus.UNKNOWN
        else:
            config.processor_status = ProcessorSchemaStatus.OK
            schema = processor_cls.processor_schema()
            if schema.filter:
                models = [schema.filter.root_model] + list(schema.filter.allowed_models)
                config.schema_filter_models = [m.__name__ for m in models]
            # 1. Check existing parameters in config
            for param_name, param_config in list(config.parameters.items()):
                try:
                    param_schema = schema.get_parameter(param_name)
                    # Parameter exists in both
                    param_config.status = ParameterSchemaStatus.OK
                    param_config.help = param_schema.help
                    param_config.type = param_schema.annotation
                    param_config.default = self._normalize_schema_default(param_schema.default)
                except KeyError:
                    # Parameter only in config
                    param_config.status = ParameterSchemaStatus.DEPRECATED
                    param_config.help = 'Unknown'
                    param_config.type = Any
                    param_config.default = None
            # 2. Check parameters only in schema (NEW)
            for param_schema in schema.parameters:
                if param_schema.name not in config_param_names:
                    default_value = self._normalize_schema_default(param_schema.default)
                    config.parameters[param_schema.name] = ParameterConfig(
                        name=param_schema.name,
                        value=default_value,
                        default=default_value,
                        source=ParameterSource.DEFAULT,
                        status=ParameterSchemaStatus.NEW,
                        help=param_schema.help,
                        type=param_schema.annotation,
                        active_override=False,
                    )
        for param in config.parameters.values():
            if param.name in config_param_names:
                # Only explicitly configured (non-inherited) values count as active overrides.
                param.active_override = param.source != ParameterSource.INHERITED
            else:
                param.active_override = False
        return PipelineItem(config=config, parent=parent)

    items: list[PipelineItem] = []
    total = len(run_list)
    for index, item in enumerate(run_list):
        items.append(_build_item(item, set(), None))
        if progress_callback:
            # total is non-zero here: this body only runs for a non-empty run_list.
            progress_callback(int((index + 1) / total * 100))
    return ProcessorPipeline(items=items)
[docs]
@recorded_mutation
def move_pipeline_entry(
    self,
    entry_name: str,
    entry_is_group: bool,
    source_group: str | None,
    target_group: str | None,
    position: int,
) -> None:
    """Move a pipeline entry across parents and adjust execution ordering.

    :param entry_name: Name of the processor or group being moved.
    :param entry_is_group: True when the entry refers to a group section.
    :param source_group: Containing group name, or ``None`` for the top-level list.
    :param target_group: Destination group name, or ``None`` for the top-level list.
    :param position: Insertion index within the destination list.
    :raises SteeringControllerError: On unknown entries/groups, cycles, or bad positions.
    """
    source_list = self._resolve_pipeline_list(source_group)
    target_list = self._resolve_pipeline_list(target_group)
    if entry_name not in source_list:
        raise SteeringControllerError(f"Pipeline entry '{entry_name}' not found in the source list.")
    if entry_is_group and target_group is not None:
        if entry_name not in self._builder.groups:
            raise SteeringControllerError(f"Group '{entry_name}' does not exist.")
        # Nesting a group inside its own descendants must be rejected up front.
        if self._would_create_group_cycle(entry_name, target_group):
            raise SteeringControllerError('Moving the group would create a cycle.')
    if source_list is target_list:
        if position < 0 or position > len(source_list):
            raise SteeringControllerError('Target position is out of range.')
        source_list.remove(entry_name)
        # If the item was before the target, the target index shifted
        # NOTE(review): position is clamped but not decremented after the removal —
        # confirm the intended semantics are "index in the post-removal list".
        insert_pos = position if position < len(source_list) else len(source_list)
        source_list.insert(insert_pos, entry_name)
        return
    if position < 0 or position > len(target_list):
        raise SteeringControllerError('Target position is out of range.')
    source_list.remove(entry_name)
    target_list.insert(position, entry_name)
def _resolve_pipeline_list(self, group: str | None) -> list[str]:
    """Return the mutable ordering list for the top level (``None``) or a named group.

    :raises SteeringControllerError: When *group* names an unknown group.
    """
    if group is None:
        return self._builder.globals.processors_to_run
    group_config = self._builder.groups.get(group)
    if group_config is None:
        raise SteeringControllerError(f"Group '{group}' does not exist.")
    return group_config.processors
def _would_create_group_cycle(self, entry_group: str, target_group: str) -> bool:
    """Return True when placing *entry_group* inside *target_group* would create a cycle.

    Performs a depth-first search below *entry_group*: a cycle appears when
    *target_group* is reachable among its (transitive) members.
    """
    if entry_group == target_group:
        return True
    seen: set[str] = set()
    pending = [entry_group]
    while pending:
        name = pending.pop()
        if name in seen:
            continue
        seen.add(name)
        group = self._builder.groups.get(name)
        if group is None:
            # Plain processor reference: nothing to descend into.
            continue
        members = group.processors
        if target_group in members:
            return True
        pending.extend(member for member in members if member in self._builder.groups)
    return False
[docs]
def get_processor_snapshot(self, full_name: str) -> ProcessorConfig:
    """Return a deep copy snapshot of a processor configuration.

    Mutating the returned object does not affect the builder state.
    """
    config = self._require_processor(full_name)
    res = ProcessorConfig(
        name=config.name,
        parameters=deepcopy(config.parameters),
        processor_status=config.processor_status,
        filters=deepcopy(config.filters),
        new_only=config.new_only,
        inheritance=config.inheritance,
        has_filter_root=config.has_filter_root,
    )
    # filter_logic is a property; assigning it keeps the derived ast value in sync.
    res.filter_logic = config.filter_logic
    return res
[docs]
@recorded_mutation
def set_processor_parameter(self, full_name: str, key: str, value: Any) -> None:
    """Update a single processor parameter (undoable, marks dirty).

    :param full_name: Processor section name, including the replica suffix if any.
    :param key: Parameter name.
    :param value: New parameter value.
    """
    self._builder.set_parameter(full_name, key, value)
[docs]
@recorded_mutation
def remove_processor_parameter(self, full_name: str, key: str) -> None:
    """Remove a processor parameter override if present (undoable, marks dirty)."""
    self._builder.remove_parameter(full_name, key)
[docs]
@recorded_mutation
def clear_processor_parameters(self, full_name: str) -> None:
    """Clear every parameter override for a processor (undoable, marks dirty)."""
    self._builder.clear_parameters(full_name)
[docs]
@recorded_mutation
def reset_processor_parameters(self, full_name: str) -> None:
    """Reset the processor parameters to their default values without dropping the overrides."""
    config = self._require_processor(full_name)
    # Prefer schema-declared defaults when the plugin class is available.
    defaults: dict[str, Any] = {}
    if self.plugins is not None:
        base_name, _ = parse_processor_name(full_name)
        processor_cls = self.plugins.processor_dict.get(base_name)
        if processor_cls is not None:
            schema = processor_cls.processor_schema()
            defaults = {entry.name: entry.default for entry in schema.parameters}
    for param in config.parameters.values():
        # Fall back to the stored per-parameter default when the schema has no entry.
        schema_default = defaults.get(param.name, param.default)
        normalized = self._normalize_schema_default(schema_default) if schema_default is not None else None
        # Deep copy so mutable defaults are not shared between schema and config.
        default_value = deepcopy(normalized) if normalized is not None else None
        param.value = default_value
        param.source = ParameterSource.DEFAULT
[docs]
@recorded_mutation
def remove_pipeline_entry(self, entry_name: str, source_group: str | None) -> None:
    """Remove a pipeline entry from the top-level or a group list.

    :param entry_name: Name of the entry to remove.
    :param source_group: Containing group name, or ``None`` for the top-level list.
    """
    if source_group is None:
        # Top-level removals go through the builder so its bookkeeping stays consistent.
        self._builder.remove_processor(entry_name)
        return
    members = self._require_group(source_group).processors
    if entry_name in members:
        members.remove(entry_name)
[docs]
@recorded_mutation
def rename_processor_entry(self, old_name: str, new_name: str, source_group: str | None) -> None:
    """Rename a processor reference and keep its configuration aligned.

    :param old_name: Current entry name (base or base#replica).
    :param new_name: New entry name; must share the same base name.
    :param source_group: Containing group name, or ``None`` for the top-level list.
    :raises SteeringControllerError: When the entry is missing, the new name clashes,
        or the rename crosses base names.
    """
    if old_name == new_name:
        return
    source_list = self._resolve_pipeline_list(source_group)
    if old_name not in source_list:
        raise SteeringControllerError(f"Pipeline entry '{old_name}' not found in the source list.")
    if new_name in source_list:
        raise SteeringControllerError(f"Pipeline entry '{new_name}' already exists in the source list.")
    old_base, old_replica = parse_processor_name(old_name)
    new_base, new_replica = parse_processor_name(new_name)
    if old_base != new_base:
        raise SteeringControllerError('Renaming processors across different base names is not supported.')
    # Swap the reference in place so execution order is preserved.
    source_list[source_list.index(old_name)] = new_name
    processors = self._builder.processors
    if old_name not in processors:
        # No configuration section to migrate; just make sure one exists for the new name.
        self._builder._ensure_processor(new_name)
        return
    if old_replica is None and new_replica is not None:
        # Base -> replica: copy base config to replica and keep the base config intact.
        if new_name in processors:
            raise SteeringControllerError(f"Processor configuration '{new_name}' already exists.")
        base_config = processors[old_name]
        clone = deepcopy(base_config)
        clone.name = new_name
        processors[new_name] = clone
        return
    if new_name in processors and new_name != old_name:
        raise SteeringControllerError(f"Processor configuration '{new_name}' already exists.")
    # Replica -> replica (or base -> base): move the section under the new key.
    config = processors.pop(old_name)
    config.name = new_name
    processors[new_name] = config
[docs]
@recorded_mutation
def add_replica(self, base_name: str, replica: str) -> None:
    """Ensure a replica section exists without altering parameters.

    :param base_name: Base processor name the replica belongs to.
    :param replica: Replica identifier to attach to the base name.
    """
    self._builder.add_replica(base_name, replica)
[docs]
@recorded_mutation
def set_replica_inheritance(self, replica_full_name: str, value: bool | None) -> None:
    """Toggle replica inheritance.

    :param replica_full_name: Fully qualified replica name.
    :param value: Explicit inheritance flag, or ``None`` to clear the override.
    """
    self._builder.set_replica_inheritance(replica_full_name, value)
[docs]
@recorded_mutation
def set_processor_new_only(self, full_name: str, value: bool | None) -> None:
    """Set the __new_only__ override for a processor section.

    :param full_name: Fully qualified processor name.
    :param value: Explicit flag value, or ``None`` to clear the override.
    """
    self._builder.set_processor_new_only(full_name, value)
[docs]
@recorded_mutation
def set_processor_filters(
    self, full_name: str, filters: Mapping[str, Iterable[FilterConfig]], logic: str | None
) -> None:
    """Replace the whole filter dictionary and logic for a processor.

    :param full_name: Fully qualified processor name.
    :param filters: Mapping of filter model names to their configurations. Each value
        may be any iterable — including a one-shot generator — because it is
        materialized into a fresh list here.
    :param logic: Optional filter logic string, or ``None``.
    """
    # Materialize and deep-copy every entry so the builder state never aliases the
    # caller's objects. Unlike deep-copying the mapping wholesale, this also honors
    # the declared Iterable value type when the caller passes generators.
    copied_filters: dict[str, list[FilterConfig]] = {
        model: [deepcopy(item) for item in entries] for model, entries in filters.items()
    }
    self._builder.set_processor_filters(full_name, copied_filters, logic)
[docs]
@recorded_mutation
def set_filter_config(self, full_name: str, model_name: str, config: Mapping[str, Any]) -> None:
    """Replace a filter model configuration with a copy of the provided mapping.

    :param full_name: Fully qualified processor name.
    :param model_name: Name of the filter model being configured.
    :param config: Mapping copied into a plain dict before being handed to the builder.
    """
    self._builder.set_filter_config(full_name, model_name, dict(config))
[docs]
@recorded_mutation
def set_filter_field(self, full_name: str, model_name: str, field: str, value: Any) -> None:
    """Set or update a single filter field.

    :param full_name: Fully qualified processor name.
    :param model_name: Name of the filter model to modify.
    :param field: Field name inside the filter model.
    :param value: Value assigned to the field.
    """
    self._builder.set_filter_field(full_name, model_name, field, value)
[docs]
@recorded_mutation
def remove_filter(self, full_name: str, model_name: str) -> None:
    """Delete a filter model definition.

    :param full_name: Fully qualified processor name.
    :param model_name: Name of the filter model to remove.
    """
    self._builder.remove_filter(full_name, model_name)
[docs]
@recorded_mutation
def set_filter_logic(self, full_name: str, logic: str | None) -> None:
    """Update the __logic__ string for a processor's filters.

    :param full_name: Fully qualified processor name.
    :param logic: New logic expression, or ``None`` to clear it.
    """
    self._builder.set_filter_logic(full_name, logic)
[docs]
@recorded_mutation
def set_filter_conditionals(
    self, full_name: str, model_name: str, conditionals: list[Mapping[str, Any]] | None
) -> None:
    """Assign __conditional__ blocks to a filter model.

    :param full_name: Fully qualified processor name.
    :param model_name: Name of the filter model receiving the conditionals.
    :param conditionals: Sequence of conditional mappings, or ``None`` to clear them.
    """
    # Copy each mapping into a plain dict so the builder never shares caller state.
    formatted = None if conditionals is None else [dict(block) for block in conditionals]
    self._builder.set_filter_conditionals(full_name, model_name, formatted)
# Group helpers
[docs]
@recorded_mutation
def add_group(self, name: str, processors: Iterable[str], description: str | None = None) -> None:
    """Declare a new group section.

    :param name: Name of the group to create.
    :param processors: Processor or group references forming the group body.
    :param description: Optional human-readable description for the group.
    """
    self._builder.add_group(name, processors, description)
[docs]
@recorded_mutation
def add_group_to_run_list(self, name: str) -> None:
    """Append a group name to the processors_to_run list if missing.

    :param name: Name of an already-declared group.
    :raises SteeringControllerError: If no group with that name is defined.
    """
    if name not in self._builder.groups:
        raise SteeringControllerError(f"Group '{name}' is not defined.")
    run_list = self._builder.globals.processors_to_run
    if name not in run_list:
        run_list.append(name)
[docs]
@recorded_mutation
def add_group_entry_to_group(self, target_group: str, entry_name: str) -> None:
    """Append a processor or group reference to a group list.

    :param target_group: Name of the group being extended.
    :param entry_name: Processor or group reference to append.
    :raises SteeringControllerError: If appending a group reference would create a cycle.
    """
    group = self._require_group(target_group)
    if entry_name in group.processors:
        # Already referenced: keep the list free of duplicates.
        return
    references_group = entry_name in self._builder.groups
    if references_group and self._would_create_group_cycle(entry_name, target_group):
        raise SteeringControllerError('Adding the group would create a cycle.')
    group.processors.append(entry_name)
[docs]
@recorded_mutation
def remove_group(self, name: str) -> None:
    """Delete a named group section.

    :param name: Name of the group to remove.
    """
    self._builder.remove_group(name)
[docs]
@recorded_mutation
def rename_group(self, old_name: str, new_name: str) -> None:
    """Rename a group and update all pipeline references.

    Renaming a group to its own name (even with surrounding whitespace in the
    new name) is a no-op; the original implementation rejected the
    whitespace-only variant with a spurious "already exists" error.

    :param old_name: Current group name.
    :param new_name: Desired group name; surrounding whitespace is stripped.
    :raises SteeringControllerError: If the stripped name is empty, clashes with an
        existing group or processor, or the group does not exist.
    """
    if old_name == new_name:
        return
    normalized = new_name.strip()
    if not normalized:
        raise SteeringControllerError('Group name cannot be empty.')
    if normalized == old_name:
        # Only surrounding whitespace differed: nothing to rename.
        return
    if normalized in self._builder.groups:
        raise SteeringControllerError(f"Group '{normalized}' already exists.")
    if normalized in self._builder.processors:
        raise SteeringControllerError(f"Processor '{normalized}' already exists.")
    group = self._require_group(old_name)
    self._builder.groups.pop(old_name)
    group.name = normalized
    self._builder.groups[normalized] = group
    # Keep every pipeline reference (top-level run list and nested groups)
    # pointing at the new name.
    self._replace_pipeline_entry(self._builder.globals.processors_to_run, old_name, normalized)
    for group_config in self._builder.groups.values():
        self._replace_pipeline_entry(group_config.processors, old_name, normalized)
[docs]
@recorded_mutation
def set_group_description(self, name: str, description: str | None) -> None:
    """Set or clear the description for a group.

    :param name: Name of the group to update.
    :param description: New description; blank or non-string input clears it.
    """
    group = self._require_group(name)
    if isinstance(description, str):
        cleaned = description.strip()
        # An all-whitespace description is treated as "no description".
        group.description = cleaned if cleaned else None
    else:
        group.description = None
[docs]
@recorded_mutation
def set_group_processors(self, name: str, processors: Iterable[str]) -> None:
    """Replace the processors list for the given group.

    :param name: Name of the group to update.
    :param processors: New entries; each item is coerced to ``str``.
    """
    group = self._require_group(name)
    group.processors = list(map(str, processors))
[docs]
def get_group_snapshot(self, name: str) -> GroupConfig:
"""Return a copy of a group configuration."""
group = self._require_group(name)
return GroupConfig(
name=group.name,
processors=list(group.processors),
description=group.description,
attributes=deepcopy(group.attributes),
)
[docs]
@recorded_mutation
def set_group_attributes(self, name: str, attributes: Mapping[str, Any]) -> None:
    """Replace a group's attributes with a new copy.

    :param name: Name of the group to update.
    :param attributes: Mapping copied (shallowly) into the group.
    """
    target = self._require_group(name)
    target.attributes = {key: value for key, value in attributes.items()}
[docs]
def get_group_attributes(self, name: str) -> dict[str, Any]:
"""Return a copy of a group's attributes."""
return deepcopy(self._require_group(name).attributes)
[docs]
@recorded_mutation
def add_processors_to_group_with_defaults(self, group_name: str, base_names: Iterable[str]) -> None:
    """Append processors to a group and set their parameters to schema defaults.

    :param group_name: Name of the group receiving the processors.
    :param base_names: Base processor names to add; replicas are minted automatically
        when a name is already present in the group.
    :raises SteeringControllerError: If plugins have not been loaded or the group is missing.
    """
    if self.plugins is None:
        raise SteeringControllerError('Plugins have not been loaded.')
    group = self._require_group(group_name)
    taken = set(group.processors)
    for base_name in base_names:
        full_name, replica = self._unique_processor_name_for_group(base_name, taken)
        if full_name in taken:
            # Defensive: the helper should always hand back an unused name.
            continue
        if replica is not None:
            self._builder.add_replica(base_name, replica)
        else:
            self._builder._ensure_processor(full_name)
        self._apply_processor_defaults(base_name, full_name)
        group.processors.append(full_name)
        taken.add(full_name)
# Internal helpers
def _require_processor(self, full_name: str) -> ProcessorConfig:
    """Return the configuration for *full_name*, translating a missing key into a controller error.

    :raises SteeringControllerError: If the processor is not defined in the builder.
    """
    try:
        return self._builder.get_processor_config(full_name)
    except KeyError as exc:
        raise SteeringControllerError(f"Processor '{full_name}' is not defined.") from exc
def _require_group(self, name: str) -> GroupConfig:
    """Return the group configuration for *name*, translating a missing key into a controller error.

    :raises SteeringControllerError: If the group is not defined in the builder.
    """
    try:
        return self._builder.get_group(name)
    except KeyError as exc:
        raise SteeringControllerError(f"Group '{name}' is not defined.") from exc
@staticmethod
def _replace_pipeline_entry(values: list[str], old_name: str, new_name: str) -> None:
for index, value in enumerate(list(values)):
if value == old_name:
values[index] = new_name
@staticmethod
def _unique_processor_name_for_group(base_name: str, existing: set[str]) -> tuple[str, str | None]:
if base_name not in existing:
return base_name, None
counter = 1
while True:
replica = str(counter)
candidate = f'{base_name}#{replica}'
if candidate not in existing:
return candidate, replica
counter += 1
def _apply_processor_defaults(self, base_name: str, full_name: str) -> None:
    """Populate the configuration of *full_name* with the schema defaults of *base_name*.

    Parameters already present in the configuration are left untouched; only
    missing ones are created from the processor schema. When the processor
    class cannot be resolved from the loaded plugins, the section is only
    flagged as ``UNKNOWN`` and no parameters are added.
    """
    config = self._builder.get_processor_config(full_name)
    processor_cls = self.plugins.processor_dict.get(base_name) if self.plugins else None
    if processor_cls is None:
        # Cannot introspect the schema: flag the section instead of guessing defaults.
        config.processor_status = ProcessorSchemaStatus.UNKNOWN
        return
    config.processor_status = ProcessorSchemaStatus.OK
    schema = processor_cls.processor_schema()
    for param_schema in schema.parameters:
        if param_schema.name in config.parameters:
            # Keep existing entries intact; only add the missing parameters.
            continue
        default_value = self._normalize_schema_default(param_schema.default)
        config.parameters[param_schema.name] = ParameterConfig(
            name=param_schema.name,
            value=default_value,
            default=default_value,
            source=ParameterSource.DEFAULT,
            status=ParameterSchemaStatus.OK,
            help=param_schema.help,
            type=param_schema.annotation,
            active_override=True,
        )