mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-18 06:06:55 +00:00
feat(ingest/airflow): support disabling iolet materialization (#10305)
This commit is contained in:
parent
529710ab9d
commit
76b5783e70
@ -71,6 +71,7 @@ enabled = True # default
|
|||||||
| capture_ownership_info | true | Extract DAG ownership. |
|
| capture_ownership_info | true | Extract DAG ownership. |
|
||||||
| capture_tags_info | true | Extract DAG tags. |
|
| capture_tags_info | true | Extract DAG tags. |
|
||||||
| capture_executions | true | Extract task runs and success/failure statuses. This will show up in DataHub "Runs" tab. |
|
| capture_executions | true | Extract task runs and success/failure statuses. This will show up in DataHub "Runs" tab. |
|
||||||
|
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
|
||||||
| enable_extractors | true | Enable automatic lineage extraction. |
|
| enable_extractors | true | Enable automatic lineage extraction. |
|
||||||
| disable_openlineage_plugin | true | Disable the OpenLineage plugin to avoid duplicative processing. |
|
| disable_openlineage_plugin | true | Disable the OpenLineage plugin to avoid duplicative processing. |
|
||||||
| log_level | _no change_ | [debug] Set the log level for the plugin. |
|
| log_level | _no change_ | [debug] Set the log level for the plugin. |
|
||||||
@ -135,8 +136,9 @@ conn_id = datahub_rest_default # or datahub_kafka_default
|
|||||||
| capture_ownership_info | true | If true, the owners field of the DAG will be captured as a DataHub corpuser. |
|
| capture_ownership_info | true | If true, the owners field of the DAG will be captured as a DataHub corpuser. |
|
||||||
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
|
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
|
||||||
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
|
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
|
||||||
| datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid.
|
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
|
||||||
|
|
| datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. |
|
||||||
|
| |
|
||||||
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
|
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
|
||||||
|
|
||||||
#### Validate that the plugin is working
|
#### Validate that the plugin is working
|
||||||
|
@ -34,6 +34,10 @@ class DatahubLineageConfig(ConfigModel):
|
|||||||
# If true, the tags field of the DAG will be captured as DataHub tags.
|
# If true, the tags field of the DAG will be captured as DataHub tags.
|
||||||
capture_tags_info: bool = True
|
capture_tags_info: bool = True
|
||||||
|
|
||||||
|
# If true (default), we'll materialize and un-soft-delete any urns
|
||||||
|
# referenced by inlets or outlets.
|
||||||
|
materialize_iolets: bool = True
|
||||||
|
|
||||||
capture_executions: bool = False
|
capture_executions: bool = False
|
||||||
|
|
||||||
enable_extractors: bool = True
|
enable_extractors: bool = True
|
||||||
@ -67,6 +71,7 @@ def get_lineage_config() -> DatahubLineageConfig:
|
|||||||
"datahub", "capture_ownership_info", fallback=True
|
"datahub", "capture_ownership_info", fallback=True
|
||||||
)
|
)
|
||||||
capture_executions = conf.get("datahub", "capture_executions", fallback=True)
|
capture_executions = conf.get("datahub", "capture_executions", fallback=True)
|
||||||
|
materialize_iolets = conf.get("datahub", "materialize_iolets", fallback=True)
|
||||||
enable_extractors = conf.get("datahub", "enable_extractors", fallback=True)
|
enable_extractors = conf.get("datahub", "enable_extractors", fallback=True)
|
||||||
log_level = conf.get("datahub", "log_level", fallback=None)
|
log_level = conf.get("datahub", "log_level", fallback=None)
|
||||||
debug_emitter = conf.get("datahub", "debug_emitter", fallback=False)
|
debug_emitter = conf.get("datahub", "debug_emitter", fallback=False)
|
||||||
@ -84,6 +89,7 @@ def get_lineage_config() -> DatahubLineageConfig:
|
|||||||
capture_ownership_info=capture_ownership_info,
|
capture_ownership_info=capture_ownership_info,
|
||||||
capture_tags_info=capture_tags_info,
|
capture_tags_info=capture_tags_info,
|
||||||
capture_executions=capture_executions,
|
capture_executions=capture_executions,
|
||||||
|
materialize_iolets=materialize_iolets,
|
||||||
enable_extractors=enable_extractors,
|
enable_extractors=enable_extractors,
|
||||||
log_level=log_level,
|
log_level=log_level,
|
||||||
debug_emitter=debug_emitter,
|
debug_emitter=debug_emitter,
|
||||||
|
@ -389,7 +389,10 @@ class DataHubListener:
|
|||||||
|
|
||||||
# TODO: Add handling for Airflow mapped tasks using task_instance.map_index
|
# TODO: Add handling for Airflow mapped tasks using task_instance.map_index
|
||||||
|
|
||||||
datajob.emit(self.emitter, callback=self._make_emit_callback())
|
for mcp in datajob.generate_mcp(
|
||||||
|
materialize_iolets=self.config.materialize_iolets
|
||||||
|
):
|
||||||
|
self.emitter.emit(mcp, self._make_emit_callback())
|
||||||
logger.debug(f"Emitted DataHub Datajob start: {datajob}")
|
logger.debug(f"Emitted DataHub Datajob start: {datajob}")
|
||||||
|
|
||||||
if self.config.capture_executions:
|
if self.config.capture_executions:
|
||||||
@ -430,7 +433,10 @@ class DataHubListener:
|
|||||||
# Add lineage info.
|
# Add lineage info.
|
||||||
self._extract_lineage(datajob, dagrun, task, task_instance, complete=True)
|
self._extract_lineage(datajob, dagrun, task, task_instance, complete=True)
|
||||||
|
|
||||||
datajob.emit(self.emitter, callback=self._make_emit_callback())
|
for mcp in datajob.generate_mcp(
|
||||||
|
materialize_iolets=self.config.materialize_iolets
|
||||||
|
):
|
||||||
|
self.emitter.emit(mcp, self._make_emit_callback())
|
||||||
logger.debug(f"Emitted DataHub Datajob finish w/ status {status}: {datajob}")
|
logger.debug(f"Emitted DataHub Datajob finish w/ status {status}: {datajob}")
|
||||||
|
|
||||||
if self.config.capture_executions:
|
if self.config.capture_executions:
|
||||||
|
@ -133,7 +133,8 @@ def datahub_task_status_callback(context, status):
|
|||||||
)
|
)
|
||||||
|
|
||||||
task.log.info(f"Emitting Datahub Datajob: {datajob}")
|
task.log.info(f"Emitting Datahub Datajob: {datajob}")
|
||||||
datajob.emit(emitter, callback=_make_emit_callback(task.log))
|
for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
|
||||||
|
emitter.emit(mcp, _make_emit_callback(task.log))
|
||||||
|
|
||||||
if config.capture_executions:
|
if config.capture_executions:
|
||||||
dpi = AirflowGenerator.run_datajob(
|
dpi = AirflowGenerator.run_datajob(
|
||||||
@ -200,7 +201,8 @@ def datahub_pre_execution(context):
|
|||||||
)
|
)
|
||||||
|
|
||||||
task.log.info(f"Emitting Datahub dataJob {datajob}")
|
task.log.info(f"Emitting Datahub dataJob {datajob}")
|
||||||
datajob.emit(emitter, callback=_make_emit_callback(task.log))
|
for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
|
||||||
|
emitter.emit(mcp, _make_emit_callback(task.log))
|
||||||
|
|
||||||
if config.capture_executions:
|
if config.capture_executions:
|
||||||
dpi = AirflowGenerator.run_datajob(
|
dpi = AirflowGenerator.run_datajob(
|
||||||
|
@ -59,7 +59,8 @@ def send_lineage_to_datahub(
|
|||||||
entities_to_datajob_urn_list([let.urn for let in inlets])
|
entities_to_datajob_urn_list([let.urn for let in inlets])
|
||||||
)
|
)
|
||||||
|
|
||||||
datajob.emit(emitter)
|
for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
|
||||||
|
emitter.emit(mcp)
|
||||||
operator.log.info(f"Emitted from Lineage: {datajob}")
|
operator.log.info(f"Emitted from Lineage: {datajob}")
|
||||||
|
|
||||||
if config.capture_executions:
|
if config.capture_executions:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user