Coverage for src / mafw / db / trigger.py: 98%

294 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-12 09:03 +0000

1# Copyright 2025–2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Module provides a Trigger class and related tools to create triggers in the database via the ORM. 

6 

7It supports SQLite, MySQL and PostgreSQL with dialect-specific SQL generation. 

8""" 

9 

10from abc import ABC, abstractmethod 

11from enum import StrEnum 

12from typing import Any, Self, cast 

13 

14import peewee 

15from peewee import Model 

16 

17from mafw.db.db_types import PeeweeModelWithMeta 

18from mafw.mafw_errors import MissingSQLStatement, UnsupportedDatabaseError 

19from mafw.tools.regexp import normalize_sql_spaces 

20 

21 

22def and_(*conditions: str) -> str: 

23 """ 

24 Concatenates conditions with logical AND. 

25 

26 :param conditions: The condition to join. 

27 :type conditions: str 

28 :return: The and-concatenated string of conditions 

29 :rtype: str 

30 """ 

31 conditions_l = [f'({c})' for c in conditions] 

32 return ' AND '.join(conditions_l) 

33 

34 

35def or_(*conditions: str) -> str: 

36 """ 

37 Concatenates conditions with logical OR. 

38 

39 :param conditions: The condition to join. 

40 :type conditions: str 

41 :return: The or-concatenated string of conditions. 

42 :rtype: str 

43 """ 

44 conditions_l = [f'({c})' for c in conditions] 

45 return ' OR '.join(conditions_l) 

46 

47 

48class TriggerWhen(StrEnum): 

49 """String enumerator for the trigger execution time (Before, After or Instead Of)""" 

50 

51 Before = 'BEFORE' 

52 After = 'AFTER' 

53 Instead = 'INSTEAD OF' 

54 

55 

56class TriggerAction(StrEnum): 

57 """String enumerator for the trigger action (Delete, Insert, Update)""" 

58 

59 Delete = 'DELETE' 

60 Insert = 'INSERT' 

61 Update = 'UPDATE' 

62 

63 

64class TriggerDialect(ABC): 

65 """Abstract base class for database-specific trigger SQL generation.""" 

66 

67 @abstractmethod 

68 def create_trigger_sql(self, trigger: 'Trigger') -> str: 

69 """ 

70 Generate the SQL to create a trigger for a specific database dialect. 

71 

72 :param trigger: The trigger object 

73 :return: SQL string to create the trigger 

74 """ 

75 pass # pragma: no cover 

76 

77 @abstractmethod 

78 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str: 

79 """ 

80 Generate the SQL to drop a trigger for a specific database dialect. 

81 

82 :param trigger_name: The name of the trigger to drop 

83 :type trigger_name: str 

84 :param safe: If True, add an IF EXISTS clause. Defaults to True. 

85 :type safe: bool, Optional 

86 :param table_name: The name of the target table for the trigger. Defaults to None. 

87 :type table_name: str, Optional 

88 :return: SQL string to drop the trigger 

89 :rtype: str 

90 """ 

91 pass # pragma: no cover 

92 

93 @abstractmethod 

94 def select_all_trigger_sql(self) -> str: 

95 pass # pragma: no cover 

96 

97 @abstractmethod 

98 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool: 

99 """ 

100 Check if the database supports the specified trigger type. 

101 

102 :param when: When the trigger should fire (BEFORE, AFTER, INSTEAD OF) 

103 :param action: The action that triggers the trigger (INSERT, UPDATE, DELETE) 

104 :param on_view: Whether the trigger is on a view 

105 :return: True if supported, False otherwise 

106 """ 

107 pass # pragma: no cover 

108 

109 @abstractmethod 

110 def supports_safe_create(self) -> bool: 

111 """ 

112 Check if the database supports IF NOT EXISTS for triggers. 

113 

114 :return: True if supported, False otherwise 

115 """ 

116 pass # pragma: no cover 

117 

118 @abstractmethod 

119 def supports_update_of_columns(self) -> bool: 

120 """ 

121 Check if the database supports column-specific UPDATE triggers. 

122 

123 :return: True if supported, False otherwise 

124 """ 

125 pass # pragma: no cover 

126 

127 @abstractmethod 

128 def supports_when_clause(self) -> bool: 

129 """ 

130 Check if the database supports WHEN conditions. 

131 

132 :return: True if supported, False otherwise 

133 """ 

134 pass # pragma: no cover 

135 

136 

137class SQLiteDialect(TriggerDialect): 

138 """SQLite-specific trigger SQL generation.""" 

139 

140 def create_trigger_sql(self, trigger: 'Trigger') -> str: 

141 """Generate SQLite trigger SQL.""" 

142 if_not_exists = 'IF NOT EXISTS' if trigger.safe else '' 

143 of_columns = ( 

144 f'OF {", ".join(trigger.update_columns)}' 

145 if trigger.trigger_action == TriggerAction.Update and trigger.update_columns 

146 else '' 

147 ) 

148 for_each_row = 'FOR EACH ROW' if trigger.for_each_row else '' 

149 compiled_conditions = trigger._compile_when_conditions() 

150 when_clause = f'WHEN {" AND ".join(f"({c})" for c in compiled_conditions)}' if compiled_conditions else '' 

151 sql_statements = '\n'.join(trigger._compile_sql_statement(stmt) for stmt in trigger._sql_list) 

152 

153 return normalize_sql_spaces( 

154 f'CREATE TRIGGER {if_not_exists} {trigger.trigger_name}\n' 

155 f'{trigger.trigger_when} {trigger.trigger_action} {of_columns} ON {trigger.target_table}\n' 

156 f'{for_each_row} {when_clause}\n' 

157 f'BEGIN\n' 

158 f'{sql_statements}\n' 

159 f'END;' 

160 ) 

161 

162 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str: 

163 """Generate SQLite drop trigger SQL.""" 

164 return normalize_sql_spaces(f'DROP TRIGGER {"IF EXISTS" if safe else ""} {trigger_name}') 

165 

166 def select_all_trigger_sql(self) -> str: 

167 return "SELECT name AS trigger_name, tbl_name AS table_name FROM sqlite_master WHERE type = 'trigger';" 

168 

169 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool: 

170 """SQLite supports all trigger types except INSTEAD OF on tables (only on views).""" 

171 if when == TriggerWhen.Instead and not on_view: 

172 return False 

173 return True 

174 

175 def supports_safe_create(self) -> bool: 

176 """SQLite supports IF NOT EXISTS for triggers.""" 

177 return True 

178 

179 def supports_update_of_columns(self) -> bool: 

180 """SQLite supports column-specific UPDATE triggers.""" 

181 return True 

182 

183 def supports_when_clause(self) -> bool: 

184 """SQLite supports WHEN conditions.""" 

185 return True 

186 

187 

188class MySQLDialect(TriggerDialect): 

189 """MySQL-specific trigger SQL generation.""" 

190 

191 def create_trigger_sql(self, trigger: 'Trigger') -> str: 

192 """Generate MySQL trigger SQL.""" 

193 # MySQL doesn't support INSTEAD OF triggers 

194 # MySQL doesn't support column-specific UPDATE triggers 

195 # MySQL requires FOR EACH ROW 

196 

197 if_not_exists = 'IF NOT EXISTS' if trigger.safe else '' 

198 

199 # In MySQL, we need to convert WHEN conditions to IF/THEN/END IF blocks 

200 sql_statements = [] 

201 

202 compiled_conditions = trigger._compile_when_conditions() 

203 compiled_statements = [trigger._compile_sql_statement(stmt) for stmt in trigger._sql_list] 

204 

205 # If there are conditional statements, wrap them in IF blocks 

206 if compiled_conditions: 

207 condition = ' AND '.join(f'({c})' for c in compiled_conditions) 

208 sql_statements.append(f'IF {condition} THEN') 

209 

210 # Add the SQL statements with indentation 

211 for stmt in compiled_statements: 

212 sql_statements.append(f' {stmt}') 

213 

214 # Close the IF block 

215 sql_statements.append('END IF;') 

216 else: 

217 # No conditions, just add the SQL statements directly 

218 sql_statements.extend(compiled_statements) 

219 

220 # Join all statements 

221 trigger_body = '\n'.join(sql_statements) 

222 

223 # Construct the final SQL 

224 sql = ( 

225 f'CREATE TRIGGER {if_not_exists} {trigger.trigger_name}\n' 

226 f'{trigger.trigger_when} {trigger.trigger_action} ON {trigger.target_table}\n' 

227 f'FOR EACH ROW\n' 

228 f'BEGIN\n' 

229 f'{trigger_body}\n' 

230 f'END;' 

231 ) 

232 return normalize_sql_spaces(sql) 

233 

234 def select_all_trigger_sql(self) -> str: 

235 return 'SELECT trigger_name, event_object_table AS table_name FROM information_schema.TRIGGERS WHERE TRIGGER_SCHEMA = DATABASE();' 

236 

237 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str: 

238 """Generate MySQL drop trigger SQL.""" 

239 return normalize_sql_spaces(f'DROP TRIGGER {"IF EXISTS" if safe else ""} {trigger_name}') 

240 

241 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool: 

242 """MySQL doesn't support INSTEAD OF triggers.""" 

243 return when != TriggerWhen.Instead 

244 

245 def supports_safe_create(self) -> bool: 

246 """MySQL supports IF NOT EXISTS for triggers.""" 

247 return True 

248 

249 def supports_update_of_columns(self) -> bool: 

250 """MySQL doesn't support column-specific UPDATE triggers.""" 

251 return False 

252 

253 def supports_when_clause(self) -> bool: 

254 """MySQL supports conditions but through WHERE instead of WHEN.""" 

255 return True 

256 

257 

258class PostgreSQLDialect(TriggerDialect): 

259 """PostgreSQL-specific trigger SQL generation.""" 

260 

261 def create_trigger_sql(self, trigger: 'Trigger') -> str: 

262 """Generate PostgreSQL trigger SQL.""" 

263 # PostgreSQL handles INSTEAD OF differently 

264 # PostgreSQL uses functions for trigger bodies 

265 

266 function_name = f'fn_{trigger.trigger_name}' 

267 

268 # First create the function 

269 function_sql = f'CREATE OR REPLACE FUNCTION {function_name}() RETURNS TRIGGER AS $$\nBEGIN\n' 

270 

271 compiled_conditions = trigger._compile_when_conditions() 

272 compiled_statements = [self._clean_sql(trigger._compile_sql_statement(stmt)) for stmt in trigger._sql_list] 

273 

274 # Add WHEN condition as IF statements if needed 

275 if compiled_conditions: 

276 when_condition = ' AND '.join(f'({c})' for c in compiled_conditions) 

277 function_sql += f' IF {when_condition} THEN\n' 

278 for sql in compiled_statements: 

279 function_sql += f' {sql}\n' 

280 function_sql += ' END IF;\n' 

281 else: 

282 for sql in compiled_statements: 

283 function_sql += f' {sql}\n' 

284 

285 # For AFTER triggers, we need to return NULL 

286 if trigger.trigger_when == TriggerWhen.After: 

287 function_sql += ' RETURN NULL;\n' 

288 # For BEFORE DELETE triggers, we must return OLD to allow the delete to proceed. 

289 elif trigger.trigger_when == TriggerWhen.Before and trigger.trigger_action == TriggerAction.Delete: 289 ↛ 290line 289 didn't jump to line 290 because the condition on line 289 was never true

290 function_sql += ' RETURN OLD;\n' 

291 # For BEFORE or INSTEAD OF triggers, we need to return NEW 

292 else: 

293 function_sql += ' RETURN NEW;\n' 

294 

295 function_sql += 'END;\n$$ LANGUAGE plpgsql;' 

296 

297 # Then create the trigger - PostgreSQL doesn't support IF NOT EXISTS for triggers before v14 

298 # We'll handle this through a conditional drop 

299 drop_if_exists = '' 

300 if trigger.safe: 

301 drop_if_exists = f'DROP TRIGGER IF EXISTS {trigger.trigger_name} ON {trigger.target_table} CASCADE;\n' 

302 

303 # PostgreSQL uses different syntax for INSTEAD OF (only allowed on views) 

304 trigger_when = trigger.trigger_when 

305 

306 # Column-specific triggers in PostgreSQL 

307 of_columns = ( 

308 f'OF {", ".join(trigger.update_columns)}' 

309 if trigger.update_columns and trigger.trigger_action == TriggerAction.Update 

310 else '' 

311 ) 

312 

313 for_each = 'FOR EACH ROW' if trigger.for_each_row else 'FOR EACH STATEMENT' 

314 

315 trigger_sql = ( 

316 f'{drop_if_exists}' 

317 f'CREATE TRIGGER {trigger.trigger_name}\n' 

318 f'{trigger_when} {trigger.trigger_action} {of_columns} ON {trigger.target_table}\n' 

319 f'{for_each}\n' 

320 f'EXECUTE FUNCTION {function_name}();' 

321 ) 

322 

323 sql = f'{function_sql}\n\n{trigger_sql}' 

324 

325 return normalize_sql_spaces(sql) 

326 

327 def _clean_sql(self, sql: str) -> str: 

328 """ 

329 Remove RETURNING clauses from SQL statements for PostgreSQL trigger functions. 

330 

331 :param sql: The SQL statement 

332 :return: SQL statement without RETURNING clause 

333 """ 

334 # Find the RETURNING clause position - case insensitive search 

335 sql_upper = sql.upper() 

336 returning_pos = sql_upper.find('RETURNING') 

337 

338 # If RETURNING exists, remove it and everything after it up to the semicolon 

339 if returning_pos != -1: 

340 semicolon_pos = sql.find(';', returning_pos) 

341 if semicolon_pos != -1: 

342 return sql[:returning_pos] + ';' 

343 return sql[:returning_pos] 

344 return sql 

345 

346 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str: 

347 """Generate PostgreSQL drop trigger SQL.""" 

348 if table_name is None: 

349 raise RuntimeError('Cannot drop a trigger in PostgreSQL without a table_name') 

350 

351 function_name = f'fn_{trigger_name}' 

352 return normalize_sql_spaces( 

353 f'DROP TRIGGER {"IF EXISTS" if safe else ""} {trigger_name} ON {table_name};\n' 

354 f'DROP FUNCTION {"IF EXISTS" if safe else ""} {function_name}();' 

355 ) 

356 

357 def select_all_trigger_sql(self) -> str: 

358 return "SELECT trigger_name, event_object_table AS table_name FROM information_schema.triggers WHERE trigger_schema NOT IN ('pg_catalog', 'information_schema');" 

359 

360 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool: 

361 """PostgreSQL supports INSTEAD OF only on views.""" 

362 if when == TriggerWhen.Instead and not on_view: 

363 return False 

364 return True 

365 

366 def supports_safe_create(self) -> bool: 

367 """PostgreSQL doesn't support IF NOT EXISTS for triggers before v14, but we implement safety differently.""" 

368 return True # We report True but handle it with DROP IF EXISTS 

369 

370 def supports_update_of_columns(self) -> bool: 

371 """PostgreSQL supports column-specific UPDATE triggers.""" 

372 return True 

373 

374 def supports_when_clause(self) -> bool: 

375 """PostgreSQL supports WHEN conditions.""" 

376 return True 

377 

378 

379class Trigger: 

380 """Trigger template wrapper for use with peewee ORM.""" 

381 

382 # noinspection PyProtectedMember 

383 def __init__( 

384 self, 

385 trigger_name: str, 

386 trigger_type: tuple[TriggerWhen, TriggerAction], 

387 source_table: type[Model] | Model | str, 

388 safe: bool = False, 

389 for_each_row: bool = False, 

390 update_columns: list[str] | None = None, 

391 on_view: bool = False, # Added parameter to indicate if the target is a view 

392 ): 

393 """ 

394 Constructor parameters: 

395 

396 :param trigger_name: The name of this trigger. It needs to be unique! 

397 :type trigger_name: str 

398 :param trigger_type: A tuple with :class:`TriggerWhen` and :class:`TriggerAction` to specify on which action 

399 the trigger should be invoked and if before, after or instead of. 

400 :type trigger_type: tuple[TriggerWhen, TriggerAction] 

401 :param source_table: The table originating the trigger. It can be a model class, instance, or also the name of 

402 the table. 

403 :type source_table: type[Model] | Model | str 

404 :param safe: A boolean flag to define if in the trigger creation statement a 'IF NOT EXISTS' clause should be 

405 included. Defaults to False 

406 :type safe: bool, Optional 

407 :param for_each_row: A boolean flag to repeat the script content for each modified row in the table. 

408 Defaults to False. 

409 :type for_each_row: bool, Optional 

410 :param update_columns: A list of column names. When defining a trigger on a table update, it is possible to 

411 restrict the firing of the trigger to the cases when a subset of all columns have been updated. An column 

412 is updated also when the new value is equal to the old one. If you want to discriminate this case, use the 

413 :meth:`add_when` method. Defaults to None. 

414 :type update_columns: list[str], Optional 

415 :param on_view: A boolean flag to indicate if the target is a view. This affects the support for INSTEAD OF. 

416 Defaults to False. 

417 :type on_view: bool, Optional 

418 """ 

419 self.trigger_name = trigger_name 

420 self.trigger_type = trigger_type 

421 self._trigger_when, self._trigger_op = self.trigger_type 

422 self.update_columns = update_columns or [] 

423 self.on_view = on_view 

424 

425 if isinstance(source_table, type): 

426 model_cls = cast(PeeweeModelWithMeta, source_table) 

427 self.target_table = model_cls._meta.table_name 

428 elif isinstance(source_table, Model): 

429 model_instance = cast(PeeweeModelWithMeta, source_table) 

430 self.target_table = model_instance._meta.table_name 

431 else: 

432 self.target_table = source_table 

433 

434 self.safe = safe 

435 self.for_each_row = for_each_row 

436 

437 self._when_list: list[str | peewee.Node] = [] 

438 self._sql_list: list[str | peewee.Query] = [] 

439 self._database: peewee.Database | peewee.DatabaseProxy | None = None 

440 self._dialect: TriggerDialect | None = None 

441 

442 @property 

443 def trigger_action(self) -> TriggerAction: 

444 return self._trigger_op 

445 

446 @trigger_action.setter 

447 def trigger_action(self, action: TriggerAction) -> None: 

448 self._trigger_op = action 

449 

450 @property 

451 def trigger_when(self) -> TriggerWhen: 

452 return self._trigger_when 

453 

454 @trigger_when.setter 

455 def trigger_when(self, when: TriggerWhen) -> None: 

456 self._trigger_when = when 

457 

458 def __setattr__(self, key: Any, value: Any) -> None: 

459 if key == 'safe': 

460 self.if_not_exists = 'IF NOT EXISTS' if value else '' 

461 elif key == 'for_each_row': 

462 self._for_each_row = 'FOR EACH ROW' if value else '' 

463 else: 

464 super().__setattr__(key, value) 

465 

466 def __getattr__(self, item: str) -> Any: 

467 """ 

468 Custom attribute getter for computed properties. 

469 

470 :param item: The attribute name to get 

471 :return: The attribute value 

472 :raises AttributeError: If the attribute doesn't exist 

473 """ 

474 if item == 'safe': 

475 return hasattr(self, 'if_not_exists') and self.if_not_exists == 'IF NOT EXISTS' 

476 elif item == 'for_each_row': 

477 return hasattr(self, '_for_each_row') and self._for_each_row == 'FOR EACH ROW' 

478 else: 

479 # Raise AttributeError for non-existent attributes (standard Python behaviour) 

480 raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{item}'") 

481 

482 def add_sql(self, sql: str | peewee.Query) -> Self: 

483 """ 

484 Add an SQL statement to be executed by the trigger. 

485 

486 The ``sql`` can be either a string containing the sql statement, or it can be any other peewee Query. 

487 

488 For example: 

489 

490 .. code-block:: python 

491 

492 # assuming you have created a trigger ... 

493 

494 sql = AnotherTable.insert( 

495 field1=some_value, field2=another_value 

496 ) 

497 trigger.add_sql(sql) 

498 

499 In this way the SQL code is generated with parametric placeholder if needed. 

500 

501 :param sql: The SQL statement. 

502 :type sql: str | peewee.Query 

503 :return: self for easy chaining 

504 :rtype: Trigger 

505 """ 

506 self._sql_list.append(sql) 

507 return self 

508 

509 def add_when(self, *conditions: str | peewee.Node) -> Self: 

510 """ 

511 Add conditions to the `when` statements. 

512 

513 Conditions are logically ANDed. 

514 To have mixed `OR` and `AND` logic, use the functions :func:`and_` and :func:`or_`. 

515 

516 The ``conditions`` can be either strings containing SQL conditions, or peewee Node objects 

517 (such as Expression or Query objects). 

518 

519 For example: 

520 

521 .. code-block:: python 

522 

523 # String condition 

524 trigger.add_when("NEW.status = 'active'") 

525 

526 # Peewee expression 

527 subq = TriggerStatus.select(TriggerStatus.status).where( 

528 TriggerStatus.trigger_type == 'DELETE_FILES' 

529 ) 

530 trigger.add_when(Value(1) == subq) 

531 

532 .. versionchanged:: v2.0.0 

533 The argument can also be a generic peewee Node. 

534 

535 :param conditions: Conditions to be added with logical AND. Can be strings or peewee Node objects. 

536 :type conditions: str | peewee.Node 

537 :return: self for easy chaining 

538 :rtype: Trigger 

539 """ 

540 for c in conditions: 

541 self._when_list.append(c) 

542 return self 

543 

544 def _node_to_sql(self, node: peewee.Node) -> str: 

545 """ 

546 Convert a peewee Node (Expression, Query, etc.) to a SQL string with interpolated parameters. 

547 

548 This is based on peewee's internal query_to_string function for debugging/logging purposes. 

549 

550 .. versionadded:: v2.0.0 

551 

552 :param node: A peewee Node object 

553 :return: SQL string with parameters interpolated 

554 """ 

555 from peewee import Context, Expression, Select 

556 

557 # Check if this is an Expression with lhs and rhs attributes (like comparisons) 

558 if isinstance(node, Expression) and hasattr(node, 'lhs') and hasattr(node, 'rhs'): 

559 # Recursively convert left and right sides 

560 lhs_sql = self._node_to_sql(node.lhs) if isinstance(node.lhs, peewee.Node) else self._value_to_sql(node.lhs) 

561 rhs_sql = self._node_to_sql(node.rhs) if isinstance(node.rhs, peewee.Node) else self._value_to_sql(node.rhs) 

562 

563 # Get the operator (e.g., '=', '>', '<', etc.) 

564 op = getattr(node, 'op', '=') 

565 

566 # For subqueries on either side, wrap them in parentheses 

567 if isinstance(node.lhs, Select): 

568 lhs_sql = f'({lhs_sql})' 

569 if isinstance(node.rhs, Select): 

570 rhs_sql = f'({rhs_sql})' 

571 

572 return f'{lhs_sql} {op} {rhs_sql}' 

573 

574 # For other node types, use the standard Context approach 

575 # Get database context if available, otherwise use a default Context 

576 db = self.database 

577 if db is not None: 

578 ctx = db.get_sql_context() 

579 else: 

580 ctx = Context() 

581 

582 # Generate SQL with parameters 

583 sql, params = ctx.sql(node).query() 

584 

585 # If no parameters, return as-is 

586 if not params: 

587 return cast(str, sql) 

588 

589 # Interpolate parameters into the SQL string 

590 # This is safe for trigger definitions (not for execution) 

591 param_placeholder = getattr(ctx.state, 'param', '?') or '?' 

592 if param_placeholder == '?': 

593 sql = sql.replace('?', '%s') 

594 

595 # Transform parameters to SQL-safe values 

596 transformed_params = [self._value_to_sql(v) for v in params] 

597 

598 interpolated_str = sql % tuple(transformed_params) 

599 return cast(str, interpolated_str) 

600 

601 def _value_to_sql(self, value: Any) -> str: 

602 """ 

603 Convert a Python value to its SQL representation. 

604 

605 .. versionadded:: v2.0.0 

606 

607 :param value: A Python value (string, int, float, None, etc.) 

608 :return: SQL string representation of the value 

609 """ 

610 if isinstance(value, str): 

611 # Escape single quotes by doubling them 

612 escaped = value.replace("'", "''") 

613 return f"'{escaped}'" 

614 elif isinstance(value, bool): # bools are numbers as well! 

615 return '1' if value else '0' 

616 elif isinstance(value, (int, float)): 

617 return str(value) 

618 elif value is None: 

619 return 'NULL' 

620 else: 

621 return str(value) 

622 

623 def set_database(self, database: peewee.Database | peewee.DatabaseProxy) -> Self: 

624 """ 

625 Set the database to use for this trigger. 

626 

627 :param database: The database instance 

628 :return: self for easy chaining 

629 """ 

630 self._database = database 

631 return self 

632 

633 @property 

634 def database(self) -> peewee.Database | None: 

635 """Return the actual database, unwrapping proxies.""" 

636 db = self._database 

637 if isinstance(db, peewee.DatabaseProxy): 

638 return cast(peewee.Database, db.obj) 

639 return db 

640 

641 def _compile_query(self, query: peewee.Query) -> str: 

642 """Compile a Peewee query using the current database dialect.""" 

643 db = self.database 

644 sql, params = cast(tuple[str, tuple[Any, ...]], cast(Any, query).sql()) 

645 if params: 645 ↛ 646line 645 didn't jump to line 646 because the condition on line 645 was never true

646 if db: 

647 placeholder = db.param # normally ?, but it could be set to something else 

648 else: # if the db is not yet bound 

649 placeholder = '?' 

650 

651 sql = sql.replace(placeholder, '%s') % tuple(self._value_to_sql(p) for p in params) 

652 return sql 

653 

654 def _compile_node(self, node: peewee.Node) -> str: 

655 """Compile a Peewee node to SQL using the current database context.""" 

656 return self._node_to_sql(node) 

657 

658 def _compile_sql_statement(self, statement: str | peewee.Query) -> str: 

659 """Return a single SQL statement string with consistent formatting.""" 

660 if isinstance(statement, str): 

661 compiled = statement 

662 else: 

663 compiled = self._compile_query(statement) 

664 

665 compiled = compiled.strip() 

666 if not compiled.endswith(';'): 

667 compiled += ';' 

668 return compiled 

669 

670 def _compile_when_conditions(self) -> list[str]: 

671 """Compile each when condition, preserving nodes until creation time.""" 

672 compiled_conditions: list[str] = [] 

673 for cond in self._when_list: 

674 if isinstance(cond, str): 

675 compiled_conditions.append(cond.strip()) 

676 else: 

677 compiled_conditions.append(self._compile_node(cond)) 

678 return compiled_conditions 

679 

680 def _get_dialect(self) -> TriggerDialect: 

681 """ 

682 Get the appropriate dialect based on the database type. 

683 

684 :return: A dialect instance 

685 """ 

686 if self._dialect is not None: 

687 return self._dialect 

688 

689 if self._database is None: 

690 # Default to SQLite dialect 

691 return SQLiteDialect() 

692 

693 db = self._database 

694 if isinstance(db, peewee.DatabaseProxy): 

695 db = db.obj # Get the actual database from the proxy 

696 

697 if isinstance(db, peewee.SqliteDatabase): 

698 self._dialect = SQLiteDialect() 

699 elif isinstance(db, peewee.MySQLDatabase): 

700 self._dialect = MySQLDialect() 

701 elif isinstance(db, peewee.PostgresqlDatabase): 

702 self._dialect = PostgreSQLDialect() 

703 else: 

704 raise UnsupportedDatabaseError(f'Unsupported database type: {type(db)}') 

705 

706 return self._dialect 

707 

708 def create(self) -> str: 

709 """ 

710 Generates the SQL create statement. 

711 

712 :return: The trigger creation statement. 

713 :raise MissingSQLStatement: if no SQL statements are provided. 

714 :raise UnsupportedDatabaseError: if the trigger type is not supported by the database. 

715 """ 

716 if len(self._sql_list) == 0: 

717 raise MissingSQLStatement('No SQL statements provided') 

718 

719 dialect = self._get_dialect() 

720 

721 # Check if the trigger type is supported 

722 if not dialect.supports_trigger_type(self.trigger_when, self.trigger_action, self.on_view): 

723 raise UnsupportedDatabaseError( 

724 f'Trigger type {self.trigger_when} {self.trigger_action} is not supported by the database' 

725 ) 

726 

727 # Check if safe create is supported 

728 if self.safe and not dialect.supports_safe_create(): 

729 # We can either ignore and continue without safe, or raise an error 

730 # For now, we'll just ignore and continue 

731 self.safe = False 

732 

733 # Check if update columns are supported 

734 if self.update_columns and not dialect.supports_update_of_columns(): 

735 # We can either ignore and continue without column-specific updates, or raise an error 

736 # For now, we'll ignore and continue 

737 self.update_columns = [] 

738 

739 # Generate the SQL 

740 return dialect.create_trigger_sql(self) 

741 

742 def drop(self, safe: bool = True) -> str: 

743 """ 

744 Generates the SQL drop statement. 

745 

746 :param safe: If True, add an IF EXIST. Defaults to True. 

747 :type safe: bool, Optional 

748 :return: The drop statement 

749 :rtype: str 

750 """ 

751 dialect = self._get_dialect() 

752 return dialect.drop_trigger_sql(self.trigger_name, safe)