Coverage for src / mafw / db / db_filter.py: 99%

507 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 16:10 +0000

1# Copyright 2025–2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Database filter module for MAFW. 

6 

7This module provides classes and utilities for creating and managing database filters 

8using Peewee ORM. It supports various filtering operations including simple conditions, 

9logical combinations, and conditional filters where one field's criteria depend on another. 

10 

11The module implements a flexible filter system that can handle: 

12 - Simple field comparisons (equality, inequality, greater/less than, etc.) 

13 - Complex logical operations (AND, OR, NOT) 

14 - Conditional filters with dependent criteria 

15 - Nested logical expressions 

16 - Support for various data types and operations 

17 

18Key components include: 

19 - :class:`FilterNode`: Abstract base class for filter nodes 

20 - :class:`ConditionNode`: Represents individual field conditions 

21 - :class:`LogicalNode`: Combines filter nodes with logical operators 

22 - :class:`ConditionalNode`: Wraps conditional filter conditions 

23 - :class:`ModelFilter`: Main class for building and applying filters to models 

24 - :class:`ProcessorFilter`: Container for multiple model filters in a processor 

25 

26The module uses a hierarchical approach to build filter expressions that can be converted 

27into Peewee expressions for database queries. It supports both simple and complex filtering 

28scenarios through a combination of direct field conditions and logical expressions. 

29 

30.. versionchanged:: v2.0.0 

31 Major overhaul introducing conditional filters and logical expression support. 

32 

33Example usage:: 

34 

35 from mafw.db.db_filter import ModelFilter 

36 

37 # Create a simple filter 

38 flt = ModelFilter( 

39 'Processor.__filter__.Model', 

40 field1='value1', 

41 field2={'op': 'IN', 'value': [1, 2, 3]}, 

42 ) 

43 

44 # Bind to a model and generate query 

45 flt.bind(MyModel) 

46 query = MyModel.select().where(flt.filter()) 

47 

48.. seealso:: 

49 

50 :link:`peewee` - The underlying ORM library used for database operations 

51 

52 :class:`~.mafw.enumerators.LogicalOp` - Logical operation enumerations used in filters 

53""" 

54 

55import logging 

56import operator 

57import re 

58from collections import OrderedDict, UserDict 

59from copy import copy 

60from functools import reduce 

61from typing import TYPE_CHECKING, Any, Dict, Iterable, Literal, Self, Sequence, TypeAlias, Union, cast 

62 

63import peewee 

64from peewee import Model 

65 

66from mafw.db.db_model import mafw_model_register 

67from mafw.enumerators import LogicalOp 

68 

69log = logging.getLogger(__name__) 

70 

71 

# A token is a ``(kind, value)`` pair: ``kind`` is the name of the regex group
# that matched (see TOKEN_SPECIFICATION) and ``value`` is the matched text.
Token = tuple[str, str]
"""Type definition for a logical expression token"""

74 

75 

76def _format_expected(expected: str | Sequence[str]) -> str: 

77 """Format a description of expected tokens for diagnostics.""" 

78 if isinstance(expected, str): 78 ↛ 80line 78 didn't jump to line 80 because the condition on line 78 was always true

79 return expected 

80 return ' or '.join(expected) 

81 

82 

# 1. An atom is a tuple of the literal string 'NAME' and the value
NameNode = tuple[Literal['NAME'], str]
"""An atom is a tuple of the literal string 'NAME' and the value"""

# 2. A NOT node is a tuple of 'NOT' and a recursive node
# We use a string forward reference 'ExprNode' because it is defined below
NotNode = tuple[Literal['NOT'], 'ExprNode']
"""A NOT node is a tuple of 'NOT' and a recursive node"""

# 3. AND/OR nodes are tuples of the operator and two recursive nodes
BinaryNode = tuple[Literal['AND', 'OR'], 'ExprNode', 'ExprNode']
"""AND/OR nodes are tuples of the operator and two recursive nodes"""

# 4. The main recursive type combining all options
# The first element of every node is its tag ('NAME', 'NOT', 'AND' or 'OR');
# consumers dispatch on that tag.
ExprNode: TypeAlias = Union[NameNode, NotNode, BinaryNode]
"""
The main recursive type combining all options

This type represents the abstract syntax tree (AST) nodes used in logical expressions.
It can be one of:

  - :data:`NameNode`: A named element (field name or filter name)
  - :data:`NotNode`: A negation operation
  - :data:`BinaryNode`: An AND/OR operation between two nodes
"""

108 

# Order matters: the alternatives are combined into one regex below and are
# tried from left to right. The keyword patterns (AND, OR, NOT) must therefore
# precede the generic NAME pattern, which would otherwise match them, and the
# catch-all MISMATCH pattern must stay last.
# NAME accepts an identifier that may contain dots, optionally followed by a
# single ':identifier' suffix.
TOKEN_SPECIFICATION = [
    ('LPAREN', r'\('),
    ('RPAREN', r'\)'),
    ('AND', r'\bAND\b'),
    ('OR', r'\bOR\b'),
    ('NOT', r'\bNOT\b'),
    ('NAME', r'[A-Za-z_][A-Za-z0-9_\.]*(?:\:[A-Za-z_][A-Za-z0-9_]*)?'),
    ('SKIP', r'[ \t\n\r]+'),
    ('MISMATCH', r'.'),
]
"""Token specifications"""

# Each specification becomes a named group, so ``match.lastgroup`` gives the
# token kind directly.
MASTER_RE = re.compile('|'.join(f'(?P<{name}>{pattern})' for name, pattern in TOKEN_SPECIFICATION))
"""Compiled regular expression to interpret the logical expression grammar"""

123 

124 

125class ParseError(ValueError): 

126 """Base exception for logical expression parsing failures.""" 

127 

128 def __init__(self, message: str, *, position: int | None = None) -> None: 

129 super().__init__(message) 

130 self.position = position 

131 

132 

class UnexpectedTokenError(ParseError):
    """
    Raised when a token is present but not valid in the current context.

    The offending token and the description of what was expected are kept in
    the ``token`` and ``expected`` attributes.
    """

    def __init__(
        self,
        token: Token,
        *,
        expected: str | Sequence[str] | None = None,
        position: int | None = None,
    ) -> None:
        """
        Build the error.

        :param token: The ``(kind, value)`` token that was found
        :type token: :data:`Token`
        :param expected: What the parser was expecting instead, defaults to None
        :type expected: str | Sequence[str] | None, Optional
        :param position: Character offset of the token, defaults to None
        :type position: int | None, Optional
        """
        # The "expected" suffix is only appended when something was provided.
        suffix = ''
        if expected:
            suffix = f'; expected {_format_expected(expected)}'
        super().__init__(
            f'Unexpected token {token[0]} ({token[1]}) at position {position}{suffix}',
            position=position,
        )
        self.token = token
        self.expected = expected

148 

149 

class UnexpectedEndOfExpressionError(ParseError):
    """
    Raised when the expression ends before the parser could finish.

    The description of what was expected is kept in the ``expected`` attribute.
    """

    def __init__(self, *, expected: str | Sequence[str] | None = None, position: int | None = None) -> None:
        """
        Build the error.

        :param expected: What the parser was expecting, defaults to None
        :type expected: str | Sequence[str] | None, Optional
        :param position: Character offset where the input ended, defaults to None
        :type position: int | None, Optional
        """
        # The "expected" suffix is only appended when something was provided.
        suffix = ''
        if expected:
            suffix = f'; expected {_format_expected(expected)}'
        super().__init__(f'Unexpected end of expression at position {position}{suffix}', position=position)
        self.expected = expected

158 

159 

class MissingTokenError(ParseError):
    """
    Raised when a specific token was required but missing.

    The description of the required token is kept in the ``expected`` attribute.
    """

    def __init__(self, expected: str | Sequence[str], *, position: int | None = None) -> None:
        """
        Build the error.

        :param expected: The token, or alternatives, that were required
        :type expected: str | Sequence[str]
        :param position: Character offset where the token was required, defaults to None
        :type position: int | None, Optional
        """
        description = _format_expected(expected)
        super().__init__(
            f'Expected {description} before end of expression at position {position}',
            position=position,
        )
        self.expected = expected

167 

168 

class UnknownNameError(ParseError):
    """
    Raised when a NAME token is not in the supplied whitelist.

    The rejected name is kept in ``name`` and the sorted whitelist in ``valid_names``.
    """

    def __init__(
        self,
        name: str,
        *,
        valid_names: Iterable[str],
        position: int | None = None,
    ) -> None:
        """
        Build the error.

        :param name: The identifier that was rejected
        :type name: str
        :param valid_names: The identifiers that would have been accepted
        :type valid_names: Iterable[str]
        :param position: Character offset of the rejected name, defaults to None
        :type position: int | None, Optional
        """
        ordered = sorted(valid_names)
        # Only the first five names are listed to keep the message short;
        # an ellipsis signals that the list was truncated.
        shown = ordered[:5]
        if len(ordered) > 5:
            shown.append('...')
        allowed = ', '.join(shown)
        super().__init__(
            f'Unknown name {name!r} at position {position}; valid names: {allowed}',
            position=position,
        )
        self.name = name
        self.valid_names = tuple(ordered)

187 

188 

def _tokenize_with_positions(text: str) -> tuple[list[Token], list[int]]:
    """
    Tokenize text while capturing the start offset of each token.

    :param text: The logical expression string to tokenize
    :type text: str
    :return: The tokens and, index by index, the character offset where each one starts
    :rtype: tuple[list[:data:`Token`], list[int]]
    :raises ParseError: If a character matches none of the token patterns
    """
    found: list[Token] = []
    offsets: list[int] = []
    for match in MASTER_RE.finditer(text):
        group_name = match.lastgroup
        lexeme = match.group()
        if group_name == 'SKIP':
            # Whitespace separates tokens but is never emitted.
            continue
        if group_name == 'MISMATCH':
            raise ParseError(f'Unexpected character {lexeme!r}', position=match.start())
        # Every alternative of MASTER_RE is a named group, so a match always has one.
        assert group_name is not None
        found.append((group_name, lexeme))
        offsets.append(match.start())
    return found, offsets

205 

206 

def tokenize(text: str) -> list[Token]:
    """
    Split a logical expression string into its tokens.

    Whitespace is skipped. Any character that matches none of the token
    specifications causes a :exc:`ParseError`.

    :param text: The logical expression string to tokenize
    :type text: str
    :return: A list of tokens represented as (token_type, token_value) tuples
    :rtype: list[:data:`Token`]
    :raises ParseError: If an unexpected character is encountered in the text
    """
    # The offsets computed alongside the tokens are not needed here.
    return _tokenize_with_positions(text)[0]

223 

224 

class ExprParser:
    """
    Recursive descent parser producing a simple Abstract Syntax Tree (AST).

    The parser handles logical expressions with the following grammar:

    .. code-block:: none

        expr    := or_expr
        or_expr := and_expr ("OR" and_expr)*
        and_expr:= not_expr ("AND" not_expr)*
        not_expr:= "NOT" not_expr | atom
        atom    := NAME | "(" expr ")"

    AST nodes are tuples representing different constructs:

        - ("NAME", "token"): A named element (field name or filter name)
        - ("NOT", node): A negation operation
        - ("AND", left, right): An AND operation between two nodes
        - ("OR", left, right): An OR operation between two nodes

    .. versionadded:: v2.0.0

    To help users diagnose grammar and semantic problems, the parser now
    reports detailed error classes with character offsets and accepts
    an optional ``valid_names`` iterable to reject unknown identifiers early.
    """

    def __init__(self, text: str, *, valid_names: Iterable[str] | None = None) -> None:
        """
        Initialize the expression parser with a logical expression string.

        The text is tokenized immediately, so an invalid character is reported
        at construction time.

        :param text: The logical expression to parse
        :type text: str
        :param valid_names: Optional whitelist of valid NAME tokens
        :type valid_names: Iterable[str] | None
        """
        token_list, offsets = _tokenize_with_positions(text)
        self._text = text
        self.tokens = token_list
        self.token_positions = offsets
        # Index of the next token to be consumed.
        self.pos = 0
        # A frozenset gives constant-time membership tests; None disables the check.
        self._valid_names = None if valid_names is None else frozenset(valid_names)

    def peek(self) -> Token | None:
        """
        Look at the next token without consuming it.

        :return: The next token if available, otherwise None
        :rtype: :data:`Token` | None
        """
        return self.tokens[self.pos] if self.pos < len(self.tokens) else None

    def accept(self, *kinds: str) -> Token | None:
        """
        Consume the next token when its type is one of the given ones.

        :param kinds: Token types to accept
        :type kinds: str
        :return: The consumed token if matched, otherwise None
        :rtype: :data:`Token` | None
        """
        candidate = self.peek()
        if not candidate or candidate[0] not in kinds:
            return None
        self.pos += 1
        return candidate

    def _current_position(self) -> int:
        """Return the character offset of the next token or the end of input."""
        exhausted = self.pos >= len(self.token_positions)
        return len(self._text) if exhausted else self.token_positions[self.pos]

    def expect(self, kind: str) -> 'Token':
        """
        Consume a token of a specific type or fail.

        :param kind: The expected token type
        :type kind: str
        :return: The consumed token
        :rtype: :data:`Token`
        :raises ParseError: If the expected token is not found
        """
        matched = self.accept(kind)
        if matched:
            return matched
        position = self._current_position()
        current = self.peek()
        if current:
            # Something is there, but it is not what the grammar requires.
            raise UnexpectedTokenError(current, expected=kind, position=position)
        # The input ended before the required token showed up.
        raise MissingTokenError(kind, position=position)

    def parse(self) -> 'ExprNode':
        """
        Parse the entire logical expression and return the resulting AST.

        :return: The abstract syntax tree representation of the expression
        :rtype: :data:`ExprNode`
        :raises ParseError: If the expression is malformed
        """
        tree = self.parse_or()
        if self.pos == len(self.tokens):
            return tree
        # Tokens left over after a complete expression are an error.
        raise UnexpectedTokenError(
            self.tokens[self.pos],
            expected='end of expression',
            position=self.token_positions[self.pos],
        )

    def parse_or(self) -> 'ExprNode':
        """
        Parse an OR expression.

        Consecutive OR operands are folded to the left.

        :return: The parsed OR expression AST node
        :rtype: :data:`ExprNode`
        """
        node: ExprNode = self.parse_and()
        while self.accept('OR'):
            node = ('OR', node, self.parse_and())
        return node

    def parse_and(self) -> 'ExprNode':
        """
        Parse an AND expression.

        Consecutive AND operands are folded to the left.

        :return: The parsed AND expression AST node
        :rtype: :data:`ExprNode`
        """
        node: ExprNode = self.parse_not()
        while self.accept('AND'):
            node = ('AND', node, self.parse_not())
        return node

    def parse_not(self) -> 'ExprNode':
        """
        Parse a NOT expression.

        :return: The parsed NOT expression AST node
        :rtype: :data:`ExprNode`
        """
        # Count the leading NOT keywords, parse the operand once, then wrap it
        # once per keyword.
        negations = 0
        while self.accept('NOT'):
            negations += 1
        node: ExprNode = self.parse_atom()
        for _ in range(negations):
            node = ('NOT', node)
        return node

    def parse_atom(self) -> 'ExprNode':
        """
        Parse an atomic expression (NAME or parenthesised expression).

        :return: The parsed atomic expression AST node
        :rtype: :data:`ExprNode`
        :raises ParseError: If an unexpected token is encountered
        """
        current = self.peek()
        if not current:
            raise UnexpectedEndOfExpressionError(position=self._current_position())

        kind = current[0]
        if kind == 'NAME':
            # Remember where the name starts before consuming it, for diagnostics.
            offset = self.token_positions[self.pos]
            self.accept('NAME')
            identifier = current[1]
            allowed = self._valid_names
            if allowed is not None and identifier not in allowed:
                raise UnknownNameError(identifier, valid_names=allowed, position=offset)
            return 'NAME', identifier

        if kind == 'LPAREN':
            self.accept('LPAREN')
            inner = self.parse_or()
            self.expect('RPAREN')
            return inner

        raise UnexpectedTokenError(current, expected='NAME or LPAREN', position=self._current_position())

397 

398 

def ast_to_string(ast: ExprNode) -> str:
    """
    Render an abstract syntax tree (AST) as a logical expression string.

    Parentheses are emitted only where needed: around an AND/OR operand of
    NOT, and around an OR operand of AND.

    :param ast: The AST to convert
    :type ast: ExprNode
    :return: The string representation of the AST
    :rtype: str
    :raises ValueError: If the node tag is not one of NAME, NOT, AND, OR
    """
    kind = ast[0]

    if kind == 'NAME':
        return cast(NameNode, ast)[1]

    if kind == 'NOT':
        operand = cast(NotNode, ast)[1]
        rendered = ast_to_string(operand)
        # NOT binds tighter than AND/OR, so a binary operand needs parentheses.
        return f'NOT ({rendered})' if operand[0] in ('AND', 'OR') else f'NOT {rendered}'

    if kind == 'AND':
        node = cast(BinaryNode, ast)
        pieces = []
        for child in (node[1], node[2]):
            text = ast_to_string(child)
            # AND binds tighter than OR, so an OR operand needs parentheses.
            pieces.append(f'({text})' if child[0] == 'OR' else text)
        return ' AND '.join(pieces)

    if kind == 'OR':
        node = cast(BinaryNode, ast)
        # OR has the lowest precedence: its operands never need parentheses.
        return ' OR '.join([ast_to_string(node[1]), ast_to_string(node[2])])

    raise ValueError(f'Unsupported AST node type: {kind}')

434 

435 

class FilterNode:
    """
    Abstract base for nodes.

    Subclasses override :meth:`to_expression`; the base implementation only raises
    :exc:`NotImplementedError`.
    """

    def to_expression(self, model: type[Model]) -> peewee.Expression | bool:
        """
        Convert this node into an expression for ``model``.

        :param model: The model class to generate the expression for
        :type model: type[Model]
        :return: Defined by the subclasses
        :rtype: peewee.Expression | bool
        :raises NotImplementedError: Always, in this base class
        """
        raise NotImplementedError  # pragma: no cover

441 

442 

class ConditionNode(FilterNode):
    """
    Represents a single condition node in a filter expression.

    This class encapsulates a single filtering condition that can be applied
    to a model field. It supports various logical operations through the
    :class:`.LogicalOp` enumerator or string representations of operations.

    .. versionadded:: v2.0.0
    """

    def __init__(self, field: str | None, operation: LogicalOp | str, value: Any, name: str | None = None):
        """
        Initialize a condition node.

        :param field: The name of the field to apply the condition to.
        :type field: str | None
        :param operation: The logical operation to perform.
        :type operation: LogicalOp | str
        :param value: The value to compare against.
        :type value: Any
        :param name: Optional name for this condition node.
        :type name: str | None, Optional
        :raises ValueError: If ``operation`` is a string that is not a valid :class:`.LogicalOp` value.
        """
        self.field = field  # may be None for some special nodes
        if isinstance(operation, str):
            try:
                self.operation = LogicalOp(operation)
            except ValueError as err:
                # Keep the enum lookup failure as the explicit cause of the error.
                raise ValueError(f'Unsupported operation: {operation}') from err
        else:
            self.operation = operation
        self.value = value
        self.name = name

    @staticmethod
    def _require_method(model_field: Any, method_name: str, op_label: str) -> Any:
        """
        Return the callable ``method_name`` of ``model_field`` or fail.

        :param model_field: The field object looked up on the model
        :type model_field: Any
        :param method_name: Name of the method implementing the operation
        :type method_name: str
        :param op_label: Operation name used in the error message
        :type op_label: str
        :return: The bound method
        :rtype: Any
        :raises ValueError: If the field has no callable attribute with that name
        """
        method = getattr(model_field, method_name, None)
        if not callable(method):
            raise ValueError(f'{op_label} operation not supported for field type {type(model_field)}')
        return method

    def to_expression(self, model: type[Model]) -> peewee.Expression:
        """
        Convert this condition node to a Peewee expression.

        This method translates the condition represented by this node into
        a Peewee expression that can be used in database queries.

        :param model: The model class containing the field to filter.
        :type model: type[Model]
        :return: A Peewee expression representing this condition.
        :rtype: peewee.Expression
        :raises RuntimeError: If the node has no field to evaluate.
        :raises ValueError: If an unsupported operation is specified.
        :raises TypeError: If operation requirements are not met (e.g., IN operation requires list/tuple).
        """
        if self.field is None:
            # Should not happen for standard ConditionNode
            raise RuntimeError('ConditionNode has no field to evaluate')
        model_field = getattr(model, self.field)
        op = self.operation
        val = self.value
        # The casts below only exist to satisfy mypy.
        if op == LogicalOp.EQ:
            return cast(peewee.Expression, cast(object, model_field == val))
        elif op == LogicalOp.NE:
            return cast(peewee.Expression, cast(object, model_field != val))
        elif op == LogicalOp.LT:
            return cast(peewee.Expression, cast(object, model_field < val))
        elif op == LogicalOp.LE:
            return cast(peewee.Expression, cast(object, model_field <= val))
        elif op == LogicalOp.GT:
            return cast(peewee.Expression, cast(object, model_field > val))
        elif op == LogicalOp.GE:
            return cast(peewee.Expression, cast(object, model_field >= val))
        elif op == LogicalOp.GLOB:
            return cast(peewee.Expression, model_field % val)
        elif op == LogicalOp.LIKE:
            return cast(peewee.Expression, model_field**val)
        elif op == LogicalOp.REGEXP:
            return cast(peewee.Expression, self._require_method(model_field, 'regexp', 'REGEXP')(val))
        elif op == LogicalOp.IN:
            # The value is validated before the field capability.
            if not isinstance(val, (list, tuple)):
                raise TypeError(f'IN operation requires list/tuple, got {type(val)}')
            return cast(peewee.Expression, self._require_method(model_field, 'in_', 'IN')(val))
        elif op == LogicalOp.NOT_IN:
            if not isinstance(val, (list, tuple)):
                raise TypeError(f'NOT_IN operation requires list/tuple, got {type(val)}')
            return cast(peewee.Expression, self._require_method(model_field, 'not_in', 'NOT_IN')(val))
        elif op == LogicalOp.BETWEEN:
            if not isinstance(val, (list, tuple)) or len(val) != 2:
                raise TypeError(f'BETWEEN operation requires list/tuple of 2 elements, got {val}')
            return cast(peewee.Expression, self._require_method(model_field, 'between', 'BETWEEN')(val[0], val[1]))
        elif op == LogicalOp.BIT_AND:
            # The bitwise result is compared against zero to obtain a boolean condition.
            masked = self._require_method(model_field, 'bin_and', 'BIT_AND')(val)
            return cast(peewee.Expression, cast(object, masked != 0))
        elif op == LogicalOp.BIT_OR:
            combined = self._require_method(model_field, 'bin_or', 'BIT_OR')(val)
            return cast(peewee.Expression, cast(object, combined != 0))
        elif op == LogicalOp.IS_NULL:
            return cast(peewee.Expression, model_field.is_null())
        elif op == LogicalOp.IS_NOT_NULL:
            return cast(peewee.Expression, model_field.is_null(False))
        else:
            raise ValueError(f'Unsupported operation: {op}')

559 

560 

class ConditionalNode(FilterNode):
    """
    Adapter exposing a :class:`ConditionalFilterCondition` as a :class:`FilterNode`.

    Wrapping the conditional filter condition lets it take part in the filter
    node hierarchy, so that it can be evaluated like any other node.

    .. versionadded:: v2.0.0
    """

    def __init__(self, conditional: 'ConditionalFilterCondition', name: str | None = None):
        """
        Initialize a conditional node.

        :param conditional: The conditional filter condition to wrap
        :type conditional: ConditionalFilterCondition
        :param name: Optional name for this conditional node
        :type name: str | None, Optional
        """
        self.name = name
        self.conditional = conditional

    def to_expression(self, model: type[Model]) -> peewee.Expression:
        """
        Convert this conditional node to a Peewee expression.

        The work is entirely delegated to the :meth:`to_expression` method of
        the wrapped conditional filter condition.

        :param model: The model class to generate the expression for
        :type model: type[Model]
        :return: A Peewee expression representing this conditional node
        :rtype: peewee.Expression
        """
        wrapped = self.conditional
        expression = wrapped.to_expression(model)
        return expression

597 

598 

class LogicalNode(FilterNode):
    """
    Logical combination of child nodes.

    This class represents logical operations (AND, OR, NOT) applied to filter nodes.
    It enables building complex filter expressions by combining simpler filter nodes
    with logical operators.

    .. versionadded:: v2.0.0
    """

    def __init__(self, op: str, *children: FilterNode):
        """
        Initialize a logical node.

        :param op: The logical operation ('AND', 'OR', 'NOT')
        :type op: str
        :param children: Child filter nodes to combine with the logical operation
        :type children: FilterNode
        """
        self.op = op  # 'AND', 'OR', 'NOT'
        self.children = list(children)

    def to_expression(self, model: type[Model]) -> peewee.Expression | bool:
        """
        Convert this logical node to a Peewee expression.

        This method evaluates the logical operation on the child nodes and returns
        the corresponding Peewee expression.

        :param model: The model class to generate the expression for
        :type model: type[Model]
        :return: A Peewee expression representing this logical node
        :rtype: peewee.Expression | bool
        :raises ValueError: If an unknown logical operation is specified, or if a NOT node
            does not have exactly one child
        :raises TypeError: If an AND/OR node has no children (nothing to reduce)
        """
        if self.op == 'NOT':
            # Explicit check instead of ``assert``, which is stripped when Python runs with -O.
            if len(self.children) != 1:
                raise ValueError(f'NOT requires exactly one child, got {len(self.children)}')
            inner = self.children[0].to_expression(model)
            if isinstance(inner, bool):
                # A child may legitimately evaluate to a plain bool. ``~True`` is the
                # integer -2, which is still truthy, so plain booleans are negated logically.
                return not inner
            return cast(peewee.Expression, ~inner)
        elif self.op == 'AND':
            expressions = [c.to_expression(model) for c in self.children]
            return cast(peewee.Expression, reduce(operator.and_, expressions))
        elif self.op == 'OR':
            expressions = [c.to_expression(model) for c in self.children]
            return cast(peewee.Expression, reduce(operator.or_, expressions))
        else:
            raise ValueError(f'Unknown logical op: {self.op}')

647 

648 

class ConditionalFilterCondition:
    """
    Represents a conditional filter where one field's criteria depends on another.

    This allows expressing logic like:
    "IF field_a IN [x, y] THEN field_b IN [1, 2] ELSE no constraint on field_b"

    Example usage:

    .. code-block:: python

        # Filter: sample_id in [1,2] if composite_image_id in [100,101]
        condition = ConditionalFilterCondition(
            condition_field='composite_image_id',
            condition_op='IN',
            condition_value=[100, 101],
            then_field='sample_id',
            then_op='IN',
            then_value=[1, 2],
        )

        # This builds the expression:
        # (composite_image_id IN (100, 101) AND sample_id IN (1, 2))
        # OR (NOT (composite_image_id IN (100, 101)) AND True)

    Instances compare equal when all their attributes are equal. Because the class
    defines ``__eq__`` without ``__hash__``, instances are not hashable.
    """

    def __init__(
        self,
        condition_field: str,
        condition_op: str | LogicalOp,
        condition_value: Any,
        then_field: str,
        then_op: str | LogicalOp,
        then_value: Any,
        else_field: str | None = None,
        else_op: str | LogicalOp | None = None,
        else_value: Any | None = None,
        name: str | None = None,
    ) -> None:
        """
        Initialise a conditional filter condition.

        :param condition_field: The field to check for the condition
        :type condition_field: str
        :param condition_op: The operation for the condition (e.g., 'IN', '==')
        :type condition_op: str | LogicalOp
        :param condition_value: The value(s) for the condition
        :type condition_value: Any
        :param then_field: The field to filter when condition is true
        :type then_field: str
        :param then_op: The operation to apply when condition is true
        :type then_op: str | LogicalOp
        :param then_value: The value(s) for the then clause
        :type then_value: Any
        :param else_field: Optional field to filter when condition is false
        :type else_field: str | None
        :param else_op: Optional operation when condition is false
        :type else_op: str | LogicalOp | None
        :param else_value: Optional value(s) for the else clause
        :type else_value: Any | None
        :param name: The name of this condition. Avoid name clashing with model fields. Defaults to None
        :type name: str | None, Optional
        """
        self.condition_field = condition_field
        self.condition_op = condition_op
        self.condition_value = condition_value
        self.then_field = then_field
        self.then_op = then_op
        self.then_value = then_value
        self.else_field = else_field
        self.else_op = else_op
        self.else_value = else_value
        self.name = name

    def to_expression(self, model: type[Model]) -> peewee.Expression:
        """
        Convert this conditional filter to a Peewee expression.

        The resulting expression is:
        (condition AND then_constraint) OR (NOT condition AND else_constraint)

        Which logically means:

            - When condition is true, apply then_constraint
            - When condition is false, apply else_constraint (or no constraint)

        The else constraint is used only when both ``else_field`` and ``else_op``
        are set; otherwise the else branch is the constant ``True``.

        :param model: The model class containing the fields
        :type model: type[Model]
        :return: A Peewee expression
        :rtype: peewee.Expression
        """
        # Build the condition expression
        condition_expr = ConditionNode(self.condition_field, self.condition_op, self.condition_value).to_expression(
            model
        )

        # Build the then expression
        then_expr = ConditionNode(self.then_field, self.then_op, self.then_value).to_expression(model)

        # Build the else expression
        if self.else_field is not None and self.else_op is not None:
            else_expr = ConditionNode(self.else_field, self.else_op, self.else_value).to_expression(model)
        else:
            # No constraint in else clause - always true
            # the nested cast is needed to make mypy happy.
            else_expr = cast(peewee.Expression, cast(object, True))

        # Combine: (condition AND then) OR (NOT condition AND else)
        return cast(peewee.Expression, (condition_expr & then_expr) | (~condition_expr & else_expr))

    def __eq__(self, other: Any) -> bool:
        """
        Compare two conditional filter conditions attribute by attribute.

        :param other: The object to compare with
        :type other: Any
        :return: True when ``other`` is a :class:`ConditionalFilterCondition` with identical attributes
        :rtype: bool
        """
        if not isinstance(other, ConditionalFilterCondition):
            # Let Python try the reflected comparison; for unrelated types the
            # final result of ``==`` is still False.
            return NotImplemented

        return vars(self) == vars(other)

764 

765 

766class ModelFilter: 

767 r""" 

768 Class to filter rows from a model. 

769 

770 The filter object can be used to generate a where clause to be applied to Model.select(). 

771 

772 The construction of a ModelFilter is normally done via a configuration file using the :meth:`from_conf` class method. 

773 The name of the filter is playing a key role in this. If it follows a dot structure like: 

774 

775 *ProcessorName.__filter__.ModelName* 

776 

777 then the corresponding table from the TOML configuration object will be used. 

778 

779 For each processor, there might be many Filters, up to one for each Model used to get the input list. If a 

780 processor is joining together three Models when performing the input select, there will be up to three Filters 

781 collaborating on making the selection. 

782 

783 The filter configuration can contain the following key, value pair: 

784 

785 - key / string pairs, where the key is the name of a field in the corresponding Model 

786 

787 - key / numeric pairs 

788 

789 - key / arrays 

790 

791 - key / dict pairs with 'op' and 'value' keys for explicit operation specification 

792 

793 All fields from the configuration file will be added to the instance namespace, thus accessible with the dot 

794 notation. Moreover, the field names and their filter value will be added to a private dictionary to simplify the 

795 generation of the filter SQL code. 

796 

797 The user can use the filter object to store selection criteria. He can construct queries using the filter 

798 contents in the same way as he could use processor parameters. 

799 

800 If he wants to automatically generate valid filtering expression, he can use the :meth:`filter` method. In order 

801 for this to work, the ModelFilter object must be :meth:`bound <bind>` to a Model. Without this binding the ModelFilter will not 

802 be able to automatically generate expressions. 

803 

804 For each field in the filter, one condition will be generated according to the following scheme: 

805 

806 ================= ================= ================== 

807 Filter field type Logical operation Example 

808 ================= ================= ================== 

809 Numeric, boolean == Field == 3.14 

810 String GLOB Field GLOB '\*ree' 

811 List IN Field IN [1, 2, 3] 

812 Dict (explicit) op from dict Field BIT_AND 5 

813 ================= ================= ================== 

814 

815 All conditions will be joined with an AND logic by default, but this can be changed. 

816 

817 The ModelFilter also supports logical expressions to combine multiple filter conditions using AND, OR, and NOT 

818 operators. These expressions can reference named filter conditions within the same filter or even combine 

819 conditions from different filters when used with :class:`ProcessorFilter`. 

820 

821 Conditional filters allow expressing logic like: 

822 "IF field_a IN [x, y] THEN field_b IN [1, 2] ELSE no constraint on field_b" 

823 

824 Consider the following example: 

825 

826 .. code-block:: python 

827 :linenos: 

828 

829 class MeasModel(MAFwBaseModel): 

830 meas_id = AutoField(primary_key=True) 

831 sample_name = TextField() 

832 successful = BooleanField() 

833 flags = IntegerField() 

834 composite_image_id = IntegerField() 

835 sample_id = IntegerField() 

836 

837 

838 # Traditional simplified usage 

839 flt = ModelFilter( 

840 'MyProcessor.__filter__.MyModel', 

841 sample_name='sample_00*', 

842 meas_id=[1, 2, 3], 

843 successful=True, 

844 ) 

845 

846 # New explicit operation usage 

847 flt = ModelFilter( 

848 'MyProcessor.__filter__.MyModel', 

849 sample_name={'op': 'LIKE', 'value': 'sample_00%'}, 

850 flags={'op': 'BIT_AND', 'value': 5}, 

851 meas_id={'op': 'IN', 'value': [1, 2, 3]}, 

852 ) 

853 

854 # Logical expression usage 

855 flt = ModelFilter( 

856 'MyProcessor.__filter__.MyModel', 

857 sample_name={'op': 'LIKE', 'value': 'sample_00%'}, 

858 flags={'op': 'BIT_AND', 'value': 5}, 

859 meas_id={'op': 'IN', 'value': [1, 2, 3]}, 

860 __logic__='sample_name AND (flags OR meas_id)', 

861 ) 

862 

863 # Conditional filter usage 

864 flt = ModelFilter( 

865 'MyProcessor.__filter__.MyModel', 

866 sample_name='sample_00*', 

867 composite_image_id=[100, 101], 

868 sample_id=[1, 2], 

869 __conditional__=[ 

870 { 

871 'condition_field': 'composite_image_id', 

872 'condition_op': 'IN', 

873 'condition_value': [100, 101], 

874 'then_field': 'sample_id', 

875 'then_op': 'IN', 

876 'then_value': [1, 2], 

877 } 

878 ], 

879 ) 

880 

881 flt.bind(MeasModel) 

882 filtered_query = MeasModel.select().where(flt.filter()) 

883 

884 The explicit operation format allows for bitwise operations and other advanced filtering. 

885 

886 TOML Configuration Examples: 

887 

888 .. code-block:: toml 

889 

890 [MyProcessor.__filter__.MyModel] 

891 sample_name = "sample_00*" # Traditional GLOB 

892 successful = true # Traditional equality 

893 

894 # Explicit operations 

895 flags = { op = "BIT_AND", value = 5 } 

896 score = { op = ">=", value = 75.0 } 

897 category = { op = "IN", value = ["A", "B", "C"] } 

898 date_range = { op = "BETWEEN", value = ["2024-01-01", "2024-12-31"] } 

899 

900 # Logical expression for combining conditions 

901 __logic__ = "sample_name AND (successful OR flags)" 

902 

903 # Conditional filters 

904 [[MyProcessor.__filter__.MyModel.__conditional__]] 

905 condition_field = "composite_image_id" 

906 condition_op = "IN" 

907 condition_value = [100, 101] 

908 then_field = "sample_id" 

909 then_op = "IN" 

910 then_value = [1, 2] 

911 

912 # Nested conditions with logical expressions 

913 [MyProcessor.__filter__.MyModel.nested_conditions] 

914 __logic__ = "a OR b" 

915 a = { op = "LIKE", value = "test%" } 

916 b = { op = "IN", value = [1, 2, 3] } 

917 

918 .. seealso:: 

919 

920 - :class:`mafw.db.db_filter.ProcessorFilter` - For combining multiple ModelFilters with logical expressions 

921 - :class:`mafw.db.db_filter.ConditionalFilterCondition` - For conditional filtering logic 

922 - :class:`mafw.db.db_filter.ExprParser` - For parsing logical expressions 

923 """ 

924 

# Reserved configuration key: the constructor pops it from the keyword arguments and stores
# its value as the logic expression of the filter.
logic_name = '__logic__'
"""
The logic keyword identifier.

This value cannot be used as field name in the filter bound model.
"""
# Reserved configuration key: the constructor pops it from the keyword arguments and turns
# each entry into a conditional filter.
conditional_name = '__conditional__'
"""
The conditional keyword identifier.

This value cannot be used as field name in the filter bound model.
"""

937 

def __init__(self, name_: str, **kwargs: Any) -> None:
    """
    Constructor parameters:

    :param `name_`: The name of the filter. It should be in dotted format to facilitate the configuration via the
        steering file. The _ is used to allow the user to have a keyword argument named name.
    :type `name_`: str
    :param kwargs: Keyword parameters corresponding to fields and filter values. The reserved keys
        :attr:`conditional_name` and :attr:`logic_name` are consumed as conditional filters and as the logic
        expression respectively.
    :raises TypeError: If a field value has a type that cannot be converted into a condition.

    .. versionchanged:: v1.2.0
        The parameter *name* has been renamed as *name_*.

    .. versionchanged:: v1.3.0
        Implementation of explicit operation.

    .. versionchanged:: v2.0.0
        Introduction of conditional filters, logical expression and hierarchical structure.
        Introduction of autobinding for MAFwBaseModels

    """
    self.name = name_
    self.model_name = name_.split('.')[-1]
    self.model: type[Model] | None = None
    self._model_bound = False

    # mapping name -> FilterNode
    self._nodes: 'OrderedDict[str, FilterNode]' = OrderedDict()
    # conditional nodes mapping (named)
    self._cond_nodes: 'OrderedDict[str, ConditionalNode]' = OrderedDict()
    # logic expression for this filter (combining top-level node names)
    self._logic_expr: str | None = None

    # Extract conditional filters if present
    if self.conditional_name in kwargs:
        conditionals = kwargs.pop(self.conditional_name)
        if not isinstance(conditionals, list):
            conditionals = [conditionals]

        for cond_dict in conditionals:
            self.add_conditional_from_dict(cond_dict)

    # Extract logic for internal conditions, if provided
    if self.logic_name in kwargs:
        self._logic_expr = kwargs.pop(self.logic_name)

    # now process remaining kwargs as either:
    #  - simple/extended condition for a field
    #  - or a nested mapping describing subconditions for field (field-level logic)
    for k, v in kwargs.items():
        if isinstance(v, dict) and ('op' in v and 'value' in v):
            # explicit op/value for field k: extended operation condition
            self._nodes[k] = ConditionNode(k, v['op'], v['value'], name=k)
        elif isinstance(v, dict) and any(x == self.logic_name or x not in ('op', 'value') for x in v):
            # nested mapping: create sub-nodes for this field
            # v expected like {'__logic__': 'a OR b', 'a': {'op':..., 'value':...}, 'b': ...}
            subnodes: 'OrderedDict[str, FilterNode]' = OrderedDict()
            sub_logic = v.get(self.logic_name, None)
            for subk, subv in v.items():
                if subk == self.logic_name:
                    continue
                if isinstance(subv, dict) and ('op' in subv and 'value' in subv):
                    subnodes[subk] = ConditionNode(k, subv['op'], subv['value'], name=subk)
                else:
                    subnodes[subk] = self._create_condition_node_from_value(subv, k, subk)
            # combine subnodes using sub_logic or AND by default
            if sub_logic:
                ast = ExprParser(sub_logic).parse()
                ln = self._build_logical_node_from_ast(ast, subnodes, model_name_placeholder=k)
            else:
                # AND all subnodes
                ln = LogicalNode('AND', *subnodes.values())
            self._nodes[k] = ln
        else:
            # plain value (or a dict that is neither explicit nor nested, which raises TypeError)
            self._nodes[k] = self._create_condition_node_from_value(v, k, k)

    # Attempt to autobind only once the logic expression is in place: bind() resets
    # ``_logic_expr`` when the model owns an attribute named like ``logic_name``, and running
    # it earlier would let the assignments above silently undo that reset.
    self._auto_bind()

1023 

def _auto_bind(self) -> None:
    """
    Try to bind the filter to its model without user intervention.

    The model registered under the filter's model name is requested from ``mafw_model_register``
    and handed over to :meth:`bind`.

    If a :exc:`KeyError` is raised along the way, a warning is logged and the filter is left as it was.

    This method is automatically invoked by the :class:`.ModelFilter` constructor.
    """
    try:
        # Lookup and binding share the same guard: a KeyError from either one ends up in the warning.
        self.bind(mafw_model_register.get_model(self.model_name))  # type: ignore[arg-type]
    except KeyError:
        log.warning(f'Impossible to perform auto-binding for model {self.model_name}')

1040 

def _build_logical_node_from_ast(
    self, ast: ExprNode, name_to_nodes: Dict[str, FilterNode], model_name_placeholder: str | None = None
) -> FilterNode:
    """
    Recursively translate a parsed logic expression into a tree of filter nodes.

    :param ast: The AST node to translate; its first item is the node kind.
    :param name_to_nodes: Mapping from the names usable in the expression to their filter nodes.
    :param model_name_placeholder: Label quoted in the error message when a name is unknown.
    :return: The mapped node for a ``NAME``, otherwise a :class:`LogicalNode` wrapping the translated children.
    :raises KeyError: If the expression refers to a name missing from ``name_to_nodes``.
    :raises ValueError: If the node kind is not one of ``NAME``, ``NOT``, ``AND``, ``OR``.
    """
    kind = ast[0]

    if kind == 'NAME':
        label = cast(NameNode, ast)[1]
        if label in name_to_nodes:
            return name_to_nodes[label]
        raise KeyError(f'Unknown name {label} in nested logic for field {model_name_placeholder}')

    if kind == 'NOT':
        operand = cast(NotNode, ast)[1]
        negated = self._build_logical_node_from_ast(operand, name_to_nodes, model_name_placeholder)
        return LogicalNode('NOT', negated)

    if kind in ('AND', 'OR'):
        binary = cast(BinaryNode, ast)
        lhs = self._build_logical_node_from_ast(binary[1], name_to_nodes, model_name_placeholder)
        rhs = self._build_logical_node_from_ast(binary[2], name_to_nodes, model_name_placeholder)
        return LogicalNode(kind, lhs, rhs)

    raise ValueError(f'Unsupported AST node {kind}')

1063 

@staticmethod
def _create_condition_node_from_value(value: Any, field_name: str, node_name: str | None = None) -> ConditionNode:
    """
    Build a :class:`ConditionNode` whose operation is picked from the type of ``value``.

    Numbers and booleans are compared with ``EQ``, strings with ``GLOB`` and lists with ``IN``.

    :param value: The filter value
    :param field_name: The field name
    :param node_name: The name given to the generated node
    :return: The condition node for the field
    :raises TypeError: If ``value`` is neither a number, a boolean, a string nor a list.
    """
    if isinstance(value, (int, float, bool)):
        operation = LogicalOp.EQ
    elif isinstance(value, str):
        operation = LogicalOp.GLOB
    elif isinstance(value, list):
        operation = LogicalOp.IN
    else:
        raise TypeError(f'ModelFilter value of unsupported type {type(value)} for field {field_name}.')

    return ConditionNode(field_name, operation, value, node_name)

1081 

def bind(self, model: type[Model]) -> None:
    """
    Attach a Model class to the filter.

    When the model exposes an attribute named like :attr:`logic_name`, two warnings are logged and the
    logic expression of the filter is reset to ``None``.

    :param model: Model to be bound.
    :type model: Model
    """
    self.model = model
    self._model_bound = True

    if not hasattr(model, self.logic_name):
        return

    # The reserved keyword collides with an attribute of the model: give up on the logic expression.
    log.warning(
        f'Model {model.__name__} has a field named {self.logic_name}. This is '
        f'preventing the logic expression to work.'
    )
    log.warning('Modify your model. Logic expression disabled.')
    self._logic_expr = None

1103 

@property
def is_bound(self) -> bool:
    """Tell whether a Model has been attached to this ModelFilter via :meth:`bind`."""
    bound = self._model_bound
    return bound

1108 

def add_conditional(self, conditional: ConditionalFilterCondition) -> None:
    """
    Register a conditional filter.

    The condition is wrapped in a :class:`ConditionalNode` and stored both among the conditional nodes
    and among the top-level nodes of the filter. Unnamed conditions receive a generated name of the
    form ``__cond<N>__`` that is not yet used by another conditional node.

    .. versionadded:: v2.0.0

    :param conditional: The conditional filter condition
    :type conditional: ConditionalFilterCondition
    :raises KeyError: If the condition carries a name already used by another conditional filter.
    """
    label = conditional.name

    if label is not None:
        # user-provided name: accept it only if it is still free
        if label in self._cond_nodes:
            raise KeyError(
                f'A conditional filter named {label} already exists. Please review your steering file.'
            )
    else:
        # no name provided: start from the current number of conditionals and move up to the first free slot
        index = len(self._cond_nodes)
        label = f'__cond{index}__'
        while label in self._cond_nodes:
            index += 1
            label = f'__cond{index}__'

    wrapped = ConditionalNode(conditional, name=label)
    self._cond_nodes[label] = wrapped
    self._nodes[label] = wrapped

1139 

def add_conditional_from_dict(self, config: dict[str, Any]) -> None:
    """
    Build a conditional filter from a configuration dictionary and register it.

    The keys ``condition_field``, ``condition_op``, ``condition_value``, ``then_field``, ``then_op`` and
    ``then_value`` are mandatory; ``else_field``, ``else_op``, ``else_value`` and ``name`` are optional
    and default to ``None``.

    .. versionadded:: v2.0.0

    :param config: Dictionary with conditional filter configuration
    :type config: dict[str, Any]
    :raises KeyError: If one of the mandatory keys is missing.
    """
    mandatory = ('condition_field', 'condition_op', 'condition_value', 'then_field', 'then_op', 'then_value')
    optional = ('else_field', 'else_op', 'else_value', 'name')

    settings = {key: config[key] for key in mandatory}
    settings.update((key, config.get(key)) for key in optional)

    self.add_conditional(ConditionalFilterCondition(**settings))

1162 

@classmethod
def from_conf(cls, name: str, conf: dict[str, Any]) -> Self:
    """
    Create a filter from a steering file dictionary.

    When ``name`` has the form ``<processor>.__filter__.<model>`` and ``conf`` contains the matching
    ``conf[<processor>]['__filter__'][<model>]`` table, a shallow copy of that table provides the keyword
    arguments of the new filter. In every other case an empty filter is returned.

    See details in the :class:`class documentation <ModelFilter>`

    :param name: The name of the filter in dotted notation.
    :type name: str
    :param conf: The configuration dictionary.
    :type conf: dict
    :return: A Filter object
    :rtype: ModelFilter
    """
    settings: dict[str, Any] = {}

    # expected layout: ProcessorName.__filter__.ModelName
    parts = name.split('.')
    if len(parts) == 3:
        processor, marker, model_name = parts
        if marker == '__filter__' and processor in conf:
            section = conf[processor]
            if '__filter__' in section and model_name in section['__filter__']:
                settings.update(copy(section['__filter__'][model_name]))

    # a name outside the expected layout, or without configuration, leads to an empty filter
    return cls(name, **settings)

1196 

def _evaluate_logic_ast(self, ast: ExprNode) -> peewee.Expression | bool:
    """
    Evaluate an abstract syntax tree (AST) representing a logical expression.

    This method recursively evaluates the AST nodes to produce a Peewee expression
    or boolean value representing the logical combination of filter conditions.

    :param ast: The abstract syntax tree node to evaluate
    :type ast: Any
    :return: A Peewee expression for logical operations or boolean True/False
    :rtype: peewee.Expression | bool
    :raises KeyError: If a referenced condition name is not found in the filter
    :raises ValueError: If an unsupported AST node type is encountered
    """
    t = ast[0]
    if t == 'NAME':
        named_ast = cast(NameNode, ast)
        nm = named_ast[1]
        if nm not in self._nodes:
            raise KeyError(f"Unknown node '{nm}' in logic for filter {self.name}")
        node = self._nodes[nm]

        if TYPE_CHECKING:
            assert self.model is not None
        return node.to_expression(self.model)
    elif t == 'NOT':
        not_ast = cast(NotNode, ast)
        val = self._evaluate_logic_ast(not_ast[1])
        if isinstance(val, bool):
            # ``~`` on a Python bool is an integer bitwise inversion (``~True == -2``), not a
            # logical negation, and is deprecated since Python 3.12.
            return not val
        return cast(peewee.Expression, ~val)
    elif t == 'AND':
        bin_ast = cast(BinaryNode, ast)
        left = self._evaluate_logic_ast(bin_ast[1])
        right = self._evaluate_logic_ast(bin_ast[2])
        return cast(peewee.Expression, cast(object, left & right))
    elif t == 'OR':
        bin_ast = cast(BinaryNode, ast)
        left = self._evaluate_logic_ast(bin_ast[1])
        right = self._evaluate_logic_ast(bin_ast[2])
        return cast(peewee.Expression, cast(object, left | right))
    else:
        raise ValueError(f'Unsupported AST node {t}')

1238 

def filter(self, join_with: Literal['AND', 'OR'] = 'AND') -> peewee.Expression | bool:
    """
    Generates a filtering expression joining all filtering fields.

    If the filter is not bound to a model, a warning is logged and ``True`` is returned. If a logic
    expression is set, it drives the combination of the named nodes and ``join_with`` is ignored.
    Otherwise, all top-level nodes are joined with ``join_with``; a filter without nodes yields ``True``.

    See details in the :class:`class documentation <ModelFilter>`

    .. versionchanged:: v1.3.0
        Add the possibility to specify a `join_with` function

    .. versionchanged:: v2.0.0
        Add support for conditional filters and for logical expression

    :param join_with: How to join conditions ('AND' or 'OR'). Defaults to 'AND'.
    :type join_with: Literal['AND', 'OR'], default 'AND'
    :return: The filtering expression.
    :rtype: peewee.Expression | bool
    :raises TypeError: when the field value type is not supported.
    :raises ValueError: when join_with is not 'AND' or 'OR', or when the logic expression cannot be
        parsed or refers to an unknown node.
    """
    if not self.is_bound:
        log.warning('Unable to generate the filter. Did you bind the filter to the model?')
        return True

    if TYPE_CHECKING:
        # if we get here, it means that we have a valid model
        assert self.model is not None

    # if logic provided for this filter, use it
    if self._logic_expr:
        # Both failures are re-raised as ValueError, explicitly chained to the original error
        # so that the root cause stays attached to the traceback.
        try:
            ast = ExprParser(self._logic_expr).parse()
        except ParseError as e:
            raise ValueError(f'Error parsing logic for filter {self.name}: {e}') from e
        try:
            return self._evaluate_logic_ast(ast)
        except KeyError as e:
            raise ValueError(f'Error evaluating logic for filter {self.name}: {e}') from e

    # otherwise combine all top-level nodes (AND by default)
    exprs = [n.to_expression(self.model) for n in self._nodes.values()]
    if not exprs:
        return True
    if join_with not in ('AND', 'OR'):
        raise ValueError("join_with must be 'AND' or 'OR'")
    if join_with == 'AND':
        return cast(peewee.Expression, reduce(operator.and_, exprs))
    return cast(peewee.Expression, reduce(operator.or_, exprs))

1286 

1287 

1288class ProcessorFilter(UserDict[str, ModelFilter]): 

1289 """ 

1290 A special dictionary to store all :class:`Filters <mafw.db.db_filter.ModelFilter>` in a processor. 

1291 

1292 It contains a publicly accessible dictionary with the configuration of each ModelFilter using the Model name as 

1293 keyword. 

1294 

1295 It contains a private dictionary with the global filter configuration as well. 

1296 The global filter is not directly accessible, but only some of its members will be exposed via properties. 

1297 In particular, the new_only flag that is relevant only at the Processor level can be accessed directly using the 

1298 :attr:`new_only`. If not specified in the configuration file, the new_only is by default True. 

1299 

1300 It is possible to assign a logic operation string to the register that is used to join all the filters together 

1301 when performing the :meth:`filter_all`. If no logic operation string is provided, the register will provide a join 

1302 condition using either AND (default) or OR. 

1303 """ 

1304 

def __init__(self, data: dict[str, ModelFilter] | None = None, /, **kwargs: Any) -> None:
    """
    Constructor parameters:

    :param data: Initial content of the dictionary
    :type data: dict
    :param kwargs: Additional items, forwarded together with ``data`` to the parent constructor
    """
    # processor-wide settings, kept apart from the per-model filters
    self._global_filter: dict[str, Any] = {}
    # optional logic expression used by filter_all to combine the filters
    self._logic: str | None = None
    super().__init__(data, **kwargs)

1316 

@property
def new_only(self) -> bool:
    """
    The new only flag.

    The value is read from the global filter configuration and is True when it was never set.

    :return: True, if only new items, not already in the output database table must be processed.
    :rtype: bool
    """
    flag = self._global_filter.get('new_only', True)
    return cast(bool, flag)

@new_only.setter
def new_only(self, v: bool) -> None:
    # store the flag in the global filter configuration
    self._global_filter.update(new_only=v)

1330 

def __setitem__(self, key: str, value: ModelFilter) -> None:
    """
    Store ``value`` under ``key``.

    Only :class:`ModelFilter` instances are accepted; anything else is dropped without any notification.

    :param key: Dictionary key. Normally the name of the model linked to the filter.
    :type key: str
    :param value: The Filter.
    :type value: ModelFilter
    """
    if isinstance(value, ModelFilter):
        super().__setitem__(key, value)

1345 

def bind_all(self, models: list[type[Model]] | dict[str, type[Model]]) -> None:
    """
    Bind every filter to its model.

    ``models`` is either a list of models, indexed here by their class name, or a dictionary using the
    model name as key. A filter built from an empty configuration is added for each model that has no
    filter yet; then every filter that is still unbound and has a model with the same name is bound to it.

    :param models: List or dictionary of a databank of Models from which the ModelFilter can be bound.
    :type models: list[type(Model)] | dict[str,type(Model)]
    """
    registry = {m.__name__: m for m in models} if isinstance(models, list) else models

    # make sure that each listed model has a filter, creating an empty one when missing
    for model_name in registry:
        if model_name not in self.data:
            self.data[model_name] = ModelFilter.from_conf(f'{model_name}', conf={})

    # bind what is not bound yet
    for filter_name, model_filter in self.data.items():
        if filter_name in registry and not model_filter.is_bound:
            model_filter.bind(registry[filter_name])

1367 

def filter_all(self, join_with: Literal['AND', 'OR'] = 'AND') -> peewee.Expression | bool:
    """
    Generates a where clause joining all filters.

    If a logic expression is present, it will be used to combine named filters; a filter that is not
    bound contributes ``True`` and a warning is logged.
    Otherwise, fall back to the legacy behaviour: the expressions of all bound filters are joined
    using ``join_with``, and ``True`` is returned when there is no bound filter at all.

    :raise ValueError: If the parsing of the logical expression fails, or if it refers to an unknown
        filter name or contains an unsupported node.
    :param join_with: Logical function to join the filters if no logic expression is provided.
        Any value other than 'AND' is handled as 'OR'.
    :type join_with: Literal['AND', 'OR'], default: 'AND'
    :return: ModelFilter expression
    :rtype: peewee.Expression
    """
    # If a logic expression is present at the global level, use it to combine filters
    if self._logic:
        try:
            ast = ExprParser(self._logic).parse()
        except ParseError as e:
            raise ValueError(f'Error parsing global logic for ProcessorFilter: {e}') from e

        def eval_ast(node: ExprNode) -> peewee.Expression | bool:
            t = node[0]
            if t == 'NAME':
                named_node = cast(NameNode, node)
                nm = named_node[1]
                if nm not in self.data:
                    raise KeyError(f"Unknown filter name '{nm}' in processor logic")
                flt = self.data[nm]
                if not flt.is_bound:
                    log.warning(f"ModelFilter '{nm}' is not bound; using True for its expression")
                    return True
                return flt.filter()
            elif t == 'NOT':
                not_node = cast(NotNode, node)
                operand = eval_ast(not_node[1])
                if isinstance(operand, bool):
                    # ``~`` on a Python bool is an integer bitwise inversion (``~True == -2``),
                    # not a logical negation, and is deprecated since Python 3.12.
                    return not operand
                return cast(peewee.Expression, ~operand)
            elif t == 'AND':
                bin_node = cast(BinaryNode, node)
                return cast(peewee.Expression, cast(object, eval_ast(bin_node[1]) & eval_ast(bin_node[2])))
            elif t == 'OR':
                bin_node = cast(BinaryNode, node)
                return cast(peewee.Expression, cast(object, eval_ast(bin_node[1]) | eval_ast(bin_node[2])))
            else:
                raise ValueError(f'Unsupported AST node {t}')

        try:
            return eval_ast(ast)
        except KeyError as e:
            raise ValueError(f'Error evaluating processor logic: {e}') from e

    # Legacy behaviour: combine all filters with join_with (AND/OR)
    filter_list = [flt.filter() for flt in self.data.values() if flt.is_bound]
    if not filter_list:
        # nothing to join: no constraint
        return True
    # The reduction is not seeded: True is harmless for AND, but as the first operand of an OR
    # it would make the whole clause unconditionally true.
    joiner = operator.and_ if join_with == 'AND' else operator.or_
    return cast(peewee.Expression, cast(object, reduce(joiner, filter_list)))