Coverage for src / mafw / db / trigger.py: 99%
288 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 16:10 +0000
1# Copyright 2025–2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Module provides a Trigger class and related tools to create triggers in the database via the ORM.
7It supports SQLite, MySQL and PostgreSQL with dialect-specific SQL generation.
8"""
10from abc import ABC, abstractmethod
11from enum import StrEnum
12from typing import Any, Self, cast
14import peewee
15from peewee import Model
17from mafw.db.db_types import PeeweeModelWithMeta
18from mafw.mafw_errors import MissingSQLStatement, UnsupportedDatabaseError
19from mafw.tools.regexp import normalize_sql_spaces
22def and_(*conditions: str) -> str:
23 """
24 Concatenates conditions with logical AND.
26 :param conditions: The condition to join.
27 :type conditions: str
28 :return: The and-concatenated string of conditions
29 :rtype: str
30 """
31 conditions_l = [f'({c})' for c in conditions]
32 return ' AND '.join(conditions_l)
35def or_(*conditions: str) -> str:
36 """
37 Concatenates conditions with logical OR.
39 :param conditions: The condition to join.
40 :type conditions: str
41 :return: The or-concatenated string of conditions.
42 :rtype: str
43 """
44 conditions_l = [f'({c})' for c in conditions]
45 return ' OR '.join(conditions_l)
48class TriggerWhen(StrEnum):
49 """String enumerator for the trigger execution time (Before, After or Instead Of)"""
51 Before = 'BEFORE'
52 After = 'AFTER'
53 Instead = 'INSTEAD OF'
56class TriggerAction(StrEnum):
57 """String enumerator for the trigger action (Delete, Insert, Update)"""
59 Delete = 'DELETE'
60 Insert = 'INSERT'
61 Update = 'UPDATE'
64class TriggerDialect(ABC):
65 """Abstract base class for database-specific trigger SQL generation."""
67 @abstractmethod
68 def create_trigger_sql(self, trigger: 'Trigger') -> str:
69 """
70 Generate the SQL to create a trigger for a specific database dialect.
72 :param trigger: The trigger object
73 :return: SQL string to create the trigger
74 """
75 pass # pragma: no cover
77 @abstractmethod
78 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
79 """
80 Generate the SQL to drop a trigger for a specific database dialect.
82 :param trigger_name: The name of the trigger to drop
83 :type trigger_name: str
84 :param safe: If True, add an IF EXISTS clause. Defaults to True.
85 :type safe: bool, Optional
86 :param table_name: The name of the target table for the trigger. Defaults to None.
87 :type table_name: str, Optional
88 :return: SQL string to drop the trigger
89 :rtype: str
90 """
91 pass # pragma: no cover
93 @abstractmethod
94 def select_all_trigger_sql(self) -> str:
95 pass # pragma: no cover
97 @abstractmethod
98 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
99 """
100 Check if the database supports the specified trigger type.
102 :param when: When the trigger should fire (BEFORE, AFTER, INSTEAD OF)
103 :param action: The action that triggers the trigger (INSERT, UPDATE, DELETE)
104 :param on_view: Whether the trigger is on a view
105 :return: True if supported, False otherwise
106 """
107 pass # pragma: no cover
109 @abstractmethod
110 def supports_safe_create(self) -> bool:
111 """
112 Check if the database supports IF NOT EXISTS for triggers.
114 :return: True if supported, False otherwise
115 """
116 pass # pragma: no cover
118 @abstractmethod
119 def supports_update_of_columns(self) -> bool:
120 """
121 Check if the database supports column-specific UPDATE triggers.
123 :return: True if supported, False otherwise
124 """
125 pass # pragma: no cover
127 @abstractmethod
128 def supports_when_clause(self) -> bool:
129 """
130 Check if the database supports WHEN conditions.
132 :return: True if supported, False otherwise
133 """
134 pass # pragma: no cover
137class SQLiteDialect(TriggerDialect):
138 """SQLite-specific trigger SQL generation."""
140 def create_trigger_sql(self, trigger: 'Trigger') -> str:
141 """Generate SQLite trigger SQL."""
142 if_not_exists = 'IF NOT EXISTS' if trigger.safe else ''
143 of_columns = (
144 f'OF {", ".join(trigger.update_columns)}'
145 if trigger.trigger_action == TriggerAction.Update and trigger.update_columns
146 else ''
147 )
148 for_each_row = 'FOR EACH ROW' if trigger.for_each_row else ''
149 compiled_conditions = trigger._compile_when_conditions()
150 when_clause = f'WHEN {" AND ".join(f"({c})" for c in compiled_conditions)}' if compiled_conditions else ''
151 sql_statements = '\n'.join(trigger._compile_sql_statement(stmt) for stmt in trigger._sql_list)
153 return normalize_sql_spaces(
154 f'CREATE TRIGGER {if_not_exists} {trigger.trigger_name}\n'
155 f'{trigger.trigger_when} {trigger.trigger_action} {of_columns} ON {trigger.target_table}\n'
156 f'{for_each_row} {when_clause}\n'
157 f'BEGIN\n'
158 f'{sql_statements}\n'
159 f'END;'
160 )
162 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
163 """Generate SQLite drop trigger SQL."""
164 return normalize_sql_spaces(f'DROP TRIGGER {"IF EXISTS" if safe else ""} {trigger_name}')
166 def select_all_trigger_sql(self) -> str:
167 return "SELECT name AS trigger_name, tbl_name AS table_name FROM sqlite_master WHERE type = 'trigger';"
169 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
170 """SQLite supports all trigger types except INSTEAD OF on tables (only on views)."""
171 if when == TriggerWhen.Instead and not on_view:
172 return False
173 return True
175 def supports_safe_create(self) -> bool:
176 """SQLite supports IF NOT EXISTS for triggers."""
177 return True
179 def supports_update_of_columns(self) -> bool:
180 """SQLite supports column-specific UPDATE triggers."""
181 return True
183 def supports_when_clause(self) -> bool:
184 """SQLite supports WHEN conditions."""
185 return True
188class MySQLDialect(TriggerDialect):
189 """MySQL-specific trigger SQL generation."""
191 def create_trigger_sql(self, trigger: 'Trigger') -> str:
192 """Generate MySQL trigger SQL."""
193 # MySQL doesn't support INSTEAD OF triggers
194 # MySQL doesn't support column-specific UPDATE triggers
195 # MySQL requires FOR EACH ROW
197 if_not_exists = 'IF NOT EXISTS' if trigger.safe else ''
199 # In MySQL, we need to convert WHEN conditions to IF/THEN/END IF blocks
200 sql_statements = []
202 compiled_conditions = trigger._compile_when_conditions()
203 compiled_statements = [trigger._compile_sql_statement(stmt) for stmt in trigger._sql_list]
205 # If there are conditional statements, wrap them in IF blocks
206 if compiled_conditions:
207 condition = ' AND '.join(f'({c})' for c in compiled_conditions)
208 sql_statements.append(f'IF {condition} THEN')
210 # Add the SQL statements with indentation
211 for stmt in compiled_statements:
212 sql_statements.append(f' {stmt}')
214 # Close the IF block
215 sql_statements.append('END IF;')
216 else:
217 # No conditions, just add the SQL statements directly
218 sql_statements.extend(compiled_statements)
220 # Join all statements
221 trigger_body = '\n'.join(sql_statements)
223 # Construct the final SQL
224 sql = (
225 f'CREATE TRIGGER {if_not_exists} {trigger.trigger_name}\n'
226 f'{trigger.trigger_when} {trigger.trigger_action} ON {trigger.target_table}\n'
227 f'FOR EACH ROW\n'
228 f'BEGIN\n'
229 f'{trigger_body}\n'
230 f'END;'
231 )
232 return normalize_sql_spaces(sql)
234 def select_all_trigger_sql(self) -> str:
235 return 'SELECT trigger_name, event_object_table AS table_name FROM information_schema.TRIGGERS WHERE TRIGGER_SCHEMA = DATABASE();'
237 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
238 """Generate MySQL drop trigger SQL."""
239 return normalize_sql_spaces(f'DROP TRIGGER {"IF EXISTS" if safe else ""} {trigger_name}')
241 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
242 """MySQL doesn't support INSTEAD OF triggers."""
243 return when != TriggerWhen.Instead
245 def supports_safe_create(self) -> bool:
246 """MySQL supports IF NOT EXISTS for triggers."""
247 return True
249 def supports_update_of_columns(self) -> bool:
250 """MySQL doesn't support column-specific UPDATE triggers."""
251 return False
253 def supports_when_clause(self) -> bool:
254 """MySQL supports conditions but through WHERE instead of WHEN."""
255 return True
258class PostgreSQLDialect(TriggerDialect):
259 """PostgreSQL-specific trigger SQL generation."""
261 def create_trigger_sql(self, trigger: 'Trigger') -> str:
262 """Generate PostgreSQL trigger SQL."""
263 # PostgreSQL handles INSTEAD OF differently
264 # PostgreSQL uses functions for trigger bodies
266 function_name = f'fn_{trigger.trigger_name}'
268 # First create the function
269 function_sql = f'CREATE OR REPLACE FUNCTION {function_name}() RETURNS TRIGGER AS $$\nBEGIN\n'
271 compiled_conditions = trigger._compile_when_conditions()
272 compiled_statements = [self._clean_sql(trigger._compile_sql_statement(stmt)) for stmt in trigger._sql_list]
274 # Add WHEN condition as IF statements if needed
275 if compiled_conditions:
276 when_condition = ' AND '.join(f'({c})' for c in compiled_conditions)
277 function_sql += f' IF {when_condition} THEN\n'
278 for sql in compiled_statements:
279 function_sql += f' {sql}\n'
280 function_sql += ' END IF;\n'
281 else:
282 for sql in compiled_statements:
283 function_sql += f' {sql}\n'
285 # For AFTER triggers, we need to return NULL or NEW
286 if trigger.trigger_when == TriggerWhen.After:
287 function_sql += ' RETURN NULL;\n'
288 # For BEFORE or INSTEAD OF triggers, we need to return NEW
289 else:
290 function_sql += ' RETURN NEW;\n'
292 function_sql += 'END;\n$$ LANGUAGE plpgsql;'
294 # Then create the trigger - PostgreSQL doesn't support IF NOT EXISTS for triggers before v14
295 # We'll handle this through a conditional drop
296 drop_if_exists = ''
297 if trigger.safe:
298 drop_if_exists = f'DROP TRIGGER IF EXISTS {trigger.trigger_name} ON {trigger.target_table} CASCADE;\n'
300 # PostgreSQL uses different syntax for INSTEAD OF (only allowed on views)
301 trigger_when = trigger.trigger_when
303 # Column-specific triggers in PostgreSQL
304 of_columns = (
305 f'OF {", ".join(trigger.update_columns)}'
306 if trigger.update_columns and trigger.trigger_action == TriggerAction.Update
307 else ''
308 )
310 for_each = 'FOR EACH ROW' if trigger.for_each_row else 'FOR EACH STATEMENT'
312 trigger_sql = (
313 f'{drop_if_exists}'
314 f'CREATE TRIGGER {trigger.trigger_name}\n'
315 f'{trigger_when} {trigger.trigger_action} {of_columns} ON {trigger.target_table}\n'
316 f'{for_each}\n'
317 f'EXECUTE FUNCTION {function_name}();'
318 )
320 sql = f'{function_sql}\n\n{trigger_sql}'
322 return normalize_sql_spaces(sql)
324 def _clean_sql(self, sql: str) -> str:
325 """
326 Remove RETURNING clauses from SQL statements for PostgreSQL trigger functions.
328 :param sql: The SQL statement
329 :return: SQL statement without RETURNING clause
330 """
331 # Find the RETURNING clause position - case insensitive search
332 sql_upper = sql.upper()
333 returning_pos = sql_upper.find('RETURNING')
335 # If RETURNING exists, remove it and everything after it up to the semicolon
336 if returning_pos != -1:
337 semicolon_pos = sql.find(';', returning_pos)
338 if semicolon_pos != -1:
339 return sql[:returning_pos] + ';'
340 return sql[:returning_pos]
341 return sql
343 def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
344 """Generate PostgreSQL drop trigger SQL."""
345 if table_name is None:
346 raise RuntimeError('Cannot drop a trigger in PostgreSQL without a table_name')
348 function_name = f'fn_{trigger_name}'
349 return normalize_sql_spaces(
350 f'DROP TRIGGER {"IF EXISTS" if safe else ""} {trigger_name} ON {table_name};\n'
351 f'DROP FUNCTION {"IF EXISTS" if safe else ""} {function_name}();'
352 )
354 def select_all_trigger_sql(self) -> str:
355 return "SELECT trigger_name, event_object_table AS table_name FROM information_schema.triggers WHERE trigger_schema NOT IN ('pg_catalog', 'information_schema');"
357 def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
358 """PostgreSQL supports INSTEAD OF only on views."""
359 if when == TriggerWhen.Instead and not on_view:
360 return False
361 return True
363 def supports_safe_create(self) -> bool:
364 """PostgreSQL doesn't support IF NOT EXISTS for triggers before v14, but we implement safety differently."""
365 return True # We report True but handle it with DROP IF EXISTS
367 def supports_update_of_columns(self) -> bool:
368 """PostgreSQL supports column-specific UPDATE triggers."""
369 return True
371 def supports_when_clause(self) -> bool:
372 """PostgreSQL supports WHEN conditions."""
373 return True
376class Trigger:
377 """Trigger template wrapper for use with peewee ORM."""
379 # noinspection PyProtectedMember
380 def __init__(
381 self,
382 trigger_name: str,
383 trigger_type: tuple[TriggerWhen, TriggerAction],
384 source_table: type[Model] | Model | str,
385 safe: bool = False,
386 for_each_row: bool = False,
387 update_columns: list[str] | None = None,
388 on_view: bool = False, # Added parameter to indicate if the target is a view
389 ):
390 """
391 Constructor parameters:
393 :param trigger_name: The name of this trigger. It needs to be unique!
394 :type trigger_name: str
395 :param trigger_type: A tuple with :class:`TriggerWhen` and :class:`TriggerAction` to specify on which action
396 the trigger should be invoked and if before, after or instead of.
397 :type trigger_type: tuple[TriggerWhen, TriggerAction]
398 :param source_table: The table originating the trigger. It can be a model class, instance, or also the name of
399 the table.
400 :type source_table: type[Model] | Model | str
401 :param safe: A boolean flag to define if in the trigger creation statement a 'IF NOT EXISTS' clause should be
402 included. Defaults to False
403 :type safe: bool, Optional
404 :param for_each_row: A boolean flag to repeat the script content for each modified row in the table.
405 Defaults to False.
406 :type for_each_row: bool, Optional
407 :param update_columns: A list of column names. When defining a trigger on a table update, it is possible to
408 restrict the firing of the trigger to the cases when a subset of all columns have been updated. An column
409 is updated also when the new value is equal to the old one. If you want to discriminate this case, use the
410 :meth:`add_when` method. Defaults to None.
411 :type update_columns: list[str], Optional
412 :param on_view: A boolean flag to indicate if the target is a view. This affects the support for INSTEAD OF.
413 Defaults to False.
414 :type on_view: bool, Optional
415 """
416 self.trigger_name = trigger_name
417 self.trigger_type = trigger_type
418 self._trigger_when, self._trigger_op = self.trigger_type
419 self.update_columns = update_columns or []
420 self.on_view = on_view
422 if isinstance(source_table, type):
423 model_cls = cast(PeeweeModelWithMeta, source_table)
424 self.target_table = model_cls._meta.table_name
425 elif isinstance(source_table, Model):
426 model_instance = cast(PeeweeModelWithMeta, source_table)
427 self.target_table = model_instance._meta.table_name
428 else:
429 self.target_table = source_table
431 self.safe = safe
432 self.for_each_row = for_each_row
434 self._when_list: list[str | peewee.Node] = []
435 self._sql_list: list[str | peewee.Query] = []
436 self._database: peewee.Database | peewee.DatabaseProxy | None = None
437 self._dialect: TriggerDialect | None = None
439 @property
440 def trigger_action(self) -> TriggerAction:
441 return self._trigger_op
443 @trigger_action.setter
444 def trigger_action(self, action: TriggerAction) -> None:
445 self._trigger_op = action
447 @property
448 def trigger_when(self) -> TriggerWhen:
449 return self._trigger_when
451 @trigger_when.setter
452 def trigger_when(self, when: TriggerWhen) -> None:
453 self._trigger_when = when
455 def __setattr__(self, key: Any, value: Any) -> None:
456 if key == 'safe':
457 self.if_not_exists = 'IF NOT EXISTS' if value else ''
458 elif key == 'for_each_row':
459 self._for_each_row = 'FOR EACH ROW' if value else ''
460 else:
461 super().__setattr__(key, value)
463 def __getattr__(self, item: str) -> Any:
464 """
465 Custom attribute getter for computed properties.
467 :param item: The attribute name to get
468 :return: The attribute value
469 :raises AttributeError: If the attribute doesn't exist
470 """
471 if item == 'safe':
472 return hasattr(self, 'if_not_exists') and self.if_not_exists == 'IF NOT EXISTS'
473 elif item == 'for_each_row':
474 return hasattr(self, '_for_each_row') and self._for_each_row == 'FOR EACH ROW'
475 else:
476 # Raise AttributeError for non-existent attributes (standard Python behaviour)
477 raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{item}'")
479 def add_sql(self, sql: str | peewee.Query) -> Self:
480 """
481 Add an SQL statement to be executed by the trigger.
483 The ``sql`` can be either a string containing the sql statement, or it can be any other peewee Query.
485 For example:
487 .. code-block:: python
489 # assuming you have created a trigger ...
491 sql = AnotherTable.insert(
492 field1=some_value, field2=another_value
493 )
494 trigger.add_sql(sql)
496 In this way the SQL code is generated with parametric placeholder if needed.
498 :param sql: The SQL statement.
499 :type sql: str | peewee.Query
500 :return: self for easy chaining
501 :rtype: Trigger
502 """
503 self._sql_list.append(sql)
504 return self
506 def add_when(self, *conditions: str | peewee.Node) -> Self:
507 """
508 Add conditions to the `when` statements.
510 Conditions are logically ANDed.
511 To have mixed `OR` and `AND` logic, use the functions :func:`and_` and :func:`or_`.
513 The ``conditions`` can be either strings containing SQL conditions, or peewee Node objects
514 (such as Expression or Query objects).
516 For example:
518 .. code-block:: python
520 # String condition
521 trigger.add_when("NEW.status = 'active'")
523 # Peewee expression
524 subq = TriggerStatus.select(TriggerStatus.status).where(
525 TriggerStatus.trigger_type == 'DELETE_FILES'
526 )
527 trigger.add_when(Value(1) == subq)
529 .. versionchanged:: v2.0.0
530 The argument can also be a generic peewee Node.
532 :param conditions: Conditions to be added with logical AND. Can be strings or peewee Node objects.
533 :type conditions: str | peewee.Node
534 :return: self for easy chaining
535 :rtype: Trigger
536 """
537 for c in conditions:
538 self._when_list.append(c)
539 return self
541 def _node_to_sql(self, node: peewee.Node) -> str:
542 """
543 Convert a peewee Node (Expression, Query, etc.) to a SQL string with interpolated parameters.
545 This is based on peewee's internal query_to_string function for debugging/logging purposes.
547 .. versionadded:: v2.0.0
549 :param node: A peewee Node object
550 :return: SQL string with parameters interpolated
551 """
552 from peewee import Context, Expression, Select
554 # Check if this is an Expression with lhs and rhs attributes (like comparisons)
555 if isinstance(node, Expression) and hasattr(node, 'lhs') and hasattr(node, 'rhs'):
556 # Recursively convert left and right sides
557 lhs_sql = self._node_to_sql(node.lhs) if isinstance(node.lhs, peewee.Node) else self._value_to_sql(node.lhs)
558 rhs_sql = self._node_to_sql(node.rhs) if isinstance(node.rhs, peewee.Node) else self._value_to_sql(node.rhs)
560 # Get the operator (e.g., '=', '>', '<', etc.)
561 op = getattr(node, 'op', '=')
563 # For subqueries on either side, wrap them in parentheses
564 if isinstance(node.lhs, Select):
565 lhs_sql = f'({lhs_sql})'
566 if isinstance(node.rhs, Select):
567 rhs_sql = f'({rhs_sql})'
569 return f'{lhs_sql} {op} {rhs_sql}'
571 # For other node types, use the standard Context approach
572 # Get database context if available, otherwise use a default Context
573 db = self.database
574 if db is not None:
575 ctx = db.get_sql_context()
576 else:
577 ctx = Context()
579 # Generate SQL with parameters
580 sql, params = ctx.sql(node).query()
582 # If no parameters, return as-is
583 if not params:
584 return cast(str, sql)
586 # Interpolate parameters into the SQL string
587 # This is safe for trigger definitions (not for execution)
588 param_placeholder = getattr(ctx.state, 'param', '?') or '?'
589 if param_placeholder == '?':
590 sql = sql.replace('?', '%s')
592 # Transform parameters to SQL-safe values
593 transformed_params = [self._value_to_sql(v) for v in params]
595 interpolated_str = sql % tuple(transformed_params)
596 return cast(str, interpolated_str)
598 def _value_to_sql(self, value: Any) -> str:
599 """
600 Convert a Python value to its SQL representation.
602 .. versionadded:: v2.0.0
604 :param value: A Python value (string, int, float, None, etc.)
605 :return: SQL string representation of the value
606 """
607 if isinstance(value, str):
608 # Escape single quotes by doubling them
609 escaped = value.replace("'", "''")
610 return f"'{escaped}'"
611 elif isinstance(value, bool): # bools are numbers as well!
612 return '1' if value else '0'
613 elif isinstance(value, (int, float)):
614 return str(value)
615 elif value is None:
616 return 'NULL'
617 else:
618 return str(value)
620 def set_database(self, database: peewee.Database | peewee.DatabaseProxy) -> Self:
621 """
622 Set the database to use for this trigger.
624 :param database: The database instance
625 :return: self for easy chaining
626 """
627 self._database = database
628 return self
630 @property
631 def database(self) -> peewee.Database | None:
632 """Return the actual database, unwrapping proxies."""
633 db = self._database
634 if isinstance(db, peewee.DatabaseProxy):
635 return cast(peewee.Database, db.obj)
636 return db
638 def _compile_query(self, query: peewee.Query) -> str:
639 """Compile a Peewee query using the current database dialect."""
640 sql, params = cast(tuple[str, tuple[Any, ...]], cast(Any, query).sql())
641 if params: 641 ↛ 642line 641 didn't jump to line 642 because the condition on line 641 was never true
642 sql = sql % tuple(self._value_to_sql(p) for p in params)
643 return sql
645 def _compile_node(self, node: peewee.Node) -> str:
646 """Compile a Peewee node to SQL using the current database context."""
647 return self._node_to_sql(node)
649 def _compile_sql_statement(self, statement: str | peewee.Query) -> str:
650 """Return a single SQL statement string with consistent formatting."""
651 if isinstance(statement, str):
652 compiled = statement
653 else:
654 compiled = self._compile_query(statement)
656 compiled = compiled.strip()
657 if not compiled.endswith(';'):
658 compiled += ';'
659 return compiled
661 def _compile_when_conditions(self) -> list[str]:
662 """Compile each when condition, preserving nodes until creation time."""
663 compiled_conditions: list[str] = []
664 for cond in self._when_list:
665 if isinstance(cond, str):
666 compiled_conditions.append(cond.strip())
667 else:
668 compiled_conditions.append(self._compile_node(cond))
669 return compiled_conditions
671 def _get_dialect(self) -> TriggerDialect:
672 """
673 Get the appropriate dialect based on the database type.
675 :return: A dialect instance
676 """
677 if self._dialect is not None:
678 return self._dialect
680 if self._database is None:
681 # Default to SQLite dialect
682 return SQLiteDialect()
684 db = self._database
685 if isinstance(db, peewee.DatabaseProxy):
686 db = db.obj # Get the actual database from the proxy
688 if isinstance(db, peewee.SqliteDatabase):
689 self._dialect = SQLiteDialect()
690 elif isinstance(db, peewee.MySQLDatabase):
691 self._dialect = MySQLDialect()
692 elif isinstance(db, peewee.PostgresqlDatabase):
693 self._dialect = PostgreSQLDialect()
694 else:
695 raise UnsupportedDatabaseError(f'Unsupported database type: {type(db)}')
697 return self._dialect
699 def create(self) -> str:
700 """
701 Generates the SQL create statement.
703 :return: The trigger creation statement.
704 :raise MissingSQLStatement: if no SQL statements are provided.
705 :raise UnsupportedDatabaseError: if the trigger type is not supported by the database.
706 """
707 if len(self._sql_list) == 0:
708 raise MissingSQLStatement('No SQL statements provided')
710 dialect = self._get_dialect()
712 # Check if the trigger type is supported
713 if not dialect.supports_trigger_type(self.trigger_when, self.trigger_action, self.on_view):
714 raise UnsupportedDatabaseError(
715 f'Trigger type {self.trigger_when} {self.trigger_action} is not supported by the database'
716 )
718 # Check if safe create is supported
719 if self.safe and not dialect.supports_safe_create():
720 # We can either ignore and continue without safe, or raise an error
721 # For now, we'll just ignore and continue
722 self.safe = False
724 # Check if update columns are supported
725 if self.update_columns and not dialect.supports_update_of_columns():
726 # We can either ignore and continue without column-specific updates, or raise an error
727 # For now, we'll ignore and continue
728 self.update_columns = []
730 # Generate the SQL
731 return dialect.create_trigger_sql(self)
733 def drop(self, safe: bool = True) -> str:
734 """
735 Generates the SQL drop statement.
737 :param safe: If True, add an IF EXIST. Defaults to True.
738 :type safe: bool, Optional
739 :return: The drop statement
740 :rtype: str
741 """
742 dialect = self._get_dialect()
743 return dialect.drop_trigger_sql(self.trigger_name, safe)