Mirror of https://github.com/datahub-project/datahub.git
feat(ingest/snowflake): optionally emit all upstreams irrespective of recipe pattern (#7842)
parent: a5fa933fb0
commit: 3212e74969
@@ -84,6 +84,11 @@ class SnowflakeV2Config(
         description="Whether to use the legacy lineage computation method. If set to False, ingestion uses new optimised lineage extraction method that requires less ingestion process memory.",
     )
 
+    validate_upstreams_against_patterns: bool = Field(
+        default=True,
+        description="Whether to validate upstream snowflake tables against allow-deny patterns",
+    )
+
     @validator("include_column_lineage")
     def validate_include_column_lineage(cls, v, values):
         if not values.get("include_table_lineage") and v:
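The flag defaults to True, which preserves the existing behavior of dropping upstream tables that fail the recipe's allow/deny patterns. A minimal sketch of opting out, modeled on the integration-test configuration later in this diff (the account and credential values are placeholders):

    from datahub.configuration.common import AllowDenyPattern
    from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config

    config = SnowflakeV2Config(
        account_id="ABC12345.ap-south-1.aws",  # placeholder account
        username="TST_USR",
        password="TST_PWD",
        schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
        include_table_lineage=True,
        # New option: emit lineage edges to upstream tables even when the
        # upstreams fall outside the patterns above.
        validate_upstreams_against_patterns=False,
    )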
@@ -228,36 +228,32 @@ class SnowflakeLineageExtractor(
     def get_table_upstream_workunits(self, discovered_tables):
         if self.config.include_table_lineage:
             for dataset_name in discovered_tables:
-                if self._is_dataset_pattern_allowed(
-                    dataset_name, SnowflakeObjectDomain.TABLE
-                ):
-                    dataset_urn = builder.make_dataset_urn_with_platform_instance(
-                        self.platform,
-                        dataset_name,
-                        self.config.platform_instance,
-                        self.config.env,
-                    )
-                    upstream_lineage = self._get_upstream_lineage_info(dataset_name)
-                    if upstream_lineage is not None:
-                        yield MetadataChangeProposalWrapper(
-                            entityUrn=dataset_urn, aspect=upstream_lineage
-                        ).as_workunit()
+                dataset_urn = builder.make_dataset_urn_with_platform_instance(
+                    self.platform,
+                    dataset_name,
+                    self.config.platform_instance,
+                    self.config.env,
+                )
+                upstream_lineage = self._get_upstream_lineage_info(dataset_name)
+                if upstream_lineage is not None:
+                    yield MetadataChangeProposalWrapper(
+                        entityUrn=dataset_urn, aspect=upstream_lineage
+                    ).as_workunit()
 
     def get_view_upstream_workunits(self, discovered_views):
         if self.config.include_view_lineage:
             for view_name in discovered_views:
-                if self._is_dataset_pattern_allowed(view_name, "view"):
-                    dataset_urn = builder.make_dataset_urn_with_platform_instance(
-                        self.platform,
-                        view_name,
-                        self.config.platform_instance,
-                        self.config.env,
-                    )
-                    upstream_lineage = self._get_upstream_lineage_info(view_name)
-                    if upstream_lineage is not None:
-                        yield MetadataChangeProposalWrapper(
-                            entityUrn=dataset_urn, aspect=upstream_lineage
-                        ).as_workunit()
+                dataset_urn = builder.make_dataset_urn_with_platform_instance(
+                    self.platform,
+                    view_name,
+                    self.config.platform_instance,
+                    self.config.env,
+                )
+                upstream_lineage = self._get_upstream_lineage_info(view_name)
+                if upstream_lineage is not None:
+                    yield MetadataChangeProposalWrapper(
+                        entityUrn=dataset_urn, aspect=upstream_lineage
+                    ).as_workunit()
 
     def _get_upstream_lineage_info(
         self, dataset_name: str
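Both loops lose their per-dataset pattern gate; presumably the discovered tables and views have already passed pattern filtering during schema extraction, while upstream filtering moves behind the new is_upstream flag in the hunks below. For reference, the emit pattern both methods share is: build the downstream dataset URN, fetch its upstream-lineage aspect, and wrap the pair in a work unit. A standalone, hedged sketch of that pattern using DataHub's Python classes (the platform-instance, env, and table names here are illustrative, not part of this diff):

    import datahub.emitter.mce_builder as builder
    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.metadata.schema_classes import (
        DatasetLineageTypeClass,
        UpstreamClass,
        UpstreamLineageClass,
    )

    # Illustrative names; the extractor derives these from discovered tables.
    downstream_urn = builder.make_dataset_urn_with_platform_instance(
        "snowflake", "test_db.test_schema.table_1", None, "PROD"
    )
    upstream_lineage = UpstreamLineageClass(
        upstreams=[
            UpstreamClass(
                dataset=builder.make_dataset_urn_with_platform_instance(
                    "snowflake", "other_db.other_schema.table_1", None, "PROD"
                ),
                type=DatasetLineageTypeClass.TRANSFORMED,
            )
        ]
    )
    workunit = MetadataChangeProposalWrapper(
        entityUrn=downstream_urn, aspect=upstream_lineage
    ).as_workunit()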
@@ -446,7 +442,7 @@ class SnowflakeLineageExtractor(
             key, SnowflakeObjectDomain.TABLE
         ) or not (
             self._is_dataset_pattern_allowed(
-                upstream_table_name, SnowflakeObjectDomain.TABLE
+                upstream_table_name, SnowflakeObjectDomain.TABLE, is_upstream=True
             )
         ):
             return
@@ -500,7 +496,7 @@ class SnowflakeLineageExtractor(
             dataset_name=view_name,
             dataset_type=db_row["REFERENCING_OBJECT_DOMAIN"],
         ) or not self._is_dataset_pattern_allowed(
-            view_upstream, db_row["REFERENCED_OBJECT_DOMAIN"]
+            view_upstream, db_row["REFERENCED_OBJECT_DOMAIN"], is_upstream=True
         ):
             return
         # key is the downstream view name
@@ -554,7 +550,7 @@ class SnowflakeLineageExtractor(
             db_row["DOWNSTREAM_TABLE_NAME"]
         )
         if not self._is_dataset_pattern_allowed(
-            view_name, db_row["VIEW_DOMAIN"]
+            view_name, db_row["VIEW_DOMAIN"], is_upstream=True
         ) or not self._is_dataset_pattern_allowed(
             downstream_table, db_row["DOWNSTREAM_TABLE_DOMAIN"]
         ):
@@ -655,7 +651,7 @@ class SnowflakeLineageExtractor(
             upstream_col.objectName
             and upstream_col.columnName
             and self._is_dataset_pattern_allowed(
-                upstream_col.objectName, upstream_col.objectDomain
+                upstream_col.objectName, upstream_col.objectDomain, is_upstream=True
             )
         ):
             upstream_dataset_name = self.get_dataset_identifier_from_qualified_name(
@@ -374,7 +374,7 @@ class SnowflakeLineageExtractor(
             upstream_table["upstream_object_name"]
         )
         if upstream_name and self._is_dataset_pattern_allowed(
-            upstream_name, upstream_table["upstream_object_domain"]
+            upstream_name, upstream_table["upstream_object_domain"], is_upstream=True
         ):
             upstreams.append(
                 UpstreamClass(
@@ -486,7 +486,9 @@ class SnowflakeLineageExtractor(
             upstream_col.objectName
             and upstream_col.columnName
             and self._is_dataset_pattern_allowed(
-                upstream_col.objectName, upstream_col.objectDomain
+                upstream_col.objectName,
+                upstream_col.objectDomain,
+                is_upstream=True,
             )
         ):
             upstream_dataset_name = self.get_dataset_identifier_from_qualified_name(
@@ -103,7 +103,10 @@ class SnowflakeCommonMixin:
         self: SnowflakeCommonProtocol,
         dataset_name: Optional[str],
         dataset_type: Optional[str],
+        is_upstream: bool = False,
     ) -> bool:
+        if is_upstream and not self.config.validate_upstreams_against_patterns:
+            return True
         if not dataset_type or not dataset_name:
             return True
         dataset_params = dataset_name.split(".")
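This short-circuit is the heart of the feature: when a dataset is being evaluated as an upstream and validation is disabled, the pattern check passes unconditionally. A self-contained sketch of the predicate's logic, using simplified stand-ins rather than the actual mixin:

    import re
    from dataclasses import dataclass, field
    from typing import List

    @dataclass
    class FakeConfig:
        # Stand-in for the new SnowflakeV2Config flag.
        validate_upstreams_against_patterns: bool = True
        allow: List[str] = field(default_factory=lambda: [r"test_db\.test_schema\..*"])

    def is_dataset_pattern_allowed(
        config: FakeConfig, dataset_name: str, is_upstream: bool = False
    ) -> bool:
        # New behavior: upstreams skip pattern validation when it is disabled.
        if is_upstream and not config.validate_upstreams_against_patterns:
            return True
        return any(re.fullmatch(p, dataset_name) for p in config.allow)

    cfg = FakeConfig(validate_upstreams_against_patterns=False)
    assert is_dataset_pattern_allowed(cfg, "other_db.other_schema.table_1", is_upstream=True)
    assert not is_dataset_pattern_allowed(cfg, "other_db.other_schema.table_1")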
@@ -285,6 +285,30 @@ def default_query_results(query):  # noqa: C901
                 ),
             }
             for op_idx in range(1, NUM_OPS + 1)
-        ]
+        ] + [
+            {
+                "DOWNSTREAM_TABLE_NAME": "TEST_DB.TEST_SCHEMA.TABLE_1",
+                "UPSTREAM_TABLE_NAME": "OTHER_DB.OTHER_SCHEMA.TABLE_1",
+                "UPSTREAM_TABLE_COLUMNS": json.dumps(
+                    [{"columnId": 0, "columnName": "COL_1"}]
+                ),
+                "DOWNSTREAM_TABLE_COLUMNS": json.dumps(
+                    [
+                        {
+                            "columnId": 0,
+                            "columnName": "COL_1",
+                            "directSources": [
+                                {
+                                    "columnName": "COL_1",
+                                    "objectDomain": "Table",
+                                    "objectId": 0,
+                                    "objectName": "OTHER_DB.OTHER_SCHEMA.TABLE_1",
+                                }
+                            ],
+                        }
+                    ]
+                ),
+            }
+        ]
     elif query in (
         snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
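The fixture row added above simulates a cross-database upstream, OTHER_DB.OTHER_SCHEMA.TABLE_1, that deliberately falls outside the test recipe's schema_pattern, which allows only test_db.test_schema. A quick sketch of why that upstream would normally be dropped:

    from datahub.configuration.common import AllowDenyPattern

    # Same allow list the integration tests below pass as schema_pattern.
    pattern = AllowDenyPattern(allow=["test_db.test_schema"])

    print(pattern.allowed("test_db.test_schema"))    # True
    print(pattern.allowed("other_db.other_schema"))  # False: normally filtered out,
    # but kept as an upstream once validate_upstreams_against_patterns=False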
@@ -307,7 +331,11 @@ def default_query_results(query):  # noqa: C901
             {
                 "upstream_object_name": "TEST_DB.TEST_SCHEMA.VIEW_1",
                 "upstream_object_domain": "VIEW",
-            }
+            },
+            {
+                "upstream_object_name": "OTHER_DB.OTHER_SCHEMA.TABLE_1",
+                "upstream_object_domain": "TABLE",
+            },
         ]
         if op_idx == 1
         else []
@@ -329,6 +357,24 @@ def default_query_results(query):  # noqa: C901
                     }
                     for col_idx in range(1, NUM_COLS + 1)
                 ]
+                + (  # This additional upstream is only for TABLE_1
+                    [
+                        {
+                            "column_name": "COL_1",
+                            "upstreams": [
+                                [
+                                    {
+                                        "object_name": "OTHER_DB.OTHER_SCHEMA.TABLE_1",
+                                        "object_domain": "Table",
+                                        "column_name": "COL_1",
+                                    }
+                                ]
+                            ],
+                        }
+                    ]
+                    if op_idx == 1
+                    else []
+                )
             ),
         }
         for op_idx in range(1, NUM_OPS + 1)
@@ -347,8 +393,18 @@ def default_query_results(query):  # noqa: C901
                 {
                     "upstream_object_name": "TEST_DB.TEST_SCHEMA.TABLE_2",
                     "upstream_object_domain": "TABLE",
-                }
+                },
             ]
+            + (  # This additional upstream is only for TABLE_1
+                [
+                    {
+                        "upstream_object_name": "OTHER_DB.OTHER_SCHEMA.TABLE_1",
+                        "upstream_object_domain": "TABLE",
+                    },
+                ]
+                if op_idx == 1
+                else []
+            )
         ),
     }
     for op_idx in range(1, NUM_OPS + 1)
@@ -3589,7 +3589,7 @@
     },
     {
         "entityType": "dataset",
-        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.view_2,PROD)",
+        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
         "changeType": "UPSERT",
         "aspectName": "upstreamLineage",
         "aspect": {
@@ -3600,25 +3600,9 @@
                         "time": 0,
                         "actor": "urn:li:corpuser:unknown"
                     },
-                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)",
+                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,other_db.other_schema.table_1,PROD)",
                     "type": "TRANSFORMED"
-                }
-            ]
-        }
-    },
-    "systemMetadata": {
-        "lastObserved": 1654621200000,
-        "runId": "snowflake-2022_06_07-17_00_00"
-    }
-},
-{
-    "entityType": "dataset",
-    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
-    "changeType": "UPSERT",
-    "aspectName": "upstreamLineage",
-    "aspect": {
-        "json": {
-            "upstreams": [
+                },
                 {
                     "auditStamp": {
                         "time": 0,
@@ -3637,6 +3621,17 @@
                     }
                 ],
                 "fineGrainedLineages": [
+                    {
+                        "upstreamType": "FIELD_SET",
+                        "upstreams": [
+                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,other_db.other_schema.table_1,PROD),col_1)"
+                        ],
+                        "downstreamType": "FIELD",
+                        "downstreams": [
+                            "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD),col_1)"
+                        ],
+                        "confidenceScore": 1.0
+                    },
                     {
                         "upstreamType": "FIELD_SET",
                         "upstreams": [
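The new golden entry above asserts column-level lineage from the out-of-pattern upstream. As a reference point, a hedged sketch of how an equivalent fineGrainedLineages element maps onto DataHub's Python classes (illustrative only; the Snowflake source assembles this internally):

    import datahub.emitter.mce_builder as builder
    from datahub.metadata.schema_classes import (
        FineGrainedLineageClass,
        FineGrainedLineageDownstreamTypeClass,
        FineGrainedLineageUpstreamTypeClass,
    )

    upstream = builder.make_dataset_urn("snowflake", "other_db.other_schema.table_1", "PROD")
    downstream = builder.make_dataset_urn("snowflake", "test_db.test_schema.table_1", "PROD")

    # One column-to-column edge, mirroring the golden-file entry above.
    cll = FineGrainedLineageClass(
        upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
        upstreams=[builder.make_schema_field_urn(upstream, "col_1")],
        downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
        downstreams=[builder.make_schema_field_urn(downstream, "col_1")],
        confidenceScore=1.0,
    )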
@@ -4979,6 +4974,30 @@
         "runId": "snowflake-2022_06_07-17_00_00"
     }
 },
+{
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.view_2,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "upstreamLineage",
+    "aspect": {
+        "json": {
+            "upstreams": [
+                {
+                    "auditStamp": {
+                        "time": 0,
+                        "actor": "urn:li:corpuser:unknown"
+                    },
+                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_2,PROD)",
+                    "type": "TRANSFORMED"
+                }
+            ]
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1654621200000,
+        "runId": "snowflake-2022_06_07-17_00_00"
+    }
+},
 {
     "entityType": "dataset",
     "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.table_1,PROD)",
@@ -92,6 +92,7 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
                         include_view_lineage=True,
                         include_usage_stats=False,
                         use_legacy_lineage_method=False,
+                        validate_upstreams_against_patterns=False,
                         include_operational_stats=True,
                         start_time=datetime(2022, 6, 6, 7, 17, 0, 0).replace(
                             tzinfo=timezone.utc
@@ -0,0 +1,198 @@
import random
import string
from datetime import datetime, timezone
from unittest import mock

import pandas as pd
import pytest
from freezegun import freeze_time

from datahub.configuration.common import AllowDenyPattern, DynamicTypedConfig
from datahub.ingestion.glossary.classifier import (
    ClassificationConfig,
    DynamicTypedClassifierConfig,
)
from datahub.ingestion.glossary.datahub_classifier import (
    DataHubClassifierConfig,
    InfoTypeConfig,
    PredictionFactorsAndWeights,
)
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.run.pipeline_config import PipelineConfig, SourceConfig
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.snowflake.snowflake_config import (
    SnowflakeV2Config,
    TagOption,
)
from tests.integration.snowflake.common import FROZEN_TIME, default_query_results
from tests.test_helpers import mce_helpers


def random_email():
    return (
        "".join(
            [
                random.choice(string.ascii_lowercase)
                for i in range(random.randint(10, 15))
            ]
        )
        + "@xyz.com"
    )


@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/snowflake"

    # Run the metadata ingestion pipeline.
    output_file = tmp_path / "snowflake_test_events.json"
    golden_file = test_resources_dir / "snowflake_golden.json"

    with mock.patch("snowflake.connector.connect") as mock_connect, mock.patch(
        "datahub.ingestion.source.snowflake.snowflake_v2.SnowflakeV2Source.get_sample_values_for_table"
    ) as mock_sample_values:
        sf_connection = mock.MagicMock()
        sf_cursor = mock.MagicMock()
        mock_connect.return_value = sf_connection
        sf_connection.cursor.return_value = sf_cursor

        sf_cursor.execute.side_effect = default_query_results

        mock_sample_values.return_value = pd.DataFrame(
            data={
                "col_1": [random.randint(0, 100) for i in range(1, 200)],
                "col_2": [random_email() for i in range(1, 200)],
            }
        )

        datahub_classifier_config = DataHubClassifierConfig()
        datahub_classifier_config.confidence_level_threshold = 0.58
        datahub_classifier_config.info_types_config = {
            "Age": InfoTypeConfig(
                Prediction_Factors_and_Weights=PredictionFactorsAndWeights(
                    Name=0, Values=1, Description=0, Datatype=0
                )
            ),
        }
        pipeline = Pipeline(
            config=PipelineConfig(
                source=SourceConfig(
                    type="snowflake",
                    config=SnowflakeV2Config(
                        account_id="ABC12345.ap-south-1.aws",
                        username="TST_USR",
                        password="TST_PWD",
                        match_fully_qualified_names=True,
                        schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
                        include_technical_schema=True,
                        include_table_lineage=True,
                        include_view_lineage=True,
                        include_usage_stats=False,
                        use_legacy_lineage_method=True,
                        validate_upstreams_against_patterns=False,
                        include_operational_stats=True,
                        start_time=datetime(2022, 6, 6, 7, 17, 0, 0).replace(
                            tzinfo=timezone.utc
                        ),
                        end_time=datetime(2022, 6, 7, 7, 17, 0, 0).replace(
                            tzinfo=timezone.utc
                        ),
                        classification=ClassificationConfig(
                            enabled=True,
                            column_pattern=AllowDenyPattern(
                                allow=[".*col_1$", ".*col_2$"]
                            ),
                            classifiers=[
                                DynamicTypedClassifierConfig(
                                    type="datahub", config=datahub_classifier_config
                                )
                            ],
                        ),
                        profiling=GEProfilingConfig(
                            enabled=True,
                            profile_if_updated_since_days=None,
                            profile_table_row_limit=None,
                            profile_table_size_limit=None,
                            profile_table_level_only=True,
                        ),
                        extract_tags=TagOption.without_lineage,
                    ),
                ),
                sink=DynamicTypedConfig(
                    type="file", config={"filename": str(output_file)}
                ),
            )
        )
        pipeline.run()
        pipeline.pretty_print_summary()
        pipeline.raise_from_status()

        # Verify the output.

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=output_file,
            golden_path=golden_file,
            ignore_paths=[],
        )


@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_snowflake_private_link(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/snowflake"

    # Run the metadata ingestion pipeline.
    output_file = tmp_path / "snowflake_privatelink_test_events.json"
    golden_file = test_resources_dir / "snowflake_privatelink_golden.json"

    with mock.patch("snowflake.connector.connect") as mock_connect:
        sf_connection = mock.MagicMock()
        sf_cursor = mock.MagicMock()
        mock_connect.return_value = sf_connection
        sf_connection.cursor.return_value = sf_cursor
        sf_cursor.execute.side_effect = default_query_results

        pipeline = Pipeline(
            config=PipelineConfig(
                source=SourceConfig(
                    type="snowflake",
                    config=SnowflakeV2Config(
                        account_id="ABC12345.ap-south-1.privatelink",
                        username="TST_USR",
                        password="TST_PWD",
                        schema_pattern=AllowDenyPattern(allow=["test_schema"]),
                        include_technical_schema=True,
                        include_table_lineage=True,
                        include_column_lineage=False,
                        include_views=False,
                        include_view_lineage=False,
                        use_legacy_lineage_method=True,
                        include_usage_stats=False,
                        include_operational_stats=False,
                        start_time=datetime(2022, 6, 6, 7, 17, 0, 0).replace(
                            tzinfo=timezone.utc
                        ),
                        end_time=datetime(2022, 6, 7, 7, 17, 0, 0).replace(
                            tzinfo=timezone.utc
                        ),
                    ),
                ),
                sink=DynamicTypedConfig(
                    type="file", config={"filename": str(output_file)}
                ),
            )
        )
        pipeline.run()
        pipeline.pretty_print_summary()
        pipeline.raise_from_status()

        # Verify the output.

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=output_file,
            golden_path=golden_file,
            ignore_paths=[],
        )