fix(ingestion/airflow-plugin): pipeline tasks discoverable in search (#10819)

This commit is contained in:
dushayntAW 2024-07-02 11:59:16 +02:00 committed by GitHub
parent 8d5f0f3b04
commit 2e496d599a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 373 additions and 109 deletions

View File

@ -536,14 +536,27 @@ class DataHubListener:
)
dataflow.emit(self.emitter, callback=self._make_emit_callback())
event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityUrn=str(dataflow.urn), aspect=StatusClass(removed=False)
)
self.emitter.emit(event)
for task in dag.tasks:
task_urn = builder.make_data_job_urn_with_flow(
str(dataflow.urn), task.task_id
)
event = MetadataChangeProposalWrapper(
entityUrn=task_urn, aspect=StatusClass(removed=False)
)
self.emitter.emit(event)
# emit tags
for tag in dataflow.tags:
tag_urn = builder.make_tag_urn(tag)
event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
event = MetadataChangeProposalWrapper(
entityUrn=tag_urn, aspect=StatusClass(removed=False)
)
self.emitter.emit(event)
browse_path_v2_event: MetadataChangeProposalWrapper = (

View File

@ -57,6 +57,28 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)",

View File

@ -57,6 +57,17 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)",
@ -72,6 +83,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",

View File

@ -58,6 +58,28 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
@ -102,6 +124,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",

View File

@ -58,6 +58,17 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)",
@ -73,6 +84,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",
@ -102,6 +124,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)",

View File

@ -57,6 +57,28 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,snowflake_operator,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,snowflake_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,snowflake_operator,prod)",

View File

@ -57,6 +57,28 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)",
@ -102,6 +124,39 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),populate_cost_table)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)",
@ -155,6 +210,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)",

View File

@ -57,6 +57,28 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)",
@ -72,6 +94,28 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),populate_cost_table)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)",
@ -154,19 +198,6 @@
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetKey",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:sqlite",
"name": "public.costs",
"origin": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)",
@ -191,6 +222,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)",
@ -204,6 +246,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)",
@ -261,23 +314,6 @@
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:fbeed1180fa0434e02ac6f75ace87869",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1717180072004,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"status": "STARTED",
"attempt": 1
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:fbeed1180fa0434e02ac6f75ace87869",
@ -292,20 +328,19 @@
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)",
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:fbeed1180fa0434e02ac6f75ace87869",
"changeType": "UPSERT",
"aspectName": "operation",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1718780495946,
"timestampMillis": 1717180072004,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:airflow",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1718780495946
"status": "STARTED",
"attempt": 1
}
}
},
@ -338,6 +373,24 @@
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)",
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"json": {
"timestampMillis": 1719864194882,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:airflow",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1719864194882
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)",
@ -680,19 +733,6 @@
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:04e1badac1eacd1c41123d07f579fa92",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)"
]
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)",
@ -892,6 +932,19 @@
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:04e1badac1eacd1c41123d07f579fa92",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)"
]
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:64e5ff8f552e857b607832731e09808b",
@ -1039,19 +1092,6 @@
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetKey",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:sqlite",
"name": "public.processed_costs",
"origin": "PROD"
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)",
@ -1192,32 +1232,6 @@
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:64e5ff8f552e857b607832731e09808b",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspect": {
"json": {
"inputs": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)"
]
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:64e5ff8f552e857b607832731e09808b",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)"
]
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a",
@ -1281,24 +1295,6 @@
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)",
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"json": {
"timestampMillis": 1718780501750,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:airflow",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1718780501750
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)",
@ -1383,6 +1379,19 @@
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetKey",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:sqlite",
"name": "public.processed_costs",
"origin": "PROD"
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)",
@ -1470,6 +1479,19 @@
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:64e5ff8f552e857b607832731e09808b",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspect": {
"json": {
"inputs": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)"
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)",
@ -1488,6 +1510,19 @@
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:64e5ff8f552e857b607832731e09808b",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)"
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)",
@ -1586,6 +1621,24 @@
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)",
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"json": {
"timestampMillis": 1719864203487,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"actor": "urn:li:corpuser:airflow",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1719864203487
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)",