mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-18 20:57:07 +00:00
fix(ingest/fivetran): Add way to not add schema to the destination/source urn. (#12314)
This commit is contained in:
parent
ef36837ba0
commit
7eab2eb8d9
@ -167,6 +167,10 @@ class PlatformDetail(ConfigModel):
|
|||||||
description="The database that all assets produced by this connector belong to. "
|
description="The database that all assets produced by this connector belong to. "
|
||||||
"For destinations, this defaults to the fivetran log config's database.",
|
"For destinations, this defaults to the fivetran log config's database.",
|
||||||
)
|
)
|
||||||
|
include_schema_in_urn: bool = pydantic.Field(
|
||||||
|
default=True,
|
||||||
|
description="Include schema in the dataset URN. In some cases, the schema is not relevant to the dataset URN and Fivetran sets it to the source and destination table names in the connector.",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
|
class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
|
||||||
|
|||||||
@ -119,21 +119,31 @@ class FivetranSource(StatefulIngestionSourceBase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
for lineage in connector.lineage:
|
for lineage in connector.lineage:
|
||||||
|
source_table = (
|
||||||
|
lineage.source_table
|
||||||
|
if source_details.include_schema_in_urn
|
||||||
|
else lineage.source_table.split(".", 1)[1]
|
||||||
|
)
|
||||||
input_dataset_urn = DatasetUrn.create_from_ids(
|
input_dataset_urn = DatasetUrn.create_from_ids(
|
||||||
platform_id=source_details.platform,
|
platform_id=source_details.platform,
|
||||||
table_name=(
|
table_name=(
|
||||||
f"{source_details.database.lower()}.{lineage.source_table}"
|
f"{source_details.database.lower()}.{source_table}"
|
||||||
if source_details.database
|
if source_details.database
|
||||||
else lineage.source_table
|
else source_table
|
||||||
),
|
),
|
||||||
env=source_details.env,
|
env=source_details.env,
|
||||||
platform_instance=source_details.platform_instance,
|
platform_instance=source_details.platform_instance,
|
||||||
)
|
)
|
||||||
input_dataset_urn_list.append(input_dataset_urn)
|
input_dataset_urn_list.append(input_dataset_urn)
|
||||||
|
|
||||||
|
destination_table = (
|
||||||
|
lineage.destination_table
|
||||||
|
if destination_details.include_schema_in_urn
|
||||||
|
else lineage.destination_table.split(".", 1)[1]
|
||||||
|
)
|
||||||
output_dataset_urn = DatasetUrn.create_from_ids(
|
output_dataset_urn = DatasetUrn.create_from_ids(
|
||||||
platform_id=destination_details.platform,
|
platform_id=destination_details.platform,
|
||||||
table_name=f"{destination_details.database.lower()}.{lineage.destination_table}",
|
table_name=f"{destination_details.database.lower()}.{destination_table}",
|
||||||
env=destination_details.env,
|
env=destination_details.env,
|
||||||
platform_instance=destination_details.platform_instance,
|
platform_instance=destination_details.platform_instance,
|
||||||
)
|
)
|
||||||
@ -176,12 +186,12 @@ class FivetranSource(StatefulIngestionSourceBase):
|
|||||||
**{
|
**{
|
||||||
f"source.{k}": str(v)
|
f"source.{k}": str(v)
|
||||||
for k, v in source_details.dict().items()
|
for k, v in source_details.dict().items()
|
||||||
if v is not None
|
if v is not None and not isinstance(v, bool)
|
||||||
},
|
},
|
||||||
**{
|
**{
|
||||||
f"destination.{k}": str(v)
|
f"destination.{k}": str(v)
|
||||||
for k, v in destination_details.dict().items()
|
for k, v in destination_details.dict().items()
|
||||||
if v is not None
|
if v is not None and not isinstance(v, bool)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@ -595,6 +595,323 @@
|
|||||||
"lastRunId": "no-run-id-provided"
|
"lastRunId": "no-run-id-provided"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataFlow",
|
||||||
|
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataFlowInfo",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"customProperties": {},
|
||||||
|
"name": "confluent_cloud",
|
||||||
|
"env": "PROD"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataFlow",
|
||||||
|
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "status",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"removed": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataFlow",
|
||||||
|
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "ownership",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"owners": [],
|
||||||
|
"ownerTypes": {},
|
||||||
|
"lastModified": {
|
||||||
|
"time": 0,
|
||||||
|
"actor": "urn:li:corpuser:fivetran"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataFlow",
|
||||||
|
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "globalTags",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"tags": []
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataJob",
|
||||||
|
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataJobInfo",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"customProperties": {
|
||||||
|
"connector_id": "my_confluent_cloud_connector_id",
|
||||||
|
"connector_type": "confluent_cloud",
|
||||||
|
"paused": "False",
|
||||||
|
"sync_frequency": "1440",
|
||||||
|
"destination_id": "interval_unconstitutional",
|
||||||
|
"source.platform": "kafka",
|
||||||
|
"source.env": "PROD",
|
||||||
|
"source.database": "kafka_prod",
|
||||||
|
"destination.platform": "snowflake",
|
||||||
|
"destination.env": "PROD",
|
||||||
|
"destination.database": "test_database"
|
||||||
|
},
|
||||||
|
"name": "confluent_cloud",
|
||||||
|
"type": {
|
||||||
|
"string": "COMMAND"
|
||||||
|
},
|
||||||
|
"env": "PROD"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataJob",
|
||||||
|
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "status",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"removed": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataJob",
|
||||||
|
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataJobInputOutput",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"inputDatasets": [
|
||||||
|
"urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)"
|
||||||
|
],
|
||||||
|
"outputDatasets": [
|
||||||
|
"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.confluent_cloud.my-destination-topic,PROD)"
|
||||||
|
],
|
||||||
|
"inputDatajobs": [],
|
||||||
|
"fineGrainedLineages": []
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataJob",
|
||||||
|
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "ownership",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"owners": [],
|
||||||
|
"ownerTypes": {},
|
||||||
|
"lastModified": {
|
||||||
|
"time": 0,
|
||||||
|
"actor": "urn:li:corpuser:fivetran"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataJob",
|
||||||
|
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "globalTags",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"tags": []
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataProcessInstance",
|
||||||
|
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataProcessInstanceProperties",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"customProperties": {},
|
||||||
|
"name": "d9a03d6-eded-4422-a46a-163266e58244",
|
||||||
|
"type": "BATCH_SCHEDULED",
|
||||||
|
"created": {
|
||||||
|
"time": 1695191853000,
|
||||||
|
"actor": "urn:li:corpuser:datahub"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataProcessInstance",
|
||||||
|
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataProcessInstanceRelationships",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
|
||||||
|
"upstreamInstances": []
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataProcessInstance",
|
||||||
|
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataProcessInstanceInput",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"inputs": [
|
||||||
|
"urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataProcessInstance",
|
||||||
|
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataProcessInstanceOutput",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"outputs": [
|
||||||
|
"urn:li:dataset:(urn:li:dataPlatform:snowflake,test_database.confluent_cloud.my-destination-topic,PROD)"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataProcessInstance",
|
||||||
|
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataProcessInstanceRunEvent",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"timestampMillis": 1695191853000,
|
||||||
|
"partitionSpec": {
|
||||||
|
"partition": "FULL_TABLE_SNAPSHOT",
|
||||||
|
"type": "FULL_TABLE"
|
||||||
|
},
|
||||||
|
"status": "STARTED"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataProcessInstance",
|
||||||
|
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataProcessInstanceRunEvent",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"timestampMillis": 1695191885000,
|
||||||
|
"partitionSpec": {
|
||||||
|
"partition": "FULL_TABLE_SNAPSHOT",
|
||||||
|
"type": "FULL_TABLE"
|
||||||
|
},
|
||||||
|
"status": "COMPLETE",
|
||||||
|
"result": {
|
||||||
|
"type": "SUCCESS",
|
||||||
|
"nativeResultType": "fivetran"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataProcessInstance",
|
||||||
|
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "status",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"removed": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"entityType": "dataProcessInstance",
|
"entityType": "dataProcessInstance",
|
||||||
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
|
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
|
||||||
|
|||||||
@ -603,6 +603,331 @@
|
|||||||
"lastRunId": "no-run-id-provided"
|
"lastRunId": "no-run-id-provided"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataFlow",
|
||||||
|
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataFlowInfo",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"customProperties": {},
|
||||||
|
"name": "confluent_cloud",
|
||||||
|
"env": "PROD"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataFlow",
|
||||||
|
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "status",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"removed": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataFlow",
|
||||||
|
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "ownership",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"owners": [],
|
||||||
|
"ownerTypes": {},
|
||||||
|
"lastModified": {
|
||||||
|
"time": 0,
|
||||||
|
"actor": "urn:li:corpuser:fivetran"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataFlow",
|
||||||
|
"entityUrn": "urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "globalTags",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"tags": []
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataJob",
|
||||||
|
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataJobInfo",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"customProperties": {
|
||||||
|
"connector_id": "my_confluent_cloud_connector_id",
|
||||||
|
"connector_type": "confluent_cloud",
|
||||||
|
"paused": "False",
|
||||||
|
"sync_frequency": "1440",
|
||||||
|
"destination_id": "my_confluent_cloud_connector_id",
|
||||||
|
"source.platform": "kafka",
|
||||||
|
"source.env": "PROD",
|
||||||
|
"source.database": "kafka_prod",
|
||||||
|
"destination.platform": "kafka",
|
||||||
|
"destination.env": "PROD",
|
||||||
|
"destination.database": "kafka_prod"
|
||||||
|
},
|
||||||
|
"name": "confluent_cloud",
|
||||||
|
"type": {
|
||||||
|
"string": "COMMAND"
|
||||||
|
},
|
||||||
|
"env": "PROD"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataJob",
|
||||||
|
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "status",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"removed": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataJob",
|
||||||
|
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataJobInputOutput",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"inputDatasets": [
|
||||||
|
"urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)"
|
||||||
|
],
|
||||||
|
"outputDatasets": [
|
||||||
|
"urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-destination-topic,PROD)"
|
||||||
|
],
|
||||||
|
"inputDatajobs": [],
|
||||||
|
"fineGrainedLineages": []
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataJob",
|
||||||
|
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "ownership",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"owners": [
|
||||||
|
{
|
||||||
|
"owner": "urn:li:corpuser:abc.xyz@email.com",
|
||||||
|
"type": "DEVELOPER",
|
||||||
|
"source": {
|
||||||
|
"type": "SERVICE"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"ownerTypes": {},
|
||||||
|
"lastModified": {
|
||||||
|
"time": 0,
|
||||||
|
"actor": "urn:li:corpuser:fivetran"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataJob",
|
||||||
|
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "globalTags",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"tags": []
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataProcessInstance",
|
||||||
|
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataProcessInstanceProperties",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"customProperties": {},
|
||||||
|
"name": "d9a03d6-eded-4422-a46a-163266e58244",
|
||||||
|
"type": "BATCH_SCHEDULED",
|
||||||
|
"created": {
|
||||||
|
"time": 1695191853000,
|
||||||
|
"actor": "urn:li:corpuser:datahub"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataProcessInstance",
|
||||||
|
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataProcessInstanceRelationships",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
|
||||||
|
"upstreamInstances": []
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataProcessInstance",
|
||||||
|
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataProcessInstanceInput",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"inputs": [
|
||||||
|
"urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-source-topic,PROD)"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataProcessInstance",
|
||||||
|
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataProcessInstanceOutput",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"outputs": [
|
||||||
|
"urn:li:dataset:(urn:li:dataPlatform:kafka,kafka_prod.my-destination-topic,PROD)"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataProcessInstance",
|
||||||
|
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataProcessInstanceRunEvent",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"timestampMillis": 1695191853000,
|
||||||
|
"partitionSpec": {
|
||||||
|
"partition": "FULL_TABLE_SNAPSHOT",
|
||||||
|
"type": "FULL_TABLE"
|
||||||
|
},
|
||||||
|
"status": "STARTED"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataProcessInstance",
|
||||||
|
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "dataProcessInstanceRunEvent",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"timestampMillis": 1695191885000,
|
||||||
|
"partitionSpec": {
|
||||||
|
"partition": "FULL_TABLE_SNAPSHOT",
|
||||||
|
"type": "FULL_TABLE"
|
||||||
|
},
|
||||||
|
"status": "COMPLETE",
|
||||||
|
"result": {
|
||||||
|
"type": "SUCCESS",
|
||||||
|
"nativeResultType": "fivetran"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"entityType": "dataProcessInstance",
|
||||||
|
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
|
||||||
|
"changeType": "UPSERT",
|
||||||
|
"aspectName": "status",
|
||||||
|
"aspect": {
|
||||||
|
"json": {
|
||||||
|
"removed": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systemMetadata": {
|
||||||
|
"lastObserved": 1654621200000,
|
||||||
|
"runId": "powerbi-test",
|
||||||
|
"lastRunId": "no-run-id-provided"
|
||||||
|
}
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"entityType": "dataProcessInstance",
|
"entityType": "dataProcessInstance",
|
||||||
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
|
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
|
||||||
|
|||||||
@ -32,6 +32,15 @@ default_connector_query_results = [
|
|||||||
"sync_frequency": 1440,
|
"sync_frequency": 1440,
|
||||||
"destination_id": "interval_unconstitutional",
|
"destination_id": "interval_unconstitutional",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"connector_id": "my_confluent_cloud_connector_id",
|
||||||
|
"connecting_user_id": "reapply_phone",
|
||||||
|
"connector_type_id": "confluent_cloud",
|
||||||
|
"connector_name": "confluent_cloud",
|
||||||
|
"paused": False,
|
||||||
|
"sync_frequency": 1440,
|
||||||
|
"destination_id": "my_confluent_cloud_connector_id",
|
||||||
|
},
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
@ -45,7 +54,7 @@ def default_query_results(
|
|||||||
elif query == fivetran_log_query.get_connectors_query():
|
elif query == fivetran_log_query.get_connectors_query():
|
||||||
return connector_query_results
|
return connector_query_results
|
||||||
elif query == fivetran_log_query.get_table_lineage_query(
|
elif query == fivetran_log_query.get_table_lineage_query(
|
||||||
connector_ids=["calendar_elected"]
|
connector_ids=["calendar_elected", "my_confluent_cloud_connector_id"]
|
||||||
):
|
):
|
||||||
return [
|
return [
|
||||||
{
|
{
|
||||||
@ -66,9 +75,18 @@ def default_query_results(
|
|||||||
"destination_table_name": "company",
|
"destination_table_name": "company",
|
||||||
"destination_schema_name": "postgres_public",
|
"destination_schema_name": "postgres_public",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"connector_id": "my_confluent_cloud_connector_id",
|
||||||
|
"source_table_id": "10042",
|
||||||
|
"source_table_name": "my-source-topic",
|
||||||
|
"source_schema_name": "confluent_cloud",
|
||||||
|
"destination_table_id": "7781",
|
||||||
|
"destination_table_name": "my-destination-topic",
|
||||||
|
"destination_schema_name": "confluent_cloud",
|
||||||
|
},
|
||||||
]
|
]
|
||||||
elif query == fivetran_log_query.get_column_lineage_query(
|
elif query == fivetran_log_query.get_column_lineage_query(
|
||||||
connector_ids=["calendar_elected"]
|
connector_ids=["calendar_elected", "my_confluent_cloud_connector_id"]
|
||||||
):
|
):
|
||||||
return [
|
return [
|
||||||
{
|
{
|
||||||
@ -107,7 +125,7 @@ def default_query_results(
|
|||||||
]
|
]
|
||||||
elif query == fivetran_log_query.get_sync_logs_query(
|
elif query == fivetran_log_query.get_sync_logs_query(
|
||||||
syncs_interval=7,
|
syncs_interval=7,
|
||||||
connector_ids=["calendar_elected"],
|
connector_ids=["calendar_elected", "my_confluent_cloud_connector_id"],
|
||||||
):
|
):
|
||||||
return [
|
return [
|
||||||
{
|
{
|
||||||
@ -131,6 +149,13 @@ def default_query_results(
|
|||||||
"end_time": datetime.datetime(2023, 10, 3, 14, 36, 29, 678000),
|
"end_time": datetime.datetime(2023, 10, 3, 14, 36, 29, 678000),
|
||||||
"end_message_data": '"{\\"reason\\":\\"java.lang.RuntimeException: FATAL: too many connections for role \\\\\\"hxwraqld\\\\\\"\\",\\"taskType\\":\\"reconnect\\",\\"status\\":\\"FAILURE_WITH_TASK\\"}"',
|
"end_message_data": '"{\\"reason\\":\\"java.lang.RuntimeException: FATAL: too many connections for role \\\\\\"hxwraqld\\\\\\"\\",\\"taskType\\":\\"reconnect\\",\\"status\\":\\"FAILURE_WITH_TASK\\"}"',
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"connector_id": "my_confluent_cloud_connector_id",
|
||||||
|
"sync_id": "d9a03d6-eded-4422-a46a-163266e58244",
|
||||||
|
"start_time": datetime.datetime(2023, 9, 20, 6, 37, 32, 606000),
|
||||||
|
"end_time": datetime.datetime(2023, 9, 20, 6, 38, 5, 56000),
|
||||||
|
"end_message_data": '"{\\"status\\":\\"SUCCESSFUL\\"}"',
|
||||||
|
},
|
||||||
]
|
]
|
||||||
# Unreachable code
|
# Unreachable code
|
||||||
raise Exception(f"Unknown query {query}")
|
raise Exception(f"Unknown query {query}")
|
||||||
@ -172,19 +197,30 @@ def test_fivetran_with_snowflake_dest(pytestconfig, tmp_path):
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
"connector_patterns": {
|
"connector_patterns": {
|
||||||
"allow": [
|
"allow": ["postgres", "confluent_cloud"]
|
||||||
"postgres",
|
|
||||||
]
|
|
||||||
},
|
},
|
||||||
"destination_patterns": {
|
"destination_patterns": {
|
||||||
"allow": [
|
"allow": [
|
||||||
"interval_unconstitutional",
|
"interval_unconstitutional",
|
||||||
|
"my_confluent_cloud_connector_id",
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"sources_to_platform_instance": {
|
"sources_to_platform_instance": {
|
||||||
"calendar_elected": {
|
"calendar_elected": {
|
||||||
"database": "postgres_db",
|
"database": "postgres_db",
|
||||||
"env": "DEV",
|
"env": "DEV",
|
||||||
|
},
|
||||||
|
"my_confluent_cloud_connector_id": {
|
||||||
|
"platform": "kafka",
|
||||||
|
"include_schema_in_urn": False,
|
||||||
|
"database": "kafka_prod",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"destination_to_platform_instance": {
|
||||||
|
"my_confluent_cloud_connector_id": {
|
||||||
|
"platform": "kafka",
|
||||||
|
"include_schema_in_urn": False,
|
||||||
|
"database": "kafka_prod",
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -234,6 +270,15 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_
|
|||||||
"sync_frequency": 1440,
|
"sync_frequency": 1440,
|
||||||
"destination_id": "interval_unconstitutional",
|
"destination_id": "interval_unconstitutional",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"connector_id": "my_confluent_cloud_connector_id",
|
||||||
|
"connecting_user_id": None,
|
||||||
|
"connector_type_id": "confluent_cloud",
|
||||||
|
"connector_name": "confluent_cloud",
|
||||||
|
"paused": False,
|
||||||
|
"sync_frequency": 1440,
|
||||||
|
"destination_id": "interval_unconstitutional",
|
||||||
|
},
|
||||||
]
|
]
|
||||||
|
|
||||||
connection_magic_mock.execute.side_effect = partial(
|
connection_magic_mock.execute.side_effect = partial(
|
||||||
@ -261,9 +306,7 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
"connector_patterns": {
|
"connector_patterns": {
|
||||||
"allow": [
|
"allow": ["postgres", "confluent_cloud"]
|
||||||
"postgres",
|
|
||||||
]
|
|
||||||
},
|
},
|
||||||
"destination_patterns": {
|
"destination_patterns": {
|
||||||
"allow": [
|
"allow": [
|
||||||
@ -275,6 +318,18 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_
|
|||||||
"platform": "postgres",
|
"platform": "postgres",
|
||||||
"env": "DEV",
|
"env": "DEV",
|
||||||
"database": "postgres_db",
|
"database": "postgres_db",
|
||||||
|
},
|
||||||
|
"my_confluent_cloud_connector_id": {
|
||||||
|
"platform": "kafka",
|
||||||
|
"database": "kafka_prod",
|
||||||
|
"include_schema_in_urn": False,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"destination_to_platform_instance": {
|
||||||
|
"my_confluent_cloud_connector_id": {
|
||||||
|
"platform": "kafka",
|
||||||
|
"database": "kafka_prod",
|
||||||
|
"include_schema_in_urn": False,
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user