feat(ingestion/airflow): support datajobs as task inlets (#9211)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
Shubham Jagtap 2023-11-11 05:29:18 +05:30 committed by GitHub
parent a187127ac5
commit bb7300251f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 261 additions and 124 deletions

View File

@ -17,7 +17,6 @@ from datahub.metadata.schema_classes import (
)
from datahub.telemetry import telemetry
from datahub.utilities.sqlglot_lineage import SqlParsingResult
from datahub.utilities.urns.dataset_urn import DatasetUrn
from openlineage.airflow.listener import TaskHolder
from openlineage.airflow.utils import redact_with_exclusions
from openlineage.client.serde import Serde
@ -32,7 +31,11 @@ from datahub_airflow_plugin._config import DatahubLineageConfig, get_lineage_con
from datahub_airflow_plugin._datahub_ol_adapter import translate_ol_to_datahub_urn
from datahub_airflow_plugin._extractors import SQL_PARSING_RESULT_KEY, ExtractorManager
from datahub_airflow_plugin.client.airflow_generator import AirflowGenerator
from datahub_airflow_plugin.entities import _Entity
from datahub_airflow_plugin.entities import (
_Entity,
entities_to_datajob_urn_list,
entities_to_dataset_urn_list,
)
_F = TypeVar("_F", bound=Callable[..., None])
if TYPE_CHECKING:
@ -272,10 +275,9 @@ class DataHubListener:
)
# Write the lineage to the datajob object.
datajob.inlets.extend(DatasetUrn.create_from_string(urn) for urn in input_urns)
datajob.outlets.extend(
DatasetUrn.create_from_string(urn) for urn in output_urns
)
datajob.inlets.extend(entities_to_dataset_urn_list(input_urns))
datajob.outlets.extend(entities_to_dataset_urn_list(output_urns))
datajob.upstream_urns.extend(entities_to_datajob_urn_list(input_urns))
datajob.fine_grained_lineages.extend(fine_grained_lineages)
# Merge in extra stuff that was present in the DataJob we constructed

View File

@ -1,8 +1,10 @@
from abc import abstractmethod
from typing import Optional
from typing import List, Optional
import attr
import datahub.emitter.mce_builder as builder
from datahub.utilities.urns.data_job_urn import DataJobUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.urn import guess_entity_type
@ -38,10 +40,28 @@ class Urn(_Entity):
def _validate_urn(self, attribute, value):
if not value.startswith("urn:"):
raise ValueError("invalid urn provided: urns must start with 'urn:'")
if guess_entity_type(value) != "dataset":
# This is because DataJobs only support Dataset lineage.
raise ValueError("Airflow lineage currently only supports datasets")
if guess_entity_type(value) not in ["dataset", "dataJob"]:
# This is because DataJobs only support Dataset and upstream Datajob lineage.
raise ValueError(
"Airflow lineage currently only supports datasets and upstream datajobs"
)
@property
def urn(self):
return self._urn
def entities_to_dataset_urn_list(iolets: List[str]) -> List[DatasetUrn]:
dataset_urn_list: List[DatasetUrn] = []
for let in iolets:
if guess_entity_type(let) == "dataset":
dataset_urn_list.append(DatasetUrn.create_from_string(let))
return dataset_urn_list
def entities_to_datajob_urn_list(inlets: List[str]) -> List[DataJobUrn]:
datajob_urn_list: List[DataJobUrn] = []
for let in inlets:
if guess_entity_type(let) == "dataJob":
datajob_urn_list.append(DataJobUrn.create_from_string(let))
return datajob_urn_list

View File

@ -46,6 +46,7 @@ with DAG(
Urn(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
),
Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,dag1,prod),task1)"),
],
outlets=[Dataset("snowflake", "mydb.schema.tableD")],
)

View File

@ -37,6 +37,7 @@ def datahub_lineage_backend_taskflow_demo():
Urn(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
),
Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,dag1,prod),task1)"),
],
outlets=[Dataset("snowflake", "mydb.schema.tableD")],
)

View File

@ -2,11 +2,14 @@ from datetime import datetime
from typing import TYPE_CHECKING, Dict, List
from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub_airflow_plugin._config import DatahubLineageConfig
from datahub_airflow_plugin.client.airflow_generator import AirflowGenerator
from datahub_airflow_plugin.entities import _Entity
from datahub_airflow_plugin.entities import (
_Entity,
entities_to_datajob_urn_list,
entities_to_dataset_urn_list,
)
if TYPE_CHECKING:
from airflow import DAG
@ -16,10 +19,6 @@ if TYPE_CHECKING:
from datahub_airflow_plugin._airflow_shims import Operator
def _entities_to_urn_list(iolets: List[_Entity]) -> List[DatasetUrn]:
return [DatasetUrn.create_from_string(let.urn) for let in iolets]
def send_lineage_to_datahub(
config: DatahubLineageConfig,
operator: "Operator",
@ -53,8 +52,11 @@ def send_lineage_to_datahub(
capture_tags=config.capture_tags_info,
capture_owner=config.capture_ownership_info,
)
datajob.inlets.extend(_entities_to_urn_list(inlets))
datajob.outlets.extend(_entities_to_urn_list(outlets))
datajob.inlets.extend(entities_to_dataset_urn_list([let.urn for let in inlets]))
datajob.outlets.extend(entities_to_dataset_urn_list([let.urn for let in outlets]))
datajob.upstream_urns.extend(
entities_to_datajob_urn_list([let.urn for let in inlets])
)
datajob.emit(emitter)
operator.log.info(f"Emitted from Lineage: {datajob}")

View File

@ -26,6 +26,7 @@ with DAG(
Urn(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
),
Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"),
],
outlets=[
Dataset("snowflake", "mydb.schema.tableD"),

View File

@ -21,6 +21,7 @@ with DAG(
Urn(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
),
Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"),
],
outlets=[Dataset("snowflake", "mydb.schema.tableD")],
)

View File

@ -11,7 +11,7 @@
"catchup": "False",
"description": "None",
"doc_md": "None",
"fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py'",
"fileloc": "'/home/shubham/airflow1/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py'",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
"tags": "None",
@ -95,7 +95,8 @@
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableB,DEV)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,cloud.mydb.schema.tableC,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)",
"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)",
@ -150,6 +151,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)",
@ -245,7 +257,8 @@
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableB,DEV)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,cloud.mydb.schema.tableC,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)",
"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)",
@ -300,6 +313,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)",
@ -365,9 +389,9 @@
"json": {
"customProperties": {
"run_id": "manual_run_test",
"duration": "0.176536",
"start_date": "2023-09-30 00:49:56.670239+00:00",
"end_date": "2023-09-30 00:49:56.846775+00:00",
"duration": "0.143271",
"start_date": "2023-11-08 09:55:05.801617+00:00",
"end_date": "2023-11-08 09:55:05.944888+00:00",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "1",
"max_tries": "0",
@ -384,7 +408,7 @@
"name": "basic_iolets_run_data_task_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1696034996670,
"time": 1699437305801,
"actor": "urn:li:corpuser:datahub"
}
}
@ -413,7 +437,8 @@
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableB,DEV)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,cloud.mydb.schema.tableC,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)",
"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"
]
}
}
@ -476,6 +501,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)",
@ -505,7 +541,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696034996670,
"timestampMillis": 1699437305801,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -522,7 +558,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696034996846,
"timestampMillis": 1699437305944,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"

View File

@ -11,7 +11,7 @@
"catchup": "False",
"description": "'A simple DAG that runs a few fake data tasks.'",
"doc_md": "None",
"fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'",
"fileloc": "'/home/shubham/airflow1/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
"tags": "None",
@ -94,7 +94,8 @@
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)",
"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)"
@ -126,6 +127,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)",
@ -208,7 +220,8 @@
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)",
"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)"
@ -240,6 +253,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)",
@ -294,9 +318,9 @@
"json": {
"customProperties": {
"run_id": "manual_run_test",
"duration": "0.175983",
"start_date": "2023-09-30 00:48:58.943850+00:00",
"end_date": "2023-09-30 00:48:59.119833+00:00",
"duration": "0.120524",
"start_date": "2023-11-08 09:54:06.065112+00:00",
"end_date": "2023-11-08 09:54:06.185636+00:00",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "1",
"max_tries": "0",
@ -313,7 +337,7 @@
"name": "simple_dag_task_1_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1696034938943,
"time": 1699437246065,
"actor": "urn:li:corpuser:datahub"
}
}
@ -340,7 +364,8 @@
"json": {
"inputs": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)",
"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"
]
}
}
@ -380,6 +405,17 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)",
@ -398,7 +434,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696034938943,
"timestampMillis": 1699437246065,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -415,7 +451,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696034939119,
"timestampMillis": 1699437246185,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -440,7 +476,7 @@
"catchup": "False",
"description": "'A simple DAG that runs a few fake data tasks.'",
"doc_md": "None",
"fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'",
"fileloc": "'/home/shubham/airflow1/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
"tags": "None",
@ -651,9 +687,9 @@
"json": {
"customProperties": {
"run_id": "manual_run_test",
"duration": "0.129888",
"start_date": "2023-09-30 00:49:02.158752+00:00",
"end_date": "2023-09-30 00:49:02.288640+00:00",
"duration": "0.099975",
"start_date": "2023-11-08 09:54:09.744583+00:00",
"end_date": "2023-11-08 09:54:09.844558+00:00",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "1",
"max_tries": "0",
@ -670,7 +706,7 @@
"name": "simple_dag_run_another_data_task_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1696034942158,
"time": 1699437249744,
"actor": "urn:li:corpuser:datahub"
}
}
@ -695,7 +731,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696034942158,
"timestampMillis": 1699437249744,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -712,7 +748,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696034942288,
"timestampMillis": 1699437249844,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"

View File

@ -11,7 +11,7 @@
"catchup": "False",
"description": "None",
"doc_md": "None",
"fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py'",
"fileloc": "'/Users/shubham/airflow1/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/basic_iolets.py'",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
"tags": "[]",
@ -73,9 +73,9 @@
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]",
"outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableE', env='PROD', platform_instance=None)]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_lock_for_execution\": true, \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_is_setup\": false, \"_is_teardown\": false, \"_lock_for_execution\": true, \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": {}, \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task",
"name": "run_data_task",
@ -102,7 +102,9 @@
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableE,PROD)"
],
"inputDatajobs": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"
],
"fineGrainedLineages": []
}
}
@ -217,7 +219,7 @@
"customProperties": {
"run_id": "manual_run_test",
"duration": "None",
"start_date": "2023-09-30 01:13:14.266272+00:00",
"start_date": "2023-10-30 13:07:55.311482+00:00",
"end_date": "None",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "0",
@ -235,7 +237,7 @@
"name": "basic_iolets_run_data_task_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1696036394266,
"time": 1698671275311,
"actor": "urn:li:corpuser:datahub"
}
}
@ -356,7 +358,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696036394266,
"timestampMillis": 1698671275311,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -383,9 +385,9 @@
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]",
"outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableE', env='PROD', platform_instance=None)]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_lock_for_execution\": true, \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_is_setup\": false, \"_is_teardown\": false, \"_lock_for_execution\": true, \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": {}, \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task",
"name": "run_data_task",
@ -412,7 +414,9 @@
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableE,PROD)"
],
"inputDatajobs": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"
],
"fineGrainedLineages": []
}
}
@ -524,7 +528,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696036394833,
"timestampMillis": 1698671276777,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"

View File

@ -73,9 +73,9 @@
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]",
"outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableE', env='PROD', platform_instance=None)]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task",
"name": "run_data_task",
@ -102,7 +102,9 @@
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableE,PROD)"
],
"inputDatajobs": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"
],
"fineGrainedLineages": []
}
}
@ -217,7 +219,7 @@
"customProperties": {
"run_id": "manual_run_test",
"duration": "None",
"start_date": "2023-09-30 06:59:52.401211+00:00",
"start_date": "2023-11-10 19:11:17.444435+00:00",
"end_date": "None",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "0",
@ -235,7 +237,7 @@
"name": "basic_iolets_run_data_task_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1696057192401,
"time": 1699643477444,
"actor": "urn:li:corpuser:datahub"
}
}
@ -356,7 +358,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696057192401,
"timestampMillis": 1699643477444,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -383,9 +385,9 @@
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]",
"outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableE', env='PROD', platform_instance=None)]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task",
"name": "run_data_task",
@ -412,7 +414,9 @@
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableE,PROD)"
],
"inputDatajobs": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"
],
"fineGrainedLineages": []
}
}
@ -524,7 +528,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696057192982,
"timestampMillis": 1699643478123,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"

View File

@ -11,7 +11,7 @@
"catchup": "False",
"description": "'A simple DAG that runs a few fake data tasks.'",
"doc_md": "None",
"fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'",
"fileloc": "'/Users/shubham/airflow1/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/simple_dag.py'",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
"tags": "[]",
@ -74,9 +74,9 @@
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "['run_another_data_task']",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]",
"outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None)]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_lock_for_execution\": true, \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_is_setup\": false, \"_is_teardown\": false, \"_lock_for_execution\": true, \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": {}, \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1",
"name": "task_1",
@ -100,7 +100,9 @@
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)"
],
"inputDatajobs": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"
],
"fineGrainedLineages": []
}
}
@ -182,7 +184,7 @@
"customProperties": {
"run_id": "manual_run_test",
"duration": "None",
"start_date": "2023-09-30 06:53:58.219003+00:00",
"start_date": "2023-10-30 13:06:07.193282+00:00",
"end_date": "None",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "0",
@ -200,7 +202,7 @@
"name": "simple_dag_task_1_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1696056838219,
"time": 1698671167193,
"actor": "urn:li:corpuser:datahub"
}
}
@ -285,7 +287,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696056838219,
"timestampMillis": 1698671167193,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -312,9 +314,9 @@
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "['run_another_data_task']",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]",
"outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None)]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_lock_for_execution\": true, \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_is_setup\": false, \"_is_teardown\": false, \"_lock_for_execution\": true, \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": {}, \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1",
"name": "task_1",
@ -338,7 +340,9 @@
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)"
],
"inputDatajobs": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"
],
"fineGrainedLineages": []
}
}
@ -417,7 +421,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696056838648,
"timestampMillis": 1698671168726,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -449,7 +453,7 @@
"downstream_task_ids": "[]",
"inlets": "[]",
"outlets": "[]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 2'\", \"dag\": \"<<non-serializable: DAG>>\", \"task_id\": \"run_another_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_lock_for_execution\": true, \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'task 2'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [], \"outlets\": [], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"run_another_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [\"task_1\"], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 2'\", \"dag\": \"<<non-serializable: DAG>>\", \"task_id\": \"run_another_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_is_setup\": false, \"_is_teardown\": false, \"_lock_for_execution\": true, \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'task 2'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [], \"outlets\": [], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": {}, \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"run_another_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [\"task_1\"], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=run_another_data_task",
"name": "run_another_data_task",
@ -519,7 +523,7 @@
"customProperties": {
"run_id": "manual_run_test",
"duration": "None",
"start_date": "2023-09-30 06:54:02.407515+00:00",
"start_date": "2023-10-30 13:06:19.970466+00:00",
"end_date": "None",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "0",
@ -537,7 +541,7 @@
"name": "simple_dag_run_another_data_task_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1696056842407,
"time": 1698671179970,
"actor": "urn:li:corpuser:datahub"
}
}
@ -562,7 +566,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696056842407,
"timestampMillis": 1698671179970,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -591,7 +595,7 @@
"downstream_task_ids": "[]",
"inlets": "[]",
"outlets": "[]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 2'\", \"dag\": \"<<non-serializable: DAG>>\", \"task_id\": \"run_another_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_lock_for_execution\": true, \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'task 2'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [], \"outlets\": [], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"run_another_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [\"task_1\"], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 2'\", \"dag\": \"<<non-serializable: DAG>>\", \"task_id\": \"run_another_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_is_setup\": false, \"_is_teardown\": false, \"_lock_for_execution\": true, \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'task 2'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [], \"outlets\": [], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": {}, \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"run_another_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [\"task_1\"], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=run_another_data_task",
"name": "run_another_data_task",
@ -658,7 +662,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696056842831,
"timestampMillis": 1698671180730,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"

View File

@ -74,9 +74,9 @@
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "['run_another_data_task']",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]",
"outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None)]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1",
"name": "task_1",
@ -100,7 +100,9 @@
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)"
],
"inputDatajobs": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"
],
"fineGrainedLineages": []
}
}
@ -182,7 +184,7 @@
"customProperties": {
"run_id": "manual_run_test",
"duration": "None",
"start_date": "2023-09-30 06:58:56.105026+00:00",
"start_date": "2023-11-10 19:10:10.856995+00:00",
"end_date": "None",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "0",
@ -200,7 +202,7 @@
"name": "simple_dag_task_1_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1696057136105,
"time": 1699643410856,
"actor": "urn:li:corpuser:datahub"
}
}
@ -285,7 +287,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696057136105,
"timestampMillis": 1699643410856,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -312,9 +314,9 @@
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "['run_another_data_task']",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)')]",
"outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None)]",
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
"openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}, {\"_urn\": \"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1",
"name": "task_1",
@ -338,7 +340,9 @@
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)"
],
"inputDatajobs": [],
"inputDatajobs": [
"urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"
],
"fineGrainedLineages": []
}
}
@ -417,7 +421,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696057136612,
"timestampMillis": 1699643411390,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -577,7 +581,7 @@
"customProperties": {
"run_id": "manual_run_test",
"duration": "None",
"start_date": "2023-09-30 06:58:59.567004+00:00",
"start_date": "2023-11-10 19:10:15.128009+00:00",
"end_date": "None",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "0",
@ -595,7 +599,7 @@
"name": "simple_dag_run_another_data_task_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1696057139567,
"time": 1699643415128,
"actor": "urn:li:corpuser:datahub"
}
}
@ -620,7 +624,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696057139567,
"timestampMillis": 1699643415128,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -716,7 +720,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1696057140164,
"timestampMillis": 1699643415856,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"

View File

@ -11,7 +11,7 @@
"catchup": "False",
"description": "None",
"doc_md": "None",
"fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/snowflake_operator.py'",
"fileloc": "'/Users/shubham/airflow1/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/snowflake_operator.py'",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
"tags": "[]",

View File

@ -11,7 +11,7 @@
"catchup": "False",
"description": "None",
"doc_md": "None",
"fileloc": "'/Users/hsheth/projects/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/sqlite_operator.py'",
"fileloc": "'/Users/shubham/airflow1/datahub/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/sqlite_operator.py'",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
"tags": "[]",

View File

@ -194,7 +194,7 @@
"customProperties": {
"run_id": "manual_run_test",
"duration": "None",
"start_date": "2023-10-15 20:27:26.883178+00:00",
"start_date": "2023-11-10 19:12:17.805860+00:00",
"end_date": "None",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "0",
@ -212,7 +212,7 @@
"name": "sqlite_operator_create_cost_table_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1697401646883,
"time": 1699643537805,
"actor": "urn:li:corpuser:datahub"
}
}
@ -261,7 +261,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1697401646883,
"timestampMillis": 1699643537805,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -442,7 +442,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1697401647826,
"timestampMillis": 1699643538759,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -615,7 +615,7 @@
"customProperties": {
"run_id": "manual_run_test",
"duration": "None",
"start_date": "2023-10-15 20:27:31.398799+00:00",
"start_date": "2023-11-10 19:12:22.560376+00:00",
"end_date": "None",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "0",
@ -633,7 +633,7 @@
"name": "sqlite_operator_populate_cost_table_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1697401651398,
"time": 1699643542560,
"actor": "urn:li:corpuser:datahub"
}
}
@ -682,7 +682,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1697401651398,
"timestampMillis": 1699643542560,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -792,7 +792,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1697401652651,
"timestampMillis": 1699643543925,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -1035,7 +1035,7 @@
"customProperties": {
"run_id": "manual_run_test",
"duration": "None",
"start_date": "2023-10-15 20:27:37.697995+00:00",
"start_date": "2023-11-10 19:12:29.429032+00:00",
"end_date": "None",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "0",
@ -1053,7 +1053,7 @@
"name": "sqlite_operator_transform_cost_table_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1697401657697,
"time": 1699643549429,
"actor": "urn:li:corpuser:datahub"
}
}
@ -1126,7 +1126,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1697401657697,
"timestampMillis": 1699643549429,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -1362,7 +1362,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1697401659496,
"timestampMillis": 1699643551423,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -1537,7 +1537,7 @@
"customProperties": {
"run_id": "manual_run_test",
"duration": "None",
"start_date": "2023-10-15 20:27:45.670215+00:00",
"start_date": "2023-11-10 19:12:37.423556+00:00",
"end_date": "None",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "0",
@ -1555,7 +1555,7 @@
"name": "sqlite_operator_cleanup_costs_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1697401665670,
"time": 1699643557423,
"actor": "urn:li:corpuser:datahub"
}
}
@ -1604,7 +1604,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1697401665670,
"timestampMillis": 1699643557423,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -1716,7 +1716,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1697401667670,
"timestampMillis": 1699643559607,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -1891,7 +1891,7 @@
"customProperties": {
"run_id": "manual_run_test",
"duration": "None",
"start_date": "2023-10-15 20:27:51.559194+00:00",
"start_date": "2023-11-10 19:12:43.792375+00:00",
"end_date": "None",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "0",
@ -1909,7 +1909,7 @@
"name": "sqlite_operator_cleanup_processed_costs_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1697401671559,
"time": 1699643563792,
"actor": "urn:li:corpuser:datahub"
}
}
@ -1958,7 +1958,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1697401671559,
"timestampMillis": 1699643563792,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
@ -2070,7 +2070,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1697401673788,
"timestampMillis": 1699643566350,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"

View File

@ -188,10 +188,17 @@ def test_entities():
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"
)
assert (
Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)").urn
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)"
)
with pytest.raises(ValueError, match="invalid"):
Urn("not a URN")
with pytest.raises(ValueError, match="only supports datasets"):
with pytest.raises(
ValueError, match="only supports datasets and upstream datajobs"
):
Urn("urn:li:mlModel:(urn:li:dataPlatform:science,scienceModel,PROD)")
@ -199,13 +206,19 @@ def test_entities():
["inlets", "outlets", "capture_executions"],
[
pytest.param(
[Dataset("snowflake", "mydb.schema.tableConsumed")],
[
Dataset("snowflake", "mydb.schema.tableConsumed"),
Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)"),
],
[Dataset("snowflake", "mydb.schema.tableProduced")],
False,
id="airflow-lineage-no-executions",
),
pytest.param(
[Dataset("snowflake", "mydb.schema.tableConsumed")],
[
Dataset("snowflake", "mydb.schema.tableConsumed"),
Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)"),
],
[Dataset("snowflake", "mydb.schema.tableProduced")],
True,
id="airflow-lineage-capture-executions",
@ -293,9 +306,13 @@ def test_lineage_backend(mock_emit, inlets, outlets, capture_executions):
# Verify that the inlets and outlets are registered and recognized by Airflow correctly,
# or that our lineage backend forces it to.
assert len(op2.inlets) == 1
assert len(op2.inlets) == 2
assert len(op2.outlets) == 1
assert all(map(lambda let: isinstance(let, Dataset), op2.inlets))
assert all(
map(
lambda let: isinstance(let, Dataset) or isinstance(let, Urn), op2.inlets
)
)
assert all(map(lambda let: isinstance(let, Dataset), op2.outlets))
# Check that the right things were emitted.
@ -338,6 +355,10 @@ def test_lineage_backend(mock_emit, inlets, outlets, capture_executions):
mock_emitter.method_calls[4].args[0].aspect.inputDatajobs[0]
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,test_lineage_is_sent_to_backend,prod),task1_upstream)"
)
assert (
mock_emitter.method_calls[4].args[0].aspect.inputDatajobs[1]
== "urn:li:dataJob:(urn:li:dataFlow:(airflow,testDag,PROD),testTask)"
)
assert (
mock_emitter.method_calls[4].args[0].aspect.inputDatasets[0]
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableConsumed,PROD)"