fix(ingestion/tableau): Fix tableau custom sql lineage gap (#10359)

This commit is contained in:
Shubham Jagtap 2024-05-07 13:47:56 +05:30 committed by GitHub
parent 360445e879
commit ddb38e7448
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 80 additions and 37 deletions

View File

@ -1693,7 +1693,7 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
) -> Optional["SqlParsingResult"]: ) -> Optional["SqlParsingResult"]:
database_info = datasource.get(c.DATABASE) or { database_info = datasource.get(c.DATABASE) or {
c.NAME: c.UNKNOWN.lower(), c.NAME: c.UNKNOWN.lower(),
c.CONNECTION_TYPE: "databricks", c.CONNECTION_TYPE: datasource.get(c.CONNECTION_TYPE),
} }
if ( if (
@ -1703,7 +1703,10 @@ class TableauSource(StatefulIngestionSourceBase, TestableSource):
logger.debug(f"datasource {datasource_urn} is not created from custom sql") logger.debug(f"datasource {datasource_urn} is not created from custom sql")
return None return None
if c.NAME not in database_info or c.CONNECTION_TYPE not in database_info: if (
database_info.get(c.NAME) is None
or database_info.get(c.CONNECTION_TYPE) is None
):
logger.debug( logger.debug(
f"database information is missing from datasource {datasource_urn}" f"database information is missing from datasource {datasource_urn}"
) )

View File

@ -324,6 +324,7 @@ custom_sql_graphql_query = """
totalCount totalCount
} }
} }
connectionType
database{ database{
name name
connectionType connectionType
@ -827,6 +828,7 @@ def get_unique_custom_sql(custom_sql_list: List[dict]) -> List[dict]:
# are missing from api result. # are missing from api result.
"isUnsupportedCustomSql": True if not custom_sql.get("tables") else False, "isUnsupportedCustomSql": True if not custom_sql.get("tables") else False,
"query": custom_sql.get("query"), "query": custom_sql.get("query"),
"connectionType": custom_sql.get("connectionType"),
"columns": custom_sql.get("columns"), "columns": custom_sql.get("columns"),
"tables": custom_sql.get("tables"), "tables": custom_sql.get("tables"),
"database": custom_sql.get("database"), "database": custom_sql.get("database"),

View File

@ -16,6 +16,7 @@ from tableauserverclient.models import (
) )
from datahub.configuration.source_common import DEFAULT_ENV from datahub.configuration.source_common import DEFAULT_ENV
from datahub.emitter.mce_builder import make_schema_field_urn
from datahub.ingestion.run.pipeline import Pipeline, PipelineContext from datahub.ingestion.run.pipeline import Pipeline, PipelineContext
from datahub.ingestion.source.tableau import TableauConfig, TableauSource from datahub.ingestion.source.tableau import TableauConfig, TableauSource
from datahub.ingestion.source.tableau_common import ( from datahub.ingestion.source.tableau_common import (
@ -24,10 +25,12 @@ from datahub.ingestion.source.tableau_common import (
) )
from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType, DatasetLineageType,
FineGrainedLineage,
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
UpstreamLineage, UpstreamLineage,
) )
from datahub.metadata.schema_classes import MetadataChangeProposalClass, UpstreamClass from datahub.metadata.schema_classes import MetadataChangeProposalClass, UpstreamClass
from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult
from tests.test_helpers import mce_helpers, test_connection_helpers from tests.test_helpers import mce_helpers, test_connection_helpers
from tests.test_helpers.state_helpers import ( from tests.test_helpers.state_helpers import (
get_current_checkpoint_from_pipeline, get_current_checkpoint_from_pipeline,
@ -805,55 +808,90 @@ def test_tableau_signout_timeout(pytestconfig, tmp_path, mock_datahub_graph):
) )
def test_tableau_unsupported_csql(mock_datahub_graph): def test_tableau_unsupported_csql():
context = PipelineContext(run_id="0", pipeline_name="test_tableau") context = PipelineContext(run_id="0", pipeline_name="test_tableau")
context.graph = mock_datahub_graph config_dict = config_source_default.copy()
config = TableauConfig.parse_obj(config_source_default.copy()) del config_dict["stateful_ingestion"]
config = TableauConfig.parse_obj(config_dict)
config.extract_lineage_from_unsupported_custom_sql_queries = True config.extract_lineage_from_unsupported_custom_sql_queries = True
config.lineage_overrides = TableauLineageOverrides( config.lineage_overrides = TableauLineageOverrides(
database_override_map={"production database": "prod"} database_override_map={"production database": "prod"}
) )
with mock.patch( def test_lineage_metadata(
"datahub.ingestion.source.tableau.create_lineage_sql_parsed_result", lineage, expected_entity_urn, expected_upstream_table, expected_cll
return_value=SqlParsingResult(
in_tables=[
"urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.userdetail,PROD)"
],
out_tables=[],
column_lineage=None,
),
): ):
source = TableauSource(config=config, ctx=context)
lineage = source._create_lineage_from_unsupported_csql(
csql_urn="urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)",
csql={
"query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM invent_dw.UserDetail ) source_user WHERE rank_ = 1",
"isUnsupportedCustomSql": "true",
"database": {
"name": "my-bigquery-project",
"connectionType": "bigquery",
},
},
out_columns=[],
)
mcp = cast(MetadataChangeProposalClass, next(iter(lineage)).metadata) mcp = cast(MetadataChangeProposalClass, next(iter(lineage)).metadata)
assert mcp.aspect == UpstreamLineage( assert mcp.aspect == UpstreamLineage(
upstreams=[ upstreams=[
UpstreamClass( UpstreamClass(
dataset="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.userdetail,PROD)", dataset=expected_upstream_table,
type=DatasetLineageType.TRANSFORMED, type=DatasetLineageType.TRANSFORMED,
) )
], ],
fineGrainedLineages=[], fineGrainedLineages=[
) FineGrainedLineage(
assert ( upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
mcp.entityUrn upstreams=[
== "urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)" make_schema_field_urn(expected_upstream_table, upstream_column)
],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[
make_schema_field_urn(expected_entity_urn, downstream_column)
],
)
for upstream_column, downstream_column in expected_cll.items()
],
) )
assert mcp.entityUrn == expected_entity_urn
csql_urn = "urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)"
expected_upstream_table = "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.UserDetail,PROD)"
expected_cll = {
"user_id": "user_id",
"source": "source",
"user_source": "user_source",
}
source = TableauSource(config=config, ctx=context)
lineage = source._create_lineage_from_unsupported_csql(
csql_urn=csql_urn,
csql={
"query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM invent_dw.UserDetail ) source_user WHERE rank_ = 1",
"isUnsupportedCustomSql": "true",
"connectionType": "bigquery",
"database": {
"name": "my_bigquery_project",
"connectionType": "bigquery",
},
},
out_columns=[],
)
test_lineage_metadata(
lineage=lineage,
expected_entity_urn=csql_urn,
expected_upstream_table=expected_upstream_table,
expected_cll=expected_cll,
)
# With database as None
lineage = source._create_lineage_from_unsupported_csql(
csql_urn=csql_urn,
csql={
"query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM my_bigquery_project.invent_dw.UserDetail ) source_user WHERE rank_ = 1",
"isUnsupportedCustomSql": "true",
"connectionType": "bigquery",
"database": None,
},
out_columns=[],
)
test_lineage_metadata(
lineage=lineage,
expected_entity_urn=csql_urn,
expected_upstream_table=expected_upstream_table,
expected_cll=expected_cll,
)
@freeze_time(FROZEN_TIME) @freeze_time(FROZEN_TIME)