Handle ingestion pipelines without service relationships during secrets migration by skipping orphaned pipelines (#23454)

This commit is contained in:
Ajith Prasad 2025-09-19 07:52:50 +05:30 committed by GitHub
parent 4be7d65fbf
commit a6e7ce45b3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -13,6 +13,7 @@
package org.openmetadata.service.secrets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -28,9 +29,11 @@ import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.entity.automations.Workflow;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.exception.SecretsManagerUpdateException;
import org.openmetadata.service.exception.UnhandledServerException;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.jdbi3.ListFilter;
@ -270,23 +273,87 @@ public class SecretsManagerUpdateService {
private List<IngestionPipeline> retrieveIngestionPipelines() {
try {
// Need to fetch with service field to avoid NPE when accessing service.getId()
Fields fields = new Fields(Set.of("service"));
return ingestionPipelineRepository
.listAfter(
null,
fields,
new ListFilter(),
ingestionPipelineRepository.getDao().listCount(new ListFilter()),
null)
.getData();
} catch (EntityNotFoundException entityNotFoundException) {
LOG.error(
"Failed to retrieve ingestion pipelines. Entity not found: {}",
entityNotFoundException.getMessage());
return Collections.emptyList();
} catch (Exception e) {
List<IngestionPipeline> pipelines =
ingestionPipelineRepository
.listAfter(
null,
fields,
new ListFilter(),
ingestionPipelineRepository.getDao().listCount(new ListFilter()),
null)
.getData();
LOG.info(
"Successfully retrieved {} ingestion pipelines for secrets migration", pipelines.size());
return pipelines;
} catch (UnhandledServerException e) {
if (e.getMessage().contains("does not have expected relationship contains")) {
LOG.warn("Found orphaned pipelines, filtering them out: {}", e.getMessage());
return retrievePipelinesIndividually();
}
// Re-throw if it's a different UnhandledServerException
throw new SecretsManagerUpdateException(e.getMessage(), e.getCause());
} catch (Exception e) {
LOG.error("Failed to retrieve ingestion pipelines: {}", e.getMessage());
return Collections.emptyList();
}
}
private List<IngestionPipeline> retrievePipelinesIndividually() {
List<IngestionPipeline> validPipelines = new ArrayList<>();
try {
int totalCount = ingestionPipelineRepository.getDao().listCount(new ListFilter());
List<String> pipelineJsons =
ingestionPipelineRepository.getDao().listAfter(new ListFilter(), totalCount, 0);
LOG.info("Processing {} pipelines individually to filter out orphaned ones", totalCount);
int skippedCount = 0;
for (String pipelineJson : pipelineJsons) {
try {
IngestionPipeline pipeline = JsonUtils.readValue(pipelineJson, IngestionPipeline.class);
IngestionPipeline fullPipeline =
ingestionPipelineRepository.get(
null, pipeline.getId(), ingestionPipelineRepository.getFields("service"));
if (fullPipeline != null && fullPipeline.getService() != null) {
validPipelines.add(fullPipeline);
}
} catch (UnhandledServerException e) {
if (e.getMessage().contains("does not have expected relationship contains")) {
try {
IngestionPipeline basicInfo =
JsonUtils.readValue(pipelineJson, IngestionPipeline.class);
LOG.warn(
"Skipping orphaned ingestion pipeline {} (id: {}) - no service relationship",
basicInfo.getName(),
basicInfo.getId());
skippedCount++;
} catch (Exception parseEx) {
LOG.warn("Skipping orphaned ingestion pipeline - unable to parse details");
skippedCount++;
}
} else {
throw new SecretsManagerUpdateException(
String.format("Failed to load ingestion pipeline: %s", e.getMessage()), e);
}
} catch (Exception e) {
throw new SecretsManagerUpdateException(
String.format("Unexpected error loading ingestion pipeline: %s", e.getMessage()), e);
}
}
LOG.info(
"Successfully filtered pipelines: {} valid, {} skipped out of {} total",
validPipelines.size(),
skippedCount,
totalCount);
return validPipelines;
} catch (Exception e) {
LOG.error("Failed to retrieve pipelines individually: {}", e.getMessage());
return Collections.emptyList();
}
}