From bb7300251f6d65024b3440379d8eef3e2413a2b0 Mon Sep 17 00:00:00 2001 From: Shubham Jagtap <132359390+shubhamjagtap639@users.noreply.github.com> Date: Sat, 11 Nov 2023 05:29:18 +0530 Subject: [PATCH] feat(ingestion/airflow): support datajobs as task inlets (#9211) Co-authored-by: Harshal Sheth --- .../datahub_listener.py | 14 ++-- .../src/datahub_airflow_plugin/entities.py | 28 ++++++-- .../example_dags/lineage_backend_demo.py | 1 + .../lineage_backend_taskflow_demo.py | 1 + .../lineage/_lineage_core.py | 18 ++--- .../tests/integration/dags/basic_iolets.py | 1 + .../tests/integration/dags/simple_dag.py | 1 + .../integration/goldens/v1_basic_iolets.json | 56 ++++++++++++--- .../integration/goldens/v1_simple_dag.json | 70 ++++++++++++++----- .../integration/goldens/v2_basic_iolets.json | 26 ++++--- .../v2_basic_iolets_no_dag_listener.json | 24 ++++--- .../integration/goldens/v2_simple_dag.json | 38 +++++----- .../v2_simple_dag_no_dag_listener.json | 32 +++++---- .../goldens/v2_snowflake_operator.json | 2 +- .../goldens/v2_sqlite_operator.json | 2 +- .../v2_sqlite_operator_no_dag_listener.json | 40 +++++------ .../airflow-plugin/tests/unit/test_airflow.py | 31 ++++++-- 17 files changed, 261 insertions(+), 124 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index a3f5cb489e..d00b10bbe1 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -17,7 +17,6 @@ from datahub.metadata.schema_classes import ( ) from datahub.telemetry import telemetry from datahub.utilities.sqlglot_lineage import SqlParsingResult -from datahub.utilities.urns.dataset_urn import DatasetUrn from openlineage.airflow.listener import TaskHolder from openlineage.airflow.utils import redact_with_exclusions from openlineage.client.serde import Serde @@ -32,7 +31,11 @@ from datahub_airflow_plugin._config import DatahubLineageConfig, get_lineage_con from datahub_airflow_plugin._datahub_ol_adapter import translate_ol_to_datahub_urn from datahub_airflow_plugin._extractors import SQL_PARSING_RESULT_KEY, ExtractorManager from datahub_airflow_plugin.client.airflow_generator import AirflowGenerator -from datahub_airflow_plugin.entities import _Entity +from datahub_airflow_plugin.entities import ( + _Entity, + entities_to_datajob_urn_list, + entities_to_dataset_urn_list, +) _F = TypeVar("_F", bound=Callable[..., None]) if TYPE_CHECKING: @@ -272,10 +275,9 @@ class DataHubListener: ) # Write the lineage to the datajob object. - datajob.inlets.extend(DatasetUrn.create_from_string(urn) for urn in input_urns) - datajob.outlets.extend( - DatasetUrn.create_from_string(urn) for urn in output_urns - ) + datajob.inlets.extend(entities_to_dataset_urn_list(input_urns)) + datajob.outlets.extend(entities_to_dataset_urn_list(output_urns)) + datajob.upstream_urns.extend(entities_to_datajob_urn_list(input_urns)) datajob.fine_grained_lineages.extend(fine_grained_lineages) # Merge in extra stuff that was present in the DataJob we constructed diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/entities.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/entities.py index 69f667cad3..5a4bcb0097 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/entities.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/entities.py @@ -1,8 +1,10 @@ from abc import abstractmethod -from typing import Optional +from typing import List, Optional import attr import datahub.emitter.mce_builder as builder +from datahub.utilities.urns.data_job_urn import DataJobUrn +from datahub.utilities.urns.dataset_urn import DatasetUrn from datahub.utilities.urns.urn import guess_entity_type @@ -38,10 +40,28 @@ class Urn(_Entity): def _validate_urn(self, attribute, value): if not value.startswith("urn:"): raise ValueError("invalid urn provided: urns must start with 'urn:'") - if guess_entity_type(value) != "dataset": - # This is because DataJobs only support Dataset lineage. - raise ValueError("Airflow lineage currently only supports datasets") + if guess_entity_type(value) not in ["dataset", "dataJob"]: + # This is because DataJobs only support Dataset and upstream Datajob lineage. + raise ValueError( + "Airflow lineage currently only supports datasets and upstream datajobs" + ) @property def urn(self): return self._urn + + +def entities_to_dataset_urn_list(iolets: List[str]) -> List[DatasetUrn]: + dataset_urn_list: List[DatasetUrn] = [] + for let in iolets: + if guess_entity_type(let) == "dataset": + dataset_urn_list.append(DatasetUrn.create_from_string(let)) + return dataset_urn_list + + +def entities_to_datajob_urn_list(inlets: List[str]) -> List[DataJobUrn]: + datajob_urn_list: List[DataJobUrn] = [] + for let in inlets: + if guess_entity_type(let) == "dataJob": + datajob_urn_list.append(DataJobUrn.create_from_string(let)) + return datajob_urn_list diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/lineage_backend_demo.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/lineage_backend_demo.py index 3caea093b9..ce161d6a41 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/lineage_backend_demo.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/lineage_backend_demo.py @@ -46,6 +46,7 @@ with DAG( Urn( "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)" ), + Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,dag1,prod),task1)"), ], outlets=[Dataset("snowflake", "mydb.schema.tableD")], ) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/lineage_backend_taskflow_demo.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/lineage_backend_taskflow_demo.py index ceb0f452b5..80df7053a4 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/lineage_backend_taskflow_demo.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/example_dags/lineage_backend_taskflow_demo.py @@ -37,6 +37,7 @@ def datahub_lineage_backend_taskflow_demo(): Urn( "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)" ), + Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,dag1,prod),task1)"), ], outlets=[Dataset("snowflake", "mydb.schema.tableD")], ) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py index f5f519fa23..75fc79443e 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py @@ -2,11 +2,14 @@ from datetime import datetime from typing import TYPE_CHECKING, Dict, List from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult -from datahub.utilities.urns.dataset_urn import DatasetUrn from datahub_airflow_plugin._config import DatahubLineageConfig from datahub_airflow_plugin.client.airflow_generator import AirflowGenerator -from datahub_airflow_plugin.entities import _Entity +from datahub_airflow_plugin.entities import ( + _Entity, + entities_to_datajob_urn_list, + entities_to_dataset_urn_list, +) if TYPE_CHECKING: from airflow import DAG @@ -16,10 +19,6 @@ if TYPE_CHECKING: from datahub_airflow_plugin._airflow_shims import Operator -def _entities_to_urn_list(iolets: List[_Entity]) -> List[DatasetUrn]: - return [DatasetUrn.create_from_string(let.urn) for let in iolets] - - def send_lineage_to_datahub( config: DatahubLineageConfig, operator: "Operator", @@ -53,8 +52,11 @@ def send_lineage_to_datahub( capture_tags=config.capture_tags_info, capture_owner=config.capture_ownership_info, ) - datajob.inlets.extend(_entities_to_urn_list(inlets)) - datajob.outlets.extend(_entities_to_urn_list(outlets)) + datajob.inlets.extend(entities_to_dataset_urn_list([let.urn for let in inlets])) + datajob.outlets.extend(entities_to_dataset_urn_list([let.urn for let in outlets])) + datajob.upstream_urns.extend( + entities_to_datajob_urn_list([let.urn for let in inlets]) + ) datajob.emit(emitter) operator.log.info(f"Emitted from Lineage: {datajob}") diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py index 8b0803ab98..11b3731c52 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py @@ -26,6 +26,7 @@ with DAG( Urn( "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)" ), + Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"), ], outlets=[ Dataset("snowflake", "mydb.schema.tableD"), diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py index 1dd047f0a6..71b462159a 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py @@ -21,6 +21,7 @@ with DAG( Urn( "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)" ), + Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"), ], outlets=[Dataset("snowflake", "mydb.schema.tableD")], ) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json index a4c17c73e9..6b460e99b1 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json @@ -11,7 +11,7 @@ "catchup": "False", "description": "None", "doc_md": "None", - "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py'", + "fileloc": "'/home/shubham/airflow1/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", "tags": "None", @@ -95,7 +95,8 @@ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)", "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableB,DEV)", "urn:li:dataset:(urn:li:dataPlatform:snowflake,cloud.mydb.schema.tableC,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)" + "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)", + "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)" ], "outputDatasets": [ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)", @@ -150,6 +151,17 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)", @@ -245,7 +257,8 @@ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)", "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableB,DEV)", "urn:li:dataset:(urn:li:dataPlatform:snowflake,cloud.mydb.schema.tableC,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)" + "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)", + "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)" ], "outputDatasets": [ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)", @@ -300,6 +313,17 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)", @@ -365,9 +389,9 @@ "json": { "customProperties": { "run_id": "manual_run_test", - "duration": "0.176536", - "start_date": "2023-09-30 00:49:56.670239+00:00", - "end_date": "2023-09-30 00:49:56.846775+00:00", + "duration": "0.143271", + "start_date": "2023-11-08 09:55:05.801617+00:00", + "end_date": "2023-11-08 09:55:05.944888+00:00", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "1", "max_tries": "0", @@ -384,7 +408,7 @@ "name": "basic_iolets_run_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696034996670, + "time": 1699437305801, "actor": "urn:li:corpuser:datahub" } } @@ -413,7 +437,8 @@ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)", "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableB,DEV)", "urn:li:dataset:(urn:li:dataPlatform:snowflake,cloud.mydb.schema.tableC,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)" + "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)", + "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)" ] } } @@ -476,6 +501,17 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)", @@ -505,7 +541,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696034996670, + "timestampMillis": 1699437305801, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -522,7 +558,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696034996846, + "timestampMillis": 1699437305944, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json index a0a95716a0..7ec172e367 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json @@ -11,7 +11,7 @@ "catchup": "False", "description": "'A simple DAG that runs a few fake data tasks.'", "doc_md": "None", - "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'", + "fileloc": "'/home/shubham/airflow1/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", "tags": "None", @@ -94,7 +94,8 @@ "json": { "inputDatasets": [ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)" + "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)", + "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)" ], "outputDatasets": [ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)" @@ -126,6 +127,17 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)", @@ -208,7 +220,8 @@ "json": { "inputDatasets": [ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)" + "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)", + "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)" ], "outputDatasets": [ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)" @@ -240,6 +253,17 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)", @@ -294,9 +318,9 @@ "json": { "customProperties": { "run_id": "manual_run_test", - "duration": "0.175983", - "start_date": "2023-09-30 00:48:58.943850+00:00", - "end_date": "2023-09-30 00:48:59.119833+00:00", + "duration": "0.120524", + "start_date": "2023-11-08 09:54:06.065112+00:00", + "end_date": "2023-11-08 09:54:06.185636+00:00", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "1", "max_tries": "0", @@ -313,7 +337,7 @@ "name": "simple_dag_task_1_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696034938943, + "time": 1699437246065, "actor": "urn:li:corpuser:datahub" } } @@ -340,7 +364,8 @@ "json": { "inputs": [ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)" + "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)", + "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)" ] } } @@ -380,6 +405,17 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)", @@ -398,7 +434,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696034938943, + "timestampMillis": 1699437246065, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -415,7 +451,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696034939119, + "timestampMillis": 1699437246185, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -440,7 +476,7 @@ "catchup": "False", "description": "'A simple DAG that runs a few fake data tasks.'", "doc_md": "None", - "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'", + "fileloc": "'/home/shubham/airflow1/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", "tags": "None", @@ -651,9 +687,9 @@ "json": { "customProperties": { "run_id": "manual_run_test", - "duration": "0.129888", - "start_date": "2023-09-30 00:49:02.158752+00:00", - "end_date": "2023-09-30 00:49:02.288640+00:00", + "duration": "0.099975", + "start_date": "2023-11-08 09:54:09.744583+00:00", + "end_date": "2023-11-08 09:54:09.844558+00:00", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "1", "max_tries": "0", @@ -670,7 +706,7 @@ "name": "simple_dag_run_another_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696034942158, + "time": 1699437249744, "actor": "urn:li:corpuser:datahub" } } @@ -695,7 +731,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696034942158, + "timestampMillis": 1699437249744, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -712,7 +748,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696034942288, + "timestampMillis": 1699437249844, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json index 1974f1f085..6767a368f3 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json @@ -11,7 +11,7 @@ "catchup": "False", "description": "None", "doc_md": "None", - "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py'", + "fileloc": "'/Users/shubham/airflow1/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", "tags": "[]", @@ -73,9 +73,9 @@ "trigger_rule": "", "wait_for_downstream": "False", "downstream_task_ids": "[]", - "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]", + "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]", "outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableE', env='PROD', platform_instance=None)]", - "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_lock_for_execution\": true, \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" + "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_is_setup\": false, \"_is_teardown\": false, \"_lock_for_execution\": true, \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": {}, \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task", "name": "run_data_task", @@ -102,7 +102,9 @@ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)", "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableE,PROD)" ], - "inputDatajobs": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)" + ], "fineGrainedLineages": [] } } @@ -217,7 +219,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 01:13:14.266272+00:00", + "start_date": "2023-10-30 13:07:55.311482+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -235,7 +237,7 @@ "name": "basic_iolets_run_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696036394266, + "time": 1698671275311, "actor": "urn:li:corpuser:datahub" } } @@ -356,7 +358,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696036394266, + "timestampMillis": 1698671275311, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -383,9 +385,9 @@ "trigger_rule": "", "wait_for_downstream": "False", "downstream_task_ids": "[]", - "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]", + "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]", "outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableE', env='PROD', platform_instance=None)]", - "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_lock_for_execution\": true, \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" + "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_is_setup\": false, \"_is_teardown\": false, \"_lock_for_execution\": true, \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": {}, \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task", "name": "run_data_task", @@ -412,7 +414,9 @@ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)", "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableE,PROD)" ], - "inputDatajobs": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)" + ], "fineGrainedLineages": [] } } @@ -524,7 +528,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696036394833, + "timestampMillis": 1698671276777, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json index d02951bc9e..63b0a05935 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json @@ -73,9 +73,9 @@ "trigger_rule": "", "wait_for_downstream": "False", "downstream_task_ids": "[]", - "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]", + "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]", "outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableE', env='PROD', platform_instance=None)]", - "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" + "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task", "name": "run_data_task", @@ -102,7 +102,9 @@ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)", "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableE,PROD)" ], - "inputDatajobs": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)" + ], "fineGrainedLineages": [] } } @@ -217,7 +219,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 06:59:52.401211+00:00", + "start_date": "2023-11-10 19:11:17.444435+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -235,7 +237,7 @@ "name": "basic_iolets_run_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696057192401, + "time": 1699643477444, "actor": "urn:li:corpuser:datahub" } } @@ -356,7 +358,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057192401, + "timestampMillis": 1699643477444, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -383,9 +385,9 @@ "trigger_rule": "", "wait_for_downstream": "False", "downstream_task_ids": "[]", - "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]", + "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]", "outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableE', env='PROD', platform_instance=None)]", - "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" + "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task", "name": "run_data_task", @@ -412,7 +414,9 @@ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)", "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableE,PROD)" ], - "inputDatajobs": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)" + ], "fineGrainedLineages": [] } } @@ -524,7 +528,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057192982, + "timestampMillis": 1699643478123, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json index 9acc47ec13..c558f79c32 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json @@ -11,7 +11,7 @@ "catchup": "False", "description": "'A simple DAG that runs a few fake data tasks.'", "doc_md": "None", - "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'", + "fileloc": "'/Users/shubham/airflow1/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", "tags": "[]", @@ -74,9 +74,9 @@ "trigger_rule": "", "wait_for_downstream": "False", "downstream_task_ids": "['run_another_data_task']", - "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]", + "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]", "outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None)]", - "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_lock_for_execution\": true, \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" + "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_is_setup\": false, \"_is_teardown\": false, \"_lock_for_execution\": true, \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": {}, \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1", "name": "task_1", @@ -100,7 +100,9 @@ "outputDatasets": [ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)" ], - "inputDatajobs": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)" + ], "fineGrainedLineages": [] } } @@ -182,7 +184,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 06:53:58.219003+00:00", + "start_date": "2023-10-30 13:06:07.193282+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -200,7 +202,7 @@ "name": "simple_dag_task_1_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696056838219, + "time": 1698671167193, "actor": "urn:li:corpuser:datahub" } } @@ -285,7 +287,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056838219, + "timestampMillis": 1698671167193, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -312,9 +314,9 @@ "trigger_rule": "", "wait_for_downstream": "False", "downstream_task_ids": "['run_another_data_task']", - "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]", + "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]", "outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None)]", - "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_lock_for_execution\": true, \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" + "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_is_setup\": false, \"_is_teardown\": false, \"_lock_for_execution\": true, \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": {}, \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1", "name": "task_1", @@ -338,7 +340,9 @@ "outputDatasets": [ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)" ], - "inputDatajobs": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)" + ], "fineGrainedLineages": [] } } @@ -417,7 +421,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056838648, + "timestampMillis": 1698671168726, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -449,7 +453,7 @@ "downstream_task_ids": "[]", "inlets": "[]", "outlets": "[]", - "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 2'\", \"dag\": \"<>\", \"task_id\": \"run_another_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_lock_for_execution\": true, \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'task 2'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [], \"outlets\": [], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"run_another_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [\"task_1\"], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" + "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 2'\", \"dag\": \"<>\", \"task_id\": \"run_another_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_is_setup\": false, \"_is_teardown\": false, \"_lock_for_execution\": true, \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'task 2'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [], \"outlets\": [], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": {}, \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"run_another_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [\"task_1\"], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=run_another_data_task", "name": "run_another_data_task", @@ -519,7 +523,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 06:54:02.407515+00:00", + "start_date": "2023-10-30 13:06:19.970466+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -537,7 +541,7 @@ "name": "simple_dag_run_another_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696056842407, + "time": 1698671179970, "actor": "urn:li:corpuser:datahub" } } @@ -562,7 +566,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056842407, + "timestampMillis": 1698671179970, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -591,7 +595,7 @@ "downstream_task_ids": "[]", "inlets": "[]", "outlets": "[]", - "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 2'\", \"dag\": \"<>\", \"task_id\": \"run_another_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_lock_for_execution\": true, \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'task 2'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [], \"outlets\": [], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"run_another_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [\"task_1\"], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" + "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 2'\", \"dag\": \"<>\", \"task_id\": \"run_another_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_is_setup\": false, \"_is_teardown\": false, \"_lock_for_execution\": true, \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'task 2'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [], \"outlets\": [], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": {}, \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"run_another_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [\"task_1\"], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=run_another_data_task", "name": "run_another_data_task", @@ -658,7 +662,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696056842831, + "timestampMillis": 1698671180730, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json index 03299c483f..ec0f3cab1e 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json @@ -74,9 +74,9 @@ "trigger_rule": "", "wait_for_downstream": "False", "downstream_task_ids": "['run_another_data_task']", - "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]", + "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]", "outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None)]", - "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" + "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1", "name": "task_1", @@ -100,7 +100,9 @@ "outputDatasets": [ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)" ], - "inputDatajobs": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)" + ], "fineGrainedLineages": [] } } @@ -182,7 +184,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 06:58:56.105026+00:00", + "start_date": "2023-11-10 19:10:10.856995+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -200,7 +202,7 @@ "name": "simple_dag_task_1_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696057136105, + "time": 1699643410856, "actor": "urn:li:corpuser:datahub" } } @@ -285,7 +287,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057136105, + "timestampMillis": 1699643410856, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -312,9 +314,9 @@ "trigger_rule": "", "wait_for_downstream": "False", "downstream_task_ids": "['run_another_data_task']", - "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]", + "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]", "outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None)]", - "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" + "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1", "name": "task_1", @@ -338,7 +340,9 @@ "outputDatasets": [ "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)" ], - "inputDatajobs": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)" + ], "fineGrainedLineages": [] } } @@ -417,7 +421,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057136612, + "timestampMillis": 1699643411390, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -577,7 +581,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-09-30 06:58:59.567004+00:00", + "start_date": "2023-11-10 19:10:15.128009+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -595,7 +599,7 @@ "name": "simple_dag_run_another_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1696057139567, + "time": 1699643415128, "actor": "urn:li:corpuser:datahub" } } @@ -620,7 +624,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057139567, + "timestampMillis": 1699643415128, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -716,7 +720,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1696057140164, + "timestampMillis": 1699643415856, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json index 11a0b17b45..0a704ed10c 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json @@ -11,7 +11,7 @@ "catchup": "False", "description": "None", "doc_md": "None", - "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/snowflake_operator.py'", + "fileloc": "'/Users/shubham/airflow1/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/snowflake_operator.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", "tags": "[]", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json index 19e4aac9fb..3b4b60174f 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json @@ -11,7 +11,7 @@ "catchup": "False", "description": "None", "doc_md": "None", - "fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/sqlite_operator.py'", + "fileloc": "'/Users/shubham/airflow1/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/sqlite_operator.py'", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", "tags": "[]", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json index b67464b385..99a8aadb7f 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json @@ -194,7 +194,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-10-15 20:27:26.883178+00:00", + "start_date": "2023-11-10 19:12:17.805860+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -212,7 +212,7 @@ "name": "sqlite_operator_create_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1697401646883, + "time": 1699643537805, "actor": "urn:li:corpuser:datahub" } } @@ -261,7 +261,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1697401646883, + "timestampMillis": 1699643537805, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -442,7 +442,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1697401647826, + "timestampMillis": 1699643538759, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -615,7 +615,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-10-15 20:27:31.398799+00:00", + "start_date": "2023-11-10 19:12:22.560376+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -633,7 +633,7 @@ "name": "sqlite_operator_populate_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1697401651398, + "time": 1699643542560, "actor": "urn:li:corpuser:datahub" } } @@ -682,7 +682,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1697401651398, + "timestampMillis": 1699643542560, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -792,7 +792,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1697401652651, + "timestampMillis": 1699643543925, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1035,7 +1035,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-10-15 20:27:37.697995+00:00", + "start_date": "2023-11-10 19:12:29.429032+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -1053,7 +1053,7 @@ "name": "sqlite_operator_transform_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1697401657697, + "time": 1699643549429, "actor": "urn:li:corpuser:datahub" } } @@ -1126,7 +1126,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1697401657697, + "timestampMillis": 1699643549429, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1362,7 +1362,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1697401659496, + "timestampMillis": 1699643551423, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1537,7 +1537,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-10-15 20:27:45.670215+00:00", + "start_date": "2023-11-10 19:12:37.423556+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -1555,7 +1555,7 @@ "name": "sqlite_operator_cleanup_costs_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1697401665670, + "time": 1699643557423, "actor": "urn:li:corpuser:datahub" } } @@ -1604,7 +1604,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1697401665670, + "timestampMillis": 1699643557423, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1716,7 +1716,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1697401667670, + "timestampMillis": 1699643559607, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -1891,7 +1891,7 @@ "customProperties": { "run_id": "manual_run_test", "duration": "None", - "start_date": "2023-10-15 20:27:51.559194+00:00", + "start_date": "2023-11-10 19:12:43.792375+00:00", "end_date": "None", "execution_date": "2023-09-27 21:34:38+00:00", "try_number": "0", @@ -1909,7 +1909,7 @@ "name": "sqlite_operator_cleanup_processed_costs_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1697401671559, + "time": 1699643563792, "actor": "urn:li:corpuser:datahub" } } @@ -1958,7 +1958,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1697401671559, + "timestampMillis": 1699643563792, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" @@ -2070,7 +2070,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1697401673788, + "timestampMillis": 1699643566350, "partitionSpec": { "type": "FULL_TABLE", "partition": "FULL_TABLE_SNAPSHOT" diff --git a/metadata-ingestion-modules/airflow-plugin/tests/unit/test_airflow.py b/metadata-ingestion-modules/airflow-plugin/tests/unit/test_airflow.py index d8620e74d7..7fbf707995 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/unit/test_airflow.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/unit/test_airflow.py @@ -188,10 +188,17 @@ def test_entities(): == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)" ) + assert ( + Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)").urn + == "urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)" + ) + with pytest.raises(ValueError, match="invalid"): Urn("not a URN") - with pytest.raises(ValueError, match="only supports datasets"): + with pytest.raises( + ValueError, match="only supports datasets and upstream datajobs" + ): Urn("urn:li:mlModel:(urn:li:dataPlatform:science,scienceModel,PROD)") @@ -199,13 +206,19 @@ def test_entities(): ["inlets", "outlets", "capture_executions"], [ pytest.param( - [Dataset("snowflake", "mydb.schema.tableConsumed")], + [ + Dataset("snowflake", "mydb.schema.tableConsumed"), + Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)"), + ], [Dataset("snowflake", "mydb.schema.tableProduced")], False, id="airflow-lineage-no-executions", ), pytest.param( - [Dataset("snowflake", "mydb.schema.tableConsumed")], + [ + Dataset("snowflake", "mydb.schema.tableConsumed"), + Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)"), + ], [Dataset("snowflake", "mydb.schema.tableProduced")], True, id="airflow-lineage-capture-executions", @@ -293,9 +306,13 @@ def test_lineage_backend(mock_emit, inlets, outlets, capture_executions): # Verify that the inlets and outlets are registered and recognized by Airflow correctly, # or that our lineage backend forces it to. - assert len(op2.inlets) == 1 + assert len(op2.inlets) == 2 assert len(op2.outlets) == 1 - assert all(map(lambda let: isinstance(let, Dataset), op2.inlets)) + assert all( + map( + lambda let: isinstance(let, Dataset) or isinstance(let, Urn), op2.inlets + ) + ) assert all(map(lambda let: isinstance(let, Dataset), op2.outlets)) # Check that the right things were emitted. @@ -338,6 +355,10 @@ def test_lineage_backend(mock_emit, inlets, outlets, capture_executions): mock_emitter.method_calls[4].args[0].aspect.inputDatajobs[0] == "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task1_upstream)" ) + assert ( + mock_emitter.method_calls[4].args[0].aspect.inputDatajobs[1] + == "urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)" + ) assert ( mock_emitter.method_calls[4].args[0].aspect.inputDatasets[0] == "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"