import json
import time
from collections.abc import Mapping
from datetime import datetime
from functools import cached_property
from typing import Any, cast

import sqlalchemy as sa
from sqlalchemy import DateTime, Index, Integer, String, UniqueConstraint, func
from sqlalchemy.orm import Mapped, mapped_column

from core.plugin.entities.plugin_daemon import CredentialType
from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity
from core.trigger.entities.entities import Subscription
from core.trigger.utils.endpoint import generate_plugin_trigger_endpoint_url, generate_webhook_trigger_endpoint
from libs.datetime_utils import naive_utc_now
from models.base import Base
from models.engine import db
from models.enums import AppTriggerStatus, AppTriggerType, CreatorUserRole, WorkflowTriggerStatus
from models.model import Account
from models.types import EnumText, StringUUID


class TriggerSubscription(Base):
    """
    A tenant's subscription to a trigger provider, together with its credentials.

    Several named subscriptions may exist for the same provider within one tenant;
    the (tenant_id, provider_id, name) combination is unique.
    """

    __tablename__ = "trigger_subscriptions"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="trigger_provider_pkey"),
        Index("idx_trigger_providers_tenant_provider", "tenant_id", "provider_id"),
        # Unique index: an endpoint_id resolves to at most one subscription.
        Index("idx_trigger_providers_endpoint", "endpoint_id", unique=True),
        # Tenant-scoped endpoint index (optional, retained for compatibility).
        Index("idx_trigger_providers_tenant_endpoint", "tenant_id", "endpoint_id"),
        UniqueConstraint("tenant_id", "provider_id", "name", name="unique_trigger_provider"),
    )

    id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
    name: Mapped[str] = mapped_column(String(255), nullable=False, comment="Subscription instance name")
    tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    user_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    provider_id: Mapped[str] = mapped_column(
        String(255),
        nullable=False,
        comment="Provider identifier (e.g., plugin_id/provider_name)",
    )
    endpoint_id: Mapped[str] = mapped_column(String(255), nullable=False, comment="Subscription endpoint")
    parameters: Mapped[dict[str, Any]] = mapped_column(
        sa.JSON,
        nullable=False,
        comment="Subscription parameters JSON",
    )
    properties: Mapped[dict[str, Any]] = mapped_column(
        sa.JSON,
        nullable=False,
        comment="Subscription properties JSON",
    )
    credentials: Mapped[dict[str, Any]] = mapped_column(
        sa.JSON,
        nullable=False,
        comment="Subscription credentials JSON",
    )
    credential_type: Mapped[str] = mapped_column(String(50), nullable=False, comment="oauth or api_key")
    credential_expires_at: Mapped[int] = mapped_column(
        Integer,
        default=-1,
        comment="OAuth token expiration timestamp, -1 for never",
    )
    expires_at: Mapped[int] = mapped_column(
        Integer,
        default=-1,
        comment="Subscription instance expiration timestamp, -1 for never",
    )
    created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=func.current_timestamp(),
        server_onupdate=func.current_timestamp(),
    )

    def is_credential_expired(self) -> bool:
        """Return True when the credential has expired or will expire within 3 minutes.

        A ``credential_expires_at`` of -1 means the credential never expires.
        """
        deadline = self.credential_expires_at
        if deadline == -1:
            return False

        # Report expiry 180 seconds early so callers can refresh ahead of time.
        current_ts = int(time.time())
        return current_ts > deadline - 180

    def to_entity(self) -> Subscription:
        """Build the ``Subscription`` entity, resolving ``endpoint_id`` to its endpoint URL."""
        endpoint_url = generate_plugin_trigger_endpoint_url(self.endpoint_id)
        return Subscription(
            expires_at=self.expires_at,
            endpoint=endpoint_url,
            parameters=self.parameters,
            properties=self.properties,
        )

    def to_api_entity(self) -> TriggerProviderSubscriptionApiEntity:
        """Build the API-facing entity for this subscription.

        ``workflows_in_use`` is always set to -1 here.
        """
        endpoint_url = generate_plugin_trigger_endpoint_url(self.endpoint_id)
        credential_kind = CredentialType(self.credential_type)
        return TriggerProviderSubscriptionApiEntity(
            id=self.id,
            name=self.name,
            provider=self.provider_id,
            endpoint=endpoint_url,
            parameters=self.parameters,
            properties=self.properties,
            credential_type=credential_kind,
            credentials=self.credentials,
            workflows_in_use=-1,
        )


# system
# level trigger oauth client params
class TriggerOAuthSystemClient(Base):
    """
    OAuth client parameters for a trigger provider, stored once per
    (plugin_id, provider) pair.
    """

    __tablename__ = "trigger_oauth_system_clients"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="trigger_oauth_system_client_pkey"),
        sa.UniqueConstraint(
            "plugin_id",
            "provider",
            name="trigger_oauth_system_client_plugin_id_provider_idx",
        ),
    )

    id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
    plugin_id: Mapped[str] = mapped_column(sa.String(512), nullable=False)
    provider: Mapped[str] = mapped_column(sa.String(255), nullable=False)
    # OAuth parameters of the trigger provider, kept as text.
    encrypted_oauth_params: Mapped[str] = mapped_column(sa.Text, nullable=False)
    created_at: Mapped[datetime] = mapped_column(
        sa.DateTime,
        nullable=False,
        server_default=sa.func.current_timestamp(),
    )
    updated_at: Mapped[datetime] = mapped_column(
        sa.DateTime,
        nullable=False,
        server_default=sa.func.current_timestamp(),
        server_onupdate=sa.func.current_timestamp(),
    )


# tenant level trigger oauth client params (client_id, client_secret, etc.)
class TriggerOAuthTenantClient(Base):
    """
    Tenant-scoped OAuth client parameters for a trigger provider.

    Unique per (tenant_id, plugin_id, provider).
    """

    __tablename__ = "trigger_oauth_tenant_clients"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="trigger_oauth_tenant_client_pkey"),
        sa.UniqueConstraint("tenant_id", "plugin_id", "provider", name="unique_trigger_oauth_tenant_client"),
    )

    id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
    # tenant id
    tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    plugin_id: Mapped[str] = mapped_column(String(512), nullable=False)
    provider: Mapped[str] = mapped_column(String(255), nullable=False)
    enabled: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, server_default=sa.text("true"))
    # oauth params of the trigger provider
    encrypted_oauth_params: Mapped[str] = mapped_column(sa.Text, nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=func.current_timestamp(),
        server_onupdate=func.current_timestamp(),
    )

    @property
    def oauth_params(self) -> Mapping[str, Any]:
        """Parse ``encrypted_oauth_params`` as JSON; an empty value yields ``{}``.

        No decryption happens here: the stored text is handed to ``json.loads`` as is.
        """
        return cast(Mapping[str, Any], json.loads(self.encrypted_oauth_params or "{}"))


class WorkflowTriggerLog(Base):
    """
    Workflow Trigger Log

    Track async trigger workflow runs with re-invocation capability

    Attributes:
    - id (uuid) Trigger Log ID (used as workflow_trigger_log_id)
    - tenant_id (uuid) Workspace ID
    - app_id (uuid) App ID
    - workflow_id (uuid) Workflow ID
    - workflow_run_id (uuid) Optional - Associated workflow run ID when execution starts
    - root_node_id (string) Optional - Custom starting node ID for workflow execution
    - trigger_metadata (text) Optional - Trigger metadata (JSON)
    - trigger_type (string) Type of trigger: webhook, schedule, plugin
    - trigger_data (text) Full trigger data including inputs (JSON)
    - inputs (text) Input parameters (JSON)
    - outputs (text) Optional - Output content (JSON)
    - status (string) Execution status
    - error (text) Optional - Error message if failed
    - queue_name (string) Celery queue used
    - celery_task_id (string) Optional - Celery task ID for tracking
    - retry_count (int) Number of retry attempts
    - elapsed_time (float) Optional - Time consumption in seconds
    - total_tokens (int) Optional - Total tokens used
    - created_by_role (string) Creator role: account, end_user
    - created_by (string) Creator ID
    - created_at (timestamp) Creation time
    - triggered_at (timestamp) Optional - When actually triggered
    - finished_at (timestamp) Optional - Completion time
    """

    __tablename__ = "workflow_trigger_logs"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="workflow_trigger_log_pkey"),
        sa.Index("workflow_trigger_log_tenant_app_idx", "tenant_id", "app_id"),
        sa.Index("workflow_trigger_log_status_idx", "status"),
        sa.Index("workflow_trigger_log_created_at_idx", "created_at"),
        sa.Index("workflow_trigger_log_workflow_run_idx", "workflow_run_id"),
        sa.Index("workflow_trigger_log_workflow_id_idx", "workflow_id"),
    )

    id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
    tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    workflow_run_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
    root_node_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
    trigger_metadata: Mapped[str] = mapped_column(sa.Text, nullable=False)
    trigger_type: Mapped[str] = mapped_column(EnumText(AppTriggerType, length=50), nullable=False)
    trigger_data: Mapped[str] = mapped_column(sa.Text, nullable=False)  # Full TriggerData as JSON
    inputs: Mapped[str] = mapped_column(sa.Text, nullable=False)  # Just inputs for easy viewing
    outputs: Mapped[str | None] = mapped_column(sa.Text, nullable=True)

    status: Mapped[str] = mapped_column(
        EnumText(WorkflowTriggerStatus, length=50), nullable=False, default=WorkflowTriggerStatus.PENDING
    )
    error: Mapped[str | None] = mapped_column(sa.Text, nullable=True)

    queue_name: Mapped[str] = mapped_column(String(100), nullable=False)
    celery_task_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
    retry_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, default=0)

    elapsed_time: Mapped[float | None] = mapped_column(sa.Float, nullable=True)
    total_tokens: Mapped[int | None] = mapped_column(sa.Integer, nullable=True)

    created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
    created_by_role: Mapped[str] = mapped_column(String(255), nullable=False)
    created_by: Mapped[str] = mapped_column(String(255), nullable=False)

    triggered_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    finished_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)

    @property
    def created_by_account(self):
        """Return the creating ``Account`` when the creator role is ACCOUNT, else ``None``.

        Issues a ``db.session.get`` lookup on each access.
        """
        created_by_role = CreatorUserRole(self.created_by_role)
        return db.session.get(Account, self.created_by) if created_by_role == CreatorUserRole.ACCOUNT else None

    @property
    def created_by_end_user(self):
        """Return the creating ``EndUser`` when the creator role is END_USER, else ``None``.

        Issues a ``db.session.get`` lookup on each access.
        """
        # Imported locally, unlike Account which is imported at module level.
        from models.model import EndUser

        created_by_role = CreatorUserRole(self.created_by_role)
        return db.session.get(EndUser, self.created_by) if created_by_role == CreatorUserRole.END_USER else None

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for API responses.

        JSON text columns are decoded; ``trigger_metadata`` and ``outputs`` become
        ``None`` when empty. Timestamps are ISO-8601 strings, or ``None`` when unset.
        """
        return {
            "id": self.id,
            "tenant_id": self.tenant_id,
            "app_id": self.app_id,
            "workflow_id": self.workflow_id,
            "workflow_run_id": self.workflow_run_id,
            "root_node_id": self.root_node_id,
            "trigger_metadata": json.loads(self.trigger_metadata) if self.trigger_metadata else None,
            "trigger_type": self.trigger_type,
            "trigger_data": json.loads(self.trigger_data),
            "inputs": json.loads(self.inputs),
            "outputs": json.loads(self.outputs) if self.outputs else None,
            "status": self.status,
            "error": self.error,
            "queue_name": self.queue_name,
            "celery_task_id": self.celery_task_id,
            "retry_count": self.retry_count,
            "elapsed_time": self.elapsed_time,
            "total_tokens": self.total_tokens,
            "created_by_role": self.created_by_role,
            "created_by": self.created_by,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "triggered_at": self.triggered_at.isoformat() if self.triggered_at else None,
            "finished_at": self.finished_at.isoformat() if self.finished_at else None,
        }


class WorkflowWebhookTrigger(Base):
    """
    Workflow Webhook Trigger

    Attributes:
    - id (uuid) Primary key
    - app_id (uuid) App ID to bind to a specific app
    - node_id (varchar) Node ID which node in the workflow
    - tenant_id (uuid) Workspace ID
    - webhook_id (varchar) Webhook ID for URL: https://api.dify.ai/triggers/webhook/:webhook_id
    - created_by (varchar) User ID of the creator
    - created_at (timestamp) Creation time
    - updated_at (timestamp) Last update time
    """

    __tablename__ = "workflow_webhook_triggers"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="workflow_webhook_trigger_pkey"),
        sa.Index("workflow_webhook_trigger_tenant_idx", "tenant_id"),
        sa.UniqueConstraint("app_id", "node_id", name="uniq_node"),
        sa.UniqueConstraint("webhook_id", name="uniq_webhook_id"),
    )

    id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
    app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    node_id: Mapped[str] = mapped_column(String(64), nullable=False)
    tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    webhook_id: Mapped[str] = mapped_column(String(24), nullable=False)
    created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=func.current_timestamp(),
        server_onupdate=func.current_timestamp(),
    )

    @cached_property
    def webhook_url(self):
        """
        Generated webhook url (computed once per instance, then cached)
        """
        return generate_webhook_trigger_endpoint(self.webhook_id)

    @cached_property
    def webhook_debug_url(self):
        """
        Generated debug webhook url (computed once per instance, then cached)
        """
        # Second positional argument selects the debug variant of the endpoint.
        return generate_webhook_trigger_endpoint(self.webhook_id, True)


class WorkflowPluginTrigger(Base):
    """
    Workflow Plugin Trigger

    Maps plugin triggers to workflow nodes, similar to WorkflowWebhookTrigger

    Attributes:
    - id (uuid) Primary key
    - app_id (uuid) App ID to bind to a specific app
    - node_id (varchar) Node ID which node in the workflow
    - tenant_id (uuid) Workspace ID
    - provider_id (varchar) Plugin provider ID
    - event_name (varchar) trigger name
    - subscription_id (varchar) Subscription ID
    - created_at (timestamp) Creation time
    - updated_at (timestamp) Last update time
    """

    __tablename__ = "workflow_plugin_triggers"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="workflow_plugin_trigger_pkey"),
        sa.Index("workflow_plugin_trigger_tenant_subscription_idx", "tenant_id", "subscription_id", "event_name"),
        sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node_subscription"),
    )

    id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
    app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    node_id: Mapped[str] = mapped_column(String(64), nullable=False)
    tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    provider_id: Mapped[str] = mapped_column(String(512), nullable=False)
    event_name: Mapped[str] = mapped_column(String(255), nullable=False)
    subscription_id: Mapped[str] = mapped_column(String(255), nullable=False)
    created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=func.current_timestamp(),
        server_onupdate=func.current_timestamp(),
    )


class AppTrigger(Base):
    """
    App Trigger

    Manages multiple triggers for an app with enable/disable and authorization states.

    Attributes:
    - id (uuid) Primary key
    - tenant_id (uuid) Workspace ID
    - app_id (uuid) App ID
    - trigger_type (string) Type: webhook, schedule, plugin
    - title (string) Trigger title
    - status (string) Status: enabled, disabled, unauthorized, error
    - node_id (string) Optional workflow node ID
    - created_at (timestamp) Creation time
    - updated_at (timestamp) Last update time
    """

    __tablename__ = "app_triggers"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="app_trigger_pkey"),
        sa.Index("app_trigger_tenant_app_idx", "tenant_id", "app_id"),
    )

    id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
    tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    # NOTE(review): annotated and documented as optional, yet the column is
    # declared nullable=False - confirm which one is intended.
    node_id: Mapped[str | None] = mapped_column(String(64), nullable=False)
    trigger_type: Mapped[str] = mapped_column(EnumText(AppTriggerType, length=50), nullable=False)
    title: Mapped[str] = mapped_column(String(255), nullable=False)
    # NOTE(review): annotated as a plain str while the column is nullable=True - confirm.
    provider_name: Mapped[str] = mapped_column(String(255), server_default="", nullable=True)
    status: Mapped[str] = mapped_column(
        EnumText(AppTriggerStatus, length=50), nullable=False, default=AppTriggerStatus.ENABLED
    )
    created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        # Pass the callable itself so it is evaluated for each INSERT. Calling it
        # here would freeze a single import-time timestamp into every new row.
        default=naive_utc_now,
        # NOTE(review): per the SQLAlchemy docs, server_onupdate only marks the
        # column as server-generated on UPDATE and emits no value by itself -
        # confirm a database-side mechanism refreshes this column.
        server_onupdate=func.current_timestamp(),
    )


class WorkflowSchedulePlan(Base):
    """
    Workflow Schedule Configuration

    Store schedule configurations for time-based workflow triggers.
    Uses cron expressions with timezone support for flexible scheduling.

    Attributes:
    - id (uuid) Primary key
    - app_id (uuid) App ID to bind to a specific app
    - node_id (varchar) Starting node ID for workflow execution
    - tenant_id (uuid) Workspace ID for multi-tenancy
    - cron_expression (varchar) Cron expression defining schedule pattern
    - timezone (varchar) Timezone for cron evaluation (e.g., 'Asia/Shanghai')
    - next_run_at (timestamp) Next scheduled execution time
    - created_at (timestamp) Creation timestamp
    - updated_at (timestamp) Last update timestamp
    """

    __tablename__ = "workflow_schedule_plans"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="workflow_schedule_plan_pkey"),
        sa.UniqueConstraint("app_id", "node_id", name="uniq_app_node"),
        sa.Index("workflow_schedule_plan_next_idx", "next_run_at"),
    )

    id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuidv7()"))
    app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    node_id: Mapped[str] = mapped_column(String(64), nullable=False)
    tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)

    # Schedule configuration
    cron_expression: Mapped[str] = mapped_column(String(255), nullable=False)
    timezone: Mapped[str] = mapped_column(String(64), nullable=False)

    # Schedule control
    next_run_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
    )

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary representation.

        Timestamps are ISO-8601 strings. ``created_at`` and ``updated_at`` are filled
        in by the database, so they are ``None`` on an instance that has not been
        flushed; these are reported as ``None`` rather than raising.
        """
        return {
            "id": self.id,
            "app_id": self.app_id,
            "node_id": self.node_id,
            "tenant_id": self.tenant_id,
            "cron_expression": self.cron_expression,
            "timezone": self.timezone,
            "next_run_at": self.next_run_at.isoformat() if self.next_run_at else None,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
        }