feat(ingest): snowflake - add separate config for include_column_lineage in snowflake (#6712)

This commit is contained in:
Mayuri Nehate 2022-12-12 19:53:12 +05:30 committed by GitHub
parent d3fca44e16
commit 65ba13d9aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 221 additions and 16 deletions

View File

@ -1,7 +1,7 @@
import logging import logging
from typing import Dict, Optional, cast from typing import Dict, Optional, cast
from pydantic import Field, SecretStr, root_validator from pydantic import Field, SecretStr, root_validator, validator
from datahub.configuration.common import AllowDenyPattern from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.glossary.classifier import ClassificationConfig from datahub.ingestion.glossary.classifier import ClassificationConfig
@ -30,6 +30,11 @@ class SnowflakeV2Config(SnowflakeConfig, SnowflakeUsageConfig):
description="If enabled, populates the snowflake technical schema and descriptions.", description="If enabled, populates the snowflake technical schema and descriptions.",
) )
include_column_lineage: bool = Field(
default=True,
description="If enabled, populates the column lineage. Supported only for snowflake table-to-table and view-to-table lineage edge (not supported in table-to-view or view-to-view lineage edge yet). Requires appropriate grants given to the role.",
)
check_role_grants: bool = Field( check_role_grants: bool = Field(
default=False, default=False,
description="Not supported", description="Not supported",
@ -54,6 +59,14 @@ class SnowflakeV2Config(SnowflakeConfig, SnowflakeUsageConfig):
description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.", description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
) )
@validator("include_column_lineage")
def validate_include_column_lineage(cls, v, values):
    """Reject column lineage when table lineage is not enabled.

    Args:
        v: The candidate value for ``include_column_lineage``.
        values: Mapping supplied by pydantic; only the
            ``include_table_lineage`` entry is read here.

    Returns:
        ``v`` unchanged when the combination is acceptable.

    Raises:
        ValueError: If ``v`` is truthy while ``include_table_lineage`` is
            falsy or absent from ``values``.
    """
    # A missing key is treated the same as an explicit False.
    table_lineage_enabled = bool(values.get("include_table_lineage"))
    if table_lineage_enabled or not v:
        return v
    raise ValueError(
        "include_table_lineage must be True for include_column_lineage to be set."
    )
@root_validator(pre=False) @root_validator(pre=False)
def validate_unsupported_configs(cls, values: Dict) -> Dict: def validate_unsupported_configs(cls, values: Dict) -> Dict:

View File

@ -138,11 +138,13 @@ class SnowflakeTableLineage:
default_factory=lambda: defaultdict(SnowflakeColumnUpstreams), init=False default_factory=lambda: defaultdict(SnowflakeColumnUpstreams), init=False
) )
def update_lineage(self, table: SnowflakeUpstreamTable) -> None: def update_lineage(
self, table: SnowflakeUpstreamTable, include_column_lineage: bool = True
) -> None:
if table.upstreamDataset not in self.upstreamTables.keys(): if table.upstreamDataset not in self.upstreamTables.keys():
self.upstreamTables[table.upstreamDataset] = table self.upstreamTables[table.upstreamDataset] = table
if table.downstreamColumns: if include_column_lineage and table.downstreamColumns:
for col in table.downstreamColumns: for col in table.downstreamColumns:
if col.directSourceColumns: if col.directSourceColumns:
@ -380,6 +382,7 @@ class SnowflakeLineageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin):
if not self.config.ignore_start_time_lineage if not self.config.ignore_start_time_lineage
else 0, else 0,
end_time_millis=int(self.config.end_time.timestamp() * 1000), end_time_millis=int(self.config.end_time.timestamp() * 1000),
include_column_lineage=self.config.include_column_lineage,
) )
num_edges: int = 0 num_edges: int = 0
self._lineage_map = defaultdict(SnowflakeTableLineage) self._lineage_map = defaultdict(SnowflakeTableLineage)
@ -404,6 +407,7 @@ class SnowflakeLineageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin):
db_row["UPSTREAM_TABLE_COLUMNS"], db_row["UPSTREAM_TABLE_COLUMNS"],
db_row["DOWNSTREAM_TABLE_COLUMNS"], db_row["DOWNSTREAM_TABLE_COLUMNS"],
), ),
self.config.include_column_lineage,
) )
num_edges += 1 num_edges += 1
logger.debug( logger.debug(
@ -452,7 +456,8 @@ class SnowflakeLineageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin):
# key is the downstream view name # key is the downstream view name
self._lineage_map[view_name].update_lineage( self._lineage_map[view_name].update_lineage(
# (<upstream_table_name>, <empty_json_list_of_upstream_table_columns>, <empty_json_list_of_downstream_view_columns>) # (<upstream_table_name>, <empty_json_list_of_upstream_table_columns>, <empty_json_list_of_downstream_view_columns>)
SnowflakeUpstreamTable.from_dict(view_upstream, None, None) SnowflakeUpstreamTable.from_dict(view_upstream, None, None),
self.config.include_column_lineage,
) )
num_edges += 1 num_edges += 1
logger.debug( logger.debug(
@ -477,6 +482,7 @@ class SnowflakeLineageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin):
if not self.config.ignore_start_time_lineage if not self.config.ignore_start_time_lineage
else 0, else 0,
end_time_millis=int(self.config.end_time.timestamp() * 1000), end_time_millis=int(self.config.end_time.timestamp() * 1000),
include_column_lineage=self.config.include_column_lineage,
) )
assert self._lineage_map is not None assert self._lineage_map is not None
@ -512,7 +518,8 @@ class SnowflakeLineageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin):
view_name, view_name,
db_row["VIEW_COLUMNS"], db_row["VIEW_COLUMNS"],
db_row["DOWNSTREAM_TABLE_COLUMNS"], db_row["DOWNSTREAM_TABLE_COLUMNS"],
) ),
self.config.include_column_lineage,
) )
self.report.num_view_to_table_edges_scanned += 1 self.report.num_view_to_table_edges_scanned += 1

View File

@ -232,7 +232,9 @@ class SnowflakeQuery:
@staticmethod @staticmethod
def table_to_table_lineage_history( def table_to_table_lineage_history(
start_time_millis: int, end_time_millis: int start_time_millis: int,
end_time_millis: int,
include_column_lineage: bool = True,
) -> str: ) -> str:
return f""" return f"""
WITH table_lineage_history AS ( WITH table_lineage_history AS (
@ -263,8 +265,7 @@ class SnowflakeQuery:
WHERE upstream_table_domain in ('Table', 'External table') and downstream_table_domain = 'Table' WHERE upstream_table_domain in ('Table', 'External table') and downstream_table_domain = 'Table'
QUALIFY ROW_NUMBER() OVER ( QUALIFY ROW_NUMBER() OVER (
PARTITION BY downstream_table_name, PARTITION BY downstream_table_name,
upstream_table_name, upstream_table_name{", downstream_table_columns" if include_column_lineage else ""}
downstream_table_columns
ORDER BY query_start_time DESC ORDER BY query_start_time DESC
) = 1""" ) = 1"""
@ -289,7 +290,11 @@ class SnowflakeQuery:
""" """
@staticmethod @staticmethod
def view_lineage_history(start_time_millis: int, end_time_millis: int) -> str: def view_lineage_history(
start_time_millis: int,
end_time_millis: int,
include_column_lineage: bool = True,
) -> str:
return f""" return f"""
WITH view_lineage_history AS ( WITH view_lineage_history AS (
SELECT SELECT
@ -330,8 +335,7 @@ class SnowflakeQuery:
view_domain in ('View', 'Materialized view') view_domain in ('View', 'Materialized view')
QUALIFY ROW_NUMBER() OVER ( QUALIFY ROW_NUMBER() OVER (
PARTITION BY view_name, PARTITION BY view_name,
downstream_table_name, downstream_table_name {", downstream_table_columns" if include_column_lineage else ""}
downstream_table_columns
ORDER BY ORDER BY
query_start_time DESC query_start_time DESC
) = 1 ) = 1

View File

@ -11,6 +11,7 @@ class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport, ProfilingSqlRepor
include_usage_stats: bool = False include_usage_stats: bool = False
include_operational_stats: bool = False include_operational_stats: bool = False
include_technical_schema: bool = False include_technical_schema: bool = False
include_column_lineage: bool = False
usage_aggregation_query_secs: float = -1 usage_aggregation_query_secs: float = -1
table_lineage_query_secs: float = -1 table_lineage_query_secs: float = -1

View File

@ -169,7 +169,7 @@ SNOWFLAKE_FIELD_TYPE_MAPPINGS = {
) )
@capability( @capability(
SourceCapability.LINEAGE_FINE, SourceCapability.LINEAGE_FINE,
"Enabled by default, can be disabled via configuration `include_table_lineage` and `include_view_lineage`", "Enabled by default, can be disabled via configuration `include_column_lineage`",
) )
@capability( @capability(
SourceCapability.USAGE_STATS, SourceCapability.USAGE_STATS,
@ -1101,6 +1101,7 @@ class SnowflakeV2Source(
self.report.check_role_grants = self.config.check_role_grants self.report.check_role_grants = self.config.check_role_grants
self.report.include_usage_stats = self.config.include_usage_stats self.report.include_usage_stats = self.config.include_usage_stats
self.report.include_operational_stats = self.config.include_operational_stats self.report.include_operational_stats = self.config.include_operational_stats
self.report.include_column_lineage = self.config.include_column_lineage
if self.report.include_usage_stats or self.config.include_operational_stats: if self.report.include_usage_stats or self.config.include_operational_stats:
self.report.window_start_time = self.config.start_time self.report.window_start_time = self.config.start_time
self.report.window_end_time = self.config.end_time self.report.window_end_time = self.config.end_time

View File

@ -195,6 +195,20 @@
"runId": "snowflake-beta-2022_06_07-17_00_00" "runId": "snowflake-beta-2022_06_07-17_00_00"
} }
}, },
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-beta-2022_06_07-17_00_00"
}
},
{ {
"entityType": "dataset", "entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)",
@ -265,6 +279,20 @@
"runId": "snowflake-beta-2022_06_07-17_00_00" "runId": "snowflake-beta-2022_06_07-17_00_00"
} }
}, },
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-beta-2022_06_07-17_00_00"
}
},
{ {
"entityType": "dataset", "entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_3,PROD)", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_3,PROD)",
@ -335,6 +363,20 @@
"runId": "snowflake-beta-2022_06_07-17_00_00" "runId": "snowflake-beta-2022_06_07-17_00_00"
} }
}, },
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_3,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-beta-2022_06_07-17_00_00"
}
},
{ {
"entityType": "dataset", "entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_4,PROD)", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_4,PROD)",
@ -405,6 +447,20 @@
"runId": "snowflake-beta-2022_06_07-17_00_00" "runId": "snowflake-beta-2022_06_07-17_00_00"
} }
}, },
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_4,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-beta-2022_06_07-17_00_00"
}
},
{ {
"entityType": "dataset", "entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_5,PROD)", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_5,PROD)",
@ -475,6 +531,20 @@
"runId": "snowflake-beta-2022_06_07-17_00_00" "runId": "snowflake-beta-2022_06_07-17_00_00"
} }
}, },
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_5,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-beta-2022_06_07-17_00_00"
}
},
{ {
"entityType": "dataset", "entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_6,PROD)", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_6,PROD)",
@ -545,6 +615,20 @@
"runId": "snowflake-beta-2022_06_07-17_00_00" "runId": "snowflake-beta-2022_06_07-17_00_00"
} }
}, },
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_6,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-beta-2022_06_07-17_00_00"
}
},
{ {
"entityType": "dataset", "entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_7,PROD)", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_7,PROD)",
@ -615,6 +699,20 @@
"runId": "snowflake-beta-2022_06_07-17_00_00" "runId": "snowflake-beta-2022_06_07-17_00_00"
} }
}, },
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_7,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-beta-2022_06_07-17_00_00"
}
},
{ {
"entityType": "dataset", "entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_8,PROD)", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_8,PROD)",
@ -685,6 +783,20 @@
"runId": "snowflake-beta-2022_06_07-17_00_00" "runId": "snowflake-beta-2022_06_07-17_00_00"
} }
}, },
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_8,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-beta-2022_06_07-17_00_00"
}
},
{ {
"entityType": "dataset", "entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_9,PROD)", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_9,PROD)",
@ -755,6 +867,20 @@
"runId": "snowflake-beta-2022_06_07-17_00_00" "runId": "snowflake-beta-2022_06_07-17_00_00"
} }
}, },
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_9,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-beta-2022_06_07-17_00_00"
}
},
{ {
"entityType": "dataset", "entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_10,PROD)", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_10,PROD)",
@ -824,5 +950,19 @@
"lastObserved": 1654621200000, "lastObserved": 1654621200000,
"runId": "snowflake-beta-2022_06_07-17_00_00" "runId": "snowflake-beta-2022_06_07-17_00_00"
} }
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_10,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "snowflake-beta-2022_06_07-17_00_00"
}
} }
] ]

View File

@ -258,9 +258,14 @@ def default_query_results(query):
} }
for op_idx in range(1, NUM_OPS + 1) for op_idx in range(1, NUM_OPS + 1)
] ]
elif query == snowflake_query.SnowflakeQuery.table_to_table_lineage_history( elif query in (
1654499820000, snowflake_query.SnowflakeQuery.table_to_table_lineage_history(
1654586220000, 1654499820000,
1654586220000,
),
snowflake_query.SnowflakeQuery.table_to_table_lineage_history(
1654499820000, 1654586220000, False
),
): ):
return [ return [
{ {
@ -426,7 +431,8 @@ def test_snowflake_private_link(pytestconfig, tmp_path, mock_time, mock_datahub_
include_views=False, include_views=False,
schema_pattern=AllowDenyPattern(allow=["test_schema"]), schema_pattern=AllowDenyPattern(allow=["test_schema"]),
include_technical_schema=True, include_technical_schema=True,
include_table_lineage=False, include_table_lineage=True,
include_column_lineage=False,
include_view_lineage=False, include_view_lineage=False,
include_usage_stats=False, include_usage_stats=False,
include_operational_stats=False, include_operational_stats=False,

View File

@ -1,6 +1,7 @@
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
import pytest import pytest
from pydantic import ValidationError
from datahub.configuration.common import ConfigurationError, OauthConfiguration from datahub.configuration.common import ConfigurationError, OauthConfiguration
from datahub.ingestion.api.source import SourceCapability from datahub.ingestion.api.source import SourceCapability
@ -182,6 +183,38 @@ def test_options_contain_connect_args():
assert connect_args is not None assert connect_args is not None
def test_snowflake_config_with_view_lineage_no_table_lineage_throws_error():
    """Parsing must fail when view lineage is on but table lineage is off."""
    # Build the raw config up front so the `raises` block wraps only the parse.
    raw_config = {
        "username": "user",
        "password": "password",
        "host_port": "acctname",
        "database_pattern": {"allow": {"^demo$"}},
        "warehouse": "COMPUTE_WH",
        "role": "sysadmin",
        "include_view_lineage": True,
        "include_table_lineage": False,
    }
    with pytest.raises(ValidationError):
        SnowflakeV2Config.parse_obj(raw_config)
def test_snowflake_config_with_column_lineage_no_table_lineage_throws_error():
    """Parsing must fail when column lineage is on but table lineage is off."""
    # Build the raw config up front so the `raises` block wraps only the parse.
    raw_config = {
        "username": "user",
        "password": "password",
        "host_port": "acctname",
        "database_pattern": {"allow": {"^demo$"}},
        "warehouse": "COMPUTE_WH",
        "role": "sysadmin",
        "include_column_lineage": True,
        "include_table_lineage": False,
    }
    with pytest.raises(ValidationError):
        SnowflakeV2Config.parse_obj(raw_config)
@patch("snowflake.connector.connect") @patch("snowflake.connector.connect")
def test_test_connection_failure(mock_connect): def test_test_connection_failure(mock_connect):
mock_connect.side_effect = Exception("Failed to connect to snowflake") mock_connect.side_effect = Exception("Failed to connect to snowflake")