Coverage for src / mafw / db / trigger.py: 98%
294 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 09:03 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-12 09:03 +0000
1# Copyright 2025–2026 European Union
2# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
3# SPDX-License-Identifier: EUPL-1.2
4"""
5Module provides a Trigger class and related tools to create triggers in the database via the ORM.
7It supports SQLite, MySQL and PostgreSQL with dialect-specific SQL generation.
8"""
10from abc import ABC, abstractmethod
11from enum import StrEnum
12from typing import Any, Self, cast
14import peewee
15from peewee import Model
17from mafw.db.db_types import PeeweeModelWithMeta
18from mafw.mafw_errors import MissingSQLStatement, UnsupportedDatabaseError
19from mafw.tools.regexp import normalize_sql_spaces
def and_(*conditions: str) -> str:
    """
    Join SQL conditions with a logical AND.

    Every condition is wrapped in parentheses before being joined, so the result can be nested inside another
    :func:`and_` or :func:`or_` call without changing operator precedence.

    :param conditions: The conditions to join.
    :type conditions: str
    :return: The and-concatenated string of conditions. It is an empty string when no condition is given.
    :rtype: str
    """
    return ' AND '.join(f'({condition})' for condition in conditions)
def or_(*conditions: str) -> str:
    """
    Join SQL conditions with a logical OR.

    Every condition is wrapped in parentheses before being joined, so the result can be nested inside another
    :func:`and_` or :func:`or_` call without changing operator precedence.

    :param conditions: The conditions to join.
    :type conditions: str
    :return: The or-concatenated string of conditions. It is an empty string when no condition is given.
    :rtype: str
    """
    return ' OR '.join(f'({condition})' for condition in conditions)
48class TriggerWhen(StrEnum):
49 """String enumerator for the trigger execution time (Before, After or Instead Of)"""
51 Before = 'BEFORE'
52 After = 'AFTER'
53 Instead = 'INSTEAD OF'
56class TriggerAction(StrEnum):
57 """String enumerator for the trigger action (Delete, Insert, Update)"""
59 Delete = 'DELETE'
60 Insert = 'INSERT'
61 Update = 'UPDATE'
class TriggerDialect(ABC):
    """
    Abstract base class for database-specific trigger SQL generation.

    A concrete dialect knows how to render the create / drop / list statements for one database backend and
    reports which optional trigger features that backend offers.
    """

    @abstractmethod
    def create_trigger_sql(self, trigger: 'Trigger') -> str:
        """
        Build the SQL statement creating ``trigger`` in this dialect.

        :param trigger: The trigger object
        :return: SQL string to create the trigger
        """
        ...  # pragma: no cover

    @abstractmethod
    def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
        """
        Build the SQL statement dropping a trigger in this dialect.

        :param trigger_name: The name of the trigger to drop
        :type trigger_name: str
        :param safe: If True, add an IF EXISTS clause. Defaults to True.
        :type safe: bool, Optional
        :param table_name: The name of the target table for the trigger. Defaults to None.
        :type table_name: str, Optional
        :return: SQL string to drop the trigger
        :rtype: str
        """
        ...  # pragma: no cover

    @abstractmethod
    def select_all_trigger_sql(self) -> str:
        """
        Build the SQL query listing the existing triggers in this dialect.

        :return: SQL string to select the triggers
        :rtype: str
        """
        ...  # pragma: no cover

    @abstractmethod
    def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
        """
        Tell whether the database can handle the requested trigger type.

        :param when: When the trigger should fire (BEFORE, AFTER, INSTEAD OF)
        :param action: The action that triggers the trigger (INSERT, UPDATE, DELETE)
        :param on_view: Whether the trigger is on a view
        :return: True if supported, False otherwise
        """
        ...  # pragma: no cover

    @abstractmethod
    def supports_safe_create(self) -> bool:
        """
        Tell whether the database supports IF NOT EXISTS for triggers.

        :return: True if supported, False otherwise
        """
        ...  # pragma: no cover

    @abstractmethod
    def supports_update_of_columns(self) -> bool:
        """
        Tell whether the database supports column-specific UPDATE triggers.

        :return: True if supported, False otherwise
        """
        ...  # pragma: no cover

    @abstractmethod
    def supports_when_clause(self) -> bool:
        """
        Tell whether the database supports WHEN conditions.

        :return: True if supported, False otherwise
        """
        ...  # pragma: no cover
class SQLiteDialect(TriggerDialect):
    """SQLite-specific trigger SQL generation."""

    def create_trigger_sql(self, trigger: 'Trigger') -> str:
        """
        Generate SQLite trigger SQL.

        :param trigger: The trigger object
        :return: SQL string to create the trigger
        """
        safe_clause = 'IF NOT EXISTS' if trigger.safe else ''

        # The OF clause only makes sense for UPDATE triggers with an explicit column list.
        if trigger.trigger_action == TriggerAction.Update and trigger.update_columns:
            column_clause = f'OF {", ".join(trigger.update_columns)}'
        else:
            column_clause = ''

        row_clause = 'FOR EACH ROW' if trigger.for_each_row else ''

        conditions = trigger._compile_when_conditions()
        if conditions:
            when_clause = f'WHEN {" AND ".join(f"({c})" for c in conditions)}'
        else:
            when_clause = ''

        body = '\n'.join(trigger._compile_sql_statement(stmt) for stmt in trigger._sql_list)

        # One entry per output line; empty clauses leave extra blanks, the result is passed to normalize_sql_spaces.
        lines = [
            f'CREATE TRIGGER {safe_clause} {trigger.trigger_name}',
            f'{trigger.trigger_when} {trigger.trigger_action} {column_clause} ON {trigger.target_table}',
            f'{row_clause} {when_clause}',
            'BEGIN',
            body,
            'END;',
        ]
        return normalize_sql_spaces('\n'.join(lines))

    def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
        """
        Generate SQLite drop trigger SQL.

        :param trigger_name: The name of the trigger to drop
        :param safe: If True, add an IF EXISTS clause. Defaults to True.
        :param table_name: Not used by this dialect.
        :return: SQL string to drop the trigger
        """
        exists_clause = 'IF EXISTS' if safe else ''
        return normalize_sql_spaces(f'DROP TRIGGER {exists_clause} {trigger_name}')

    def select_all_trigger_sql(self) -> str:
        """Return the query listing trigger and table names from ``sqlite_master``."""
        return "SELECT name AS trigger_name, tbl_name AS table_name FROM sqlite_master WHERE type = 'trigger';"

    def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
        """SQLite supports all trigger types except INSTEAD OF on tables (only on views)."""
        return not (when == TriggerWhen.Instead and not on_view)

    def supports_safe_create(self) -> bool:
        """SQLite supports IF NOT EXISTS for triggers."""
        return True

    def supports_update_of_columns(self) -> bool:
        """SQLite supports column-specific UPDATE triggers."""
        return True

    def supports_when_clause(self) -> bool:
        """SQLite supports WHEN conditions."""
        return True
class MySQLDialect(TriggerDialect):
    """MySQL-specific trigger SQL generation."""

    def create_trigger_sql(self, trigger: 'Trigger') -> str:
        """
        Generate MySQL trigger SQL.

        The statement never contains an ``OF <columns>`` clause and always contains ``FOR EACH ROW``. The when
        conditions, if any, are rendered as an ``IF ... THEN ... END IF;`` block wrapping the trigger statements.

        :param trigger: The trigger object
        :return: SQL string to create the trigger
        """
        safe_clause = 'IF NOT EXISTS' if trigger.safe else ''

        conditions = trigger._compile_when_conditions()
        statements = [trigger._compile_sql_statement(stmt) for stmt in trigger._sql_list]

        if conditions:
            # There is no WHEN clause in this dialect: guard the statements with an IF block instead.
            guard = ' AND '.join(f'({c})' for c in conditions)
            body_lines = [f'IF {guard} THEN', *(f' {stmt}' for stmt in statements), 'END IF;']
        else:
            # Unconditional trigger: the statements form the body as they are.
            body_lines = statements

        lines = [
            f'CREATE TRIGGER {safe_clause} {trigger.trigger_name}',
            f'{trigger.trigger_when} {trigger.trigger_action} ON {trigger.target_table}',
            'FOR EACH ROW',
            'BEGIN',
            '\n'.join(body_lines),
            'END;',
        ]
        return normalize_sql_spaces('\n'.join(lines))

    def select_all_trigger_sql(self) -> str:
        """Return the query listing trigger and table names of the current schema from ``information_schema``."""
        return 'SELECT trigger_name, event_object_table AS table_name FROM information_schema.TRIGGERS WHERE TRIGGER_SCHEMA = DATABASE();'

    def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
        """
        Generate MySQL drop trigger SQL.

        :param trigger_name: The name of the trigger to drop
        :param safe: If True, add an IF EXISTS clause. Defaults to True.
        :param table_name: Not used by this dialect.
        :return: SQL string to drop the trigger
        """
        exists_clause = 'IF EXISTS' if safe else ''
        return normalize_sql_spaces(f'DROP TRIGGER {exists_clause} {trigger_name}')

    def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
        """MySQL doesn't support INSTEAD OF triggers."""
        return when != TriggerWhen.Instead

    def supports_safe_create(self) -> bool:
        """MySQL supports IF NOT EXISTS for triggers."""
        return True

    def supports_update_of_columns(self) -> bool:
        """MySQL doesn't support column-specific UPDATE triggers."""
        return False

    def supports_when_clause(self) -> bool:
        """MySQL supports conditions, rendered as IF/THEN blocks in the trigger body instead of a WHEN clause."""
        return True
class PostgreSQLDialect(TriggerDialect):
    """
    PostgreSQL-specific trigger SQL generation.

    The trigger body is rendered as a PL/pgSQL function named ``fn_<trigger_name>`` and the trigger itself only
    executes that function.
    """

    def create_trigger_sql(self, trigger: 'Trigger') -> str:
        """
        Generate PostgreSQL trigger SQL.

        The returned string contains the ``CREATE OR REPLACE FUNCTION`` statement followed by the
        ``CREATE TRIGGER`` statement. When ``trigger.safe`` is set, a ``DROP TRIGGER IF EXISTS`` statement is
        emitted just before the ``CREATE TRIGGER`` one.

        :param trigger: The trigger object
        :return: SQL string to create the trigger function and the trigger
        """
        function_name = f'fn_{trigger.trigger_name}'

        # First create the function
        function_sql = f'CREATE OR REPLACE FUNCTION {function_name}() RETURNS TRIGGER AS $$\nBEGIN\n'

        compiled_conditions = trigger._compile_when_conditions()
        compiled_statements = [self._clean_sql(trigger._compile_sql_statement(stmt)) for stmt in trigger._sql_list]

        # The when conditions are rendered as an IF block wrapping the statements
        if compiled_conditions:
            when_condition = ' AND '.join(f'({c})' for c in compiled_conditions)
            function_sql += f' IF {when_condition} THEN\n'
            for sql in compiled_statements:
                function_sql += f' {sql}\n'
            function_sql += ' END IF;\n'
        else:
            for sql in compiled_statements:
                function_sql += f' {sql}\n'

        if trigger.trigger_when == TriggerWhen.After:
            # For AFTER triggers, we need to return NULL
            return_value = 'NULL'
        elif trigger.trigger_action == TriggerAction.Delete:
            # NEW is NULL in a DELETE trigger: BEFORE and INSTEAD OF DELETE triggers must return OLD,
            # otherwise the NULL return value is interpreted as "skip the operation for this row".
            return_value = 'OLD'
        else:
            # For BEFORE or INSTEAD OF triggers on INSERT / UPDATE, we need to return NEW
            return_value = 'NEW'
        function_sql += f' RETURN {return_value};\n'

        function_sql += 'END;\n$$ LANGUAGE plpgsql;'

        # Then create the trigger - PostgreSQL doesn't support IF NOT EXISTS for triggers before v14
        # We'll handle this through a conditional drop
        drop_if_exists = ''
        if trigger.safe:
            drop_if_exists = f'DROP TRIGGER IF EXISTS {trigger.trigger_name} ON {trigger.target_table} CASCADE;\n'

        # Column-specific triggers in PostgreSQL
        of_columns = (
            f'OF {", ".join(trigger.update_columns)}'
            if trigger.update_columns and trigger.trigger_action == TriggerAction.Update
            else ''
        )

        for_each = 'FOR EACH ROW' if trigger.for_each_row else 'FOR EACH STATEMENT'

        trigger_sql = (
            f'{drop_if_exists}'
            f'CREATE TRIGGER {trigger.trigger_name}\n'
            f'{trigger.trigger_when} {trigger.trigger_action} {of_columns} ON {trigger.target_table}\n'
            f'{for_each}\n'
            f'EXECUTE FUNCTION {function_name}();'
        )

        sql = f'{function_sql}\n\n{trigger_sql}'

        return normalize_sql_spaces(sql)

    def _clean_sql(self, sql: str) -> str:
        """
        Remove RETURNING clauses from SQL statements for PostgreSQL trigger functions.

        The keyword is searched case-insensitively as a whole word, so identifiers merely containing it
        (for example ``returning_date``) are left untouched. Everything from the keyword onwards is dropped; a
        terminating semicolon is appended if the removed tail contained one.

        :param sql: The SQL statement
        :return: SQL statement without RETURNING clause
        """
        import re

        # Search on the original string: offsets computed on an upper-cased copy are not guaranteed to match,
        # because case conversion can change the length of a string.
        match = re.search(r'\bRETURNING\b', sql, flags=re.IGNORECASE)
        if match is None:
            return sql

        head = sql[: match.start()]
        if ';' in sql[match.start() :]:
            return head + ';'
        return head

    def drop_trigger_sql(self, trigger_name: str, safe: bool = True, table_name: str | None = None) -> str:
        """
        Generate PostgreSQL drop trigger SQL.

        Both the trigger and its companion function ``fn_<trigger_name>`` are dropped.

        :param trigger_name: The name of the trigger to drop
        :param safe: If True, add an IF EXISTS clause to both statements. Defaults to True.
        :param table_name: The name of the target table for the trigger. It is mandatory for this dialect.
        :return: SQL string to drop the trigger and its function
        :raise RuntimeError: if ``table_name`` is None.
        """
        if table_name is None:
            raise RuntimeError('Cannot drop a trigger in PostgreSQL without a table_name')

        function_name = f'fn_{trigger_name}'
        return normalize_sql_spaces(
            f'DROP TRIGGER {"IF EXISTS" if safe else ""} {trigger_name} ON {table_name};\n'
            f'DROP FUNCTION {"IF EXISTS" if safe else ""} {function_name}();'
        )

    def select_all_trigger_sql(self) -> str:
        """Return the query listing trigger and table names of the non-system schemas from ``information_schema``."""
        return "SELECT trigger_name, event_object_table AS table_name FROM information_schema.triggers WHERE trigger_schema NOT IN ('pg_catalog', 'information_schema');"

    def supports_trigger_type(self, when: TriggerWhen, action: TriggerAction, on_view: bool = False) -> bool:
        """PostgreSQL supports INSTEAD OF only on views."""
        if when == TriggerWhen.Instead and not on_view:
            return False
        return True

    def supports_safe_create(self) -> bool:
        """PostgreSQL doesn't support IF NOT EXISTS for triggers before v14, but we implement safety differently."""
        return True  # We report True but handle it with DROP IF EXISTS

    def supports_update_of_columns(self) -> bool:
        """PostgreSQL supports column-specific UPDATE triggers."""
        return True

    def supports_when_clause(self) -> bool:
        """PostgreSQL supports WHEN conditions; this dialect renders them as an IF block in the trigger function."""
        return True
class Trigger:
    """Trigger template wrapper for use with peewee ORM."""

    # noinspection PyProtectedMember
    def __init__(
        self,
        trigger_name: str,
        trigger_type: tuple[TriggerWhen, TriggerAction],
        source_table: type[Model] | Model | str,
        safe: bool = False,
        for_each_row: bool = False,
        update_columns: list[str] | None = None,
        on_view: bool = False,  # Added parameter to indicate if the target is a view
    ):
        """
        Constructor parameters:

        :param trigger_name: The name of this trigger. It needs to be unique!
        :type trigger_name: str
        :param trigger_type: A tuple with :class:`TriggerWhen` and :class:`TriggerAction` to specify on which action
            the trigger should be invoked and if before, after or instead of.
        :type trigger_type: tuple[TriggerWhen, TriggerAction]
        :param source_table: The table originating the trigger. It can be a model class, instance, or also the name of
            the table.
        :type source_table: type[Model] | Model | str
        :param safe: A boolean flag to define if in the trigger creation statement a 'IF NOT EXISTS' clause should be
            included. Defaults to False
        :type safe: bool, Optional
        :param for_each_row: A boolean flag to repeat the script content for each modified row in the table.
            Defaults to False.
        :type for_each_row: bool, Optional
        :param update_columns: A list of column names. When defining a trigger on a table update, it is possible to
            restrict the firing of the trigger to the cases when a subset of all columns have been updated. A column
            is updated also when the new value is equal to the old one. If you want to discriminate this case, use the
            :meth:`add_when` method. Defaults to None.
        :type update_columns: list[str], Optional
        :param on_view: A boolean flag to indicate if the target is a view. This affects the support for INSTEAD OF.
            Defaults to False.
        :type on_view: bool, Optional
        """
        self.trigger_name = trigger_name
        self.trigger_type = trigger_type
        self._trigger_when, self._trigger_op = self.trigger_type
        self.update_columns = update_columns or []
        self.on_view = on_view

        if isinstance(source_table, type):
            model_cls = cast(PeeweeModelWithMeta, source_table)
            self.target_table = model_cls._meta.table_name
        elif isinstance(source_table, Model):
            model_instance = cast(PeeweeModelWithMeta, source_table)
            self.target_table = model_instance._meta.table_name
        else:
            self.target_table = source_table

        # These two assignments are intercepted by __setattr__ and stored as SQL fragments.
        self.safe = safe
        self.for_each_row = for_each_row

        self._when_list: list[str | peewee.Node] = []
        self._sql_list: list[str | peewee.Query] = []
        self._database: peewee.Database | peewee.DatabaseProxy | None = None
        self._dialect: TriggerDialect | None = None

    @property
    def trigger_action(self) -> TriggerAction:
        """The action (INSERT, UPDATE, DELETE) firing the trigger."""
        return self._trigger_op

    @trigger_action.setter
    def trigger_action(self, action: TriggerAction) -> None:
        self._trigger_op = action

    @property
    def trigger_when(self) -> TriggerWhen:
        """The execution time (BEFORE, AFTER, INSTEAD OF) of the trigger."""
        return self._trigger_when

    @trigger_when.setter
    def trigger_when(self, when: TriggerWhen) -> None:
        self._trigger_when = when

    def __setattr__(self, key: Any, value: Any) -> None:
        """
        Custom attribute setter.

        The boolean flags ``safe`` and ``for_each_row`` are not stored as such: they are converted into the
        corresponding SQL fragment, stored in ``if_not_exists`` and ``_for_each_row`` respectively. Any other
        attribute is stored normally.

        :param key: The attribute name to set
        :param value: The attribute value
        """
        if key == 'safe':
            self.if_not_exists = 'IF NOT EXISTS' if value else ''
        elif key == 'for_each_row':
            self._for_each_row = 'FOR EACH ROW' if value else ''
        else:
            super().__setattr__(key, value)

    def __getattr__(self, item: str) -> Any:
        """
        Custom attribute getter for computed properties.

        It is invoked only when the normal lookup fails, which is always the case for ``safe`` and
        ``for_each_row`` because :meth:`__setattr__` never stores them under those names.

        :param item: The attribute name to get
        :return: The attribute value
        :raises AttributeError: If the attribute doesn't exist
        """
        if item == 'safe':
            return hasattr(self, 'if_not_exists') and self.if_not_exists == 'IF NOT EXISTS'
        elif item == 'for_each_row':
            return hasattr(self, '_for_each_row') and self._for_each_row == 'FOR EACH ROW'
        else:
            # Raise AttributeError for non-existent attributes (standard Python behaviour)
            raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{item}'")

    def add_sql(self, sql: str | peewee.Query) -> Self:
        """
        Add an SQL statement to be executed by the trigger.

        The ``sql`` can be either a string containing the sql statement, or it can be any other peewee Query.

        For example:

        .. code-block:: python

            # assuming you have created a trigger ...

            sql = AnotherTable.insert(
                field1=some_value, field2=another_value
            )
            trigger.add_sql(sql)

        In this way the SQL code is generated with parametric placeholder if needed.

        :param sql: The SQL statement.
        :type sql: str | peewee.Query
        :return: self for easy chaining
        :rtype: Trigger
        """
        self._sql_list.append(sql)
        return self

    def add_when(self, *conditions: str | peewee.Node) -> Self:
        """
        Add conditions to the `when` statements.

        Conditions are logically ANDed.
        To have mixed `OR` and `AND` logic, use the functions :func:`and_` and :func:`or_`.

        The ``conditions`` can be either strings containing SQL conditions, or peewee Node objects
        (such as Expression or Query objects).

        For example:

        .. code-block:: python

            # String condition
            trigger.add_when("NEW.status = 'active'")

            # Peewee expression
            subq = TriggerStatus.select(TriggerStatus.status).where(
                TriggerStatus.trigger_type == 'DELETE_FILES'
            )
            trigger.add_when(Value(1) == subq)

        .. versionchanged:: v2.0.0
            The argument can also be a generic peewee Node.

        :param conditions: Conditions to be added with logical AND. Can be strings or peewee Node objects.
        :type conditions: str | peewee.Node
        :return: self for easy chaining
        :rtype: Trigger
        """
        self._when_list.extend(conditions)
        return self

    def _node_to_sql(self, node: peewee.Node) -> str:
        """
        Convert a peewee Node (Expression, Query, etc.) to a SQL string with interpolated parameters.

        This is based on peewee's internal query_to_string function for debugging/logging purposes.

        .. versionadded:: v2.0.0

        :param node: A peewee Node object
        :return: SQL string with parameters interpolated
        """
        from peewee import Context, Expression, Select

        # Check if this is an Expression with lhs and rhs attributes (like comparisons)
        if isinstance(node, Expression) and hasattr(node, 'lhs') and hasattr(node, 'rhs'):
            # Recursively convert left and right sides
            lhs_sql = self._node_to_sql(node.lhs) if isinstance(node.lhs, peewee.Node) else self._value_to_sql(node.lhs)
            rhs_sql = self._node_to_sql(node.rhs) if isinstance(node.rhs, peewee.Node) else self._value_to_sql(node.rhs)

            # Get the operator (e.g., '=', '>', '<', etc.)
            op = getattr(node, 'op', '=')

            # For subqueries on either side, wrap them in parentheses
            if isinstance(node.lhs, Select):
                lhs_sql = f'({lhs_sql})'
            if isinstance(node.rhs, Select):
                rhs_sql = f'({rhs_sql})'

            return f'{lhs_sql} {op} {rhs_sql}'

        # For other node types, use the standard Context approach
        # Get database context if available, otherwise use a default Context
        db = self.database
        if db is not None:
            ctx = db.get_sql_context()
        else:
            ctx = Context()

        # Generate SQL with parameters
        sql, params = ctx.sql(node).query()

        # If no parameters, return as-is
        if not params:
            return cast(str, sql)

        # Interpolate parameters into the SQL string
        # This is safe for trigger definitions (not for execution)
        param_placeholder = getattr(ctx.state, 'param', '?') or '?'
        if param_placeholder == '?':
            sql = sql.replace('?', '%s')

        # Transform parameters to SQL-safe values
        transformed_params = [self._value_to_sql(v) for v in params]

        interpolated_str = sql % tuple(transformed_params)
        return cast(str, interpolated_str)

    def _value_to_sql(self, value: Any) -> str:
        """
        Convert a Python value to its SQL representation.

        .. versionadded:: v2.0.0

        :param value: A Python value (string, int, float, None, etc.)
        :return: SQL string representation of the value
        """
        if isinstance(value, str):
            # Escape single quotes by doubling them
            escaped = value.replace("'", "''")
            return f"'{escaped}'"
        elif isinstance(value, bool):  # bools are numbers as well!
            return '1' if value else '0'
        elif isinstance(value, (int, float)):
            return str(value)
        elif value is None:
            return 'NULL'
        else:
            return str(value)

    def set_database(self, database: peewee.Database | peewee.DatabaseProxy) -> Self:
        """
        Set the database to use for this trigger.

        :param database: The database instance
        :return: self for easy chaining
        """
        self._database = database
        return self

    @property
    def database(self) -> peewee.Database | None:
        """Return the actual database, unwrapping proxies."""
        db = self._database
        if isinstance(db, peewee.DatabaseProxy):
            return cast(peewee.Database, db.obj)
        return db

    def _compile_query(self, query: peewee.Query) -> str:
        """
        Compile a Peewee query to a SQL string with its parameters interpolated.

        :param query: The peewee query
        :return: The SQL string
        """
        db = self.database
        sql, params = cast(tuple[str, tuple[Any, ...]], cast(Any, query).sql())
        if params:
            if db:
                placeholder = db.param  # normally ?, but it could be set to something else
            else:  # if the db is not yet bound
                placeholder = '?'

            sql = sql.replace(placeholder, '%s') % tuple(self._value_to_sql(p) for p in params)
        return sql

    def _compile_node(self, node: peewee.Node) -> str:
        """Compile a Peewee node to SQL using the current database context."""
        return self._node_to_sql(node)

    def _compile_sql_statement(self, statement: str | peewee.Query) -> str:
        """
        Return a single SQL statement string with consistent formatting.

        The statement is stripped of surrounding whitespace and terminated by a semicolon.

        :param statement: The statement, either as a string or as a peewee query
        :return: The formatted SQL statement
        """
        if isinstance(statement, str):
            compiled = statement
        else:
            compiled = self._compile_query(statement)

        compiled = compiled.strip()
        if not compiled.endswith(';'):
            compiled += ';'
        return compiled

    def _compile_when_conditions(self) -> list[str]:
        """Compile each when condition, preserving nodes until creation time."""
        return [cond.strip() if isinstance(cond, str) else self._compile_node(cond) for cond in self._when_list]

    def _get_dialect(self) -> TriggerDialect:
        """
        Get the appropriate dialect based on the database type.

        The dialect is cached once it has been resolved from a database. If no database has been set, a new
        SQLite dialect is returned and nothing is cached.

        :return: A dialect instance
        :raise UnsupportedDatabaseError: if the database type is not SQLite, MySQL or PostgreSQL.
        """
        if self._dialect is not None:
            return self._dialect

        if self._database is None:
            # Default to SQLite dialect
            return SQLiteDialect()

        db = self.database  # the actual database, unwrapped from the proxy if needed

        if isinstance(db, peewee.SqliteDatabase):
            self._dialect = SQLiteDialect()
        elif isinstance(db, peewee.MySQLDatabase):
            self._dialect = MySQLDialect()
        elif isinstance(db, peewee.PostgresqlDatabase):
            self._dialect = PostgreSQLDialect()
        else:
            raise UnsupportedDatabaseError(f'Unsupported database type: {type(db)}')

        return self._dialect

    def create(self) -> str:
        """
        Generates the SQL create statement.

        If the dialect does not support a safe creation or column-specific updates, the corresponding settings
        (``safe`` and ``update_columns``) are reset on this trigger before generating the statement.

        :return: The trigger creation statement.
        :raise MissingSQLStatement: if no SQL statements are provided.
        :raise UnsupportedDatabaseError: if the trigger type is not supported by the database.
        """
        if not self._sql_list:
            raise MissingSQLStatement('No SQL statements provided')

        dialect = self._get_dialect()

        # Check if the trigger type is supported
        if not dialect.supports_trigger_type(self.trigger_when, self.trigger_action, self.on_view):
            raise UnsupportedDatabaseError(
                f'Trigger type {self.trigger_when} {self.trigger_action} is not supported by the database'
            )

        # Check if safe create is supported
        if self.safe and not dialect.supports_safe_create():
            # We can either ignore and continue without safe, or raise an error
            # For now, we'll just ignore and continue
            self.safe = False

        # Check if update columns are supported
        if self.update_columns and not dialect.supports_update_of_columns():
            # We can either ignore and continue without column-specific updates, or raise an error
            # For now, we'll ignore and continue
            self.update_columns = []

        # Generate the SQL
        return dialect.create_trigger_sql(self)

    def drop(self, safe: bool = True) -> str:
        """
        Generates the SQL drop statement.

        :param safe: If True, add an IF EXISTS. Defaults to True.
        :type safe: bool, Optional
        :return: The drop statement
        :rtype: str
        """
        dialect = self._get_dialect()
        # The target table is always forwarded: the PostgreSQL dialect cannot build the statement without it,
        # while the SQLite and MySQL dialects ignore it.
        return dialect.drop_trigger_sql(self.trigger_name, safe, self.target_table)