Coverage for src / mafw / db / trigger.py: 100%

267 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-09 09:08 +0000

1# Copyright 2025 European Union 

2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu) 

3# SPDX-License-Identifier: EUPL-1.2 

4""" 

5Module provides a Trigger class and related tools to create triggers in the database via the ORM. 

6 

7It supports SQLite, MySQL and PostgreSQL with dialect-specific SQL generation. 

8""" 

9 

10from abc import ABC, abstractmethod 

11from enum import StrEnum 

12from typing import Any, Self, cast 

13 

14import peewee 

15from peewee import Model 

16 

17from mafw.db.db_types import PeeweeModelWithMeta 

18from mafw.mafw_errors import MissingSQLStatement, UnsupportedDatabaseError 

19from mafw.tools.regexp import normalize_sql_spaces 

20 

21 

22def and_(*conditions: str) -> str: 

23 """ 

24 Concatenates conditions with logical AND. 

25 

26 :param conditions: The condition to join. 

27 :type conditions: str 

28 :return: The and-concatenated string of conditions 

29 :rtype: str 

30 """ 

31 conditions_l = [f'({c})' for c in conditions] 

32 return ' AND '.join(conditions_l) 

33 

34 

35def or_(*conditions: str) -> str: 

36 """ 

37 Concatenates conditions with logical OR. 

38 

39 :param conditions: The condition to join. 

40 :type conditions: str 

41 :return: The or-concatenated string of conditions. 

42 :rtype: str 

43 """ 

44 conditions_l = [f'({c})' for c in conditions] 

45 return ' OR '.join(conditions_l) 

46 

47 

48class TriggerWhen(StrEnum): 

49 """String enumerator for the trigger execution time (Before, After or Instead Of)""" 

50 

51 Before = 'BEFORE' 

52 After = 'AFTER' 

53 Instead = 'INSTEAD OF' 

54 

55 

56class TriggerAction(StrEnum): 

57 """String enumerator for the trigger action (Delete, Insert, Update)""" 

58 

59 Delete = 'DELETE' 

60 Insert = 'INSERT' 

61 Update = 'UPDATE' 

62 

63 

64class TriggerDialect(ABC): 

65 """Abstract base class for database-specific trigger SQL generation.""" 

66 

67 @abstractmethod 

68 def create_trigger_sql(self, trigger: 'Trigger') -> str: 

69 """ 

70 Generate the SQL to create a trigger for a specific database dialect. 

71 

72 :param trigger: The trigger object 

73 :return: SQL string to create the trigger 

74 """ 

75 pass # pragma: no cover 

76 

77 @abstractmethod 

78 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str: 

79 """ 

80 Generate the SQL to drop a trigger for a specific database dialect. 

81 

82 :param trigger_name: The name of the trigger to drop 

83 :type trigger_name: str 

84 :param safe: If True, add an IF EXISTS clause. Defaults to True. 

85 :type safe: bool, Optional 

86 :param table_name: The name of the target table for the trigger. Defaults to None. 

87 :type table_name: str, Optional 

88 :return: SQL string to drop the trigger 

89 :rtype: str 

90 """ 

91 pass # pragma: no cover 

92 

93 @abstractmethod 

94 def select_all_trigger_sql(self) -> str: 

95 pass # pragma: no cover 

96 

97 @abstractmethod 

98 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool: 

99 """ 

100 Check if the database supports the specified trigger type. 

101 

102 :param when: When the trigger should fire (BEFORE, AFTER, INSTEAD OF) 

103 :param action: The action that triggers the trigger (INSERT, UPDATE, DELETE) 

104 :param on_view: Whether the trigger is on a view 

105 :return: True if supported, False otherwise 

106 """ 

107 pass # pragma: no cover 

108 

109 @abstractmethod 

110 def supports_safe_create(self) -> bool: 

111 """ 

112 Check if the database supports IF NOT EXISTS for triggers. 

113 

114 :return: True if supported, False otherwise 

115 """ 

116 pass # pragma: no cover 

117 

118 @abstractmethod 

119 def supports_update_of_columns(self) -> bool: 

120 """ 

121 Check if the database supports column-specific UPDATE triggers. 

122 

123 :return: True if supported, False otherwise 

124 """ 

125 pass # pragma: no cover 

126 

127 @abstractmethod 

128 def supports_when_clause(self) -> bool: 

129 """ 

130 Check if the database supports WHEN conditions. 

131 

132 :return: True if supported, False otherwise 

133 """ 

134 pass # pragma: no cover 

135 

136 

137class SQLiteDialect(TriggerDialect): 

138 """SQLite-specific trigger SQL generation.""" 

139 

140 def create_trigger_sql(self, trigger: 'Trigger') -> str: 

141 """Generate SQLite trigger SQL.""" 

142 if_not_exists = 'IF NOT EXISTS' if trigger.safe else '' 

143 of_columns = ( 

144 f'OF {", ".join(trigger.update_columns)}' 

145 if trigger.trigger_action == TriggerAction.Update and trigger.update_columns 

146 else '' 

147 ) 

148 for_each_row = 'FOR EACH ROW' if trigger.for_each_row else '' 

149 when_clause = f'WHEN {" AND ".join(trigger._when_list)}' if trigger._when_list else '' 

150 sql_statements = '\n'.join(trigger._sql_list) 

151 

152 return normalize_sql_spaces( 

153 f'CREATE TRIGGER {if_not_exists} {trigger.trigger_name}\n' 

154 f'{trigger.trigger_when} {trigger.trigger_action} {of_columns} ON {trigger.target_table}\n' 

155 f'{for_each_row} {when_clause}\n' 

156 f'BEGIN\n' 

157 f'{sql_statements}\n' 

158 f'END;' 

159 ) 

160 

161 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str: 

162 """Generate SQLite drop trigger SQL.""" 

163 return normalize_sql_spaces(f'DROP TRIGGER {"IF EXISTS" if safe else ""} {trigger_name}') 

164 

165 def select_all_trigger_sql(self) -> str: 

166 return "SELECT name AS trigger_name, tbl_name AS table_name FROM sqlite_master WHERE type = 'trigger';" 

167 

168 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool: 

169 """SQLite supports all trigger types except INSTEAD OF on tables (only on views).""" 

170 if when == TriggerWhen.Instead and not on_view: 

171 return False 

172 return True 

173 

174 def supports_safe_create(self) -> bool: 

175 """SQLite supports IF NOT EXISTS for triggers.""" 

176 return True 

177 

178 def supports_update_of_columns(self) -> bool: 

179 """SQLite supports column-specific UPDATE triggers.""" 

180 return True 

181 

182 def supports_when_clause(self) -> bool: 

183 """SQLite supports WHEN conditions.""" 

184 return True 

185 

186 

187class MySQLDialect(TriggerDialect): 

188 """MySQL-specific trigger SQL generation.""" 

189 

190 def create_trigger_sql(self, trigger: 'Trigger') -> str: 

191 """Generate MySQL trigger SQL.""" 

192 # MySQL doesn't support INSTEAD OF triggers 

193 # MySQL doesn't support column-specific UPDATE triggers 

194 # MySQL requires FOR EACH ROW 

195 

196 if_not_exists = 'IF NOT EXISTS' if trigger.safe else '' 

197 

198 # In MySQL, we need to convert WHEN conditions to IF/THEN/END IF blocks 

199 sql_statements = [] 

200 

201 # If there are conditional statements, wrap them in IF blocks 

202 if trigger._when_list: 

203 condition = ' AND '.join(trigger._when_list) 

204 # Start the IF block 

205 sql_statements.append(f'IF {condition} THEN') 

206 

207 # Add the SQL statements with indentation 

208 for stmt in trigger._sql_list: 

209 sql_statements.append(f' {stmt}') 

210 

211 # Close the IF block 

212 sql_statements.append('END IF;') 

213 else: 

214 # No conditions, just add the SQL statements directly 

215 sql_statements.extend(trigger._sql_list) 

216 

217 # Join all statements 

218 trigger_body = '\n'.join(sql_statements) 

219 

220 # Construct the final SQL 

221 sql = ( 

222 f'CREATE TRIGGER {if_not_exists} {trigger.trigger_name}\n' 

223 f'{trigger.trigger_when} {trigger.trigger_action} ON {trigger.target_table}\n' 

224 f'FOR EACH ROW\n' 

225 f'BEGIN\n' 

226 f'{trigger_body}\n' 

227 f'END;' 

228 ) 

229 return normalize_sql_spaces(sql) 

230 

231 def select_all_trigger_sql(self) -> str: 

232 return 'SELECT trigger_name, event_object_table AS table_name FROM information_schema.TRIGGERS WHERE TRIGGER_SCHEMA = DATABASE();' 

233 

234 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str: 

235 """Generate MySQL drop trigger SQL.""" 

236 return normalize_sql_spaces(f'DROP TRIGGER {"IF EXISTS" if safe else ""} {trigger_name}') 

237 

238 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool: 

239 """MySQL doesn't support INSTEAD OF triggers.""" 

240 return when != TriggerWhen.Instead 

241 

242 def supports_safe_create(self) -> bool: 

243 """MySQL supports IF NOT EXISTS for triggers.""" 

244 return True 

245 

246 def supports_update_of_columns(self) -> bool: 

247 """MySQL doesn't support column-specific UPDATE triggers.""" 

248 return False 

249 

250 def supports_when_clause(self) -> bool: 

251 """MySQL supports conditions but through WHERE instead of WHEN.""" 

252 return True 

253 

254 

255class PostgreSQLDialect(TriggerDialect): 

256 """PostgreSQL-specific trigger SQL generation.""" 

257 

258 def create_trigger_sql(self, trigger: 'Trigger') -> str: 

259 """Generate PostgreSQL trigger SQL.""" 

260 # PostgreSQL handles INSTEAD OF differently 

261 # PostgreSQL uses functions for trigger bodies 

262 

263 function_name = f'fn_{trigger.trigger_name}' 

264 

265 # First create the function 

266 function_sql = f'CREATE OR REPLACE FUNCTION {function_name}() RETURNS TRIGGER AS $$\nBEGIN\n' 

267 

268 # Add WHEN condition as IF statements if needed 

269 if trigger._when_list: 

270 when_condition = ' AND '.join(trigger._when_list) 

271 function_sql += f' IF {when_condition} THEN\n' 

272 # Indent SQL statements 

273 sql_statements = '\n'.join([' ' + self._clean_sql(sql) for sql in trigger._sql_list]) 

274 function_sql += f'{sql_statements}\n END IF;\n' 

275 else: 

276 # Indent SQL statements 

277 sql_statements = '\n'.join([' ' + self._clean_sql(sql) for sql in trigger._sql_list]) 

278 function_sql += f'{sql_statements}\n' 

279 

280 # For AFTER triggers, we need to return NULL or NEW 

281 if trigger.trigger_when == TriggerWhen.After: 

282 function_sql += ' RETURN NULL;\n' 

283 # For BEFORE or INSTEAD OF triggers, we need to return NEW 

284 else: 

285 function_sql += ' RETURN NEW;\n' 

286 

287 function_sql += 'END;\n$$ LANGUAGE plpgsql;' 

288 

289 # Then create the trigger - PostgreSQL doesn't support IF NOT EXISTS for triggers before v14 

290 # We'll handle this through a conditional drop 

291 drop_if_exists = '' 

292 if trigger.safe: 

293 drop_if_exists = f'DROP TRIGGER IF EXISTS {trigger.trigger_name} ON {trigger.target_table} CASCADE;\n' 

294 

295 # PostgreSQL uses different syntax for INSTEAD OF (only allowed on views) 

296 trigger_when = trigger.trigger_when 

297 

298 # Column-specific triggers in PostgreSQL 

299 of_columns = ( 

300 f'OF {", ".join(trigger.update_columns)}' 

301 if trigger.update_columns and trigger.trigger_action == TriggerAction.Update 

302 else '' 

303 ) 

304 

305 for_each = 'FOR EACH ROW' if trigger.for_each_row else 'FOR EACH STATEMENT' 

306 

307 trigger_sql = ( 

308 f'{drop_if_exists}' 

309 f'CREATE TRIGGER {trigger.trigger_name}\n' 

310 f'{trigger_when} {trigger.trigger_action} {of_columns} ON {trigger.target_table}\n' 

311 f'{for_each}\n' 

312 f'EXECUTE FUNCTION {function_name}();' 

313 ) 

314 

315 sql = f'{function_sql}\n\n{trigger_sql}' 

316 

317 return normalize_sql_spaces(sql) 

318 

319 def _clean_sql(self, sql: str) -> str: 

320 """ 

321 Remove RETURNING clauses from SQL statements for PostgreSQL trigger functions. 

322 

323 :param sql: The SQL statement 

324 :return: SQL statement without RETURNING clause 

325 """ 

326 # Find the RETURNING clause position - case insensitive search 

327 sql_upper = sql.upper() 

328 returning_pos = sql_upper.find('RETURNING') 

329 

330 # If RETURNING exists, remove it and everything after it up to the semicolon 

331 if returning_pos != -1: 

332 semicolon_pos = sql.find(';', returning_pos) 

333 if semicolon_pos != -1: 

334 return sql[:returning_pos] + ';' 

335 return sql[:returning_pos] 

336 return sql 

337 

338 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str: 

339 """Generate PostgreSQL drop trigger SQL.""" 

340 if table_name is None: 

341 raise RuntimeError('Cannot drop a trigger in PostgreSQL without a table_name') 

342 

343 function_name = f'fn_{trigger_name}' 

344 return normalize_sql_spaces( 

345 f'DROP TRIGGER {"IF EXISTS" if safe else ""} {trigger_name} ON {table_name};\n' 

346 f'DROP FUNCTION {"IF EXISTS" if safe else ""} {function_name}();' 

347 ) 

348 

349 def select_all_trigger_sql(self) -> str: 

350 return "SELECT trigger_name, event_object_table AS table_name FROM information_schema.triggers WHERE trigger_schema NOT IN ('pg_catalog', 'information_schema');" 

351 

352 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool: 

353 """PostgreSQL supports INSTEAD OF only on views.""" 

354 if when == TriggerWhen.Instead and not on_view: 

355 return False 

356 return True 

357 

358 def supports_safe_create(self) -> bool: 

359 """PostgreSQL doesn't support IF NOT EXISTS for triggers before v14, but we implement safety differently.""" 

360 return True # We report True but handle it with DROP IF EXISTS 

361 

362 def supports_update_of_columns(self) -> bool: 

363 """PostgreSQL supports column-specific UPDATE triggers.""" 

364 return True 

365 

366 def supports_when_clause(self) -> bool: 

367 """PostgreSQL supports WHEN conditions.""" 

368 return True 

369 

370 

371class Trigger: 

372 """Trigger template wrapper for use with peewee ORM.""" 

373 

374 # noinspection PyProtectedMember 

375 def __init__( 

376 self, 

377 trigger_name: str, 

378 trigger_type: tuple[TriggerWhen, TriggerAction], 

379 source_table: type[Model] | Model | str, 

380 safe: bool = False, 

381 for_each_row: bool = False, 

382 update_columns: list[str] | None = None, 

383 on_view: bool = False, # Added parameter to indicate if the target is a view 

384 ): 

385 """ 

386 Constructor parameters: 

387 

388 :param trigger_name: The name of this trigger. It needs to be unique! 

389 :type trigger_name: str 

390 :param trigger_type: A tuple with :class:`TriggerWhen` and :class:`TriggerAction` to specify on which action 

391 the trigger should be invoked and if before, after or instead of. 

392 :type trigger_type: tuple[TriggerWhen, TriggerAction] 

393 :param source_table: The table originating the trigger. It can be a model class, instance, or also the name of 

394 the table. 

395 :type source_table: type[Model] | Model | str 

396 :param safe: A boolean flag to define if in the trigger creation statement a 'IF NOT EXISTS' clause should be 

397 included. Defaults to False 

398 :type safe: bool, Optional 

399 :param for_each_row: A boolean flag to repeat the script content for each modified row in the table. 

400 Defaults to False. 

401 :type for_each_row: bool, Optional 

402 :param update_columns: A list of column names. When defining a trigger on a table update, it is possible to 

403 restrict the firing of the trigger to the cases when a subset of all columns have been updated. An column 

404 is updated also when the new value is equal to the old one. If you want to discriminate this case, use the 

405 :meth:`add_when` method. Defaults to None. 

406 :type update_columns: list[str], Optional 

407 :param on_view: A boolean flag to indicate if the target is a view. This affects the support for INSTEAD OF. 

408 Defaults to False. 

409 :type on_view: bool, Optional 

410 """ 

411 self.trigger_name = trigger_name 

412 self.trigger_type = trigger_type 

413 self._trigger_when, self._trigger_op = self.trigger_type 

414 self.update_columns = update_columns or [] 

415 self.on_view = on_view 

416 

417 if isinstance(source_table, type): 

418 model_cls = cast(PeeweeModelWithMeta, source_table) 

419 self.target_table = model_cls._meta.table_name 

420 elif isinstance(source_table, Model): 

421 model_instance = cast(PeeweeModelWithMeta, source_table) 

422 self.target_table = model_instance._meta.table_name 

423 else: 

424 self.target_table = source_table 

425 

426 self.safe = safe 

427 self.for_each_row = for_each_row 

428 

429 self._when_list: list[str] = [] 

430 self._sql_list: list[str] = [] 

431 self._database: peewee.Database | peewee.DatabaseProxy | None = None 

432 self._dialect: TriggerDialect | None = None 

433 

434 @property 

435 def trigger_action(self) -> TriggerAction: 

436 return self._trigger_op 

437 

438 @trigger_action.setter 

439 def trigger_action(self, action: TriggerAction) -> None: 

440 self._trigger_op = action 

441 

442 @property 

443 def trigger_when(self) -> TriggerWhen: 

444 return self._trigger_when 

445 

446 @trigger_when.setter 

447 def trigger_when(self, when: TriggerWhen) -> None: 

448 self._trigger_when = when 

449 

450 def __setattr__(self, key: Any, value: Any) -> None: 

451 if key == 'safe': 

452 self.if_not_exists = 'IF NOT EXISTS' if value else '' 

453 elif key == 'for_each_row': 

454 self._for_each_row = 'FOR EACH ROW' if value else '' 

455 else: 

456 super().__setattr__(key, value) 

457 

458 def __getattr__(self, item: str) -> Any: 

459 """ 

460 Custom attribute getter for computed properties. 

461 

462 :param item: The attribute name to get 

463 :return: The attribute value 

464 :raises AttributeError: If the attribute doesn't exist 

465 """ 

466 if item == 'safe': 

467 return hasattr(self, 'if_not_exists') and self.if_not_exists == 'IF NOT EXISTS' 

468 elif item == 'for_each_row': 

469 return hasattr(self, '_for_each_row') and self._for_each_row == 'FOR EACH ROW' 

470 else: 

471 # Raise AttributeError for non-existent attributes (standard Python behaviour) 

472 raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{item}'") 

473 

474 def add_sql(self, sql: str | peewee.Query) -> Self: 

475 """ 

476 Add an SQL statement to be executed by the trigger. 

477 

478 The ``sql`` can be either a string containing the sql statement, or it can be any other peewee Query. 

479 

480 For example: 

481 

482 .. code-block:: python 

483 

484 # assuming you have created a trigger ... 

485 

486 sql = AnotherTable.insert( 

487 field1=some_value, field2=another_value 

488 ) 

489 trigger.add_sql(sql) 

490 

491 In this way the SQL code is generated with parametric placeholder if needed. 

492 

493 :param sql: The SQL statement. 

494 :type sql: str | peewee.Query 

495 :return: self for easy chaining 

496 :rtype: Trigger 

497 """ 

498 if not isinstance(sql, str): 

499 sql = str(sql) 

500 sql = sql.strip() 

501 sql = chr(9) + sql 

502 if not sql.endswith(';'): 

503 sql += ';' 

504 self._sql_list.append(sql) 

505 return self 

506 

507 def add_when(self, *conditions: str | peewee.Node) -> Self: 

508 """ 

509 Add conditions to the `when` statements. 

510 

511 Conditions are logically ANDed. 

512 To have mixed `OR` and `AND` logic, use the functions :func:`and_` and :func:`or_`. 

513 

514 The ``conditions`` can be either strings containing SQL conditions, or peewee Node objects 

515 (such as Expression or Query objects). 

516 

517 For example: 

518 

519 .. code-block:: python 

520 

521 # String condition 

522 trigger.add_when("NEW.status = 'active'") 

523 

524 # Peewee expression 

525 subq = TriggerStatus.select(TriggerStatus.status).where( 

526 TriggerStatus.trigger_type == 'DELETE_FILES' 

527 ) 

528 trigger.add_when(Value(1) == subq) 

529 

530 .. versionchanged:: v2.0.0 

531 The argument can also be a generic peewee Node. 

532 

533 :param conditions: Conditions to be added with logical AND. Can be strings or peewee Node objects. 

534 :type conditions: str | peewee.Node 

535 :return: self for easy chaining 

536 :rtype: Trigger 

537 """ 

538 conditions_l = [] 

539 for c in conditions: 

540 if isinstance(c, str): 

541 # Handle string conditions 

542 condition_str = c.strip() 

543 else: 

544 # Handle peewee Node/Expression/Query objects 

545 # Convert to SQL with parameters interpolated 

546 condition_str = self._node_to_sql(c).strip() 

547 

548 conditions_l.append(f'({condition_str})') 

549 

550 self._when_list.append(f'({" AND ".join(conditions_l)})') 

551 return self 

552 

553 def _node_to_sql(self, node: peewee.Node) -> str: 

554 """ 

555 Convert a peewee Node (Expression, Query, etc.) to a SQL string with interpolated parameters. 

556 

557 This is based on peewee's internal query_to_string function for debugging/logging purposes. 

558 

559 .. versionadded:: v2.0.0 

560 

561 :param node: A peewee Node object 

562 :return: SQL string with parameters interpolated 

563 """ 

564 from peewee import Context, Expression, Select 

565 

566 # Check if this is an Expression with lhs and rhs attributes (like comparisons) 

567 if isinstance(node, Expression) and hasattr(node, 'lhs') and hasattr(node, 'rhs'): 

568 # Recursively convert left and right sides 

569 lhs_sql = self._node_to_sql(node.lhs) if isinstance(node.lhs, peewee.Node) else self._value_to_sql(node.lhs) 

570 rhs_sql = self._node_to_sql(node.rhs) if isinstance(node.rhs, peewee.Node) else self._value_to_sql(node.rhs) 

571 

572 # Get the operator (e.g., '=', '>', '<', etc.) 

573 op = getattr(node, 'op', '=') 

574 

575 # For subqueries on either side, wrap them in parentheses 

576 if isinstance(node.lhs, Select): 

577 lhs_sql = f'({lhs_sql})' 

578 if isinstance(node.rhs, Select): 

579 rhs_sql = f'({rhs_sql})' 

580 

581 return f'{lhs_sql} {op} {rhs_sql}' 

582 

583 # For other node types, use the standard Context approach 

584 # Get database context if available, otherwise use a default Context 

585 db = self._database 

586 if db is not None and isinstance(db, peewee.DatabaseProxy): 

587 db = db.obj # Get the actual database from the proxy 

588 

589 if db is not None: 

590 ctx = db.get_sql_context() 

591 else: 

592 ctx = Context() 

593 

594 # Generate SQL with parameters 

595 sql, params = ctx.sql(node).query() 

596 

597 # If no parameters, return as-is 

598 if not params: 

599 return cast(str, sql) 

600 

601 # Interpolate parameters into the SQL string 

602 # This is safe for trigger definitions (not for execution) 

603 param_placeholder = getattr(ctx.state, 'param', '?') or '?' 

604 if param_placeholder == '?': 

605 sql = sql.replace('?', '%s') 

606 

607 # Transform parameters to SQL-safe values 

608 transformed_params = [self._value_to_sql(v) for v in params] 

609 

610 interpolated_str = sql % tuple(transformed_params) 

611 return cast(str, interpolated_str) 

612 

613 def _value_to_sql(self, value: Any) -> str: 

614 """ 

615 Convert a Python value to its SQL representation. 

616 

617 .. versionadded:: v2.0.0 

618 

619 :param value: A Python value (string, int, float, None, etc.) 

620 :return: SQL string representation of the value 

621 """ 

622 if isinstance(value, str): 

623 # Escape single quotes by doubling them 

624 escaped = value.replace("'", "''") 

625 return f"'{escaped}'" 

626 elif isinstance(value, bool): # bools are numbers as well! 

627 return '1' if value else '0' 

628 elif isinstance(value, (int, float)): 

629 return str(value) 

630 elif value is None: 

631 return 'NULL' 

632 else: 

633 return str(value) 

634 

635 def set_database(self, database: peewee.Database | peewee.DatabaseProxy) -> Self: 

636 """ 

637 Set the database to use for this trigger. 

638 

639 :param database: The database instance 

640 :return: self for easy chaining 

641 """ 

642 self._database = database 

643 return self 

644 

645 def _get_dialect(self) -> TriggerDialect: 

646 """ 

647 Get the appropriate dialect based on the database type. 

648 

649 :return: A dialect instance 

650 """ 

651 if self._dialect is not None: 

652 return self._dialect 

653 

654 if self._database is None: 

655 # Default to SQLite dialect 

656 return SQLiteDialect() 

657 

658 db = self._database 

659 if isinstance(db, peewee.DatabaseProxy): 

660 db = db.obj # Get the actual database from the proxy 

661 

662 if isinstance(db, peewee.SqliteDatabase): 

663 self._dialect = SQLiteDialect() 

664 elif isinstance(db, peewee.MySQLDatabase): 

665 self._dialect = MySQLDialect() 

666 elif isinstance(db, peewee.PostgresqlDatabase): 

667 self._dialect = PostgreSQLDialect() 

668 else: 

669 raise UnsupportedDatabaseError(f'Unsupported database type: {type(db)}') 

670 

671 return self._dialect 

672 

673 def create(self) -> str: 

674 """ 

675 Generates the SQL create statement. 

676 

677 :return: The trigger creation statement. 

678 :raise MissingSQLStatement: if no SQL statements are provided. 

679 :raise UnsupportedDatabaseError: if the trigger type is not supported by the database. 

680 """ 

681 if len(self._sql_list) == 0: 

682 raise MissingSQLStatement('No SQL statements provided') 

683 

684 dialect = self._get_dialect() 

685 

686 # Check if the trigger type is supported 

687 if not dialect.supports_trigger_type(self.trigger_when, self.trigger_action, self.on_view): 

688 raise UnsupportedDatabaseError( 

689 f'Trigger type {self.trigger_when} {self.trigger_action} is not supported by the database' 

690 ) 

691 

692 # Check if safe create is supported 

693 if self.safe and not dialect.supports_safe_create(): 

694 # We can either ignore and continue without safe, or raise an error 

695 # For now, we'll just ignore and continue 

696 self.safe = False 

697 

698 # Check if update columns are supported 

699 if self.update_columns and not dialect.supports_update_of_columns(): 

700 # We can either ignore and continue without column-specific updates, or raise an error 

701 # For now, we'll ignore and continue 

702 self.update_columns = [] 

703 

704 # Generate the SQL 

705 return dialect.create_trigger_sql(self) 

706 

707 def drop(self, safe: bool = True) -> str: 

708 """ 

709 Generates the SQL drop statement. 

710 

711 :param safe: If True, add an IF EXIST. Defaults to True. 

712 :type safe: bool, Optional 

713 :return: The drop statement 

714 :rtype: str 

715 """ 

716 dialect = self._get_dialect() 

717 return dialect.drop_trigger_sql(self.trigger_name, safe)