feat(ingest/fivetran): avoid duplicate table lineage entries (#11712)

This commit is contained in:
Harshal Sheth 2024-10-29 01:13:34 -07:00 committed by GitHub
parent 20eed21e27
commit a11ac8d104
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 263 additions and 208 deletions

View File

@ -144,8 +144,8 @@ class FivetranSourceReport(StaleEntityRemovalSourceReport):
def report_connectors_scanned(self, count: int = 1) -> None:
self.connectors_scanned += count
def report_connectors_dropped(self, model: str) -> None:
self.filtered_connectors.append(model)
def report_connectors_dropped(self, connector: str) -> None:
self.filtered_connectors.append(connector)
class PlatformDetail(ConfigModel):

View File

@ -76,7 +76,7 @@ class FivetranSource(StatefulIngestionSourceBase):
self.audit_log = FivetranLogAPI(self.config.fivetran_log_config)
def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, str]:
input_dataset_urn_list: List[DatasetUrn] = []
output_dataset_urn_list: List[DatasetUrn] = []
fine_grained_lineage: List[FineGrainedLineage] = []
@ -93,8 +93,11 @@ class FivetranSource(StatefulIngestionSourceBase):
connector.connector_type
]
else:
logger.info(
f"Fivetran connector source type: {connector.connector_type} is not supported to mapped with Datahub dataset entity."
self.report.info(
title="Guessing source platform for lineage",
message="We encountered a connector type that we don't fully support yet. "
"We will attempt to guess the platform based on the connector type.",
context=f"{connector.connector_name} (connector_id: {connector.connector_id}, connector_type: {connector.connector_type})",
)
source_details.platform = connector.connector_type
@ -170,7 +173,19 @@ class FivetranSource(StatefulIngestionSourceBase):
datajob.inlets.extend(input_dataset_urn_list)
datajob.outlets.extend(output_dataset_urn_list)
datajob.fine_grained_lineages.extend(fine_grained_lineage)
return None
return dict(
**{
f"source.{k}": str(v)
for k, v in source_details.dict().items()
if v is not None
},
**{
f"destination.{k}": str(v)
for k, v in destination_details.dict().items()
if v is not None
},
)
def _generate_dataflow_from_connector(self, connector: Connector) -> DataFlow:
return DataFlow(
@ -196,23 +211,23 @@ class FivetranSource(StatefulIngestionSourceBase):
owners={owner_email} if owner_email else set(),
)
job_property_bag: Dict[str, str] = {}
allowed_connection_keys = [
Constant.PAUSED,
Constant.SYNC_FREQUENCY,
Constant.DESTINATION_ID,
]
for key in allowed_connection_keys:
if hasattr(connector, key) and getattr(connector, key) is not None:
job_property_bag[key] = repr(getattr(connector, key))
datajob.properties = job_property_bag
# Map the connector's source and destination tables to dataset entities.
# Also extend the column-level fine-grained lineage if include_column_lineage is True.
self._extend_lineage(connector=connector, datajob=datajob)
lineage_properties = self._extend_lineage(connector=connector, datajob=datajob)
# TODO: Add dataset-level fine-grained lineage once FineGrainedLineageDownstreamType.DATASET is enabled
connector_properties: Dict[str, str] = {
"connector_id": connector.connector_id,
"connector_type": connector.connector_type,
"paused": str(connector.paused),
"sync_frequency": str(connector.sync_frequency),
"destination_id": connector.destination_id,
}
datajob.properties = {
**connector_properties,
**lineage_properties,
}
return datajob
def _generate_dpi_from_job(self, job: Job, datajob: DataJob) -> DataProcessInstance:

View File

@ -259,20 +259,23 @@ class FivetranLogAPI:
logger.info("Fetching connector list")
connector_list = self._query(self.fivetran_log_query.get_connectors_query())
for connector in connector_list:
connector_id = connector[Constant.CONNECTOR_ID]
connector_name = connector[Constant.CONNECTOR_NAME]
if not connector_patterns.allowed(connector_name):
report.report_connectors_dropped(connector_name)
report.report_connectors_dropped(
f"{connector_name} (connector_id: {connector_id}, dropped due to filter pattern)"
)
continue
if not destination_patterns.allowed(
destination_id := connector[Constant.DESTINATION_ID]
):
report.report_connectors_dropped(
f"{connector_name} (destination_id: {destination_id})"
f"{connector_name} (connector_id: {connector_id}, destination_id: {destination_id})"
)
continue
connectors.append(
Connector(
connector_id=connector[Constant.CONNECTOR_ID],
connector_id=connector_id,
connector_name=connector_name,
connector_type=connector[Constant.CONNECTOR_TYPE_ID],
paused=connector[Constant.PAUSED],

View File

@ -1,8 +1,8 @@
from typing import List
# Safeguards to prevent fetching massive amounts of data.
MAX_TABLE_LINEAGE_PER_CONNECTOR = 50
MAX_COLUMN_LINEAGE_PER_CONNECTOR = 500
MAX_TABLE_LINEAGE_PER_CONNECTOR = 120
MAX_COLUMN_LINEAGE_PER_CONNECTOR = 1000
MAX_JOBS_PER_CONNECTOR = 500
@ -33,6 +33,7 @@ SELECT
FROM {self.db_clause}connector
WHERE
_fivetran_deleted = FALSE
QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY _fivetran_synced DESC) = 1
"""
def get_users_query(self) -> str:
@ -86,21 +87,29 @@ ORDER BY connector_id, end_time DESC
return f"""\
SELECT
stm.connector_id as connector_id,
stm.id as source_table_id,
stm.name as source_table_name,
ssm.name as source_schema_name,
dtm.id as destination_table_id,
dtm.name as destination_table_name,
dsm.name as destination_schema_name
FROM {self.db_clause}table_lineage as tl
JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id
JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id
WHERE stm.connector_id IN ({formatted_connector_ids})
QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY tl.created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR}
ORDER BY stm.connector_id, tl.created_at DESC
*
FROM (
SELECT
stm.connector_id as connector_id,
stm.id as source_table_id,
stm.name as source_table_name,
ssm.name as source_schema_name,
dtm.id as destination_table_id,
dtm.name as destination_table_name,
dsm.name as destination_schema_name,
tl.created_at as created_at,
ROW_NUMBER() OVER (PARTITION BY stm.connector_id, stm.id, dtm.id ORDER BY tl.created_at DESC) as table_combo_rn
FROM {self.db_clause}table_lineage as tl
JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id
JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id
WHERE stm.connector_id IN ({formatted_connector_ids})
)
-- Ensure that we only get back one entry per source and destination pair.
WHERE table_combo_rn = 1
QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR}
ORDER BY connector_id, created_at DESC
"""
def get_column_lineage_query(self, connector_ids: List[str]) -> str:
@ -109,19 +118,31 @@ ORDER BY stm.connector_id, tl.created_at DESC
return f"""\
SELECT
scm.table_id as source_table_id,
dcm.table_id as destination_table_id,
scm.name as source_column_name,
dcm.name as destination_column_name
FROM {self.db_clause}column_lineage as cl
JOIN {self.db_clause}source_column_metadata as scm
ON cl.source_column_id = scm.id
JOIN {self.db_clause}destination_column_metadata as dcm
ON cl.destination_column_id = dcm.id
-- Only joining source_table_metadata to get the connector_id.
JOIN {self.db_clause}source_table_metadata as stm
ON scm.table_id = stm.id
WHERE stm.connector_id IN ({formatted_connector_ids})
QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY cl.created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR}
ORDER BY stm.connector_id, cl.created_at DESC
source_table_id,
destination_table_id,
source_column_name,
destination_column_name
FROM (
SELECT
stm.connector_id as connector_id,
scm.table_id as source_table_id,
dcm.table_id as destination_table_id,
scm.name as source_column_name,
dcm.name as destination_column_name,
cl.created_at as created_at,
ROW_NUMBER() OVER (PARTITION BY stm.connector_id, cl.source_column_id, cl.destination_column_id ORDER BY cl.created_at DESC) as column_combo_rn
FROM {self.db_clause}column_lineage as cl
JOIN {self.db_clause}source_column_metadata as scm
ON cl.source_column_id = scm.id
JOIN {self.db_clause}destination_column_metadata as dcm
ON cl.destination_column_id = dcm.id
-- Only joining source_table_metadata to get the connector_id.
JOIN {self.db_clause}source_table_metadata as stm
ON scm.table_id = stm.id
WHERE stm.connector_id IN ({formatted_connector_ids})
)
-- Ensure that we only get back one entry per (connector, source column, destination column) pair.
WHERE column_combo_rn = 1
QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR}
ORDER BY connector_id, created_at DESC
"""

View File

@ -17,6 +17,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)",
@ -62,9 +78,17 @@
"aspect": {
"json": {
"customProperties": {
"connector_id": "calendar_elected",
"connector_type": "postgres",
"paused": "False",
"sync_frequency": "1440",
"destination_id": "'interval_unconstitutional'"
"destination_id": "interval_unconstitutional",
"source.platform": "postgres",
"source.env": "DEV",
"source.database": "postgres_db",
"destination.platform": "snowflake",
"destination.env": "PROD",
"destination.database": "test_database"
},
"name": "postgres",
"type": {
@ -79,6 +103,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
@ -149,38 +189,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
@ -218,6 +226,38 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
@ -304,8 +344,8 @@
"json": {
"timestampMillis": 1695191853000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED"
}
@ -325,8 +365,8 @@
"json": {
"timestampMillis": 1695191885000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -427,8 +467,8 @@
"json": {
"timestampMillis": 1696343730000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED"
}
@ -448,8 +488,8 @@
"json": {
"timestampMillis": 1696343732000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -550,8 +590,8 @@
"json": {
"timestampMillis": 1696343755000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED"
}
@ -571,8 +611,8 @@
"json": {
"timestampMillis": 1696343790000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -587,38 +627,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",

View File

@ -17,6 +17,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)",
@ -62,9 +78,17 @@
"aspect": {
"json": {
"customProperties": {
"connector_id": "calendar_elected",
"connector_type": "postgres",
"paused": "False",
"sync_frequency": "1440",
"destination_id": "'interval_unconstitutional'"
"destination_id": "interval_unconstitutional",
"source.platform": "postgres",
"source.env": "DEV",
"source.database": "postgres_db",
"destination.platform": "snowflake",
"destination.env": "PROD",
"destination.database": "test_database"
},
"name": "postgres",
"type": {
@ -79,6 +103,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
@ -149,38 +189,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
@ -226,6 +234,38 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
@ -312,8 +352,8 @@
"json": {
"timestampMillis": 1695191853000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED"
}
@ -333,8 +373,8 @@
"json": {
"timestampMillis": 1695191885000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -435,8 +475,8 @@
"json": {
"timestampMillis": 1696343730000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED"
}
@ -456,8 +496,8 @@
"json": {
"timestampMillis": 1696343732000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -558,8 +598,8 @@
"json": {
"timestampMillis": 1696343755000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED"
}
@ -579,8 +619,8 @@
"json": {
"timestampMillis": 1696343790000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
@ -595,38 +635,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",