feat: allow task ownership as group (#10742)

This commit is contained in:
Francesco Macagno 2024-06-19 04:14:30 -05:00 committed by GitHub
parent c2b6272730
commit 750aab9a51
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 33 additions and 17 deletions

View File

@ -66,11 +66,12 @@ enabled = True # default
```
| Name | Default value | Description |
| -------------------------- | -------------------- | ---------------------------------------------------------------------------------------- |
|----------------------------|----------------------|------------------------------------------------------------------------------------------|
| enabled | true | If the plugin should be enabled. |
| conn_id | datahub_rest_default | The name of the datahub rest connection. |
| cluster | prod | name of the airflow cluster, this is equivalent to the `env` of the instance |
| cluster                    | prod                 | Name of the Airflow cluster; this is equivalent to the `env` of the instance                 |
| capture_ownership_info | true | Extract DAG ownership. |
| capture_ownership_as_group | false | When extracting DAG ownership, treat DAG owner as a group rather than a user |
| capture_tags_info | true | Extract DAG tags. |
| capture_executions | true | Extract task runs and success/failure statuses. This will show up in DataHub "Runs" tab. |
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
@ -130,18 +131,19 @@ conn_id = datahub_rest_default # or datahub_kafka_default
# etc.
```
| Name | Default value | Description |
| ---------------------- | -------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| enabled | true | If the plugin should be enabled. |
| conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. |
| cluster | prod | name of the airflow cluster |
| capture_ownership_info | true | If true, the owners field of the DAG will be capture as a DataHub corpuser. |
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
| datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. |
| |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
| Name | Default value | Description |
|----------------------------|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| enabled | true | If the plugin should be enabled. |
| conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. |
| cluster                    | prod                 | Name of the Airflow cluster.                                                                                                                                                             |
| capture_ownership_info     | true                 | If true, the owners field of the DAG will be captured as a DataHub corpuser.                                                                                                             |
| capture_ownership_as_group | false | When extracting DAG ownership, treat DAG owner as a group rather than a user. |
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
| datajob_url_link           | taskinstance         | If set to taskinstance, the datajob URL will be the task instance link in Airflow. It can also be set to grid.                                                                           |
| |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
#### Validate that the plugin is working

View File

@ -28,9 +28,12 @@ class DatahubLineageConfig(ConfigModel):
# Cluster to associate with the pipelines and tasks. Defaults to "prod".
cluster: str = builder.DEFAULT_FLOW_CLUSTER
# If true, the owners field of the DAG will be capture as a DataHub corpuser.
# If true, the owners field of the DAG will be captured as a DataHub corpuser.
capture_ownership_info: bool = True
# If true, the owners field of the DAG will instead be captured as a DataHub corpgroup.
capture_ownership_as_group: bool = False
# If true, the tags field of the DAG will be captured as DataHub tags.
capture_tags_info: bool = True
@ -70,6 +73,9 @@ def get_lineage_config() -> DatahubLineageConfig:
capture_ownership_info = conf.get(
"datahub", "capture_ownership_info", fallback=True
)
capture_ownership_as_group = conf.get(
"datahub", "capture_ownership_as_group", fallback=False
)
capture_executions = conf.get("datahub", "capture_executions", fallback=True)
materialize_iolets = conf.get("datahub", "materialize_iolets", fallback=True)
enable_extractors = conf.get("datahub", "enable_extractors", fallback=True)
@ -87,6 +93,7 @@ def get_lineage_config() -> DatahubLineageConfig:
datahub_conn_id=datahub_conn_id,
cluster=cluster,
capture_ownership_info=capture_ownership_info,
capture_ownership_as_group=capture_ownership_as_group,
capture_tags_info=capture_tags_info,
capture_executions=capture_executions,
materialize_iolets=materialize_iolets,

View File

@ -175,7 +175,11 @@ class AirflowGenerator:
data_flow.url = f"{base_url}/tree?dag_id={dag.dag_id}"
if config.capture_ownership_info and dag.owner:
data_flow.owners.update(owner.strip() for owner in dag.owner.split(","))
owners = [owner.strip() for owner in dag.owner.split(",")]
if config.capture_ownership_as_group:
data_flow.group_owners.update(owners)
else:
data_flow.owners.update(owners)
if config.capture_tags_info and dag.tags:
data_flow.tags.update(dag.tags)
@ -278,7 +282,10 @@ class AirflowGenerator:
datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.flow_id}&_flt_3_task_id={task.task_id}"
if capture_owner and dag.owner:
datajob.owners.add(dag.owner)
if config and config.capture_ownership_as_group:
datajob.group_owners.add(dag.owner)
else:
datajob.owners.add(dag.owner)
if capture_tags and dag.tags:
datajob.tags.update(dag.tags)