Coverage for src / mafw / db / db_filter.py: 99%

424 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-09 09:08 +0000

1# Copyright 2025 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Database filter module for MAFW. 

6 

7This module provides classes and utilities for creating and managing database filters 

8using Peewee ORM. It supports various filtering operations including simple conditions, 

9logical combinations, and conditional filters where one field's criteria depend on another. 

10 

11The module implements a flexible filter system that can handle: 

12 - Simple field comparisons (equality, inequality, greater/less than, etc.) 

13 - Complex logical operations (AND, OR, NOT) 

14 - Conditional filters with dependent criteria 

15 - Nested logical expressions 

16 - Support for various data types and operations 

17 

18Key components include: 

19 - :class:`FilterNode`: Abstract base class for filter nodes 

20 - :class:`ConditionNode`: Represents individual field conditions 

21 - :class:`LogicalNode`: Combines filter nodes with logical operators 

22 - :class:`ConditionalNode`: Wraps conditional filter conditions 

23 - :class:`ModelFilter`: Main class for building and applying filters to models 

24 - :class:`ProcessorFilter`: Container for multiple model filters in a processor 

25 

26The module uses a hierarchical approach to build filter expressions that can be converted 

27into Peewee expressions for database queries. It supports both simple and complex filtering 

28scenarios through a combination of direct field conditions and logical expressions. 

29 

30.. versionchanged:: v2.0.0 

31 Major overhaul introducing conditional filters and logical expression support. 

32 

33Example usage:: 

34 

35 from mafw.db.db_filter import ModelFilter 

36 

37 # Create a simple filter 

38 flt = ModelFilter( 

39 'Processor.__filter__.Model', 

40 field1='value1', 

41 field2={'op': 'IN', 'value': [1, 2, 3]}, 

42 ) 

43 

44 # Bind to a model and generate query 

45 flt.bind(MyModel) 

46 query = MyModel.select().where(flt.filter()) 

47 

48.. seealso:: 

49 

50 :link:`peewee` - The underlying ORM library used for database operations 

51 

52 :class:`~.mafw.enumerators.LogicalOp` - Logical operation enumerations used in filters 

53""" 

54 

55import logging 

56import operator 

57import re 

58from collections import OrderedDict, UserDict 

59from copy import copy 

60from functools import reduce 

61from typing import TYPE_CHECKING, Any, Dict, Literal, Self, TypeAlias, Union, cast 

62 

63import peewee 

64from peewee import Model 

65 

66from mafw.db.db_model import mafw_model_register 

67from mafw.enumerators import LogicalOp 

68 

69log = logging.getLogger(__name__) 

70 

71 

72Token = tuple[str, str] 

73"""Type definition for a logical expression token""" 

74 

75# 1. An atom is a tuple of the literal string 'NAME' and the value 

76NameNode = tuple[Literal['NAME'], str] 

77"""An atom is a tuple of the literal string 'NAME' and the value""" 

78 

79# 2. A NOT node is a tuple of 'NOT' and a recursive node 

80# We use a string forward reference 'ExprNode' because it is defined below 

81NotNode = tuple[Literal['NOT'], 'ExprNode'] 

82"""A NOT node is a tuple of 'NOT' and a recursive node""" 

83 

84# 3. AND/OR nodes are tuples of the operator and two recursive nodes 

85BinaryNode = tuple[Literal['AND', 'OR'], 'ExprNode', 'ExprNode'] 

86"""AND/OR nodes are tuples of the operator and two recursive nodes""" 

87 

88# 4. The main recursive type combining all options 

89ExprNode: TypeAlias = Union[NameNode, NotNode, BinaryNode] 

90""" 

91The main recursive type combining all options 

92 

93This type represents the abstract syntax tree (AST) nodes used in logical expressions. 

94It can be one of: 

95 

96 - :data:`NameNode`: A named element (field name or filter name) 

97 - :data:`NotNode`: A negation operation 

98 - :data:`BinaryNode`: An AND/OR operation between two nodes 

99""" 

100 

101TOKEN_SPECIFICATION = [ 

102 ('LPAREN', r'\('), 

103 ('RPAREN', r'\)'), 

104 ('AND', r'\bAND\b'), 

105 ('OR', r'\bOR\b'), 

106 ('NOT', r'\bNOT\b'), 

107 ('NAME', r'[A-Za-z_][A-Za-z0-9_\.]*(?:\:[A-Za-z_][A-Za-z0-9_]*)?'), 

108 ('SKIP', r'[ \t\n\r]+'), 

109 ('MISMATCH', r'.'), 

110] 

111"""Token specifications""" 

112 

113MASTER_RE = re.compile('|'.join(f'(?P<{name}>{pattern})' for name, pattern in TOKEN_SPECIFICATION)) 

114"""Compiled regular expression to interpret the logical expression grammar""" 

115 

116 

117class ParseError(ValueError): 

118 """ 

119 Exception raised when parsing a logical expression fails. 

120 

121 This exception is raised when the parser encounters invalid syntax 

122 in a logical expression string. 

123 """ 

124 

125 pass 

126 

127 

128def tokenize(text: str) -> list[Token]: 

129 """ 

130 Tokenize a logical expression string into a list of tokens. 

131 

132 This function breaks down a logical expression string into individual 

133 tokens based on the defined token specifications. It skips whitespace 

134 and raises a :exc:`ParseError` for unexpected characters. 

135 

136 :param text: The logical expression string to tokenize 

137 :type text: str 

138 :return: A list of tokens represented as (token_type, token_value) tuples 

139 :rtype: list[:data:`Token`] 

140 :raises ParseError: If an unexpected character is encountered in the text 

141 """ 

142 tokens: list[Token] = [] 

143 for mo in MASTER_RE.finditer(text): 

144 kind = mo.lastgroup 

145 value = mo.group() 

146 if kind == 'SKIP': 

147 continue 

148 elif kind == 'MISMATCH': 

149 raise ParseError(f'Unexpected character {value!r}') 

150 else: 

151 if TYPE_CHECKING: 

152 assert kind is not None 

153 tokens.append((kind, value)) 

154 return tokens 

155 

156 

157class ExprParser: 

158 """ 

159 Recursive descent parser producing a simple Abstract Syntax Tree (AST). 

160 

161 The parser handles logical expressions with the following grammar: 

162 

163 .. code-block:: none 

164 

165 expr := or_expr 

166 or_expr := and_expr ("OR" and_expr)* 

167 and_expr:= not_expr ("AND" not_expr)* 

168 not_expr:= "NOT" not_expr | atom 

169 atom := NAME | "(" expr ")" 

170 

171 AST nodes are tuples representing different constructs: 

172 

173 - ("NAME", "token"): A named element (field name or filter name) 

174 - ("NOT", node): A negation operation 

175 - ("AND", left, right): An AND operation between two nodes 

176 - ("OR", left, right): An OR operation between two nodes 

177 

178 .. versionadded:: v2.0.0 

179 """ 

180 

181 def __init__(self, text: str) -> None: 

182 """ 

183 Initialize the expression parser with a logical expression string. 

184 

185 :param text: The logical expression to parse 

186 :type text: str 

187 """ 

188 self.tokens = tokenize(text) 

189 self.pos = 0 

190 

191 def peek(self) -> Token | None: 

192 """ 

193 Peek at the next token without consuming it. 

194 

195 :return: The next token if available, otherwise None 

196 :rtype: :data:`Token` | None 

197 """ 

198 if self.pos < len(self.tokens): 

199 return self.tokens[self.pos] 

200 return None 

201 

202 def accept(self, *kinds: str) -> Token | None: 

203 """ 

204 Accept and consume the next token if it matches one of the given types. 

205 

206 :param kinds: Token types to accept 

207 :type kinds: str 

208 :return: The consumed token if matched, otherwise None 

209 :rtype: :data:`Token` | None 

210 """ 

211 tok = self.peek() 

212 if tok and tok[0] in kinds: 

213 self.pos += 1 

214 return tok 

215 return None 

216 

217 def expect(self, kind: str) -> 'Token': 

218 """ 

219 Expect and consume a specific token type. 

220 

221 :param kind: The expected token type 

222 :type kind: str 

223 :return: The consumed token 

224 :rtype: :data:`Token` 

225 :raises ParseError: If the expected token is not found 

226 """ 

227 tok = self.accept(kind) 

228 if not tok: 

229 raise ParseError(f'Expected {kind} at position {self.pos}') 

230 return tok 

231 

232 def parse(self) -> 'ExprNode': 

233 """ 

234 Parse the entire logical expression and return the resulting AST. 

235 

236 :return: The abstract syntax tree representation of the expression 

237 :rtype: :data:`ExprNode` 

238 :raises ParseError: If the expression is malformed 

239 """ 

240 node = self.parse_or() 

241 if self.pos != len(self.tokens): 

242 raise ParseError('Unexpected token after end of expression') 

243 return node 

244 

245 def parse_or(self) -> 'ExprNode': 

246 """ 

247 Parse an OR expression. 

248 

249 :return: The parsed OR expression AST node 

250 :rtype: :data:`ExprNode` 

251 """ 

252 left = self.parse_and() 

253 while self.accept('OR'): 

254 right = self.parse_and() 

255 left = ('OR', left, right) 

256 return left 

257 

258 def parse_and(self) -> 'ExprNode': 

259 """ 

260 Parse an AND expression. 

261 

262 :return: The parsed AND expression AST node 

263 :rtype: :data:`ExprNode` 

264 """ 

265 left = self.parse_not() 

266 while self.accept('AND'): 

267 right = self.parse_not() 

268 left = ('AND', left, right) 

269 return left 

270 

271 def parse_not(self) -> 'ExprNode': 

272 """ 

273 Parse a NOT expression. 

274 

275 :return: The parsed NOT expression AST node 

276 :rtype: :data:`ExprNode` 

277 """ 

278 if self.accept('NOT'): 

279 node = self.parse_not() 

280 return 'NOT', node 

281 return self.parse_atom() 

282 

283 def parse_atom(self) -> 'ExprNode': 

284 """ 

285 Parse an atomic expression (NAME or parenthesised expression). 

286 

287 :return: The parsed atomic expression AST node 

288 :rtype: :data:`ExprNode` 

289 :raises ParseError: If an unexpected token is encountered 

290 """ 

291 tok = self.peek() 

292 if not tok: 

293 raise ParseError('Unexpected end of expression') 

294 if tok[0] == 'LPAREN': 

295 self.accept('LPAREN') 

296 node = self.parse_or() 

297 self.expect('RPAREN') 

298 return node 

299 elif tok[0] == 'NAME': 

300 self.accept('NAME') 

301 return 'NAME', tok[1] 

302 else: 

303 raise ParseError(f'Unexpected token {tok} at position {self.pos}') 

304 

305 

306class FilterNode: 

307 """Abstract base for nodes.""" 

308 

309 def to_expression(self, model: type[Model]) -> peewee.Expression | bool: 

310 raise NotImplementedError # pragma: no cover 

311 

312 

313class ConditionNode(FilterNode): 

314 """ 

315 Represents a single condition node in a filter expression. 

316 

317 This class encapsulates a single filtering condition that can be applied 

318 to a model field. It supports various logical operations through the 

319 :class:`.LogicalOp` enumerator or string representations of operations. 

320 

321 .. versionadded:: v2.0.0 

322 """ 

323 

324 def __init__(self, field: str | None, operation: LogicalOp | str, value: Any, name: str | None = None): 

325 """ 

326 Initialize a condition node. 

327 

328 :param field: The name of the field to apply the condition to. 

329 :type field: str | None 

330 :param operation: The logical operation to perform. 

331 :type operation: LogicalOp | str 

332 :param value: The value to compare against. 

333 :type value: Any 

334 :param name: Optional name for this condition node. 

335 :type name: str | None, Optional 

336 """ 

337 self.field = field # may be None for some special nodes 

338 if isinstance(operation, str): 

339 try: 

340 self.operation = LogicalOp(operation) 

341 except ValueError: 

342 raise ValueError(f'Unsupported operation: {operation}') 

343 else: 

344 self.operation = operation 

345 self.value = value 

346 self.name = name 

347 

348 def to_expression(self, model: type[Model]) -> peewee.Expression: 

349 """ 

350 Convert this condition node to a Peewee expression. 

351 

352 This method translates the condition represented by this node into 

353 a Peewee expression that can be used in database queries. 

354 

355 :param model: The model class containing the field to filter. 

356 :type model: type[Model] 

357 :return: A Peewee expression representing this condition. 

358 :rtype: peewee.Expression 

359 :raises RuntimeError: If the node has no field to evaluate. 

360 :raises ValueError: If an unsupported operation is specified. 

361 :raises TypeError: If operation requirements are not met (e.g., IN operation requires list/tuple). 

362 """ 

363 if self.field is None: 

364 # Should not happen for standard ConditionNode 

365 raise RuntimeError('ConditionNode has no field to evaluate') 

366 model_field = getattr(model, self.field) 

367 op = self.operation 

368 val = self.value 

369 # the code is full of cast and redundant checks to make mypy happy. 

370 # I do not know to which extent they make the code safer, but for sure they make it less readable. 

371 if op == LogicalOp.EQ: 

372 return cast(peewee.Expression, cast(object, model_field == val)) 

373 elif op == LogicalOp.NE: 

374 return cast(peewee.Expression, cast(object, model_field != val)) 

375 elif op == LogicalOp.LT: 

376 return cast(peewee.Expression, cast(object, model_field < val)) 

377 elif op == LogicalOp.LE: 

378 return cast(peewee.Expression, cast(object, model_field <= val)) 

379 elif op == LogicalOp.GT: 

380 return cast(peewee.Expression, cast(object, model_field > val)) 

381 elif op == LogicalOp.GE: 

382 return cast(peewee.Expression, cast(object, model_field >= val)) 

383 elif op == LogicalOp.GLOB: 

384 return cast(peewee.Expression, model_field % val) 

385 elif op == LogicalOp.LIKE: 

386 return cast(peewee.Expression, model_field**val) 

387 elif op == LogicalOp.REGEXP: 

388 if hasattr(model_field, 'regexp') and callable(getattr(model_field, 'regexp')): 

389 return cast(peewee.Expression, getattr(model_field, 'regexp')(val)) 

390 else: 

391 raise ValueError(f'REGEXP operation not supported for field type {type(model_field)}') 

392 elif op == LogicalOp.IN: 

393 if not isinstance(val, (list, tuple)): 

394 raise TypeError(f'IN operation requires list/tuple, got {type(val)}') 

395 if hasattr(model_field, 'in_') and callable(getattr(model_field, 'in_')): 

396 return cast(peewee.Expression, getattr(model_field, 'in_')(val)) 

397 else: 

398 raise ValueError(f'IN operation not supported for field type {type(model_field)}') 

399 elif op == LogicalOp.NOT_IN: 

400 if not isinstance(val, (list, tuple)): 

401 raise TypeError(f'NOT_IN operation requires list/tuple, got {type(val)}') 

402 if hasattr(model_field, 'not_in') and callable(getattr(model_field, 'not_in')): 

403 return cast(peewee.Expression, getattr(model_field, 'not_in')(val)) 

404 else: 

405 raise ValueError(f'NOT_IN operation not supported for field type {type(model_field)}') 

406 elif op == LogicalOp.BETWEEN: 

407 if not isinstance(val, (list, tuple)) or len(val) != 2: 

408 raise TypeError(f'BETWEEN operation requires list/tuple of 2 elements, got {val}') 

409 if hasattr(model_field, 'between') and callable(getattr(model_field, 'between')): 

410 return cast(peewee.Expression, getattr(model_field, 'between')(val[0], val[1])) 

411 else: 

412 raise ValueError(f'BETWEEN operation not supported for field type {type(model_field)}') 

413 elif op == LogicalOp.BIT_AND: 

414 if hasattr(model_field, 'bin_and') and callable(getattr(model_field, 'bin_and')): 

415 return cast(peewee.Expression, cast(object, getattr(model_field, 'bin_and')(val) != 0)) 

416 else: 

417 raise ValueError(f'BIT_AND operation not supported for field type {type(model_field)}') 

418 elif op == LogicalOp.BIT_OR: 

419 if hasattr(model_field, 'bin_or') and callable(getattr(model_field, 'bin_or')): 

420 return cast(peewee.Expression, cast(object, getattr(model_field, 'bin_or')(val) != 0)) 

421 else: 

422 raise ValueError(f'BIT_OR operation not supported for field type {type(model_field)}') 

423 elif op == LogicalOp.IS_NULL: 

424 return cast(peewee.Expression, model_field.is_null()) 

425 elif op == LogicalOp.IS_NOT_NULL: 

426 return cast(peewee.Expression, model_field.is_null(False)) 

427 else: 

428 raise ValueError(f'Unsupported operation: {op}') 

429 

430 

431class ConditionalNode(FilterNode): 

432 """ 

433 Wraps :class:`ConditionalFilterCondition` behaviour as a :class:`FilterNode`. 

434 

435 This class serves as an adapter to integrate conditional filter conditions 

436 into the filter node hierarchy, allowing them to be treated uniformly with 

437 other filter nodes during expression evaluation. 

438 

439 .. versionadded:: v2.0.0 

440 """ 

441 

442 def __init__(self, conditional: 'ConditionalFilterCondition', name: str | None = None): 

443 """ 

444 Initialize a conditional node. 

445 

446 :param conditional: The conditional filter condition to wrap 

447 :type conditional: ConditionalFilterCondition 

448 :param name: Optional name for this conditional node 

449 :type name: str | None, Optional 

450 """ 

451 self.conditional = conditional 

452 self.name = name 

453 

454 def to_expression(self, model: type[Model]) -> peewee.Expression: 

455 """ 

456 Convert this conditional node to a Peewee expression. 

457 

458 This method delegates the conversion to the wrapped conditional filter 

459 condition's :meth:`to_expression` method. 

460 

461 :param model: The model class to generate the expression for 

462 :type model: type[Model] 

463 :return: A Peewee expression representing this conditional node 

464 :rtype: peewee.Expression 

465 """ 

466 return self.conditional.to_expression(model) 

467 

468 

469class LogicalNode(FilterNode): 

470 """ 

471 Logical combination of child nodes. 

472 

473 This class represents logical operations (AND, OR, NOT) applied to filter nodes. 

474 It enables building complex filter expressions by combining simpler filter nodes 

475 with logical operators. 

476 

477 .. versionadded:: v2.0.0 

478 """ 

479 

480 def __init__(self, op: str, *children: FilterNode): 

481 """ 

482 Initialize a logical node. 

483 

484 :param op: The logical operation ('AND', 'OR', 'NOT') 

485 :type op: str 

486 :param children: Child filter nodes to combine with the logical operation 

487 :type children: FilterNode 

488 """ 

489 self.op = op # 'AND', 'OR', 'NOT' 

490 self.children = list(children) 

491 

492 def to_expression(self, model: type[Model]) -> peewee.Expression | bool: 

493 """ 

494 Convert this logical node to a Peewee expression. 

495 

496 This method evaluates the logical operation on the child nodes and returns 

497 the corresponding Peewee expression. 

498 

499 :param model: The model class to generate the expression for 

500 :type model: type[Model] 

501 :return: A Peewee expression representing this logical node 

502 :rtype: peewee.Expression | bool 

503 :raises ValueError: If an unknown logical operation is specified 

504 """ 

505 if self.op == 'NOT': 

506 assert len(self.children) == 1 

507 inner = self.children[0].to_expression(model) 

508 return cast(peewee.Expression, ~inner) 

509 elif self.op == 'AND': 

510 expressions = [c.to_expression(model) for c in self.children] 

511 return cast(peewee.Expression, reduce(operator.and_, expressions)) 

512 elif self.op == 'OR': 

513 expressions = [c.to_expression(model) for c in self.children] 

514 return cast(peewee.Expression, reduce(operator.or_, expressions)) 

515 else: 

516 raise ValueError(f'Unknown logical op: {self.op}') 

517 

518 

519class ConditionalFilterCondition: 

520 """ 

521 Represents a conditional filter where one field's criteria depends on another. 

522 

523 This allows expressing logic like: 

524 "IF field_a IN [x, y] THEN field_b IN [1, 2] ELSE no constraint on field_b" 

525 

526 Example usage: 

527 

528 .. code-block:: python 

529 

530 # Filter: sample_id in [1,2] if composite_image_id in [100,101] 

531 condition = ConditionalFilterCondition( 

532 condition_field='composite_image_id', 

533 condition_op='IN', 

534 condition_value=[100, 101], 

535 then_field='sample_id', 

536 then_op='IN', 

537 then_value=[1, 2], 

538 ) 

539 

540 # This generates: 

541 # WHERE (composite_image_id IN (100, 101) AND sample_id IN (1, 2)) 

542 # OR (composite_image_id NOT IN (100, 101)) 

543 """ 

544 

545 def __init__( 

546 self, 

547 condition_field: str, 

548 condition_op: str | LogicalOp, 

549 condition_value: Any, 

550 then_field: str, 

551 then_op: str | LogicalOp, 

552 then_value: Any, 

553 else_field: str | None = None, 

554 else_op: str | LogicalOp | None = None, 

555 else_value: Any | None = None, 

556 name: str | None = None, 

557 ) -> None: 

558 """ 

559 Initialise a conditional filter condition. 

560 

561 :param condition_field: The field to check for the condition 

562 :type condition_field: str 

563 :param condition_op: The operation for the condition (e.g., 'IN', '==') 

564 :type condition_op: str | LogicalOp 

565 :param condition_value: The value(s) for the condition 

566 :type condition_value: Any 

567 :param then_field: The field to filter when condition is true 

568 :type then_field: str 

569 :param then_op: The operation to apply when condition is true 

570 :type then_op: str | LogicalOp 

571 :param then_value: The value(s) for the then clause 

572 :type then_value: Any 

573 :param else_field: Optional field to filter when condition is false 

574 :type else_field: str | None 

575 :param else_op: Optional operation when condition is false 

576 :type else_op: str | LogicalOp | None 

577 :param else_value: Optional value(s) for the else clause 

578 :type else_value: Any | None 

579 :param name: The name of this condition. Avoid name clashing with model fields. Defaults to None 

580 :type name: str | None, Optional 

581 """ 

582 self.condition_field = condition_field 

583 self.condition_op = condition_op 

584 self.condition_value = condition_value 

585 self.then_field = then_field 

586 self.then_op = then_op 

587 self.then_value = then_value 

588 self.else_field = else_field 

589 self.else_op = else_op 

590 self.else_value = else_value 

591 self.name = name 

592 

593 def to_expression(self, model: type[Model]) -> peewee.Expression: 

594 """ 

595 Convert this conditional filter to a Peewee expression. 

596 

597 The resulting expression is: 

598 (condition AND then_constraint) OR (NOT condition AND else_constraint) 

599 

600 Which logically means: 

601 

602 - When condition is true, apply then_constraint 

603 - When condition is false, apply else_constraint (or no constraint) 

604 

605 :param model: The model class containing the fields 

606 :type model: type[Model] 

607 :return: A Peewee expression 

608 :rtype: peewee.Expression 

609 """ 

610 # Build the condition expression 

611 condition_expr = ConditionNode(self.condition_field, self.condition_op, self.condition_value).to_expression( 

612 model 

613 ) 

614 

615 # Build the then expression 

616 then_expr = ConditionNode(self.then_field, self.then_op, self.then_value).to_expression(model) 

617 

618 # Build the else expression 

619 if self.else_field is not None and self.else_op is not None: 

620 else_expr = ConditionNode(self.else_field, self.else_op, self.else_value).to_expression(model) 

621 else: 

622 # No constraint in else clause - always true 

623 # the nested cast is needed to make mypy happy. 

624 else_expr = cast(peewee.Expression, cast(object, True)) 

625 

626 # Combine: (condition AND then) OR (NOT condition AND else) 

627 return cast(peewee.Expression, (condition_expr & then_expr) | (~condition_expr & else_expr)) 

628 

629 def __eq__(self, other: Any) -> bool: 

630 if not isinstance(other, ConditionalFilterCondition): 

631 return False 

632 

633 return vars(self) == vars(other) 

634 

635 

636class ModelFilter: 

637 r""" 

638 Class to filter rows from a model. 

639 

640 The filter object can be used to generate a where clause to be applied to Model.select(). 

641 

642 The construction of a ModelFilter is normally done via a configuration file using the :meth:`from_conf` class method. 

643 The name of the filter is playing a key role in this. If it follows a dot structure like: 

644 

645 *ProcessorName.__filter__.ModelName* 

646 

647 then the corresponding table from the TOML configuration object will be used. 

648 

649 For each processor, there might be many Filters, up to one for each Model used to get the input list. If a 

650 processor is joining together three Models when performing the input select, there will be up to three Filters 

651 collaborating on making the selection. 

652 

653 The filter configuration can contain the following key, value pair: 

654 

655 - key / string pairs, where the key is the name of a field in the corresponding Model 

656 

657 - key / numeric pairs 

658 

659 - key / arrays 

660 

661 - key / dict pairs with 'op' and 'value' keys for explicit operation specification 

662 

663 All fields from the configuration file will be added to the instance namespace, thus accessible with the dot 

664 notation. Moreover, the field names and their filter value will be added to a private dictionary to simplify the 

665 generation of the filter SQL code. 

666 

667 The user can use the filter object to store selection criteria. He can construct queries using the filter 

668 contents in the same way as he could use processor parameters. 

669 

670 If he wants to automatically generate valid filtering expression, he can use the :meth:`filter` method. In order 

671 for this to work, the ModelFilter object be :meth:`bound <bind>` to a Model. Without this binding the ModelFilter will not 

672 be able to automatically generate expressions. 

673 

674 For each field in the filter, one condition will be generated according to the following scheme: 

675 

676 ================= ================= ================== 

677 Filter field type Logical operation Example 

678 ================= ================= ================== 

679 Numeric, boolean == Field == 3.14 

680 String GLOB Field GLOB '\*ree' 

681 List IN Field IN [1, 2, 3] 

682 Dict (explicit) op from dict Field BIT_AND 5 

683 ================= ================= ================== 

684 

685 All conditions will be joined with a AND logic by default, but this can be changed. 

686 

687 The ModelFilter also supports logical expressions to combine multiple filter conditions using AND, OR, and NOT 

688 operators. These expressions can reference named filter conditions within the same filter or even combine 

689 conditions from different filters when used with :class:`ProcessorFilter`. 

690 

691 Conditional filters allow expressing logic like: 

692 "IF field_a IN [x, y] THEN field_b IN [1, 2] ELSE no constraint on field_b" 

693 

694 Consider the following example: 

695 

696 .. code-block:: python 

697 :linenos: 

698 

699 class MeasModel(MAFwBaseModel): 

700 meas_id = AutoField(primary_key=True) 

701 sample_name = TextField() 

702 successful = BooleanField() 

703 flags = IntegerField() 

704 composite_image_id = IntegerField() 

705 sample_id = IntegerField() 

706 

707 

708 # Traditional simplified usage 

709 flt = ModelFilter( 

710 'MyProcessor.__filter__.MyModel', 

711 sample_name='sample_00*', 

712 meas_id=[1, 2, 3], 

713 successful=True, 

714 ) 

715 

716 # New explicit operation usage 

717 flt = ModelFilter( 

718 'MyProcessor.__filter__.MyModel', 

719 sample_name={'op': 'LIKE', 'value': 'sample_00%'}, 

720 flags={'op': 'BIT_AND', 'value': 5}, 

721 meas_id={'op': 'IN', 'value': [1, 2, 3]}, 

722 ) 

723 

724 # Logical expression usage 

725 flt = ModelFilter( 

726 'MyProcessor.__filter__.MyModel', 

727 sample_name={'op': 'LIKE', 'value': 'sample_00%'}, 

728 flags={'op': 'BIT_AND', 'value': 5}, 

729 meas_id={'op': 'IN', 'value': [1, 2, 3]}, 

730 __logic__='sample_name AND (flags OR meas_id)', 

731 ) 

732 

733 # Conditional filter usage 

734 flt = ModelFilter( 

735 'MyProcessor.__filter__.MyModel', 

736 sample_name='sample_00*', 

737 composite_image_id=[100, 101], 

738 sample_id=[1, 2], 

739 __conditional__=[ 

740 { 

741 'condition_field': 'composite_image_id', 

742 'condition_op': 'IN', 

743 'condition_value': [100, 101], 

744 'then_field': 'sample_id', 

745 'then_op': 'IN', 

746 'then_value': [1, 2], 

747 } 

748 ], 

749 ) 

750 

751 flt.bind(MeasModel) 

752 filtered_query = MeasModel.select().where(flt.filter()) 

753 

754 The explicit operation format allows for bitwise operations and other advanced filtering. 

755 

756 TOML Configuration Examples: 

757 

758 .. code-block:: toml 

759 

760 [MyProcessor.__filter__.MyModel] 

761 sample_name = "sample_00*" # Traditional GLOB 

762 successful = true # Traditional equality 

763 

764 # Explicit operations 

765 flags = { op = "BIT_AND", value = 5 } 

766 score = { op = ">=", value = 75.0 } 

767 category = { op = "IN", value = ["A", "B", "C"] } 

768 date_range = { op = "BETWEEN", value = ["2024-01-01", "2024-12-31"] } 

769 

770 # Logical expression for combining conditions 

771 __logic__ = "sample_name AND (successful OR flags)" 

772 

773 # Conditional filters 

774 [[MyProcessor.__filter__.MyModel.__conditional__]] 

775 condition_field = "composite_image_id" 

776 condition_op = "IN" 

777 condition_value = [100, 101] 

778 then_field = "sample_id" 

779 then_op = "IN" 

780 then_value = [1, 2] 

781 

782 # Nested conditions with logical expressions 

783 [MyProcessor.__filter__.MyModel.nested_conditions] 

784 __logic__ = "a OR b" 

785 a = { op = "LIKE", value = "test%" } 

786 b = { op = "IN", value = [1, 2, 3] } 

787 

788 .. seealso:: 

789 

790 - :class:`mafw.db.db_filter.ProcessorFilter` - For combining multiple ModelFilters with logical expressions 

791 - :class:`mafw.db.db_filter.ConditionalFilterCondition` - For conditional filtering logic 

792 - :class:`mafw.db.db_filter.ExprParser` - For parsing logical expressions 

793 """ 

794 

795 logic_name = '__logic__' 

796 """ 

797 The logic keyword identifier. 

798  

799 This value cannot be used as field name in the filter bound model. 

800 """ 

801 conditional_name = '__conditional__' 

802 """ 

803 The conditional keyword identifier. 

804  

805 This value cannot be used as field name in the filter bound model. 

806 """ 

807 

808 def __init__(self, name_: str, **kwargs: Any) -> None: 

809 """ 

810 Constructor parameters: 

811 

812 :param `name_`: The name of the filter. It should be in dotted format to facilitate the configuration via the 

813 steering file. The _ is used to allow the user to have a keyword argument named name. 

814 :type `name_`: str 

815 :param kwargs: Keyword parameters corresponding to fields and filter values. 

816 

817 .. versionchanged:: v1.2.0 

818 The parameter *name* has been renamed as *name_*. 

819 

820 .. versionchanged:: v1.3.0 

821 Implementation of explicit operation. 

822 

823 .. versionchanged:: v2.0.0 

824 Introduction of conditional filters, logical expression and hierarchical structure. 

825 Introduction of autobinding for MAFwBaseModels 

826 

827 """ 

828 self.name = name_ 

829 self.model_name = name_.split('.')[-1] 

830 self.model: type[Model] | None = None 

831 self._model_bound = False 

832 

833 # attempt to autobind 

834 self._auto_bind() 

835 

836 # mapping name -> FilterNode 

837 self._nodes: 'OrderedDict[str, FilterNode]' = OrderedDict() 

838 # conditional nodes mapping (named) 

839 self._cond_nodes: 'OrderedDict[str, ConditionalNode]' = OrderedDict() 

840 # logic expression for this filter (combining top-level node names) 

841 self._logic_expr: str | None = None 

842 

843 # Extract conditional filters if present 

844 if self.conditional_name in kwargs: 

845 conditionals = kwargs.pop(self.conditional_name) 

846 if not isinstance(conditionals, list): 

847 conditionals = [conditionals] 

848 

849 for cond_dict in conditionals: 

850 self.add_conditional_from_dict(cond_dict) 

851 

852 # Extract logic for internal conditions, if provided 

853 if self.logic_name in kwargs: 

854 self._logic_expr = kwargs.pop(self.logic_name) 

855 

856 # now process remaining kwargs as either: 

857 # - simple/extended condition for a field 

858 # - or a nested mapping describing subconditions for field (field-level logic) 

859 for k, v in kwargs.items(): 

860 # simple types map to ConditionNode 

861 if isinstance(v, dict) and ('op' in v and 'value' in v): 

862 # explicit op/value for field k 

863 # extended operation condition 

864 node = ConditionNode(k, v['op'], v['value'], name=k) 

865 self._nodes[k] = node 

866 elif isinstance(v, dict) and any( 

867 isinstance(x, dict) or x == self.logic_name or x not in ['op', 'value'] 

868 for x in v.keys() 

869 if isinstance(v, dict) 

870 ): 

871 # nested mapping: create sub-nodes for this field 

872 # v expected like {'__logic__': 'a OR b', 'a': {'op':..., 'value':...}, 'b': ...} 

873 subnodes: 'OrderedDict[str, FilterNode]' = OrderedDict() 

874 sub_logic = v.get(self.logic_name, None) 

875 for subk, subv in v.items(): 

876 if subk == self.logic_name: 

877 continue 

878 if isinstance(subv, dict) and ('op' in subv and 'value' in subv): 

879 subnode = ConditionNode(k, subv['op'], subv['value'], name=subk) 

880 subnodes[subk] = subnode 

881 else: 

882 subnodes[subk] = self._create_condition_node_from_value(subv, k, subk) 

883 # combine subnodes using sub_logic or AND by default 

884 if sub_logic: 

885 ast = ExprParser(sub_logic).parse() 

886 ln = self._build_logical_node_from_ast(ast, subnodes, model_name_placeholder=k) 

887 else: 

888 # AND all subnodes 

889 ln = LogicalNode('AND', *subnodes.values()) 

890 self._nodes[k] = ln 

891 else: 

892 self._nodes[k] = self._create_condition_node_from_value(v, k, k) 

893 

894 def _auto_bind(self) -> None: 

895 try: 

896 model = mafw_model_register.get_model(self.model_name) 

897 self.bind(model) # type: ignore[arg-type] 

898 except KeyError: 

899 log.warning(f'Impossible to perform auto-binding for model {self.model_name}') 

900 

901 def _build_logical_node_from_ast( 

902 self, ast: ExprNode, name_to_nodes: Dict[str, FilterNode], model_name_placeholder: str | None = None 

903 ) -> FilterNode: 

904 """Recursively build LogicalNode from AST using a mapping name->FilterNode.""" 

905 t = ast[0] 

906 if t == 'NAME': 

907 named_ast = cast(NameNode, ast) 

908 nm = named_ast[1] 

909 if nm not in name_to_nodes: 

910 raise KeyError(f'Unknown name {nm} in nested logic for field {model_name_placeholder}') 

911 return name_to_nodes[nm] 

912 elif t == 'NOT': 

913 not_ast = cast(NotNode, ast) 

914 child = self._build_logical_node_from_ast(not_ast[1], name_to_nodes, model_name_placeholder) 

915 return LogicalNode('NOT', child) 

916 elif t in ('AND', 'OR'): 

917 bin_ast = cast(BinaryNode, ast) 

918 left = self._build_logical_node_from_ast(bin_ast[1], name_to_nodes, model_name_placeholder) 

919 right = self._build_logical_node_from_ast(bin_ast[2], name_to_nodes, model_name_placeholder) 

920 return LogicalNode(t, left, right) 

921 else: 

922 raise ValueError(f'Unsupported AST node {t}') 

923 

924 @staticmethod 

925 def _create_condition_node_from_value(value: Any, field_name: str, node_name: str | None = None) -> ConditionNode: 

926 """ 

927 Create a FilterCondition based on value type (backward compatibility). 

928 

929 :param value: The filter value 

930 :param field_name: The field name 

931 :return: A FilterCondition 

932 """ 

933 if isinstance(value, (int, float, bool)): 

934 return ConditionNode(field_name, LogicalOp.EQ, value, node_name) 

935 elif isinstance(value, str): 

936 return ConditionNode(field_name, LogicalOp.GLOB, value, node_name) 

937 elif isinstance(value, list): 

938 return ConditionNode(field_name, LogicalOp.IN, value, node_name) 

939 else: 

940 raise TypeError(f'ModelFilter value of unsupported type {type(value)} for field {field_name}.') 

941 

942 def bind(self, model: type[Model]) -> None: 

943 """ 

944 Connects a filter to a Model class. 

945 

946 :param model: Model to be bound. 

947 :type model: Model 

948 """ 

949 

950 self.model = model 

951 self._model_bound = True 

952 

953 if hasattr(self.model, self.logic_name) and self._model_bound: 

954 if TYPE_CHECKING: 

955 assert self.model is not None 

956 

957 log.warning( 

958 f'Model {self.model.__name__} has a field named {self.logic_name}. This is ' 

959 f'preventing the logic expression to work.' 

960 ) 

961 log.warning('Modify your model. Logic expression disabled.') 

962 self._logic_expr = None 

963 

964 @property 

965 def is_bound(self) -> bool: 

966 """Returns true if the ModelFilter has been bound to a Model""" 

967 return self._model_bound 

968 

969 def add_conditional(self, conditional: ConditionalFilterCondition) -> None: 

970 """ 

971 Add a conditional filter. 

972 

973 .. versionadded:: v2.0.0 

974 

975 :param conditional: The conditional filter condition 

976 :type conditional: ConditionalFilterCondition 

977 """ 

978 condition_name = conditional.name 

979 if condition_name is None: 

980 # it means that the user did not specify any name for this condition. 

981 # we will then assign one 

982 increment = 0 

983 while True: 

984 condition_name = f'__cond{increment + len(self._cond_nodes)}__' 

985 if condition_name not in self._cond_nodes: 

986 break 

987 else: 

988 increment += 1 

989 else: 

990 # the user specified a name for this condition. we will use it but first we check if it is not yet used 

991 if condition_name in self._cond_nodes: 

992 raise KeyError( 

993 f'A conditional filter named {condition_name} already exists. Please review your steering file.' 

994 ) 

995 

996 node = ConditionalNode(conditional, name=condition_name) 

997 self._cond_nodes[condition_name] = node 

998 self._nodes[condition_name] = node 

999 

1000 def add_conditional_from_dict(self, config: dict[str, Any]) -> None: 

1001 """ 

1002 Add a conditional filter from a configuration dictionary. 

1003 

1004 .. versionadded:: v2.0.0 

1005 

1006 :param config: Dictionary with conditional filter configuration 

1007 :type config: dict[str, Any] 

1008 """ 

1009 conditional = ConditionalFilterCondition( 

1010 condition_field=config['condition_field'], 

1011 condition_op=config['condition_op'], 

1012 condition_value=config['condition_value'], 

1013 then_field=config['then_field'], 

1014 then_op=config['then_op'], 

1015 then_value=config['then_value'], 

1016 else_field=config.get('else_field'), 

1017 else_op=config.get('else_op'), 

1018 else_value=config.get('else_value'), 

1019 name=config.get('name'), 

1020 ) 

1021 self.add_conditional(conditional) 

1022 

1023 @classmethod 

1024 def from_conf(cls, name: str, conf: dict[str, Any]) -> Self: 

1025 """ 

1026 Builds a Filter object from a steering file dictionary. 

1027 

1028 If the name is in dotted notation, then this should be corresponding to the table in the configuration file. 

1029 If a default configuration is provided, this will be used as a starting point for the filter, and it will be 

1030 updated by the actual configuration in ``conf``. 

1031 

1032 In normal use, you would provide the specific configuration via the conf parameter. 

1033 

1034 See details in the :class:`class documentation <ModelFilter>` 

1035 

1036 :param name: The name of the filter in dotted notation. 

1037 :type name: str 

1038 :param conf: The configuration dictionary. 

1039 :type conf: dict 

1040 :return: A Filter object 

1041 :rtype: ModelFilter 

1042 """ 

1043 param = {} 

1044 

1045 # split the name from dotted notation 

1046 # ProcessorName#123.ModelName.Filter 

1047 # the processor name is actually the processor replica name 

1048 names = name.split('.') 

1049 if len(names) == 3 and names[1] == '__filter__': 

1050 proc_name, _, model_name = names 

1051 if proc_name in conf and '__filter__' in conf[proc_name] and model_name in conf[proc_name]['__filter__']: 

1052 param.update(copy(conf[proc_name]['__filter__'][model_name])) 

1053 

1054 # if the name is not in the expected dotted notation, the use an empty filter. 

1055 return cls(name, **param) 

1056 

1057 def _evaluate_logic_ast(self, ast: ExprNode) -> peewee.Expression | bool: 

1058 """ 

1059 Evaluate an abstract syntax tree (AST) representing a logical expression. 

1060 

1061 This method recursively evaluates the AST nodes to produce a Peewee expression 

1062 or boolean value representing the logical combination of filter conditions. 

1063 

1064 :param ast: The abstract syntax tree node to evaluate 

1065 :type ast: Any 

1066 :return: A Peewee expression for logical operations or boolean True/False 

1067 :rtype: peewee.Expression | bool 

1068 :raises KeyError: If a referenced condition name is not found in the filter 

1069 :raises ValueError: If an unsupported AST node type is encountered 

1070 """ 

1071 t = ast[0] 

1072 if t == 'NAME': 

1073 named_ast = cast(NameNode, ast) 

1074 nm = named_ast[1] 

1075 if nm not in self._nodes: 

1076 raise KeyError(f"Unknown node '{nm}' in logic for filter {self.name}") 

1077 node = self._nodes[nm] 

1078 

1079 if TYPE_CHECKING: 

1080 assert self.model is not None 

1081 return node.to_expression(self.model) 

1082 elif t == 'NOT': 

1083 not_ast = cast(NotNode, ast) 

1084 val = self._evaluate_logic_ast(not_ast[1]) 

1085 return cast(peewee.Expression, ~val) 

1086 elif t == 'AND': 

1087 bin_ast = cast(BinaryNode, ast) 

1088 left = self._evaluate_logic_ast(bin_ast[1]) 

1089 right = self._evaluate_logic_ast(bin_ast[2]) 

1090 return cast(peewee.Expression, cast(object, left & right)) 

1091 elif t == 'OR': 

1092 bin_ast = cast(BinaryNode, ast) 

1093 left = self._evaluate_logic_ast(bin_ast[1]) 

1094 right = self._evaluate_logic_ast(bin_ast[2]) 

1095 return cast(peewee.Expression, cast(object, left | right)) 

1096 else: 

1097 raise ValueError(f'Unsupported AST node {t}') 

1098 

1099 def filter(self, join_with: Literal['AND', 'OR'] = 'AND') -> peewee.Expression | bool: 

1100 """ 

1101 Generates a filtering expression joining all filtering fields. 

1102 

1103 See details in the :class:`class documentation <ModelFilter>` 

1104 

1105 .. versionchanged:: v1.3.0 

1106 Add the possibility to specify a `join_with` function 

1107 

1108 .. versionchanged:: v2.0.0 

1109 Add support for conditional filters and for logical expression 

1110 

1111 :param join_with: How to join conditions ('AND' or 'OR'). Defaults to 'AND'. 

1112 :type join_with: Literal['AND', 'OR'], default 'AND' 

1113 :return: The filtering expression. 

1114 :rtype: peewee.Expression | bool 

1115 :raises TypeError: when the field value type is not supported. 

1116 :raises ValueError: when join_with is not 'AND' or 'OR'. 

1117 """ 

1118 if not self.is_bound: 

1119 log.warning('Unable to generate the filter. Did you bind the filter to the model?') 

1120 return True 

1121 

1122 if TYPE_CHECKING: 

1123 # if we get here, it means that we have a valid model 

1124 assert self.model is not None 

1125 

1126 # if logic provided for this filter, use it 

1127 if self._logic_expr: 

1128 try: 

1129 ast = ExprParser(self._logic_expr).parse() 

1130 except ParseError as e: 

1131 raise ValueError(f'Error parsing logic for filter {self.name}: {e}') 

1132 try: 

1133 return self._evaluate_logic_ast(ast) 

1134 except KeyError as e: 

1135 raise ValueError(f'Error evaluating logic for filter {self.name}: {e}') 

1136 

1137 # otherwise combine all top-level nodes (AND by default) 

1138 exprs = [n.to_expression(self.model) for n in self._nodes.values()] 

1139 if not exprs: 

1140 return True 

1141 if join_with not in ('AND', 'OR'): 

1142 raise ValueError("join_with must be 'AND' or 'OR'") 

1143 if join_with == 'AND': 

1144 return cast(peewee.Expression, reduce(operator.and_, exprs)) 

1145 return cast(peewee.Expression, reduce(operator.or_, exprs)) 

1146 

1147 

1148class ProcessorFilter(UserDict[str, ModelFilter]): 

1149 """ 

1150 A special dictionary to store all :class:`Filters <mafw.db.db_filter.ModelFilter>` in a processors. 

1151 

1152 It contains a publicly accessible dictionary with the configuration of each ModelFilter using the Model name as 

1153 keyword. 

1154 

1155 It contains a private dictionary with the global filter configuration as well. 

1156 The global filter is not directly accessible, but only some of its members will be exposed via properties. 

1157 In particular, the new_only flag that is relevant only at the Processor level can be accessed directly using the 

1158 :attr:`new_only`. If not specified in the configuration file, the new_only is by default True. 

1159 

1160 It is possible to assign a logic operation string to the register that is used to join all the filters together 

1161 when performing the :meth:`filter_all`. If no logic operation string is provided, the register will provide a join 

1162 condition using either AND (default) or OR. 

1163 """ 

1164 

1165 def __init__(self, data: dict[str, ModelFilter] | None = None, /, **kwargs: Any) -> None: 

1166 """ 

1167 Constructor parameters: 

1168 

1169 :param data: Initial data 

1170 :type data: dict 

1171 :param kwargs: Keywords arguments 

1172 """ 

1173 self._global_filter: dict[str, Any] = {} 

1174 self._logic: str | None = None 

1175 super().__init__(data, **kwargs) 

1176 

1177 @property 

1178 def new_only(self) -> bool: 

1179 """ 

1180 The new only flag. 

1181 

1182 :return: True, if only new items, not already in the output database table must be processed. 

1183 :rtype: bool 

1184 """ 

1185 return cast(bool, self._global_filter.get('new_only', True)) 

1186 

1187 @new_only.setter 

1188 def new_only(self, v: bool) -> None: 

1189 self._global_filter['new_only'] = v 

1190 

1191 def __setitem__(self, key: str, value: ModelFilter) -> None: 

1192 """ 

1193 Set a new value at key. 

1194 

1195 If value is not a Filter, then it will be automatically and silently discarded. 

1196 

1197 :param key: Dictionary key. Normally the name of the model linked to the filter. 

1198 :type key: str 

1199 :param value: The Filter. 

1200 :type value: ModelFilter 

1201 """ 

1202 if not isinstance(value, ModelFilter): 

1203 return 

1204 super().__setitem__(key, value) 

1205 

1206 def bind_all(self, models: list[type[Model]] | dict[str, type[Model]]) -> None: 

1207 """ 

1208 Binds all filters to their models. 

1209 

1210 The ``models`` list or dictionary should contain a valid model for all the ModelFilters in the registry. 

1211 In the case of a dictionary, the key value should be the model name. 

1212 

1213 :param models: List or dictionary of a databank of Models from which the ModelFilter can be bound. 

1214 :type models: list[type(Model)] | dict[str,type(Model)] 

1215 """ 

1216 if isinstance(models, list): 

1217 models = {m.__name__: m for m in models} 

1218 

1219 # check, if we have a filter for each listed models, if not create one using the default configuration. 

1220 for model_name in models.keys(): 

1221 if model_name not in self.data: 

1222 self.data[model_name] = ModelFilter.from_conf(f'{model_name}', conf={}) 

1223 

1224 for k, v in self.data.items(): 

1225 if k in self.data and k in models and not v.is_bound: 1225 ↛ 1224line 1225 didn't jump to line 1224 because the condition on line 1225 was always true

1226 v.bind(models[k]) 

1227 

1228 def filter_all(self, join_with: Literal['AND', 'OR'] = 'AND') -> peewee.Expression | bool: 

1229 """ 

1230 Generates a where clause joining all filters. 

1231 

1232 If a logic expression is present, it will be used to combine named filters. 

1233 Otherwise, fall back to the legacy behaviour using join_with. 

1234 

1235 :raise ValueError: If the parsing of the logical expression fails 

1236 :param join_with: Logical function to join the filters if no logic expression is provided. 

1237 :type join_with: Literal['AND', 'OR'], default: 'AND' 

1238 :return: ModelFilter expression 

1239 :rtype: peewee.Expression 

1240 """ 

1241 # If a logic expression is present at the global level, use it to combine filters 

1242 if self._logic: 

1243 try: 

1244 ast = ExprParser(self._logic).parse() 

1245 except ParseError as e: 

1246 raise ValueError(f'Error parsing global logic for ProcessorFilter: {e}') 

1247 

1248 def eval_ast(node: ExprNode) -> peewee.Expression | bool: 

1249 t = node[0] 

1250 if t == 'NAME': 

1251 named_node = cast(NameNode, node) 

1252 nm = named_node[1] 

1253 if nm not in self.data: 

1254 raise KeyError(f"Unknown filter name '{nm}' in processor logic") 

1255 flt = self.data[nm] 

1256 if not flt.is_bound: 

1257 log.warning(f"ModelFilter '{nm}' is not bound; using True for its expression") 

1258 return True 

1259 return flt.filter() 

1260 elif t == 'NOT': 

1261 not_node = cast(NotNode, node) 

1262 return cast(peewee.Expression, ~eval_ast(not_node[1])) 

1263 elif t == 'AND': 

1264 bin_node = cast(BinaryNode, node) 

1265 return cast(peewee.Expression, cast(object, eval_ast(bin_node[1]) & eval_ast(bin_node[2]))) 

1266 elif t == 'OR': 

1267 bin_node = cast(BinaryNode, node) 

1268 return cast(peewee.Expression, cast(object, eval_ast(bin_node[1]) | eval_ast(bin_node[2]))) 

1269 else: 

1270 raise ValueError(f'Unsupported AST node {t}') 

1271 

1272 try: 

1273 return eval_ast(ast) 

1274 except KeyError as e: 

1275 raise ValueError(f'Error evaluating processor logic: {e}') 

1276 

1277 # Legacy behaviour: combine all filters with join_with (AND/OR) 

1278 filter_list = [flt.filter() for flt in self.data.values() if flt.is_bound] 

1279 if join_with == 'AND': 

1280 return cast(peewee.Expression, cast(object, reduce(operator.and_, filter_list, True))) 

1281 else: 

1282 return cast(peewee.Expression, cast(object, reduce(operator.or_, filter_list, True)))