Coverage for src / mafw / db / db_filter.py: 99%
424 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
1# Copyright 2025 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Database filter module for MAFW.
7This module provides classes and utilities for creating and managing database filters
8using Peewee ORM. It supports various filtering operations including simple conditions,
9logical combinations, and conditional filters where one field's criteria depend on another.
11The module implements a flexible filter system that can handle:
12 - Simple field comparisons (equality, inequality, greater/less than, etc.)
13 - Complex logical operations (AND, OR, NOT)
14 - Conditional filters with dependent criteria
15 - Nested logical expressions
16 - Support for various data types and operations
18Key components include:
19 - :class:`FilterNode`: Abstract base class for filter nodes
20 - :class:`ConditionNode`: Represents individual field conditions
21 - :class:`LogicalNode`: Combines filter nodes with logical operators
22 - :class:`ConditionalNode`: Wraps conditional filter conditions
23 - :class:`ModelFilter`: Main class for building and applying filters to models
24 - :class:`ProcessorFilter`: Container for multiple model filters in a processor
26The module uses a hierarchical approach to build filter expressions that can be converted
27into Peewee expressions for database queries. It supports both simple and complex filtering
28scenarios through a combination of direct field conditions and logical expressions.
30.. versionchanged:: v2.0.0
31 Major overhaul introducing conditional filters and logical expression support.
33Example usage::
35 from mafw.db.db_filter import ModelFilter
37 # Create a simple filter
38 flt = ModelFilter(
39 'Processor.__filter__.Model',
40 field1='value1',
41 field2={'op': 'IN', 'value': [1, 2, 3]},
42 )
44 # Bind to a model and generate query
45 flt.bind(MyModel)
46 query = MyModel.select().where(flt.filter())
48.. seealso::
50 :link:`peewee` - The underlying ORM library used for database operations
52 :class:`~.mafw.enumerators.LogicalOp` - Logical operation enumerations used in filters
53"""
55import logging
56import operator
57import re
58from collections import OrderedDict, UserDict
59from copy import copy
60from functools import reduce
61from typing import TYPE_CHECKING, Any, Dict, Literal, Self, TypeAlias, Union, cast
63import peewee
64from peewee import Model
66from mafw.db.db_model import mafw_model_register
67from mafw.enumerators import LogicalOp
69log = logging.getLogger(__name__)
72Token = tuple[str, str]
73"""Type definition for a logical expression token"""
75# 1. An atom is a tuple of the literal string 'NAME' and the value
76NameNode = tuple[Literal['NAME'], str]
77"""An atom is a tuple of the literal string 'NAME' and the value"""
79# 2. A NOT node is a tuple of 'NOT' and a recursive node
80# We use a string forward reference 'ExprNode' because it is defined below
81NotNode = tuple[Literal['NOT'], 'ExprNode']
82"""A NOT node is a tuple of 'NOT' and a recursive node"""
84# 3. AND/OR nodes are tuples of the operator and two recursive nodes
85BinaryNode = tuple[Literal['AND', 'OR'], 'ExprNode', 'ExprNode']
86"""AND/OR nodes are tuples of the operator and two recursive nodes"""
88# 4. The main recursive type combining all options
89ExprNode: TypeAlias = Union[NameNode, NotNode, BinaryNode]
90"""
91The main recursive type combining all options
93This type represents the abstract syntax tree (AST) nodes used in logical expressions.
94It can be one of:
96 - :data:`NameNode`: A named element (field name or filter name)
97 - :data:`NotNode`: A negation operation
98 - :data:`BinaryNode`: An AND/OR operation between two nodes
99"""
# Order matters: the alternatives are tried left to right, so the keyword
# patterns must come before NAME (which would otherwise also match them) and
# MISMATCH must stay last because it accepts any single character.
TOKEN_SPECIFICATION = [
    ('LPAREN', r'\('),
    ('RPAREN', r'\)'),
    ('AND', r'\bAND\b'),
    ('OR', r'\bOR\b'),
    ('NOT', r'\bNOT\b'),
    ('NAME', r'[A-Za-z_][A-Za-z0-9_\.]*(?:\:[A-Za-z_][A-Za-z0-9_]*)?'),
    ('SKIP', r'[ \t\n\r]+'),
    ('MISMATCH', r'.'),
]
"""Token specifications as ``(token_type, regular_expression)`` pairs"""

# Each specification becomes one named group; the name of the group that
# matched is then available through ``Match.lastgroup``.
MASTER_RE = re.compile(
    '|'.join(f'(?P<{group}>{regex})' for group, regex in TOKEN_SPECIFICATION),
)
"""Compiled regular expression to interpret the logical expression grammar"""
class ParseError(ValueError):
    """
    Signal a syntax problem in a logical expression string.

    It is raised by :func:`tokenize` when a character cannot be tokenized and
    by :class:`ExprParser` when the token sequence does not follow the grammar.
    Being a :exc:`ValueError` subclass, it is also caught by handlers written
    for :exc:`ValueError`.
    """
def tokenize(text: str) -> list[Token]:
    """
    Split a logical expression string into its tokens.

    The text is scanned with :data:`MASTER_RE`. Whitespace is dropped, every
    other recognised piece is returned in order of appearance, and the first
    character that no token specification accepts aborts the scan.

    :param text: The logical expression string to tokenize
    :type text: str
    :return: The tokens as (token_type, token_value) tuples
    :rtype: list[:data:`Token`]
    :raises ParseError: If an unexpected character is encountered in the text
    """
    collected: list[Token] = []
    for match in MASTER_RE.finditer(text):
        token_type = match.lastgroup
        lexeme = match.group()
        if token_type == 'MISMATCH':
            raise ParseError(f'Unexpected character {lexeme!r}')
        if token_type == 'SKIP':
            # whitespace carries no meaning in the grammar
            continue
        if TYPE_CHECKING:
            # every alternative of MASTER_RE is a named group
            assert token_type is not None
        collected.append((token_type, lexeme))
    return collected
class ExprParser:
    """
    Recursive descent parser turning a logical expression into a tuple-based AST.

    Supported grammar:

    .. code-block:: none

        expr    := or_expr
        or_expr := and_expr ("OR" and_expr)*
        and_expr:= not_expr ("AND" not_expr)*
        not_expr:= "NOT" not_expr | atom
        atom    := NAME | "(" expr ")"

    The grammar gives NOT the highest precedence, then AND, then OR; AND and OR
    chains are grouped from the left.

    Shape of the AST nodes:

        - ("NAME", "token"): A named element (field name or filter name)
        - ("NOT", node): A negation operation
        - ("AND", left, right): An AND operation between two nodes
        - ("OR", left, right): An OR operation between two nodes

    .. versionadded:: v2.0.0
    """

    def __init__(self, text: str) -> None:
        """
        Tokenize the expression and place the cursor on its first token.

        :param text: The logical expression to parse
        :type text: str
        :raises ParseError: If the text contains a character that cannot be tokenized
        """
        self.tokens = tokenize(text)
        self.pos = 0

    def peek(self) -> Token | None:
        """
        Return the token under the cursor without moving the cursor.

        :return: The next token if available, otherwise None
        :rtype: :data:`Token` | None
        """
        return self.tokens[self.pos] if self.pos < len(self.tokens) else None

    def accept(self, *kinds: str) -> Token | None:
        """
        Consume the next token when its type is one of the requested ones.

        The cursor only advances on a match.

        :param kinds: Token types to accept
        :type kinds: str
        :return: The consumed token if matched, otherwise None
        :rtype: :data:`Token` | None
        """
        upcoming = self.peek()
        if not upcoming or upcoming[0] not in kinds:
            return None
        self.pos += 1
        return upcoming

    def expect(self, kind: str) -> 'Token':
        """
        Consume the next token, which must be of the given type.

        :param kind: The expected token type
        :type kind: str
        :return: The consumed token
        :rtype: :data:`Token`
        :raises ParseError: If the expected token is not found
        """
        matched = self.accept(kind)
        if matched:
            return matched
        raise ParseError(f'Expected {kind} at position {self.pos}')

    def parse(self) -> 'ExprNode':
        """
        Parse the whole token list and return the resulting AST.

        :return: The abstract syntax tree representation of the expression
        :rtype: :data:`ExprNode`
        :raises ParseError: If the expression is malformed or has trailing tokens
        """
        tree = self.parse_or()
        if self.pos == len(self.tokens):
            return tree
        raise ParseError('Unexpected token after end of expression')

    def parse_or(self) -> 'ExprNode':
        """
        Parse a chain of AND expressions separated by ``OR``.

        :return: The parsed OR expression AST node
        :rtype: :data:`ExprNode`
        """
        node = self.parse_and()
        while self.accept('OR'):
            # left-associative: the tree built so far becomes the left operand
            node = ('OR', node, self.parse_and())
        return node

    def parse_and(self) -> 'ExprNode':
        """
        Parse a chain of NOT expressions separated by ``AND``.

        :return: The parsed AND expression AST node
        :rtype: :data:`ExprNode`
        """
        node = self.parse_not()
        while self.accept('AND'):
            # left-associative: the tree built so far becomes the left operand
            node = ('AND', node, self.parse_not())
        return node

    def parse_not(self) -> 'ExprNode':
        """
        Parse an atom preceded by any number of ``NOT`` keywords.

        :return: The parsed NOT expression AST node
        :rtype: :data:`ExprNode`
        """
        negations = 0
        while self.accept('NOT'):
            negations += 1
        node = self.parse_atom()
        # wrap the atom once per consumed NOT, innermost first
        for _ in range(negations):
            node = ('NOT', node)
        return node

    def parse_atom(self) -> 'ExprNode':
        """
        Parse a NAME or a parenthesised sub-expression.

        :return: The parsed atomic expression AST node
        :rtype: :data:`ExprNode`
        :raises ParseError: If the input ends or an unexpected token is encountered
        """
        tok = self.peek()
        if not tok:
            raise ParseError('Unexpected end of expression')
        kind = tok[0]
        if kind == 'NAME':
            self.accept('NAME')
            return 'NAME', tok[1]
        if kind == 'LPAREN':
            self.accept('LPAREN')
            inner = self.parse_or()
            self.expect('RPAREN')
            return inner
        raise ParseError(f'Unexpected token {tok} at position {self.pos}')
class FilterNode:
    """
    Abstract base for nodes.

    Subclasses override :meth:`to_expression` to produce the expression they
    stand for.
    """

    def to_expression(self, model: type[Model]) -> peewee.Expression | bool:
        """
        Build the expression represented by this node for the given model.

        :param model: The model class the expression refers to
        :type model: type[Model]
        :return: The expression represented by the node
        :rtype: peewee.Expression | bool
        :raises NotImplementedError: Always, in this base class
        """
        raise NotImplementedError  # pragma: no cover
class ConditionNode(FilterNode):
    """
    Leaf of a filter expression: one operation applied to one model field.

    The operation is stored as a :class:`.LogicalOp` member; a string given at
    construction is converted to the corresponding member.

    .. versionadded:: v2.0.0
    """

    def __init__(self, field: str | None, operation: LogicalOp | str, value: Any, name: str | None = None):
        """
        Initialize a condition node.

        :param field: The name of the field to apply the condition to.
        :type field: str | None
        :param operation: The logical operation to perform.
        :type operation: LogicalOp | str
        :param value: The value to compare against.
        :type value: Any
        :param name: Optional name for this condition node.
        :type name: str | None, Optional
        :raises ValueError: If a string operation does not correspond to a :class:`.LogicalOp` member.
        """
        self.field = field  # may be None for some special nodes
        if not isinstance(operation, str):
            self.operation = operation
        else:
            try:
                self.operation = LogicalOp(operation)
            except ValueError:
                raise ValueError(f'Unsupported operation: {operation}')
        self.value = value
        self.name = name

    @staticmethod
    def _call_field_method(column: Any, method_name: str, label: str, *args: Any) -> Any:
        """
        Call ``column.<method_name>(*args)`` after checking that it is callable.

        :param column: The model field the method is looked up on.
        :param method_name: The name of the method to call.
        :param label: The operation name used in the error message.
        :param args: Positional arguments forwarded to the method.
        :return: Whatever the field method returns.
        :raises ValueError: If the field has no callable attribute with that name.
        """
        bound = getattr(column, method_name, None)
        if not callable(bound):
            raise ValueError(f'{label} operation not supported for field type {type(column)}')
        return bound(*args)

    def to_expression(self, model: type[Model]) -> peewee.Expression:
        """
        Translate this condition into a Peewee expression.

        Comparison operations use the Python comparison operators of the field,
        GLOB uses its ``%`` operator and LIKE its ``**`` operator. The remaining
        operations call the field method of the same purpose (``regexp``,
        ``in_``, ``not_in``, ``between``, ``bin_and``, ``bin_or``, ``is_null``).

        :param model: The model class containing the field to filter.
        :type model: type[Model]
        :return: A Peewee expression representing this condition.
        :rtype: peewee.Expression
        :raises RuntimeError: If the node has no field to evaluate.
        :raises ValueError: If an unsupported operation is specified.
        :raises TypeError: If operation requirements are not met (e.g., IN operation requires list/tuple).
        """
        if self.field is None:
            # Should not happen for standard ConditionNode
            raise RuntimeError('ConditionNode has no field to evaluate')
        column = getattr(model, self.field)
        op = self.operation
        operand = self.value

        # Operations that map one-to-one onto a Python binary operator.
        # The casts below only serve the static type checker.
        binary_operators = (
            (LogicalOp.EQ, operator.eq),
            (LogicalOp.NE, operator.ne),
            (LogicalOp.LT, operator.lt),
            (LogicalOp.LE, operator.le),
            (LogicalOp.GT, operator.gt),
            (LogicalOp.GE, operator.ge),
            (LogicalOp.GLOB, operator.mod),
            (LogicalOp.LIKE, operator.pow),
        )
        for member, apply in binary_operators:
            if op == member:
                return cast(peewee.Expression, cast(object, apply(column, operand)))

        if op == LogicalOp.REGEXP:
            return cast(peewee.Expression, self._call_field_method(column, 'regexp', 'REGEXP', operand))

        if op == LogicalOp.IN:
            if not isinstance(operand, (list, tuple)):
                raise TypeError(f'IN operation requires list/tuple, got {type(operand)}')
            return cast(peewee.Expression, self._call_field_method(column, 'in_', 'IN', operand))

        if op == LogicalOp.NOT_IN:
            if not isinstance(operand, (list, tuple)):
                raise TypeError(f'NOT_IN operation requires list/tuple, got {type(operand)}')
            return cast(peewee.Expression, self._call_field_method(column, 'not_in', 'NOT_IN', operand))

        if op == LogicalOp.BETWEEN:
            if not isinstance(operand, (list, tuple)) or len(operand) != 2:
                raise TypeError(f'BETWEEN operation requires list/tuple of 2 elements, got {operand}')
            return cast(
                peewee.Expression,
                self._call_field_method(column, 'between', 'BETWEEN', operand[0], operand[1]),
            )

        if op == LogicalOp.BIT_AND:
            # the condition holds when the bitwise result is non-zero
            masked = self._call_field_method(column, 'bin_and', 'BIT_AND', operand)
            return cast(peewee.Expression, cast(object, masked != 0))

        if op == LogicalOp.BIT_OR:
            # the condition holds when the bitwise result is non-zero
            merged = self._call_field_method(column, 'bin_or', 'BIT_OR', operand)
            return cast(peewee.Expression, cast(object, merged != 0))

        if op == LogicalOp.IS_NULL:
            return cast(peewee.Expression, column.is_null())

        if op == LogicalOp.IS_NOT_NULL:
            return cast(peewee.Expression, column.is_null(False))

        raise ValueError(f'Unsupported operation: {op}')
class ConditionalNode(FilterNode):
    """
    Adapter exposing a :class:`ConditionalFilterCondition` as a :class:`FilterNode`.

    Wrapping the conditional in a node lets it be combined with the other
    filter nodes, since all of them are converted through :meth:`to_expression`.

    .. versionadded:: v2.0.0
    """

    def __init__(self, conditional: 'ConditionalFilterCondition', name: str | None = None):
        """
        Initialize a conditional node.

        :param conditional: The conditional filter condition to wrap
        :type conditional: ConditionalFilterCondition
        :param name: Optional name for this conditional node
        :type name: str | None, Optional
        """
        self.name = name
        self.conditional = conditional

    def to_expression(self, model: type[Model]) -> peewee.Expression:
        """
        Return the Peewee expression of the wrapped conditional.

        The work is entirely delegated to the :meth:`to_expression` method of
        the wrapped conditional filter condition.

        :param model: The model class to generate the expression for
        :type model: type[Model]
        :return: A Peewee expression representing this conditional node
        :rtype: peewee.Expression
        """
        wrapped = self.conditional
        return wrapped.to_expression(model)
class LogicalNode(FilterNode):
    """
    Logical combination of child nodes.

    This class represents logical operations (AND, OR, NOT) applied to filter nodes.
    It enables building complex filter expressions by combining simpler filter nodes
    with logical operators.

    .. versionadded:: v2.0.0
    """

    def __init__(self, op: str, *children: FilterNode):
        """
        Initialize a logical node.

        The operation is not validated here; an unknown operation is reported
        by :meth:`to_expression`.

        :param op: The logical operation ('AND', 'OR', 'NOT')
        :type op: str
        :param children: Child filter nodes to combine with the logical operation
        :type children: FilterNode
        """
        self.op = op  # 'AND', 'OR', 'NOT'
        self.children = list(children)

    def to_expression(self, model: type[Model]) -> peewee.Expression | bool:
        """
        Convert this logical node to a Peewee expression.

        Children are converted first and then combined:

        - ``NOT`` negates its single child. A child that evaluates to a plain
          :class:`bool` is negated with ``not``, because the bitwise ``~`` of a
          bool is an integer (``~True == -2``) and not a truth value.
        - ``AND`` / ``OR`` fold all children with ``&`` / ``|``. Without any
          child, the neutral element of the operation is returned (``True``
          for ``AND``, ``False`` for ``OR``).

        :param model: The model class to generate the expression for
        :type model: type[Model]
        :return: A Peewee expression representing this logical node
        :rtype: peewee.Expression | bool
        :raises ValueError: If an unknown logical operation is specified, or if
            a ``NOT`` node does not have exactly one child
        """
        if self.op == 'NOT':
            # explicit check instead of ``assert``, which is removed under ``python -O``
            if len(self.children) != 1:
                raise ValueError(f'NOT requires exactly one child node, got {len(self.children)}')
            inner = self.children[0].to_expression(model)
            if isinstance(inner, bool):
                return not inner
            return cast(peewee.Expression, ~inner)

        if self.op == 'AND':
            combine, neutral = operator.and_, True
        elif self.op == 'OR':
            combine, neutral = operator.or_, False
        else:
            raise ValueError(f'Unknown logical op: {self.op}')

        expressions = [child.to_expression(model) for child in self.children]
        if not expressions:
            # reduce() without an initial value raises TypeError on an empty sequence
            return neutral
        return cast(peewee.Expression, reduce(combine, expressions))
class ConditionalFilterCondition:
    """
    A filter in which the constraint on one field depends on a condition on another.

    It expresses logic such as:
        "IF field_a IN [x, y] THEN field_b IN [1, 2] ELSE no constraint on field_b"

    Example usage:

    .. code-block:: python

        # Filter: sample_id in [1,2] if composite_image_id in [100,101]
        condition = ConditionalFilterCondition(
            condition_field='composite_image_id',
            condition_op='IN',
            condition_value=[100, 101],
            then_field='sample_id',
            then_op='IN',
            then_value=[1, 2],
        )

        # This generates:
        # WHERE (composite_image_id IN (100, 101) AND sample_id IN (1, 2))
        #    OR (composite_image_id NOT IN (100, 101))
    """

    def __init__(
        self,
        condition_field: str,
        condition_op: str | LogicalOp,
        condition_value: Any,
        then_field: str,
        then_op: str | LogicalOp,
        then_value: Any,
        else_field: str | None = None,
        else_op: str | LogicalOp | None = None,
        else_value: Any | None = None,
        name: str | None = None,
    ) -> None:
        """
        Initialise a conditional filter condition.

        The arguments are stored unchanged; they are only interpreted when
        :meth:`to_expression` is called.

        :param condition_field: The field to check for the condition
        :type condition_field: str
        :param condition_op: The operation for the condition (e.g., 'IN', '==')
        :type condition_op: str | LogicalOp
        :param condition_value: The value(s) for the condition
        :type condition_value: Any
        :param then_field: The field to filter when condition is true
        :type then_field: str
        :param then_op: The operation to apply when condition is true
        :type then_op: str | LogicalOp
        :param then_value: The value(s) for the then clause
        :type then_value: Any
        :param else_field: Optional field to filter when condition is false
        :type else_field: str | None
        :param else_op: Optional operation when condition is false
        :type else_op: str | LogicalOp | None
        :param else_value: Optional value(s) for the else clause
        :type else_value: Any | None
        :param name: The name of this condition. Avoid name clashing with model fields. Defaults to None
        :type name: str | None, Optional
        """
        # IF part
        self.condition_field = condition_field
        self.condition_op = condition_op
        self.condition_value = condition_value
        # THEN part
        self.then_field = then_field
        self.then_op = then_op
        self.then_value = then_value
        # ELSE part (optional)
        self.else_field = else_field
        self.else_op = else_op
        self.else_value = else_value

        self.name = name

    def to_expression(self, model: type[Model]) -> peewee.Expression:
        """
        Convert this conditional filter to a Peewee expression.

        The expression has the form:
            (condition AND then_constraint) OR (NOT condition AND else_constraint)

        Meaning:

        - When condition is true, apply then_constraint
        - When condition is false, apply else_constraint (or no constraint)

        The else constraint is only built when both ``else_field`` and
        ``else_op`` are set; otherwise ``True`` takes its place.

        :param model: The model class containing the fields
        :type model: type[Model]
        :return: A Peewee expression
        :rtype: peewee.Expression
        """

        def as_expression(field: str, op: str | LogicalOp, value: Any) -> peewee.Expression:
            # every part of the conditional is an ordinary single-field condition
            return ConditionNode(field, op, value).to_expression(model)

        when = as_expression(self.condition_field, self.condition_op, self.condition_value)
        then = as_expression(self.then_field, self.then_op, self.then_value)

        if self.else_field is None or self.else_op is None:
            # No constraint in else clause - always true
            # the nested cast is needed to make mypy happy.
            otherwise = cast(peewee.Expression, cast(object, True))
        else:
            otherwise = as_expression(self.else_field, self.else_op, self.else_value)

        condition_holds = when & then
        condition_fails = ~when & otherwise
        return cast(peewee.Expression, condition_holds | condition_fails)

    def __eq__(self, other: Any) -> bool:
        """
        Compare two conditional filters attribute by attribute.

        :param other: The object to compare with
        :type other: Any
        :return: True if other is a ConditionalFilterCondition with equal attributes
        :rtype: bool
        """
        return isinstance(other, ConditionalFilterCondition) and vars(self) == vars(other)
636class ModelFilter:
637 r"""
638 Class to filter rows from a model.
640 The filter object can be used to generate a where clause to be applied to Model.select().
642 The construction of a ModelFilter is normally done via a configuration file using the :meth:`from_conf` class method.
643 The name of the filter is playing a key role in this. If it follows a dot structure like:
645 *ProcessorName.__filter__.ModelName*
647 then the corresponding table from the TOML configuration object will be used.
649 For each processor, there might be many Filters, up to one for each Model used to get the input list. If a
650 processor is joining together three Models when performing the input select, there will be up to three Filters
651 collaborating on making the selection.
653 The filter configuration can contain the following key, value pair:
655 - key / string pairs, where the key is the name of a field in the corresponding Model
657 - key / numeric pairs
659 - key / arrays
661 - key / dict pairs with 'op' and 'value' keys for explicit operation specification
663 All fields from the configuration file will be added to the instance namespace, thus accessible with the dot
664 notation. Moreover, the field names and their filter value will be added to a private dictionary to simplify the
665 generation of the filter SQL code.
667 The user can use the filter object to store selection criteria. He can construct queries using the filter
668 contents in the same way as he could use processor parameters.
670 If he wants to automatically generate valid filtering expressions, he can use the :meth:`filter` method. In order
671 for this to work, the ModelFilter object must be :meth:`bound <bind>` to a Model. Without this binding the ModelFilter will not
672 be able to automatically generate expressions.
674 For each field in the filter, one condition will be generated according to the following scheme:
676 ================= ================= ==================
677 Filter field type Logical operation Example
678 ================= ================= ==================
679 Numeric, boolean == Field == 3.14
680 String GLOB Field GLOB '\*ree'
681 List IN Field IN [1, 2, 3]
682 Dict (explicit) op from dict Field BIT_AND 5
683 ================= ================= ==================
685 All conditions will be joined with an AND logic by default, but this can be changed.
687 The ModelFilter also supports logical expressions to combine multiple filter conditions using AND, OR, and NOT
688 operators. These expressions can reference named filter conditions within the same filter or even combine
689 conditions from different filters when used with :class:`ProcessorFilter`.
691 Conditional filters allow expressing logic like:
692 "IF field_a IN [x, y] THEN field_b IN [1, 2] ELSE no constraint on field_b"
694 Consider the following example:
696 .. code-block:: python
697 :linenos:
699 class MeasModel(MAFwBaseModel):
700 meas_id = AutoField(primary_key=True)
701 sample_name = TextField()
702 successful = BooleanField()
703 flags = IntegerField()
704 composite_image_id = IntegerField()
705 sample_id = IntegerField()
708 # Traditional simplified usage
709 flt = ModelFilter(
710 'MyProcessor.__filter__.MyModel',
711 sample_name='sample_00*',
712 meas_id=[1, 2, 3],
713 successful=True,
714 )
716 # New explicit operation usage
717 flt = ModelFilter(
718 'MyProcessor.__filter__.MyModel',
719 sample_name={'op': 'LIKE', 'value': 'sample_00%'},
720 flags={'op': 'BIT_AND', 'value': 5},
721 meas_id={'op': 'IN', 'value': [1, 2, 3]},
722 )
724 # Logical expression usage
725 flt = ModelFilter(
726 'MyProcessor.__filter__.MyModel',
727 sample_name={'op': 'LIKE', 'value': 'sample_00%'},
728 flags={'op': 'BIT_AND', 'value': 5},
729 meas_id={'op': 'IN', 'value': [1, 2, 3]},
730 __logic__='sample_name AND (flags OR meas_id)',
731 )
733 # Conditional filter usage
734 flt = ModelFilter(
735 'MyProcessor.__filter__.MyModel',
736 sample_name='sample_00*',
737 composite_image_id=[100, 101],
738 sample_id=[1, 2],
739 __conditional__=[
740 {
741 'condition_field': 'composite_image_id',
742 'condition_op': 'IN',
743 'condition_value': [100, 101],
744 'then_field': 'sample_id',
745 'then_op': 'IN',
746 'then_value': [1, 2],
747 }
748 ],
749 )
751 flt.bind(MeasModel)
752 filtered_query = MeasModel.select().where(flt.filter())
754 The explicit operation format allows for bitwise operations and other advanced filtering.
756 TOML Configuration Examples:
758 .. code-block:: toml
760 [MyProcessor.__filter__.MyModel]
761 sample_name = "sample_00*" # Traditional GLOB
762 successful = true # Traditional equality
764 # Explicit operations
765 flags = { op = "BIT_AND", value = 5 }
766 score = { op = ">=", value = 75.0 }
767 category = { op = "IN", value = ["A", "B", "C"] }
768 date_range = { op = "BETWEEN", value = ["2024-01-01", "2024-12-31"] }
770 # Logical expression for combining conditions
771 __logic__ = "sample_name AND (successful OR flags)"
773 # Conditional filters
774 [[MyProcessor.__filter__.MyModel.__conditional__]]
775 condition_field = "composite_image_id"
776 condition_op = "IN"
777 condition_value = [100, 101]
778 then_field = "sample_id"
779 then_op = "IN"
780 then_value = [1, 2]
782 # Nested conditions with logical expressions
783 [MyProcessor.__filter__.MyModel.nested_conditions]
784 __logic__ = "a OR b"
785 a = { op = "LIKE", value = "test%" }
786 b = { op = "IN", value = [1, 2, 3] }
788 .. seealso::
790 - :class:`mafw.db.db_filter.ProcessorFilter` - For combining multiple ModelFilters with logical expressions
791 - :class:`mafw.db.db_filter.ConditionalFilterCondition` - For conditional filtering logic
792 - :class:`mafw.db.db_filter.ExprParser` - For parsing logical expressions
793 """
795 logic_name = '__logic__'
796 """
797 The logic keyword identifier.
799 This value cannot be used as field name in the filter bound model.
800 """
801 conditional_name = '__conditional__'
802 """
803 The conditional keyword identifier.
805 This value cannot be used as field name in the filter bound model.
806 """
808 def __init__(self, name_: str, **kwargs: Any) -> None:
809 """
810 Constructor parameters:
812 :param `name_`: The name of the filter. It should be in dotted format to facilitate the configuration via the
813 steering file. The _ is used to allow the user to have a keyword argument named name.
814 :type `name_`: str
815 :param kwargs: Keyword parameters corresponding to fields and filter values.
817 .. versionchanged:: v1.2.0
818 The parameter *name* has been renamed as *name_*.
820 .. versionchanged:: v1.3.0
821 Implementation of explicit operation.
823 .. versionchanged:: v2.0.0
824 Introduction of conditional filters, logical expression and hierarchical structure.
825 Introduction of autobinding for MAFwBaseModels
827 """
828 self.name = name_
829 self.model_name = name_.split('.')[-1]
830 self.model: type[Model] | None = None
831 self._model_bound = False
833 # attempt to autobind
834 self._auto_bind()
836 # mapping name -> FilterNode
837 self._nodes: 'OrderedDict[str, FilterNode]' = OrderedDict()
838 # conditional nodes mapping (named)
839 self._cond_nodes: 'OrderedDict[str, ConditionalNode]' = OrderedDict()
840 # logic expression for this filter (combining top-level node names)
841 self._logic_expr: str | None = None
843 # Extract conditional filters if present
844 if self.conditional_name in kwargs:
845 conditionals = kwargs.pop(self.conditional_name)
846 if not isinstance(conditionals, list):
847 conditionals = [conditionals]
849 for cond_dict in conditionals:
850 self.add_conditional_from_dict(cond_dict)
852 # Extract logic for internal conditions, if provided
853 if self.logic_name in kwargs:
854 self._logic_expr = kwargs.pop(self.logic_name)
856 # now process remaining kwargs as either:
857 # - simple/extended condition for a field
858 # - or a nested mapping describing subconditions for field (field-level logic)
859 for k, v in kwargs.items():
860 # simple types map to ConditionNode
861 if isinstance(v, dict) and ('op' in v and 'value' in v):
862 # explicit op/value for field k
863 # extended operation condition
864 node = ConditionNode(k, v['op'], v['value'], name=k)
865 self._nodes[k] = node
866 elif isinstance(v, dict) and any(
867 isinstance(x, dict) or x == self.logic_name or x not in ['op', 'value']
868 for x in v.keys()
869 if isinstance(v, dict)
870 ):
871 # nested mapping: create sub-nodes for this field
872 # v expected like {'__logic__': 'a OR b', 'a': {'op':..., 'value':...}, 'b': ...}
873 subnodes: 'OrderedDict[str, FilterNode]' = OrderedDict()
874 sub_logic = v.get(self.logic_name, None)
875 for subk, subv in v.items():
876 if subk == self.logic_name:
877 continue
878 if isinstance(subv, dict) and ('op' in subv and 'value' in subv):
879 subnode = ConditionNode(k, subv['op'], subv['value'], name=subk)
880 subnodes[subk] = subnode
881 else:
882 subnodes[subk] = self._create_condition_node_from_value(subv, k, subk)
883 # combine subnodes using sub_logic or AND by default
884 if sub_logic:
885 ast = ExprParser(sub_logic).parse()
886 ln = self._build_logical_node_from_ast(ast, subnodes, model_name_placeholder=k)
887 else:
888 # AND all subnodes
889 ln = LogicalNode('AND', *subnodes.values())
890 self._nodes[k] = ln
891 else:
892 self._nodes[k] = self._create_condition_node_from_value(v, k, k)
894 def _auto_bind(self) -> None:
895 try:
896 model = mafw_model_register.get_model(self.model_name)
897 self.bind(model) # type: ignore[arg-type]
898 except KeyError:
899 log.warning(f'Impossible to perform auto-binding for model {self.model_name}')
901 def _build_logical_node_from_ast(
902 self, ast: ExprNode, name_to_nodes: Dict[str, FilterNode], model_name_placeholder: str | None = None
903 ) -> FilterNode:
904 """Recursively build LogicalNode from AST using a mapping name->FilterNode."""
905 t = ast[0]
906 if t == 'NAME':
907 named_ast = cast(NameNode, ast)
908 nm = named_ast[1]
909 if nm not in name_to_nodes:
910 raise KeyError(f'Unknown name {nm} in nested logic for field {model_name_placeholder}')
911 return name_to_nodes[nm]
912 elif t == 'NOT':
913 not_ast = cast(NotNode, ast)
914 child = self._build_logical_node_from_ast(not_ast[1], name_to_nodes, model_name_placeholder)
915 return LogicalNode('NOT', child)
916 elif t in ('AND', 'OR'):
917 bin_ast = cast(BinaryNode, ast)
918 left = self._build_logical_node_from_ast(bin_ast[1], name_to_nodes, model_name_placeholder)
919 right = self._build_logical_node_from_ast(bin_ast[2], name_to_nodes, model_name_placeholder)
920 return LogicalNode(t, left, right)
921 else:
922 raise ValueError(f'Unsupported AST node {t}')
924 @staticmethod
925 def _create_condition_node_from_value(value: Any, field_name: str, node_name: str | None = None) -> ConditionNode:
926 """
927 Create a FilterCondition based on value type (backward compatibility).
929 :param value: The filter value
930 :param field_name: The field name
931 :return: A FilterCondition
932 """
933 if isinstance(value, (int, float, bool)):
934 return ConditionNode(field_name, LogicalOp.EQ, value, node_name)
935 elif isinstance(value, str):
936 return ConditionNode(field_name, LogicalOp.GLOB, value, node_name)
937 elif isinstance(value, list):
938 return ConditionNode(field_name, LogicalOp.IN, value, node_name)
939 else:
940 raise TypeError(f'ModelFilter value of unsupported type {type(value)} for field {field_name}.')
942 def bind(self, model: type[Model]) -> None:
943 """
944 Connects a filter to a Model class.
946 :param model: Model to be bound.
947 :type model: Model
948 """
950 self.model = model
951 self._model_bound = True
953 if hasattr(self.model, self.logic_name) and self._model_bound:
954 if TYPE_CHECKING:
955 assert self.model is not None
957 log.warning(
958 f'Model {self.model.__name__} has a field named {self.logic_name}. This is '
959 f'preventing the logic expression to work.'
960 )
961 log.warning('Modify your model. Logic expression disabled.')
962 self._logic_expr = None
964 @property
965 def is_bound(self) -> bool:
966 """Returns true if the ModelFilter has been bound to a Model"""
967 return self._model_bound
969 def add_conditional(self, conditional: ConditionalFilterCondition) -> None:
970 """
971 Add a conditional filter.
973 .. versionadded:: v2.0.0
975 :param conditional: The conditional filter condition
976 :type conditional: ConditionalFilterCondition
977 """
978 condition_name = conditional.name
979 if condition_name is None:
980 # it means that the user did not specify any name for this condition.
981 # we will then assign one
982 increment = 0
983 while True:
984 condition_name = f'__cond{increment + len(self._cond_nodes)}__'
985 if condition_name not in self._cond_nodes:
986 break
987 else:
988 increment += 1
989 else:
990 # the user specified a name for this condition. we will use it but first we check if it is not yet used
991 if condition_name in self._cond_nodes:
992 raise KeyError(
993 f'A conditional filter named {condition_name} already exists. Please review your steering file.'
994 )
996 node = ConditionalNode(conditional, name=condition_name)
997 self._cond_nodes[condition_name] = node
998 self._nodes[condition_name] = node
1000 def add_conditional_from_dict(self, config: dict[str, Any]) -> None:
1001 """
1002 Add a conditional filter from a configuration dictionary.
1004 .. versionadded:: v2.0.0
1006 :param config: Dictionary with conditional filter configuration
1007 :type config: dict[str, Any]
1008 """
1009 conditional = ConditionalFilterCondition(
1010 condition_field=config['condition_field'],
1011 condition_op=config['condition_op'],
1012 condition_value=config['condition_value'],
1013 then_field=config['then_field'],
1014 then_op=config['then_op'],
1015 then_value=config['then_value'],
1016 else_field=config.get('else_field'),
1017 else_op=config.get('else_op'),
1018 else_value=config.get('else_value'),
1019 name=config.get('name'),
1020 )
1021 self.add_conditional(conditional)
1023 @classmethod
1024 def from_conf(cls, name: str, conf: dict[str, Any]) -> Self:
1025 """
1026 Builds a Filter object from a steering file dictionary.
1028 If the name is in dotted notation, then this should be corresponding to the table in the configuration file.
1029 If a default configuration is provided, this will be used as a starting point for the filter, and it will be
1030 updated by the actual configuration in ``conf``.
1032 In normal use, you would provide the specific configuration via the conf parameter.
1034 See details in the :class:`class documentation <ModelFilter>`
1036 :param name: The name of the filter in dotted notation.
1037 :type name: str
1038 :param conf: The configuration dictionary.
1039 :type conf: dict
1040 :return: A Filter object
1041 :rtype: ModelFilter
1042 """
1043 param = {}
1045 # split the name from dotted notation
1046 # ProcessorName#123.ModelName.Filter
1047 # the processor name is actually the processor replica name
1048 names = name.split('.')
1049 if len(names) == 3 and names[1] == '__filter__':
1050 proc_name, _, model_name = names
1051 if proc_name in conf and '__filter__' in conf[proc_name] and model_name in conf[proc_name]['__filter__']:
1052 param.update(copy(conf[proc_name]['__filter__'][model_name]))
1054 # if the name is not in the expected dotted notation, the use an empty filter.
1055 return cls(name, **param)
1057 def _evaluate_logic_ast(self, ast: ExprNode) -> peewee.Expression | bool:
1058 """
1059 Evaluate an abstract syntax tree (AST) representing a logical expression.
1061 This method recursively evaluates the AST nodes to produce a Peewee expression
1062 or boolean value representing the logical combination of filter conditions.
1064 :param ast: The abstract syntax tree node to evaluate
1065 :type ast: Any
1066 :return: A Peewee expression for logical operations or boolean True/False
1067 :rtype: peewee.Expression | bool
1068 :raises KeyError: If a referenced condition name is not found in the filter
1069 :raises ValueError: If an unsupported AST node type is encountered
1070 """
1071 t = ast[0]
1072 if t == 'NAME':
1073 named_ast = cast(NameNode, ast)
1074 nm = named_ast[1]
1075 if nm not in self._nodes:
1076 raise KeyError(f"Unknown node '{nm}' in logic for filter {self.name}")
1077 node = self._nodes[nm]
1079 if TYPE_CHECKING:
1080 assert self.model is not None
1081 return node.to_expression(self.model)
1082 elif t == 'NOT':
1083 not_ast = cast(NotNode, ast)
1084 val = self._evaluate_logic_ast(not_ast[1])
1085 return cast(peewee.Expression, ~val)
1086 elif t == 'AND':
1087 bin_ast = cast(BinaryNode, ast)
1088 left = self._evaluate_logic_ast(bin_ast[1])
1089 right = self._evaluate_logic_ast(bin_ast[2])
1090 return cast(peewee.Expression, cast(object, left & right))
1091 elif t == 'OR':
1092 bin_ast = cast(BinaryNode, ast)
1093 left = self._evaluate_logic_ast(bin_ast[1])
1094 right = self._evaluate_logic_ast(bin_ast[2])
1095 return cast(peewee.Expression, cast(object, left | right))
1096 else:
1097 raise ValueError(f'Unsupported AST node {t}')
1099 def filter(self, join_with: Literal['AND', 'OR'] = 'AND') -> peewee.Expression | bool:
1100 """
1101 Generates a filtering expression joining all filtering fields.
1103 See details in the :class:`class documentation <ModelFilter>`
1105 .. versionchanged:: v1.3.0
1106 Add the possibility to specify a `join_with` function
1108 .. versionchanged:: v2.0.0
1109 Add support for conditional filters and for logical expression
1111 :param join_with: How to join conditions ('AND' or 'OR'). Defaults to 'AND'.
1112 :type join_with: Literal['AND', 'OR'], default 'AND'
1113 :return: The filtering expression.
1114 :rtype: peewee.Expression | bool
1115 :raises TypeError: when the field value type is not supported.
1116 :raises ValueError: when join_with is not 'AND' or 'OR'.
1117 """
1118 if not self.is_bound:
1119 log.warning('Unable to generate the filter. Did you bind the filter to the model?')
1120 return True
1122 if TYPE_CHECKING:
1123 # if we get here, it means that we have a valid model
1124 assert self.model is not None
1126 # if logic provided for this filter, use it
1127 if self._logic_expr:
1128 try:
1129 ast = ExprParser(self._logic_expr).parse()
1130 except ParseError as e:
1131 raise ValueError(f'Error parsing logic for filter {self.name}: {e}')
1132 try:
1133 return self._evaluate_logic_ast(ast)
1134 except KeyError as e:
1135 raise ValueError(f'Error evaluating logic for filter {self.name}: {e}')
1137 # otherwise combine all top-level nodes (AND by default)
1138 exprs = [n.to_expression(self.model) for n in self._nodes.values()]
1139 if not exprs:
1140 return True
1141 if join_with not in ('AND', 'OR'):
1142 raise ValueError("join_with must be 'AND' or 'OR'")
1143 if join_with == 'AND':
1144 return cast(peewee.Expression, reduce(operator.and_, exprs))
1145 return cast(peewee.Expression, reduce(operator.or_, exprs))
class ProcessorFilter(UserDict[str, ModelFilter]):
    """
    A special dictionary to store all :class:`Filters <mafw.db.db_filter.ModelFilter>` in a processor.

    It contains a publicly accessible dictionary with the configuration of each ModelFilter using the Model name as
    keyword.

    It contains a private dictionary with the global filter configuration as well.
    The global filter is not directly accessible, but only some of its members will be exposed via properties.
    In particular, the new_only flag that is relevant only at the Processor level can be accessed directly using the
    :attr:`new_only`. If not specified in the configuration file, the new_only is by default True.

    It is possible to assign a logic operation string to the register that is used to join all the filters together
    when performing the :meth:`filter_all`. If no logic operation string is provided, the register will provide a join
    condition using either AND (default) or OR.
    """

    def __init__(self, data: dict[str, ModelFilter] | None = None, /, **kwargs: Any) -> None:
        """
        Constructor parameters:

        :param data: Initial data
        :type data: dict
        :param kwargs: Keywords arguments
        """
        self._global_filter: dict[str, Any] = {}
        self._logic: str | None = None
        super().__init__(data, **kwargs)

    @property
    def new_only(self) -> bool:
        """
        The new only flag.

        :return: True, if only new items, not already in the output database table must be processed.
        :rtype: bool
        """
        return cast(bool, self._global_filter.get('new_only', True))

    @new_only.setter
    def new_only(self, v: bool) -> None:
        self._global_filter['new_only'] = v

    def __setitem__(self, key: str, value: ModelFilter) -> None:
        """
        Set a new value at key.

        If value is not a Filter, then it will be automatically and silently discarded.

        :param key: Dictionary key. Normally the name of the model linked to the filter.
        :type key: str
        :param value: The Filter.
        :type value: ModelFilter
        """
        if not isinstance(value, ModelFilter):
            return
        super().__setitem__(key, value)

    def bind_all(self, models: list[type[Model]] | dict[str, type[Model]]) -> None:
        """
        Binds all filters to their models.

        The ``models`` list or dictionary should contain a valid model for all the ModelFilters in the registry.
        In the case of a dictionary, the key value should be the model name.

        For each model without a filter in the registry, an empty filter is created before binding.
        Filters that are already bound are left untouched.

        :param models: List or dictionary of a databank of Models from which the ModelFilter can be bound.
        :type models: list[type(Model)] | dict[str,type(Model)]
        """
        if isinstance(models, list):
            models = {m.__name__: m for m in models}

        # check, if we have a filter for each listed models, if not create one using the default configuration.
        for model_name in models:
            if model_name not in self.data:
                self.data[model_name] = ModelFilter.from_conf(f'{model_name}', conf={})

        # bind only the filters having a model in the databank and not yet bound
        for k, v in self.data.items():
            if k in models and not v.is_bound:
                v.bind(models[k])

    def filter_all(self, join_with: Literal['AND', 'OR'] = 'AND') -> peewee.Expression | bool:
        """
        Generates a where clause joining all filters.

        If a logic expression is present, it will be used to combine named filters.
        Otherwise, fall back to the legacy behaviour using join_with. In the legacy behaviour only bound
        filters are considered and, if there are none, True is returned.

        :raise ValueError: If the parsing or the evaluation of the logical expression fails
        :param join_with: Logical function to join the filters if no logic expression is provided.
        :type join_with: Literal['AND', 'OR'], default: 'AND'
        :return: ModelFilter expression
        :rtype: peewee.Expression | bool
        """
        # If a logic expression is present at the global level, use it to combine filters
        if self._logic:
            try:
                ast = ExprParser(self._logic).parse()
            except ParseError as e:
                raise ValueError(f'Error parsing global logic for ProcessorFilter: {e}') from e

            def eval_ast(node: ExprNode) -> peewee.Expression | bool:
                t = node[0]
                if t == 'NAME':
                    named_node = cast(NameNode, node)
                    nm = named_node[1]
                    if nm not in self.data:
                        raise KeyError(f"Unknown filter name '{nm}' in processor logic")
                    flt = self.data[nm]
                    if not flt.is_bound:
                        log.warning(f"ModelFilter '{nm}' is not bound; using True for its expression")
                        return True
                    return flt.filter()
                elif t == 'NOT':
                    not_node = cast(NotNode, node)
                    operand = eval_ast(not_node[1])
                    if isinstance(operand, bool):
                        # ``~`` on a Python bool is the integer bitwise inversion (~True == -2), that is
                        # still truthy: a plain boolean (e.g. from an unbound filter) is negated logically.
                        return not operand
                    return cast(peewee.Expression, ~operand)
                elif t == 'AND':
                    bin_node = cast(BinaryNode, node)
                    return cast(peewee.Expression, cast(object, eval_ast(bin_node[1]) & eval_ast(bin_node[2])))
                elif t == 'OR':
                    bin_node = cast(BinaryNode, node)
                    return cast(peewee.Expression, cast(object, eval_ast(bin_node[1]) | eval_ast(bin_node[2])))
                else:
                    raise ValueError(f'Unsupported AST node {t}')

            try:
                return eval_ast(ast)
            except KeyError as e:
                raise ValueError(f'Error evaluating processor logic: {e}') from e

        # Legacy behaviour: combine all filters with join_with (AND/OR)
        filter_list = [flt.filter() for flt in self.data.values() if flt.is_bound]
        if join_with == 'AND':
            # True is the neutral element of AND, so it is a safe starting value
            return cast(peewee.Expression, cast(object, reduce(operator.and_, filter_list, True)))

        if not filter_list:
            # no bound filters: nothing to restrict
            return True
        # True must not be used as starting value for OR: ``True OR <anything>`` is always true and
        # would silently disable all the filters.
        return cast(peewee.Expression, cast(object, reduce(operator.or_, filter_list)))