Coverage for src / mafw / db / trigger.py: 99%

288 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 16:10 +0000

1# Copyright 2025–2026 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Module provides a Trigger class and related tools to create triggers in the database via the ORM. 

6 

7It supports SQLite, MySQL and PostgreSQL with dialect-specific SQL generation. 

8""" 

9 

10from abc import ABC, abstractmethod 

11from enum import StrEnum 

12from typing import Any, Self, cast 

13 

14import peewee 

15from peewee import Model 

16 

17from mafw.db.db_types import PeeweeModelWithMeta 

18from mafw.mafw_errors import MissingSQLStatement, UnsupportedDatabaseError 

19from mafw.tools.regexp import normalize_sql_spaces 

20 

21 

def and_(*conditions: str) -> str:
    """
    Join SQL conditions with a logical AND.

    Each condition is enclosed in parentheses before joining, so that it is evaluated as a unit
    regardless of the operators it contains.

    :param conditions: The conditions to join.
    :type conditions: str
    :return: The AND-concatenated conditions. An empty string if no condition is given.
    :rtype: str
    """
    return ' AND '.join(f'({condition})' for condition in conditions)

33 

34 

def or_(*conditions: str) -> str:
    """
    Join SQL conditions with a logical OR.

    Each condition is enclosed in parentheses before joining, so that it is evaluated as a unit
    regardless of the operators it contains.

    :param conditions: The conditions to join.
    :type conditions: str
    :return: The OR-concatenated conditions. An empty string if no condition is given.
    :rtype: str
    """
    return ' OR '.join(f'({condition})' for condition in conditions)

46 

47 

48class TriggerWhen(StrEnum): 

49 """String enumerator for the trigger execution time (Before, After or Instead Of)""" 

50 

51 Before = 'BEFORE' 

52 After = 'AFTER' 

53 Instead = 'INSTEAD OF' 

54 

55 

56class TriggerAction(StrEnum): 

57 """String enumerator for the trigger action (Delete, Insert, Update)""" 

58 

59 Delete = 'DELETE' 

60 Insert = 'INSERT' 

61 Update = 'UPDATE' 

62 

63 

class TriggerDialect(ABC):
    """Abstract interface for the database-specific generation of trigger SQL.

    A concrete subclass exists for each supported database. It knows how to render the
    CREATE / DROP statements of a :class:`Trigger` and which trigger features are available.
    """

    @abstractmethod
    def create_trigger_sql(self, trigger: 'Trigger') -> str:
        """
        Build the SQL creating the trigger in this dialect.

        :param trigger: The trigger to be rendered.
        :type trigger: Trigger
        :return: The SQL string creating the trigger.
        :rtype: str
        """
        ...  # pragma: no cover

    @abstractmethod
    def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
        """
        Build the SQL dropping a trigger in this dialect.

        :param trigger_name: The name of the trigger to drop.
        :type trigger_name: str
        :param safe: If True, add an IF EXISTS clause. Defaults to True.
        :type safe: bool, Optional
        :param table_name: The name of the table targeted by the trigger. Defaults to None.
        :type table_name: str, Optional
        :return: The SQL string dropping the trigger.
        :rtype: str
        """
        ...  # pragma: no cover

    @abstractmethod
    def select_all_trigger_sql(self) -> str:
        """
        Build the SQL query listing the triggers defined in the database.

        The concrete dialects of this module return a query exposing the columns ``trigger_name``
        and ``table_name``.

        :return: The SQL string selecting all triggers.
        :rtype: str
        """
        ...  # pragma: no cover

    @abstractmethod
    def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
        """
        Tell whether the database supports the requested trigger type.

        :param when: When the trigger should fire (BEFORE, AFTER, INSTEAD OF).
        :type when: TriggerWhen
        :param action: The action firing the trigger (INSERT, UPDATE, DELETE).
        :type action: TriggerAction
        :param on_view: Whether the trigger is defined on a view. Defaults to False.
        :type on_view: bool, Optional
        :return: True if supported, False otherwise.
        :rtype: bool
        """
        ...  # pragma: no cover

    @abstractmethod
    def supports_safe_create(self) -> bool:
        """
        Tell whether the database supports a safe (IF NOT EXISTS like) trigger creation.

        :return: True if supported, False otherwise.
        :rtype: bool
        """
        ...  # pragma: no cover

    @abstractmethod
    def supports_update_of_columns(self) -> bool:
        """
        Tell whether the database supports UPDATE triggers restricted to specific columns.

        :return: True if supported, False otherwise.
        :rtype: bool
        """
        ...  # pragma: no cover

    @abstractmethod
    def supports_when_clause(self) -> bool:
        """
        Tell whether the database supports conditions on the trigger execution.

        :return: True if supported, False otherwise.
        :rtype: bool
        """
        ...  # pragma: no cover

135 

136 

class SQLiteDialect(TriggerDialect):
    """Trigger SQL generator for SQLite databases."""

    def create_trigger_sql(self, trigger: 'Trigger') -> str:
        """
        Build the SQLite ``CREATE TRIGGER`` statement.

        :param trigger: The trigger to be rendered.
        :type trigger: Trigger
        :return: The space-normalised creation statement.
        :rtype: str
        """
        if_not_exists = ''
        if trigger.safe:
            if_not_exists = 'IF NOT EXISTS'

        # The OF clause is meaningful only for UPDATE triggers with a column restriction.
        of_columns = ''
        if trigger.trigger_action == TriggerAction.Update and trigger.update_columns:
            of_columns = f'OF {", ".join(trigger.update_columns)}'

        for_each_row = ''
        if trigger.for_each_row:
            for_each_row = 'FOR EACH ROW'

        # All the conditions are ANDed together, each of them within its own parentheses.
        when_clause = ''
        conditions = trigger._compile_when_conditions()
        if conditions:
            when_clause = f'WHEN {" AND ".join(f"({c})" for c in conditions)}'

        body = '\n'.join(trigger._compile_sql_statement(stmt) for stmt in trigger._sql_list)

        lines = [
            f'CREATE TRIGGER {if_not_exists} {trigger.trigger_name}',
            f'{trigger.trigger_when} {trigger.trigger_action} {of_columns} ON {trigger.target_table}',
            f'{for_each_row} {when_clause}',
            'BEGIN',
            body,
            'END;',
        ]
        # The optional parts may be empty: the normalisation takes care of the resulting spacing.
        return normalize_sql_spaces('\n'.join(lines))

    def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
        """
        Build the SQLite ``DROP TRIGGER`` statement.

        :param trigger_name: The name of the trigger to drop.
        :type trigger_name: str
        :param safe: If True, add an IF EXISTS clause. Defaults to True.
        :type safe: bool, Optional
        :param table_name: Not used by this dialect. Defaults to None.
        :type table_name: str, Optional
        :return: The space-normalised drop statement.
        :rtype: str
        """
        if_exists = 'IF EXISTS' if safe else ''
        return normalize_sql_spaces(f'DROP TRIGGER {if_exists} {trigger_name}')

    def select_all_trigger_sql(self) -> str:
        """
        Return the query listing all triggers recorded in ``sqlite_master``.

        :return: The SQL query, exposing the columns ``trigger_name`` and ``table_name``.
        :rtype: str
        """
        return "SELECT name AS trigger_name, tbl_name AS table_name FROM sqlite_master WHERE type = 'trigger';"

    def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
        """
        Tell whether SQLite supports the trigger type.

        Everything is accepted apart from INSTEAD OF triggers on tables: they are allowed only on views.

        :param when: When the trigger should fire.
        :type when: TriggerWhen
        :param action: The action firing the trigger. Not relevant for the decision.
        :type action: TriggerAction
        :param on_view: Whether the trigger is defined on a view. Defaults to False.
        :type on_view: bool, Optional
        :return: True if supported, False otherwise.
        :rtype: bool
        """
        return when != TriggerWhen.Instead or bool(on_view)

    def supports_safe_create(self) -> bool:
        """SQLite supports IF NOT EXISTS in the trigger creation."""
        return True

    def supports_update_of_columns(self) -> bool:
        """SQLite supports UPDATE triggers restricted to specific columns."""
        return True

    def supports_when_clause(self) -> bool:
        """SQLite supports WHEN conditions."""
        return True

186 

187 

class MySQLDialect(TriggerDialect):
    """Trigger SQL generator for MySQL databases."""

    def create_trigger_sql(self, trigger: 'Trigger') -> str:
        """
        Build the MySQL ``CREATE TRIGGER`` statement.

        The generated statement never contains an ``OF <columns>`` clause and always contains
        ``FOR EACH ROW``. The when conditions, if any, are rendered as an ``IF ... THEN ... END IF;``
        block wrapping the trigger statements.

        :param trigger: The trigger to be rendered.
        :type trigger: Trigger
        :return: The space-normalised creation statement.
        :rtype: str
        """
        if_not_exists = 'IF NOT EXISTS' if trigger.safe else ''

        conditions = trigger._compile_when_conditions()
        body_lines = [trigger._compile_sql_statement(stmt) for stmt in trigger._sql_list]

        if conditions:
            # The WHEN conditions become the guard of an IF block enclosing all the statements.
            guard = ' AND '.join(f'({c})' for c in conditions)
            body_lines = [f'IF {guard} THEN', *(f' {line}' for line in body_lines), 'END IF;']

        lines = [
            f'CREATE TRIGGER {if_not_exists} {trigger.trigger_name}',
            f'{trigger.trigger_when} {trigger.trigger_action} ON {trigger.target_table}',
            'FOR EACH ROW',
            'BEGIN',
            '\n'.join(body_lines),
            'END;',
        ]
        return normalize_sql_spaces('\n'.join(lines))

    def select_all_trigger_sql(self) -> str:
        """
        Return the query listing the triggers of the current schema from ``information_schema``.

        :return: The SQL query, exposing the columns ``trigger_name`` and ``table_name``.
        :rtype: str
        """
        return 'SELECT trigger_name, event_object_table AS table_name FROM information_schema.TRIGGERS WHERE TRIGGER_SCHEMA = DATABASE();'

    def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
        """
        Build the MySQL ``DROP TRIGGER`` statement.

        :param trigger_name: The name of the trigger to drop.
        :type trigger_name: str
        :param safe: If True, add an IF EXISTS clause. Defaults to True.
        :type safe: bool, Optional
        :param table_name: Not used by this dialect. Defaults to None.
        :type table_name: str, Optional
        :return: The space-normalised drop statement.
        :rtype: str
        """
        if_exists = 'IF EXISTS' if safe else ''
        return normalize_sql_spaces(f'DROP TRIGGER {if_exists} {trigger_name}')

    def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
        """
        Tell whether MySQL supports the trigger type: everything apart from INSTEAD OF triggers.

        :param when: When the trigger should fire.
        :type when: TriggerWhen
        :param action: The action firing the trigger. Not relevant for the decision.
        :type action: TriggerAction
        :param on_view: Whether the trigger is defined on a view. Not relevant for the decision.
        :type on_view: bool, Optional
        :return: True if supported, False otherwise.
        :rtype: bool
        """
        is_instead_of = when == TriggerWhen.Instead
        return not is_instead_of

    def supports_safe_create(self) -> bool:
        """MySQL supports IF NOT EXISTS in the trigger creation."""
        return True

    def supports_update_of_columns(self) -> bool:
        """MySQL doesn't support UPDATE triggers restricted to specific columns."""
        return False

    def supports_when_clause(self) -> bool:
        """MySQL supports conditions: this dialect renders them as an IF/THEN block in the trigger body."""
        return True

256 

257 

class PostgreSQLDialect(TriggerDialect):
    """PostgreSQL-specific trigger SQL generation.

    The trigger body is rendered as a PL/pgSQL function named ``fn_<trigger_name>``, and the
    trigger itself only executes that function.
    """

    def create_trigger_sql(self, trigger: 'Trigger') -> str:
        """
        Generate the PostgreSQL SQL creating the trigger function and the trigger.

        The when conditions, if any, are rendered as an ``IF ... THEN ... END IF;`` block inside
        the function. A safe creation is obtained by dropping the trigger, if it exists, before
        creating it.

        :param trigger: The trigger to be rendered.
        :type trigger: Trigger
        :return: The space-normalised SQL creating the function and the trigger.
        :rtype: str
        """
        function_name = f'fn_{trigger.trigger_name}'

        compiled_conditions = trigger._compile_when_conditions()
        compiled_statements = [self._clean_sql(trigger._compile_sql_statement(stmt)) for stmt in trigger._sql_list]

        # First the function containing the trigger body.
        function_lines = [f'CREATE OR REPLACE FUNCTION {function_name}() RETURNS TRIGGER AS $$', 'BEGIN']
        if compiled_conditions:
            when_condition = ' AND '.join(f'({c})' for c in compiled_conditions)
            function_lines.append(f' IF {when_condition} THEN')
            function_lines.extend(f' {sql}' for sql in compiled_statements)
            function_lines.append(' END IF;')
        else:
            function_lines.extend(f' {sql}' for sql in compiled_statements)

        if trigger.trigger_when == TriggerWhen.After:
            # The return value of an AFTER trigger is ignored.
            return_value = 'NULL'
        elif trigger.trigger_action == TriggerAction.Delete:
            # NEW is null in a DELETE trigger: returning it from a row-level BEFORE / INSTEAD OF
            # trigger would silently skip the deletion. OLD lets the operation proceed.
            return_value = 'OLD'
        else:
            return_value = 'NEW'
        function_lines.append(f' RETURN {return_value};')
        function_lines.append('END;')
        function_lines.append('$$ LANGUAGE plpgsql;')
        function_sql = '\n'.join(function_lines)

        # PostgreSQL doesn't support IF NOT EXISTS for triggers before v14:
        # the safe creation is emulated with a conditional drop.
        drop_if_exists = ''
        if trigger.safe:
            drop_if_exists = f'DROP TRIGGER IF EXISTS {trigger.trigger_name} ON {trigger.target_table} CASCADE;\n'

        # Column-specific triggers make sense only for UPDATE.
        of_columns = (
            f'OF {", ".join(trigger.update_columns)}'
            if trigger.update_columns and trigger.trigger_action == TriggerAction.Update
            else ''
        )

        for_each = 'FOR EACH ROW' if trigger.for_each_row else 'FOR EACH STATEMENT'

        trigger_sql = (
            f'{drop_if_exists}'
            f'CREATE TRIGGER {trigger.trigger_name}\n'
            f'{trigger.trigger_when} {trigger.trigger_action} {of_columns} ON {trigger.target_table}\n'
            f'{for_each}\n'
            f'EXECUTE FUNCTION {function_name}();'
        )

        return normalize_sql_spaces(f'{function_sql}\n\n{trigger_sql}')

    def _clean_sql(self, sql: str) -> str:
        """
        Remove the RETURNING clause from a SQL statement for PostgreSQL trigger functions.

        Everything from the first stand-alone ``RETURNING`` keyword (case-insensitive) onwards is
        dropped. If a semicolon follows the keyword, a terminating semicolon is appended.

        The keyword is searched as a whole word directly on the original string, so identifiers
        like ``returning_date`` are left untouched and the offset is always valid for ``sql``.
        NOTE: a RETURNING word inside a quoted string literal is still matched.

        :param sql: The SQL statement.
        :type sql: str
        :return: The SQL statement without RETURNING clause.
        :rtype: str
        """
        import re

        match = re.search(r'\bRETURNING\b', sql, flags=re.IGNORECASE)
        if match is None:
            return sql

        returning_pos = match.start()
        if sql.find(';', returning_pos) != -1:
            return sql[:returning_pos] + ';'
        return sql[:returning_pos]

    def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
        """
        Generate the PostgreSQL SQL dropping the trigger and its function.

        :param trigger_name: The name of the trigger to drop.
        :type trigger_name: str
        :param safe: If True, add an IF EXISTS clause to both drop statements. Defaults to True.
        :type safe: bool, Optional
        :param table_name: The name of the table targeted by the trigger. It is mandatory for this dialect.
        :type table_name: str, Optional
        :return: The space-normalised drop statements.
        :rtype: str
        :raise RuntimeError: if ``table_name`` is None.
        """
        if table_name is None:
            raise RuntimeError('Cannot drop a trigger in PostgreSQL without a table_name')

        if_exists = 'IF EXISTS' if safe else ''
        function_name = f'fn_{trigger_name}'
        return normalize_sql_spaces(
            f'DROP TRIGGER {if_exists} {trigger_name} ON {table_name};\n'
            f'DROP FUNCTION {if_exists} {function_name}();'
        )

    def select_all_trigger_sql(self) -> str:
        """
        Return the query listing the triggers outside the system schemas from ``information_schema``.

        :return: The SQL query, exposing the columns ``trigger_name`` and ``table_name``.
        :rtype: str
        """
        return "SELECT trigger_name, event_object_table AS table_name FROM information_schema.triggers WHERE trigger_schema NOT IN ('pg_catalog', 'information_schema');"

    def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
        """
        Tell whether PostgreSQL supports the trigger type: INSTEAD OF is accepted only on views.

        :param when: When the trigger should fire.
        :type when: TriggerWhen
        :param action: The action firing the trigger. Not relevant for the decision.
        :type action: TriggerAction
        :param on_view: Whether the trigger is defined on a view. Defaults to False.
        :type on_view: bool, Optional
        :return: True if supported, False otherwise.
        :rtype: bool
        """
        if when == TriggerWhen.Instead and not on_view:
            return False
        return True

    def supports_safe_create(self) -> bool:
        """PostgreSQL doesn't support IF NOT EXISTS for triggers before v14, but we implement safety differently."""
        return True  # We report True but handle it with DROP IF EXISTS

    def supports_update_of_columns(self) -> bool:
        """PostgreSQL supports column-specific UPDATE triggers."""
        return True

    def supports_when_clause(self) -> bool:
        """Conditions are supported: this dialect renders them as an IF block in the trigger function."""
        return True

374 

375 

376class Trigger: 

377 """Trigger template wrapper for use with peewee ORM.""" 

378 

379 # noinspection PyProtectedMember 

380 def __init__( 

381 self, 

382 trigger_name: str, 

383 trigger_type: tuple[TriggerWhen, TriggerAction], 

384 source_table: type[Model] | Model | str, 

385 safe: bool = False, 

386 for_each_row: bool = False, 

387 update_columns: list[str] | None = None, 

388 on_view: bool = False, # Added parameter to indicate if the target is a view 

389 ): 

390 """ 

391 Constructor parameters: 

392 

393 :param trigger_name: The name of this trigger. It needs to be unique! 

394 :type trigger_name: str 

395 :param trigger_type: A tuple with :class:`TriggerWhen` and :class:`TriggerAction` to specify on which action 

396 the trigger should be invoked and if before, after or instead of. 

397 :type trigger_type: tuple[TriggerWhen, TriggerAction] 

398 :param source_table: The table originating the trigger. It can be a model class, instance, or also the name of 

399 the table. 

400 :type source_table: type[Model] | Model | str 

401 :param safe: A boolean flag to define if in the trigger creation statement a 'IF NOT EXISTS' clause should be 

402 included. Defaults to False 

403 :type safe: bool, Optional 

404 :param for_each_row: A boolean flag to repeat the script content for each modified row in the table. 

405 Defaults to False. 

406 :type for_each_row: bool, Optional 

407 :param update_columns: A list of column names. When defining a trigger on a table update, it is possible to 

408 restrict the firing of the trigger to the cases when a subset of all columns have been updated. An column 

409 is updated also when the new value is equal to the old one. If you want to discriminate this case, use the 

410 :meth:`add_when` method. Defaults to None. 

411 :type update_columns: list[str], Optional 

412 :param on_view: A boolean flag to indicate if the target is a view. This affects the support for INSTEAD OF. 

413 Defaults to False. 

414 :type on_view: bool, Optional 

415 """ 

416 self.trigger_name = trigger_name 

417 self.trigger_type = trigger_type 

418 self._trigger_when, self._trigger_op = self.trigger_type 

419 self.update_columns = update_columns or [] 

420 self.on_view = on_view 

421 

422 if isinstance(source_table, type): 

423 model_cls = cast(PeeweeModelWithMeta, source_table) 

424 self.target_table = model_cls._meta.table_name 

425 elif isinstance(source_table, Model): 

426 model_instance = cast(PeeweeModelWithMeta, source_table) 

427 self.target_table = model_instance._meta.table_name 

428 else: 

429 self.target_table = source_table 

430 

431 self.safe = safe 

432 self.for_each_row = for_each_row 

433 

434 self._when_list: list[str | peewee.Node] = [] 

435 self._sql_list: list[str | peewee.Query] = [] 

436 self._database: peewee.Database | peewee.DatabaseProxy | None = None 

437 self._dialect: TriggerDialect | None = None 

438 

439 @property 

440 def trigger_action(self) -> TriggerAction: 

441 return self._trigger_op 

442 

443 @trigger_action.setter 

444 def trigger_action(self, action: TriggerAction) -> None: 

445 self._trigger_op = action 

446 

447 @property 

448 def trigger_when(self) -> TriggerWhen: 

449 return self._trigger_when 

450 

451 @trigger_when.setter 

452 def trigger_when(self, when: TriggerWhen) -> None: 

453 self._trigger_when = when 

454 

455 def __setattr__(self, key: Any, value: Any) -> None: 

456 if key == 'safe': 

457 self.if_not_exists = 'IF NOT EXISTS' if value else '' 

458 elif key == 'for_each_row': 

459 self._for_each_row = 'FOR EACH ROW' if value else '' 

460 else: 

461 super().__setattr__(key, value) 

462 

463 def __getattr__(self, item: str) -> Any: 

464 """ 

465 Custom attribute getter for computed properties. 

466 

467 :param item: The attribute name to get 

468 :return: The attribute value 

469 :raises AttributeError: If the attribute doesn't exist 

470 """ 

471 if item == 'safe': 

472 return hasattr(self, 'if_not_exists') and self.if_not_exists == 'IF NOT EXISTS' 

473 elif item == 'for_each_row': 

474 return hasattr(self, '_for_each_row') and self._for_each_row == 'FOR EACH ROW' 

475 else: 

476 # Raise AttributeError for non-existent attributes (standard Python behaviour) 

477 raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{item}'") 

478 

479 def add_sql(self, sql: str | peewee.Query) -> Self: 

480 """ 

481 Add an SQL statement to be executed by the trigger. 

482 

483 The ``sql`` can be either a string containing the sql statement, or it can be any other peewee Query. 

484 

485 For example: 

486 

487 .. code-block:: python 

488 

489 # assuming you have created a trigger ... 

490 

491 sql = AnotherTable.insert( 

492 field1=some_value, field2=another_value 

493 ) 

494 trigger.add_sql(sql) 

495 

496 In this way the SQL code is generated with parametric placeholder if needed. 

497 

498 :param sql: The SQL statement. 

499 :type sql: str | peewee.Query 

500 :return: self for easy chaining 

501 :rtype: Trigger 

502 """ 

503 self._sql_list.append(sql) 

504 return self 

505 

506 def add_when(self, *conditions: str | peewee.Node) -> Self: 

507 """ 

508 Add conditions to the `when` statements. 

509 

510 Conditions are logically ANDed. 

511 To have mixed `OR` and `AND` logic, use the functions :func:`and_` and :func:`or_`. 

512 

513 The ``conditions`` can be either strings containing SQL conditions, or peewee Node objects 

514 (such as Expression or Query objects). 

515 

516 For example: 

517 

518 .. code-block:: python 

519 

520 # String condition 

521 trigger.add_when("NEW.status = 'active'") 

522 

523 # Peewee expression 

524 subq = TriggerStatus.select(TriggerStatus.status).where( 

525 TriggerStatus.trigger_type == 'DELETE_FILES' 

526 ) 

527 trigger.add_when(Value(1) == subq) 

528 

529 .. versionchanged:: v2.0.0 

530 The argument can also be a generic peewee Node. 

531 

532 :param conditions: Conditions to be added with logical AND. Can be strings or peewee Node objects. 

533 :type conditions: str | peewee.Node 

534 :return: self for easy chaining 

535 :rtype: Trigger 

536 """ 

537 for c in conditions: 

538 self._when_list.append(c) 

539 return self 

540 

541 def _node_to_sql(self, node: peewee.Node) -> str: 

542 """ 

543 Convert a peewee Node (Expression, Query, etc.) to a SQL string with interpolated parameters. 

544 

545 This is based on peewee's internal query_to_string function for debugging/logging purposes. 

546 

547 .. versionadded:: v2.0.0 

548 

549 :param node: A peewee Node object 

550 :return: SQL string with parameters interpolated 

551 """ 

552 from peewee import Context, Expression, Select 

553 

554 # Check if this is an Expression with lhs and rhs attributes (like comparisons) 

555 if isinstance(node, Expression) and hasattr(node, 'lhs') and hasattr(node, 'rhs'): 

556 # Recursively convert left and right sides 

557 lhs_sql = self._node_to_sql(node.lhs) if isinstance(node.lhs, peewee.Node) else self._value_to_sql(node.lhs) 

558 rhs_sql = self._node_to_sql(node.rhs) if isinstance(node.rhs, peewee.Node) else self._value_to_sql(node.rhs) 

559 

560 # Get the operator (e.g., '=', '>', '<', etc.) 

561 op = getattr(node, 'op', '=') 

562 

563 # For subqueries on either side, wrap them in parentheses 

564 if isinstance(node.lhs, Select): 

565 lhs_sql = f'({lhs_sql})' 

566 if isinstance(node.rhs, Select): 

567 rhs_sql = f'({rhs_sql})' 

568 

569 return f'{lhs_sql} {op} {rhs_sql}' 

570 

571 # For other node types, use the standard Context approach 

572 # Get database context if available, otherwise use a default Context 

573 db = self.database 

574 if db is not None: 

575 ctx = db.get_sql_context() 

576 else: 

577 ctx = Context() 

578 

579 # Generate SQL with parameters 

580 sql, params = ctx.sql(node).query() 

581 

582 # If no parameters, return as-is 

583 if not params: 

584 return cast(str, sql) 

585 

586 # Interpolate parameters into the SQL string 

587 # This is safe for trigger definitions (not for execution) 

588 param_placeholder = getattr(ctx.state, 'param', '?') or '?' 

589 if param_placeholder == '?': 

590 sql = sql.replace('?', '%s') 

591 

592 # Transform parameters to SQL-safe values 

593 transformed_params = [self._value_to_sql(v) for v in params] 

594 

595 interpolated_str = sql % tuple(transformed_params) 

596 return cast(str, interpolated_str) 

597 

598 def _value_to_sql(self, value: Any) -> str: 

599 """ 

600 Convert a Python value to its SQL representation. 

601 

602 .. versionadded:: v2.0.0 

603 

604 :param value: A Python value (string, int, float, None, etc.) 

605 :return: SQL string representation of the value 

606 """ 

607 if isinstance(value, str): 

608 # Escape single quotes by doubling them 

609 escaped = value.replace("'", "''") 

610 return f"'{escaped}'" 

611 elif isinstance(value, bool): # bools are numbers as well! 

612 return '1' if value else '0' 

613 elif isinstance(value, (int, float)): 

614 return str(value) 

615 elif value is None: 

616 return 'NULL' 

617 else: 

618 return str(value) 

619 

620 def set_database(self, database: peewee.Database | peewee.DatabaseProxy) -> Self: 

621 """ 

622 Set the database to use for this trigger. 

623 

624 :param database: The database instance 

625 :return: self for easy chaining 

626 """ 

627 self._database = database 

628 return self 

629 

630 @property 

631 def database(self) -> peewee.Database | None: 

632 """Return the actual database, unwrapping proxies.""" 

633 db = self._database 

634 if isinstance(db, peewee.DatabaseProxy): 

635 return cast(peewee.Database, db.obj) 

636 return db 

637 

638 def _compile_query(self, query: peewee.Query) -> str: 

639 """Compile a Peewee query using the current database dialect.""" 

640 sql, params = cast(tuple[str, tuple[Any, ...]], cast(Any, query).sql()) 

641 if params: 641 ↛ 642line 641 didn't jump to line 642 because the condition on line 641 was never true

642 sql = sql % tuple(self._value_to_sql(p) for p in params) 

643 return sql 

644 

645 def _compile_node(self, node: peewee.Node) -> str: 

646 """Compile a Peewee node to SQL using the current database context.""" 

647 return self._node_to_sql(node) 

648 

649 def _compile_sql_statement(self, statement: str | peewee.Query) -> str: 

650 """Return a single SQL statement string with consistent formatting.""" 

651 if isinstance(statement, str): 

652 compiled = statement 

653 else: 

654 compiled = self._compile_query(statement) 

655 

656 compiled = compiled.strip() 

657 if not compiled.endswith(';'): 

658 compiled += ';' 

659 return compiled 

660 

661 def _compile_when_conditions(self) -> list[str]: 

662 """Compile each when condition, preserving nodes until creation time.""" 

663 compiled_conditions: list[str] = [] 

664 for cond in self._when_list: 

665 if isinstance(cond, str): 

666 compiled_conditions.append(cond.strip()) 

667 else: 

668 compiled_conditions.append(self._compile_node(cond)) 

669 return compiled_conditions 

670 

671 def _get_dialect(self) -> TriggerDialect: 

672 """ 

673 Get the appropriate dialect based on the database type. 

674 

675 :return: A dialect instance 

676 """ 

677 if self._dialect is not None: 

678 return self._dialect 

679 

680 if self._database is None: 

681 # Default to SQLite dialect 

682 return SQLiteDialect() 

683 

684 db = self._database 

685 if isinstance(db, peewee.DatabaseProxy): 

686 db = db.obj # Get the actual database from the proxy 

687 

688 if isinstance(db, peewee.SqliteDatabase): 

689 self._dialect = SQLiteDialect() 

690 elif isinstance(db, peewee.MySQLDatabase): 

691 self._dialect = MySQLDialect() 

692 elif isinstance(db, peewee.PostgresqlDatabase): 

693 self._dialect = PostgreSQLDialect() 

694 else: 

695 raise UnsupportedDatabaseError(f'Unsupported database type: {type(db)}') 

696 

697 return self._dialect 

698 

699 def create(self) -> str: 

700 """ 

701 Generates the SQL create statement. 

702 

703 :return: The trigger creation statement. 

704 :raise MissingSQLStatement: if no SQL statements are provided. 

705 :raise UnsupportedDatabaseError: if the trigger type is not supported by the database. 

706 """ 

707 if len(self._sql_list) == 0: 

708 raise MissingSQLStatement('No SQL statements provided') 

709 

710 dialect = self._get_dialect() 

711 

712 # Check if the trigger type is supported 

713 if not dialect.supports_trigger_type(self.trigger_when, self.trigger_action, self.on_view): 

714 raise UnsupportedDatabaseError( 

715 f'Trigger type {self.trigger_when} {self.trigger_action} is not supported by the database' 

716 ) 

717 

718 # Check if safe create is supported 

719 if self.safe and not dialect.supports_safe_create(): 

720 # We can either ignore and continue without safe, or raise an error 

721 # For now, we'll just ignore and continue 

722 self.safe = False 

723 

724 # Check if update columns are supported 

725 if self.update_columns and not dialect.supports_update_of_columns(): 

726 # We can either ignore and continue without column-specific updates, or raise an error 

727 # For now, we'll ignore and continue 

728 self.update_columns = [] 

729 

730 # Generate the SQL 

731 return dialect.create_trigger_sql(self) 

732 

733 def drop(self, safe: bool = True) -> str: 

734 """ 

735 Generates the SQL drop statement. 

736 

737 :param safe: If True, add an IF EXIST. Defaults to True. 

738 :type safe: bool, Optional 

739 :return: The drop statement 

740 :rtype: str 

741 """ 

742 dialect = self._get_dialect() 

743 return dialect.drop_trigger_sql(self.trigger_name, safe)