diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java index 8d6a209dc65..ea5005f927a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java @@ -170,7 +170,7 @@ public class CreateIngestionPipelineImpl { JsonUtils.readOrConvertValue(ingestionPipelineStr, IngestionPipeline.class); if (ingestionPipeline.getPipelineType().equals(pipelineType) && ingestionPipeline.getDisplayName().equals(displayName)) { - return ingestionPipeline; + return ingestionPipeline.withService(service.getEntityReference()); } } return null; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/RunIngestionPipelineImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/RunIngestionPipelineImpl.java index e7f3d2b4b38..35f9f8639e8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/RunIngestionPipelineImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/RunIngestionPipelineImpl.java @@ -90,6 +90,7 @@ public class RunIngestionPipelineImpl { IngestionPipeline ingestionPipeline, long startTime, long timeoutMillis) { + long backoffMillis = 5 * 1000; while (true) { if (System.currentTimeMillis() - startTime > timeoutMillis) { return false; @@ -102,7 +103,14 @@ public class RunIngestionPipelineImpl { .getData(); if (statuses.isEmpty()) { - continue; + try { + Thread.sleep(backoffMillis); + backoffMillis *= 2; + continue; + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Retry interrupted", ie); + } } PipelineStatus status = statuses.get(statuses.size() - 1);