Coverage for src / mafw / db / trigger.py: 100%
267 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-09 09:08 +0000
1# Copyright 2025 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Module provides a Trigger class and related tools to create triggers in the database via the ORM.
7It supports SQLite, MySQL and PostgreSQL with dialect-specific SQL generation.
8"""
10from abc import ABC, abstractmethod
11from enum import StrEnum
12from typing import Any, Self, cast
14import peewee
15from peewee import Model
17from mafw.db.db_types import PeeweeModelWithMeta
18from mafw.mafw_errors import MissingSQLStatement, UnsupportedDatabaseError
19from mafw.tools.regexp import normalize_sql_spaces
22def and_(*conditions: str) -> str:
23 """
24 Concatenates conditions with logical AND.
26 :param conditions: The condition to join.
27 :type conditions: str
28 :return: The and-concatenated string of conditions
29 :rtype: str
30 """
31 conditions_l = [f'({c})' for c in conditions]
32 return ' AND '.join(conditions_l)
35def or_(*conditions: str) -> str:
36 """
37 Concatenates conditions with logical OR.
39 :param conditions: The condition to join.
40 :type conditions: str
41 :return: The or-concatenated string of conditions.
42 :rtype: str
43 """
44 conditions_l = [f'({c})' for c in conditions]
45 return ' OR '.join(conditions_l)
48class TriggerWhen(StrEnum):
49 """String enumerator for the trigger execution time (Before, After or Instead Of)"""
51 Before = 'BEFORE'
52 After = 'AFTER'
53 Instead = 'INSTEAD OF'
56class TriggerAction(StrEnum):
57 """String enumerator for the trigger action (Delete, Insert, Update)"""
59 Delete = 'DELETE'
60 Insert = 'INSERT'
61 Update = 'UPDATE'
64class TriggerDialect(ABC):
65 """Abstract base class for database-specific trigger SQL generation."""
67 @abstractmethod
68 def create_trigger_sql(self, trigger: 'Trigger') -> str:
69 """
70 Generate the SQL to create a trigger for a specific database dialect.
72 :param trigger: The trigger object
73 :return: SQL string to create the trigger
74 """
75 pass # pragma: no cover
77 @abstractmethod
78 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
79 """
80 Generate the SQL to drop a trigger for a specific database dialect.
82 :param trigger_name: The name of the trigger to drop
83 :type trigger_name: str
84 :param safe: If True, add an IF EXISTS clause. Defaults to True.
85 :type safe: bool, Optional
86 :param table_name: The name of the target table for the trigger. Defaults to None.
87 :type table_name: str, Optional
88 :return: SQL string to drop the trigger
89 :rtype: str
90 """
91 pass # pragma: no cover
93 @abstractmethod
94 def select_all_trigger_sql(self) -> str:
95 pass # pragma: no cover
97 @abstractmethod
98 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
99 """
100 Check if the database supports the specified trigger type.
102 :param when: When the trigger should fire (BEFORE, AFTER, INSTEAD OF)
103 :param action: The action that triggers the trigger (INSERT, UPDATE, DELETE)
104 :param on_view: Whether the trigger is on a view
105 :return: True if supported, False otherwise
106 """
107 pass # pragma: no cover
109 @abstractmethod
110 def supports_safe_create(self) -> bool:
111 """
112 Check if the database supports IF NOT EXISTS for triggers.
114 :return: True if supported, False otherwise
115 """
116 pass # pragma: no cover
118 @abstractmethod
119 def supports_update_of_columns(self) -> bool:
120 """
121 Check if the database supports column-specific UPDATE triggers.
123 :return: True if supported, False otherwise
124 """
125 pass # pragma: no cover
127 @abstractmethod
128 def supports_when_clause(self) -> bool:
129 """
130 Check if the database supports WHEN conditions.
132 :return: True if supported, False otherwise
133 """
134 pass # pragma: no cover
137class SQLiteDialect(TriggerDialect):
138 """SQLite-specific trigger SQL generation."""
140 def create_trigger_sql(self, trigger: 'Trigger') -> str:
141 """Generate SQLite trigger SQL."""
142 if_not_exists = 'IF NOT EXISTS' if trigger.safe else ''
143 of_columns = (
144 f'OF {", ".join(trigger.update_columns)}'
145 if trigger.trigger_action == TriggerAction.Update and trigger.update_columns
146 else ''
147 )
148 for_each_row = 'FOR EACH ROW' if trigger.for_each_row else ''
149 when_clause = f'WHEN {" AND ".join(trigger._when_list)}' if trigger._when_list else ''
150 sql_statements = '\n'.join(trigger._sql_list)
152 return normalize_sql_spaces(
153 f'CREATE TRIGGER {if_not_exists} {trigger.trigger_name}\n'
154 f'{trigger.trigger_when} {trigger.trigger_action} {of_columns} ON {trigger.target_table}\n'
155 f'{for_each_row} {when_clause}\n'
156 f'BEGIN\n'
157 f'{sql_statements}\n'
158 f'END;'
159 )
161 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
162 """Generate SQLite drop trigger SQL."""
163 return normalize_sql_spaces(f'DROP TRIGGER {"IF EXISTS" if safe else ""} {trigger_name}')
165 def select_all_trigger_sql(self) -> str:
166 return "SELECT name AS trigger_name, tbl_name AS table_name FROM sqlite_master WHERE type = 'trigger';"
168 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
169 """SQLite supports all trigger types except INSTEAD OF on tables (only on views)."""
170 if when == TriggerWhen.Instead and not on_view:
171 return False
172 return True
174 def supports_safe_create(self) -> bool:
175 """SQLite supports IF NOT EXISTS for triggers."""
176 return True
178 def supports_update_of_columns(self) -> bool:
179 """SQLite supports column-specific UPDATE triggers."""
180 return True
182 def supports_when_clause(self) -> bool:
183 """SQLite supports WHEN conditions."""
184 return True
187class MySQLDialect(TriggerDialect):
188 """MySQL-specific trigger SQL generation."""
190 def create_trigger_sql(self, trigger: 'Trigger') -> str:
191 """Generate MySQL trigger SQL."""
192 # MySQL doesn't support INSTEAD OF triggers
193 # MySQL doesn't support column-specific UPDATE triggers
194 # MySQL requires FOR EACH ROW
196 if_not_exists = 'IF NOT EXISTS' if trigger.safe else ''
198 # In MySQL, we need to convert WHEN conditions to IF/THEN/END IF blocks
199 sql_statements = []
201 # If there are conditional statements, wrap them in IF blocks
202 if trigger._when_list:
203 condition = ' AND '.join(trigger._when_list)
204 # Start the IF block
205 sql_statements.append(f'IF {condition} THEN')
207 # Add the SQL statements with indentation
208 for stmt in trigger._sql_list:
209 sql_statements.append(f' {stmt}')
211 # Close the IF block
212 sql_statements.append('END IF;')
213 else:
214 # No conditions, just add the SQL statements directly
215 sql_statements.extend(trigger._sql_list)
217 # Join all statements
218 trigger_body = '\n'.join(sql_statements)
220 # Construct the final SQL
221 sql = (
222 f'CREATE TRIGGER {if_not_exists} {trigger.trigger_name}\n'
223 f'{trigger.trigger_when} {trigger.trigger_action} ON {trigger.target_table}\n'
224 f'FOR EACH ROW\n'
225 f'BEGIN\n'
226 f'{trigger_body}\n'
227 f'END;'
228 )
229 return normalize_sql_spaces(sql)
231 def select_all_trigger_sql(self) -> str:
232 return 'SELECT trigger_name, event_object_table AS table_name FROM information_schema.TRIGGERS WHERE TRIGGER_SCHEMA = DATABASE();'
234 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
235 """Generate MySQL drop trigger SQL."""
236 return normalize_sql_spaces(f'DROP TRIGGER {"IF EXISTS" if safe else ""} {trigger_name}')
238 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
239 """MySQL doesn't support INSTEAD OF triggers."""
240 return when != TriggerWhen.Instead
242 def supports_safe_create(self) -> bool:
243 """MySQL supports IF NOT EXISTS for triggers."""
244 return True
246 def supports_update_of_columns(self) -> bool:
247 """MySQL doesn't support column-specific UPDATE triggers."""
248 return False
250 def supports_when_clause(self) -> bool:
251 """MySQL supports conditions but through WHERE instead of WHEN."""
252 return True
255class PostgreSQLDialect(TriggerDialect):
256 """PostgreSQL-specific trigger SQL generation."""
258 def create_trigger_sql(self, trigger: 'Trigger') -> str:
259 """Generate PostgreSQL trigger SQL."""
260 # PostgreSQL handles INSTEAD OF differently
261 # PostgreSQL uses functions for trigger bodies
263 function_name = f'fn_{trigger.trigger_name}'
265 # First create the function
266 function_sql = f'CREATE OR REPLACE FUNCTION {function_name}() RETURNS TRIGGER AS $$\nBEGIN\n'
268 # Add WHEN condition as IF statements if needed
269 if trigger._when_list:
270 when_condition = ' AND '.join(trigger._when_list)
271 function_sql += f' IF {when_condition} THEN\n'
272 # Indent SQL statements
273 sql_statements = '\n'.join([' ' + self._clean_sql(sql) for sql in trigger._sql_list])
274 function_sql += f'{sql_statements}\n END IF;\n'
275 else:
276 # Indent SQL statements
277 sql_statements = '\n'.join([' ' + self._clean_sql(sql) for sql in trigger._sql_list])
278 function_sql += f'{sql_statements}\n'
280 # For AFTER triggers, we need to return NULL or NEW
281 if trigger.trigger_when == TriggerWhen.After:
282 function_sql += ' RETURN NULL;\n'
283 # For BEFORE or INSTEAD OF triggers, we need to return NEW
284 else:
285 function_sql += ' RETURN NEW;\n'
287 function_sql += 'END;\n$$ LANGUAGE plpgsql;'
289 # Then create the trigger - PostgreSQL doesn't support IF NOT EXISTS for triggers before v14
290 # We'll handle this through a conditional drop
291 drop_if_exists = ''
292 if trigger.safe:
293 drop_if_exists = f'DROP TRIGGER IF EXISTS {trigger.trigger_name} ON {trigger.target_table} CASCADE;\n'
295 # PostgreSQL uses different syntax for INSTEAD OF (only allowed on views)
296 trigger_when = trigger.trigger_when
298 # Column-specific triggers in PostgreSQL
299 of_columns = (
300 f'OF {", ".join(trigger.update_columns)}'
301 if trigger.update_columns and trigger.trigger_action == TriggerAction.Update
302 else ''
303 )
305 for_each = 'FOR EACH ROW' if trigger.for_each_row else 'FOR EACH STATEMENT'
307 trigger_sql = (
308 f'{drop_if_exists}'
309 f'CREATE TRIGGER {trigger.trigger_name}\n'
310 f'{trigger_when} {trigger.trigger_action} {of_columns} ON {trigger.target_table}\n'
311 f'{for_each}\n'
312 f'EXECUTE FUNCTION {function_name}();'
313 )
315 sql = f'{function_sql}\n\n{trigger_sql}'
317 return normalize_sql_spaces(sql)
319 def _clean_sql(self, sql: str) -> str:
320 """
321 Remove RETURNING clauses from SQL statements for PostgreSQL trigger functions.
323 :param sql: The SQL statement
324 :return: SQL statement without RETURNING clause
325 """
326 # Find the RETURNING clause position - case insensitive search
327 sql_upper = sql.upper()
328 returning_pos = sql_upper.find('RETURNING')
330 # If RETURNING exists, remove it and everything after it up to the semicolon
331 if returning_pos != -1:
332 semicolon_pos = sql.find(';', returning_pos)
333 if semicolon_pos != -1:
334 return sql[:returning_pos] + ';'
335 return sql[:returning_pos]
336 return sql
338 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
339 """Generate PostgreSQL drop trigger SQL."""
340 if table_name is None:
341 raise RuntimeError('Cannot drop a trigger in PostgreSQL without a table_name')
343 function_name = f'fn_{trigger_name}'
344 return normalize_sql_spaces(
345 f'DROP TRIGGER {"IF EXISTS" if safe else ""} {trigger_name} ON {table_name};\n'
346 f'DROP FUNCTION {"IF EXISTS" if safe else ""} {function_name}();'
347 )
349 def select_all_trigger_sql(self) -> str:
350 return "SELECT trigger_name, event_object_table AS table_name FROM information_schema.triggers WHERE trigger_schema NOT IN ('pg_catalog', 'information_schema');"
352 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
353 """PostgreSQL supports INSTEAD OF only on views."""
354 if when == TriggerWhen.Instead and not on_view:
355 return False
356 return True
358 def supports_safe_create(self) -> bool:
359 """PostgreSQL doesn't support IF NOT EXISTS for triggers before v14, but we implement safety differently."""
360 return True # We report True but handle it with DROP IF EXISTS
362 def supports_update_of_columns(self) -> bool:
363 """PostgreSQL supports column-specific UPDATE triggers."""
364 return True
366 def supports_when_clause(self) -> bool:
367 """PostgreSQL supports WHEN conditions."""
368 return True
371class Trigger:
372 """Trigger template wrapper for use with peewee ORM."""
374 # noinspection PyProtectedMember
375 def __init__(
376 self,
377 trigger_name: str,
378 trigger_type: tuple[TriggerWhen, TriggerAction],
379 source_table: type[Model] | Model | str,
380 safe: bool = False,
381 for_each_row: bool = False,
382 update_columns: list[str] | None = None,
383 on_view: bool = False, # Added parameter to indicate if the target is a view
384 ):
385 """
386 Constructor parameters:
388 :param trigger_name: The name of this trigger. It needs to be unique!
389 :type trigger_name: str
390 :param trigger_type: A tuple with :class:`TriggerWhen` and :class:`TriggerAction` to specify on which action
391 the trigger should be invoked and if before, after or instead of.
392 :type trigger_type: tuple[TriggerWhen, TriggerAction]
393 :param source_table: The table originating the trigger. It can be a model class, instance, or also the name of
394 the table.
395 :type source_table: type[Model] | Model | str
396 :param safe: A boolean flag to define if in the trigger creation statement a 'IF NOT EXISTS' clause should be
397 included. Defaults to False
398 :type safe: bool, Optional
399 :param for_each_row: A boolean flag to repeat the script content for each modified row in the table.
400 Defaults to False.
401 :type for_each_row: bool, Optional
402 :param update_columns: A list of column names. When defining a trigger on a table update, it is possible to
403 restrict the firing of the trigger to the cases when a subset of all columns have been updated. An column
404 is updated also when the new value is equal to the old one. If you want to discriminate this case, use the
405 :meth:`add_when` method. Defaults to None.
406 :type update_columns: list[str], Optional
407 :param on_view: A boolean flag to indicate if the target is a view. This affects the support for INSTEAD OF.
408 Defaults to False.
409 :type on_view: bool, Optional
410 """
411 self.trigger_name = trigger_name
412 self.trigger_type = trigger_type
413 self._trigger_when, self._trigger_op = self.trigger_type
414 self.update_columns = update_columns or []
415 self.on_view = on_view
417 if isinstance(source_table, type):
418 model_cls = cast(PeeweeModelWithMeta, source_table)
419 self.target_table = model_cls._meta.table_name
420 elif isinstance(source_table, Model):
421 model_instance = cast(PeeweeModelWithMeta, source_table)
422 self.target_table = model_instance._meta.table_name
423 else:
424 self.target_table = source_table
426 self.safe = safe
427 self.for_each_row = for_each_row
429 self._when_list: list[str] = []
430 self._sql_list: list[str] = []
431 self._database: peewee.Database | peewee.DatabaseProxy | None = None
432 self._dialect: TriggerDialect | None = None
434 @property
435 def trigger_action(self) -> TriggerAction:
436 return self._trigger_op
438 @trigger_action.setter
439 def trigger_action(self, action: TriggerAction) -> None:
440 self._trigger_op = action
442 @property
443 def trigger_when(self) -> TriggerWhen:
444 return self._trigger_when
446 @trigger_when.setter
447 def trigger_when(self, when: TriggerWhen) -> None:
448 self._trigger_when = when
450 def __setattr__(self, key: Any, value: Any) -> None:
451 if key == 'safe':
452 self.if_not_exists = 'IF NOT EXISTS' if value else ''
453 elif key == 'for_each_row':
454 self._for_each_row = 'FOR EACH ROW' if value else ''
455 else:
456 super().__setattr__(key, value)
458 def __getattr__(self, item: str) -> Any:
459 """
460 Custom attribute getter for computed properties.
462 :param item: The attribute name to get
463 :return: The attribute value
464 :raises AttributeError: If the attribute doesn't exist
465 """
466 if item == 'safe':
467 return hasattr(self, 'if_not_exists') and self.if_not_exists == 'IF NOT EXISTS'
468 elif item == 'for_each_row':
469 return hasattr(self, '_for_each_row') and self._for_each_row == 'FOR EACH ROW'
470 else:
471 # Raise AttributeError for non-existent attributes (standard Python behaviour)
472 raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{item}'")
474 def add_sql(self, sql: str | peewee.Query) -> Self:
475 """
476 Add an SQL statement to be executed by the trigger.
478 The ``sql`` can be either a string containing the sql statement, or it can be any other peewee Query.
480 For example:
482 .. code-block:: python
484 # assuming you have created a trigger ...
486 sql = AnotherTable.insert(
487 field1=some_value, field2=another_value
488 )
489 trigger.add_sql(sql)
491 In this way the SQL code is generated with parametric placeholder if needed.
493 :param sql: The SQL statement.
494 :type sql: str | peewee.Query
495 :return: self for easy chaining
496 :rtype: Trigger
497 """
498 if not isinstance(sql, str):
499 sql = str(sql)
500 sql = sql.strip()
501 sql = chr(9) + sql
502 if not sql.endswith(';'):
503 sql += ';'
504 self._sql_list.append(sql)
505 return self
507 def add_when(self, *conditions: str | peewee.Node) -> Self:
508 """
509 Add conditions to the `when` statements.
511 Conditions are logically ANDed.
512 To have mixed `OR` and `AND` logic, use the functions :func:`and_` and :func:`or_`.
514 The ``conditions`` can be either strings containing SQL conditions, or peewee Node objects
515 (such as Expression or Query objects).
517 For example:
519 .. code-block:: python
521 # String condition
522 trigger.add_when("NEW.status = 'active'")
524 # Peewee expression
525 subq = TriggerStatus.select(TriggerStatus.status).where(
526 TriggerStatus.trigger_type == 'DELETE_FILES'
527 )
528 trigger.add_when(Value(1) == subq)
530 .. versionchanged:: v2.0.0
531 The argument can also be a generic peewee Node.
533 :param conditions: Conditions to be added with logical AND. Can be strings or peewee Node objects.
534 :type conditions: str | peewee.Node
535 :return: self for easy chaining
536 :rtype: Trigger
537 """
538 conditions_l = []
539 for c in conditions:
540 if isinstance(c, str):
541 # Handle string conditions
542 condition_str = c.strip()
543 else:
544 # Handle peewee Node/Expression/Query objects
545 # Convert to SQL with parameters interpolated
546 condition_str = self._node_to_sql(c).strip()
548 conditions_l.append(f'({condition_str})')
550 self._when_list.append(f'({" AND ".join(conditions_l)})')
551 return self
553 def _node_to_sql(self, node: peewee.Node) -> str:
554 """
555 Convert a peewee Node (Expression, Query, etc.) to a SQL string with interpolated parameters.
557 This is based on peewee's internal query_to_string function for debugging/logging purposes.
559 .. versionadded:: v2.0.0
561 :param node: A peewee Node object
562 :return: SQL string with parameters interpolated
563 """
564 from peewee import Context, Expression, Select
566 # Check if this is an Expression with lhs and rhs attributes (like comparisons)
567 if isinstance(node, Expression) and hasattr(node, 'lhs') and hasattr(node, 'rhs'):
568 # Recursively convert left and right sides
569 lhs_sql = self._node_to_sql(node.lhs) if isinstance(node.lhs, peewee.Node) else self._value_to_sql(node.lhs)
570 rhs_sql = self._node_to_sql(node.rhs) if isinstance(node.rhs, peewee.Node) else self._value_to_sql(node.rhs)
572 # Get the operator (e.g., '=', '>', '<', etc.)
573 op = getattr(node, 'op', '=')
575 # For subqueries on either side, wrap them in parentheses
576 if isinstance(node.lhs, Select):
577 lhs_sql = f'({lhs_sql})'
578 if isinstance(node.rhs, Select):
579 rhs_sql = f'({rhs_sql})'
581 return f'{lhs_sql} {op} {rhs_sql}'
583 # For other node types, use the standard Context approach
584 # Get database context if available, otherwise use a default Context
585 db = self._database
586 if db is not None and isinstance(db, peewee.DatabaseProxy):
587 db = db.obj # Get the actual database from the proxy
589 if db is not None:
590 ctx = db.get_sql_context()
591 else:
592 ctx = Context()
594 # Generate SQL with parameters
595 sql, params = ctx.sql(node).query()
597 # If no parameters, return as-is
598 if not params:
599 return cast(str, sql)
601 # Interpolate parameters into the SQL string
602 # This is safe for trigger definitions (not for execution)
603 param_placeholder = getattr(ctx.state, 'param', '?') or '?'
604 if param_placeholder == '?':
605 sql = sql.replace('?', '%s')
607 # Transform parameters to SQL-safe values
608 transformed_params = [self._value_to_sql(v) for v in params]
610 interpolated_str = sql % tuple(transformed_params)
611 return cast(str, interpolated_str)
613 def _value_to_sql(self, value: Any) -> str:
614 """
615 Convert a Python value to its SQL representation.
617 .. versionadded:: v2.0.0
619 :param value: A Python value (string, int, float, None, etc.)
620 :return: SQL string representation of the value
621 """
622 if isinstance(value, str):
623 # Escape single quotes by doubling them
624 escaped = value.replace("'", "''")
625 return f"'{escaped}'"
626 elif isinstance(value, bool): # bools are numbers as well!
627 return '1' if value else '0'
628 elif isinstance(value, (int, float)):
629 return str(value)
630 elif value is None:
631 return 'NULL'
632 else:
633 return str(value)
635 def set_database(self, database: peewee.Database | peewee.DatabaseProxy) -> Self:
636 """
637 Set the database to use for this trigger.
639 :param database: The database instance
640 :return: self for easy chaining
641 """
642 self._database = database
643 return self
645 def _get_dialect(self) -> TriggerDialect:
646 """
647 Get the appropriate dialect based on the database type.
649 :return: A dialect instance
650 """
651 if self._dialect is not None:
652 return self._dialect
654 if self._database is None:
655 # Default to SQLite dialect
656 return SQLiteDialect()
658 db = self._database
659 if isinstance(db, peewee.DatabaseProxy):
660 db = db.obj # Get the actual database from the proxy
662 if isinstance(db, peewee.SqliteDatabase):
663 self._dialect = SQLiteDialect()
664 elif isinstance(db, peewee.MySQLDatabase):
665 self._dialect = MySQLDialect()
666 elif isinstance(db, peewee.PostgresqlDatabase):
667 self._dialect = PostgreSQLDialect()
668 else:
669 raise UnsupportedDatabaseError(f'Unsupported database type: {type(db)}')
671 return self._dialect
673 def create(self) -> str:
674 """
675 Generates the SQL create statement.
677 :return: The trigger creation statement.
678 :raise MissingSQLStatement: if no SQL statements are provided.
679 :raise UnsupportedDatabaseError: if the trigger type is not supported by the database.
680 """
681 if len(self._sql_list) == 0:
682 raise MissingSQLStatement('No SQL statements provided')
684 dialect = self._get_dialect()
686 # Check if the trigger type is supported
687 if not dialect.supports_trigger_type(self.trigger_when, self.trigger_action, self.on_view):
688 raise UnsupportedDatabaseError(
689 f'Trigger type {self.trigger_when} {self.trigger_action} is not supported by the database'
690 )
692 # Check if safe create is supported
693 if self.safe and not dialect.supports_safe_create():
694 # We can either ignore and continue without safe, or raise an error
695 # For now, we'll just ignore and continue
696 self.safe = False
698 # Check if update columns are supported
699 if self.update_columns and not dialect.supports_update_of_columns():
700 # We can either ignore and continue without column-specific updates, or raise an error
701 # For now, we'll ignore and continue
702 self.update_columns = []
704 # Generate the SQL
705 return dialect.create_trigger_sql(self)
707 def drop(self, safe: bool = True) -> str:
708 """
709 Generates the SQL drop statement.
711 :param safe: If True, add an IF EXIST. Defaults to True.
712 :type safe: bool, Optional
713 :return: The drop statement
714 :rtype: str
715 """
716 dialect = self._get_dialect()
717 return dialect.drop_trigger_sql(self.trigger_name, safe)