diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index 53f1dde48c..6ef4f83152 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -536,14 +536,27 @@ class DataHubListener: ) dataflow.emit(self.emitter, callback=self._make_emit_callback()) + event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( + entityUrn=str(dataflow.urn), aspect=StatusClass(removed=False) + ) + self.emitter.emit(event) + + for task in dag.tasks: + task_urn = builder.make_data_job_urn_with_flow( + str(dataflow.urn), task.task_id + ) + event = MetadataChangeProposalWrapper( + entityUrn=task_urn, aspect=StatusClass(removed=False) + ) + self.emitter.emit(event) + # emit tags for tag in dataflow.tags: tag_urn = builder.make_tag_urn(tag) - event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( + event = MetadataChangeProposalWrapper( entityUrn=tag_urn, aspect=StatusClass(removed=False) ) - self.emitter.emit(event) browse_path_v2_event: MetadataChangeProposalWrapper = ( diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json index 1ff53a45ab..e2f738d19d 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json @@ -57,6 +57,28 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json index ef55850894..7b1591bdf7 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json @@ -57,6 +57,17 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)", @@ -72,6 +83,17 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json index 48f8872d38..0828c5e5aa 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json @@ -58,6 +58,28 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)", @@ -102,6 +124,17 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json index 434f4b6eb7..c7cd245cc0 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json @@ -58,6 +58,17 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)", @@ -73,6 +84,17 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", @@ -102,6 +124,17 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json index 8946444803..b819379395 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json @@ -57,6 +57,28 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,snowflake_operator,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,snowflake_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(airflow,snowflake_operator,prod)", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json index b4b97bbb74..e7902d1650 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json @@ -57,6 +57,28 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)", @@ -102,6 +124,39 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),populate_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)", @@ -155,6 +210,17 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json index d09d4f76e3..a9af068e2e 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json @@ -57,6 +57,28 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)", @@ -72,6 +94,28 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),populate_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)", @@ -154,19 +198,6 @@ } } }, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)", - "changeType": "UPSERT", - "aspectName": "datasetKey", - "aspect": { - "json": { - "platform": "urn:li:dataPlatform:sqlite", - "name": "public.costs", - "origin": "PROD" - } - } -}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)", @@ -191,6 +222,17 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)", @@ -204,6 +246,17 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)", @@ -261,23 +314,6 @@ } } }, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:fbeed1180fa0434e02ac6f75ace87869", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRunEvent", - "aspect": { - "json": { - "timestampMillis": 1717180072004, - "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" - }, - "status": "STARTED", - "attempt": 1 - } - } -}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:fbeed1180fa0434e02ac6f75ace87869", @@ -292,20 +328,19 @@ } }, { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)", + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:fbeed1180fa0434e02ac6f75ace87869", "changeType": "UPSERT", - "aspectName": "operation", + "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1718780495946, + "timestampMillis": 1717180072004, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" }, - "actor": "urn:li:corpuser:airflow", - "operationType": "CREATE", - "lastUpdatedTimestamp": 1718780495946 + "status": "STARTED", + "attempt": 1 } } }, @@ -338,6 +373,24 @@ } } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1719864194882, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:airflow", + "operationType": "CREATE", + "lastUpdatedTimestamp": 1719864194882 + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)", @@ -680,19 +733,6 @@ } } }, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:04e1badac1eacd1c41123d07f579fa92", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceOutput", - "aspect": { - "json": { - "outputs": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" - ] - } - } -}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)", @@ -892,6 +932,19 @@ } } }, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:04e1badac1eacd1c41123d07f579fa92", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" + ] + } + } +}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:64e5ff8f552e857b607832731e09808b", @@ -1039,19 +1092,6 @@ } } }, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)", - "changeType": "UPSERT", - "aspectName": "datasetKey", - "aspect": { - "json": { - "platform": "urn:li:dataPlatform:sqlite", - "name": "public.processed_costs", - "origin": "PROD" - } - } -}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)", @@ -1192,32 +1232,6 @@ } } }, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:64e5ff8f552e857b607832731e09808b", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceInput", - "aspect": { - "json": { - "inputs": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" - ] - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:64e5ff8f552e857b607832731e09808b", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceOutput", - "aspect": { - "json": { - "outputs": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" - ] - } - } -}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", @@ -1281,24 +1295,6 @@ } } }, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)", - "changeType": "UPSERT", - "aspectName": "operation", - "aspect": { - "json": { - "timestampMillis": 1718780501750, - "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" - }, - "actor": "urn:li:corpuser:airflow", - "operationType": "CREATE", - "lastUpdatedTimestamp": 1718780501750 - } - } -}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", @@ -1383,6 +1379,19 @@ } } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:sqlite", + "name": "public.processed_costs", + "origin": "PROD" + } + } +}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)", @@ -1470,6 +1479,19 @@ } } }, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:64e5ff8f552e857b607832731e09808b", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" + ] + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", @@ -1488,6 +1510,19 @@ } } }, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:64e5ff8f552e857b607832731e09808b", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" + ] + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", @@ -1586,6 +1621,24 @@ } } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1719864203487, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:airflow", + "operationType": "CREATE", + "lastUpdatedTimestamp": 1719864203487 + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)",