feat(ingest): switch telemetry endpoint to Mixpanel (#4238)

Kevin Hu authored on 2022-02-24 15:35:48 -05:00; committed by GitHub
parent 5d0915ec64
commit 46701319dc
8 changed files with 132 additions and 145 deletions


@ -178,7 +178,7 @@ The env variables take precedence over what is in the config.
### telemetry
To help us understand how people are using DataHub, we collect anonymous usage statistics on actions such as command invocations via Google Analytics.
To help us understand how people are using DataHub, we collect anonymous usage statistics on actions such as command invocations via Mixpanel.
We do not collect private information such as IP addresses, contents of ingestions, or credentials.
The code responsible for collecting and broadcasting these events is open-source and can be found [within our GitHub](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub/telemetry/telemetry.py).
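Usage collection can also be switched off entirely. The sketch below mirrors the `DATAHUB_TELEMETRY_ENABLED` fallback that appears in `telemetry.py` later in this commit; only the environment-variable path is shown here:

```python
import os

# Telemetry stays enabled unless DATAHUB_TELEMETRY_ENABLED is set to
# anything other than "true" (case-insensitive), e.g. "false".
ENV_ENABLED = os.environ.get("DATAHUB_TELEMETRY_ENABLED", "true").lower() == "true"
```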


@ -28,6 +28,7 @@ base_requirements = {
# Actual dependencies.
"typing-inspect",
"pydantic>=1.5.1",
"mixpanel>=4.9.0",
}
framework_common = {


@ -266,15 +266,13 @@ class Pipeline:
def log_ingestion_stats(self) -> None:
telemetry.telemetry_instance.ping(
"ingest", "source_type", self.config.source.type
)
telemetry.telemetry_instance.ping("ingest", "sink_type", self.config.sink.type)
telemetry.telemetry_instance.ping(
"ingest",
"records_written",
# bucket by taking floor of log of the number of records written
# report the bucket as a label so the count is not collapsed
str(10 ** int(log10(self.sink.get_report().records_written + 1))),
"ingest_stats",
{
"source_type": self.config.source.type,
"sink_type": self.config.sink.type,
"records_written": 10
** int(log10(self.sink.get_report().records_written + 1)),
},
)
def pretty_print_summary(self, warnings_as_failure: bool = False) -> int:
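For context, the bucketing is unchanged in spirit: record counts are still coarsened to an order of magnitude before being reported, only now the result is sent as a numeric property on a single `ingest_stats` event instead of a stringified Google Analytics label. A standalone sketch of what the rounding does (the helper name here is illustrative):

```python
from math import log10

def order_of_magnitude(n: int) -> int:
    # floor of log10(n + 1), re-exponentiated, so only the magnitude is reported
    return 10 ** int(log10(n + 1))

assert order_of_magnitude(0) == 1
assert order_of_magnitude(950) == 100
assert order_of_magnitude(12_345) == 10_000
```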


@ -2,7 +2,7 @@ import logging
import os
from datetime import datetime
from math import log10
from typing import Any, Iterable, List, Optional
from typing import Any, Dict, Iterable, List, Optional
import parse
import pydeequ
@ -115,7 +115,14 @@ def get_column_type(
return SchemaFieldDataType(type=TypeClass())
# flags to emit telemetry for
# config flags to emit telemetry for
config_options_to_report = [
"platform",
"use_relative_path",
"ignore_dotfiles",
]
# profiling flags to emit telemetry for
profiling_flags_to_report = [
"profile_table_level_only",
"include_field_null_count",
@ -145,27 +152,25 @@ class DataLakeSource(Source):
self.report = DataLakeSourceReport()
self.profiling_times_taken = []
config_report = {
config_option: config.dict().get(config_option)
for config_option in config_options_to_report
}
config_report = {**config_report, "profiling_enabled": config.profiling.enabled}
telemetry.telemetry_instance.ping(
"data_lake_profiling",
"config",
"enabled",
1 if config.profiling.enabled else 0,
"data_lake_config",
config_report,
)
if config.profiling.enabled:
for config_flag in profiling_flags_to_report:
config_value = getattr(config.profiling, config_flag)
config_int = (
1 if config_value else 0
) # convert to int so it can be emitted as a value
telemetry.telemetry_instance.ping(
"data_lake_profiling",
"config",
config_flag,
config_int,
)
telemetry.telemetry_instance.ping(
"data_lake_profiling_config",
{
config_flag: config.profiling.dict().get(config_flag)
for config_flag in profiling_flags_to_report
},
)
conf = SparkConf()
@ -242,11 +247,7 @@ class DataLakeSource(Source):
extension = os.path.splitext(file)[1]
telemetry.telemetry_instance.ping(
"data_lake_profiling",
"file_extension",
extension,
)
telemetry.telemetry_instance.ping("data_lake_file", {"extension": extension})
if file.endswith(".parquet"):
df = self.spark.read.parquet(file)
@ -549,30 +550,30 @@ class DataLakeSource(Source):
f"Profiling {len(self.profiling_times_taken)} table(s) finished in {total_time_taken:.3f} seconds"
)
telemetry.telemetry_instance.ping(
"data_lake_profiling",
"time_taken_total",
# bucket by taking floor of log of time taken
# report the bucket as a label so the count is not collapsed
str(10 ** int(log10(total_time_taken + 1))),
)
time_percentiles: Dict[str, float] = {}
if len(self.profiling_times_taken) > 0:
percentiles = [50, 75, 95, 99]
percentile_values = stats.calculate_percentiles(
self.profiling_times_taken, percentiles
)
for percentile in percentiles:
telemetry.telemetry_instance.ping(
"data_lake_profiling",
f"time_taken_p{percentile}",
# bucket by taking floor of log of time taken
# report the bucket as a label so the count is not collapsed
str(10 ** int(log10(percentile_values[percentile] + 1))),
)
time_percentiles = {
f"table_time_taken_p{percentile}": 10
** int(log10(percentile_values[percentile] + 1))
for percentile in percentiles
}
telemetry.telemetry_instance.ping(
"data_lake_profiling_summary",
# bucket by taking floor of log of time taken
{
"total_time_taken": 10 ** int(log10(total_time_taken + 1)),
"count": 10 ** int(log10(len(self.profiling_times_taken) + 1)),
"platform": self.source_config.platform,
**time_percentiles,
},
)
def get_report(self):
return self.report
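The summary event folds the per-percentile timings into one properties dict rather than sending a ping per percentile. A standalone sketch of that shape, using `numpy.percentile` as a stand-in for the project's `stats.calculate_percentiles` helper (the code above indexes its result as a `{percentile: value}` mapping, which this sketch assumes):

```python
from math import log10

import numpy as np

times_taken = [0.8, 2.1, 4.7, 9.3, 31.0]  # illustrative per-table profiling times, in seconds
percentiles = [50, 75, 95, 99]

# stand-in for datahub.telemetry.stats.calculate_percentiles
percentile_values = {p: float(np.percentile(times_taken, p)) for p in percentiles}

time_percentiles = {
    f"table_time_taken_p{p}": 10 ** int(log10(percentile_values[p] + 1))
    for p in percentiles
}
# e.g. {"table_time_taken_p50": 1, "table_time_taken_p75": 10, "table_time_taken_p95": 10, "table_time_taken_p99": 10}
```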


@ -315,11 +315,9 @@ class _SingleTableProfiler:
row_count = self.row_count
telemetry.telemetry_instance.ping(
"data_lake_profiling",
"rows_profiled",
"profile_data_lake_table",
# bucket by taking floor of log of the number of rows scanned
# report the bucket as a label so the count is not collapsed
str(10 ** int(log10(row_count + 1))),
{"rows_profiled": 10 ** int(log10(row_count + 1))},
)
# loop through the columns and add the analyzers


@ -8,7 +8,7 @@ import traceback
import unittest.mock
import uuid
from math import log10
from typing import Any, Callable, Iterable, Iterator, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Union
from datahub.telemetry import stats, telemetry
@ -479,11 +479,11 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):
row_count: int = profile.rowCount
telemetry.telemetry_instance.ping(
"sql_profiling",
"rows_profiled",
"profile_sql_table",
# bucket by taking floor of log of the number of rows scanned
# report the bucket as a label so the count is not collapsed
str(10 ** int(log10(row_count + 1))),
{
"rows_profiled": 10 ** int(log10(row_count + 1)),
},
)
for column_spec in columns_profiling_queue:
@ -720,31 +720,30 @@ class DatahubGEProfiler:
f"Profiling {len(requests)} table(s) finished in {total_time_taken:.3f} seconds"
)
telemetry.telemetry_instance.ping(
"sql_profiling",
f"time_taken_total:{self.platform}",
# bucket by taking floor of log of time taken
# report the bucket as a label so the count is not collapsed
str(10 ** int(log10(total_time_taken + 1))),
)
time_percentiles: Dict[str, float] = {}
if len(self.times_taken) > 0:
percentiles = [50, 75, 95, 99]
percentile_values = stats.calculate_percentiles(
self.times_taken, percentiles
)
for percentile in percentiles:
telemetry.telemetry_instance.ping(
"sql_profiling",
f"time_taken_p{percentile}:{self.platform}",
# bucket by taking floor of log of time taken
# report the bucket as a label so the count is not collapsed
str(
10 ** int(log10(percentile_values[percentile] + 1))
),
)
time_percentiles = {
f"table_time_taken_p{percentile}": 10
** int(log10(percentile_values[percentile] + 1))
for percentile in percentiles
}
telemetry.telemetry_instance.ping(
"sql_profiling_summary",
# bucket by taking floor of log of time taken
{
"total_time_taken": 10 ** int(log10(total_time_taken + 1)),
"count": 10 ** int(log10(len(self.times_taken) + 1)),
"platform": self.platform,
**time_percentiles,
},
)
self.report.report_from_query_combiner(query_combiner.report)


@ -382,6 +382,12 @@ def get_schema_metadata(
return schema_metadata
# config flags to emit telemetry for
config_options_to_report = [
"include_views",
"include_tables",
]
# flags to emit telemetry for
profiling_flags_to_report = [
"turn_off_expensive_profiling_metrics",
@ -409,27 +415,31 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
self.platform = platform
self.report = SQLSourceReport()
config_report = {
config_option: config.dict().get(config_option)
for config_option in config_options_to_report
}
config_report = {
**config_report,
"profiling_enabled": config.profiling.enabled,
"platform": platform,
}
telemetry.telemetry_instance.ping(
"sql_profiling",
"config",
"enabled",
1 if config.profiling.enabled else 0,
"sql_config",
config_report,
)
if config.profiling.enabled:
for config_flag in profiling_flags_to_report:
config_value = getattr(config.profiling, config_flag)
config_int = (
1 if config_value else 0
) # convert to int so it can be emitted as a value
telemetry.telemetry_instance.ping(
"sql_profiling",
"config",
config_flag,
config_int,
)
telemetry.telemetry_instance.ping(
"sql_profiling_config",
{
config_flag: config.profiling.dict().get(config_flag)
for config_flag in profiling_flags_to_report
},
)
def get_inspectors(self) -> Iterable[Inspector]:
# This method can be overridden in the case that you want to dynamically


@ -7,17 +7,14 @@ import sys
import uuid
from functools import wraps
from pathlib import Path
from typing import Any, Callable, Dict, Optional, TypeVar, Union
from typing import Any, Callable, Dict, Optional, TypeVar
import requests
from mixpanel import Consumer, Mixpanel
import datahub as datahub_package
logger = logging.getLogger(__name__)
GA_VERSION = 1
GA_TID = "UA-212728656-1"
DATAHUB_FOLDER = Path(os.path.expanduser("~/.datahub"))
CONFIG_FILE = DATAHUB_FOLDER / "telemetry-config.json"
@ -25,6 +22,9 @@ CONFIG_FILE = DATAHUB_FOLDER / "telemetry-config.json"
# also fall back to environment variable if config file is not found
ENV_ENABLED = os.environ.get("DATAHUB_TELEMETRY_ENABLED", "true").lower() == "true"
TIMEOUT = int(os.environ.get("DATAHUB_TELEMETRY_TIMEOUT", "10"))
MIXPANEL_TOKEN = "5ee83d940754d63cacbf7d34daa6f44a"
mp = Mixpanel(MIXPANEL_TOKEN, consumer=Consumer(request_timeout=int(TIMEOUT)))
class Telemetry:
@ -42,6 +42,16 @@ class Telemetry:
else:
self.load_config()
# send updated user-level properties
mp.people_set(
self.client_id,
{
"datahub_version": datahub_package.nice_version_name(),
"os": platform.system(),
"python_version": platform.python_version(),
},
)
def update_config(self) -> None:
"""
Update the config file with the current client ID and enabled status.
@ -110,13 +120,11 @@ class Telemetry:
def ping(
self,
category: str,
action: str,
label: Optional[str] = None,
value: Optional[int] = None,
properties: Optional[Dict[str, Any]] = None,
) -> None:
"""
Ping Google Analytics with a single event.
Send a single telemetry event.
Args:
category (str): category for the event
@ -128,41 +136,11 @@ class Telemetry:
if not self.enabled:
return
req_url = "https://www.google-analytics.com/collect"
params: Dict[str, Union[str, int]] = {
"an": "datahub-cli", # app name
"av": datahub_package.nice_version_name(), # app version
"t": "event", # event type
"v": GA_VERSION, # Google Analytics version
"tid": GA_TID, # tracking id
"cid": self.client_id, # client id
"ec": category, # event category
"ea": action, # event action
# use custom dimensions to capture OS and Python version
# see https://developers.google.com/analytics/devguides/collection/protocol/v1/parameters#cd_
"cd1": platform.system(), # OS
"cd2": platform.python_version(), # Python version
}
if label:
params["el"] = label
# this has to be a non-negative int, otherwise the request will fail
if value:
params["ev"] = value
# send event
try:
requests.post(
req_url,
data=params,
headers={
"user-agent": f"datahub {datahub_package.nice_version_name()}"
},
timeout=TIMEOUT,
)
except Exception as e:
mp.track(self.client_id, action, properties)
except Exception as e:
logger.debug(f"Error reporting telemetry: {e}")
@ -182,34 +160,36 @@ def with_telemetry(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
category = func.__module__
action = func.__name__
action = f"function:{func.__module__}.{func.__name__}"
telemetry_instance.ping(category, action, "started")
telemetry_instance.ping(action)
try:
res = func(*args, **kwargs)
telemetry_instance.ping(category, action, "completed")
telemetry_instance.ping(f"{action}:result", {"status": "completed"})
return res
# Catch general exceptions
except Exception as e:
telemetry_instance.ping(category, action, f"error:{get_full_class_name(e)}")
telemetry_instance.ping(
f"{action}:result", {"status": "error", "error": get_full_class_name(e)}
)
raise e
# System exits (used in ingestion and Docker commands) are not caught by the exception handler,
# so we need to catch them here.
except SystemExit as e:
# Forward successful exits
if e.code == 0:
telemetry_instance.ping(category, action, "completed")
telemetry_instance.ping(f"{action}:result", {"status": "completed"})
sys.exit(0)
# Report failed exits
else:
telemetry_instance.ping(
category, action, f"error:{get_full_class_name(e)}"
f"{action}:result",
{"status": "error", "error": get_full_class_name(e)},
)
sys.exit(e.code)
# Catch SIGINTs
except KeyboardInterrupt:
telemetry_instance.ping(category, action, "cancelled")
telemetry_instance.ping(f"{action}:result", {"status": "cancelled"})
sys.exit(0)
return wrapper
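Taken together, `with_telemetry` now emits a start event named after the decorated function and a follow-up `:result` event carrying a `status` property (plus an `error` property on failure). A minimal usage sketch with a hypothetical function:

```python
from datahub.telemetry.telemetry import with_telemetry

@with_telemetry
def my_command() -> None:
    # hypothetical function: on entry this emits "function:<module>.my_command";
    # when it returns, "function:<module>.my_command:result" with {"status": "completed"}
    ...

my_command()
```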