diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md index 94eb69a2ed..2e584b8ce0 100644 --- a/docs/lineage/airflow.md +++ b/docs/lineage/airflow.md @@ -71,6 +71,7 @@ enabled = True # default | capture_ownership_info | true | Extract DAG ownership. | | capture_tags_info | true | Extract DAG tags. | | capture_executions | true | Extract task runs and success/failure statuses. This will show up in DataHub "Runs" tab. | +| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. | | enable_extractors | true | Enable automatic lineage extraction. | | disable_openlineage_plugin | true | Disable the OpenLineage plugin to avoid duplicative processing. | | log_level | _no change_ | [debug] Set the log level for the plugin. | @@ -135,8 +136,9 @@ conn_id = datahub_rest_default # or datahub_kafka_default | capture_ownership_info | true | If true, the owners field of the DAG will be capture as a DataHub corpuser. | | capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. | | capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. | -| datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. - | +| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. | +| datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. | +| | | graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. | #### Validate that the plugin is working diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index 48d462b857..f2cd647837 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -34,6 +34,10 @@ class DatahubLineageConfig(ConfigModel): # If true, the tags field of the DAG will be captured as DataHub tags. capture_tags_info: bool = True + # If true (default), we'll materialize and un-soft-delete any urns + # referenced by inlets or outlets. + materialize_iolets: bool = True + capture_executions: bool = False enable_extractors: bool = True @@ -67,6 +71,7 @@ def get_lineage_config() -> DatahubLineageConfig: "datahub", "capture_ownership_info", fallback=True ) capture_executions = conf.get("datahub", "capture_executions", fallback=True) + materialize_iolets = conf.get("datahub", "materialize_iolets", fallback=True) enable_extractors = conf.get("datahub", "enable_extractors", fallback=True) log_level = conf.get("datahub", "log_level", fallback=None) debug_emitter = conf.get("datahub", "debug_emitter", fallback=False) @@ -84,6 +89,7 @@ def get_lineage_config() -> DatahubLineageConfig: capture_ownership_info=capture_ownership_info, capture_tags_info=capture_tags_info, capture_executions=capture_executions, + materialize_iolets=materialize_iolets, enable_extractors=enable_extractors, log_level=log_level, debug_emitter=debug_emitter, diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index ace0d035c3..ac5dc00e0e 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -389,7 +389,10 @@ class DataHubListener: # TODO: Add handling for Airflow mapped tasks using task_instance.map_index - datajob.emit(self.emitter, callback=self._make_emit_callback()) + for mcp in datajob.generate_mcp( + materialize_iolets=self.config.materialize_iolets + ): + self.emitter.emit(mcp, self._make_emit_callback()) logger.debug(f"Emitted DataHub Datajob start: {datajob}") if self.config.capture_executions: @@ -430,7 +433,10 @@ class DataHubListener: # Add lineage info. self._extract_lineage(datajob, dagrun, task, task_instance, complete=True) - datajob.emit(self.emitter, callback=self._make_emit_callback()) + for mcp in datajob.generate_mcp( + materialize_iolets=self.config.materialize_iolets + ): + self.emitter.emit(mcp, self._make_emit_callback()) logger.debug(f"Emitted DataHub Datajob finish w/ status {status}: {datajob}") if self.config.capture_executions: diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py index 7b8d719712..628300d45d 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py @@ -133,7 +133,8 @@ def datahub_task_status_callback(context, status): ) task.log.info(f"Emitting Datahub Datajob: {datajob}") - datajob.emit(emitter, callback=_make_emit_callback(task.log)) + for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets): + emitter.emit(mcp, _make_emit_callback(task.log)) if config.capture_executions: dpi = AirflowGenerator.run_datajob( @@ -200,7 +201,8 @@ def datahub_pre_execution(context): ) task.log.info(f"Emitting Datahub dataJob {datajob}") - datajob.emit(emitter, callback=_make_emit_callback(task.log)) + for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets): + emitter.emit(mcp, _make_emit_callback(task.log)) if config.capture_executions: dpi = AirflowGenerator.run_datajob( diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py index daf45e1cd8..43e62c9f65 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py @@ -59,7 +59,8 @@ def send_lineage_to_datahub( entities_to_datajob_urn_list([let.urn for let in inlets]) ) - datajob.emit(emitter) + for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets): + emitter.emit(mcp) operator.log.info(f"Emitted from Lineage: {datajob}") if config.capture_executions: