fix(airflow): Stable tag order in DataFlow/DataJobs (#5696)

This commit is contained in:
Tamas Nemeth 2022-08-23 08:40:53 +02:00 committed by GitHub
parent 1b626c7652
commit c66c263ceb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 17 additions and 17 deletions

View File

@ -46,6 +46,7 @@ We recommend you use the lineage plugin if you are on Airflow version >= 2.0.2 o
|---|---|---| |---|---|---|
| datahub.datahub_conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. | | datahub.datahub_conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. |
| datahub.cluster | prod | name of the airflow cluster | | datahub.cluster | prod | name of the airflow cluster |
| capture_executions | false | If true, it captures task runs as DataHub DataProcessInstances. |
| datahub.capture_ownership_info | true | If true, the owners field of the DAG will be capture as a DataHub corpuser. | | datahub.capture_ownership_info | true | If true, the owners field of the DAG will be capture as a DataHub corpuser. |
| datahub.capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. | | datahub.capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| datahub.graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions.| | datahub.graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions.|
@ -101,6 +102,7 @@ If you are looking to run Airflow and DataHub using docker locally, follow the g
datahub_kwargs = { datahub_kwargs = {
"datahub_conn_id": "datahub_rest_default", "datahub_conn_id": "datahub_rest_default",
"cluster": "prod", "cluster": "prod",
"capture_executions": true,
"capture_ownership_info": true, "capture_ownership_info": true,
"capture_tags_info": true, "capture_tags_info": true,
"graceful_exceptions": true } "graceful_exceptions": true }

View File

@ -75,7 +75,7 @@ class DataFlow:
tags = GlobalTagsClass( tags = GlobalTagsClass(
tags=[ tags=[
TagAssociationClass(tag=builder.make_tag_urn(tag)) TagAssociationClass(tag=builder.make_tag_urn(tag))
for tag in (self.tags or []) for tag in (sorted(self.tags) or [])
] ]
) )
return [tags] return [tags]

View File

@ -103,7 +103,7 @@ class DataJob:
tags = GlobalTagsClass( tags = GlobalTagsClass(
tags=[ tags=[
TagAssociationClass(tag=builder.make_tag_urn(tag)) TagAssociationClass(tag=builder.make_tag_urn(tag))
for tag in (self.tags or []) for tag in (sorted(self.tags) or [])
] ]
) )
return [tags] return [tags]

View File

@ -37,11 +37,9 @@ with DAG(
task_id="run_data_task", task_id="run_data_task",
dag=dag, dag=dag,
bash_command="echo 'This is where you might run your data tooling.'", bash_command="echo 'This is where you might run your data tooling.'",
inlets={ inlets=[
"datasets": [
Dataset("snowflake", "mydb.schema.tableA"), Dataset("snowflake", "mydb.schema.tableA"),
Dataset("snowflake", "mydb.schema.tableB"), Dataset("snowflake", "mydb.schema.tableB"),
], ],
}, outlets=[Dataset("snowflake", "mydb.schema.tableC")],
outlets={"datasets": [Dataset("snowflake", "mydb.schema.tableC")]},
) )

View File

@ -30,13 +30,11 @@ default_args = {
) )
def datahub_lineage_backend_taskflow_demo(): def datahub_lineage_backend_taskflow_demo():
@task( @task(
inlets={ inlets=[
"datasets": [
Dataset("snowflake", "mydb.schema.tableA"), Dataset("snowflake", "mydb.schema.tableA"),
Dataset("snowflake", "mydb.schema.tableB"), Dataset("snowflake", "mydb.schema.tableB"),
], ],
}, outlets=[Dataset("snowflake", "mydb.schema.tableC")],
outlets={"datasets": [Dataset("snowflake", "mydb.schema.tableC")]},
) )
def run_data_task(): def run_data_task():
# This is where you might run your data tooling. # This is where you might run your data tooling.

View File

@ -120,6 +120,8 @@ class DatahubLineageBackend(LineageBackend):
except Exception as e: except Exception as e:
if config.graceful_exceptions: if config.graceful_exceptions:
operator.log.error(e) operator.log.error(e)
operator.log.info("Supressing error because graceful_exceptions is set") operator.log.info(
"Suppressing error because graceful_exceptions is set"
)
else: else:
raise raise