Add deploy pipeline method to ingestion pipeline repository (#22999)

* Adding deployIngestionPipeline method to the IngestionPipelineRepository

* Add Missing entityspecificUpdate

* Fix updating relationships

* Made updateProcessingEngine protected

* Update updateFrom and  updateTo relationship to be able to just delete a relationship if we update it to null
This commit is contained in:
IceS2 2025-08-20 10:32:51 +02:00 committed by Pablo Takara
parent e0f4a59770
commit 0782dc28b6
No known key found for this signature in database
GPG Key ID: 63381DDFBB2BF725
2 changed files with 63 additions and 11 deletions

View File

@ -4571,13 +4571,15 @@ public abstract class EntityRepository<T extends EntityInterface> {
deleteTo(fromId, fromEntityType, relationshipType, toEntityType);
}
// Add relationships from updated
addRelationship(
fromId,
updatedToRef.getId(),
fromEntityType,
toEntityType,
relationshipType,
bidirectional);
if (updatedToRef != null) {
addRelationship(
fromId,
updatedToRef.getId(),
fromEntityType,
toEntityType,
relationshipType,
bidirectional);
}
}
/**
@ -4652,7 +4654,10 @@ public abstract class EntityRepository<T extends EntityInterface> {
deleteTo(toId, toEntityType, relationshipType, fromEntityType);
// Add relationships from updated
addRelationship(updatedFromRef.getId(), toId, fromEntityType, toEntityType, relationshipType);
if (updatedFromRef != null) {
addRelationship(
updatedFromRef.getId(), toId, fromEntityType, toEntityType, relationshipType);
}
}
public final void storeUpdate() {

View File

@ -15,6 +15,7 @@ package org.openmetadata.service.jdbi3;
import static org.openmetadata.schema.type.EventType.ENTITY_FIELDS_CHANGED;
import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED;
import static org.openmetadata.service.Entity.INGESTION_PIPELINE;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriInfo;
@ -30,9 +31,11 @@ import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.json.JSONObject;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.entity.applications.configuration.ApplicationConfig;
import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineServiceClientResponse;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.schema.metadataIngestion.ApplicationPipeline;
@ -233,9 +236,17 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
secretsManager.encryptOpenMetadataConnection(openmetadataConnection, true);
}
ingestionPipeline.withService(null).withOpenMetadataServerConnection(null);
EntityReference processingEngine = ingestionPipeline.getProcessingEngine();
ingestionPipeline
.withService(null)
.withOpenMetadataServerConnection(null)
.withProcessingEngine(null);
store(ingestionPipeline, update);
ingestionPipeline.withService(service).withOpenMetadataServerConnection(openmetadataConnection);
ingestionPipeline
.withService(service)
.withOpenMetadataServerConnection(openmetadataConnection)
.withProcessingEngine(processingEngine);
}
@Override
@ -249,6 +260,15 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
ingestionPipeline.getIngestionRunner().getType(),
Relationship.USES);
}
if (ingestionPipeline.getProcessingEngine() != null) {
addRelationship(
ingestionPipeline.getId(),
ingestionPipeline.getProcessingEngine().getId(),
entityType,
ingestionPipeline.getProcessingEngine().getType(),
Relationship.USES);
}
}
@Override
@ -462,6 +482,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
@Transaction
@Override
public void entitySpecificUpdate(boolean consolidatingChanges) {
updateProcessingEngine(original, updated);
updateSourceConfig();
updateAirflowConfig(original.getAirflowConfig(), updated.getAirflowConfig());
updateLogLevel(original.getLoggerLevel(), updated.getLoggerLevel());
@ -470,6 +491,27 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
updateRaiseOnError(original.getRaiseOnError(), updated.getRaiseOnError());
}
protected void updateProcessingEngine(IngestionPipeline original, IngestionPipeline updated) {
String entityType =
original.getProcessingEngine() != null
? original.getProcessingEngine().getType()
: updated.getProcessingEngine() != null
? updated.getProcessingEngine().getType()
: null;
if (entityType == null) {
return;
}
updateToRelationship(
"processingEngine",
INGESTION_PIPELINE,
original.getId(),
Relationship.USES,
entityType,
original.getProcessingEngine(),
updated.getProcessingEngine(),
false);
}
private void updateSourceConfig() {
JSONObject origSourceConfig =
new JSONObject(JsonUtils.pojoToJson(original.getSourceConfig().getConfig()));
@ -513,7 +555,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
}
}
private static IngestionPipeline buildIngestionPipelineDecrypted(IngestionPipeline original) {
protected static IngestionPipeline buildIngestionPipelineDecrypted(IngestionPipeline original) {
IngestionPipeline decrypted =
JsonUtils.convertValue(JsonUtils.getMap(original), IngestionPipeline.class);
SecretsManagerFactory.getSecretsManager().decryptIngestionPipeline(decrypted);
@ -545,4 +587,9 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
return ingestionPipeline.getPipelineType().value();
}
}
public PipelineServiceClientResponse deployIngestionPipeline(
IngestionPipeline ingestionPipeline, ServiceEntityInterface service) {
return pipelineServiceClient.deployPipeline(ingestionPipeline, service);
}
}