feat(ingest): airflow - support enabled flag for airflow config (#6089)

This commit is contained in:
Harshal Sheth 2022-10-10 22:42:13 +00:00 committed by GitHub
parent 7b88de89d5
commit 879894d702
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 11 additions and 0 deletions

View File

@ -44,6 +44,7 @@ We recommend you use the lineage plugin if you are on Airflow version >= 2.0.2 o
|Name | Default value | Description |
|---|---|---|
| datahub.enabled | true | If the plugin should be enabled. |
| datahub.conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. |
| datahub.cluster | prod | name of the airflow cluster |
| datahub.capture_ownership_info | true | If true, the owners field of the DAG will be capture as a DataHub corpuser. |
@ -99,6 +100,7 @@ If you are looking to run Airflow and DataHub using docker locally, follow the g
[lineage]
backend = datahub_provider.lineage.datahub.DatahubLineageBackend
datahub_kwargs = {
"enabled": true,
"datahub_conn_id": "datahub_rest_default",
"cluster": "prod",
"capture_ownership_info": true,

View File

@ -18,6 +18,7 @@ from datahub_provider.lineage.datahub import DatahubLineageConfig
def get_lineage_config() -> DatahubLineageConfig:
"""Load the lineage config from airflow.cfg."""
enabled = conf.get("datahub", "enabled", fallback=True)
datahub_conn_id = conf.get("datahub", "conn_id", fallback="datahub_rest_default")
cluster = conf.get("datahub", "cluster", fallback="prod")
graceful_exceptions = conf.get("datahub", "graceful_exceptions", fallback=True)
@ -27,6 +28,7 @@ def get_lineage_config() -> DatahubLineageConfig:
)
capture_executions = conf.get("datahub", "capture_executions", fallback=True)
return DatahubLineageConfig(
enabled=enabled,
datahub_conn_id=datahub_conn_id,
cluster=cluster,
graceful_exceptions=graceful_exceptions,

View File

@ -22,6 +22,8 @@ def _entities_to_urn_list(iolets: List[_Entity]) -> List[DatasetUrn]:
class DatahubBasicLineageConfig(ConfigModel):
enabled: bool = True
# DataHub hook connection ID.
datahub_conn_id: str
@ -50,6 +52,9 @@ def send_lineage_to_datahub(
outlets: List[_Entity],
context: Dict,
) -> None:
if not config.enabled:
return
dag: "DAG" = context["dag"]
task: "BaseOperator" = context["task"]
ti: "TaskInstance" = context["task_instance"]

View File

@ -73,6 +73,8 @@ class DatahubLineageBackend(LineageBackend):
context: Dict = None,
) -> None:
config = get_lineage_config()
if not config.enabled:
return
try:
# This is necessary to avoid issues with circular imports.