487 lines
15 KiB
Python

import errno
import json
import logging
import os
import platform
import sys
import uuid
from functools import wraps
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, TypeVar
from mixpanel import Consumer, Mixpanel
from typing_extensions import ParamSpec
from datahub._version import __version__, nice_version_name
from datahub.cli.config_utils import DATAHUB_ROOT_FOLDER
from datahub.cli.env_utils import get_boolean_env_variable
from datahub.configuration.common import ExceptionWithProps
from datahub.metadata.schema_classes import _custom_package_path
from datahub.utilities.perf_timer import PerfTimer
if TYPE_CHECKING:
from datahub.ingestion.graph.client import DataHubGraph
logger = logging.getLogger(__name__)
DATAHUB_FOLDER = Path(DATAHUB_ROOT_FOLDER)
CONFIG_FILE = DATAHUB_FOLDER / "telemetry-config.json"
# also fall back to environment variable if config file is not found
ENV_ENABLED = get_boolean_env_variable("DATAHUB_TELEMETRY_ENABLED", True)
# see
# https://adamj.eu/tech/2020/03/09/detect-if-your-tests-are-running-on-ci/
# https://github.com/watson/ci-info
CI_ENV_VARS = {
"APPCENTER",
"APPCIRCLE",
"APPCIRCLEAZURE_PIPELINES",
"APPVEYOR",
"AZURE_PIPELINES",
"BAMBOO",
"BITBUCKET",
"BITRISE",
"BUDDY",
"BUILDKITE",
"BUILD_ID",
"CI",
"CIRCLE",
"CIRCLECI",
"CIRRUS",
"CIRRUS_CI",
"CI_NAME",
"CODEBUILD",
"CODEBUILD_BUILD_ID",
"CODEFRESH",
"CODESHIP",
"CYPRESS_HOST",
"DRONE",
"DSARI",
"EAS_BUILD",
"GITHUB_ACTIONS",
"GITLAB",
"GITLAB_CI",
"GOCD",
"HEROKU_TEST_RUN_ID",
"HUDSON",
"JENKINS",
"JENKINS_URL",
"LAYERCI",
"MAGNUM",
"NETLIFY",
"NEVERCODE",
"RENDER",
"SAIL",
"SCREWDRIVER",
"SEMAPHORE",
"SHIPPABLE",
"SOLANO",
"STRIDER",
"TASKCLUSTER",
"TEAMCITY",
"TEAMCITY_VERSION",
"TF_BUILD",
"TRAVIS",
"VERCEL",
"WERCKER_ROOT",
"bamboo.buildKey",
}
# disable when running in any CI
if any(var in os.environ for var in CI_ENV_VARS):
ENV_ENABLED = False
# Also disable if a custom metadata model package is in use.
if _custom_package_path:
ENV_ENABLED = False
TIMEOUT = int(os.environ.get("DATAHUB_TELEMETRY_TIMEOUT", "10"))
MIXPANEL_ENDPOINT = "track.datahubproject.io/mp"
MIXPANEL_TOKEN = "5ee83d940754d63cacbf7d34daa6f44a"
SENTRY_DSN: Optional[str] = os.environ.get("SENTRY_DSN", None)
SENTRY_ENVIRONMENT: str = os.environ.get("SENTRY_ENVIRONMENT", "dev")
def _default_global_properties() -> Dict[str, Any]:
return {
"datahub_version": nice_version_name(),
"python_version": platform.python_version(),
"os": platform.system(),
"arch": platform.machine(),
}
class Telemetry:
client_id: str
enabled: bool = True
tracking_init: bool = False
sentry_enabled: bool = False
context_properties: Dict[str, Any] = {}
def __init__(self):
self.global_properties = _default_global_properties()
self.context_properties = {}
if SENTRY_DSN:
self.sentry_enabled = True
try:
import sentry_sdk
sentry_sdk.init(
dsn=SENTRY_DSN,
environment=SENTRY_ENVIRONMENT,
release=__version__,
)
except Exception as e:
# We need to print initialization errors to stderr, since logger is not initialized yet
print(f"Error initializing Sentry: {e}", file=sys.stderr)
logger.info(f"Error initializing Sentry: {e}")
# try loading the config if it exists, update it if that fails
if not CONFIG_FILE.exists() or not self.load_config():
# set up defaults
self.client_id = str(uuid.uuid4())
self.enabled = self.enabled and ENV_ENABLED
if not self.update_config():
# If we're not able to persist the client ID, we should default
# to a standardized value. This prevents us from minting a new
# client ID every time we start the CLI.
self.client_id = "00000000-0000-0000-0000-000000000001"
# send updated user-level properties
self.mp = None
if self.enabled:
try:
self.mp = Mixpanel(
MIXPANEL_TOKEN,
consumer=Consumer(
request_timeout=int(TIMEOUT), api_host=MIXPANEL_ENDPOINT
),
)
except Exception as e:
logger.debug(f"Error connecting to mixpanel: {e}")
# Initialize the default properties for all events.
self.set_context()
def update_config(self) -> bool:
"""
Update the config file with the current client ID and enabled status.
Return True if the update succeeded, False otherwise
"""
logger.debug("Updating telemetry config")
try:
os.makedirs(DATAHUB_FOLDER, exist_ok=True)
try:
with open(CONFIG_FILE, "w") as f:
json.dump(
{"client_id": self.client_id, "enabled": self.enabled},
f,
indent=2,
)
return True
except OSError as x:
if x.errno == errno.ENOENT:
logger.debug(
f"{CONFIG_FILE} does not exist and could not be created. Please check permissions on the parent folder."
)
elif x.errno == errno.EACCES:
logger.debug(
f"{CONFIG_FILE} cannot be read. Please check the permissions on this file."
)
else:
logger.debug(
f"{CONFIG_FILE} had an IOError, please inspect this file for issues."
)
except Exception as e:
logger.debug(f"Failed to update config file at {CONFIG_FILE} due to {e}")
return False
def enable(self) -> None:
"""
Enable telemetry.
"""
self.enabled = True
self.update_config()
def disable(self) -> None:
"""
Disable telemetry.
"""
self.enabled = False
self.update_config()
def load_config(self) -> bool:
"""
Load the saved config for the telemetry client ID and enabled status.
Returns True if config was correctly loaded, False otherwise.
"""
try:
with open(CONFIG_FILE) as f:
config = json.load(f)
self.client_id = config["client_id"]
self.enabled = config["enabled"] & ENV_ENABLED
return True
except OSError as x:
if x.errno == errno.ENOENT:
logger.debug(
f"{CONFIG_FILE} does not exist and could not be created. Please check permissions on the parent folder."
)
elif x.errno == errno.EACCES:
logger.debug(
f"{CONFIG_FILE} cannot be read. Please check the permissions on this file."
)
else:
logger.debug(
f"{CONFIG_FILE} had an IOError, please inspect this file for issues."
)
except Exception as e:
logger.debug(f"Failed to load {CONFIG_FILE} due to {e}")
return False
def add_global_property(self, key: str, value: Any) -> None:
self.global_properties[key] = value
self._update_sentry_properties()
def set_context(
self,
server: Optional["DataHubGraph"] = None,
properties: Optional[Dict[str, Any]] = None,
) -> None:
self.context_properties = {
**self._server_props(server),
**(properties or {}),
}
self._update_sentry_properties()
def _update_sentry_properties(self) -> None:
properties = {
**self.global_properties,
**self.context_properties,
}
if self.sentry_enabled:
import sentry_sdk
sentry_sdk.set_tags(properties)
def init_capture_exception(self) -> None:
if self.sentry_enabled:
import sentry_sdk
sentry_sdk.set_user({"client_id": self.client_id})
sentry_sdk.set_context(
"environment",
{
"environment": SENTRY_ENVIRONMENT,
"datahub_version": nice_version_name(),
"os": platform.system(),
"python_version": platform.python_version(),
},
)
def capture_exception(self, e: BaseException) -> None:
try:
if self.sentry_enabled:
import sentry_sdk
sentry_sdk.capture_exception(e)
except Exception as e:
logger.warning("Failed to capture exception in Sentry.", exc_info=e)
def init_tracking(self) -> None:
if not self.enabled or self.mp is None or self.tracking_init is True:
return
logger.debug("Sending init telemetry")
try:
self.mp.people_set(
self.client_id,
self.global_properties,
)
except Exception as e:
logger.debug(f"Error initializing telemetry: {e}")
self.init_track = True
def ping(
self,
event_name: str,
properties: Optional[Dict[str, Any]] = None,
) -> None:
"""
Send a single telemetry event.
Args:
event_name: name of the event to send.
properties: metadata for the event
"""
if not self.enabled or self.mp is None:
return
properties = properties or {}
# send event
try:
if event_name == "function-call":
logger.debug(
f"Sending telemetry for {event_name} {properties.get('function')}, status {properties.get('status')}"
)
else:
logger.debug(f"Sending telemetry for {event_name}")
properties = {
**self.global_properties,
**self.context_properties,
**properties,
}
self.mp.track(self.client_id, event_name, properties)
except Exception as e:
logger.debug(f"Error reporting telemetry: {e}")
@classmethod
def _server_props(cls, server: Optional["DataHubGraph"]) -> Dict[str, str]:
if not server:
return {
"server_type": "n/a",
"server_version": "n/a",
"server_id": "n/a",
}
else:
return {
"server_type": server.server_config.raw_config.get("datahub", {}).get(
"serverType", "missing"
),
"server_version": server.server_config.raw_config.get("versions", {})
.get("acryldata/datahub", {})
.get("version", "missing"),
"server_id": server.server_id or "missing",
}
telemetry_instance = Telemetry()
def suppress_telemetry() -> None:
"""disables telemetry for this invocation, doesn't affect persistent client settings"""
if telemetry_instance.enabled:
logger.debug("Disabling telemetry locally due to server config")
telemetry_instance.enabled = False
def get_full_class_name(obj):
module = obj.__class__.__module__
if module is None or module == str.__class__.__module__:
return obj.__class__.__name__
return f"{module}.{obj.__class__.__name__}"
def _error_props(error: BaseException) -> Dict[str, Any]:
props = {
"error": get_full_class_name(error),
}
if isinstance(error, ExceptionWithProps):
try:
props.update(error.get_telemetry_props())
except Exception as e:
logger.debug(f"Error getting telemetry props for {error}: {e}")
return props
_T = TypeVar("_T")
_P = ParamSpec("_P")
def with_telemetry(
*, capture_kwargs: Optional[List[str]] = None
) -> Callable[[Callable[_P, _T]], Callable[_P, _T]]:
kwargs_to_track = capture_kwargs or []
def with_telemetry_decorator(func: Callable[_P, _T]) -> Callable[_P, _T]:
function = f"{func.__module__}.{func.__name__}"
@wraps(func)
def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
telemetry_instance.init_tracking()
telemetry_instance.init_capture_exception()
call_props: Dict[str, Any] = {"function": function}
for kwarg in kwargs_to_track:
call_props[f"arg_{kwarg}"] = kwargs.get(kwarg)
telemetry_instance.ping(
"function-call",
{**call_props, "status": "start"},
)
try:
try:
with PerfTimer() as timer:
res = func(*args, **kwargs)
finally:
call_props["duration"] = timer.elapsed_seconds()
telemetry_instance.ping(
"function-call",
{**call_props, "status": "completed"},
)
return res
# System exits (used in ingestion and Docker commands) are not caught by the exception handler,
# so we need to catch them here.
except SystemExit as e:
# Forward successful exits
# 0 or None imply success
if not e.code:
telemetry_instance.ping(
"function-call",
{**call_props, "status": "completed"},
)
# Report failed exits
else:
telemetry_instance.ping(
"function-call",
{
**call_props,
"status": "error",
**_error_props(e),
"code": e.code,
},
)
telemetry_instance.capture_exception(e)
raise e
# Catch SIGINTs
except KeyboardInterrupt as e:
telemetry_instance.ping(
"function-call",
{**call_props, "status": "cancelled"},
)
telemetry_instance.capture_exception(e)
raise e
# Catch general exceptions
except BaseException as e:
telemetry_instance.ping(
"function-call",
{
**call_props,
"status": "error",
**_error_props(e),
},
)
telemetry_instance.capture_exception(e)
raise e
return wrapper
return with_telemetry_decorator