From 65ba13d9aa91ce50eeefd542f8463d000c8bbc20 Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Mon, 12 Dec 2022 19:53:12 +0530 Subject: [PATCH] feat(ingest): snowflake - add separate config for include_column_lineage in snowflake (#6712) --- .../source/snowflake/snowflake_config.py | 15 +- .../source/snowflake/snowflake_lineage.py | 15 +- .../source/snowflake/snowflake_query.py | 16 +- .../source/snowflake/snowflake_report.py | 1 + .../source/snowflake/snowflake_v2.py | 3 +- .../snowflake_privatelink_beta_golden.json | 140 ++++++++++++++++++ .../snowflake-beta/test_snowflake_beta.py | 14 +- .../tests/unit/test_snowflake_beta_source.py | 33 +++++ 8 files changed, 221 insertions(+), 16 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py index 643ba4f1db..9e066b41fb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py @@ -1,7 +1,7 @@ import logging from typing import Dict, Optional, cast -from pydantic import Field, SecretStr, root_validator +from pydantic import Field, SecretStr, root_validator, validator from datahub.configuration.common import AllowDenyPattern from datahub.ingestion.glossary.classifier import ClassificationConfig @@ -30,6 +30,11 @@ class SnowflakeV2Config(SnowflakeConfig, SnowflakeUsageConfig): description="If enabled, populates the snowflake technical schema and descriptions.", ) + include_column_lineage: bool = Field( + default=True, + description="If enabled, populates the column lineage. Supported only for snowflake table-to-table and view-to-table lineage edge (not supported in table-to-view or view-to-view lineage edge yet). Requires appropriate grants given to the role.", + ) + check_role_grants: bool = Field( default=False, description="Not supported", @@ -54,6 +59,14 @@ class SnowflakeV2Config(SnowflakeConfig, SnowflakeUsageConfig): description="Whether `schema_pattern` is matched against fully qualified schema name `.`.", ) + @validator("include_column_lineage") + def validate_include_column_lineage(cls, v, values): + if not values.get("include_table_lineage") and v: + raise ValueError( + "include_table_lineage must be True for include_column_lineage to be set." + ) + return v + @root_validator(pre=False) def validate_unsupported_configs(cls, values: Dict) -> Dict: diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage.py index 933431b014..6e0fa30575 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage.py @@ -138,11 +138,13 @@ class SnowflakeTableLineage: default_factory=lambda: defaultdict(SnowflakeColumnUpstreams), init=False ) - def update_lineage(self, table: SnowflakeUpstreamTable) -> None: + def update_lineage( + self, table: SnowflakeUpstreamTable, include_column_lineage: bool = True + ) -> None: if table.upstreamDataset not in self.upstreamTables.keys(): self.upstreamTables[table.upstreamDataset] = table - if table.downstreamColumns: + if include_column_lineage and table.downstreamColumns: for col in table.downstreamColumns: if col.directSourceColumns: @@ -380,6 +382,7 @@ class SnowflakeLineageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin): if not self.config.ignore_start_time_lineage else 0, end_time_millis=int(self.config.end_time.timestamp() * 1000), + include_column_lineage=self.config.include_column_lineage, ) num_edges: int = 0 self._lineage_map = defaultdict(SnowflakeTableLineage) @@ -404,6 +407,7 @@ class SnowflakeLineageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin): db_row["UPSTREAM_TABLE_COLUMNS"], db_row["DOWNSTREAM_TABLE_COLUMNS"], ), + self.config.include_column_lineage, ) num_edges += 1 logger.debug( @@ -452,7 +456,8 @@ class SnowflakeLineageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin): # key is the downstream view name self._lineage_map[view_name].update_lineage( # (, , ) - SnowflakeUpstreamTable.from_dict(view_upstream, None, None) + SnowflakeUpstreamTable.from_dict(view_upstream, None, None), + self.config.include_column_lineage, ) num_edges += 1 logger.debug( @@ -477,6 +482,7 @@ class SnowflakeLineageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin): if not self.config.ignore_start_time_lineage else 0, end_time_millis=int(self.config.end_time.timestamp() * 1000), + include_column_lineage=self.config.include_column_lineage, ) assert self._lineage_map is not None @@ -512,7 +518,8 @@ class SnowflakeLineageExtractor(SnowflakeQueryMixin, SnowflakeCommonMixin): view_name, db_row["VIEW_COLUMNS"], db_row["DOWNSTREAM_TABLE_COLUMNS"], - ) + ), + self.config.include_column_lineage, ) self.report.num_view_to_table_edges_scanned += 1 diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py index 1c2d780afa..c1957f46d2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py @@ -232,7 +232,9 @@ class SnowflakeQuery: @staticmethod def table_to_table_lineage_history( - start_time_millis: int, end_time_millis: int + start_time_millis: int, + end_time_millis: int, + include_column_lineage: bool = True, ) -> str: return f""" WITH table_lineage_history AS ( @@ -263,8 +265,7 @@ class SnowflakeQuery: WHERE upstream_table_domain in ('Table', 'External table') and downstream_table_domain = 'Table' QUALIFY ROW_NUMBER() OVER ( PARTITION BY downstream_table_name, - upstream_table_name, - downstream_table_columns + upstream_table_name{", downstream_table_columns" if include_column_lineage else ""} ORDER BY query_start_time DESC ) = 1""" @@ -289,7 +290,11 @@ class SnowflakeQuery: """ @staticmethod - def view_lineage_history(start_time_millis: int, end_time_millis: int) -> str: + def view_lineage_history( + start_time_millis: int, + end_time_millis: int, + include_column_lineage: bool = True, + ) -> str: return f""" WITH view_lineage_history AS ( SELECT @@ -330,8 +335,7 @@ class SnowflakeQuery: view_domain in ('View', 'Materialized view') QUALIFY ROW_NUMBER() OVER ( PARTITION BY view_name, - downstream_table_name, - downstream_table_columns + downstream_table_name {", downstream_table_columns" if include_column_lineage else ""} ORDER BY query_start_time DESC ) = 1 diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py index e70c48771c..db605f1d62 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py @@ -11,6 +11,7 @@ class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport, ProfilingSqlRepor include_usage_stats: bool = False include_operational_stats: bool = False include_technical_schema: bool = False + include_column_lineage: bool = False usage_aggregation_query_secs: float = -1 table_lineage_query_secs: float = -1 diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index be02c92a49..62a53b002f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -169,7 +169,7 @@ SNOWFLAKE_FIELD_TYPE_MAPPINGS = { ) @capability( SourceCapability.LINEAGE_FINE, - "Enabled by default, can be disabled via configuration `include_table_lineage` and `include_view_lineage`", + "Enabled by default, can be disabled via configuration `include_column_lineage`", ) @capability( SourceCapability.USAGE_STATS, @@ -1101,6 +1101,7 @@ class SnowflakeV2Source( self.report.check_role_grants = self.config.check_role_grants self.report.include_usage_stats = self.config.include_usage_stats self.report.include_operational_stats = self.config.include_operational_stats + self.report.include_column_lineage = self.config.include_column_lineage if self.report.include_usage_stats or self.config.include_operational_stats: self.report.window_start_time = self.config.start_time self.report.window_end_time = self.config.end_time diff --git a/metadata-ingestion/tests/integration/snowflake-beta/snowflake_privatelink_beta_golden.json b/metadata-ingestion/tests/integration/snowflake-beta/snowflake_privatelink_beta_golden.json index ae40629124..2f3ec2f32a 100644 --- a/metadata-ingestion/tests/integration/snowflake-beta/snowflake_privatelink_beta_golden.json +++ b/metadata-ingestion/tests/integration/snowflake-beta/snowflake_privatelink_beta_golden.json @@ -195,6 +195,20 @@ "runId": "snowflake-beta-2022_06_07-17_00_00" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-beta-2022_06_07-17_00_00" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)", @@ -265,6 +279,20 @@ "runId": "snowflake-beta-2022_06_07-17_00_00" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-beta-2022_06_07-17_00_00" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_3,PROD)", @@ -335,6 +363,20 @@ "runId": "snowflake-beta-2022_06_07-17_00_00" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_3,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-beta-2022_06_07-17_00_00" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_4,PROD)", @@ -405,6 +447,20 @@ "runId": "snowflake-beta-2022_06_07-17_00_00" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_4,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-beta-2022_06_07-17_00_00" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_5,PROD)", @@ -475,6 +531,20 @@ "runId": "snowflake-beta-2022_06_07-17_00_00" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_5,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-beta-2022_06_07-17_00_00" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_6,PROD)", @@ -545,6 +615,20 @@ "runId": "snowflake-beta-2022_06_07-17_00_00" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_6,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-beta-2022_06_07-17_00_00" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_7,PROD)", @@ -615,6 +699,20 @@ "runId": "snowflake-beta-2022_06_07-17_00_00" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_7,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-beta-2022_06_07-17_00_00" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_8,PROD)", @@ -685,6 +783,20 @@ "runId": "snowflake-beta-2022_06_07-17_00_00" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_8,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-beta-2022_06_07-17_00_00" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_9,PROD)", @@ -755,6 +867,20 @@ "runId": "snowflake-beta-2022_06_07-17_00_00" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_9,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-beta-2022_06_07-17_00_00" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_10,PROD)", @@ -824,5 +950,19 @@ "lastObserved": 1654621200000, "runId": "snowflake-beta-2022_06_07-17_00_00" } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_10,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "value": "{\"upstreams\": [{\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)\", \"type\": \"TRANSFORMED\"}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "snowflake-beta-2022_06_07-17_00_00" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/snowflake-beta/test_snowflake_beta.py b/metadata-ingestion/tests/integration/snowflake-beta/test_snowflake_beta.py index 95b5f63a0d..4b51ad7111 100644 --- a/metadata-ingestion/tests/integration/snowflake-beta/test_snowflake_beta.py +++ b/metadata-ingestion/tests/integration/snowflake-beta/test_snowflake_beta.py @@ -258,9 +258,14 @@ def default_query_results(query): } for op_idx in range(1, NUM_OPS + 1) ] - elif query == snowflake_query.SnowflakeQuery.table_to_table_lineage_history( - 1654499820000, - 1654586220000, + elif query in ( + snowflake_query.SnowflakeQuery.table_to_table_lineage_history( + 1654499820000, + 1654586220000, + ), + snowflake_query.SnowflakeQuery.table_to_table_lineage_history( + 1654499820000, 1654586220000, False + ), ): return [ { @@ -426,7 +431,8 @@ def test_snowflake_private_link(pytestconfig, tmp_path, mock_time, mock_datahub_ include_views=False, schema_pattern=AllowDenyPattern(allow=["test_schema"]), include_technical_schema=True, - include_table_lineage=False, + include_table_lineage=True, + include_column_lineage=False, include_view_lineage=False, include_usage_stats=False, include_operational_stats=False, diff --git a/metadata-ingestion/tests/unit/test_snowflake_beta_source.py b/metadata-ingestion/tests/unit/test_snowflake_beta_source.py index ed58e4aeb1..9e22a723bf 100644 --- a/metadata-ingestion/tests/unit/test_snowflake_beta_source.py +++ b/metadata-ingestion/tests/unit/test_snowflake_beta_source.py @@ -1,6 +1,7 @@ from unittest.mock import MagicMock, patch import pytest +from pydantic import ValidationError from datahub.configuration.common import ConfigurationError, OauthConfiguration from datahub.ingestion.api.source import SourceCapability @@ -182,6 +183,38 @@ def test_options_contain_connect_args(): assert connect_args is not None +def test_snowflake_config_with_view_lineage_no_table_lineage_throws_error(): + with pytest.raises(ValidationError): + SnowflakeV2Config.parse_obj( + { + "username": "user", + "password": "password", + "host_port": "acctname", + "database_pattern": {"allow": {"^demo$"}}, + "warehouse": "COMPUTE_WH", + "role": "sysadmin", + "include_view_lineage": True, + "include_table_lineage": False, + } + ) + + +def test_snowflake_config_with_column_lineage_no_table_lineage_throws_error(): + with pytest.raises(ValidationError): + SnowflakeV2Config.parse_obj( + { + "username": "user", + "password": "password", + "host_port": "acctname", + "database_pattern": {"allow": {"^demo$"}}, + "warehouse": "COMPUTE_WH", + "role": "sysadmin", + "include_column_lineage": True, + "include_table_lineage": False, + } + ) + + @patch("snowflake.connector.connect") def test_test_connection_failure(mock_connect): mock_connect.side_effect = Exception("Failed to connect to snowflake")