Coverage for src / mafw / db / db_filter.py: 99%
507 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
1# Copyright 2025–2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Database filter module for MAFW.
7This module provides classes and utilities for creating and managing database filters
8using Peewee ORM. It supports various filtering operations including simple conditions,
9logical combinations, and conditional filters where one field's criteria depend on another.
11The module implements a flexible filter system that can handle:
12 - Simple field comparisons (equality, inequality, greater/less than, etc.)
13 - Complex logical operations (AND, OR, NOT)
14 - Conditional filters with dependent criteria
15 - Nested logical expressions
16 - Support for various data types and operations
18Key components include:
19 - :class:`FilterNode`: Abstract base class for filter nodes
20 - :class:`ConditionNode`: Represents individual field conditions
21 - :class:`LogicalNode`: Combines filter nodes with logical operators
22 - :class:`ConditionalNode`: Wraps conditional filter conditions
23 - :class:`ModelFilter`: Main class for building and applying filters to models
24 - :class:`ProcessorFilter`: Container for multiple model filters in a processor
26The module uses a hierarchical approach to build filter expressions that can be converted
27into Peewee expressions for database queries. It supports both simple and complex filtering
28scenarios through a combination of direct field conditions and logical expressions.
30.. versionchanged:: v2.0.0
31 Major overhaul introducing conditional filters and logical expression support.
33Example usage::
35 from mafw.db.db_filter import ModelFilter
37 # Create a simple filter
38 flt = ModelFilter(
39 'Processor.__filter__.Model',
40 field1='value1',
41 field2={'op': 'IN', 'value': [1, 2, 3]},
42 )
44 # Bind to a model and generate query
45 flt.bind(MyModel)
46 query = MyModel.select().where(flt.filter())
48.. seealso::
50 :link:`peewee` - The underlying ORM library used for database operations
52 :class:`~.mafw.enumerators.LogicalOp` - Logical operation enumerations used in filters
53"""
55import logging
56import operator
57import re
58from collections import OrderedDict, UserDict
59from copy import copy
60from functools import reduce
61from typing import TYPE_CHECKING, Any, Dict, Iterable, Literal, Self, Sequence, TypeAlias, Union, cast
63import peewee
64from peewee import Model
66from mafw.db.db_model import mafw_model_register
67from mafw.enumerators import LogicalOp
# Module-level logger, named after this module.
log = logging.getLogger(__name__)


# A token is a ``(kind, value)`` pair of strings.
Token = tuple[str, str]
"""Type definition for a logical expression token"""
76def _format_expected(expected: str | Sequence[str]) -> str:
77 """Format a description of expected tokens for diagnostics."""
78 if isinstance(expected, str): 78 ↛ 80line 78 didn't jump to line 80 because the condition on line 78 was always true
79 return expected
80 return ' or '.join(expected)
# 1. An atom is a tuple of the literal string 'NAME' and the value
NameNode = tuple[Literal['NAME'], str]
"""An atom is a tuple of the literal string 'NAME' and the value"""

# 2. A NOT node is a tuple of 'NOT' and a recursive node
# We use a string forward reference 'ExprNode' because it is defined below
NotNode = tuple[Literal['NOT'], 'ExprNode']
"""A NOT node is a tuple of 'NOT' and a recursive node"""

# 3. AND/OR nodes are tuples of the operator and two recursive nodes
BinaryNode = tuple[Literal['AND', 'OR'], 'ExprNode', 'ExprNode']
"""AND/OR nodes are tuples of the operator and two recursive nodes"""

# 4. The main recursive type combining all options
ExprNode: TypeAlias = Union[NameNode, NotNode, BinaryNode]
"""
The main recursive type combining all options

This type represents the abstract syntax tree (AST) nodes used in logical expressions.
It can be one of:

    - :data:`NameNode`: A named element (field name or filter name)
    - :data:`NotNode`: A negation operation
    - :data:`BinaryNode`: An AND/OR operation between two nodes
"""

# Ordered (group name, pattern) pairs that are joined into a single alternation below.
# The order is significant because a regex alternation tries its branches left to right:
#   - AND / OR / NOT are listed before NAME so that the bare keywords are reported as operators;
#     the \b anchors keep identifiers such as ``ANDROID`` or ``NOT_x`` from matching as keywords.
#   - NAME is an identifier that may contain dots and may carry one optional ``:suffix`` part.
#   - SKIP matches runs of whitespace.
#   - MISMATCH comes last and matches any single character not matched by the patterns above.
TOKEN_SPECIFICATION = [
    ('LPAREN', r'\('),
    ('RPAREN', r'\)'),
    ('AND', r'\bAND\b'),
    ('OR', r'\bOR\b'),
    ('NOT', r'\bNOT\b'),
    ('NAME', r'[A-Za-z_][A-Za-z0-9_\.]*(?:\:[A-Za-z_][A-Za-z0-9_]*)?'),
    ('SKIP', r'[ \t\n\r]+'),
    ('MISMATCH', r'.'),
]
"""Token specifications"""

# Each specification becomes a named group, so ``match.lastgroup`` gives the token kind.
MASTER_RE = re.compile('|'.join(f'(?P<{name}>{pattern})' for name, pattern in TOKEN_SPECIFICATION))
"""Compiled regular expression to interpret the logical expression grammar"""
125class ParseError(ValueError):
126 """Base exception for logical expression parsing failures."""
128 def __init__(self, message: str, *, position: int | None = None) -> None:
129 super().__init__(message)
130 self.position = position
class UnexpectedTokenError(ParseError):
    """Signal that a token was found where the grammar does not allow it."""

    def __init__(
        self,
        token: Token,
        *,
        expected: str | Sequence[str] | None = None,
        position: int | None = None,
    ) -> None:
        """
        :param token: The offending ``(kind, value)`` token.
        :type token: :data:`Token`
        :param expected: Description (or alternatives) of what would have been valid, defaults to None.
        :type expected: str | Sequence[str] | None, Optional
        :param position: Character offset of the offending token, defaults to None.
        :type position: int | None, Optional
        """
        # The hint is appended only when a non-empty expectation was supplied.
        hint = ''
        if expected:
            hint = f'; expected {_format_expected(expected)}'
        super().__init__(
            f'Unexpected token {token[0]} ({token[1]}) at position {position}{hint}',
            position=position,
        )
        self.token = token
        self.expected = expected
class UnexpectedEndOfExpressionError(ParseError):
    """Signal that the input ran out while the parser still needed more tokens."""

    def __init__(self, *, expected: str | Sequence[str] | None = None, position: int | None = None) -> None:
        """
        :param expected: Description (or alternatives) of what was still required, defaults to None.
        :type expected: str | Sequence[str] | None, Optional
        :param position: Character offset at which the input ended, defaults to None.
        :type position: int | None, Optional
        """
        # The hint is appended only when a non-empty expectation was supplied.
        hint = ''
        if expected:
            hint = f'; expected {_format_expected(expected)}'
        super().__init__(f'Unexpected end of expression at position {position}{hint}', position=position)
        self.expected = expected
class MissingTokenError(ParseError):
    """Signal that a required token was not found before the input ended."""

    def __init__(self, expected: str | Sequence[str], *, position: int | None = None) -> None:
        """
        :param expected: Description (or alternatives) of the token that was required.
        :type expected: str | Sequence[str]
        :param position: Character offset at which the token was expected, defaults to None.
        :type position: int | None, Optional
        """
        wanted = _format_expected(expected)
        super().__init__(f'Expected {wanted} before end of expression at position {position}', position=position)
        self.expected = expected
class UnknownNameError(ParseError):
    """Signal that a NAME token does not belong to the supplied whitelist."""

    def __init__(
        self,
        name: str,
        *,
        valid_names: Iterable[str],
        position: int | None = None,
    ) -> None:
        """
        :param name: The rejected identifier.
        :type name: str
        :param valid_names: The identifiers that would have been accepted.
        :type valid_names: Iterable[str]
        :param position: Character offset of the rejected identifier, defaults to None.
        :type position: int | None, Optional
        """
        ordered = sorted(valid_names)
        # Only the first five names (alphabetically) are shown, to keep the message short.
        preview = ', '.join(ordered[:5]) + (', ...' if len(ordered) > 5 else '')
        super().__init__(f'Unknown name {name!r} at position {position}; valid names: {preview}', position=position)
        self.name = name
        self.valid_names = tuple(ordered)
def _tokenize_with_positions(text: str) -> tuple[list[Token], list[int]]:
    """
    Split *text* into tokens and record where each token starts.

    :param text: The logical expression to tokenize.
    :type text: str
    :return: Two parallel lists: the ``(kind, value)`` tokens and their start offsets in *text*.
    :rtype: tuple[list[:data:`Token`], list[int]]
    :raises ParseError: If a character matches none of the token patterns.
    """
    found: list[Token] = []
    offsets: list[int] = []
    for match in MASTER_RE.finditer(text):
        kind = match.lastgroup
        if kind == 'SKIP':
            # Whitespace separates tokens but is not a token itself.
            continue
        if kind == 'MISMATCH':
            raise ParseError(f'Unexpected character {match.group()!r}', position=match.start())
        assert kind is not None
        found.append((kind, match.group()))
        offsets.append(match.start())
    return found, offsets
def tokenize(text: str) -> list[Token]:
    """
    Break a logical expression string into its tokens.

    Whitespace is dropped; any character that is not part of the grammar
    makes the function raise a :exc:`ParseError`.

    :param text: The logical expression string to tokenize
    :type text: str
    :return: The tokens, each one a (token_type, token_value) tuple
    :rtype: list[:data:`Token`]
    :raises ParseError: If an unexpected character is encountered in the text
    """
    # The start offsets computed by the helper are not needed here.
    return _tokenize_with_positions(text)[0]
class ExprParser:
    """
    Recursive descent parser producing a simple Abstract Syntax Tree (AST).

    The parser handles logical expressions with the following grammar:

    .. code-block:: none

        expr    := or_expr
        or_expr := and_expr ("OR" and_expr)*
        and_expr:= not_expr ("AND" not_expr)*
        not_expr:= "NOT" not_expr | atom
        atom    := NAME | "(" expr ")"

    AST nodes are tuples representing different constructs:

        - ("NAME", "token"): A named element (field name or filter name)
        - ("NOT", node): A negation operation
        - ("AND", left, right): An AND operation between two nodes
        - ("OR", left, right): An OR operation between two nodes

    .. versionadded:: v2.0.0

    To help users diagnose grammar and semantic problems, the parser now
    reports detailed error classes with character offsets and accepts
    an optional ``valid_names`` iterable to reject unknown identifiers early.
    """

    def __init__(self, text: str, *, valid_names: Iterable[str] | None = None) -> None:
        """
        Initialize the expression parser with a logical expression string.

        The text is tokenized immediately, so a :exc:`ParseError` for an
        illegal character is raised by the constructor.

        :param text: The logical expression to parse
        :type text: str
        :param valid_names: Optional whitelist of valid NAME tokens
        :type valid_names: Iterable[str] | None
        :raises ParseError: If the text contains a character that is not part of the grammar
        """
        self._text = text
        self.tokens, self.token_positions = _tokenize_with_positions(text)
        # Index of the next token to be consumed.
        self.pos = 0
        self._valid_names = frozenset(valid_names) if valid_names is not None else None

    def peek(self) -> Token | None:
        """
        Peek at the next token without consuming it.

        :return: The next token if available, otherwise None
        :rtype: :data:`Token` | None
        """
        if self.pos < len(self.tokens):
            return self.tokens[self.pos]
        return None

    def accept(self, *kinds: str) -> Token | None:
        """
        Accept and consume the next token if it matches one of the given types.

        :param kinds: Token types to accept
        :type kinds: str
        :return: The consumed token if matched, otherwise None
        :rtype: :data:`Token` | None
        """
        tok = self.peek()
        if tok and tok[0] in kinds:
            self.pos += 1
            return tok
        return None

    def _current_position(self) -> int:
        """Return the character offset of the next token or the end of input."""
        if self.pos < len(self.token_positions):
            return self.token_positions[self.pos]
        return len(self._text)

    def expect(self, kind: str) -> 'Token':
        """
        Expect and consume a specific token type.

        :param kind: The expected token type
        :type kind: str
        :return: The consumed token
        :rtype: :data:`Token`
        :raises MissingTokenError: If the input ends before the expected token
        :raises UnexpectedTokenError: If a different token is found instead
        """
        tok = self.accept(kind)
        if tok:
            return tok
        position = self._current_position()
        current = self.peek()
        if not current:
            raise MissingTokenError(kind, position=position)
        raise UnexpectedTokenError(current, expected=kind, position=position)

    def parse(self) -> 'ExprNode':
        """
        Parse the entire logical expression and return the resulting AST.

        :return: The abstract syntax tree representation of the expression
        :rtype: :data:`ExprNode`
        :raises ParseError: If the expression is malformed
        """
        node = self.parse_or()
        # Any token left over after a complete expression is a syntax error.
        if self.pos != len(self.tokens):
            token = self.tokens[self.pos]
            position = self.token_positions[self.pos]
            raise UnexpectedTokenError(token, expected='end of expression', position=position)
        return node

    def parse_or(self) -> 'ExprNode':
        """
        Parse an OR expression.

        Chained operators are folded to the left: ``a OR b OR c`` becomes
        ``('OR', ('OR', a, b), c)``.

        :return: The parsed OR expression AST node
        :rtype: :data:`ExprNode`
        """
        left = self.parse_and()
        while self.accept('OR'):
            right = self.parse_and()
            left = ('OR', left, right)
        return left

    def parse_and(self) -> 'ExprNode':
        """
        Parse an AND expression.

        Chained operators are folded to the left, as in :meth:`parse_or`.

        :return: The parsed AND expression AST node
        :rtype: :data:`ExprNode`
        """
        left = self.parse_not()
        while self.accept('AND'):
            right = self.parse_not()
            left = ('AND', left, right)
        return left

    def parse_not(self) -> 'ExprNode':
        """
        Parse a NOT expression.

        :return: The parsed NOT expression AST node
        :rtype: :data:`ExprNode`
        """
        if self.accept('NOT'):
            node = self.parse_not()
            return 'NOT', node
        return self.parse_atom()

    def parse_atom(self) -> 'ExprNode':
        """
        Parse an atomic expression (NAME or parenthesised expression).

        :return: The parsed atomic expression AST node
        :rtype: :data:`ExprNode`
        :raises UnexpectedEndOfExpressionError: If the input ends where an atom is required
        :raises UnknownNameError: If a whitelist was supplied and the NAME is not part of it
        :raises ParseError: If an unexpected token is encountered
        """
        tok = self.peek()
        if not tok:
            # Report what would have been valid here, as the unexpected-token branch below does.
            raise UnexpectedEndOfExpressionError(expected='NAME or LPAREN', position=self._current_position())
        if tok[0] == 'LPAREN':
            self.accept('LPAREN')
            node = self.parse_or()
            self.expect('RPAREN')
            return node
        elif tok[0] == 'NAME':
            # Remember the offset before consuming, so that errors point at the name itself.
            start_pos = self.token_positions[self.pos]
            self.accept('NAME')
            name = tok[1]
            valid_names = self._valid_names
            if valid_names is not None and name not in valid_names:
                raise UnknownNameError(name, valid_names=valid_names, position=start_pos)
            return 'NAME', name
        else:
            raise UnexpectedTokenError(tok, expected='NAME or LPAREN', position=self._current_position())
399def ast_to_string(ast: ExprNode) -> str:
400 """
401 Convert an abstract syntax tree (AST) back to its string representation.
403 :param ast: The AST to convert
404 :type ast: ExprNode
405 :return: The string representation of the AST
406 :rtype: str
407 """
408 t = ast[0]
409 if t == 'NAME':
410 return cast(NameNode, ast)[1]
411 elif t == 'NOT':
412 inner = cast(NotNode, ast)[1]
413 inner_str = ast_to_string(inner)
414 if inner[0] in ('AND', 'OR'):
415 return f'NOT ({inner_str})'
416 return f'NOT {inner_str}'
417 elif t == 'AND':
418 bin_ast = cast(BinaryNode, ast)
419 left, right = bin_ast[1], bin_ast[2]
420 left_str = ast_to_string(left)
421 right_str = ast_to_string(right)
422 if left[0] == 'OR':
423 left_str = f'({left_str})'
424 if right[0] == 'OR':
425 right_str = f'({right_str})'
426 return f'{left_str} AND {right_str}'
427 elif t == 'OR': 427 ↛ 433line 427 didn't jump to line 433 because the condition on line 427 was always true
428 bin_ast = cast(BinaryNode, ast)
429 left_str = ast_to_string(bin_ast[1])
430 right_str = ast_to_string(bin_ast[2])
431 return f'{left_str} OR {right_str}'
432 else:
433 raise ValueError(f'Unsupported AST node type: {t}')
class FilterNode:
    """
    Abstract base for nodes.

    Subclasses override :meth:`to_expression` to translate themselves into
    something usable as a Peewee where-clause for a given model.
    """

    def to_expression(self, model: type[Model]) -> peewee.Expression | bool:
        """
        Convert this node to a Peewee expression for *model*.

        :param model: The model class to generate the expression for
        :type model: type[Model]
        :return: The expression represented by this node
        :rtype: peewee.Expression | bool
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError  # pragma: no cover
class ConditionNode(FilterNode):
    """
    Represents a single condition node in a filter expression.

    This class encapsulates a single filtering condition that can be applied
    to a model field. It supports various logical operations through the
    :class:`.LogicalOp` enumerator or string representations of operations.

    .. versionadded:: v2.0.0
    """

    def __init__(self, field: str | None, operation: LogicalOp | str, value: Any, name: str | None = None):
        """
        Initialize a condition node.

        :param field: The name of the field to apply the condition to.
        :type field: str | None
        :param operation: The logical operation to perform.
        :type operation: LogicalOp | str
        :param value: The value to compare against.
        :type value: Any
        :param name: Optional name for this condition node.
        :type name: str | None, Optional
        :raises ValueError: If *operation* is a string that is not a valid :class:`.LogicalOp` value.
        """
        self.field = field  # may be None for some special nodes
        if isinstance(operation, str):
            try:
                self.operation = LogicalOp(operation)
            except ValueError as err:
                # Keep the enum lookup failure attached as the explicit cause.
                raise ValueError(f'Unsupported operation: {operation}') from err
        else:
            self.operation = operation
        self.value = value
        self.name = name

    @staticmethod
    def _call_field_method(model_field: Any, method: str, op_label: str, *args: Any) -> Any:
        """
        Call ``model_field.<method>(*args)`` after checking that the method exists.

        :param model_field: The field object retrieved from the model.
        :param method: Name of the method to invoke on the field.
        :param op_label: Operation name used in the error message.
        :param args: Positional arguments forwarded to the method.
        :return: Whatever the field method returns.
        :raises ValueError: If the field has no callable attribute named *method*.
        """
        bound = getattr(model_field, method, None)
        if not callable(bound):
            raise ValueError(f'{op_label} operation not supported for field type {type(model_field)}')
        return bound(*args)

    def to_expression(self, model: type[Model]) -> peewee.Expression:
        """
        Convert this condition node to a Peewee expression.

        This method translates the condition represented by this node into
        a Peewee expression that can be used in database queries.

        :param model: The model class containing the field to filter.
        :type model: type[Model]
        :return: A Peewee expression representing this condition.
        :rtype: peewee.Expression
        :raises RuntimeError: If the node has no field to evaluate.
        :raises ValueError: If an unsupported operation is specified.
        :raises TypeError: If operation requirements are not met (e.g., IN operation requires list/tuple).
        """
        if self.field is None:
            # Should not happen for standard ConditionNode
            raise RuntimeError('ConditionNode has no field to evaluate')
        model_field = getattr(model, self.field)
        op = self.operation
        val = self.value
        call = self._call_field_method
        # The nested casts on the comparison operators only exist to satisfy mypy.
        if op == LogicalOp.EQ:
            return cast(peewee.Expression, cast(object, model_field == val))
        elif op == LogicalOp.NE:
            return cast(peewee.Expression, cast(object, model_field != val))
        elif op == LogicalOp.LT:
            return cast(peewee.Expression, cast(object, model_field < val))
        elif op == LogicalOp.LE:
            return cast(peewee.Expression, cast(object, model_field <= val))
        elif op == LogicalOp.GT:
            return cast(peewee.Expression, cast(object, model_field > val))
        elif op == LogicalOp.GE:
            return cast(peewee.Expression, cast(object, model_field >= val))
        elif op == LogicalOp.GLOB:
            # NOTE(review): relies on peewee's ``%`` operator overload for pattern matching;
            # confirm which SQL operator it renders to on the target database backend.
            return cast(peewee.Expression, model_field % val)
        elif op == LogicalOp.LIKE:
            # NOTE(review): relies on peewee's ``**`` operator overload; same backend caveat as above.
            return cast(peewee.Expression, model_field**val)
        elif op == LogicalOp.REGEXP:
            return cast(peewee.Expression, call(model_field, 'regexp', 'REGEXP', val))
        elif op == LogicalOp.IN:
            # The value is validated before the field, so a bad value raises TypeError first.
            if not isinstance(val, (list, tuple)):
                raise TypeError(f'IN operation requires list/tuple, got {type(val)}')
            return cast(peewee.Expression, call(model_field, 'in_', 'IN', val))
        elif op == LogicalOp.NOT_IN:
            if not isinstance(val, (list, tuple)):
                raise TypeError(f'NOT_IN operation requires list/tuple, got {type(val)}')
            return cast(peewee.Expression, call(model_field, 'not_in', 'NOT_IN', val))
        elif op == LogicalOp.BETWEEN:
            if not isinstance(val, (list, tuple)) or len(val) != 2:
                raise TypeError(f'BETWEEN operation requires list/tuple of 2 elements, got {val}')
            return cast(peewee.Expression, call(model_field, 'between', 'BETWEEN', val[0], val[1]))
        elif op == LogicalOp.BIT_AND:
            # The bitwise result is compared with 0 to turn it into a boolean condition.
            return cast(peewee.Expression, cast(object, call(model_field, 'bin_and', 'BIT_AND', val) != 0))
        elif op == LogicalOp.BIT_OR:
            return cast(peewee.Expression, cast(object, call(model_field, 'bin_or', 'BIT_OR', val) != 0))
        elif op == LogicalOp.IS_NULL:
            return cast(peewee.Expression, model_field.is_null())
        elif op == LogicalOp.IS_NOT_NULL:
            return cast(peewee.Expression, model_field.is_null(False))
        else:
            raise ValueError(f'Unsupported operation: {op}')
class ConditionalNode(FilterNode):
    """
    Adapter exposing a :class:`ConditionalFilterCondition` as a :class:`FilterNode`.

    Thanks to this wrapper, conditional filter conditions can take part in the
    node hierarchy and be evaluated in the same way as every other filter node.

    .. versionadded:: v2.0.0
    """

    def __init__(self, conditional: 'ConditionalFilterCondition', name: str | None = None):
        """
        Build the adapter around an existing conditional filter condition.

        :param conditional: The conditional filter condition to wrap
        :type conditional: ConditionalFilterCondition
        :param name: Optional name for this conditional node
        :type name: str | None, Optional
        """
        self.name = name
        self.conditional = conditional

    def to_expression(self, model: type[Model]) -> peewee.Expression:
        """
        Produce the Peewee expression of the wrapped conditional filter.

        The work is entirely delegated to the :meth:`to_expression` method of
        the wrapped conditional filter condition.

        :param model: The model class to generate the expression for
        :type model: type[Model]
        :return: A Peewee expression representing this conditional node
        :rtype: peewee.Expression
        """
        wrapped = self.conditional
        return wrapped.to_expression(model)
class LogicalNode(FilterNode):
    """
    Logical combination of child nodes.

    This class represents logical operations (AND, OR, NOT) applied to filter nodes.
    It enables building complex filter expressions by combining simpler filter nodes
    with logical operators.

    .. versionadded:: v2.0.0
    """

    def __init__(self, op: str, *children: FilterNode):
        """
        Initialize a logical node.

        :param op: The logical operation ('AND', 'OR', 'NOT')
        :type op: str
        :param children: Child filter nodes to combine with the logical operation
        :type children: FilterNode
        """
        self.op = op  # 'AND', 'OR', 'NOT'
        self.children = list(children)

    def to_expression(self, model: type[Model]) -> peewee.Expression | bool:
        """
        Convert this logical node to a Peewee expression.

        This method evaluates the logical operation on the child nodes and returns
        the corresponding Peewee expression.

        :param model: The model class to generate the expression for
        :type model: type[Model]
        :return: A Peewee expression representing this logical node
        :rtype: peewee.Expression | bool
        :raises ValueError: If an unknown logical operation is specified, or if a NOT node does not
            have exactly one child
        :raises TypeError: If an AND/OR node has no children (raised by :func:`functools.reduce`)
        """
        if self.op == 'NOT':
            # An explicit check instead of ``assert``, which is stripped when Python runs with -O.
            if len(self.children) != 1:
                raise ValueError(f'NOT requires exactly one child, got {len(self.children)}')
            inner = self.children[0].to_expression(model)
            if isinstance(inner, bool):
                # ``~`` on a Python bool is integer inversion (~True == -2, ~False == -1): both
                # results are truthy, and the operation is deprecated since Python 3.12.
                return not inner
            return cast(peewee.Expression, ~inner)
        if self.op in ('AND', 'OR'):
            combine = operator.and_ if self.op == 'AND' else operator.or_
            expressions = [child.to_expression(model) for child in self.children]
            return cast(peewee.Expression, reduce(combine, expressions))
        raise ValueError(f'Unknown logical op: {self.op}')
class ConditionalFilterCondition:
    """
    Conditional filter in which the constraint on one field depends on another field.

    It makes it possible to express logic such as:
    "IF field_a IN [x, y] THEN field_b IN [1, 2] ELSE no constraint on field_b"

    Example usage:

    .. code-block:: python

        # Filter: sample_id in [1,2] if composite_image_id in [100,101]
        condition = ConditionalFilterCondition(
            condition_field='composite_image_id',
            condition_op='IN',
            condition_value=[100, 101],
            then_field='sample_id',
            then_op='IN',
            then_value=[1, 2],
        )

        # This generates:
        # WHERE (composite_image_id IN (100, 101) AND sample_id IN (1, 2))
        #    OR (composite_image_id NOT IN (100, 101))
    """

    def __init__(
        self,
        condition_field: str,
        condition_op: str | LogicalOp,
        condition_value: Any,
        then_field: str,
        then_op: str | LogicalOp,
        then_value: Any,
        else_field: str | None = None,
        else_op: str | LogicalOp | None = None,
        else_value: Any | None = None,
        name: str | None = None,
    ) -> None:
        """
        Store the three clauses (condition, then, else) of the conditional filter.

        :param condition_field: The field to check for the condition
        :type condition_field: str
        :param condition_op: The operation for the condition (e.g., 'IN', '==')
        :type condition_op: str | LogicalOp
        :param condition_value: The value(s) for the condition
        :type condition_value: Any
        :param then_field: The field to filter when condition is true
        :type then_field: str
        :param then_op: The operation to apply when condition is true
        :type then_op: str | LogicalOp
        :param then_value: The value(s) for the then clause
        :type then_value: Any
        :param else_field: Optional field to filter when condition is false
        :type else_field: str | None
        :param else_op: Optional operation when condition is false
        :type else_op: str | LogicalOp | None
        :param else_value: Optional value(s) for the else clause
        :type else_value: Any | None
        :param name: The name of this condition. Avoid name clashing with model fields. Defaults to None
        :type name: str | None, Optional
        """
        # IF clause
        self.condition_field = condition_field
        self.condition_op = condition_op
        self.condition_value = condition_value
        # THEN clause
        self.then_field = then_field
        self.then_op = then_op
        self.then_value = then_value
        # ELSE clause (optional)
        self.else_field = else_field
        self.else_op = else_op
        self.else_value = else_value
        self.name = name

    def to_expression(self, model: type[Model]) -> peewee.Expression:
        """
        Translate the conditional filter into a Peewee expression.

        The generated expression has the form:
        (condition AND then_constraint) OR (NOT condition AND else_constraint)

        That is:

            - rows satisfying the condition must also satisfy then_constraint
            - rows not satisfying the condition must satisfy else_constraint, which is
              always true when no else field or no else operation was given

        :param model: The model class containing the fields
        :type model: type[Model]
        :return: A Peewee expression
        :rtype: peewee.Expression
        """
        when = ConditionNode(self.condition_field, self.condition_op, self.condition_value).to_expression(model)
        then_branch = ConditionNode(self.then_field, self.then_op, self.then_value).to_expression(model)

        if self.else_field is None or self.else_op is None:
            # Incomplete or missing else clause: no constraint, i.e. always true.
            # The nested cast is only there for mypy.
            else_branch = cast(peewee.Expression, cast(object, True))
        else:
            else_branch = ConditionNode(self.else_field, self.else_op, self.else_value).to_expression(model)

        satisfied = when & then_branch
        not_satisfied = ~when & else_branch
        return cast(peewee.Expression, satisfied | not_satisfied)

    def __eq__(self, other: Any) -> bool:
        """Two conditions are equal when they are of this class and all their attributes match."""
        return isinstance(other, ConditionalFilterCondition) and vars(self) == vars(other)
766class ModelFilter:
767 r"""
768 Class to filter rows from a model.
770 The filter object can be used to generate a where clause to be applied to Model.select().
772 The construction of a ModelFilter is normally done via a configuration file using the :meth:`from_conf` class method.
773 The name of the filter is playing a key role in this. If it follows a dot structure like:
775 *ProcessorName.__filter__.ModelName*
777 then the corresponding table from the TOML configuration object will be used.
779 For each processor, there might be many Filters, up to one for each Model used to get the input list. If a
780 processor is joining together three Models when performing the input select, there will be up to three Filters
781 collaborating on making the selection.
783 The filter configuration can contain the following key, value pair:
785 - key / string pairs, where the key is the name of a field in the corresponding Model
787 - key / numeric pairs
789 - key / arrays
791 - key / dict pairs with 'op' and 'value' keys for explicit operation specification
793 All fields from the configuration file will be added to the instance namespace, thus accessible with the dot
794 notation. Moreover, the field names and their filter value will be added to a private dictionary to simplify the
795 generation of the filter SQL code.
797 The user can use the filter object to store selection criteria. He can construct queries using the filter
798 contents in the same way as he could use processor parameters.
800 If he wants to automatically generate valid filtering expression, he can use the :meth:`filter` method. In order
801 for this to work, the ModelFilter object must be :meth:`bound <bind>` to a Model. Without this binding the ModelFilter will not
802 be able to automatically generate expressions.
804 For each field in the filter, one condition will be generated according to the following scheme:
806 ================= ================= ==================
807 Filter field type Logical operation Example
808 ================= ================= ==================
809 Numeric, boolean == Field == 3.14
810 String GLOB Field GLOB '\*ree'
811 List IN Field IN [1, 2, 3]
812 Dict (explicit) op from dict Field BIT_AND 5
813 ================= ================= ==================
815 All conditions will be joined with an AND logic by default, but this can be changed.
817 The ModelFilter also supports logical expressions to combine multiple filter conditions using AND, OR, and NOT
818 operators. These expressions can reference named filter conditions within the same filter or even combine
819 conditions from different filters when used with :class:`ProcessorFilter`.
821 Conditional filters allow expressing logic like:
822 "IF field_a IN [x, y] THEN field_b IN [1, 2] ELSE no constraint on field_b"
824 Consider the following example:
826 .. code-block:: python
827 :linenos:
829 class MeasModel(MAFwBaseModel):
830 meas_id = AutoField(primary_key=True)
831 sample_name = TextField()
832 successful = BooleanField()
833 flags = IntegerField()
834 composite_image_id = IntegerField()
835 sample_id = IntegerField()
838 # Traditional simplified usage
839 flt = ModelFilter(
840 'MyProcessor.__filter__.MyModel',
841 sample_name='sample_00*',
842 meas_id=[1, 2, 3],
843 successful=True,
844 )
846 # New explicit operation usage
847 flt = ModelFilter(
848 'MyProcessor.__filter__.MyModel',
849 sample_name={'op': 'LIKE', 'value': 'sample_00%'},
850 flags={'op': 'BIT_AND', 'value': 5},
851 meas_id={'op': 'IN', 'value': [1, 2, 3]},
852 )
854 # Logical expression usage
855 flt = ModelFilter(
856 'MyProcessor.__filter__.MyModel',
857 sample_name={'op': 'LIKE', 'value': 'sample_00%'},
858 flags={'op': 'BIT_AND', 'value': 5},
859 meas_id={'op': 'IN', 'value': [1, 2, 3]},
860 __logic__='sample_name AND (flags OR meas_id)',
861 )
863 # Conditional filter usage
864 flt = ModelFilter(
865 'MyProcessor.__filter__.MyModel',
866 sample_name='sample_00*',
867 composite_image_id=[100, 101],
868 sample_id=[1, 2],
869 __conditional__=[
870 {
871 'condition_field': 'composite_image_id',
872 'condition_op': 'IN',
873 'condition_value': [100, 101],
874 'then_field': 'sample_id',
875 'then_op': 'IN',
876 'then_value': [1, 2],
877 }
878 ],
879 )
881 flt.bind(MeasModel)
882 filtered_query = MeasModel.select().where(flt.filter())
884 The explicit operation format allows for bitwise operations and other advanced filtering.
886 TOML Configuration Examples:
888 .. code-block:: toml
890 [MyProcessor.__filter__.MyModel]
891 sample_name = "sample_00*" # Traditional GLOB
892 successful = true # Traditional equality
894 # Explicit operations
895 flags = { op = "BIT_AND", value = 5 }
896 score = { op = ">=", value = 75.0 }
897 category = { op = "IN", value = ["A", "B", "C"] }
898 date_range = { op = "BETWEEN", value = ["2024-01-01", "2024-12-31"] }
900 # Logical expression for combining conditions
901 __logic__ = "sample_name AND (successful OR flags)"
903 # Conditional filters
904 [[MyProcessor.__filter__.MyModel.__conditional__]]
905 condition_field = "composite_image_id"
906 condition_op = "IN"
907 condition_value = [100, 101]
908 then_field = "sample_id"
909 then_op = "IN"
910 then_value = [1, 2]
912 # Nested conditions with logical expressions
913 [MyProcessor.__filter__.MyModel.nested_conditions]
914 __logic__ = "a OR b"
915 a = { op = "LIKE", value = "test%" }
916 b = { op = "IN", value = [1, 2, 3] }
918 .. seealso::
920 - :class:`mafw.db.db_filter.ProcessorFilter` - For combining multiple ModelFilters with logical expressions
921 - :class:`mafw.db.db_filter.ConditionalFilterCondition` - For conditional filtering logic
922 - :class:`mafw.db.db_filter.ExprParser` - For parsing logical expressions
923 """
925 logic_name = '__logic__'
926 """
927 The logic keyword identifier.
929 This value cannot be used as a field name in the model bound to the filter.
930 """
931 conditional_name = '__conditional__'
932 """
933 The conditional keyword identifier.
935 This value cannot be used as a field name in the model bound to the filter.
936 """
def __init__(self, name_: str, **kwargs: Any) -> None:
    """
    Constructor parameters:

    :param `name_`: The name of the filter. It should be in dotted format to facilitate the configuration via the
        steering file. The _ is used to allow the user to have a keyword argument named name.
    :type `name_`: str
    :param kwargs: Keyword parameters corresponding to fields and filter values. The reserved keys
        :attr:`conditional_name` and :attr:`logic_name` are extracted first; every other key is interpreted as
        a field name whose value is either a plain value, an explicit ``{'op': ..., 'value': ...}`` mapping or a
        nested mapping of named sub-conditions (optionally combined by its own ``__logic__`` expression).
    :raises TypeError: if a field value is of a type that cannot be converted into a condition.
    :raises KeyError: if a nested logic expression refers to an unknown sub-condition name.

    .. versionchanged:: v1.2.0
        The parameter *name* has been renamed as *name_*.

    .. versionchanged:: v1.3.0
        Implementation of explicit operation.

    .. versionchanged:: v2.0.0
        Introduction of conditional filters, logical expression and hierarchical structure.
        Introduction of autobinding for MAFwBaseModels

    """
    self.name = name_
    self.model_name = name_.split('.')[-1]
    self.model: type[Model] | None = None
    self._model_bound = False

    # mapping name -> FilterNode
    self._nodes: 'OrderedDict[str, FilterNode]' = OrderedDict()
    # conditional nodes mapping (named)
    self._cond_nodes: 'OrderedDict[str, ConditionalNode]' = OrderedDict()
    # logic expression for this filter (combining top-level node names)
    self._logic_expr: str | None = None

    # Extract conditional filters if present
    if self.conditional_name in kwargs:
        conditionals = kwargs.pop(self.conditional_name)
        if not isinstance(conditionals, list):
            conditionals = [conditionals]

        for cond_dict in conditionals:
            self.add_conditional_from_dict(cond_dict)

    # Extract logic for internal conditions, if provided
    if self.logic_name in kwargs:
        self._logic_expr = kwargs.pop(self.logic_name)

    # Every remaining keyword describes one field.
    for field_name, spec in kwargs.items():
        if isinstance(spec, dict) and 'op' in spec and 'value' in spec:
            # explicit operation: {'op': ..., 'value': ...}
            self._nodes[field_name] = ConditionNode(field_name, spec['op'], spec['value'], name=field_name)
        elif isinstance(spec, dict) and any(key not in ('op', 'value') for key in spec):
            # nested mapping of named sub-conditions for this field
            self._nodes[field_name] = self._build_nested_node(field_name, spec)
        else:
            # plain value: the operation is deduced from the value type
            self._nodes[field_name] = self._create_condition_node_from_value(spec, field_name, field_name)

    # Auto-binding is attempted last: bind() may disable the logic expression, and doing it
    # before the expression is read from kwargs would silently re-enable it.
    self._auto_bind()

def _build_nested_node(self, field_name: str, mapping: dict[str, Any]) -> FilterNode:
    """Build the node of a field described by a mapping of named sub-conditions.

    The mapping is expected to look like
    ``{'__logic__': 'a OR b', 'a': {'op': ..., 'value': ...}, 'b': ...}``. Without a logic expression the
    sub-conditions are joined with AND.
    """
    subnodes: 'OrderedDict[str, FilterNode]' = OrderedDict()
    for sub_name, sub_spec in mapping.items():
        if sub_name == self.logic_name:
            continue
        if isinstance(sub_spec, dict) and 'op' in sub_spec and 'value' in sub_spec:
            subnodes[sub_name] = ConditionNode(field_name, sub_spec['op'], sub_spec['value'], name=sub_name)
        else:
            subnodes[sub_name] = self._create_condition_node_from_value(sub_spec, field_name, sub_name)

    sub_logic = mapping.get(self.logic_name, None)
    if sub_logic:
        ast = ExprParser(sub_logic).parse()
        return self._build_logical_node_from_ast(ast, subnodes, model_name_placeholder=field_name)
    return LogicalNode('AND', *subnodes.values())
def _auto_bind(self) -> None:
    """
    Try to bind the filter to its model without user intervention.

    The model is looked up by :attr:`model_name` through ``mafw_model_register.get_model`` and, when found,
    handed over to :meth:`bind`.

    If the lookup raises a :class:`KeyError`, a warning is logged and the filter is left unbound.

    This method is automatically invoked by the :class:`.ModelFilter` constructor.
    """
    try:
        self.bind(mafw_model_register.get_model(self.model_name))  # type: ignore[arg-type]
    except KeyError:
        log.warning(f'Impossible to perform auto-binding for model {self.model_name}')
def _build_logical_node_from_ast(
    self, ast: ExprNode, name_to_nodes: Dict[str, FilterNode], model_name_placeholder: str | None = None
) -> FilterNode:
    """
    Recursively turn a parsed logic expression into a tree of filter nodes.

    :param ast: The expression node to convert; its first item is the node kind.
    :param name_to_nodes: Mapping between the names usable in the expression and their filter nodes.
    :param model_name_placeholder: Field name reported in the error message for an unknown name.
    :return: The node stored under the name for a ``NAME`` leaf, otherwise a :class:`LogicalNode`.
    :raises KeyError: if the expression refers to a name missing from ``name_to_nodes``.
    :raises ValueError: if the node kind is not one of ``NAME``, ``NOT``, ``AND`` or ``OR``.
    """
    kind = ast[0]

    if kind == 'NAME':
        key = cast(NameNode, ast)[1]
        if key in name_to_nodes:
            return name_to_nodes[key]
        raise KeyError(f'Unknown name {key} in nested logic for field {model_name_placeholder}')

    if kind == 'NOT':
        operand = self._build_logical_node_from_ast(cast(NotNode, ast)[1], name_to_nodes, model_name_placeholder)
        return LogicalNode('NOT', operand)

    if kind in ('AND', 'OR'):
        binary = cast(BinaryNode, ast)
        # the left branch is converted before the right one
        lhs, rhs = [
            self._build_logical_node_from_ast(branch, name_to_nodes, model_name_placeholder)
            for branch in (binary[1], binary[2])
        ]
        return LogicalNode(kind, lhs, rhs)

    raise ValueError(f'Unsupported AST node {kind}')
@staticmethod
def _create_condition_node_from_value(value: Any, field_name: str, node_name: str | None = None) -> ConditionNode:
    """
    Build a :class:`ConditionNode` whose operation is deduced from the value type (backward compatibility).

    Numbers and booleans are compared with ``LogicalOp.EQ``, strings with ``LogicalOp.GLOB`` and lists with
    ``LogicalOp.IN``.

    :param value: The filter value
    :param field_name: The field name
    :param node_name: The name given to the created node
    :return: The condition node for ``field_name``
    :raises TypeError: if ``value`` is none of the supported types
    """
    # bool is a subclass of int, so it is covered by the numeric check
    if isinstance(value, (int, float, bool)):
        return ConditionNode(field_name, LogicalOp.EQ, value, node_name)
    if isinstance(value, str):
        return ConditionNode(field_name, LogicalOp.GLOB, value, node_name)
    if isinstance(value, list):
        return ConditionNode(field_name, LogicalOp.IN, value, node_name)
    raise TypeError(f'ModelFilter value of unsupported type {type(value)} for field {field_name}.')
def bind(self, model: type[Model]) -> None:
    """
    Attach the filter to a Model class.

    The model is stored and the filter is flagged as bound. If the model exposes an attribute named like
    :attr:`logic_name`, two warnings are logged and the logic expression of the filter is dropped.

    :param model: Model to be bound.
    :type model: Model
    """
    self.model = model
    self._model_bound = True

    if not hasattr(model, self.logic_name):
        return

    # the reserved keyword clashes with a model attribute: the logic expression cannot be trusted
    log.warning(
        f'Model {model.__name__} has a field named {self.logic_name}. This is '
        f'preventing the logic expression to work.'
    )
    log.warning('Modify your model. Logic expression disabled.')
    self._logic_expr = None
@property
def is_bound(self) -> bool:
    """
    Tell whether the ModelFilter has been bound to a Model.

    :return: The value of the internal ``_model_bound`` flag.
    :rtype: bool
    """
    return self._model_bound
def add_conditional(self, conditional: ConditionalFilterCondition) -> None:
    """
    Register a conditional filter.

    The condition is wrapped in a :class:`ConditionalNode` and stored under its name both among the
    conditional nodes and among the regular nodes of the filter. A condition without a name receives a
    generated one of the form ``__cond<N>__``.

    .. versionadded:: v2.0.0

    :param conditional: The conditional filter condition
    :type conditional: ConditionalFilterCondition
    :raises KeyError: if the name chosen by the user is already taken by another conditional filter.
    """
    label = conditional.name
    if label is not None:
        # user-provided name: it must not clash with an already registered conditional
        if label in self._cond_nodes:
            raise KeyError(
                f'A conditional filter named {label} already exists. Please review your steering file.'
            )
    else:
        # anonymous condition: start from the current count and move on until the name is free
        index = len(self._cond_nodes)
        label = f'__cond{index}__'
        while label in self._cond_nodes:
            index += 1
            label = f'__cond{index}__'

    wrapped = ConditionalNode(conditional, name=label)
    self._cond_nodes[label] = wrapped
    self._nodes[label] = wrapped
def add_conditional_from_dict(self, config: dict[str, Any]) -> None:
    """
    Register a conditional filter described by a configuration dictionary.

    The keys ``condition_field``, ``condition_op``, ``condition_value``, ``then_field``, ``then_op`` and
    ``then_value`` are mandatory; ``else_field``, ``else_op``, ``else_value`` and ``name`` are optional and
    default to None.

    .. versionadded:: v2.0.0

    :param config: Dictionary with conditional filter configuration
    :type config: dict[str, Any]
    :raises KeyError: if one of the mandatory keys is missing.
    """
    mandatory = ('condition_field', 'condition_op', 'condition_value', 'then_field', 'then_op', 'then_value')
    optional = ('else_field', 'else_op', 'else_value', 'name')

    settings = {key: config[key] for key in mandatory}
    settings.update((key, config.get(key)) for key in optional)

    self.add_conditional(ConditionalFilterCondition(**settings))
1163 @classmethod
1164 def from_conf(cls, name: str, conf: dict[str, Any]) -> Self:
1165 """
1166 Builds a Filter object from a steering file dictionary.
1168 If the name is in dotted notation, then this should be corresponding to the table in the configuration file.
1169 If a default configuration is provided, this will be used as a starting point for the filter, and it will be
1170 updated by the actual configuration in ``conf``.
1172 In normal use, you would provide the specific configuration via the conf parameter.
1174 See details in the :class:`class documentation <ModelFilter>`
1176 :param name: The name of the filter in dotted notation.
1177 :type name: str
1178 :param conf: The configuration dictionary.
1179 :type conf: dict
1180 :return: A Filter object
1181 :rtype: ModelFilter
1182 """
1183 param = {}
1185 # split the name from dotted notation
1186 # ProcessorName#123.ModelName.Filter
1187 # the processor name is actually the processor replica name
1188 names = name.split('.')
1189 if len(names) == 3 and names[1] == '__filter__':
1190 proc_name, _, model_name = names
1191 if proc_name in conf and '__filter__' in conf[proc_name] and model_name in conf[proc_name]['__filter__']:
1192 param.update(copy(conf[proc_name]['__filter__'][model_name]))
1194 # if the name is not in the expected dotted notation, the use an empty filter.
1195 return cls(name, **param)
def _evaluate_logic_ast(self, ast: ExprNode) -> peewee.Expression | bool:
    """
    Evaluate an abstract syntax tree (AST) representing a logical expression.

    This method recursively evaluates the AST nodes to produce a Peewee expression
    or boolean value representing the logical combination of filter conditions.

    :param ast: The abstract syntax tree node to evaluate
    :type ast: Any
    :return: A Peewee expression for logical operations or boolean True/False
    :rtype: peewee.Expression | bool
    :raises KeyError: If a referenced condition name is not found in the filter
    :raises ValueError: If an unsupported AST node type is encountered
    """
    t = ast[0]
    if t == 'NAME':
        nm = cast(NameNode, ast)[1]
        if nm not in self._nodes:
            raise KeyError(f"Unknown node '{nm}' in logic for filter {self.name}")
        node = self._nodes[nm]

        if TYPE_CHECKING:
            assert self.model is not None
        return node.to_expression(self.model)

    if t == 'NOT':
        val = self._evaluate_logic_ast(cast(NotNode, ast)[1])
        if isinstance(val, bool):
            # ``~`` on a bool is the integer bitwise inversion (~True == -2), not a logical negation
            return not val
        return cast(peewee.Expression, ~val)

    if t in ('AND', 'OR'):
        bin_ast = cast(BinaryNode, ast)
        # the left operand is evaluated before the right one
        left = self._evaluate_logic_ast(bin_ast[1])
        right = self._evaluate_logic_ast(bin_ast[2])
        combined = left & right if t == 'AND' else left | right
        return cast(peewee.Expression, cast(object, combined))

    raise ValueError(f'Unsupported AST node {t}')
def filter(self, join_with: Literal['AND', 'OR'] = 'AND') -> peewee.Expression | bool:
    """
    Generates a filtering expression joining all filtering fields.

    See details in the :class:`class documentation <ModelFilter>`

    An unbound filter logs a warning and returns True. When the filter has a logic expression, the
    expression alone decides how the nodes are combined and ``join_with`` is ignored. Otherwise all nodes
    are joined with ``join_with``; a filter without nodes returns True.

    .. versionchanged:: v1.3.0
        Add the possibility to specify a `join_with` function

    .. versionchanged:: v2.0.0
        Add support for conditional filters and for logical expression

    :param join_with: How to join conditions ('AND' or 'OR'). Defaults to 'AND'.
    :type join_with: Literal['AND', 'OR'], default 'AND'
    :return: The filtering expression.
    :rtype: peewee.Expression | bool
    :raises TypeError: when the field value type is not supported.
    :raises ValueError: when the logic expression cannot be parsed or refers to an unknown node, or when
        there are nodes to join and join_with is not 'AND' or 'OR'.
    """
    if not self.is_bound:
        log.warning('Unable to generate the filter. Did you bind the filter to the model?')
        return True

    if TYPE_CHECKING:
        # if we get here, it means that we have a valid model
        assert self.model is not None

    # if logic provided for this filter, use it
    if self._logic_expr:
        try:
            ast = ExprParser(self._logic_expr).parse()
        except ParseError as e:
            # keep the original exception as the cause for easier debugging
            raise ValueError(f'Error parsing logic for filter {self.name}: {e}') from e
        try:
            return self._evaluate_logic_ast(ast)
        except KeyError as e:
            raise ValueError(f'Error evaluating logic for filter {self.name}: {e}') from e

    # otherwise combine all top-level nodes (AND by default)
    exprs = [n.to_expression(self.model) for n in self._nodes.values()]
    if not exprs:
        return True
    if join_with not in ('AND', 'OR'):
        raise ValueError("join_with must be 'AND' or 'OR'")
    joiner = operator.and_ if join_with == 'AND' else operator.or_
    return cast(peewee.Expression, reduce(joiner, exprs))
class ProcessorFilter(UserDict[str, ModelFilter]):
    """
    A special dictionary to store all :class:`Filters <mafw.db.db_filter.ModelFilter>` of a processor.

    It contains a publicly accessible dictionary with the configuration of each ModelFilter using the Model name as
    keyword.

    It contains a private dictionary with the global filter configuration as well.
    The global filter is not directly accessible, but only some of its members will be exposed via properties.
    In particular, the new_only flag that is relevant only at the Processor level can be accessed directly using the
    :attr:`new_only`. If not specified in the configuration file, the new_only is by default True.

    It is possible to assign a logic operation string to the register that is used to join all the filters together
    when performing the :meth:`filter_all`. If no logic operation string is provided, the register will provide a join
    condition using either AND (default) or OR.
    """

    def __init__(self, data: dict[str, ModelFilter] | None = None, /, **kwargs: Any) -> None:
        """
        Constructor parameters:

        :param data: Initial data
        :type data: dict
        :param kwargs: Keywords arguments
        """
        # private state is created before delegating to UserDict
        self._global_filter: dict[str, Any] = {}
        self._logic: str | None = None
        super().__init__(data, **kwargs)

    @property
    def new_only(self) -> bool:
        """
        The new only flag.

        :return: True, if only new items, not already in the output database table must be processed.
        :rtype: bool
        """
        return cast(bool, self._global_filter.get('new_only', True))

    @new_only.setter
    def new_only(self, v: bool) -> None:
        self._global_filter['new_only'] = v

    def __setitem__(self, key: str, value: ModelFilter) -> None:
        """
        Set a new value at key.

        If value is not a Filter, then it will be automatically and silently discarded.

        :param key: Dictionary key. Normally the name of the model linked to the filter.
        :type key: str
        :param value: The Filter.
        :type value: ModelFilter
        """
        if not isinstance(value, ModelFilter):
            return
        super().__setitem__(key, value)

    def bind_all(self, models: list[type[Model]] | dict[str, type[Model]]) -> None:
        """
        Binds all filters to their models.

        The ``models`` list or dictionary should contain a valid model for all the ModelFilters in the registry.
        In the case of a dictionary, the key value should be the model name.

        A model without a filter in the registry gets one built from an empty configuration. Filters that are
        already bound are left untouched.

        :param models: List or dictionary of a databank of Models from which the ModelFilter can be bound.
        :type models: list[type(Model)] | dict[str,type(Model)]
        """
        if isinstance(models, list):
            models = {m.__name__: m for m in models}

        # make sure that every listed model has a filter, creating the missing ones from an empty configuration
        for model_name in models:
            if model_name not in self.data:
                self.data[model_name] = ModelFilter.from_conf(f'{model_name}', conf={})

        for k, v in self.data.items():
            if k in models and not v.is_bound:
                v.bind(models[k])

    def filter_all(self, join_with: Literal['AND', 'OR'] = 'AND') -> peewee.Expression | bool:
        """
        Generates a where clause joining all filters.

        If a logic expression is present, it will be used to combine named filters.
        Otherwise, fall back to the legacy behaviour using join_with: the expressions of all bound filters are
        joined with AND when ``join_with`` is 'AND' and with OR otherwise. Without any bound filter the result
        is True.

        :raise ValueError: If the parsing of the logical expression fails or if it refers to an unknown filter
        :param join_with: Logical function to join the filters if no logic expression is provided.
        :type join_with: Literal['AND', 'OR'], default: 'AND'
        :return: ModelFilter expression
        :rtype: peewee.Expression
        """
        # If a logic expression is present at the global level, use it to combine filters
        if self._logic:
            try:
                ast = ExprParser(self._logic).parse()
            except ParseError as e:
                raise ValueError(f'Error parsing global logic for ProcessorFilter: {e}') from e

            def eval_ast(node: ExprNode) -> peewee.Expression | bool:
                t = node[0]
                if t == 'NAME':
                    nm = cast(NameNode, node)[1]
                    if nm not in self.data:
                        raise KeyError(f"Unknown filter name '{nm}' in processor logic")
                    flt = self.data[nm]
                    if not flt.is_bound:
                        log.warning(f"ModelFilter '{nm}' is not bound; using True for its expression")
                        return True
                    return flt.filter()
                if t == 'NOT':
                    operand = eval_ast(cast(NotNode, node)[1])
                    if isinstance(operand, bool):
                        # ``~`` on a bool is the integer bitwise inversion (~True == -2), not a negation
                        return not operand
                    return cast(peewee.Expression, ~operand)
                if t in ('AND', 'OR'):
                    bin_node = cast(BinaryNode, node)
                    # the left operand is evaluated before the right one
                    left = eval_ast(bin_node[1])
                    right = eval_ast(bin_node[2])
                    combined = left & right if t == 'AND' else left | right
                    return cast(peewee.Expression, cast(object, combined))
                raise ValueError(f'Unsupported AST node {t}')

            try:
                return eval_ast(ast)
            except KeyError as e:
                raise ValueError(f'Error evaluating processor logic: {e}') from e

        # Legacy behaviour: combine all filters with join_with (AND/OR)
        filter_list = [flt.filter() for flt in self.data.values() if flt.is_bound]
        if not filter_list:
            return True
        if join_with == 'AND':
            return cast(peewee.Expression, cast(object, reduce(operator.and_, filter_list, True)))
        # True is absorbing for OR: seeding the reduction with it would make the clause always true
        return cast(peewee.Expression, cast(object, reduce(operator.or_, filter_list)))