fix(ingest): snowflake - fix missing snowflake lineage when table_pattern is set (#6410)

This commit is contained in:
Mayuri Nehate 2022-11-11 21:17:09 +05:30 committed by GitHub
parent 1b739319f0
commit 6c42064332
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 17 additions and 4 deletions

View File

@ -95,7 +95,12 @@ class SnowflakeUpstreamTable:
downstreamColumns: List[SnowflakeColumnWithLineage]
@classmethod
def from_dict(cls, dataset, upstreams_columns_json, downstream_columns_json):
def from_dict(
cls,
dataset: str,
upstreams_columns_json: Optional[str],
downstream_columns_json: Optional[str],
) -> "SnowflakeUpstreamTable":
try:
upstreams_columns_list = []
downstream_columns_list = []

View File

@ -13,6 +13,8 @@ from datahub.metadata.com.linkedin.pegasus2avro.events.metadata import ChangeTyp
from datahub.metadata.schema_classes import _Aspect
# Required only for mypy, since we use mixin classes rather than inheritance.
# Reference - https://mypy.readthedocs.io/en/latest/more_types.html#mixin-classes
class SnowflakeLoggingProtocol(Protocol):
@property
def logger(self) -> logging.Logger:
@ -37,6 +39,9 @@ class SnowflakeCommonProtocol(Protocol):
) -> str:
...
def get_dataset_identifier_from_qualified_name(self, qualified_name: str) -> str:
...
def snowflake_identifier(self, identifier: str) -> str:
...
@ -76,14 +81,16 @@ class SnowflakeCommonMixin:
return False
if dataset_type.lower() in {"table"} and not self.config.table_pattern.allowed(
dataset_params[2].strip('"')
self.get_dataset_identifier_from_qualified_name(dataset_name)
):
return False
if dataset_type.lower() in {
"view",
"materialized_view",
} and not self.config.view_pattern.allowed(dataset_params[2].strip('"')):
} and not self.config.view_pattern.allowed(
self.get_dataset_identifier_from_qualified_name(dataset_name)
):
return False
return True

View File

@ -4,7 +4,7 @@ from unittest import mock
from freezegun import freeze_time
from datahub.configuration.common import DynamicTypedConfig
from datahub.configuration.common import AllowDenyPattern, DynamicTypedConfig
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.run.pipeline_config import PipelineConfig, SourceConfig
from datahub.ingestion.source.snowflake import snowflake_query
@ -297,6 +297,7 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
username="TST_USR",
password="TST_PWD",
include_views=False,
table_pattern=AllowDenyPattern(allow=["test_db.test_schema.*"]),
include_technical_schema=True,
include_table_lineage=True,
include_view_lineage=False,