feat(ingest): add lineage support to snowflake connector (#3331)

Ravindra Lanka 2021-10-05 22:58:07 -07:00 committed by GitHub
parent fe589a58b3
commit b4c0e20c68
7 changed files with 277 additions and 91 deletions

View File

@@ -13,6 +13,7 @@ This plugin extracts the following:

- Metadata for databases, schemas, views and tables
- Column types associated with each table
- Table, row, and column statistics via optional [SQL profiling](./sql_profiles.md)
- Table lineage

:::tip
@@ -37,7 +38,7 @@ source:
    # Credentials
    username: user
    password: pass
-   role: "sysadmin"
+   role: "accountadmin"

sink:
  # sink configs
@@ -50,7 +51,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.

As a SQL-based service, the Snowflake integration is also supported by our SQL profiler. See [here](./sql_profiles.md) for more details on configuration.

| Field | Required | Default | Description |
| ----- | -------- | ------- | ----------- |
| `username` | | | Snowflake username. |
| `password` | | | Snowflake password. |
| `host_port` | ✅ | | Snowflake host URL. |
@@ -72,6 +73,11 @@ As a SQL-based service, the Snowflake integration is also supported by our SQL prof
| `view_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `include_tables` | | `True` | Whether tables should be ingested. |
| `include_views` | | `True` | Whether views should be ingested. |
| `include_table_lineage` | | `True` | Whether table lineage should be ingested. |
| `bucket_duration` | | `"DAY"` | Duration to bucket lineage data extraction by. Can be `"DAY"` or `"HOUR"`. |
| `start_time` | | Start of last full day in UTC (or hour, depending on `bucket_duration`) | Earliest time of lineage data to consider. |
| `end_time` | | End of last full day in UTC (or hour, depending on `bucket_duration`) | Latest time of lineage data to consider. |
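Putting the new options together, a recipe that enables lineage over an explicit one-day window might look like the following sketch; the host, credentials, and timestamps are illustrative placeholders, not values from this commit:

```yml
source:
  type: snowflake
  config:
    host_port: "abc12345.snowflakecomputing.com"  # illustrative account host
    username: user
    password: pass
    role: "accountadmin"
    include_table_lineage: true
    bucket_duration: "DAY"
    start_time: "2021-10-04T00:00:00Z"  # must be timezone-aware UTC ("Z" suffix)
    end_time: "2021-10-05T00:00:00Z"
sink:
  # sink configs
```

If `start_time`/`end_time` are omitted, the connector falls back to the last full UTC day (or hour), as described above.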
## Compatibility

View File

@@ -0,0 +1,64 @@
import enum
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

import pydantic

from datahub.configuration.common import ConfigModel
from datahub.metadata.schema_classes import CalendarIntervalClass


@enum.unique
class BucketDuration(str, enum.Enum):
    DAY = CalendarIntervalClass.DAY
    HOUR = CalendarIntervalClass.HOUR


def get_time_bucket(original: datetime, bucketing: BucketDuration) -> datetime:
    """Floors the timestamp to the closest day or hour."""

    if bucketing == BucketDuration.HOUR:
        return original.replace(minute=0, second=0, microsecond=0)
    else:  # day
        return original.replace(hour=0, minute=0, second=0, microsecond=0)


def get_bucket_duration_delta(bucketing: BucketDuration) -> timedelta:
    if bucketing == BucketDuration.HOUR:
        return timedelta(hours=1)
    else:  # day
        return timedelta(days=1)


class BaseTimeWindowConfig(ConfigModel):
    bucket_duration: BucketDuration = BucketDuration.DAY

    # `start_time` and `end_time` will be populated by the pre-validators.
    # However, we must specify a "default" value here or pydantic will complain
    # if those fields are not set by the user.
    end_time: datetime = None  # type: ignore
    start_time: datetime = None  # type: ignore

    @pydantic.validator("end_time", pre=True, always=True)
    def default_end_time(
        cls, v: Any, *, values: Dict[str, Any], **kwargs: Any
    ) -> datetime:
        return v or get_time_bucket(
            datetime.now(tz=timezone.utc), values["bucket_duration"]
        )

    @pydantic.validator("start_time", pre=True, always=True)
    def default_start_time(
        cls, v: Any, *, values: Dict[str, Any], **kwargs: Any
    ) -> datetime:
        return v or (
            values["end_time"] - get_bucket_duration_delta(values["bucket_duration"])
        )

    @pydantic.validator("start_time", "end_time")
    def ensure_timestamps_in_utc(cls, v: datetime) -> datetime:
        if v.tzinfo != timezone.utc:
            raise ValueError(
                'timezone is not UTC; try adding a "Z" to the value e.g. "2021-07-20T00:00:00Z"'
            )
        return v
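A minimal sketch of how these defaults resolve when no explicit times are supplied (assertions only illustrate the one-bucket window; they are not part of the commit):

```python
from datetime import timedelta

from datahub.configuration.time_window_config import BaseTimeWindowConfig, BucketDuration

# With no explicit times, end_time is "now" floored to the bucket (start of the current
# UTC day by default) and start_time is exactly one bucket earlier, i.e. the window
# covers the last full UTC day.
daily = BaseTimeWindowConfig()
assert daily.end_time - daily.start_time == timedelta(days=1)

# Hourly bucketing narrows the default window to the last full UTC hour.
hourly = BaseTimeWindowConfig(bucket_duration=BucketDuration.HOUR)
assert hourly.end_time - hourly.start_time == timedelta(hours=1)

# Explicit values must be timezone-aware UTC (e.g. "2021-07-20T00:00:00Z");
# naive datetimes are rejected by ensure_timestamps_in_utc.
```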

View File

@@ -1,5 +1,7 @@
+import json
import logging
+from collections import defaultdict
-from typing import Any, Iterable, Optional
+from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

import pydantic
@@ -11,15 +13,28 @@ from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql import text
from sqlalchemy.sql.elements import quoted_name

-from datahub.configuration.common import AllowDenyPattern, ConfigModel
+import datahub.emitter.mce_builder as builder
+from datahub.configuration.common import AllowDenyPattern
+from datahub.configuration.time_window_config import BaseTimeWindowConfig
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.sql_common import (
    RecordTypeClass,
    SQLAlchemyConfig,
    SQLAlchemySource,
+   SqlWorkUnit,
    TimeTypeClass,
    make_sqlalchemy_uri,
    register_custom_type,
)
+from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
+    DatasetLineageTypeClass,
+    UpstreamClass,
+    UpstreamLineage,
+)
+from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
+from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
+from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass

register_custom_type(custom_types.TIMESTAMP_TZ, TimeTypeClass)
register_custom_type(custom_types.TIMESTAMP_LTZ, TimeTypeClass)
@@ -29,7 +44,7 @@ register_custom_type(custom_types.VARIANT, RecordTypeClass)

logger: logging.Logger = logging.getLogger(__name__)


-class BaseSnowflakeConfig(ConfigModel):
+class BaseSnowflakeConfig(BaseTimeWindowConfig):
    # Note: this config model is also used by the snowflake-usage source.

    scheme = "snowflake"
@@ -39,6 +54,7 @@ class BaseSnowflakeConfig(ConfigModel):
    host_port: str
    warehouse: Optional[str]
    role: Optional[str]
    include_table_lineage: Optional[bool] = True

    def get_sql_alchemy_url(self, database=None):
        return make_sqlalchemy_uri(
@@ -61,11 +77,7 @@ class BaseSnowflakeConfig(ConfigModel):
class SnowflakeConfig(BaseSnowflakeConfig, SQLAlchemyConfig):
    database_pattern: AllowDenyPattern = AllowDenyPattern(
-        deny=[
-            r"^UTIL_DB$",
-            r"^SNOWFLAKE$",
-            r"^SNOWFLAKE_SAMPLE_DATA$",
-        ]
+        deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"]
    )

    database: Optional[str]  # deprecated
@@ -88,6 +100,7 @@ class SnowflakeSource(SQLAlchemySource):
    def __init__(self, config, ctx):
        super().__init__(config, ctx, "snowflake")
        self._lineage_map: Optional[Dict[str, List[Tuple[str, str, str]]]] = None

    @classmethod
    def create(cls, config_dict, ctx):
@@ -113,3 +126,156 @@ class SnowflakeSource(SQLAlchemySource):
    def get_identifier(self, *, schema: str, entity: str, **kwargs: Any) -> str:
        regular = super().get_identifier(schema=schema, entity=entity, **kwargs)
        return f"{self.current_database.lower()}.{regular}"
    def _populate_lineage(self) -> None:
        url = self.config.get_sql_alchemy_url()
        logger.debug(f"sql_alchemy_url={url}")
        engine = create_engine(url, **self.config.options)
        query: str = """
WITH table_lineage_history AS (
    SELECT
        r.value:"objectName" AS upstream_table_name,
        r.value:"objectDomain" AS upstream_table_domain,
        r.value:"columns" AS upstream_table_columns,
        w.value:"objectName" AS downstream_table_name,
        w.value:"objectDomain" AS downstream_table_domain,
        w.value:"columns" AS downstream_table_columns,
        t.query_start_time AS query_start_time
    FROM
        (SELECT * from snowflake.account_usage.access_history) t,
        lateral flatten(input => t.BASE_OBJECTS_ACCESSED) r,
        lateral flatten(input => t.OBJECTS_MODIFIED) w
    WHERE r.value:"objectId" IS NOT NULL
    AND w.value:"objectId" IS NOT NULL
    AND w.value:"objectName" NOT LIKE '%.GE_TMP_%'
    AND t.query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
    AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3))
SELECT upstream_table_name, downstream_table_name, upstream_table_columns, downstream_table_columns
FROM table_lineage_history
WHERE upstream_table_domain = 'Table' and downstream_table_domain = 'Table'
QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_name ORDER BY query_start_time DESC) = 1""".format(
            start_time_millis=int(self.config.start_time.timestamp() * 1000),
            end_time_millis=int(self.config.end_time.timestamp() * 1000),
        )
        self._lineage_map = defaultdict(list)
        try:
            for db_row in engine.execute(query):
                # key is the down-stream table name
                key: str = db_row[1].lower().replace('"', "")
                self._lineage_map[key].append(
                    # (<upstream_table_name>, <json_list_of_upstream_columns>, <json_list_of_downstream_columns>)
                    (db_row[0].lower().replace('"', ""), db_row[2], db_row[3])
                )
                logger.debug(f"Lineage[{key}]:{self._lineage_map[key]}")
        except Exception as e:
            logger.warning(
                "Extracting lineage from Snowflake failed. "
                f"Please check your permissions. Continuing...\nError was {e}."
            )
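To make the shape of the lineage map concrete, here is a minimal sketch (with hypothetical table and column names) of how one row returned by the query above is folded into `_lineage_map`:

```python
import json
from collections import defaultdict

# Hypothetical row, in the order selected by the query above:
# (upstream_table_name, downstream_table_name, upstream_table_columns, downstream_table_columns)
db_row = (
    '"DEMO_DB"."PUBLIC"."RAW_ORDERS"',
    '"DEMO_DB"."PUBLIC"."FCT_ORDERS"',
    '[{"columnName": "ORDER_ID"}, {"columnName": "AMOUNT"}]',
    '[{"columnName": "ORDER_ID"}, {"columnName": "TOTAL"}]',
)

lineage_map = defaultdict(list)

# Keys are the lower-cased, unquoted downstream table names; each value is a list of
# (upstream_table, upstream_columns_json, downstream_columns_json) tuples.
key = db_row[1].lower().replace('"', "")  # "demo_db.public.fct_orders"
lineage_map[key].append((db_row[0].lower().replace('"', ""), db_row[2], db_row[3]))

# The JSON column lists are parsed later when building column-level lineage strings.
upstream_columns = [c["columnName"].lower() for c in json.loads(db_row[2])]
assert upstream_columns == ["order_id", "amount"]
```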
    def _get_upstream_lineage_info(
        self, dataset_urn: str
    ) -> Optional[Tuple[UpstreamLineage, Dict[str, str]]]:
        dataset_key = builder.dataset_urn_to_key(dataset_urn)
        if dataset_key is None:
            logger.warning(f"Invalid dataset urn {dataset_urn}. Could not get key!")
            return None

        if self._lineage_map is None:
            self._populate_lineage()
        assert self._lineage_map is not None

        dataset_name = dataset_key.name
        lineage = self._lineage_map.get(f"{dataset_name}", None)
        if lineage is None:
            logger.debug(f"No lineage found for {dataset_name}")
            return None

        upstream_tables: List[UpstreamClass] = []
        column_lineage: Dict[str, str] = {}
        for lineage_entry in lineage:
            # Update the table-lineage
            upstream_table_name = lineage_entry[0]
            upstream_table = UpstreamClass(
                dataset=builder.make_dataset_urn(
                    self.platform, upstream_table_name, self.config.env
                ),
                type=DatasetLineageTypeClass.TRANSFORMED,
            )
            upstream_tables.append(upstream_table)
            # Update column-lineage for each down-stream column.
            upstream_columns = [
                d["columnName"].lower() for d in json.loads(lineage_entry[1])
            ]
            downstream_columns = [
                d["columnName"].lower() for d in json.loads(lineage_entry[2])
            ]
            upstream_column_str = (
                f"{upstream_table_name}({', '.join(sorted(upstream_columns))})"
            )
            downstream_column_str = (
                f"{dataset_name}({', '.join(sorted(downstream_columns))})"
            )
            column_lineage_key = f"column_lineage[{upstream_table_name}]"
            column_lineage_value = (
                f"{{{upstream_column_str} -> {downstream_column_str}}}"
            )
            column_lineage[column_lineage_key] = column_lineage_value
            logger.debug(f"{column_lineage_key}:{column_lineage_value}")

        return UpstreamLineage(upstreams=upstream_tables), column_lineage
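Continuing the hypothetical example above, the column-lineage custom property derived from such an entry would take this form (a sketch, not output captured from the connector):

```python
# Key/value shape of the column-lineage custom properties (hypothetical tables/columns):
column_lineage = {
    "column_lineage[demo_db.public.raw_orders]": (
        "{demo_db.public.raw_orders(amount, order_id) -> "
        "demo_db.public.fct_orders(order_id, total)}"
    )
}
```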
    # Override the base class method.
    def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
        for wu in super().get_workunits():
            if (
                self.config.include_table_lineage
                and isinstance(wu, SqlWorkUnit)
                and isinstance(wu.metadata, MetadataChangeEvent)
                and isinstance(wu.metadata.proposedSnapshot, DatasetSnapshot)
            ):
                dataset_snapshot: DatasetSnapshot = wu.metadata.proposedSnapshot
                assert dataset_snapshot
                # Join the workunit stream from super with the lineage info using the urn.
                lineage_info = self._get_upstream_lineage_info(dataset_snapshot.urn)
                if lineage_info is not None:
                    # Emit the lineage work unit
                    upstream_lineage, upstream_column_props = lineage_info
                    lineage_mcpw = MetadataChangeProposalWrapper(
                        entityType="dataset",
                        changeType=ChangeTypeClass.UPSERT,
                        entityUrn=dataset_snapshot.urn,
                        aspectName="upstreamLineage",
                        aspect=upstream_lineage,
                    )
                    lineage_wu = MetadataWorkUnit(
                        id=f"{self.platform}-{lineage_mcpw.entityUrn}-{lineage_mcpw.aspectName}",
                        mcp=lineage_mcpw,
                    )
                    self.report.report_workunit(lineage_wu)
                    yield lineage_wu

                    # Update the super's workunit to include the column-lineage in the custom properties.
                    # We need to follow the RCU semantics for both the aspects & customProperties in order
                    # to preserve the changes made by super.
                    aspects = dataset_snapshot.aspects
                    if aspects is None:
                        aspects = []
                    dataset_properties_aspect: Optional[DatasetPropertiesClass] = None
                    for aspect in aspects:
                        if isinstance(aspect, DatasetPropertiesClass):
                            dataset_properties_aspect = aspect
                    if dataset_properties_aspect is None:
                        dataset_properties_aspect = DatasetPropertiesClass()
                        aspects.append(dataset_properties_aspect)
                    custom_properties = (
                        {
                            **dataset_properties_aspect.customProperties,
                            **upstream_column_props,
                        }
                        if dataset_properties_aspect.customProperties
                        else upstream_column_props
                    )
                    dataset_properties_aspect.customProperties = custom_properties
                    dataset_snapshot.aspects = aspects

            # Emit the work unit from super.
            yield wu
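For reference, a sketch of how the id of an emitted lineage work unit is composed; the table name is hypothetical and `PROD` is the builder's usual default env:

```python
import datahub.emitter.mce_builder as builder

# The work unit id pairs the platform, the dataset urn, and the aspect name.
urn = builder.make_dataset_urn("snowflake", "demo_db.public.fct_orders", "PROD")
wu_id = f"snowflake-{urn}-upstreamLineage"
# e.g. "snowflake-urn:li:dataset:(urn:li:dataPlatform:snowflake,demo_db.public.fct_orders,PROD)-upstreamLineage"
```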

View File

@@ -12,13 +12,13 @@ import pydantic
from google.cloud.logging_v2.client import Client as GCPLoggingClient

import datahub.emitter.mce_builder as builder
+from datahub.configuration.time_window_config import get_time_bucket
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.usage.usage_common import (
    BaseUsageConfig,
    GenericAggregatedDataset,
-    get_time_bucket,
)
from datahub.utilities.delayed_iter import delayed_iter

View File

@@ -11,13 +11,13 @@ from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

import datahub.emitter.mce_builder as builder
+from datahub.configuration.time_window_config import get_time_bucket
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.redshift import RedshiftConfig
from datahub.ingestion.source.usage.usage_common import (
    BaseUsageConfig,
    GenericAggregatedDataset,
-    get_time_bucket,
)

logger = logging.getLogger(__name__)

View File

@@ -12,13 +12,13 @@ from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

import datahub.emitter.mce_builder as builder
+from datahub.configuration.time_window_config import get_time_bucket
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.snowflake import BaseSnowflakeConfig
from datahub.ingestion.source.usage.usage_common import (
    BaseUsageConfig,
    GenericAggregatedDataset,
-    get_time_bucket,
)

logger = logging.getLogger(__name__)

View File

@@ -1,17 +1,18 @@
import collections
import dataclasses
-import enum
-from datetime import datetime, timedelta, timezone
+from datetime import datetime
from typing import Callable, Counter, Generic, List, Optional, TypeVar

import pydantic

import datahub.emitter.mce_builder as builder
-from datahub.configuration.common import ConfigModel
+from datahub.configuration.time_window_config import (
+    BaseTimeWindowConfig,
+    BucketDuration,
+)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
-    CalendarIntervalClass,
    ChangeTypeClass,
    DatasetFieldUsageCountsClass,
    DatasetUsageStatisticsClass,
@@ -19,29 +20,6 @@ from datahub.metadata.schema_classes import (
    TimeWindowSizeClass,
)

-
-@enum.unique
-class BucketDuration(str, enum.Enum):
-    DAY = CalendarIntervalClass.DAY
-    HOUR = CalendarIntervalClass.HOUR
-
-
-def get_time_bucket(original: datetime, bucketing: BucketDuration) -> datetime:
-    """Floors the timestamp to the closest day or hour."""
-
-    if bucketing == BucketDuration.HOUR:
-        return original.replace(minute=0, second=0, microsecond=0)
-    else:  # day
-        return original.replace(hour=0, minute=0, second=0, microsecond=0)
-
-
-def get_bucket_duration_delta(bucketing: BucketDuration) -> timedelta:
-    if bucketing == BucketDuration.HOUR:
-        return timedelta(hours=1)
-    else:  # day
-        return timedelta(days=1)
-

ResourceType = TypeVar("ResourceType")
@@ -111,33 +89,5 @@ class GenericAggregatedDataset(Generic[ResourceType]):
        )


-class BaseUsageConfig(ConfigModel):
+class BaseUsageConfig(BaseTimeWindowConfig):
-    bucket_duration: BucketDuration = BucketDuration.DAY
-
-    # `start_time` and `end_time` will be populated by the pre-validators.
-    # However, we must specific a "default" value here or pydantic will complain
-    # if those fields are not set by the user.
-    end_time: datetime = None  # type: ignore
-    start_time: datetime = None  # type: ignore
-
    top_n_queries: Optional[pydantic.PositiveInt] = 10
-
-    @pydantic.validator("end_time", pre=True, always=True)
-    def default_end_time(cls, v, *, values, **kwargs):
-        return v or get_time_bucket(
-            datetime.now(tz=timezone.utc), values["bucket_duration"]
-        )
-
-    @pydantic.validator("start_time", pre=True, always=True)
-    def default_start_time(cls, v, *, values, **kwargs):
-        return v or (
-            values["end_time"] - get_bucket_duration_delta(values["bucket_duration"])
-        )
-
-    @pydantic.validator("start_time", "end_time")
-    def ensure_timestamps_in_utc(cls, v: datetime) -> datetime:
-        if v.tzinfo != timezone.utc:
-            raise ValueError(
-                'timezone is not UTC; try adding a "Z" to the value e.g. "2021-07-20T00:00:00Z"'
-            )
-        return v
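With the shared base class in place, a bare usage config still resolves a full time window; a minimal sketch of what the defaults now look like:

```python
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig

# bucket_duration, start_time, and end_time now come from BaseTimeWindowConfig,
# so an empty config still yields a one-day window ending at the start of the
# current UTC day, plus the usage-specific top_n_queries default.
config = BaseUsageConfig()
print(config.bucket_duration)                 # BucketDuration.DAY
print(config.start_time, config.end_time)     # last full UTC day
print(config.top_n_queries)                   # 10
```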