feat(ingestion): support Airflow cluster config (#3336)

Harshal Sheth 2021-10-06 18:01:59 -04:00 committed by GitHub
parent d22d1c2795
commit bd5e3b174a
2 changed files with 6 additions and 1 deletion


@@ -212,6 +212,7 @@ If you are looking to run Airflow and DataHub using docker locally, follow the g
backend = datahub_provider.lineage.datahub.DatahubLineageBackend
datahub_kwargs = {
"datahub_conn_id": "datahub_rest_default",
"cluster": "prod",
"capture_ownership_info": true,
"capture_tags_info": true,
"graceful_exceptions": true }
@@ -219,6 +220,7 @@ If you are looking to run Airflow and DataHub using docker locally, follow the g
```
**Configuration options:**
- `datahub_conn_id` (required): Usually `datahub_rest_default` or `datahub_kafka_default`, depending on what you named the connection in step 1.
- `cluster` (defaults to "prod"): The "cluster" to associate Airflow DAGs and tasks with (see the URN sketch after this list).
- `capture_ownership_info` (defaults to true): If true, the owners field of the DAG will be captured as a DataHub corpuser.
- `capture_tags_info` (defaults to true): If true, the tags field of the DAG will be captured as DataHub tags.
- `graceful_exceptions` (defaults to true): If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions.
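
For illustration, the configured `cluster` ends up as the third component of the DataFlow URN the lineage backend emits for each DAG. A minimal sketch using `make_data_flow_urn` from the acryl-datahub package (the same `builder` helper used in the backend code below); the DAG id `example_dag` is hypothetical:

```python
import datahub.emitter.mce_builder as builder

# The lineage backend builds one DataFlow URN per DAG; the configured
# "cluster" value becomes the last component of that URN.
flow_urn = builder.make_data_flow_urn("airflow", "example_dag", "prod")
print(flow_urn)  # urn:li:dataFlow:(airflow,example_dag,prod)
```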


@@ -24,6 +24,9 @@ class DatahubBasicLineageConfig(ConfigModel):
# DataHub hook connection ID.
datahub_conn_id: str
# Cluster to associate with the pipelines and tasks. Defaults to "prod".
cluster: str = builder.DEFAULT_FLOW_CLUSTER
# If true, the owners field of the DAG will be captured as a DataHub corpuser.
capture_ownership_info: bool = True
@@ -58,7 +61,7 @@ def send_lineage_to_datahub(
# task_instance: "TaskInstance" = context["task_instance"]
# TODO: capture raw sql from db operators
-    flow_urn = builder.make_data_flow_urn("airflow", dag.dag_id)
+    flow_urn = builder.make_data_flow_urn("airflow", dag.dag_id, config.cluster)
job_urn = builder.make_data_job_urn_with_flow(flow_urn, task.task_id)
base_url = conf.get("webserver", "base_url")
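
A quick way to sanity-check the change is to build the lineage config with a non-default cluster and confirm the flow URN picks it up. A rough sketch, assuming `DatahubBasicLineageConfig` is importable from `datahub_provider._lineage_core` (this diff does not show the file path) and using a hypothetical DAG id:

```python
import datahub.emitter.mce_builder as builder

# Assumption: the module path below is not shown in this diff; adjust it to
# wherever DatahubBasicLineageConfig is actually defined.
from datahub_provider._lineage_core import DatahubBasicLineageConfig

# Only datahub_conn_id is required; the other fields fall back to their defaults.
config = DatahubBasicLineageConfig(
    datahub_conn_id="datahub_rest_default",
    cluster="dev",  # override the "prod" default
)

# Mirror the URN construction done in send_lineage_to_datahub ("my_dag" is hypothetical).
flow_urn = builder.make_data_flow_urn("airflow", "my_dag", config.cluster)
assert flow_urn == "urn:li:dataFlow:(airflow,my_dag,dev)"
```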