From a6e7ce45b36ff0e7adad3694c8f0a17c360780f0 Mon Sep 17 00:00:00 2001 From: Ajith Prasad <37380177+aji-aju@users.noreply.github.com> Date: Fri, 19 Sep 2025 07:52:50 +0530 Subject: [PATCH] Handle ingestion pipelines without service relationships during secrets migration by skipping orphaned pipelines (#23454) --- .../secrets/SecretsManagerUpdateService.java | 97 ++++++++++++++++--- 1 file changed, 82 insertions(+), 15 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/secrets/SecretsManagerUpdateService.java b/openmetadata-service/src/main/java/org/openmetadata/service/secrets/SecretsManagerUpdateService.java index 8d17eab9fe5..8fa13d8d24c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/secrets/SecretsManagerUpdateService.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/secrets/SecretsManagerUpdateService.java @@ -13,6 +13,7 @@ package org.openmetadata.service.secrets; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -28,9 +29,11 @@ import org.openmetadata.schema.ServiceEntityInterface; import org.openmetadata.schema.entity.automations.Workflow; import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; import org.openmetadata.schema.entity.teams.User; +import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.service.Entity; import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.exception.SecretsManagerUpdateException; +import org.openmetadata.service.exception.UnhandledServerException; import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.IngestionPipelineRepository; import org.openmetadata.service.jdbi3.ListFilter; @@ -270,23 +273,87 @@ public class SecretsManagerUpdateService { private List retrieveIngestionPipelines() { try { - // Need to fetch with service field to avoid NPE when accessing service.getId() Fields fields = new Fields(Set.of("service")); - return ingestionPipelineRepository - .listAfter( - null, - fields, - new ListFilter(), - ingestionPipelineRepository.getDao().listCount(new ListFilter()), - null) - .getData(); - } catch (EntityNotFoundException entityNotFoundException) { - LOG.error( - "Failed to retrieve ingestion pipelines. Entity not found: {}", - entityNotFoundException.getMessage()); - return Collections.emptyList(); - } catch (Exception e) { + List pipelines = + ingestionPipelineRepository + .listAfter( + null, + fields, + new ListFilter(), + ingestionPipelineRepository.getDao().listCount(new ListFilter()), + null) + .getData(); + + LOG.info( + "Successfully retrieved {} ingestion pipelines for secrets migration", pipelines.size()); + return pipelines; + } catch (UnhandledServerException e) { + if (e.getMessage().contains("does not have expected relationship contains")) { + LOG.warn("Found orphaned pipelines, filtering them out: {}", e.getMessage()); + return retrievePipelinesIndividually(); + } + // Re-throw if it's a different UnhandledServerException throw new SecretsManagerUpdateException(e.getMessage(), e.getCause()); + } catch (Exception e) { + LOG.error("Failed to retrieve ingestion pipelines: {}", e.getMessage()); + return Collections.emptyList(); + } + } + + private List retrievePipelinesIndividually() { + List validPipelines = new ArrayList<>(); + try { + int totalCount = ingestionPipelineRepository.getDao().listCount(new ListFilter()); + List pipelineJsons = + ingestionPipelineRepository.getDao().listAfter(new ListFilter(), totalCount, 0); + + LOG.info("Processing {} pipelines individually to filter out orphaned ones", totalCount); + + int skippedCount = 0; + for (String pipelineJson : pipelineJsons) { + try { + IngestionPipeline pipeline = JsonUtils.readValue(pipelineJson, IngestionPipeline.class); + IngestionPipeline fullPipeline = + ingestionPipelineRepository.get( + null, pipeline.getId(), ingestionPipelineRepository.getFields("service")); + + if (fullPipeline != null && fullPipeline.getService() != null) { + validPipelines.add(fullPipeline); + } + } catch (UnhandledServerException e) { + if (e.getMessage().contains("does not have expected relationship contains")) { + try { + IngestionPipeline basicInfo = + JsonUtils.readValue(pipelineJson, IngestionPipeline.class); + LOG.warn( + "Skipping orphaned ingestion pipeline {} (id: {}) - no service relationship", + basicInfo.getName(), + basicInfo.getId()); + skippedCount++; + } catch (Exception parseEx) { + LOG.warn("Skipping orphaned ingestion pipeline - unable to parse details"); + skippedCount++; + } + } else { + throw new SecretsManagerUpdateException( + String.format("Failed to load ingestion pipeline: %s", e.getMessage()), e); + } + } catch (Exception e) { + throw new SecretsManagerUpdateException( + String.format("Unexpected error loading ingestion pipeline: %s", e.getMessage()), e); + } + } + + LOG.info( + "Successfully filtered pipelines: {} valid, {} skipped out of {} total", + validPipelines.size(), + skippedCount, + totalCount); + + return validPipelines; + } catch (Exception e) { + LOG.error("Failed to retrieve pipelines individually: {}", e.getMessage()); + return Collections.emptyList(); } }