mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-28 10:28:22 +00:00
fix(ingest/airflow): respect enabled flag in airflow plugin (#6528)
This commit is contained in:
parent
4db51b4c4f
commit
ebd685d40d
@ -237,8 +237,9 @@ def datahub_pre_execution(context):
|
||||
def _wrap_pre_execution(pre_execution):
|
||||
def custom_pre_execution(context):
|
||||
config = get_lineage_config()
|
||||
context["_datahub_config"] = config
|
||||
datahub_pre_execution(context)
|
||||
if config.enabled:
|
||||
context["_datahub_config"] = config
|
||||
datahub_pre_execution(context)
|
||||
|
||||
# Call original policy
|
||||
if pre_execution:
|
||||
@ -250,14 +251,15 @@ def _wrap_pre_execution(pre_execution):
|
||||
def _wrap_on_failure_callback(on_failure_callback):
|
||||
def custom_on_failure_callback(context):
|
||||
config = get_lineage_config()
|
||||
context["_datahub_config"] = config
|
||||
try:
|
||||
datahub_task_status_callback(context, status=InstanceRunResult.FAILURE)
|
||||
except Exception as e:
|
||||
if not config.graceful_exceptions:
|
||||
raise e
|
||||
else:
|
||||
print(f"Exception: {traceback.format_exc()}")
|
||||
if config.enabled:
|
||||
context["_datahub_config"] = config
|
||||
try:
|
||||
datahub_task_status_callback(context, status=InstanceRunResult.FAILURE)
|
||||
except Exception as e:
|
||||
if not config.graceful_exceptions:
|
||||
raise e
|
||||
else:
|
||||
print(f"Exception: {traceback.format_exc()}")
|
||||
|
||||
# Call original policy
|
||||
if on_failure_callback:
|
||||
@ -269,15 +271,17 @@ def _wrap_on_failure_callback(on_failure_callback):
|
||||
def _wrap_on_success_callback(on_success_callback):
|
||||
def custom_on_success_callback(context):
|
||||
config = get_lineage_config()
|
||||
context["_datahub_config"] = config
|
||||
try:
|
||||
datahub_task_status_callback(context, status=InstanceRunResult.SUCCESS)
|
||||
except Exception as e:
|
||||
if not config.graceful_exceptions:
|
||||
raise e
|
||||
else:
|
||||
print(f"Exception: {traceback.format_exc()}")
|
||||
if config.enabled:
|
||||
context["_datahub_config"] = config
|
||||
try:
|
||||
datahub_task_status_callback(context, status=InstanceRunResult.SUCCESS)
|
||||
except Exception as e:
|
||||
if not config.graceful_exceptions:
|
||||
raise e
|
||||
else:
|
||||
print(f"Exception: {traceback.format_exc()}")
|
||||
|
||||
# Call original policy
|
||||
if on_success_callback:
|
||||
on_success_callback(context)
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user