feat(ingest): Starburst Trino usage (#3558)

Tamas Nemeth 2021-11-18 18:56:24 +01:00 committed by GitHub
parent 658fa81406
commit a36fefaa33
8 changed files with 475 additions and 4 deletions

View File

@@ -65,6 +65,7 @@ Sources:
| [sqlalchemy](./source_docs/sqlalchemy.md) | `pip install 'acryl-datahub[sqlalchemy]'` | Generic SQLAlchemy source |
| [superset](./source_docs/superset.md) | `pip install 'acryl-datahub[superset]'` | Superset source |
| [trino](./source_docs/trino.md) | `pip install 'acryl-datahub[trino]'` | Trino source |
| [starburst-trino-usage](./source_docs/trino.md) | `pip install 'acryl-datahub[starburst-trino-usage]'` | Starburst Trino usage statistics source |
Sinks

View File

@@ -135,6 +135,14 @@ plugins: Dict[str, Set[str]] = {
        # PR is from same author as that of sqlalchemy-trino library below.
        "sqlalchemy-trino"
    },
    "starburst-trino-usage": sql_common
    | {
        # SQLAlchemy support is coming up in trino python client
        # subject to PR merging - https://github.com/trinodb/trino-python-client/pull/81.
        # PR is from same author as that of sqlalchemy-trino library below.
        "sqlalchemy-trino"
    },
}
all_exclude_plugins: Set[str] = {
@@ -209,7 +217,7 @@ if is_py37_or_newer:
    # The trino plugin only works on Python 3.7 or newer.
    # The trino plugin can be supported on Python 3.6 with minimal changes to the open-source sqlalchemy-trino source code.
    base_dev_requirements = base_dev_requirements.union(
        {dependency for plugin in ["lookml", "trino"] for dependency in plugins[plugin]}
        {dependency for plugin in ["lookml", "trino", "starburst-trino-usage"] for dependency in plugins[plugin]}
    )

dev_requirements = {
@@ -281,6 +289,8 @@ entry_points = {
        "superset = datahub.ingestion.source.superset:SupersetSource",
        "openapi = datahub.ingestion.source.openapi:OpenApiSource",
        "trino = datahub.ingestion.source.sql.trino:TrinoSource",
        "starburst-trino-usage = datahub.ingestion.source.usage.starburst_trino_usage:TrinoUsageSource",
    ],
    "datahub.ingestion.sink.plugins": [
        "file = datahub.ingestion.sink.file:FileSink",

View File

@@ -69,6 +69,61 @@ As a SQL-based service, the Trino integration is also supported by our SQL profiler.
Coming soon!

## Trino Usage Stats

For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).

### Starburst Trino Usage Stats

If you are using Starburst Trino, you can collect usage stats in the following way.

#### Prerequisites

1. Set up the event logger, which saves audit logs into a Postgres database, and register this database as a catalog in Trino. You can find more details on the setup here:
   - https://docs.starburst.io/354-e/security/event-logger.html#security-event-logger--page-root
   - https://docs.starburst.io/354-e/security/event-logger.html#analyzing-the-event-log
2. Install the starburst-trino-usage plugin by running `pip install 'acryl-datahub[starburst-trino-usage]'`. You can then sanity-check your settings with the sketch below.
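Before wiring up a full recipe, you can validate your settings by parsing them with the plugin's config class, the same way the unit tests do. A minimal sketch; every value below is a placeholder to be replaced with your own coordinates:

```python
from datahub.ingestion.source.usage.starburst_trino_usage import TrinoUsageConfig

# Parse and validate source settings before running a full ingestion job.
# All values here are placeholders; substitute your own coordinates.
config = TrinoUsageConfig.parse_obj(
    {
        "host_port": "yourtrinohost:port",
        "database": "hive",
        "username": "trino_username",
        "password": "trino_password",
        "email_domain": "test.com",
        "audit_catalog": "audit",
        "audit_schema": "audit_schema",
    }
)
print(config.get_sql_alchemy_url())  # confirm the computed connection URL
```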
#### Usage stats ingestion job
Here is a sample recipe to ingest usage data:
```yml
source:
  type: starburst-trino-usage
  config:
    # Coordinates
    host_port: yourtrinohost:port
    # The name of the catalog to extract usage for
    database: hive
    # Credentials
    username: trino_username
    password: trino_password
    email_domain: test.com
    audit_catalog: audit
    audit_schema: audit_schema

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
```
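The same ingestion can also be driven programmatically, mirroring how the integration tests run this source. A minimal sketch, using the same placeholder coordinates, credentials, and server URL as the recipe above:

```python
from datahub.ingestion.run.pipeline import Pipeline

# Programmatic equivalent of the YAML recipe above.
# Coordinates, credentials, and the server URL are placeholders.
pipeline = Pipeline.create(
    {
        "run_id": "starburst-trino-usage-example",
        "source": {
            "type": "starburst-trino-usage",
            "config": {
                "host_port": "yourtrinohost:port",
                "database": "hive",
                "username": "trino_username",
                "password": "trino_password",
                "email_domain": "test.com",
                "audit_catalog": "audit",
                "audit_schema": "audit_schema",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()  # surface any ingestion errors
```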
### Config details

Note that a `.` is used to denote nested fields in the YAML recipe.

By default, we extract usage stats for the last day; we recommend scheduling this source to run every day. See the sketch after the table for how the default time window is derived.

| Field             | Required | Default                                                        | Description                                                     |
| ----------------- | -------- | -------------------------------------------------------------- | --------------------------------------------------------------- |
| `database`        | yes      |                                                                | The name of the catalog to extract usage for.                   |
| `audit_catalog`   | yes      |                                                                | The catalog in which the audit table can be found.              |
| `audit_schema`    | yes      |                                                                | The schema in which the audit table can be found.               |
| `email_domain`    | yes      |                                                                | The email domain to append to usernames that lack one.          |
| `env`             |          | `"PROD"`                                                       | Environment to use in namespace when constructing URNs.         |
| `bucket_duration` |          | `"DAY"`                                                        | Duration to bucket usage events by. Can be `"DAY"` or `"HOUR"`. |
| `start_time`      |          | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage logs to consider.                        |
| `end_time`        |          | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage logs to consider.                          |
| `top_n_queries`   |          | `10`                                                           | Number of top queries to save to each table.                    |
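To make the `start_time`/`end_time` defaults concrete, here is an illustrative computation of a "last full day in UTC" window for `bucket_duration: "DAY"`. This is a sketch of the behavior described above, not the library's own implementation:

```python
from datetime import datetime, timedelta, timezone

# Illustration only: deriving a "last full day in UTC" usage window.
# The actual defaults come from the source's usage config.
now = datetime.now(timezone.utc)
end_time = now.replace(hour=0, minute=0, second=0, microsecond=0)  # start of today, UTC
start_time = end_time - timedelta(days=1)  # start of yesterday, UTC

print(f"usage window: {start_time} -> {end_time}")
```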
## Questions
If you've got any questions on configuring this source, feel free to ping us on [our Slack](https://slack.datahubproject.io/)!

View File

@@ -418,9 +418,17 @@ class SQLAlchemySource(Source):
                self.report.report_dropped(dataset_name)
                continue

            columns = inspector.get_columns(table, schema)
            if len(columns) == 0:
                self.report.report_warning(dataset_name, "missing column information")
            try:
                columns = inspector.get_columns(table, schema)
                if len(columns) == 0:
                    self.report.report_warning(
                        dataset_name, "missing column information"
                    )
            except Exception as e:
                self.report.report_warning(
                    dataset_name,
                    f"unable to get column information due to an error -> {e}",
                )

            try:
                # SQLAlchemy stubs are incomplete and missing this method.

View File

@@ -0,0 +1,253 @@
import collections
import dataclasses
import json
import logging
import sys
from datetime import datetime
from email.utils import parseaddr
from typing import Dict, Iterable, List

from dateutil import parser
from pydantic import Field
from pydantic.main import BaseModel
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

if sys.version_info >= (3, 7):  # noqa: C901
    import datahub.emitter.mce_builder as builder
    from datahub.configuration.time_window_config import get_time_bucket
    from datahub.ingestion.api.source import Source, SourceReport
    from datahub.ingestion.api.workunit import MetadataWorkUnit
    from datahub.ingestion.source.sql.trino import TrinoConfig
    from datahub.ingestion.source.usage.usage_common import (
        BaseUsageConfig,
        GenericAggregatedDataset,
    )

    logger = logging.getLogger(__name__)

    trino_datetime_format = "%Y-%m-%d %H:%M:%S.%f %Z"

    # Querying the Starburst completed_queries table
    # https://docs.starburst.io/latest/security/event-logger.html#completed-queries
    trino_usage_sql_comment = """
    SELECT DISTINCT usr,
        query,
        "catalog",
        "schema",
        query_type,
        accessed_metadata,
        create_time,
        end_time
    FROM {audit_catalog}.{audit_schema}.completed_queries
    WHERE 1 = 1
    AND query_type = 'SELECT'
    AND create_time >= timestamp '{start_time}'
    AND end_time < timestamp '{end_time}'
    AND query_state = 'FINISHED'
    ORDER BY end_time desc
    """.strip()

    TrinoTableRef = str
    AggregatedDataset = GenericAggregatedDataset[TrinoTableRef]

    class TrinoConnectorInfo(BaseModel):
        partitionIds: List[str]
        truncated: bool

    class TrinoAccessedMetadata(BaseModel):
        catalog_name: str = Field(None, alias="catalogName")
        schema_name: str = Field(None, alias="schema")  # type: ignore
        table: str = None  # type: ignore
        columns: List[str]
        connector_info: TrinoConnectorInfo = Field(None, alias="connectorInfo")

    class TrinoJoinedAccessEvent(BaseModel):
        usr: str = None  # type:ignore
        query: str = None  # type: ignore
        catalog: str = None  # type: ignore
        schema_name: str = Field(None, alias="schema")
        query_type: str = None  # type:ignore
        table: str = None  # type:ignore
        accessed_metadata: List[TrinoAccessedMetadata]
        starttime: datetime = Field(None, alias="create_time")
        endtime: datetime = Field(None, alias="end_time")

    class TrinoUsageConfig(TrinoConfig, BaseUsageConfig):
        env: str = builder.DEFAULT_ENV
        email_domain: str
        audit_catalog: str
        audit_schema: str
        options: dict = {}

        def get_sql_alchemy_url(self):
            return super().get_sql_alchemy_url()

    @dataclasses.dataclass
    class TrinoUsageSource(Source):
        config: TrinoUsageConfig
        report: SourceReport = dataclasses.field(default_factory=SourceReport)

        @classmethod
        def create(cls, config_dict, ctx):
            config = TrinoUsageConfig.parse_obj(config_dict)
            return cls(ctx, config)

        def get_workunits(self) -> Iterable[MetadataWorkUnit]:
            access_events = self._get_trino_history()
            # If the query result is empty, we don't want to proceed.
            if not access_events:
                return []

            joined_access_event = self._get_joined_access_event(access_events)
            aggregated_info = self._aggregate_access_events(joined_access_event)

            for time_bucket in aggregated_info.values():
                for aggregate in time_bucket.values():
                    wu = self._make_usage_stat(aggregate)
                    self.report.report_workunit(wu)
                    yield wu

        def _make_usage_query(self) -> str:
            return trino_usage_sql_comment.format(
                audit_catalog=self.config.audit_catalog,
                audit_schema=self.config.audit_schema,
                start_time=self.config.start_time.strftime(trino_datetime_format),
                end_time=self.config.end_time.strftime(trino_datetime_format),
            )

        def _make_sql_engine(self) -> Engine:
            url = self.config.get_sql_alchemy_url()
            logger.debug(f"sql_alchemy_url = {url}")
            engine = create_engine(url, **self.config.options)
            return engine

        def _get_trino_history(self):
            query = self._make_usage_query()
            engine = self._make_sql_engine()
            results = engine.execute(query)
            events = []
            for row in results:
                # minor type conversion
                if hasattr(row, "_asdict"):
                    event_dict = row._asdict()
                else:
                    event_dict = dict(row)

                # stripping extra spaces caused by the _asdict() conversion above
                for k, v in event_dict.items():
                    if isinstance(v, str):
                        event_dict[k] = v.strip()

                if event_dict.get("starttime", None):
                    event_dict["starttime"] = event_dict.get("starttime").__str__()
                if event_dict.get("endtime", None):
                    event_dict["endtime"] = event_dict.get("endtime").__str__()

                logger.debug(f"event_dict: {event_dict}")
                events.append(event_dict)

            if events:
                return events

            # SQL results can be empty. If the result set is empty, the SQL
            # connection closes, and we don't want to proceed with ingestion.
            logging.info("SQL result is empty")
            return None

        def _convert_str_to_datetime(self, v):
            if isinstance(v, str):
                isodate = parser.parse(v)  # compatible with Python 3.6+
                return isodate

        def _get_joined_access_event(self, events):
            joined_access_events = []
            for event_dict in events:
                event_dict["create_time"] = self._convert_str_to_datetime(
                    event_dict.get("create_time")
                )
                event_dict["end_time"] = self._convert_str_to_datetime(
                    event_dict.get("end_time")
                )

                if not event_dict["accessed_metadata"]:
                    logging.info("Field accessed_metadata is empty. Skipping ....")
                    continue

                event_dict["accessed_metadata"] = json.loads(
                    event_dict["accessed_metadata"]
                )

                if not event_dict.get("usr"):
                    logging.info("The username parameter is missing. Skipping ....")
                    continue

                joined_access_events.append(TrinoJoinedAccessEvent(**event_dict))
            return joined_access_events

        def _aggregate_access_events(
            self, events: List[TrinoJoinedAccessEvent]
        ) -> Dict[datetime, Dict[TrinoTableRef, AggregatedDataset]]:
            datasets: Dict[
                datetime, Dict[TrinoTableRef, AggregatedDataset]
            ] = collections.defaultdict(dict)

            for event in events:
                floored_ts = get_time_bucket(
                    event.starttime, self.config.bucket_duration
                )
                for metadata in event.accessed_metadata:
                    # Skipping queries starting with $system@
                    if metadata.catalog_name.startswith("$system@"):
                        logging.debug(
                            f"Skipping system query for {metadata.catalog_name}..."
                        )
                        continue

                    # Filtering down queries to the selected catalog
                    if metadata.catalog_name != self.config.database:
                        continue

                    resource = f"{metadata.catalog_name}.{metadata.schema_name}.{metadata.table}"

                    agg_bucket = datasets[floored_ts].setdefault(
                        resource,
                        AggregatedDataset(
                            bucket_start_time=floored_ts, resource=resource
                        ),
                    )

                    # Append the configured email domain to the username if it is
                    # not already an email address. This is a current limitation of
                    # the usage stats UI: we need an email to display users.
                    if "@" in parseaddr(event.usr)[1]:
                        username = event.usr
                    else:
                        username = f"{event.usr if event.usr else 'unknown'}@{self.config.email_domain}"

                    agg_bucket.add_read_entry(
                        username,
                        event.query,
                        metadata.columns,
                    )
            return datasets

        def _make_usage_stat(self, agg: AggregatedDataset) -> MetadataWorkUnit:
            return agg.make_usage_workunit(
                self.config.bucket_duration,
                lambda resource: builder.make_dataset_urn(
                    "trino", resource.lower(), self.config.env
                ),
                self.config.top_n_queries,
            )

        def get_report(self) -> SourceReport:
            return self.report

        def close(self) -> None:
            pass

else:
    raise ModuleNotFoundError("The trino usage plugin requires Python 3.7 or newer.")

View File

@@ -0,0 +1,97 @@
import json
import pathlib
import sys
from unittest.mock import patch

import pytest
from freezegun import freeze_time

from datahub.ingestion.run.pipeline import Pipeline

if sys.version_info >= (3, 7):  # noqa: C901
    from datahub.ingestion.source.usage.starburst_trino_usage import TrinoUsageConfig

from tests.test_helpers import mce_helpers

FROZEN_TIME = "2021-08-24 09:00:00"


@pytest.mark.skipif(sys.version_info < (3, 7), reason="trino requires Python 3.7+")
def test_trino_usage_config():
    config = TrinoUsageConfig.parse_obj(
        dict(
            host_port="xxxxx",
            database="testcatalog",
            username="xxxxx",
            password="xxxxx",
            email_domain="xxxxx",
            audit_catalog="xxxxx",
            audit_schema="xxxxx",
            include_views=True,
            include_tables=True,
        )
    )

    assert config.host_port == "xxxxx"
    assert config.database == "testcatalog"
    assert config.username == "xxxxx"
    assert config.email_domain == "xxxxx"
    assert config.audit_catalog == "xxxxx"
    assert config.audit_schema == "xxxxx"
    assert config.include_views
    assert config.include_tables


def yield_function(li):
    for i in li:
        yield i


@freeze_time(FROZEN_TIME)
@pytest.mark.skipif(sys.version_info < (3, 7), reason="trino requires Python 3.7+")
def test_trino_usage_source(pytestconfig, tmp_path):
    test_resources_dir = pathlib.Path(
        pytestconfig.rootpath / "tests/integration/starburst-trino-usage"
    )

    with patch(
        "datahub.ingestion.source.usage.starburst_trino_usage.TrinoUsageSource._get_trino_history"
    ) as mock_event_history:
        access_events = load_access_events(test_resources_dir)
        mock_event_history.return_value = access_events

        # Run ingestion
        pipeline = Pipeline.create(
            {
                "run_id": "test-trino-usage",
                "source": {
                    "type": "starburst-trino-usage",
                    "config": {
                        "host_port": "xxxxx",
                        "database": "testcatalog",
                        "username": "xxxxx",
                        "password": "xxxxx",
                        "audit_catalog": "test",
                        "audit_schema": "test",
                        "email_domain": "acryl.io",
                        "include_views": True,
                        "include_tables": True,
                    },
                },
                "sink": {
                    "type": "file",
                    "config": {"filename": f"{tmp_path}/trino_usages.json"},
                },
            },
        )
        pipeline.run()
        pipeline.raise_from_status()

    mce_helpers.check_golden_file(
        pytestconfig=pytestconfig,
        output_path=tmp_path / "trino_usages.json",
        golden_path=test_resources_dir / "trino_usages_golden.json",
    )


def load_access_events(test_resources_dir):
    access_events_history_file = test_resources_dir / "usage_events_history.json"
    with access_events_history_file.open() as access_events_json:
        access_events = json.loads(access_events_json.read())
    return access_events

View File

@@ -0,0 +1,15 @@
[
    {
        "auditHeader": null,
        "entityType": "dataset",
        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:trino,testcatalog.testschema.testtable,PROD)",
        "entityKeyAspect": null,
        "changeType": "UPSERT",
        "aspectName": "datasetUsageStatistics",
        "aspect": {
            "value": "{\"timestampMillis\": 1634169600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 2, \"topSqlQueries\": [\"select * from testcatalog.testschema.testtable limit 100\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:test-name\", \"count\": 2, \"userEmail\": \"test-name@acryl.io\"}], \"fieldCounts\": [{\"fieldPath\": \"column1\", \"count\": 2}, {\"fieldPath\": \"column2\", \"count\": 2}]}",
            "contentType": "application/json"
        },
        "systemMetadata": null
    }
]

View File

@@ -0,0 +1,32 @@
[
    {
        "usr" : "test-name",
        "query" : "select * from testcatalog.testschema.testtable limit 100",
        "catalog" : null,
        "schema" : null,
        "query_type" : "SELECT",
        "accessed_metadata" : "[{\"catalogName\":\"testcatalog\",\"schema\":\"testschema\",\"table\":\"testtable\",\"columns\":[\"column1\",\"column2\"],\"physicalInputBytes\":1673886,\"physicalInputRows\":4754}]",
        "create_time" : "2021-10-14 09:40:53.108000 UTC",
        "end_time" : "2021-10-14 09:40:55.214000 UTC"
    },
    {
        "usr" : "test-name@acryl.io",
        "query" : "select * from testcatalog.testschema.testtable limit 100",
        "catalog" : null,
        "schema" : null,
        "query_type" : "SELECT",
        "accessed_metadata" : "[{\"catalogName\":\"testcatalog\",\"schema\":\"testschema\",\"table\":\"testtable\",\"columns\":[\"column1\",\"column2\"],\"physicalInputBytes\":1673886,\"physicalInputRows\":4754}]",
        "create_time" : "2021-10-14 09:40:53.108000 UTC",
        "end_time" : "2021-10-14 09:40:55.214000 UTC"
    },
    {
        "usr" : "test-name",
        "query" : "select * from testcatalog.testschema.testtable limit 100",
        "catalog" : null,
        "schema" : null,
        "query_type" : "SELECT",
        "accessed_metadata": null,
        "create_time" : "2021-10-14 09:40:53.108000 UTC",
        "end_time" : "2021-10-14 09:40:55.214000 UTC"
    }
]