diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md
index 77ffb361d7..4880d274e1 100644
--- a/docs/lineage/airflow.md
+++ b/docs/lineage/airflow.md
@@ -46,6 +46,7 @@ We recommend you use the lineage plugin if you are on Airflow version >= 2.0.2 o
 |---|---|---|
 | datahub.datahub_conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. |
 | datahub.cluster | prod | name of the airflow cluster |
+| datahub.capture_executions | false | If true, task runs are captured as DataHub DataProcessInstances. |
 | datahub.capture_ownership_info | true | If true, the owners field of the DAG will be captured as a DataHub corpuser. |
 | datahub.capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
 | datahub.graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions.|
@@ -101,6 +102,7 @@ If you are looking to run Airflow and DataHub using docker locally, follow the g
 datahub_kwargs = {
     "datahub_conn_id": "datahub_rest_default",
     "cluster": "prod",
+    "capture_executions": true,
     "capture_ownership_info": true,
     "capture_tags_info": true,
     "graceful_exceptions": true }
diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py
index 588e66f19a..5634a39ab4 100644
--- a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py
+++ b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py
@@ -75,7 +75,7 @@ class DataFlow:
         tags = GlobalTagsClass(
             tags=[
                 TagAssociationClass(tag=builder.make_tag_urn(tag))
-                for tag in (self.tags or [])
+                for tag in (sorted(self.tags) or [])
             ]
         )
         return [tags]
diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py
index ec86ad8022..8f33c7c3a4 100644
--- a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py
+++ b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py
@@ -103,7 +103,7 @@ class DataJob:
         tags = GlobalTagsClass(
             tags=[
                 TagAssociationClass(tag=builder.make_tag_urn(tag))
-                for tag in (self.tags or [])
+                for tag in (sorted(self.tags) or [])
             ]
         )
         return [tags]
diff --git a/metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_demo.py b/metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_demo.py
index b1bd560a63..d269a1fdb5 100644
--- a/metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_demo.py
+++ b/metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_demo.py
@@ -37,11 +37,9 @@ with DAG(
         task_id="run_data_task",
         dag=dag,
         bash_command="echo 'This is where you might run your data tooling.'",
-        inlets={
-            "datasets": [
-                Dataset("snowflake", "mydb.schema.tableA"),
-                Dataset("snowflake", "mydb.schema.tableB"),
-            ],
-        },
-        outlets={"datasets": [Dataset("snowflake", "mydb.schema.tableC")]},
+        inlets=[
+            Dataset("snowflake", "mydb.schema.tableA"),
+            Dataset("snowflake", "mydb.schema.tableB"),
+        ],
+        outlets=[Dataset("snowflake", "mydb.schema.tableC")],
     )
diff --git a/metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_taskflow_demo.py b/metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_taskflow_demo.py
index 36b011dda5..0cc4809598 100644
--- a/metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_taskflow_demo.py
+++ b/metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_taskflow_demo.py
@@ -30,13 +30,11 @@ default_args = {
 )
 def datahub_lineage_backend_taskflow_demo():
     @task(
-        inlets={
-            "datasets": [
-                Dataset("snowflake", "mydb.schema.tableA"),
-                Dataset("snowflake", "mydb.schema.tableB"),
-            ],
-        },
-        outlets={"datasets": [Dataset("snowflake", "mydb.schema.tableC")]},
+        inlets=[
+            Dataset("snowflake", "mydb.schema.tableA"),
+            Dataset("snowflake", "mydb.schema.tableB"),
+        ],
+        outlets=[Dataset("snowflake", "mydb.schema.tableC")],
     )
     def run_data_task():
         # This is where you might run your data tooling.
diff --git a/metadata-ingestion/src/datahub_provider/lineage/datahub.py b/metadata-ingestion/src/datahub_provider/lineage/datahub.py
index 17c0655308..d341165aee 100644
--- a/metadata-ingestion/src/datahub_provider/lineage/datahub.py
+++ b/metadata-ingestion/src/datahub_provider/lineage/datahub.py
@@ -120,6 +120,8 @@ class DatahubLineageBackend(LineageBackend):
         except Exception as e:
             if config.graceful_exceptions:
                 operator.log.error(e)
-                operator.log.info("Supressing error because graceful_exceptions is set")
+                operator.log.info(
+                    "Suppressing error because graceful_exceptions is set"
+                )
             else:
                 raise
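
For reference, below is a minimal sketch of what the patched lineage_backend_demo.py example looks like end to end with the new list-style inlets/outlets. Only the inlets/outlets lines mirror the patch itself; the surrounding DAG boilerplate (default_args, schedule, start_date) and the Dataset import path from datahub_provider.entities are assumptions for illustration, not part of this change.

# Sketch only -- not part of the patch. Everything outside the diff hunks
# above (default_args, schedule, start_date) is assumed for illustration.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

from datahub_provider.entities import Dataset  # assumed import path

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "datahub_lineage_backend_demo",
    default_args=default_args,
    description="An example DAG demonstrating lineage declared via inlets and outlets.",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:
    run_data_task = BashOperator(
        task_id="run_data_task",
        dag=dag,
        bash_command="echo 'This is where you might run your data tooling.'",
        # inlets/outlets are now plain lists of entities rather than the
        # older {"datasets": [...]} dict form removed by this patch.
        inlets=[
            Dataset("snowflake", "mydb.schema.tableA"),
            Dataset("snowflake", "mydb.schema.tableB"),
        ],
        outlets=[Dataset("snowflake", "mydb.schema.tableC")],
    )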