diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md
index f5c757a332..67c4880096 100644
--- a/metadata-ingestion/README.md
+++ b/metadata-ingestion/README.md
@@ -65,6 +65,7 @@ Sources:
 | [sqlalchemy](./source_docs/sqlalchemy.md) | `pip install 'acryl-datahub[sqlalchemy]'` | Generic SQLAlchemy source |
 | [superset](./source_docs/superset.md) | `pip install 'acryl-datahub[superset]'` | Superset source |
 | [trino](./source_docs/trino.md) | `pip install 'acryl-datahub[trino]'` | Trino source |
+| [starburst-trino-usage](./source_docs/trino.md) | `pip install 'acryl-datahub[starburst-trino-usage]'` | Starburst Trino usage statistics source |

 Sinks
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index a12379a7d1..724348aebf 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -135,6 +135,14 @@ plugins: Dict[str, Set[str]] = {
         # PR is from same author as that of sqlalchemy-trino library below.
         "sqlalchemy-trino"
     },
+    "starburst-trino-usage": sql_common
+    | {
+        # SQLAlchemy support is coming up in the trino python client,
+        # subject to PR merging - https://github.com/trinodb/trino-python-client/pull/81.
+        # The PR is from the same author as that of the sqlalchemy-trino library.
+        "sqlalchemy-trino"
+    },
+
 }

 all_exclude_plugins: Set[str] = {
@@ -209,7 +217,7 @@ if is_py37_or_newer:
     # The trino plugin only works on Python 3.7 or newer.
     # The trino plugin can be supported on Python 3.6 with minimal changes to the open-source sqlalchemy-trino source code.
     base_dev_requirements = base_dev_requirements.union(
-        {dependency for plugin in ["lookml", "trino"] for dependency in plugins[plugin]}
+        {dependency for plugin in ["lookml", "trino", "starburst-trino-usage"] for dependency in plugins[plugin]}
     )

 dev_requirements = {
@@ -281,6 +289,8 @@ entry_points = {
         "superset = datahub.ingestion.source.superset:SupersetSource",
         "openapi = datahub.ingestion.source.openapi:OpenApiSource",
         "trino = datahub.ingestion.source.sql.trino:TrinoSource",
+        "starburst-trino-usage = datahub.ingestion.source.usage.starburst_trino_usage:TrinoUsageSource",
+
     ],
     "datahub.ingestion.sink.plugins": [
         "file = datahub.ingestion.sink.file:FileSink",
diff --git a/metadata-ingestion/source_docs/trino.md b/metadata-ingestion/source_docs/trino.md
index 72739fdec9..e160c11479 100644
--- a/metadata-ingestion/source_docs/trino.md
+++ b/metadata-ingestion/source_docs/trino.md
@@ -69,6 +69,61 @@ As a SQL-based service, the Trino integration is also supported by our SQL profiler.
 Coming soon!

+## Trino Usage Stats
+
+For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
+
+### Starburst Trino Usage Stats
+
+If you are using Starburst Trino, you can collect usage stats as follows.
+
+#### Prerequisites
+
+1. Set up the event logger, which saves audit logs into a PostgreSQL database, and register that database as a catalog in Trino (a sample catalog definition follows this list). You can find more details on the setup here:
+   https://docs.starburst.io/354-e/security/event-logger.html#security-event-logger--page-root
+   https://docs.starburst.io/354-e/security/event-logger.html#analyzing-the-event-log
+
+2. Install the starburst-trino-usage plugin:
+   run `pip install 'acryl-datahub[starburst-trino-usage]'`.
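+
+As an illustration of step 1, the audit database can be exposed to Trino through a standard PostgreSQL catalog. This is only a sketch: the file name `etc/catalog/audit.properties`, host, database name, and credentials below are placeholder assumptions; follow the Starburst event logger docs linked above for the authoritative setup.
+
+```
+# etc/catalog/audit.properties (placeholder values)
+connector.name=postgresql
+connection-url=jdbc:postgresql://postgres-host:5432/eventlog
+connection-user=audit_user
+connection-password=audit_password
+```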
+
+#### Usage stats ingestion job
+
+Here is a sample recipe to ingest usage data:
+
+```
+source:
+  type: starburst-trino-usage
+  config:
+    # Coordinates
+    host_port: yourtrinohost:port
+    # The name of the catalog to extract usage stats for
+    database: hive
+    # Credentials
+    username: trino_username
+    password: trino_password
+    email_domain: test.com
+    audit_catalog: audit
+    audit_schema: audit_schema
+
+sink:
+  type: "datahub-rest"
+  config:
+    server: "http://localhost:8080"
+```
+
+### Config details
+
+Note that a `.` is used to denote nested fields in the YAML recipe.
+
+By default, we extract usage stats for the last day, with the recommendation that this source is executed every day.
+
+| Field             | Required | Default                                                        | Description                                                     |
+| ----------------- | -------- | -------------------------------------------------------------- | --------------------------------------------------------------- |
+| `database`        | yes      |                                                                | The name of the catalog to extract usage stats for.             |
+| `audit_catalog`   | yes      |                                                                | The catalog name where the audit table can be found.            |
+| `audit_schema`    | yes      |                                                                | The schema name where the audit table can be found.             |
+| `email_domain`    | yes      |                                                                | The email domain which will be appended to the users.           |
+| `env`             |          | `"PROD"`                                                       | Environment to use in namespace when constructing URNs.         |
+| `bucket_duration` |          | `"DAY"`                                                        | Duration to bucket usage events by. Can be `"DAY"` or `"HOUR"`. |
+| `start_time`      |          | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage logs to consider.                        |
+| `end_time`        |          | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage logs to consider.                          |
+| `top_n_queries`   |          | `10`                                                           | Number of top queries to save to each table.                    |
+
 ## Questions

 If you've got any questions on configuring this source, feel free to ping us on [our Slack](https://slack.datahubproject.io/)!
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
index ce80a76f1e..6ecad4392e 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -418,9 +418,17 @@ class SQLAlchemySource(Source):
                 self.report.report_dropped(dataset_name)
                 continue

-            columns = inspector.get_columns(table, schema)
-            if len(columns) == 0:
-                self.report.report_warning(dataset_name, "missing column information")
+            try:
+                columns = inspector.get_columns(table, schema)
+                if len(columns) == 0:
+                    self.report.report_warning(
+                        dataset_name, "missing column information"
+                    )
+            except Exception as e:
+                self.report.report_warning(
+                    dataset_name,
+                    f"unable to get column information due to an error -> {e}",
+                )

             try:
                 # SQLAlchemy stubs are incomplete and missing this method.
diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/starburst_trino_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/starburst_trino_usage.py
new file mode 100644
index 0000000000..099ee3e188
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/usage/starburst_trino_usage.py
@@ -0,0 +1,253 @@
+import collections
+import dataclasses
+import json
+import logging
+import sys
+from datetime import datetime
+from email.utils import parseaddr
+from typing import Dict, Iterable, List
+
+from dateutil import parser
+from pydantic import Field
+from pydantic.main import BaseModel
+from sqlalchemy import create_engine
+from sqlalchemy.engine import Engine
+
+if sys.version_info >= (3, 7):  # noqa: C901
+    import datahub.emitter.mce_builder as builder
+    from datahub.configuration.time_window_config import get_time_bucket
+    from datahub.ingestion.api.source import Source, SourceReport
+    from datahub.ingestion.api.workunit import MetadataWorkUnit
+    from datahub.ingestion.source.sql.trino import TrinoConfig
+    from datahub.ingestion.source.usage.usage_common import (
+        BaseUsageConfig,
+        GenericAggregatedDataset,
+    )
+
+    logger = logging.getLogger(__name__)
+
+    trino_datetime_format = "%Y-%m-%d %H:%M:%S.%f %Z"
+
+    # Querying the Starburst completed_queries table
+    # https://docs.starburst.io/latest/security/event-logger.html#completed-queries
+    trino_usage_sql_comment = """
+    SELECT DISTINCT usr,
+                    query,
+                    "catalog",
+                    "schema",
+                    query_type,
+                    accessed_metadata,
+                    create_time,
+                    end_time
+    FROM {audit_catalog}.{audit_schema}.completed_queries
+    WHERE 1 = 1
+    AND query_type = 'SELECT'
+    AND create_time >= timestamp '{start_time}'
+    AND end_time < timestamp '{end_time}'
+    AND query_state = 'FINISHED'
+    ORDER BY end_time desc
+    """.strip()
+
+    TrinoTableRef = str
+    AggregatedDataset = GenericAggregatedDataset[TrinoTableRef]
+
+    class TrinoConnectorInfo(BaseModel):
+        partitionIds: List[str]
+        truncated: bool
+
+    class TrinoAccessedMetadata(BaseModel):
+        catalog_name: str = Field(None, alias="catalogName")
+        schema_name: str = Field(None, alias="schema")  # type: ignore
+        table: str = None  # type: ignore
+        columns: List[str]
+        connector_info: TrinoConnectorInfo = Field(None, alias="connectorInfo")
+
+    class TrinoJoinedAccessEvent(BaseModel):
+        usr: str = None  # type: ignore
+        query: str = None  # type: ignore
+        catalog: str = None  # type: ignore
+        schema_name: str = Field(None, alias="schema")
+        query_type: str = None  # type: ignore
+        table: str = None  # type: ignore
+        accessed_metadata: List[TrinoAccessedMetadata]
+        starttime: datetime = Field(None, alias="create_time")
+        endtime: datetime = Field(None, alias="end_time")
+
+    class TrinoUsageConfig(TrinoConfig, BaseUsageConfig):
+        env: str = builder.DEFAULT_ENV
+        email_domain: str
+        audit_catalog: str
+        audit_schema: str
+        options: dict = {}
+
+        def get_sql_alchemy_url(self):
+            return super().get_sql_alchemy_url()
+
+    @dataclasses.dataclass
+    class TrinoUsageSource(Source):
+        config: TrinoUsageConfig
+        report: SourceReport = dataclasses.field(default_factory=SourceReport)
+
+        @classmethod
+        def create(cls, config_dict, ctx):
+            config = TrinoUsageConfig.parse_obj(config_dict)
+            return cls(ctx, config)
+
+        def get_workunits(self) -> Iterable[MetadataWorkUnit]:
+            access_events = self._get_trino_history()
+            # If the query result is empty, we don't want to proceed
+            if not access_events:
+                return []
+
+            joined_access_event = self._get_joined_access_event(access_events)
+            aggregated_info = self._aggregate_access_events(joined_access_event)
+
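+            # aggregated_info maps each floored time bucket to a dict of
+            # resource name -> AggregatedDataset, so the nested loops below
+            # emit one usage-stat workunit per dataset per time bucket.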
+            for time_bucket in aggregated_info.values():
+                for aggregate in time_bucket.values():
+                    wu = self._make_usage_stat(aggregate)
+                    self.report.report_workunit(wu)
+                    yield wu
+
+        def _make_usage_query(self) -> str:
+            return trino_usage_sql_comment.format(
+                audit_catalog=self.config.audit_catalog,
+                audit_schema=self.config.audit_schema,
+                start_time=self.config.start_time.strftime(trino_datetime_format),
+                end_time=self.config.end_time.strftime(trino_datetime_format),
+            )
+
+        def _make_sql_engine(self) -> Engine:
+            url = self.config.get_sql_alchemy_url()
+            logger.debug(f"sql_alchemy_url = {url}")
+            engine = create_engine(url, **self.config.options)
+            return engine
+
+        def _get_trino_history(self):
+            query = self._make_usage_query()
+            engine = self._make_sql_engine()
+            results = engine.execute(query)
+            events = []
+            for row in results:
+                # minor type conversion
+                if hasattr(row, "_asdict"):
+                    event_dict = row._asdict()
+                else:
+                    event_dict = dict(row)
+
+                # strip extra spaces caused by the _asdict() conversion above
+                for k, v in event_dict.items():
+                    if isinstance(v, str):
+                        event_dict[k] = v.strip()
+
+                if event_dict.get("starttime", None):
+                    event_dict["starttime"] = str(event_dict.get("starttime"))
+                if event_dict.get("endtime", None):
+                    event_dict["endtime"] = str(event_dict.get("endtime"))
+
+                logger.debug(f"event_dict: {event_dict}")
+                events.append(event_dict)
+
+            if events:
+                return events
+
+            # The SQL result can be empty; in that case the SQL connection closes
+            # and we don't want to proceed with ingestion.
+            logger.info("SQL result is empty")
+            return None
+
+        def _convert_str_to_datetime(self, v):
+            if isinstance(v, str):
+                isodate = parser.parse(v)  # compatible with Python 3.6+
+                return isodate
+
+        def _get_joined_access_event(self, events):
+            joined_access_events = []
+            for event_dict in events:
+                event_dict["create_time"] = self._convert_str_to_datetime(
+                    event_dict.get("create_time")
+                )
+
+                event_dict["end_time"] = self._convert_str_to_datetime(
+                    event_dict.get("end_time")
+                )
+
+                if not event_dict["accessed_metadata"]:
+                    logger.info("Field accessed_metadata is empty. Skipping ...")
+                    continue
+
+                event_dict["accessed_metadata"] = json.loads(
+                    event_dict["accessed_metadata"]
+                )
+
+                if not event_dict.get("usr"):
+                    logger.info("The username parameter is missing. Skipping ...")
+                    continue
+
+                joined_access_events.append(TrinoJoinedAccessEvent(**event_dict))
+            return joined_access_events
+
+        def _aggregate_access_events(
+            self, events: List[TrinoJoinedAccessEvent]
+        ) -> Dict[datetime, Dict[TrinoTableRef, AggregatedDataset]]:
+            datasets: Dict[
+                datetime, Dict[TrinoTableRef, AggregatedDataset]
+            ] = collections.defaultdict(dict)
+
+            for event in events:
+                floored_ts = get_time_bucket(
+                    event.starttime, self.config.bucket_duration
+                )
+                for metadata in event.accessed_metadata:
+
+                    # Skip queries against catalogs starting with $system@
+                    if metadata.catalog_name.startswith("$system@"):
+                        logger.debug(
+                            f"Skipping system query for {metadata.catalog_name}..."
+                        )
+                        continue
+
+                    # Filter queries down to the selected catalog
+                    if metadata.catalog_name != self.config.database:
+                        continue
+
+                    resource = f"{metadata.catalog_name}.{metadata.schema_name}.{metadata.table}"
+
+                    agg_bucket = datasets[floored_ts].setdefault(
+                        resource,
+                        AggregatedDataset(
+                            bucket_start_time=floored_ts, resource=resource
+                        ),
+                    )
+
+                    # Append @<email_domain> to the username when it is not already
+                    # an email address - a current limitation of the usage stats UI
+                    # is that an email is needed to display users.
+                    if "@" in parseaddr(event.usr)[1]:
+                        username = event.usr
+                    else:
+                        username = f"{event.usr if event.usr else 'unknown'}@{self.config.email_domain}"
+
+                    agg_bucket.add_read_entry(
+                        username,
+                        event.query,
+                        metadata.columns,
+                    )
+            return datasets
+
+        def _make_usage_stat(self, agg: AggregatedDataset) -> MetadataWorkUnit:
+            return agg.make_usage_workunit(
+                self.config.bucket_duration,
+                lambda resource: builder.make_dataset_urn(
+                    "trino", resource.lower(), self.config.env
+                ),
+                self.config.top_n_queries,
+            )
+
+        def get_report(self) -> SourceReport:
+            return self.report
+
+        def close(self) -> None:
+            pass
+
+
+else:
+    raise ModuleNotFoundError("The trino usage plugin requires Python 3.7 or newer.")
diff --git a/metadata-ingestion/tests/integration/starburst-trino-usage/test_starburst_trino_usage.py b/metadata-ingestion/tests/integration/starburst-trino-usage/test_starburst_trino_usage.py
new file mode 100644
index 0000000000..5fab543d37
--- /dev/null
+++ b/metadata-ingestion/tests/integration/starburst-trino-usage/test_starburst_trino_usage.py
@@ -0,0 +1,97 @@
+import json
+import pathlib
+import sys
+from unittest.mock import patch
+
+import pytest
+from freezegun import freeze_time
+
+from datahub.ingestion.run.pipeline import Pipeline
+
+if sys.version_info >= (3, 7):  # noqa: C901
+    from datahub.ingestion.source.usage.starburst_trino_usage import TrinoUsageConfig
+    from tests.test_helpers import mce_helpers
+
+    FROZEN_TIME = "2021-08-24 09:00:00"
+
+    @pytest.mark.skipif(sys.version_info < (3, 7), reason="trino requires Python 3.7+")
+    def test_trino_usage_config():
+        config = TrinoUsageConfig.parse_obj(
+            dict(
+                host_port="xxxxx",
+                database="testcatalog",
+                username="xxxxx",
+                password="xxxxx",
+                email_domain="xxxxx",
+                audit_catalog="xxxxx",
+                audit_schema="xxxxx",
+                include_views=True,
+                include_tables=True,
+            )
+        )
+
+        assert config.host_port == "xxxxx"
+        assert config.database == "testcatalog"
+        assert config.username == "xxxxx"
+        assert config.email_domain == "xxxxx"
+        assert config.audit_catalog == "xxxxx"
+        assert config.audit_schema == "xxxxx"
+        assert config.include_views
+        assert config.include_tables
+
+    def yield_function(li):
+        for i in li:
+            yield i
+
+    @freeze_time(FROZEN_TIME)
+    @pytest.mark.skipif(sys.version_info < (3, 7), reason="trino requires Python 3.7+")
+    def test_trino_usage_source(pytestconfig, tmp_path):
+        test_resources_dir = pathlib.Path(
+            pytestconfig.rootpath / "tests/integration/starburst-trino-usage"
+        )
+
+        with patch(
+            "datahub.ingestion.source.usage.starburst_trino_usage.TrinoUsageSource._get_trino_history"
+        ) as mock_event_history:
+            access_events = load_access_events(test_resources_dir)
+            mock_event_history.return_value = access_events
+
+            # Run ingestion
+            pipeline = Pipeline.create(
+                {
+                    "run_id": "test-trino-usage",
+                    "source": {
+                        "type": "starburst-trino-usage",
+                        "config": {
+                            "host_port": "xxxxx",
+                            "database": "testcatalog",
+                            "username": "xxxxx",
+                            "password": "xxxxx",
+                            "audit_catalog": "test",
+                            "audit_schema": "test",
+                            "email_domain": "acryl.io",
"include_views": True, + "include_tables": True, + }, + }, + "sink": { + "type": "file", + "config": {"filename": f"{tmp_path}/trino_usages.json"}, + }, + }, + ) + pipeline.run() + pipeline.raise_from_status() + + mce_helpers.check_golden_file( + pytestconfig=pytestconfig, + output_path=tmp_path / "trino_usages.json", + golden_path=test_resources_dir / "trino_usages_golden.json", + ) + + def load_access_events(test_resources_dir): + access_events_history_file = test_resources_dir / "usage_events_history.json" + with access_events_history_file.open() as access_events_json: + access_events = json.loads(access_events_json.read()) + return access_events diff --git a/metadata-ingestion/tests/integration/starburst-trino-usage/trino_usages_golden.json b/metadata-ingestion/tests/integration/starburst-trino-usage/trino_usages_golden.json new file mode 100644 index 0000000000..95761200f2 --- /dev/null +++ b/metadata-ingestion/tests/integration/starburst-trino-usage/trino_usages_golden.json @@ -0,0 +1,15 @@ +[ + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:trino,testcatalog.testschema.testtable,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetUsageStatistics", + "aspect": { + "value": "{\"timestampMillis\": 1634169600000, \"eventGranularity\": {\"unit\": \"DAY\", \"multiple\": 1}, \"uniqueUserCount\": 1, \"totalSqlQueries\": 2, \"topSqlQueries\": [\"select * from testcatalog.testschema.testtable limit 100\"], \"userCounts\": [{\"user\": \"urn:li:corpuser:test-name\", \"count\": 2, \"userEmail\": \"test-name@acryl.io\"}], \"fieldCounts\": [{\"fieldPath\": \"column1\", \"count\": 2}, {\"fieldPath\": \"column2\", \"count\": 2}]}", + "contentType": "application/json" + }, + "systemMetadata": null + } +] diff --git a/metadata-ingestion/tests/integration/starburst-trino-usage/usage_events_history.json b/metadata-ingestion/tests/integration/starburst-trino-usage/usage_events_history.json new file mode 100644 index 0000000000..330cff0ebe --- /dev/null +++ b/metadata-ingestion/tests/integration/starburst-trino-usage/usage_events_history.json @@ -0,0 +1,32 @@ +[ + { + "usr" : "test-name", + "query" : "select * from testcatalog.testschema.testtable limit 100", + "catalog" : null, + "schema" : null, + "query_type" : "SELECT", + "accessed_metadata" : "[{\"catalogName\":\"testcatalog\",\"schema\":\"testschema\",\"table\":\"testtable\",\"columns\":[\"column1\",\"column2\"],\"physicalInputBytes\":1673886,\"physicalInputRows\":4754}]", + "create_time" : "2021-10-14 09:40:53.108000 UTC", + "end_time" : "2021-10-14 09:40:55.214000 UTC" + }, + { + "usr" : "test-name@acryl.io", + "query" : "select * from testcatalog.testschema.testtable limit 100", + "catalog" : null, + "schema" : null, + "query_type" : "SELECT", + "accessed_metadata" : "[{\"catalogName\":\"testcatalog\",\"schema\":\"testschema\",\"table\":\"testtable\",\"columns\":[\"column1\",\"column2\"],\"physicalInputBytes\":1673886,\"physicalInputRows\":4754}]", + "create_time" : "2021-10-14 09:40:53.108000 UTC", + "end_time" : "2021-10-14 09:40:55.214000 UTC" + }, + { + "usr" : "test-name", + "query" : "select * from testcatalog.testschema.testtable limit 100", + "catalog" : null, + "schema" : null, + "query_type" : "SELECT", + "accessed_metadata": null, + "create_time" : "2021-10-14 09:40:53.108000 UTC", + "end_time" : "2021-10-14 09:40:55.214000 UTC" + } +]