feat(ariflow-plugin): ability to disable datajob lineage (#13187)

This commit is contained in:
Sergio Gómez Villamor 2025-04-14 16:15:19 +02:00 committed by GitHub
parent 5ee0b66920
commit 60b769fbf6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 747 additions and 15 deletions

View File

@ -79,6 +79,7 @@ enabled = True # default
| disable_openlineage_plugin | true | Disable the OpenLineage plugin to avoid duplicative processing. |
| log_level | _no change_ | [debug] Set the log level for the plugin. |
| debug_emitter | false | [debug] If true, the plugin will log the emitted events. |
| enable_datajob_lineage | true | If true, the plugin will emit input/output lineage for DataJobs. |
## DataHub Plugin v1
@ -148,6 +149,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default
| |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
| dag_filter_str | { "allow": [".*"] } | AllowDenyPattern value in form of JSON string to filter the DAGs from running. |
| enable_datajob_lineage | true | If true, the plugin will emit input/output lineage for DataJobs. |
#### Validate that the plugin is working

View File

@ -67,6 +67,9 @@ class DatahubLineageConfig(ConfigModel):
# Makes extraction of jinja-templated fields more accurate.
render_templates: bool = True
# Only if true, lineage will be emitted for the DataJobs.
enable_datajob_lineage: bool = True
dag_filter_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="regex patterns for DAGs to ingest",
@ -127,6 +130,7 @@ def get_lineage_config() -> DatahubLineageConfig:
dag_filter_pattern = AllowDenyPattern.parse_raw(
conf.get("datahub", "dag_filter_str", fallback='{"allow": [".*"]}')
)
enable_lineage = conf.get("datahub", "enable_datajob_lineage", fallback=True)
return DatahubLineageConfig(
enabled=enabled,
@ -145,4 +149,5 @@ def get_lineage_config() -> DatahubLineageConfig:
datajob_url_link=datajob_url_link,
render_templates=render_templates,
dag_filter_pattern=dag_filter_pattern,
enable_datajob_lineage=enable_lineage,
)

View File

@ -260,6 +260,9 @@ class DataHubListener:
routine is also responsible for converting the lineage to DataHub URNs.
"""
if not self.config.enable_datajob_lineage:
return
input_urns: List[str] = []
output_urns: List[str] = []
fine_grained_lineages: List[FineGrainedLineageClass] = []
@ -450,7 +453,8 @@ class DataHubListener:
# TODO: Add handling for Airflow mapped tasks using task_instance.map_index
for mcp in datajob.generate_mcp(
materialize_iolets=self.config.materialize_iolets
generate_lineage=self.config.enable_datajob_lineage,
materialize_iolets=self.config.materialize_iolets,
):
self.emitter.emit(mcp, self._make_emit_callback())
logger.debug(f"Emitted DataHub Datajob start: {datajob}")
@ -536,7 +540,8 @@ class DataHubListener:
self._extract_lineage(datajob, dagrun, task, task_instance, complete=True)
for mcp in datajob.generate_mcp(
materialize_iolets=self.config.materialize_iolets
generate_lineage=self.config.enable_datajob_lineage,
materialize_iolets=self.config.materialize_iolets,
):
self.emitter.emit(mcp, self._make_emit_callback())
logger.debug(f"Emitted DataHub Datajob finish w/ status {status}: {datajob}")

View File

@ -132,7 +132,10 @@ def datahub_task_status_callback(context, status):
)
task.log.info(f"Emitting Datahub Datajob: {datajob}")
for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
for mcp in datajob.generate_mcp(
generate_lineage=config.enable_datajob_lineage,
materialize_iolets=config.materialize_iolets,
):
emitter.emit(mcp, _make_emit_callback(task.log))
if config.capture_executions:
@ -199,7 +202,10 @@ def datahub_pre_execution(context):
)
task.log.info(f"Emitting Datahub dataJob {datajob}")
for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
for mcp in datajob.generate_mcp(
generate_lineage=config.enable_datajob_lineage,
materialize_iolets=config.materialize_iolets,
):
emitter.emit(mcp, _make_emit_callback(task.log))
if config.capture_executions:

View File

@ -56,7 +56,10 @@ def send_lineage_to_datahub(
entities_to_datajob_urn_list([let.urn for let in inlets])
)
for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
for mcp in datajob.generate_mcp(
generate_lineage=config.enable_datajob_lineage,
materialize_iolets=config.materialize_iolets,
):
emitter.emit(mcp)
operator.log.info(f"Emitted from Lineage: {datajob}")

View File

@ -0,0 +1,687 @@
[
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,myairflow.simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {
"_access_control": "None",
"catchup": "False",
"description": "'A simple DAG that runs a few fake data tasks.'",
"doc_md": "None",
"fileloc": "<fileloc>",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
"tags": "[]",
"timezone": "Timezone('UTC')"
},
"externalUrl": "http://airflow.example.com/tree?dag_id=simple_dag",
"name": "simple_dag",
"description": "A simple DAG that runs a few fake data tasks.",
"env": "PROD"
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,myairflow.simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,myairflow.simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,myairflow)"
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,myairflow.simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:airflow",
"type": "DEVELOPER",
"source": {
"type": "SERVICE"
}
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:airflow"
}
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,myairflow.simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,myairflow.simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,myairflow.simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,myairflow)"
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,myairflow.simple_dag,prod)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "myairflow",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,myairflow)"
},
{
"id": "simple_dag"
}
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'task_1'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'task_1'",
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "['run_another_data_task']",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.test_dag,PROD),test_task)')]",
"outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None)]"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1",
"name": "task_1",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,myairflow)"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:airflow",
"type": "DEVELOPER",
"source": {
"type": "SERVICE"
}
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:airflow"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:fdbbbcd638bc0e91bbf8d7775efbecaf",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceProperties",
"aspect": {
"json": {
"customProperties": {
"run_id": "manual_run_test",
"duration": "<duration>",
"start_date": "<start_date>",
"end_date": "<end_date>",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "0",
"max_tries": "0",
"external_executor_id": "None",
"state": "running",
"operator": "BashOperator",
"priority_weight": "2",
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1",
"orchestrator": "airflow",
"dag_id": "simple_dag",
"task_id": "task_1"
},
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1",
"name": "simple_dag_task_1_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1744380570011,
"actor": "urn:li:corpuser:datahub"
}
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:fdbbbcd638bc0e91bbf8d7775efbecaf",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRelationships",
"aspect": {
"json": {
"parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),task_1)",
"upstreamInstances": []
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:fdbbbcd638bc0e91bbf8d7775efbecaf",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,myairflow)"
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:fdbbbcd638bc0e91bbf8d7775efbecaf",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1744380570011,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 1
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'task_1'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'task_1'",
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "['run_another_data_task']",
"inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)'), Urn(_urn='urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.test_dag,PROD),test_task)')]",
"outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None)]"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1",
"name": "task_1",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,myairflow)"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:airflow",
"type": "DEVELOPER",
"source": {
"type": "SERVICE"
}
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:airflow"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),task_1)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:fdbbbcd638bc0e91bbf8d7775efbecaf",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1744380570147,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
"type": "SUCCESS",
"nativeResultType": "airflow"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'run_another_data_task'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'run_another_data_task'",
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[]",
"outlets": "[]"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=run_another_data_task",
"name": "run_another_data_task",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,myairflow)"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:airflow",
"type": "DEVELOPER",
"source": {
"type": "SERVICE"
}
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:airflow"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:888f71b79d9a0b162fe44acad7b2c2ae",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceProperties",
"aspect": {
"json": {
"customProperties": {
"run_id": "manual_run_test",
"duration": "<duration>",
"start_date": "<start_date>",
"end_date": "<end_date>",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "0",
"max_tries": "0",
"external_executor_id": "None",
"state": "running",
"operator": "BashOperator",
"priority_weight": "1",
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1",
"orchestrator": "airflow",
"dag_id": "simple_dag",
"task_id": "run_another_data_task"
},
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1",
"name": "simple_dag_run_another_data_task_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1744380573217,
"actor": "urn:li:corpuser:datahub"
}
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:888f71b79d9a0b162fe44acad7b2c2ae",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRelationships",
"aspect": {
"json": {
"parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),run_another_data_task)",
"upstreamInstances": []
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:888f71b79d9a0b162fe44acad7b2c2ae",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,myairflow)"
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:888f71b79d9a0b162fe44acad7b2c2ae",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1744380573217,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 1
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'run_another_data_task'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'run_another_data_task'",
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[]",
"outlets": "[]"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=run_another_data_task",
"name": "run_another_data_task",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:airflow",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,myairflow)"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:airflow",
"type": "DEVELOPER",
"source": {
"type": "SERVICE"
}
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:airflow"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,myairflow.simple_dag,prod),run_another_data_task)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:888f71b79d9a0b162fe44acad7b2c2ae",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1744380573438,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
"type": "SUCCESS",
"nativeResultType": "airflow"
}
}
}
}
]

View File

@ -182,6 +182,7 @@ def _run_airflow(
is_v1: bool,
multiple_connections: bool,
platform_instance: Optional[str],
enable_datajob_lineage: bool,
) -> Iterator[AirflowInstance]:
airflow_home = tmp_path / "airflow_home"
print(f"Using airflow home: {airflow_home}")
@ -257,6 +258,9 @@ def _run_airflow(
"AIRFLOW__DATAHUB__LOG_LEVEL": "DEBUG",
"AIRFLOW__DATAHUB__DEBUG_EMITTER": "True",
"SQLALCHEMY_SILENCE_UBER_WARNING": "1",
"AIRFLOW__DATAHUB__ENABLE_DATAJOB_LINEAGE": "true"
if enable_datajob_lineage
else "false",
}
if platform_instance:
@ -371,12 +375,27 @@ class DagTestCase:
v2_only: bool = False
multiple_connections: bool = False
platform_instance: Optional[str] = None
enable_datajob_lineage: bool = True
# used to identify the test case in the golden file when same DAG is used in multiple tests
test_variant: Optional[str] = None
@property
def dag_test_id(self) -> str:
return f"{self.dag_id}{self.test_variant or ''}"
test_cases = [
DagTestCase(
"simple_dag", multiple_connections=True, platform_instance=PLATFORM_INSTANCE
),
DagTestCase(
"simple_dag",
multiple_connections=True,
platform_instance=PLATFORM_INSTANCE,
enable_datajob_lineage=False,
test_variant="_no_datajob_lineage",
),
DagTestCase("basic_iolets", platform_instance=PLATFORM_INSTANCE),
DagTestCase("dag_to_skip", v2_only=True, platform_instance=PLATFORM_INSTANCE),
DagTestCase("snowflake_operator", success=False, v2_only=True),
@ -395,10 +414,10 @@ test_cases = [
[
*[
pytest.param(
f"v1_{test_case.dag_id}",
f"v1_{test_case.dag_test_id}",
test_case,
True,
id=f"v1_{test_case.dag_id}",
id=f"v1_{test_case.dag_test_id}",
marks=pytest.mark.skipif(
AIRFLOW_VERSION >= packaging.version.parse("2.4.0"),
reason="We only test the v1 plugin on Airflow 2.3",
@ -411,16 +430,16 @@ test_cases = [
pytest.param(
# On Airflow 2.3-2.4, test plugin v2 without dataFlows.
(
f"v2_{test_case.dag_id}"
f"v2_{test_case.dag_test_id}"
if HAS_AIRFLOW_DAG_LISTENER_API
else f"v2_{test_case.dag_id}_no_dag_listener"
else f"v2_{test_case.dag_test_id}_no_dag_listener"
),
test_case,
False,
id=(
f"v2_{test_case.dag_id}"
f"v2_{test_case.dag_test_id}"
if HAS_AIRFLOW_DAG_LISTENER_API
else f"v2_{test_case.dag_id}_no_dag_listener"
else f"v2_{test_case.dag_test_id}_no_dag_listener"
),
marks=[
pytest.mark.skipif(
@ -466,6 +485,7 @@ def test_airflow_plugin(
is_v1=is_v1,
multiple_connections=test_case.multiple_connections,
platform_instance=test_case.platform_instance,
enable_datajob_lineage=test_case.enable_datajob_lineage,
) as airflow_instance:
print(f"Running DAG {dag_id}...")
_wait_for_dag_to_load(airflow_instance, dag_id)
@ -573,6 +593,7 @@ if __name__ == "__main__":
is_v1=not HAS_AIRFLOW_LISTENER_API,
multiple_connections=False,
platform_instance=None,
enable_datajob_lineage=True,
) as airflow_instance:
# input("Press enter to exit...")
print("quitting airflow")

View File

@ -108,7 +108,9 @@ class DataJob:
return [tags]
def generate_mcp(
self, materialize_iolets: bool = True
self,
generate_lineage: bool = True,
materialize_iolets: bool = True,
) -> Iterable[MetadataChangeProposalWrapper]:
env: Optional[str] = None
if self.flow_urn.cluster.upper() in builder.ALL_ENV_TYPES:
@ -152,9 +154,10 @@ class DataJob:
)
yield mcp
yield from self.generate_data_input_output_mcp(
materialize_iolets=materialize_iolets
)
if generate_lineage:
yield from self.generate_data_input_output_mcp(
materialize_iolets=materialize_iolets
)
for owner in self.generate_ownership_aspect():
mcp = MetadataChangeProposalWrapper(