diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index c029e00553..ee092e55c5 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -35,6 +35,7 @@ from core.workflow.variable_loader import VariableLoader from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_database import db from extensions.ext_redis import redis_client +from extensions.otel import WorkflowAppRunnerHandler, trace_span from models import Workflow from models.enums import UserFrom from models.model import App, Conversation, Message, MessageAnnotation @@ -80,6 +81,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): self._workflow_execution_repository = workflow_execution_repository self._workflow_node_execution_repository = workflow_node_execution_repository + @trace_span(WorkflowAppRunnerHandler) def run(self): app_config = self.application_generate_entity.app_config app_config = cast(AdvancedChatAppConfig, app_config) diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index d8460df390..894e6f397a 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -18,6 +18,7 @@ from core.workflow.system_variable import SystemVariable from core.workflow.variable_loader import VariableLoader from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_redis import redis_client +from extensions.otel import WorkflowAppRunnerHandler, trace_span from libs.datetime_utils import naive_utc_now from models.enums import UserFrom from models.workflow import Workflow @@ -56,6 +57,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): self._workflow_execution_repository = workflow_execution_repository self._workflow_node_execution_repository = workflow_node_execution_repository + @trace_span(WorkflowAppRunnerHandler) def run(self): """ Run application diff --git a/api/extensions/ext_otel.py b/api/extensions/ext_otel.py index 20ac2503a2..40a915e68c 100644 --- a/api/extensions/ext_otel.py +++ b/api/extensions/ext_otel.py @@ -1,148 +1,22 @@ import atexit -import contextlib import logging import os import platform import socket -import sys from typing import Union -import flask -from celery.signals import worker_init -from flask_login import user_loaded_from_request, user_logged_in - from configs import dify_config from dify_app import DifyApp -from libs.helper import extract_tenant_id -from models import Account, EndUser logger = logging.getLogger(__name__) -@user_logged_in.connect -@user_loaded_from_request.connect -def on_user_loaded(_sender, user: Union["Account", "EndUser"]): - if dify_config.ENABLE_OTEL: - from opentelemetry.trace import get_current_span - - if user: - try: - current_span = get_current_span() - tenant_id = extract_tenant_id(user) - if not tenant_id: - return - if current_span: - current_span.set_attribute("service.tenant.id", tenant_id) - current_span.set_attribute("service.user.id", user.id) - except Exception: - logger.exception("Error setting tenant and user attributes") - pass - - def init_app(app: DifyApp): - from opentelemetry.semconv.trace import SpanAttributes - - def is_celery_worker(): - return "celery" in sys.argv[0].lower() - - def instrument_exception_logging(): - exception_handler = ExceptionLoggingHandler() - logging.getLogger().addHandler(exception_handler) - - def init_flask_instrumentor(app: DifyApp): - meter = get_meter("http_metrics", version=dify_config.project.version) - _http_response_counter = meter.create_counter( - "http.server.response.count", - description="Total number of HTTP responses by status code, method and target", - unit="{response}", - ) - - def response_hook(span: Span, status: str, response_headers: list): - if span and span.is_recording(): - try: - if status.startswith("2"): - span.set_status(StatusCode.OK) - else: - span.set_status(StatusCode.ERROR, status) - - status = status.split(" ")[0] - status_code = int(status) - status_class = f"{status_code // 100}xx" - attributes: dict[str, str | int] = {"status_code": status_code, "status_class": status_class} - request = flask.request - if request and request.url_rule: - attributes[SpanAttributes.HTTP_TARGET] = str(request.url_rule.rule) - if request and request.method: - attributes[SpanAttributes.HTTP_METHOD] = str(request.method) - _http_response_counter.add(1, attributes) - except Exception: - logger.exception("Error setting status and attributes") - pass - - instrumentor = FlaskInstrumentor() - if dify_config.DEBUG: - logger.info("Initializing Flask instrumentor") - instrumentor.instrument_app(app, response_hook=response_hook) - - def init_sqlalchemy_instrumentor(app: DifyApp): - with app.app_context(): - engines = list(app.extensions["sqlalchemy"].engines.values()) - SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines) - - def setup_context_propagation(): - # Configure propagators - set_global_textmap( - CompositePropagator( - [ - TraceContextTextMapPropagator(), # W3C trace context - B3Format(), # B3 propagation (used by many systems) - ] - ) - ) - - def shutdown_tracer(): - provider = trace.get_tracer_provider() - if hasattr(provider, "force_flush"): - provider.force_flush() - - class ExceptionLoggingHandler(logging.Handler): - """Custom logging handler that creates spans for logging.exception() calls""" - - def emit(self, record: logging.LogRecord): - with contextlib.suppress(Exception): - if record.exc_info: - tracer = get_tracer_provider().get_tracer("dify.exception.logging") - with tracer.start_as_current_span( - "log.exception", - attributes={ - "log.level": record.levelname, - "log.message": record.getMessage(), - "log.logger": record.name, - "log.file.path": record.pathname, - "log.file.line": record.lineno, - }, - ) as span: - span.set_status(StatusCode.ERROR) - if record.exc_info[1]: - span.record_exception(record.exc_info[1]) - span.set_attribute("exception.message", str(record.exc_info[1])) - if record.exc_info[0]: - span.set_attribute("exception.type", record.exc_info[0].__name__) - - from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as GRPCMetricExporter from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GRPCSpanExporter from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as HTTPMetricExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HTTPSpanExporter - from opentelemetry.instrumentation.celery import CeleryInstrumentor - from opentelemetry.instrumentation.flask import FlaskInstrumentor - from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor - from opentelemetry.instrumentation.redis import RedisInstrumentor - from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor - from opentelemetry.metrics import get_meter, get_meter_provider, set_meter_provider - from opentelemetry.propagate import set_global_textmap - from opentelemetry.propagators.b3 import B3Format - from opentelemetry.propagators.composite import CompositePropagator + from opentelemetry.metrics import set_meter_provider from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader from opentelemetry.sdk.resources import Resource @@ -153,9 +27,10 @@ def init_app(app: DifyApp): ) from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio from opentelemetry.semconv.resource import ResourceAttributes - from opentelemetry.trace import Span, get_tracer_provider, set_tracer_provider - from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator - from opentelemetry.trace.status import StatusCode + from opentelemetry.trace import set_tracer_provider + + from extensions.otel.instrumentation import init_instruments + from extensions.otel.runtime import setup_context_propagation, shutdown_tracer setup_context_propagation() # Initialize OpenTelemetry @@ -177,6 +52,7 @@ def init_app(app: DifyApp): ) sampler = ParentBasedTraceIdRatio(dify_config.OTEL_SAMPLING_RATE) provider = TracerProvider(resource=resource, sampler=sampler) + set_tracer_provider(provider) exporter: Union[GRPCSpanExporter, HTTPSpanExporter, ConsoleSpanExporter] metric_exporter: Union[GRPCMetricExporter, HTTPMetricExporter, ConsoleMetricExporter] @@ -231,29 +107,11 @@ def init_app(app: DifyApp): export_timeout_millis=dify_config.OTEL_METRIC_EXPORT_TIMEOUT, ) set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader])) - if not is_celery_worker(): - init_flask_instrumentor(app) - CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument() - instrument_exception_logging() - init_sqlalchemy_instrumentor(app) - RedisInstrumentor().instrument() - HTTPXClientInstrumentor().instrument() + + init_instruments(app) + atexit.register(shutdown_tracer) def is_enabled(): return dify_config.ENABLE_OTEL - - -@worker_init.connect(weak=False) -def init_celery_worker(*args, **kwargs): - if dify_config.ENABLE_OTEL: - from opentelemetry.instrumentation.celery import CeleryInstrumentor - from opentelemetry.metrics import get_meter_provider - from opentelemetry.trace import get_tracer_provider - - tracer_provider = get_tracer_provider() - metric_provider = get_meter_provider() - if dify_config.DEBUG: - logger.info("Initializing OpenTelemetry for Celery worker") - CeleryInstrumentor(tracer_provider=tracer_provider, meter_provider=metric_provider).instrument() diff --git a/api/extensions/otel/__init__.py b/api/extensions/otel/__init__.py new file mode 100644 index 0000000000..a431698d3d --- /dev/null +++ b/api/extensions/otel/__init__.py @@ -0,0 +1,11 @@ +from extensions.otel.decorators.base import trace_span +from extensions.otel.decorators.handler import SpanHandler +from extensions.otel.decorators.handlers.generate_handler import AppGenerateHandler +from extensions.otel.decorators.handlers.workflow_app_runner_handler import WorkflowAppRunnerHandler + +__all__ = [ + "AppGenerateHandler", + "SpanHandler", + "WorkflowAppRunnerHandler", + "trace_span", +] diff --git a/api/extensions/otel/decorators/__init__.py b/api/extensions/otel/decorators/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/extensions/otel/decorators/base.py b/api/extensions/otel/decorators/base.py new file mode 100644 index 0000000000..9604a3b6d5 --- /dev/null +++ b/api/extensions/otel/decorators/base.py @@ -0,0 +1,61 @@ +import functools +import os +from collections.abc import Callable +from typing import Any, TypeVar, cast + +from opentelemetry.trace import get_tracer + +from configs import dify_config +from extensions.otel.decorators.handler import SpanHandler + +T = TypeVar("T", bound=Callable[..., Any]) + +_HANDLER_INSTANCES: dict[type[SpanHandler], SpanHandler] = {SpanHandler: SpanHandler()} + + +def _is_instrument_flag_enabled() -> bool: + """ + Check if external instrumentation is enabled via environment variable. + + Third-party non-invasive instrumentation agents set this flag to coordinate + with Dify's manual OpenTelemetry instrumentation. + """ + return os.getenv("ENABLE_OTEL_FOR_INSTRUMENT", "").strip().lower() == "true" + + +def _get_handler_instance(handler_class: type[SpanHandler]) -> SpanHandler: + """Get or create a singleton instance of the handler class.""" + if handler_class not in _HANDLER_INSTANCES: + _HANDLER_INSTANCES[handler_class] = handler_class() + return _HANDLER_INSTANCES[handler_class] + + +def trace_span(handler_class: type[SpanHandler] | None = None) -> Callable[[T], T]: + """ + Decorator that traces a function with an OpenTelemetry span. + + The decorator uses the provided handler class to create a singleton handler instance + and delegates the wrapper implementation to that handler. + + :param handler_class: Optional handler class to use for this span. If None, uses the default SpanHandler. + """ + + def decorator(func: T) -> T: + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + if not (dify_config.ENABLE_OTEL or _is_instrument_flag_enabled()): + return func(*args, **kwargs) + + handler = _get_handler_instance(handler_class or SpanHandler) + tracer = get_tracer(__name__) + + return handler.wrapper( + tracer=tracer, + wrapped=func, + args=args, + kwargs=kwargs, + ) + + return cast(T, wrapper) + + return decorator diff --git a/api/extensions/otel/decorators/handler.py b/api/extensions/otel/decorators/handler.py new file mode 100644 index 0000000000..1a7def5b0b --- /dev/null +++ b/api/extensions/otel/decorators/handler.py @@ -0,0 +1,95 @@ +import inspect +from collections.abc import Callable, Mapping +from typing import Any + +from opentelemetry.trace import SpanKind, Status, StatusCode + + +class SpanHandler: + """ + Base class for all span handlers. + + Each instrumentation point provides a handler implementation that fully controls + how spans are created, annotated, and finalized through the wrapper method. + + This class provides a default implementation that creates a basic span and handles + exceptions. Handlers can override the wrapper method to customize behavior. + """ + + _signature_cache: dict[Callable[..., Any], inspect.Signature] = {} + + def _build_span_name(self, wrapped: Callable[..., Any]) -> str: + """ + Build the span name from the wrapped function. + + Handlers can override this method to customize span name generation. + + :param wrapped: The original function being traced + :return: The span name + """ + return f"{wrapped.__module__}.{wrapped.__qualname__}" + + def _extract_arguments( + self, + wrapped: Callable[..., Any], + args: tuple[Any, ...], + kwargs: Mapping[str, Any], + ) -> dict[str, Any] | None: + """ + Extract function arguments using inspect.signature. + + Returns a dictionary of bound arguments, or None if extraction fails. + Handlers can use this to safely extract parameters from args/kwargs. + + The function signature is cached to improve performance on repeated calls. + + :param wrapped: The function being traced + :param args: Positional arguments + :param kwargs: Keyword arguments + :return: Dictionary of bound arguments, or None if extraction fails + """ + try: + if wrapped not in self._signature_cache: + self._signature_cache[wrapped] = inspect.signature(wrapped) + + sig = self._signature_cache[wrapped] + bound = sig.bind(*args, **kwargs) + bound.apply_defaults() + return bound.arguments + except Exception: + return None + + def wrapper( + self, + tracer: Any, + wrapped: Callable[..., Any], + args: tuple[Any, ...], + kwargs: Mapping[str, Any], + ) -> Any: + """ + Fully control the wrapper behavior. + + Default implementation creates a basic span and handles exceptions. + Handlers can override this method to provide complete control over: + - Span creation and configuration + - Attribute extraction + - Function invocation + - Exception handling + - Status setting + + :param tracer: OpenTelemetry tracer instance + :param wrapped: The original function being traced + :param args: Positional arguments (including self/cls if applicable) + :param kwargs: Keyword arguments + :return: Result of calling wrapped function + """ + span_name = self._build_span_name(wrapped) + with tracer.start_as_current_span(span_name, kind=SpanKind.INTERNAL) as span: + try: + result = wrapped(*args, **kwargs) + span.set_status(Status(StatusCode.OK)) + return result + except Exception as exc: + span.record_exception(exc) + span.set_status(Status(StatusCode.ERROR, str(exc))) + raise diff --git a/api/extensions/otel/decorators/handlers/__init__.py b/api/extensions/otel/decorators/handlers/__init__.py new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/api/extensions/otel/decorators/handlers/__init__.py @@ -0,0 +1 @@ + diff --git a/api/extensions/otel/decorators/handlers/generate_handler.py b/api/extensions/otel/decorators/handlers/generate_handler.py new file mode 100644 index 0000000000..63748a9824 --- /dev/null +++ b/api/extensions/otel/decorators/handlers/generate_handler.py @@ -0,0 +1,64 @@ +import logging +from collections.abc import Callable, Mapping +from typing import Any + +from opentelemetry.trace import SpanKind, Status, StatusCode +from opentelemetry.util.types import AttributeValue + +from extensions.otel.decorators.handler import SpanHandler +from extensions.otel.semconv import DifySpanAttributes, GenAIAttributes +from models.model import Account + +logger = logging.getLogger(__name__) + + +class AppGenerateHandler(SpanHandler): + """Span handler for ``AppGenerateService.generate``.""" + + def wrapper( + self, + tracer: Any, + wrapped: Callable[..., Any], + args: tuple[Any, ...], + kwargs: Mapping[str, Any], + ) -> Any: + try: + arguments = self._extract_arguments(wrapped, args, kwargs) + if not arguments: + return wrapped(*args, **kwargs) + + app_model = arguments.get("app_model") + user = arguments.get("user") + args_dict = arguments.get("args", {}) + streaming = arguments.get("streaming", True) + + if not app_model or not user or not isinstance(args_dict, dict): + return wrapped(*args, **kwargs) + app_id = getattr(app_model, "id", None) or "unknown" + tenant_id = getattr(app_model, "tenant_id", None) or "unknown" + user_id = getattr(user, "id", None) or "unknown" + workflow_id = args_dict.get("workflow_id") or "unknown" + + attributes: dict[str, AttributeValue] = { + DifySpanAttributes.APP_ID: app_id, + DifySpanAttributes.TENANT_ID: tenant_id, + GenAIAttributes.USER_ID: user_id, + DifySpanAttributes.USER_TYPE: "Account" if isinstance(user, Account) else "EndUser", + DifySpanAttributes.STREAMING: streaming, + DifySpanAttributes.WORKFLOW_ID: workflow_id, + } + + span_name = self._build_span_name(wrapped) + except Exception as exc: + logger.warning("Failed to prepare span attributes for AppGenerateService.generate: %s", exc, exc_info=True) + return wrapped(*args, **kwargs) + + with tracer.start_as_current_span(span_name, kind=SpanKind.INTERNAL, attributes=attributes) as span: + try: + result = wrapped(*args, **kwargs) + span.set_status(Status(StatusCode.OK)) + return result + except Exception as exc: + span.record_exception(exc) + span.set_status(Status(StatusCode.ERROR, str(exc))) + raise diff --git a/api/extensions/otel/decorators/handlers/workflow_app_runner_handler.py b/api/extensions/otel/decorators/handlers/workflow_app_runner_handler.py new file mode 100644 index 0000000000..8abd60197c --- /dev/null +++ b/api/extensions/otel/decorators/handlers/workflow_app_runner_handler.py @@ -0,0 +1,65 @@ +import logging +from collections.abc import Callable, Mapping +from typing import Any + +from opentelemetry.trace import SpanKind, Status, StatusCode +from opentelemetry.util.types import AttributeValue + +from extensions.otel.decorators.handler import SpanHandler +from extensions.otel.semconv import DifySpanAttributes, GenAIAttributes + +logger = logging.getLogger(__name__) + + +class WorkflowAppRunnerHandler(SpanHandler): + """Span handler for ``WorkflowAppRunner.run``.""" + + def wrapper( + self, + tracer: Any, + wrapped: Callable[..., Any], + args: tuple[Any, ...], + kwargs: Mapping[str, Any], + ) -> Any: + try: + arguments = self._extract_arguments(wrapped, args, kwargs) + if not arguments: + return wrapped(*args, **kwargs) + + runner = arguments.get("self") + if runner is None or not hasattr(runner, "application_generate_entity"): + return wrapped(*args, **kwargs) + + entity = runner.application_generate_entity + app_config = getattr(entity, "app_config", None) + if app_config is None: + return wrapped(*args, **kwargs) + + user_id: AttributeValue = getattr(entity, "user_id", None) or "unknown" + app_id: AttributeValue = getattr(app_config, "app_id", None) or "unknown" + tenant_id: AttributeValue = getattr(app_config, "tenant_id", None) or "unknown" + workflow_id: AttributeValue = getattr(app_config, "workflow_id", None) or "unknown" + streaming = getattr(entity, "stream", True) + + attributes: dict[str, AttributeValue] = { + DifySpanAttributes.APP_ID: app_id, + DifySpanAttributes.TENANT_ID: tenant_id, + GenAIAttributes.USER_ID: user_id, + DifySpanAttributes.STREAMING: streaming, + DifySpanAttributes.WORKFLOW_ID: workflow_id, + } + + span_name = self._build_span_name(wrapped) + except Exception as exc: + logger.warning("Failed to prepare span attributes for WorkflowAppRunner.run: %s", exc, exc_info=True) + return wrapped(*args, **kwargs) + + with tracer.start_as_current_span(span_name, kind=SpanKind.INTERNAL, attributes=attributes) as span: + try: + result = wrapped(*args, **kwargs) + span.set_status(Status(StatusCode.OK)) + return result + except Exception as exc: + span.record_exception(exc) + span.set_status(Status(StatusCode.ERROR, str(exc))) + raise diff --git a/api/extensions/otel/instrumentation.py b/api/extensions/otel/instrumentation.py new file mode 100644 index 0000000000..3597110cba --- /dev/null +++ b/api/extensions/otel/instrumentation.py @@ -0,0 +1,108 @@ +import contextlib +import logging + +import flask +from opentelemetry.instrumentation.celery import CeleryInstrumentor +from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor +from opentelemetry.instrumentation.redis import RedisInstrumentor +from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor +from opentelemetry.metrics import get_meter, get_meter_provider +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import Span, get_tracer_provider +from opentelemetry.trace.status import StatusCode + +from configs import dify_config +from dify_app import DifyApp +from extensions.otel.runtime import is_celery_worker + +logger = logging.getLogger(__name__) + + +class ExceptionLoggingHandler(logging.Handler): + def emit(self, record: logging.LogRecord): + with contextlib.suppress(Exception): + if record.exc_info: + tracer = get_tracer_provider().get_tracer("dify.exception.logging") + with tracer.start_as_current_span( + "log.exception", + attributes={ + "log.level": record.levelname, + "log.message": record.getMessage(), + "log.logger": record.name, + "log.file.path": record.pathname, + "log.file.line": record.lineno, + }, + ) as span: + span.set_status(StatusCode.ERROR) + if record.exc_info[1]: + span.record_exception(record.exc_info[1]) + span.set_attribute("exception.message", str(record.exc_info[1])) + if record.exc_info[0]: + span.set_attribute("exception.type", record.exc_info[0].__name__) + + +def instrument_exception_logging() -> None: + exception_handler = ExceptionLoggingHandler() + logging.getLogger().addHandler(exception_handler) + + +def init_flask_instrumentor(app: DifyApp) -> None: + meter = get_meter("http_metrics", version=dify_config.project.version) + _http_response_counter = meter.create_counter( + "http.server.response.count", + description="Total number of HTTP responses by status code, method and target", + unit="{response}", + ) + + def response_hook(span: Span, status: str, response_headers: list) -> None: + if span and span.is_recording(): + try: + if status.startswith("2"): + span.set_status(StatusCode.OK) + else: + span.set_status(StatusCode.ERROR, status) + + status = status.split(" ")[0] + status_code = int(status) + status_class = f"{status_code // 100}xx" + attributes: dict[str, str | int] = {"status_code": status_code, "status_class": status_class} + request = flask.request + if request and request.url_rule: + attributes[SpanAttributes.HTTP_TARGET] = str(request.url_rule.rule) + if request and request.method: + attributes[SpanAttributes.HTTP_METHOD] = str(request.method) + _http_response_counter.add(1, attributes) + except Exception: + logger.exception("Error setting status and attributes") + + from opentelemetry.instrumentation.flask import FlaskInstrumentor + + instrumentor = FlaskInstrumentor() + if dify_config.DEBUG: + logger.info("Initializing Flask instrumentor") + instrumentor.instrument_app(app, response_hook=response_hook) + + +def init_sqlalchemy_instrumentor(app: DifyApp) -> None: + with app.app_context(): + engines = list(app.extensions["sqlalchemy"].engines.values()) + SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines) + + +def init_redis_instrumentor() -> None: + RedisInstrumentor().instrument() + + +def init_httpx_instrumentor() -> None: + HTTPXClientInstrumentor().instrument() + + +def init_instruments(app: DifyApp) -> None: + if not is_celery_worker(): + init_flask_instrumentor(app) + CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument() + + instrument_exception_logging() + init_sqlalchemy_instrumentor(app) + init_redis_instrumentor() + init_httpx_instrumentor() diff --git a/api/extensions/otel/runtime.py b/api/extensions/otel/runtime.py new file mode 100644 index 0000000000..f8ed330cf6 --- /dev/null +++ b/api/extensions/otel/runtime.py @@ -0,0 +1,72 @@ +import logging +import sys +from typing import Union + +from celery.signals import worker_init +from flask_login import user_loaded_from_request, user_logged_in +from opentelemetry import trace +from opentelemetry.propagate import set_global_textmap +from opentelemetry.propagators.b3 import B3Format +from opentelemetry.propagators.composite import CompositePropagator +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + +from configs import dify_config +from libs.helper import extract_tenant_id +from models import Account, EndUser + +logger = logging.getLogger(__name__) + + +def setup_context_propagation() -> None: + set_global_textmap( + CompositePropagator( + [ + TraceContextTextMapPropagator(), + B3Format(), + ] + ) + ) + + +def shutdown_tracer() -> None: + provider = trace.get_tracer_provider() + if hasattr(provider, "force_flush"): + provider.force_flush() + + +def is_celery_worker(): + return "celery" in sys.argv[0].lower() + + +@user_logged_in.connect +@user_loaded_from_request.connect +def on_user_loaded(_sender, user: Union["Account", "EndUser"]): + if dify_config.ENABLE_OTEL: + from opentelemetry.trace import get_current_span + + if user: + try: + current_span = get_current_span() + tenant_id = extract_tenant_id(user) + if not tenant_id: + return + if current_span: + current_span.set_attribute("service.tenant.id", tenant_id) + current_span.set_attribute("service.user.id", user.id) + except Exception: + logger.exception("Error setting tenant and user attributes") + pass + + +@worker_init.connect(weak=False) +def init_celery_worker(*args, **kwargs): + if dify_config.ENABLE_OTEL: + from opentelemetry.instrumentation.celery import CeleryInstrumentor + from opentelemetry.metrics import get_meter_provider + from opentelemetry.trace import get_tracer_provider + + tracer_provider = get_tracer_provider() + metric_provider = get_meter_provider() + if dify_config.DEBUG: + logger.info("Initializing OpenTelemetry for Celery worker") + CeleryInstrumentor(tracer_provider=tracer_provider, meter_provider=metric_provider).instrument() diff --git a/api/extensions/otel/semconv/__init__.py b/api/extensions/otel/semconv/__init__.py new file mode 100644 index 0000000000..dc79dee222 --- /dev/null +++ b/api/extensions/otel/semconv/__init__.py @@ -0,0 +1,6 @@ +"""Semantic convention shortcuts for Dify-specific spans.""" + +from .dify import DifySpanAttributes +from .gen_ai import GenAIAttributes + +__all__ = ["DifySpanAttributes", "GenAIAttributes"] diff --git a/api/extensions/otel/semconv/dify.py b/api/extensions/otel/semconv/dify.py new file mode 100644 index 0000000000..a20b9b358d --- /dev/null +++ b/api/extensions/otel/semconv/dify.py @@ -0,0 +1,23 @@ +"""Dify-specific semantic convention definitions.""" + + +class DifySpanAttributes: + """Attribute names for Dify-specific spans.""" + + APP_ID = "dify.app_id" + """Application identifier.""" + + TENANT_ID = "dify.tenant_id" + """Tenant identifier.""" + + USER_TYPE = "dify.user_type" + """User type, e.g. Account, EndUser.""" + + STREAMING = "dify.streaming" + """Whether streaming response is enabled.""" + + WORKFLOW_ID = "dify.workflow_id" + """Workflow identifier.""" + + INVOKE_FROM = "dify.invoke_from" + """Invocation source, e.g. SERVICE_API, WEB_APP, DEBUGGER.""" diff --git a/api/extensions/otel/semconv/gen_ai.py b/api/extensions/otel/semconv/gen_ai.py new file mode 100644 index 0000000000..83c52ed34f --- /dev/null +++ b/api/extensions/otel/semconv/gen_ai.py @@ -0,0 +1,64 @@ +""" +GenAI semantic conventions. +""" + + +class GenAIAttributes: + """Common GenAI attribute keys.""" + + USER_ID = "gen_ai.user.id" + """Identifier of the end user in the application layer.""" + + FRAMEWORK = "gen_ai.framework" + """Framework type. Fixed to 'dify' in this project.""" + + SPAN_KIND = "gen_ai.span.kind" + """Operation type. Extended specification, not in OTel standard.""" + + +class ChainAttributes: + """Chain operation attribute keys.""" + + OPERATION_NAME = "gen_ai.operation.name" + """Secondary operation type, e.g. WORKFLOW, TASK.""" + + INPUT_VALUE = "input.value" + """Input content.""" + + OUTPUT_VALUE = "output.value" + """Output content.""" + + TIME_TO_FIRST_TOKEN = "gen_ai.user.time_to_first_token" + """Time to first token in nanoseconds from receiving the request to first token return.""" + + +class RetrieverAttributes: + """Retriever operation attribute keys.""" + + QUERY = "retrieval.query" + """Retrieval query string.""" + + DOCUMENT = "retrieval.document" + """Retrieved document list as JSON array.""" + + +class ToolAttributes: + """Tool operation attribute keys.""" + + TOOL_CALL_ID = "gen_ai.tool.call.id" + """Tool call identifier.""" + + TOOL_DESCRIPTION = "gen_ai.tool.description" + """Tool description.""" + + TOOL_NAME = "gen_ai.tool.name" + """Tool name.""" + + TOOL_TYPE = "gen_ai.tool.type" + """Tool type. Examples: function, extension, datastore.""" + + TOOL_CALL_ARGUMENTS = "gen_ai.tool.call.arguments" + """Tool invocation arguments.""" + + TOOL_CALL_RESULT = "gen_ai.tool.call.result" + """Tool invocation result.""" diff --git a/api/services/app_generate_service.py b/api/services/app_generate_service.py index dc85929b98..4514c86f7c 100644 --- a/api/services/app_generate_service.py +++ b/api/services/app_generate_service.py @@ -11,6 +11,7 @@ from core.app.apps.workflow.app_generator import WorkflowAppGenerator from core.app.entities.app_invoke_entities import InvokeFrom from core.app.features.rate_limiting import RateLimit from enums.quota_type import QuotaType, unlimited +from extensions.otel import AppGenerateHandler, trace_span from models.model import Account, App, AppMode, EndUser from models.workflow import Workflow from services.errors.app import InvokeRateLimitError, QuotaExceededError, WorkflowIdFormatError, WorkflowNotFoundError @@ -19,6 +20,7 @@ from services.workflow_service import WorkflowService class AppGenerateService: @classmethod + @trace_span(AppGenerateHandler) def generate( cls, app_model: App, diff --git a/api/tests/unit_tests/extensions/otel/__init__.py b/api/tests/unit_tests/extensions/otel/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/tests/unit_tests/extensions/otel/conftest.py b/api/tests/unit_tests/extensions/otel/conftest.py new file mode 100644 index 0000000000..b7f27c4da8 --- /dev/null +++ b/api/tests/unit_tests/extensions/otel/conftest.py @@ -0,0 +1,96 @@ +""" +Shared fixtures for OTel tests. + +Provides: +- Mock TracerProvider with MemorySpanExporter +- Mock configurations +- Test data factories +""" + +from unittest.mock import MagicMock, create_autospec + +import pytest +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.trace import set_tracer_provider + + +@pytest.fixture +def memory_span_exporter(): + """Provide an in-memory span exporter for testing.""" + return InMemorySpanExporter() + + +@pytest.fixture +def tracer_provider_with_memory_exporter(memory_span_exporter): + """Provide a TracerProvider configured with memory exporter.""" + import opentelemetry.trace as trace_api + + trace_api._TRACER_PROVIDER = None + trace_api._TRACER_PROVIDER_SET_ONCE._done = False + + provider = TracerProvider() + processor = SimpleSpanProcessor(memory_span_exporter) + provider.add_span_processor(processor) + set_tracer_provider(provider) + + yield provider + + provider.force_flush() + + +@pytest.fixture +def mock_app_model(): + """Create a mock App model.""" + app = MagicMock() + app.id = "test-app-id" + app.tenant_id = "test-tenant-id" + return app + + +@pytest.fixture +def mock_account_user(): + """Create a mock Account user.""" + from models.model import Account + + user = create_autospec(Account, instance=True) + user.id = "test-user-id" + return user + + +@pytest.fixture +def mock_end_user(): + """Create a mock EndUser.""" + from models.model import EndUser + + user = create_autospec(EndUser, instance=True) + user.id = "test-end-user-id" + return user + + +@pytest.fixture +def mock_workflow_runner(): + """Create a mock WorkflowAppRunner.""" + runner = MagicMock() + runner.application_generate_entity = MagicMock() + runner.application_generate_entity.user_id = "test-user-id" + runner.application_generate_entity.stream = True + runner.application_generate_entity.app_config = MagicMock() + runner.application_generate_entity.app_config.app_id = "test-app-id" + runner.application_generate_entity.app_config.tenant_id = "test-tenant-id" + runner.application_generate_entity.app_config.workflow_id = "test-workflow-id" + return runner + + +@pytest.fixture(autouse=True) +def reset_handler_instances(): + """Reset handler singleton instances before each test.""" + from extensions.otel.decorators.base import _HANDLER_INSTANCES + + _HANDLER_INSTANCES.clear() + from extensions.otel.decorators.handler import SpanHandler + + _HANDLER_INSTANCES[SpanHandler] = SpanHandler() + yield + _HANDLER_INSTANCES.clear() diff --git a/api/tests/unit_tests/extensions/otel/decorators/__init__.py b/api/tests/unit_tests/extensions/otel/decorators/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/tests/unit_tests/extensions/otel/decorators/handlers/__init__.py b/api/tests/unit_tests/extensions/otel/decorators/handlers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/tests/unit_tests/extensions/otel/decorators/handlers/test_generate_handler.py b/api/tests/unit_tests/extensions/otel/decorators/handlers/test_generate_handler.py new file mode 100644 index 0000000000..f7475f2239 --- /dev/null +++ b/api/tests/unit_tests/extensions/otel/decorators/handlers/test_generate_handler.py @@ -0,0 +1,92 @@ +""" +Tests for AppGenerateHandler. + +Test objectives: +1. Verify handler compatibility with real function signature (fails when parameters change) +2. Verify span attribute mapping correctness +""" + +from unittest.mock import patch + +from core.app.entities.app_invoke_entities import InvokeFrom +from extensions.otel.decorators.handlers.generate_handler import AppGenerateHandler +from extensions.otel.semconv import DifySpanAttributes, GenAIAttributes + + +class TestAppGenerateHandler: + """Core tests for AppGenerateHandler""" + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_compatible_with_real_function_signature( + self, tracer_provider_with_memory_exporter, mock_app_model, mock_account_user + ): + """ + Verify handler compatibility with real AppGenerateService.generate signature. + + If AppGenerateService.generate parameters change, this test will fail, + prompting developers to update the handler's parameter extraction logic. + """ + from services.app_generate_service import AppGenerateService + + handler = AppGenerateHandler() + + kwargs = { + "app_model": mock_app_model, + "user": mock_account_user, + "args": {"workflow_id": "test-wf-123"}, + "invoke_from": InvokeFrom.DEBUGGER, + "streaming": True, + "root_node_id": None, + } + + arguments = handler._extract_arguments(AppGenerateService.generate, (), kwargs) + + assert arguments is not None, "Failed to extract arguments from AppGenerateService.generate" + assert "app_model" in arguments, "Handler uses app_model but parameter is missing" + assert "user" in arguments, "Handler uses user but parameter is missing" + assert "args" in arguments, "Handler uses args but parameter is missing" + assert "streaming" in arguments, "Handler uses streaming but parameter is missing" + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_all_span_attributes_set_correctly( + self, tracer_provider_with_memory_exporter, memory_span_exporter, mock_app_model, mock_account_user + ): + """Verify all span attributes are mapped correctly""" + handler = AppGenerateHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + test_app_id = "app-456" + test_tenant_id = "tenant-789" + test_user_id = "user-111" + test_workflow_id = "wf-222" + + mock_app_model.id = test_app_id + mock_app_model.tenant_id = test_tenant_id + mock_account_user.id = test_user_id + + def dummy_func(app_model, user, args, invoke_from, streaming=True): + return "result" + + handler.wrapper( + tracer, + dummy_func, + (), + { + "app_model": mock_app_model, + "user": mock_account_user, + "args": {"workflow_id": test_workflow_id}, + "invoke_from": InvokeFrom.DEBUGGER, + "streaming": False, + }, + ) + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + attrs = spans[0].attributes + + assert attrs[DifySpanAttributes.APP_ID] == test_app_id + assert attrs[DifySpanAttributes.TENANT_ID] == test_tenant_id + assert attrs[GenAIAttributes.USER_ID] == test_user_id + assert attrs[DifySpanAttributes.WORKFLOW_ID] == test_workflow_id + assert attrs[DifySpanAttributes.USER_TYPE] == "Account" + assert attrs[DifySpanAttributes.STREAMING] is False diff --git a/api/tests/unit_tests/extensions/otel/decorators/handlers/test_workflow_app_runner_handler.py b/api/tests/unit_tests/extensions/otel/decorators/handlers/test_workflow_app_runner_handler.py new file mode 100644 index 0000000000..500f80fc3c --- /dev/null +++ b/api/tests/unit_tests/extensions/otel/decorators/handlers/test_workflow_app_runner_handler.py @@ -0,0 +1,76 @@ +""" +Tests for WorkflowAppRunnerHandler. + +Test objectives: +1. Verify handler compatibility with real WorkflowAppRunner structure (fails when structure changes) +2. Verify span attribute mapping correctness +""" + +from unittest.mock import patch + +from extensions.otel.decorators.handlers.workflow_app_runner_handler import WorkflowAppRunnerHandler +from extensions.otel.semconv import DifySpanAttributes, GenAIAttributes + + +class TestWorkflowAppRunnerHandler: + """Core tests for WorkflowAppRunnerHandler""" + + def test_handler_structure_dependencies(self): + """ + Verify handler dependencies on WorkflowAppRunner structure. + + Handler depends on: + - runner.application_generate_entity (WorkflowAppGenerateEntity) + - entity.app_config (WorkflowAppConfig) + - entity.user_id, entity.stream + - app_config.app_id, app_config.tenant_id, app_config.workflow_id + + If these attribute paths change in real types, this test will fail, + prompting developers to update the handler's attribute access logic. + """ + from core.app.app_config.entities import WorkflowUIBasedAppConfig + from core.app.entities.app_invoke_entities import WorkflowAppGenerateEntity + + required_entity_fields = ["user_id", "stream", "app_config"] + entity_fields = WorkflowAppGenerateEntity.model_fields + for field in required_entity_fields: + assert field in entity_fields, f"Handler expects WorkflowAppGenerateEntity.{field} but field is missing" + + required_config_fields = ["app_id", "tenant_id", "workflow_id"] + config_fields = WorkflowUIBasedAppConfig.model_fields + for field in required_config_fields: + assert field in config_fields, f"Handler expects app_config.{field} but field is missing" + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_all_span_attributes_set_correctly( + self, tracer_provider_with_memory_exporter, memory_span_exporter, mock_workflow_runner + ): + """Verify all span attributes are mapped correctly""" + handler = WorkflowAppRunnerHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + test_app_id = "app-999" + test_tenant_id = "tenant-888" + test_user_id = "user-777" + test_workflow_id = "wf-666" + + mock_workflow_runner.application_generate_entity.user_id = test_user_id + mock_workflow_runner.application_generate_entity.stream = False + mock_workflow_runner.application_generate_entity.app_config.app_id = test_app_id + mock_workflow_runner.application_generate_entity.app_config.tenant_id = test_tenant_id + mock_workflow_runner.application_generate_entity.app_config.workflow_id = test_workflow_id + + def runner_run(self): + return "result" + + handler.wrapper(tracer, runner_run, (mock_workflow_runner,), {}) + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + attrs = spans[0].attributes + + assert attrs[DifySpanAttributes.APP_ID] == test_app_id + assert attrs[DifySpanAttributes.TENANT_ID] == test_tenant_id + assert attrs[GenAIAttributes.USER_ID] == test_user_id + assert attrs[DifySpanAttributes.WORKFLOW_ID] == test_workflow_id + assert attrs[DifySpanAttributes.STREAMING] is False diff --git a/api/tests/unit_tests/extensions/otel/decorators/test_base.py b/api/tests/unit_tests/extensions/otel/decorators/test_base.py new file mode 100644 index 0000000000..a42f861bb7 --- /dev/null +++ b/api/tests/unit_tests/extensions/otel/decorators/test_base.py @@ -0,0 +1,119 @@ +""" +Tests for trace_span decorator. + +Test coverage: +- Decorator basic functionality +- Enable/disable logic +- Handler singleton management +- Integration with OpenTelemetry SDK +""" + +from unittest.mock import patch + +import pytest +from opentelemetry.trace import StatusCode + +from extensions.otel.decorators.base import trace_span + + +class TestTraceSpanDecorator: + """Test trace_span decorator basic functionality.""" + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_decorated_function_executes_normally(self, tracer_provider_with_memory_exporter): + """Test that decorated function executes and returns correct value.""" + + @trace_span() + def test_func(x, y): + return x + y + + result = test_func(2, 3) + assert result == 5 + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_decorator_with_args_and_kwargs(self, tracer_provider_with_memory_exporter): + """Test that decorator correctly handles args and kwargs.""" + + @trace_span() + def test_func(a, b, c=10): + return a + b + c + + result = test_func(1, 2, c=3) + assert result == 6 + + +class TestTraceSpanWithMemoryExporter: + """Test trace_span with MemorySpanExporter to verify span creation.""" + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_span_is_created_and_exported(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that span is created and exported to memory exporter.""" + + @trace_span() + def test_func(): + return "result" + + test_func() + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_span_name_matches_function(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that span name matches the decorated function.""" + + @trace_span() + def my_test_function(): + return "result" + + my_test_function() + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + assert "my_test_function" in spans[0].name + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_span_status_is_ok_on_success(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that span status is OK when function succeeds.""" + + @trace_span() + def test_func(): + return "result" + + test_func() + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.OK + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_span_status_is_error_on_exception(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that span status is ERROR when function raises exception.""" + + @trace_span() + def test_func(): + raise ValueError("test error") + + with pytest.raises(ValueError, match="test error"): + test_func() + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.ERROR + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_exception_is_recorded_in_span(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that exception details are recorded in span events.""" + + @trace_span() + def test_func(): + raise ValueError("test error") + + with pytest.raises(ValueError): + test_func() + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + events = spans[0].events + assert len(events) > 0 + assert any("exception" in event.name.lower() for event in events) diff --git a/api/tests/unit_tests/extensions/otel/decorators/test_handler.py b/api/tests/unit_tests/extensions/otel/decorators/test_handler.py new file mode 100644 index 0000000000..44788bab9a --- /dev/null +++ b/api/tests/unit_tests/extensions/otel/decorators/test_handler.py @@ -0,0 +1,258 @@ +""" +Tests for SpanHandler base class. + +Test coverage: +- _build_span_name method +- _extract_arguments method +- wrapper method default implementation +- Signature caching +""" + +from unittest.mock import patch + +import pytest +from opentelemetry.trace import StatusCode + +from extensions.otel.decorators.handler import SpanHandler + + +class TestSpanHandlerExtractArguments: + """Test SpanHandler._extract_arguments method.""" + + def test_extract_positional_arguments(self): + """Test extracting positional arguments.""" + handler = SpanHandler() + + def func(a, b, c): + pass + + args = (1, 2, 3) + kwargs = {} + result = handler._extract_arguments(func, args, kwargs) + + assert result is not None + assert result["a"] == 1 + assert result["b"] == 2 + assert result["c"] == 3 + + def test_extract_keyword_arguments(self): + """Test extracting keyword arguments.""" + handler = SpanHandler() + + def func(a, b, c): + pass + + args = () + kwargs = {"a": 1, "b": 2, "c": 3} + result = handler._extract_arguments(func, args, kwargs) + + assert result is not None + assert result["a"] == 1 + assert result["b"] == 2 + assert result["c"] == 3 + + def test_extract_mixed_arguments(self): + """Test extracting mixed positional and keyword arguments.""" + handler = SpanHandler() + + def func(a, b, c): + pass + + args = (1,) + kwargs = {"b": 2, "c": 3} + result = handler._extract_arguments(func, args, kwargs) + + assert result is not None + assert result["a"] == 1 + assert result["b"] == 2 + assert result["c"] == 3 + + def test_extract_arguments_with_defaults(self): + """Test extracting arguments with default values.""" + handler = SpanHandler() + + def func(a, b=10, c=20): + pass + + args = (1,) + kwargs = {} + result = handler._extract_arguments(func, args, kwargs) + + assert result is not None + assert result["a"] == 1 + assert result["b"] == 10 + assert result["c"] == 20 + + def test_extract_arguments_handles_self(self): + """Test extracting arguments from instance method (with self).""" + handler = SpanHandler() + + class MyClass: + def method(self, a, b): + pass + + instance = MyClass() + args = (1, 2) + kwargs = {} + result = handler._extract_arguments(instance.method, args, kwargs) + + assert result is not None + assert result["a"] == 1 + assert result["b"] == 2 + + def test_extract_arguments_returns_none_on_error(self): + """Test that _extract_arguments returns None when extraction fails.""" + handler = SpanHandler() + + def func(a, b): + pass + + args = (1,) + kwargs = {} + result = handler._extract_arguments(func, args, kwargs) + + assert result is None + + def test_signature_caching(self): + """Test that function signatures are cached.""" + handler = SpanHandler() + + def func(a, b): + pass + + assert func not in handler._signature_cache + + handler._extract_arguments(func, (1, 2), {}) + assert func in handler._signature_cache + + cached_sig = handler._signature_cache[func] + handler._extract_arguments(func, (3, 4), {}) + assert handler._signature_cache[func] is cached_sig + + +class TestSpanHandlerWrapper: + """Test SpanHandler.wrapper default implementation.""" + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_creates_span(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that wrapper creates a span.""" + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def test_func(): + return "result" + + result = handler.wrapper(tracer, test_func, (), {}) + + assert result == "result" + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_sets_span_kind_internal(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that wrapper sets SpanKind to INTERNAL.""" + from opentelemetry.trace import SpanKind + + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def test_func(): + return "result" + + handler.wrapper(tracer, test_func, (), {}) + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].kind == SpanKind.INTERNAL + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_sets_status_ok_on_success(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that wrapper sets status to OK when function succeeds.""" + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def test_func(): + return "result" + + handler.wrapper(tracer, test_func, (), {}) + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.OK + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_records_exception_on_error(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that wrapper records exception when function raises.""" + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def test_func(): + raise ValueError("test error") + + with pytest.raises(ValueError, match="test error"): + handler.wrapper(tracer, test_func, (), {}) + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + events = spans[0].events + assert len(events) > 0 + assert any("exception" in event.name.lower() for event in events) + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_sets_status_error_on_exception(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that wrapper sets status to ERROR when function raises exception.""" + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def test_func(): + raise ValueError("test error") + + with pytest.raises(ValueError): + handler.wrapper(tracer, test_func, (), {}) + + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].status.status_code == StatusCode.ERROR + assert "test error" in spans[0].status.description + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_re_raises_exception(self, tracer_provider_with_memory_exporter): + """Test that wrapper re-raises exception after recording it.""" + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def test_func(): + raise ValueError("test error") + + with pytest.raises(ValueError, match="test error"): + handler.wrapper(tracer, test_func, (), {}) + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_passes_arguments_correctly(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test that wrapper correctly passes arguments to wrapped function.""" + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def test_func(a, b, c=10): + return a + b + c + + result = handler.wrapper(tracer, test_func, (1, 2), {"c": 3}) + + assert result == 6 + + @patch("extensions.otel.decorators.base.dify_config.ENABLE_OTEL", True) + def test_wrapper_with_memory_exporter(self, tracer_provider_with_memory_exporter, memory_span_exporter): + """Test wrapper end-to-end with memory exporter.""" + handler = SpanHandler() + tracer = tracer_provider_with_memory_exporter.get_tracer(__name__) + + def my_function(x): + return x * 2 + + result = handler.wrapper(tracer, my_function, (5,), {}) + + assert result == 10 + spans = memory_span_exporter.get_finished_spans() + assert len(spans) == 1 + assert "my_function" in spans[0].name + assert spans[0].status.status_code == StatusCode.OK