Source code for mafw.steering_gui.controllers.steering_controller

#  Copyright 2026 European Union
#  Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
#  SPDX-License-Identifier: EUPL-1.2
"""Application layer controller for steering configuration builds.

:Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
:Description: Keep GUI and execution logic isolated while managing SteeringBuilder state.
"""

from __future__ import annotations

from copy import deepcopy
from functools import wraps
from os import PathLike
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Concatenate, Iterable, Mapping, ParamSpec, TypeVar, cast

from mafw.mafw_errors import MAFwException, ValidationIssue
from mafw.plugin_manager import LoadedPlugins, MAFwPluginManager, get_plugin_manager
from mafw.steering.builder import SteeringBuilder, ValidationLevel
from mafw.steering.models import (
    FilterConfig,
    GroupConfig,
    ParameterConfig,
    ParameterSchemaStatus,
    ParameterSource,
    ProcessorConfig,
    ProcessorSchemaStatus,
)
from mafw.steering_gui.models.pipeline import PipelineItem, ProcessorPipeline
from mafw.tools.generics import deep_update
from mafw.tools.regexp import extract_protocol, parse_processor_name

# Parameter specification used in the annotations of ``recorded_mutation`` so that the
# decorated method keeps its own call signature (everything after ``self``).
P = ParamSpec('P')
"""Type variable capturing the positional parameters of the wrapped method."""
# Return-type placeholder used in the annotations of ``recorded_mutation``.
R = TypeVar('R')
"""Type variable capturing the return type of the wrapped method."""


[docs] def recorded_mutation( method: Callable[Concatenate['SteeringController', P], R], ) -> Callable[Concatenate['SteeringController', P], R]: """Decorator that records the controller state before mutating operations.""" @wraps(method) def wrapper(self: 'SteeringController', /, *args: P.args, **kwargs: P.kwargs) -> R: self._record_state() result = method(self, *args, **kwargs) self._mark_dirty() return result return wrapper
class SteeringControllerError(MAFwException):
    """Error signalling that the controller was asked to perform an invalid operation."""
# TODO: Consider moving this function out of this module so that it is generally usable and can also be used by the execution framework.
def build_database_connection_configuration(config: Mapping[str, Any]) -> dict[str, Any]:
    """Normalise a database configuration mapping for a connection check.

    The input mapping is deep-copied and never modified. The result always holds the
    ``URL`` key; further keys depend on the protocol reported by ``extract_protocol``:

    * ``sqlite``: ``pragmas`` (a copy of the configured mapping, or an empty dict);
    * ``mysql``: ``user`` and ``read_default_file`` when present;
    * ``postgresql``: ``user`` when present.

    :param config: Mapping holding the URL under ``url`` or ``URL`` plus optional extras.
    :return: Normalised configuration dictionary.
    :raises ValueError: If no non-empty URL is found in ``config``.
    """
    settings = deepcopy(dict(config))
    settings.pop('backend', None)

    url = settings.get('url') or settings.get('URL')
    if not url:
        raise ValueError('Database URL is required to build the connection configuration.')

    backend = extract_protocol(str(url)) or ''
    result: dict[str, Any] = {'URL': url}

    if backend == 'sqlite':
        result['pragmas'] = deepcopy(settings.get('pragmas') or {})
        return result

    # Optional keys forwarded verbatim for each server-based backend.
    forwarded_keys = {
        'mysql': ('user', 'read_default_file'),
        'postgresql': ('user',),
    }
    for key in forwarded_keys.get(backend, ()):
        if key in settings:
            result[key] = settings[key]
    return result
class DirtyStateSignal:
    """Minimal observer used to broadcast dirty-state changes of a controller."""

    def __init__(self) -> None:
        # Subscribers are stored, and later called, in subscription order.
        self._listeners: list[Callable[[bool], None]] = []

    def connect(self, callback: Callable[[bool], None]) -> None:
        """Register ``callback`` to be invoked on every emitted value.

        :param callback: Callable receiving the new dirty flag.
        """
        self._listeners.append(callback)

    def emit(self, value: bool) -> None:
        """Call every registered subscriber with ``value``.

        :param value: The new dirty flag forwarded to the subscribers.
        """
        # Iterate over a copy so that subscribers added during the emission are not called in this round.
        for notify in self._listeners[:]:
            notify(value)
class SteeringController:
    """Controller orchestrating SteeringBuilder without GUI or execution ties."""

    _max_history: int = 50

    def __init__(
        self,
        builder: SteeringBuilder | None = None,
        *,
        max_history: int | None = None,
        plugin_manager: MAFwPluginManager | None = None,
    ) -> None:
        """Set up the controller state.

        :param builder: Builder to manage; a fresh ``SteeringBuilder`` is created when falsy.
        :param max_history: Maximum number of undo snapshots; the class default is kept when ``None``.
        :param plugin_manager: Plugin manager to use; ``get_plugin_manager()`` is used when falsy.
        """
        self._builder = builder or SteeringBuilder()

        # Undo / redo stacks of builder snapshots.
        self._history: list[SteeringBuilder] = []
        self._future: list[SteeringBuilder] = []

        # Persistence tracking.
        self._dirty = False
        self._current_file: Path | None = None
        self.dirty_state_signal = DirtyStateSignal()

        self._validation_level: ValidationLevel | None = ValidationLevel.SEMANTIC

        # Plugins are only loaded on request, see ``load_plugins``.
        self._plugin_manager = plugin_manager or get_plugin_manager()
        self.plugins: LoadedPlugins | None = None

        if max_history is not None:
            self._max_history = max_history

        # Copy of the builder as received at construction time.
        self._initial_snapshot: SteeringBuilder = deepcopy(self._builder)
def load_plugins(self) -> LoadedPlugins:
    """Ask the plugin manager for the plugin bundles and cache them on the controller.

    The ``processors``, ``db_modules`` and ``ui`` bundles are requested and the
    result is stored in :attr:`plugins`.

    :return: The plugins container returned by the plugin manager.
    :rtype: LoadedPlugins
    """
    loaded = self._plugin_manager.load_plugins(('processors', 'db_modules', 'ui'))
    self.plugins = loaded
    return loaded
[docs] def available_processor_count(self) -> int | None: """Return how many processor plugins have been loaded (or ``None`` until available).""" return len(self.plugins.processor_list) if self.plugins else None
def available_processor_names(self) -> list[str]:
    """Return the sorted names of the processors provided by the loaded plugins.

    :return: Sorted processor names, or an empty list when plugins have not been loaded.
    """
    loaded = self.plugins
    return [] if loaded is None else sorted(loaded.processor_dict.keys())
[docs] def available_db_module_count(self) -> int | None: """Return how many database modules have been loaded (or ``None`` until available).""" return len(self.plugins.db_model_modules) if self.plugins else None
[docs] def available_ui_count(self) -> int | None: """Return how many user interface plugins have been loaded (or ``None`` until available).""" return len(self.plugins.ui_list) if self.plugins else None
def _record_state(self) -> None:
    """Push a copy of the current builder on the undo stack and drop the redo stack."""
    self._push_history(self._clone_builder())
    self._future.clear()

def _push_history(self, snapshot: SteeringBuilder) -> None:
    """Append ``snapshot`` to the undo stack, discarding the oldest entries beyond ``_max_history``."""
    history = self._history
    history.append(snapshot)
    while len(history) > self._max_history:
        del history[0]

def _clone_builder(self) -> SteeringBuilder:
    """Return a deep copy of the builder currently managed by the controller."""
    return deepcopy(self._builder)

def _set_dirty_state(self, value: bool) -> None:
    """Store the dirty flag and emit the dirty-state signal, but only when the flag changes."""
    if self._dirty != value:
        self._dirty = value
        self.dirty_state_signal.emit(value)

def _mark_dirty(self) -> None:
    """Flag the controller as holding unsaved changes."""
    self._set_dirty_state(True)
def _mark_clean(self) -> None:
    """Flag the controller as having no unsaved changes (delegates to ``_set_dirty_state``)."""
    self._set_dirty_state(False)
@property
def dirty(self) -> bool:
    """Tell whether the controller currently holds unsaved changes.

    :return: The value of the internal dirty flag.
    """
    return self._dirty
def validation_level(self) -> ValidationLevel | None:
    """Return the validation level preference stored on the controller.

    :return: The configured level, or ``None`` when no level is set.
    """
    return self._validation_level
def set_validation_level(self, level: ValidationLevel | None) -> None:
    """Store the validation level preference; the builder data is left untouched.

    :param level: New validation level, or ``None`` to unset it.
    """
    self._validation_level = level
@classmethod
def from_file(cls, path: Path | str) -> 'SteeringController':
    """Create a controller whose builder is read from an existing steering file.

    :param path: Location of the TOML steering file.
    :return: A controller associated with ``path`` and marked as clean.
    """
    instance = cls(SteeringBuilder.from_toml(path))
    instance._current_file = Path(path)
    instance._mark_clean()
    return instance
def load(self, path: Path | str) -> None:
    """Load a steering file from disk and make it the current builder.

    The file is parsed *before* the current state is recorded, so a failing load
    (missing or invalid file) leaves the undo history and the redo stack untouched.
    On success the previous state is pushed on the undo stack, the reset snapshot is
    rebased on the loaded builder, ``path`` becomes the current file and the
    controller is marked as clean.

    :param path: Location of the TOML steering file.
    """
    # Parse first: any exception raised here must not alter the controller.
    loaded = SteeringBuilder.from_toml(path)

    self._record_state()
    self._builder = loaded
    self._initial_snapshot = deepcopy(loaded)
    self._current_file = Path(path)
    self._mark_clean()
def save(self, *, skip_validation: bool = False) -> None:
    """Write the current steering state to the file associated with the controller.

    Unless ``skip_validation`` is set or no validation level is configured, the
    builder is validated first. The controller is marked as clean afterwards.

    :param skip_validation: When true, write the file without validating.
    """
    destination = self._require_current_file()
    level = self._validation_level
    if level is not None and not skip_validation:
        self.ensure_valid(level)
    self._write_to_path(destination)
    self._mark_clean()
[docs] def save_as(self, path: Path | str, *, skip_validation: bool = False) -> None: """Persist the current steering state to a new file path and track it.""" target = Path(path) if not skip_validation and self._validation_level is not None: self.ensure_valid(self._validation_level) self._write_to_path(target) self._current_file = target self._mark_clean()
[docs] def current_file(self) -> Path | None: """Return the file currently associated with persistence operations.""" return self._current_file
def _write_to_path(self, path: Path) -> None:
    """Delegate the serialization of the builder to ``SteeringBuilder.write``.

    :param path: Destination file.
    """
    self._builder.write(path)
[docs] def export_to_path(self, path: Path | str) -> None: """Serialize the builder to the provided path without validation.""" target = Path(path) self._builder.write(target)
@recorded_mutation
def replace_builder(self, builder: SteeringBuilder) -> None:
    """Make ``builder`` the builder managed by the controller.

    :param builder: The new builder instance; it is stored as is, without copying.
    """
    self._builder = builder
def _require_current_file(self) -> Path:
    """Return the file associated with the controller.

    :return: The current file path.
    :raises ValueError: If no file has been associated yet.
    """
    path = self._current_file
    if path is not None:
        return path
    raise ValueError('No file associated with the controller yet.')
def validate(self, validation_level: ValidationLevel = ValidationLevel.SEMANTIC) -> list[ValidationIssue]:
    """Validate the builder and collect the issues.

    :param validation_level: Level forwarded to ``SteeringBuilder.validate``.
    :return: The list of issues returned by the builder.
    """
    issues = self._builder.validate(validation_level)
    return issues
def ensure_valid(self, validation_level: ValidationLevel = ValidationLevel.SEMANTIC) -> None:
    """Validate the builder and raise the first reported issue, if any.

    :param validation_level: Level forwarded to :meth:`validate`.
    :raises ValidationIssue: The first issue returned by :meth:`validate`.
    """
    problems = self.validate(validation_level)
    if not problems:
        return
    raise problems[0]
def can_undo(self) -> bool:
    """Tell whether at least one snapshot is available on the undo stack.

    :return: ``True`` when :meth:`undo` can be called.
    """
    return len(self._history) > 0
def can_redo(self) -> bool:
    """Tell whether at least one snapshot is available on the redo stack.

    :return: ``True`` when :meth:`redo` can be called.
    """
    return len(self._future) > 0
def undo(self) -> None:
    """Revert the builder to the most recent snapshot of the undo stack.

    A copy of the current builder is pushed on the redo stack so that the operation
    can be re-applied with :meth:`redo`. Because the builder content changes, the
    controller is flagged as dirty: after a save followed by an undo the in-memory
    state no longer matches the file on disk.

    :raises SteeringControllerError: If the undo stack is empty.
    """
    if not self.can_undo():
        raise SteeringControllerError('No historical state to undo.')
    current = self._clone_builder()
    previous = self._history.pop()
    self._future.append(current)
    self._builder = previous
    # The restored state may differ from what was last saved.
    self._mark_dirty()
def redo(self) -> None:
    """Re-apply the most recently undone snapshot.

    A copy of the current builder is pushed back on the undo stack. Because the
    builder content changes, the controller is flagged as dirty.

    :raises SteeringControllerError: If the redo stack is empty.
    """
    if not self.can_redo():
        raise SteeringControllerError('No future state to redo.')
    current = self._clone_builder()
    next_snapshot = self._future.pop()
    self._push_history(current)
    self._builder = next_snapshot
    # The re-applied state may differ from what was last saved.
    self._mark_dirty()
def reset(self) -> None:
    """Start over with an empty builder.

    The builder is replaced by a new ``SteeringBuilder``, both the undo and redo
    stacks are emptied, the reset snapshot is rebased on the new builder, the file
    association is dropped, the validation level goes back to ``SEMANTIC`` and the
    controller is marked as clean.
    """
    fresh = SteeringBuilder()
    self._builder = fresh
    self._history.clear()
    self._future.clear()
    self._initial_snapshot = deepcopy(fresh)
    self._current_file = None
    self._validation_level = ValidationLevel.SEMANTIC
    self._mark_clean()
# Globals helpers
def get_processors_to_run(self) -> list[str]:
    """Return the ordered processor references scheduled for execution.

    :return: A new list, so that callers cannot alter the builder through it.
    """
    scheduled = self._builder.globals.processors_to_run
    return [*scheduled]
@recorded_mutation
def set_processors_to_run(self, processors: Iterable[str]) -> None:
    """Replace the whole execution list by forwarding ``processors`` to the builder.

    :param processors: Processor references in execution order.
    """
    self._builder.set_processors_to_run(processors)
@recorded_mutation
def add_processor_to_run_list(self, base_name: str, replica: str | None = None) -> None:
    """Register a processor, or one of its replicas, through ``SteeringBuilder.add_processor``.

    :param base_name: Processor name without replica identifier.
    :param replica: Optional replica identifier.
    """
    self._builder.add_processor(base_name, replica)
@recorded_mutation
def add_processors_with_defaults(self, base_names: Iterable[str]) -> None:
    """Add processors to the builder and pre-fill their parameters with the schema defaults.

    Every name gets a unique entry (a numeric replica is appended when the base name
    is already scheduled). Processors that are not found among the loaded plugins are
    flagged as ``UNKNOWN`` and receive no parameters. Parameters already present in
    the processor configuration are left untouched.

    :param base_names: Processor names without replica identifier.
    :raises SteeringControllerError: If the plugins have not been loaded yet.
    """
    if self.plugins is None:
        raise SteeringControllerError('Plugins have not been loaded.')

    for base_name in base_names:
        full_name, replica = self._unique_processor_name(base_name)
        self._builder.add_processor(base_name, replica)
        config = self._builder.get_processor_config(full_name)

        processor_cls = self.plugins.processor_dict.get(base_name)
        if processor_cls is None:
            config.processor_status = ProcessorSchemaStatus.UNKNOWN
            continue

        config.processor_status = ProcessorSchemaStatus.OK
        for entry in processor_cls.processor_schema().parameters:
            if entry.name in config.parameters:
                continue
            normalized_default = self._normalize_schema_default(entry.default)
            config.parameters[entry.name] = ParameterConfig(
                name=entry.name,
                value=normalized_default,
                default=normalized_default,
                source=ParameterSource.DEFAULT,
                status=ParameterSchemaStatus.OK,
                help=entry.help,
                type=entry.annotation,
                active_override=True,
            )
@recorded_mutation
def remove_pipeline_entries(self, names: Iterable[str]) -> None:
    """Remove several entries by calling ``SteeringBuilder.remove_processor`` for each name.

    :param names: Names of the entries to remove.
    """
    remove = self._builder.remove_processor
    for entry_name in names:
        remove(entry_name)
[docs] def _unique_processor_name(self, base_name: str) -> tuple[str, str | None]: """Return a unique processor name and replica identifier for the requested base.""" run_list = self.get_processors_to_run() if base_name not in run_list: return base_name, None counter = 1 while True: replica = str(counter) candidate = f'{base_name}#{replica}' if candidate not in run_list: return candidate, replica counter += 1
[docs] @staticmethod def _normalize_schema_default(value: Any) -> Any: """Convert schema defaults into serializable values.""" if isinstance(value, PathLike): return str(value) return value
@recorded_mutation
def remove_processor_from_run_list(self, full_name: str) -> None:
    """Remove a single processor reference through ``SteeringBuilder.remove_processor``.

    :param full_name: Processor name, including the replica identifier if any.
    """
    self._builder.remove_processor(full_name)
@recorded_mutation
def move_processor_in_run_list(self, full_name: str, position: int) -> None:
    """Move a scheduled processor to the given index of the execution list.

    :param full_name: Processor name, including the replica identifier if any.
    :param position: Final index of the processor, in ``[0, len(run_list) - 1]``.
    :raises SteeringControllerError: If the processor is not scheduled or the index is out of range.
    """
    order = self._builder.globals.processors_to_run
    if full_name not in order:
        raise SteeringControllerError(f"Processor '{full_name}' is not scheduled to run.")
    if not 0 <= position < len(order):
        raise SteeringControllerError('Target position is out of range.')
    order.remove(full_name)
    order.insert(position, full_name)
[docs] def get_analysis_name(self) -> str | None: """Return the configured analysis_name without exposing the builder.""" return self._builder.globals.analysis_name
[docs] def set_analysis_name(self, value: str | None) -> None: """Update the top-level analysis_name setting.""" if self._builder.globals.analysis_name == value: return self._record_state() self._builder.set_analysis_name(value) self._mark_dirty()
[docs] def get_analysis_description(self) -> str | None: """Return the configured analysis_description safely.""" return self._builder.globals.analysis_description
[docs] def set_analysis_description(self, value: str | None) -> None: """Update the top-level analysis_description setting.""" if self._builder.globals.analysis_description == value: return self._record_state() self._builder.set_analysis_description(value) self._mark_dirty()
[docs] def get_new_only(self) -> bool | None: """Return the current new_only flag.""" return self._builder.globals.new_only
[docs] def set_new_only(self, value: bool | None) -> None: """Update the new_only flag.""" if self._builder.globals.new_only == value: return self._record_state() self._builder.set_new_only(value) self._mark_dirty()
[docs] def get_create_standard_tables(self) -> bool | None: """Return the create_standard_tables flag.""" return self._builder.globals.create_standard_tables
[docs] def set_create_standard_tables(self, value: bool | None) -> None: """Update the create_standard_tables flag.""" if self._builder.globals.create_standard_tables == value: return self._record_state() self._builder.set_create_standard_tables(value) self._mark_dirty()
def get_globals(self) -> dict[str, Any]:
    """Collect the global settings of the builder in a plain dictionary.

    :return: Dictionary with the keys ``analysis_name``, ``description``,
        ``new_only`` and ``create_standard_tables``.
    """
    settings = self._builder.globals
    summary: dict[str, Any] = {}
    summary['analysis_name'] = settings.analysis_name
    summary['description'] = settings.analysis_description
    summary['new_only'] = settings.new_only
    summary['create_standard_tables'] = settings.create_standard_tables
    return summary
# Database helpers
[docs] def get_db_url(self) -> str | None: """Return the configured database URL.""" return self._builder.db_config.url
@recorded_mutation
def set_db_url(self, value: str | None) -> None:
    """Forward the new database URL to ``SteeringBuilder.set_db_url``.

    :param value: New database URL, or ``None``.
    """
    self._builder.set_db_url(value)
def get_db_pragmas(self) -> dict[str, Any]:
    """Return the database pragmas of the builder.

    :return: A deep copy, so that callers cannot alter the builder through it.
    """
    pragmas = self._builder.db_config.pragmas
    return deepcopy(pragmas)
@recorded_mutation
def set_db_pragmas(self, pragmas: Mapping[str, Any]) -> None:
    """Replace the database pragmas with a shallow ``dict`` copy of ``pragmas``.

    :param pragmas: New pragma mapping.
    """
    copied = dict(pragmas)
    self._builder.set_db_pragmas(copied)
def get_db_attributes(self) -> dict[str, Any]:
    """Return the database attributes of the builder.

    :return: A deep copy, so that callers cannot alter the builder through it.
    """
    attributes = self._builder.db_config.attributes
    return deepcopy(attributes)
def is_db_configuration_enabled(self) -> bool:
    """Report the value of ``SteeringBuilder.is_db_configuration_enabled``.

    :return: Whether the DBConfiguration section is enabled in the builder.
    """
    enabled = self._builder.is_db_configuration_enabled()
    return enabled
[docs] def _assemble_connection_source(self, overrides: dict[str, Any] | None = None) -> dict[str, Any]: """Combine controller-managed attributes with override input.""" source: dict[str, Any] = dict(self.get_db_attributes()) if overrides and 'pragmas' in overrides: source['pragmas'] = deepcopy(overrides['pragmas']) else: source['pragmas'] = deepcopy(self.get_db_pragmas()) final_url = None if overrides: if overrides.get('url'): final_url = overrides['url'] elif overrides.get('URL'): final_url = overrides['URL'] if final_url is None: final_url = self.get_db_url() if final_url: source['url'] = final_url source['URL'] = final_url if overrides: extras = dict(overrides) extras.pop('pragmas', None) extras.pop('backend', None) source.update(extras) return source
def build_connection_test_task(self, config: dict[str, Any] | None = None) -> Callable[[], bool]:
    """Prepare a callable that checks the database settings by opening a connection.

    The connection configuration is resolved immediately; the connection itself is
    only attempted when the returned callable is invoked.

    :param config: Optional overrides supplied by the GUI connection tester.
    :return: Callable that returns True when the connection is verified; it raises on failure.
    :raises SteeringControllerError: If the connection configuration cannot be built.
    """
    merged_source = self._assemble_connection_source(config)
    try:
        settings = build_database_connection_configuration(merged_source)
    except ValueError as exc:
        raise SteeringControllerError(str(exc)) from exc

    def task() -> bool:
        # Imported lazily, at the moment the check is actually executed.
        import peewee
        from playhouse.db_url import connect

        assert peewee is not None

        # Everything but the URL is forwarded as keyword argument.
        options = dict(settings)
        url = options.pop('URL')
        database = connect(url, **options)  # type: ignore[no-untyped-call]
        database.connect()
        database.close()
        return True

    return task
@recorded_mutation
def set_db_attribute(self, key: str, value: Any | None) -> None:
    """Forward a single DBConfiguration entry to ``SteeringBuilder.set_db_attribute``.

    :param key: Name of the attribute.
    :param value: New value of the attribute.
    """
    self._builder.set_db_attribute(key, value)
@recorded_mutation
def enable_db_configuration(self) -> None:
    """Switch the DBConfiguration section on through ``SteeringBuilder.enable_db_configuration``."""
    self._builder.enable_db_configuration()
@recorded_mutation
def disable_db_configuration(self) -> None:
    """Switch the DBConfiguration section off through ``SteeringBuilder.disable_db_configuration``."""
    self._builder.disable_db_configuration()
# UI helpers
def get_ui_interface(self) -> str:
    """Return the interface identifier stored in the UI configuration of the builder.

    :return: The configured UI interface identifier.
    """
    ui_settings = self._builder.ui_config
    return ui_settings.interface
@recorded_mutation
def set_ui_interface(self, interface: str) -> None:
    """Forward the selected UI interface identifier to ``SteeringBuilder.set_ui_interface``.

    :param interface: Identifier of the user interface.
    """
    self._builder.set_ui_interface(interface)
# Processor helpers
def list_processors(self) -> list[str]:
    """Return the processor section names reported by ``SteeringBuilder.list_processors``.

    :return: Processor section names.
    """
    names = self._builder.list_processors()
    return names
def list_groups(self) -> list[str]:
    """Return the group section names reported by ``SteeringBuilder.list_groups``.

    :return: Group section names.
    """
    names = self._builder.list_groups()
    return names
def build_pipeline(self, *, progress_callback: Callable[[int], None] | None = None) -> ProcessorPipeline:
    """Build the in-memory pipeline tree for the steering GUI.

    Every entry of the execution list becomes a :class:`PipelineItem`. Group entries
    are expanded recursively into child items; processor entries get a configuration
    resolved from the builder (handling replica inheritance) and aligned with the
    parameter schema of the matching plugin, when available.

    :param progress_callback: Optional callable receiving the completion percentage
        (integer, up to 100) after each top-level entry has been processed.
    :return: The pipeline holding one item per entry of the execution list.
    """
    run_list = tuple(self.get_processors_to_run())
    groups = self._builder.groups
    processors = self._builder.processors

    def _resolve_processor_config(full_name: str) -> ProcessorConfig:
        # Return a standalone ProcessorConfig for ``full_name``, built from copies of
        # the base and/or replica sections found in the builder.
        base_name, replica = parse_processor_name(full_name)
        base_config = processors.get(base_name)
        replica_config = processors.get(full_name)

        if replica is None:
            # This is a processor without replica name.
            if base_config is None:
                # We do not have a section for this processor.
                # Return an empty ProcessorConfig and let the schema reconciliation
                # fill the holes.
                return ProcessorConfig(name=full_name)

            # This is the standard case of a processor without replica name:
            # all specified parameters are from CONFIG.
            config = ProcessorConfig(
                name=base_config.name,
                parameters=deepcopy(base_config.parameters),
                filters=deepcopy(base_config.filters),
                new_only=base_config.new_only,
                inheritance=base_config.inheritance,
                has_filter_root=base_config.has_filter_root,
            )
            config.filter_logic = base_config.filter_logic
            return config

        if replica_config is None and base_config is None:
            # The processor has a replica name, but neither the replica specific nor
            # the base configuration is available.
            # Return an empty config and let the schema reconciliation fix it.
            return ProcessorConfig(name=full_name)

        if replica_config is None and base_config is not None:
            # The processor has a replica name and only a base config.
            # Return the processor config built using the base parameters and mark
            # all of them as inherited.
            config = ProcessorConfig(
                name=full_name,
                parameters=deepcopy(base_config.parameters),
                filters=deepcopy(base_config.filters),
                new_only=base_config.new_only,
                has_filter_root=base_config.has_filter_root,
            )
            config.filter_logic = (
                base_config.filter_logic
            )  # filter_logic is a property and it will automatically set the ast value
            for param in config.parameters.values():
                param.source = ParameterSource.INHERITED
            return config

        if replica_config is not None and replica_config.inheritance is False:
            # The processor has a replica name and a replica specific configuration, but
            # without inheritance. So only the replica specific configuration is used
            # and all parameters are marked as CONFIG.
            config = ProcessorConfig(
                name=full_name,
                parameters=deepcopy(replica_config.parameters),
                filters=deepcopy(replica_config.filters),
                new_only=replica_config.new_only,
                inheritance=replica_config.inheritance,
                has_filter_root=replica_config.has_filter_root,
            )
            config.filter_logic = (
                replica_config.filter_logic
            )  # filter_logic is a property and it will automatically set the ast value
            for param in config.parameters.values():
                param.source = ParameterSource.CONFIG
            return config

        # This is the most complex case:
        # - The processor has a replica
        # - There is a replica configuration, possibly together with a base one
        # - The replica allows for normal inheritance (inheritance is not False)
        # The output is the base configuration updated with replica specific parameters.
        # ParameterSource is CONFIG for replica specific and INHERITED for base.
        base_parameters: dict[str, ParameterConfig] = {}
        base_filters: dict[str, list[FilterConfig]] = {}
        base_filter_logic: str | None = None
        base_new_only: bool | None = None
        base_has_filter_root = True

        if base_config is not None:
            base_parameters = deepcopy(base_config.parameters)
            for param in base_parameters.values():
                param.source = ParameterSource.INHERITED
            base_filters = deepcopy(base_config.filters)
            base_filter_logic = base_config.filter_logic
            base_new_only = base_config.new_only
            base_has_filter_root = base_config.has_filter_root

        # Only for the type checker: the early returns above already handled every
        # case in which replica_config is None.
        if TYPE_CHECKING:
            assert replica_config is not None

        # Deep update parameters manually because it's a dict of objects
        merged_parameters = base_parameters
        for key, replica_param in replica_config.parameters.items():
            param_copy = deepcopy(replica_param)
            param_copy.source = ParameterSource.CONFIG
            merged_parameters[key] = param_copy

        merged_filters = deep_update(base_filters, replica_config.filters, copy_first=False)
        # Replica values win whenever they are set; otherwise fall back to the base ones.
        filter_logic = replica_config.filter_logic if replica_config.filter_logic is not None else base_filter_logic
        new_only = replica_config.new_only if replica_config.new_only is not None else base_new_only
        has_filter_root = replica_config.has_filter_root if replica_config.filters else base_has_filter_root

        config = ProcessorConfig(
            name=full_name,
            parameters=merged_parameters,
            filters=merged_filters,
            new_only=new_only,
            inheritance=replica_config.inheritance,
            has_filter_root=has_filter_root,
        )
        config.filter_logic = filter_logic  # filter_logic is a property and it will automatically set the ast value
        return config

    def _build_item(item: str, seen_groups: set[str], parent: PipelineItem | None) -> PipelineItem:
        # Build the tree node for ``item``; ``seen_groups`` holds the groups already
        # being expanded on the current branch.
        if item in groups:
            group = groups[item]
            node = PipelineItem(config=group, parent=parent)
            if item in seen_groups:
                # The group is already being expanded higher up: do not recurse again.
                return node
            child_nodes = [_build_item(sub_item, {*seen_groups, item}, node) for sub_item in group.processors]
            node.children = child_nodes
            return node

        config = _resolve_processor_config(item)

        # --- Schema Alignment Layer ---
        base_name, _ = parse_processor_name(item)
        processor_cls = self.plugins.processor_dict.get(base_name) if self.plugins else None

        # Names of the parameters coming from the steering configuration, recorded
        # before schema-only parameters are added below.
        config_param_names = set(config.parameters.keys())

        if processor_cls is None:
            # Processor not found in plugins
            config.processor_status = ProcessorSchemaStatus.UNKNOWN
            for param in config.parameters.values():
                param.status = ParameterSchemaStatus.UNKNOWN
        else:
            config.processor_status = ProcessorSchemaStatus.OK
            schema = processor_cls.processor_schema()

            if schema.filter:
                models = [schema.filter.root_model] + list(schema.filter.allowed_models)
                config.schema_filter_models = [m.__name__ for m in models]

            # 1. Check existing parameters in config
            for param_name, param_config in list(config.parameters.items()):
                try:
                    param_schema = schema.get_parameter(param_name)
                    # Parameter exists in both
                    param_config.status = ParameterSchemaStatus.OK
                    param_config.help = param_schema.help
                    param_config.type = param_schema.annotation
                    param_config.default = self._normalize_schema_default(param_schema.default)
                except KeyError:
                    # Parameter only in config
                    param_config.status = ParameterSchemaStatus.DEPRECATED
                    param_config.help = 'Unknown'
                    param_config.type = Any
                    param_config.default = None

            # 2. Check parameters only in schema (NEW)
            for param_schema in schema.parameters:
                if param_schema.name not in config_param_names:
                    default_value = self._normalize_schema_default(param_schema.default)
                    config.parameters[param_schema.name] = ParameterConfig(
                        name=param_schema.name,
                        value=default_value,
                        default=default_value,
                        source=ParameterSource.DEFAULT,
                        status=ParameterSchemaStatus.NEW,
                        help=param_schema.help,
                        type=param_schema.annotation,
                        active_override=False,
                    )

        # NOTE(review): the nesting level of this loop could not be recovered from the
        # flattened listing; it is assumed to run for every processor (known or not) - confirm.
        # Parameters from the steering configuration are active overrides unless they
        # were inherited; parameters added from the schema are not.
        for param in config.parameters.values():
            if param.name in config_param_names:
                param.active_override = param.source != ParameterSource.INHERITED
            else:
                param.active_override = False

        return PipelineItem(config=config, parent=parent)

    items: list[PipelineItem] = []
    total = len(run_list)
    for index, item in enumerate(run_list):
        items.append(_build_item(item, set(), None))
        if progress_callback:
            progress_callback(int((index + 1) / total * 100))

    return ProcessorPipeline(items=items)
@recorded_mutation
def move_pipeline_entry(
    self,
    entry_name: str,
    entry_is_group: bool,
    source_group: str | None,
    target_group: str | None,
    position: int,
) -> None:
    """Move a pipeline entry to ``position`` of a (possibly different) parent list.

    A parent of ``None`` designates the top-level execution list, any other value the
    processor list of the named group.

    :param entry_name: Name of the entry to move.
    :param entry_is_group: Whether the entry is a group.
    :param source_group: Current parent of the entry.
    :param target_group: New parent of the entry.
    :param position: Insertion index in the target list, in ``[0, len(target)]``. When
        the entry stays in the same list, the index is applied after the entry has been
        removed and is clamped to the end of the list.
    :raises SteeringControllerError: If a group does not exist, the entry is not in the
        source list, the move would nest a group into itself, or the index is out of range.
    """
    origin = self._resolve_pipeline_list(source_group)
    destination = self._resolve_pipeline_list(target_group)

    if entry_name not in origin:
        raise SteeringControllerError(f"Pipeline entry '{entry_name}' not found in the source list.")

    if entry_is_group and target_group is not None:
        if entry_name not in self._builder.groups:
            raise SteeringControllerError(f"Group '{entry_name}' does not exist.")
        if self._would_create_group_cycle(entry_name, target_group):
            raise SteeringControllerError('Moving the group would create a cycle.')

    # The index is validated against the target list *before* anything is removed.
    if not 0 <= position <= len(destination):
        raise SteeringControllerError('Target position is out of range.')

    origin.remove(entry_name)
    # Within a single list the removal may have shortened the target: clamp the index.
    destination.insert(min(position, len(destination)), entry_name)
def _resolve_pipeline_list(self, group: str | None) -> list[str]:
    """Return the list object holding the entries of ``group``.

    ``None`` selects the global ``processors_to_run`` list; any other value must be the
    name of a known group. The builder's own list is returned, not a copy.

    :raises SteeringControllerError: If ``group`` is not a known group name.
    """
    if group is None:
        return self._builder.globals.processors_to_run
    known_groups = self._builder.groups
    if group in known_groups:
        return known_groups[group].processors
    raise SteeringControllerError(f"Group '{group}' does not exist.")


def _would_create_group_cycle(self, entry_group: str, target_group: str) -> bool:
    """Tell whether placing ``entry_group`` inside ``target_group`` would form a cycle.

    A cycle exists when both names are equal or when ``target_group`` is reachable from
    ``entry_group`` by following group members that are themselves groups.
    """
    if entry_group == target_group:
        return True

    seen: set[str] = set()
    pending = [entry_group]
    while pending:
        name = pending.pop()
        if name in seen:
            continue
        seen.add(name)
        node = self._builder.groups.get(name)
        if node is None:
            continue
        members = node.processors
        if target_group in members:
            return True
        # Only members that are groups can lead further down the hierarchy.
        pending.extend(member for member in members if member in self._builder.groups)
    return False
def get_processor_snapshot(self, full_name: str) -> ProcessorConfig:
    """Build a detached copy of a processor configuration.

    Parameters and filters are deep-copied; the remaining fields are passed through
    as they are.

    :param full_name: Name of the processor section to copy.
    :return: A new :class:`ProcessorConfig` mirroring the stored one.
    :raises SteeringControllerError: If the processor is not defined.
    """
    original = self._require_processor(full_name)
    parameters_copy = deepcopy(original.parameters)
    filters_copy = deepcopy(original.filters)
    snapshot = ProcessorConfig(
        name=original.name,
        parameters=parameters_copy,
        processor_status=original.processor_status,
        filters=filters_copy,
        new_only=original.new_only,
        inheritance=original.inheritance,
        has_filter_root=original.has_filter_root,
    )
    # filter_logic is not passed to the constructor; it is assigned afterwards.
    snapshot.filter_logic = original.filter_logic
    return snapshot
@recorded_mutation
def set_processor_parameter(self, full_name: str, key: str, value: Any) -> None:
    """Assign a value to one parameter of a processor.

    The request is forwarded unchanged to the builder's ``set_parameter``.

    :param full_name: Name of the processor section to modify.
    :param key: Name of the parameter.
    :param value: Value handed over to the builder.
    """
    builder = self._builder
    builder.set_parameter(full_name, key, value)
@recorded_mutation
def remove_processor_parameter(self, full_name: str, key: str) -> None:
    """Drop the override of one processor parameter, if there is one.

    The request is forwarded unchanged to the builder's ``remove_parameter``.

    :param full_name: Name of the processor section to modify.
    :param key: Name of the parameter whose override is removed.
    """
    builder = self._builder
    builder.remove_parameter(full_name, key)
@recorded_mutation
def clear_processor_parameters(self, full_name: str) -> None:
    """Discard all parameter overrides of a processor.

    The request is forwarded unchanged to the builder's ``clear_parameters``.

    :param full_name: Name of the processor section to modify.
    """
    builder = self._builder
    builder.clear_parameters(full_name)
@recorded_mutation
def reset_processor_parameters(self, full_name: str) -> None:
    """Put every parameter of a processor back to its default value.

    Parameter entries are kept; only their ``value`` and ``source`` are rewritten.
    The default comes from the processor schema when plugins are loaded and the
    processor class is known, otherwise from the ``default`` stored on the parameter.

    :param full_name: Name of the processor section to reset.
    :raises SteeringControllerError: If the processor is not defined.
    """
    config = self._require_processor(full_name)

    schema_defaults: dict[str, Any] = {}
    plugins = self.plugins
    if plugins is not None:
        base_name, _ = parse_processor_name(full_name)
        processor_cls = plugins.processor_dict.get(base_name)
        if processor_cls is not None:
            schema_defaults = {
                entry.name: entry.default for entry in processor_cls.processor_schema().parameters
            }

    for param in config.parameters.values():
        raw_default = schema_defaults.get(param.name, param.default)
        if raw_default is None:
            # A missing default is kept as None and is not normalised.
            param.value = None
        else:
            # Copy so the parameter never shares a mutable default with the schema.
            param.value = deepcopy(self._normalize_schema_default(raw_default))
        param.source = ParameterSource.DEFAULT
@recorded_mutation
def remove_pipeline_entry(self, entry_name: str, source_group: str | None) -> None:
    """Take an entry out of the top-level pipeline or out of a group.

    :param entry_name: Name of the entry to remove.
    :param source_group: Group holding the entry, or ``None`` to remove it through the
        builder's ``remove_processor``.
    :raises SteeringControllerError: If ``source_group`` is given but not defined.
    """
    if source_group is not None:
        members = self._require_group(source_group).processors
        # A missing entry is silently ignored.
        if entry_name in members:
            members.remove(entry_name)
        return
    self._builder.remove_processor(entry_name)
@recorded_mutation
def rename_processor_entry(self, old_name: str, new_name: str, source_group: str | None) -> None:
    """Rename a processor reference and keep its configuration aligned.

    Three situations are handled once the reference in the pipeline list is renamed:

    * the old name has no configuration: a configuration for ``new_name`` is ensured;
    * base name renamed to a replica: the base configuration is cloned for the replica
      and the base configuration is left intact;
    * any other case: the existing configuration is moved to the new name.

    All validation happens before any state is modified, so a rejected rename leaves
    both the pipeline list and the processor configurations untouched.

    :param old_name: Current name of the entry.
    :param new_name: Desired name; it must share the base name of ``old_name``.
    :param source_group: Group holding the entry, or ``None`` for the top-level run list.
    :raises SteeringControllerError: If the entry is not in the source list, the new name
        is already listed there, the base names differ, or a configuration named
        ``new_name`` already exists while ``old_name`` has one.
    """
    if old_name == new_name:
        return

    source_list = self._resolve_pipeline_list(source_group)
    if old_name not in source_list:
        raise SteeringControllerError(f"Pipeline entry '{old_name}' not found in the source list.")
    if new_name in source_list:
        raise SteeringControllerError(f"Pipeline entry '{new_name}' already exists in the source list.")

    old_base, old_replica = parse_processor_name(old_name)
    new_base, new_replica = parse_processor_name(new_name)
    if old_base != new_base:
        raise SteeringControllerError('Renaming processors across different base names is not supported.')

    processors = self._builder.processors
    has_config = old_name in processors
    # Check for a configuration clash *before* touching the pipeline list; otherwise a
    # rejected rename would leave the list pointing at the new name.
    if has_config and new_name in processors:
        raise SteeringControllerError(f"Processor configuration '{new_name}' already exists.")

    source_list[source_list.index(old_name)] = new_name

    if not has_config:
        self._builder._ensure_processor(new_name)
        return

    if old_replica is None and new_replica is not None:
        # Base -> replica: copy base config to replica and keep the base config intact.
        clone = deepcopy(processors[old_name])
        clone.name = new_name
        processors[new_name] = clone
        return

    config = processors.pop(old_name)
    config.name = new_name
    processors[new_name] = config
@recorded_mutation
def add_replica(self, base_name: str, replica: str) -> None:
    """Make sure a replica section exists for a processor.

    The request is forwarded unchanged to the builder's ``add_replica``.

    :param base_name: Base name of the processor.
    :param replica: Replica identifier.
    """
    builder = self._builder
    builder.add_replica(base_name, replica)
@recorded_mutation
def set_replica_inheritance(self, replica_full_name: str, value: bool | None) -> None:
    """Change the inheritance setting of a replica.

    The request is forwarded unchanged to the builder's ``set_replica_inheritance``.

    :param replica_full_name: Full name of the replica section.
    :param value: New inheritance setting; ``None`` is passed through as well.
    """
    builder = self._builder
    builder.set_replica_inheritance(replica_full_name, value)
@recorded_mutation
def set_processor_new_only(self, full_name: str, value: bool | None) -> None:
    """Change the ``__new_only__`` override of a processor section.

    The request is forwarded unchanged to the builder's ``set_processor_new_only``.

    :param full_name: Name of the processor section.
    :param value: New override; ``None`` is passed through as well.
    """
    builder = self._builder
    builder.set_processor_new_only(full_name, value)
@recorded_mutation
def set_processor_filters(
    self, full_name: str, filters: Mapping[str, Iterable[FilterConfig]], logic: str | None
) -> None:
    """Replace the whole filter dictionary and logic for a processor.

    Every value of ``filters`` is first materialised as a list, so any iterable
    (tuple, generator, ...) is accepted as the signature promises, and the builder
    always receives ``dict[str, list[FilterConfig]]``. The resulting structure is then
    deep-copied in one go to isolate the controller from the caller's objects.

    :param full_name: Name of the processor section.
    :param filters: Mapping of model name to the filter configurations of that model.
    :param logic: Filter logic string, or ``None``.
    """
    # Listify before copying: deepcopy cannot copy generators, and the builder expects lists.
    as_lists: dict[str, list[FilterConfig]] = {model: list(entries) for model, entries in filters.items()}
    # One deepcopy over the whole dict keeps references shared between entries consistent.
    copied_filters = deepcopy(as_lists)
    self._builder.set_processor_filters(full_name, copied_filters, logic)
@recorded_mutation
def set_filter_config(self, full_name: str, model_name: str, config: Mapping[str, Any]) -> None:
    """Install a new configuration for one filter model.

    The mapping is converted to a new ``dict`` (a shallow copy) before it is given to
    the builder's ``set_filter_config``.

    :param full_name: Name of the processor section.
    :param model_name: Name of the filter model.
    :param config: Configuration mapping for the model.
    """
    config_copy = dict(config)
    self._builder.set_filter_config(full_name, model_name, config_copy)
@recorded_mutation
def set_filter_field(self, full_name: str, model_name: str, field: str, value: Any) -> None:
    """Write one field of a filter model.

    The request is forwarded unchanged to the builder's ``set_filter_field``.

    :param full_name: Name of the processor section.
    :param model_name: Name of the filter model.
    :param field: Name of the field.
    :param value: Value handed over to the builder.
    """
    builder = self._builder
    builder.set_filter_field(full_name, model_name, field, value)
@recorded_mutation
def remove_filter(self, full_name: str, model_name: str) -> None:
    """Remove the definition of one filter model from a processor.

    The request is forwarded unchanged to the builder's ``remove_filter``.

    :param full_name: Name of the processor section.
    :param model_name: Name of the filter model to delete.
    """
    builder = self._builder
    builder.remove_filter(full_name, model_name)
@recorded_mutation
def set_filter_logic(self, full_name: str, logic: str | None) -> None:
    """Change the ``__logic__`` string of a processor's filters.

    The request is forwarded unchanged to the builder's ``set_filter_logic``.

    :param full_name: Name of the processor section.
    :param logic: New logic string; ``None`` is passed through as well.
    """
    builder = self._builder
    builder.set_filter_logic(full_name, logic)
@recorded_mutation
def set_filter_conditionals(
    self, full_name: str, model_name: str, conditionals: list[Mapping[str, Any]] | None
) -> None:
    """Attach ``__conditional__`` blocks to a filter model.

    Each block is converted to a new ``dict`` before it reaches the builder; ``None``
    is passed through untouched.

    :param full_name: Name of the processor section.
    :param model_name: Name of the filter model.
    :param conditionals: Conditional blocks, or ``None``.
    """
    formatted: list[dict[str, Any]] | None = (
        None if conditionals is None else [dict(block) for block in conditionals]
    )
    self._builder.set_filter_conditionals(full_name, model_name, formatted)
# Group helpers
@recorded_mutation
def add_group(self, name: str, processors: Iterable[str], description: str | None = None) -> None:
    """Create a new group section.

    The request is forwarded unchanged to the builder's ``add_group``.

    :param name: Name of the group.
    :param processors: Entries belonging to the group.
    :param description: Optional description of the group.
    """
    builder = self._builder
    builder.add_group(name, processors, description)
@recorded_mutation
def add_group_to_run_list(self, name: str) -> None:
    """Put a group at the end of ``processors_to_run`` unless it is already listed.

    :param name: Name of a defined group.
    :raises SteeringControllerError: If the group is not defined.
    """
    if name not in self._builder.groups:
        raise SteeringControllerError(f"Group '{name}' is not defined.")
    run_list = self._builder.globals.processors_to_run
    if name not in run_list:
        run_list.append(name)
@recorded_mutation
def add_group_entry_to_group(self, target_group: str, entry_name: str) -> None:
    """Add a processor or group reference at the end of a group's list.

    Nothing happens when the entry is already a member of the group.

    :param target_group: Name of the group receiving the entry.
    :param entry_name: Name of the processor or group to add.
    :raises SteeringControllerError: If the target group is not defined, or if the entry
        is a group and adding it would create a cycle.
    """
    members = self._require_group(target_group).processors
    if entry_name in members:
        return
    # The cycle check only applies when the entry is itself a group.
    entry_is_group = entry_name in self._builder.groups
    if entry_is_group and self._would_create_group_cycle(entry_name, target_group):
        raise SteeringControllerError('Adding the group would create a cycle.')
    members.append(entry_name)
@recorded_mutation
def remove_group(self, name: str) -> None:
    """Remove a group section.

    The request is forwarded unchanged to the builder's ``remove_group``.

    :param name: Name of the group to delete.
    """
    builder = self._builder
    builder.remove_group(name)
@recorded_mutation
def rename_group(self, old_name: str, new_name: str) -> None:
    """Rename a group and update all pipeline references.

    ``new_name`` is stripped of surrounding whitespace before use. If the stripped name
    equals ``old_name`` the call is a no-op, exactly as when both arguments are equal.
    On success the group is re-registered under the new name and every occurrence of
    the old name in ``processors_to_run`` and in all group lists is replaced.

    :param old_name: Current name of the group.
    :param new_name: Desired name of the group.
    :raises SteeringControllerError: If the new name is empty after stripping, clashes
        with an existing group or processor, or ``old_name`` is not a defined group.
    """
    if old_name == new_name:
        return
    normalized = new_name.strip()
    if not normalized:
        raise SteeringControllerError('Group name cannot be empty.')
    if normalized == old_name:
        # Only surrounding whitespace differs; without this check the lookup below
        # would report that the group "already exists" against itself.
        return
    if normalized in self._builder.groups:
        raise SteeringControllerError(f"Group '{normalized}' already exists.")
    if normalized in self._builder.processors:
        raise SteeringControllerError(f"Processor '{normalized}' already exists.")

    group = self._require_group(old_name)
    self._builder.groups.pop(old_name)
    group.name = normalized
    self._builder.groups[normalized] = group

    # Update every place that may reference the group by name.
    self._replace_pipeline_entry(self._builder.globals.processors_to_run, old_name, normalized)
    for group_config in self._builder.groups.values():
        self._replace_pipeline_entry(group_config.processors, old_name, normalized)
@recorded_mutation
def set_group_description(self, name: str, description: str | None) -> None:
    """Store or erase the description of a group.

    String descriptions are stripped; an empty result, like any non-string input,
    clears the description (``None``).

    :param name: Name of the group.
    :param description: New description, or ``None`` to clear it.
    :raises SteeringControllerError: If the group is not defined.
    """
    group = self._require_group(name)
    if isinstance(description, str):
        cleaned = description.strip()
        group.description = cleaned if cleaned else None
    else:
        group.description = None
@recorded_mutation
def set_group_processors(self, name: str, processors: Iterable[str]) -> None:
    """Overwrite the list of entries of a group.

    Each entry is converted with ``str`` and the group receives a brand-new list.

    :param name: Name of the group.
    :param processors: New entries of the group.
    :raises SteeringControllerError: If the group is not defined.
    """
    group = self._require_group(name)
    group.processors = list(map(str, processors))
def get_group_snapshot(self, name: str) -> GroupConfig:
    """Build a detached copy of a group configuration.

    The processors list is copied and the attributes are deep-copied.

    :param name: Name of the group.
    :return: A new :class:`GroupConfig` mirroring the stored one.
    :raises SteeringControllerError: If the group is not defined.
    """
    original = self._require_group(name)
    members_copy = list(original.processors)
    attributes_copy = deepcopy(original.attributes)
    return GroupConfig(
        name=original.name,
        processors=members_copy,
        description=original.description,
        attributes=attributes_copy,
    )
@recorded_mutation
def set_group_attributes(self, name: str, attributes: Mapping[str, Any]) -> None:
    """Replace a group's attributes with a new, independent copy.

    The mapping is deep-copied so that nested mutable values (lists, dicts) are not
    shared with the caller, mirroring the deep copies handed out by
    ``get_group_attributes`` and ``get_group_snapshot``.

    :param name: Name of the group.
    :param attributes: New attributes of the group.
    :raises SteeringControllerError: If the group is not defined.
    """
    group = self._require_group(name)
    # dict() first so any Mapping type ends up as a plain dict, then detach nested values.
    group.attributes = deepcopy(dict(attributes))
def get_group_attributes(self, name: str) -> dict[str, Any]:
    """Hand out an independent deep copy of the attributes of a group.

    :param name: Name of the group.
    :return: Deep copy of the group's attributes.
    :raises SteeringControllerError: If the group is not defined.
    """
    group = self._require_group(name)
    attributes_copy = deepcopy(group.attributes)
    return attributes_copy
@recorded_mutation
def add_processors_to_group_with_defaults(self, group_name: str, base_names: Iterable[str]) -> None:
    """Add processors to a group and fill in their schema default parameters.

    A base name already present in the group is added as a numbered replica
    (``name#1``, ``name#2``, ...); otherwise the plain name is used.

    :param group_name: Name of the group receiving the processors.
    :param base_names: Base names of the processors to add.
    :raises SteeringControllerError: If plugins are not loaded or the group is not defined.
    """
    if self.plugins is None:
        raise SteeringControllerError('Plugins have not been loaded.')

    group = self._require_group(group_name)
    taken = set(group.processors)
    for base_name in base_names:
        full_name, replica = self._unique_processor_name_for_group(base_name, taken)
        if full_name in taken:
            # Defensive guard: never list the same name twice.
            continue
        if replica is not None:
            self._builder.add_replica(base_name, replica)
        else:
            self._builder._ensure_processor(full_name)
        self._apply_processor_defaults(base_name, full_name)
        group.processors.append(full_name)
        # Track the new name so later iterations pick the next free replica.
        taken.add(full_name)
# Internal helpers
def _require_processor(self, full_name: str) -> ProcessorConfig:
    """Fetch a processor configuration, turning ``KeyError`` into a controller error."""
    try:
        config = self._builder.get_processor_config(full_name)
    except KeyError as exc:
        raise SteeringControllerError(f"Processor '{full_name}' is not defined.") from exc
    return config


def _require_group(self, name: str) -> GroupConfig:
    """Fetch a group configuration, turning ``KeyError`` into a controller error."""
    try:
        group = self._builder.get_group(name)
    except KeyError as exc:
        raise SteeringControllerError(f"Group '{name}' is not defined.") from exc
    return group


@staticmethod
def _replace_pipeline_entry(values: list[str], old_name: str, new_name: str) -> None:
    """Replace, in place, every occurrence of ``old_name`` in ``values`` with ``new_name``."""
    # Slice assignment keeps the same list object, so other holders see the change.
    values[:] = [new_name if value == old_name else value for value in values]


@staticmethod
def _unique_processor_name_for_group(base_name: str, existing: set[str]) -> tuple[str, str | None]:
    """Pick a name for ``base_name`` that is absent from ``existing``.

    :return: ``(base_name, None)`` when the base name is free, otherwise
        ``('<base_name>#<n>', '<n>')`` with the smallest free positive integer ``n``.
    """
    if base_name not in existing:
        return base_name, None
    counter = 0
    while True:
        counter += 1
        replica = str(counter)
        candidate = f'{base_name}#{replica}'
        if candidate not in existing:
            return candidate, replica


def _apply_processor_defaults(self, base_name: str, full_name: str) -> None:
    """Add the schema default parameters that a processor configuration lacks.

    The processor status becomes ``UNKNOWN`` when no processor class can be found for
    ``base_name`` and ``OK`` otherwise. Parameters already present are left untouched.
    """
    config = self._builder.get_processor_config(full_name)
    plugins = self.plugins
    processor_cls = plugins.processor_dict.get(base_name) if plugins else None
    if processor_cls is None:
        config.processor_status = ProcessorSchemaStatus.UNKNOWN
        return

    config.processor_status = ProcessorSchemaStatus.OK
    for param_schema in processor_cls.processor_schema().parameters:
        param_name = param_schema.name
        if param_name not in config.parameters:
            default_value = self._normalize_schema_default(param_schema.default)
            config.parameters[param_name] = ParameterConfig(
                name=param_name,
                value=default_value,
                default=default_value,
                source=ParameterSource.DEFAULT,
                status=ParameterSchemaStatus.OK,
                help=param_schema.help,
                type=param_schema.annotation,
                active_override=True,
            )