Mirror of https://github.com/datahub-project/datahub.git
feat(ingest): add redshift usage source (#3277)
This commit is contained in:
parent 185f7e2f5f
commit 5a239905df
@@ -109,6 +109,8 @@ plugins: Dict[str, Set[str]] = {
    "postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
    "redash": {"redash-toolbelt"},
    "redshift": sql_common | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
    "redshift-usage": sql_common
    | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
    "sagemaker": aws_common,
    "snowflake": sql_common | {"snowflake-sqlalchemy<=1.2.4"},
    "snowflake-usage": sql_common | {"snowflake-sqlalchemy<=1.2.4"},
@@ -175,6 +177,8 @@ base_dev_requirements = {
            "datahub-kafka",
            "datahub-rest",
            "redash",
            "redshift",
            "redshift-usage"
            # airflow is added below
        ]
        for dependency in plugins[plugin]
@@ -247,6 +251,7 @@ entry_points = {
        "postgres = datahub.ingestion.source.sql.postgres:PostgresSource",
        "redash = datahub.ingestion.source.redash:RedashSource",
        "redshift = datahub.ingestion.source.sql.redshift:RedshiftSource",
        "redshift-usage = datahub.ingestion.source.usage.redshift_usage:RedshiftUsageSource",
        "snowflake = datahub.ingestion.source.sql.snowflake:SnowflakeSource",
        "snowflake-usage = datahub.ingestion.source.usage.snowflake_usage:SnowflakeUsageSource",
        "superset = datahub.ingestion.source.superset:SupersetSource",
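With the plugin extra and entry point registered above, the new source is addressable as type "redshift-usage" in an ingestion recipe (after installing the extra, e.g. acryl-datahub[redshift-usage]). Below is a minimal sketch of a programmatic run; the host, credentials, and email domain are placeholders, and the shape mirrors the integration test added later in this commit.

from datahub.ingestion.run.pipeline import Pipeline

# Placeholder connection details -- substitute a real Redshift cluster and credentials.
pipeline = Pipeline.create(
    {
        "run_id": "redshift-usage-example",
        "source": {
            "type": "redshift-usage",
            "config": {
                "host_port": "example-cluster.abc123.us-east-1.redshift.amazonaws.com:5439",
                "database": "dev",
                "username": "datahub",
                "password": "<password>",
                "email_domain": "example.com",
            },
        },
        # Write the usage aspects to a local file; a datahub-rest sink works the same way.
        "sink": {
            "type": "file",
            "config": {"filename": "./redshift_usages.json"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()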
@@ -0,0 +1,225 @@
import collections
import dataclasses
import logging
from datetime import datetime
from typing import Dict, Iterable, List

from dateutil import parser
from pydantic import Field
from pydantic.main import BaseModel
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.redshift import RedshiftConfig
from datahub.ingestion.source.usage.usage_common import (
    BaseUsageConfig,
    GenericAggregatedDataset,
    get_time_bucket,
)

logger = logging.getLogger(__name__)

redshift_datetime_format = "%Y-%m-%d %H:%M:%S"

# add this join to the sql query for more metrics on completed queries
# LEFT JOIN svl_query_metrics_summary sqms ON ss.query = sqms.query
# Reference: https://docs.aws.amazon.com/redshift/latest/dg/r_SVL_QUERY_METRICS_SUMMARY.html

# this sql query joins stl_scan over table info,
# querytext, and user info to get usage stats
# using non-LEFT joins here to limit the results to
# queries run by the user on user-defined tables.
redshift_usage_sql_comment = """
SELECT DISTINCT ss.userid,
       ss.query,
       sui.usename,
       ss.tbl,
       sq.text,
       sti.database,
       sti.schema,
       sti.table,
       ss.starttime,
       ss.endtime
FROM stl_scan ss
  JOIN svv_table_info sti ON ss.tbl = sti.table_id
  LEFT JOIN stl_querytext sq ON ss.query = sq.query
  JOIN svl_user_info sui ON sq.userid = sui.usesysid
WHERE ss.starttime >= '{start_time}'
AND ss.endtime < '{end_time}'
ORDER BY ss.endtime DESC;
""".strip()


RedshiftTableRef = str
AggregatedDataset = GenericAggregatedDataset[RedshiftTableRef]


class RedshiftJoinedAccessEvent(BaseModel):
    userid: int
    usename: str = None  # type:ignore
    query: int
    tbl: int
    text: str = None  # type:ignore
    database: str = None  # type:ignore
    schema_: str = Field(None, alias="schema")
    table: str = None  # type:ignore
    starttime: datetime
    endtime: datetime


class RedshiftUsageConfig(RedshiftConfig, BaseUsageConfig):
    env: str = builder.DEFAULT_ENV
    email_domain: str
    options: dict = {}

    def get_sql_alchemy_url(self):
        return super().get_sql_alchemy_url(uri_opts=self.options)


@dataclasses.dataclass
class RedshiftUsageSource(Source):
    config: RedshiftUsageConfig
    report: SourceReport = dataclasses.field(default_factory=SourceReport)

    @classmethod
    def create(cls, config_dict, ctx):
        config = RedshiftUsageConfig.parse_obj(config_dict)
        return cls(ctx, config)

    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        """Gets Redshift usage stats as work units"""
        access_events = self._get_redshift_history()
        # If the query results are empty, we don't want to proceed
        if not access_events:
            return []

        joined_access_event = self._get_joined_access_event(access_events)
        aggregated_info = self._aggregate_access_events(joined_access_event)

        for time_bucket in aggregated_info.values():
            for aggregate in time_bucket.values():
                wu = self._make_usage_stat(aggregate)
                self.report.report_workunit(wu)
                yield wu

    def _make_usage_query(self) -> str:
        return redshift_usage_sql_comment.format(
            start_time=self.config.start_time.strftime(redshift_datetime_format),
            end_time=self.config.end_time.strftime(redshift_datetime_format),
        )

    def _make_sql_engine(self) -> Engine:
        url = self.config.get_sql_alchemy_url()
        logger.debug(f"sql_alchemy_url = {url}")
        engine = create_engine(url, **self.config.options)
        return engine

    def _get_redshift_history(self):
        query = self._make_usage_query()
        engine = self._make_sql_engine()
        results = engine.execute(query)
        events = []
        for row in results:
            # minor type conversion
            if hasattr(row, "_asdict"):
                event_dict = row._asdict()
            else:
                event_dict = dict(row)

            # strip extra spaces introduced by the _asdict() conversion above
            for k, v in event_dict.items():
                if isinstance(v, str):
                    event_dict[k] = v.strip()

            if event_dict.get("starttime", None):
                event_dict["starttime"] = event_dict.get("starttime").__str__()
            if event_dict.get("endtime", None):
                event_dict["endtime"] = event_dict.get("endtime").__str__()

            logger.debug(f"event_dict: {event_dict}")
            events.append(event_dict)

        if events:
            return events

        # The SQL result can be empty; in that case the SQL connection closes
        # and we don't want to proceed with ingestion.
        logger.info("SQL result is empty")
        return None

    def _convert_str_to_datetime(self, v):
        if isinstance(v, str):
            isodate = parser.parse(v)  # compatible with Python 3.6+
            return isodate.strftime(redshift_datetime_format)

    def _get_joined_access_event(self, events):
        joined_access_events = []
        for event_dict in events:
            event_dict["starttime"] = self._convert_str_to_datetime(
                event_dict.get("starttime")
            )
            event_dict["endtime"] = self._convert_str_to_datetime(
                event_dict.get("endtime")
            )

            if not (
                event_dict.get("database", None)
                and event_dict.get("schema", None)
                and event_dict.get("table", None)
            ):
                logger.info("One or more access event parameters are missing. Skipping ....")
                continue

            if not event_dict.get("usename") or event_dict["usename"] == "":
                logger.info("The username parameter is missing. Skipping ....")
                continue

            joined_access_events.append(RedshiftJoinedAccessEvent(**event_dict))
        return joined_access_events

    def _aggregate_access_events(
        self, events: List[RedshiftJoinedAccessEvent]
    ) -> Dict[datetime, Dict[RedshiftTableRef, AggregatedDataset]]:
        datasets: Dict[
            datetime, Dict[RedshiftTableRef, AggregatedDataset]
        ] = collections.defaultdict(dict)

        for event in events:
            floored_ts = get_time_bucket(event.starttime, self.config.bucket_duration)

            resource = f"{event.database}.{event.schema_}.{event.table}"

            agg_bucket = datasets[floored_ts].setdefault(
                resource,
                AggregatedDataset(bucket_start_time=floored_ts, resource=resource),
            )

            # append the configured email domain to the username;
            # the current user-stats UI needs an email address to display users
            username = f"{event.usename if event.usename else 'unknown'}@{self.config.email_domain}"
            logger.info(f"username: {username}")
            agg_bucket.add_read_entry(
                username,
                event.text,
                [],  # TODO: not currently supported by redshift; find column level changes
            )
        return datasets

    def _make_usage_stat(self, agg: AggregatedDataset) -> MetadataWorkUnit:
        return agg.make_usage_workunit(
            self.config.bucket_duration,
            lambda resource: builder.make_dataset_urn(
                "redshift", resource.lower(), self.config.env
            ),
            self.config.top_n_queries,
        )

    def get_report(self) -> SourceReport:
        return self.report

    def close(self) -> None:
        pass
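For illustration only (not part of the commit), this is roughly how a single row returned by the usage query flows through the model above. The sample values are borrowed from the first entry of the usage_events_history.json fixture added below, and the email domain is the one used in the integration test.

from datetime import datetime

from datahub.ingestion.source.usage.redshift_usage import RedshiftJoinedAccessEvent

# A row shaped like the output of redshift_usage_sql_comment
# (values borrowed from usage_events_history.json below).
event_dict = {
    "userid": 1,
    "query": 293100,
    "usename": "test-name",
    "tbl": 101587,
    "text": "select userid from users",
    "database": "dev",
    "schema": "public",
    "table": "users",
    "starttime": "2021-09-14 00:00:00",
    "endtime": "2021-09-15 00:00:00",
}

# The "schema" key is mapped onto the schema_ field via the pydantic Field alias.
event = RedshiftJoinedAccessEvent(**event_dict)
assert event.schema_ == "public"
assert event.starttime == datetime(2021, 9, 14)

# Resource and user strings as built in _aggregate_access_events,
# assuming email_domain="acryl.io" as in the integration test below.
resource = f"{event.database}.{event.schema_}.{event.table}"  # "dev.public.users"
username = f"{event.usename}@acryl.io"  # "test-name@acryl.io"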
@@ -0,0 +1,15 @@
[
    {
        "auditHeader": null,
        "entityType": "dataset",
        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)",
        "entityKeyAspect": null,
        "changeType": "UPSERT",
        "aspectName": "datasetUsageStatistics",
        "aspect": {
            "value": "{\"timestampMillis\": 1631577600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 1, \"topSqlQueries\": [\"select userid from users\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:test-name\", \"count\": 1, \"userEmail\": \"test-name@acryl.io\"}], \"fieldCounts\": []}",
            "contentType": "application/json"
        },
        "systemMetadata": null
    }
]
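As a quick sanity check on the golden value (not part of the commit): timestampMillis 1631577600000 is midnight UTC on 2021-09-14, i.e. the start of the DAY bucket that the fixture events' starttime of "2021-09-14 00:00:00" falls into.

from datetime import datetime, timezone

# Start of the DAY bucket containing the fixture events, in epoch milliseconds.
bucket_start = datetime(2021, 9, 14, tzinfo=timezone.utc)
print(int(bucket_start.timestamp() * 1000))  # 1631577600000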
@@ -0,0 +1,89 @@
import json
import pathlib
from unittest.mock import patch

from freezegun import freeze_time

from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.usage.redshift_usage import RedshiftUsageConfig
from tests.test_helpers import mce_helpers

FROZEN_TIME = "2021-08-24 09:00:00"


def test_redshift_usage_config():
    config = RedshiftUsageConfig.parse_obj(
        dict(
            host_port="xxxxx",
            database="xxxxx",
            username="xxxxx",
            password="xxxxx",
            email_domain="xxxxx",
            include_views=True,
            include_tables=True,
        )
    )

    assert config.host_port == "xxxxx"
    assert config.database == "xxxxx"
    assert config.username == "xxxxx"
    assert config.email_domain == "xxxxx"
    assert config.include_views
    assert config.include_tables


def yield_function(li):
    for i in li:
        yield i


@freeze_time(FROZEN_TIME)
def test_redshift_usage_source(pytestconfig, tmp_path):

    test_resources_dir = pathlib.Path(
        pytestconfig.rootpath / "tests/integration/redshift-usage"
    )

    with patch(
        "datahub.ingestion.source.usage.redshift_usage.RedshiftUsageSource._get_redshift_history"
    ) as mock_event_history:
        access_events = load_access_events(test_resources_dir)
        mock_event_history.return_value = access_events

        # Run ingestion
        pipeline = Pipeline.create(
            {
                "run_id": "test-redshift-usage",
                "source": {
                    "type": "redshift-usage",
                    "config": {
                        "host_port": "xxxxx",
                        "database": "xxxxx",
                        "username": "xxxxx",
                        "password": "xxxxx",
                        "email_domain": "acryl.io",
                        "include_views": True,
                        "include_tables": True,
                    },
                },
                "sink": {
                    "type": "file",
                    "config": {"filename": f"{tmp_path}/redshift_usages.json"},
                },
            },
        )
        pipeline.run()
        pipeline.raise_from_status()

    mce_helpers.check_golden_file(
        pytestconfig=pytestconfig,
        output_path=tmp_path / "redshift_usages.json",
        golden_path=test_resources_dir / "redshift_usages_golden.json",
    )


def load_access_events(test_resources_dir):
    access_events_history_file = test_resources_dir / "usage_events_history.json"
    with access_events_history_file.open() as access_events_json:
        access_events = json.loads(access_events_json.read())
    return access_events
@@ -0,0 +1,50 @@
[
    {
        "userid": 1,
        "query": 293100,
        "usename": "test-name",
        "tbl": 101587,
        "text": "select userid from users",
        "database": "dev",
        "schema": "public",
        "table": "users",
        "starttime": "2021-09-14 00:00:00",
        "endtime": "2021-09-15 00:00:00"
    },
    {
        "userid": 2,
        "query": 293101,
        "usename": null,
        "tbl": 101588,
        "text": "select catid from category",
        "database": "dev",
        "schema": "public",
        "table": "category",
        "starttime": "2021-09-14 00:00:00",
        "endtime": "2021-09-15 00:00:00"
    },
    {
        "userid": 3,
        "query": 293102,
        "usename": "chinmay",
        "tbl": 101587,
        "text": "select catid from category",
        "database": null,
        "schema": null,
        "table": "category",
        "starttime": "2021-09-14 00:00:00",
        "endtime": "2021-09-15 00:00:00"
    },
    {
        "userid": 4,
        "query": 293102,
        "usename": "shirshanka",
        "tbl": 101588,
        "text": "select catid from category",
        "database": "db1",
        "schema": "schema1",
        "table": null,
        "starttime": "2021-09-14 00:00:00",
        "endtime": "2021-09-15 00:00:00"
    }
]
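The four fixture events above also exercise the completeness checks in _get_joined_access_event: only the first event carries a database, schema, table, and usename, which is why the golden file contains a single aggregate for dev.public.users. A minimal re-statement of that filter (not part of the commit):

# Events 2-4 are dropped: missing usename, missing database/schema, missing table.
def is_complete(event: dict) -> bool:
    return all(
        event.get(key)
        for key in ("database", "schema", "table", "usename")
    )

sample = [
    {"usename": "test-name", "database": "dev", "schema": "public", "table": "users"},
    {"usename": None, "database": "dev", "schema": "public", "table": "category"},
    {"usename": "chinmay", "database": None, "schema": None, "table": "category"},
    {"usename": "shirshanka", "database": "db1", "schema": "schema1", "table": None},
]
assert [is_complete(e) for e in sample] == [True, False, False, False]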