From f33cf4229089c24a528875f647b5cca2de3c1926 Mon Sep 17 00:00:00 2001 From: sonika-shah <58761340+sonika-shah@users.noreply.github.com> Date: Mon, 19 May 2025 09:29:02 +0530 Subject: [PATCH] Fix - 21204 inconsistent domain reassignment in bulkAsset API (#21205) --- .../service/jdbi3/CollectionDAO.java | 21 +++ .../service/jdbi3/DataProductRepository.java | 100 +++++++++++++++ .../service/jdbi3/DomainRepository.java | 118 +++++++++++++++-- .../service/jdbi3/EntityRepository.java | 120 ++++++++++++------ .../jdbi3/IngestionPipelineRepository.java | 2 +- .../openmetadata/service/util/EntityUtil.java | 19 ++- 6 files changed, 328 insertions(+), 52 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index f49e6c4515e..792976daadc 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -975,6 +975,13 @@ public interface CollectionDAO { bulkRemoveTo(fromId, toIdsAsString, fromEntity, toEntity, relation); } + default void bulkRemoveFromRelationship( + List fromIds, UUID toId, String fromEntity, String toEntity, int relation) { + + List fromIdsAsString = fromIds.stream().map(UUID::toString).toList(); + bulkRemoveFrom(fromIdsAsString, toId, fromEntity, toEntity, relation); + } + @ConnectionAwareSqlUpdate( value = "INSERT INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation, json) " @@ -1022,6 +1029,20 @@ public interface CollectionDAO { @Bind("toEntity") String toEntity, @Bind("relation") int relation); + @SqlUpdate( + "DELETE FROM entity_relationship " + + "WHERE fromEntity = :fromEntity " + + "AND fromId IN () " + + "AND toEntity = :toEntity " + + "AND relation = :relation " + + "AND toId = :toId") + void bulkRemoveFrom( + @BindList("fromIds") List fromIds, + @BindUUID("toId") UUID toId, + @Bind("fromEntity") String fromEntity, + @Bind("toEntity") String toEntity, + @Bind("relation") int relation); + // // Find to operations // diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataProductRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataProductRepository.java index cca08f6dac0..3bc47154e29 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataProductRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataProductRepository.java @@ -24,22 +24,28 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.entity.domains.DataProduct; import org.openmetadata.schema.entity.domains.Domain; import org.openmetadata.schema.type.ApiStatus; +import org.openmetadata.schema.type.ChangeDescription; +import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.Relationship; import org.openmetadata.schema.type.api.BulkAssets; import org.openmetadata.schema.type.api.BulkOperationResult; +import org.openmetadata.schema.type.api.BulkResponse; import org.openmetadata.schema.type.change.ChangeSource; import org.openmetadata.service.Entity; import org.openmetadata.service.resources.domains.DataProductResource; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.EntityUtil.Fields; +import org.openmetadata.service.util.JsonUtils; import org.openmetadata.service.util.LineageUtil; @Slf4j @@ -147,6 +153,100 @@ public class DataProductRepository extends EntityRepository { return result; } + @Transaction + @Override + protected BulkOperationResult bulkAssetsOperation( + UUID entityId, + String fromEntity, + Relationship relationship, + BulkAssets request, + boolean isAdd) { + BulkOperationResult result = + new BulkOperationResult().withStatus(ApiStatus.SUCCESS).withDryRun(false); + List success = new ArrayList<>(); + + EntityUtil.populateEntityReferences(request.getAssets()); + + for (EntityReference ref : request.getAssets()) { + result.setNumberOfRowsProcessed(result.getNumberOfRowsProcessed() + 1); + + removeCrossDomainDataProducts(ref, relationship); + + if (isAdd) { + addRelationship(entityId, ref.getId(), fromEntity, ref.getType(), relationship); + } else { + deleteRelationship(entityId, fromEntity, ref.getId(), ref.getType(), relationship); + } + + success.add(new BulkResponse().withRequest(ref)); + result.setNumberOfRowsPassed(result.getNumberOfRowsPassed() + 1); + + searchRepository.updateEntity(ref); + } + + result.withSuccessRequest(success); + + // Create a Change Event on successful addition/removal of assets + if (result.getStatus().equals(ApiStatus.SUCCESS)) { + EntityInterface entityInterface = Entity.getEntity(fromEntity, entityId, "id", ALL); + ChangeDescription change = + addBulkAddRemoveChangeDescription( + entityInterface.getVersion(), isAdd, request.getAssets(), null); + ChangeEvent changeEvent = + getChangeEvent(entityInterface, change, fromEntity, entityInterface.getVersion()); + Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent)); + } + + return result; + } + + private void removeCrossDomainDataProducts(EntityReference ref, Relationship relationship) { + EntityReference domain = + getFromEntityRef(ref.getId(), ref.getType(), relationship, DOMAIN, false); + List dataProducts = getDataProducts(ref.getId(), ref.getType()); + + if (!dataProducts.isEmpty() && domain != null) { + // Map dataProduct -> domain + Map associatedDomains = + daoCollection + .relationshipDAO() + .findFromBatch( + dataProducts.stream() + .map(dp -> dp.getId().toString()) + .collect(Collectors.toList()), + relationship.ordinal(), + DOMAIN) + .stream() + .collect( + Collectors.toMap( + rec -> UUID.fromString(rec.getToId()), + rec -> UUID.fromString(rec.getFromId()))); + + List dataProductsToDelete = + dataProducts.stream() + .filter( + dataProduct -> { + UUID associatedDomainId = associatedDomains.get(dataProduct.getId()); + return associatedDomainId != null && !associatedDomainId.equals(domain.getId()); + }) + .collect(Collectors.toList()); + + if (!dataProductsToDelete.isEmpty()) { + daoCollection + .relationshipDAO() + .bulkRemoveFromRelationship( + dataProductsToDelete.stream() + .map(EntityReference::getId) + .collect(Collectors.toList()), + ref.getId(), + DATA_PRODUCT, + ref.getType(), + relationship.ordinal()); + LineageUtil.removeDataProductsLineage(ref.getId(), ref.getType(), dataProductsToDelete); + } + } + } + @Override public void restorePatchAttributes(DataProduct original, DataProduct updated) { super.restorePatchAttributes(original, updated); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DomainRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DomainRepository.java index 8e308918890..feef0be59b7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DomainRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DomainRepository.java @@ -15,8 +15,10 @@ package org.openmetadata.service.jdbi3; import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; import static org.openmetadata.schema.type.Include.ALL; +import static org.openmetadata.service.Entity.DATA_PRODUCT; import static org.openmetadata.service.Entity.DOMAIN; import static org.openmetadata.service.Entity.FIELD_ASSETS; +import static org.openmetadata.service.Entity.getEntityReferenceById; import java.util.ArrayList; import java.util.List; @@ -29,11 +31,14 @@ import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.entity.data.EntityHierarchy; import org.openmetadata.schema.entity.domains.Domain; import org.openmetadata.schema.type.ApiStatus; +import org.openmetadata.schema.type.ChangeDescription; +import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.Relationship; import org.openmetadata.schema.type.api.BulkAssets; import org.openmetadata.schema.type.api.BulkOperationResult; +import org.openmetadata.schema.type.api.BulkResponse; import org.openmetadata.schema.type.change.ChangeSource; import org.openmetadata.service.Entity; import org.openmetadata.service.resources.domains.DomainResource; @@ -111,29 +116,116 @@ public class DomainRepository extends EntityRepository { public BulkOperationResult bulkAddAssets(String domainName, BulkAssets request) { Domain domain = getByName(null, domainName, getFields("id")); - BulkOperationResult result = - bulkAssetsOperation(domain.getId(), DOMAIN, Relationship.HAS, request, true); - // Add assets to domain - if (result.getStatus().equals(ApiStatus.SUCCESS)) { - for (EntityReference ref : listOrEmpty(request.getAssets())) { - LineageUtil.addDomainLineage(ref.getId(), ref.getType(), domain.getEntityReference()); - } - } - return result; + return bulkAssetsOperation(domain.getId(), DOMAIN, Relationship.HAS, request, true); } public BulkOperationResult bulkRemoveAssets(String domainName, BulkAssets request) { Domain domain = getByName(null, domainName, getFields("id")); + return bulkAssetsOperation(domain.getId(), DOMAIN, Relationship.HAS, request, false); + } + + @Transaction + @Override + protected BulkOperationResult bulkAssetsOperation( + UUID entityId, + String fromEntity, + Relationship relationship, + BulkAssets request, + boolean isAdd) { BulkOperationResult result = - bulkAssetsOperation(domain.getId(), DOMAIN, Relationship.HAS, request, false); - if (result.getStatus().equals(ApiStatus.SUCCESS)) { - for (EntityReference ref : listOrEmpty(request.getAssets())) { - LineageUtil.removeDomainLineage(ref.getId(), ref.getType(), domain.getEntityReference()); + new BulkOperationResult().withStatus(ApiStatus.SUCCESS).withDryRun(false); + List success = new ArrayList<>(); + + EntityUtil.populateEntityReferences(request.getAssets()); + + for (EntityReference ref : request.getAssets()) { + result.setNumberOfRowsProcessed(result.getNumberOfRowsProcessed() + 1); + + cleanupOldDomain(ref, fromEntity, relationship); + cleanupDataProducts(entityId, ref, relationship, isAdd); + + if (isAdd) { + addRelationship(entityId, ref.getId(), fromEntity, ref.getType(), relationship); + EntityReference domainRef = getEntityReferenceById(DOMAIN, entityId, ALL); + LineageUtil.addDomainLineage(entityId, ref.getType(), domainRef); } + + success.add(new BulkResponse().withRequest(ref)); + result.setNumberOfRowsPassed(result.getNumberOfRowsPassed() + 1); + + searchRepository.updateEntity(ref); } + + result.withSuccessRequest(success); + + // Create a Change Event on successful addition/removal of assets + if (result.getStatus().equals(ApiStatus.SUCCESS)) { + EntityInterface entityInterface = Entity.getEntity(fromEntity, entityId, "id", ALL); + ChangeDescription change = + addBulkAddRemoveChangeDescription( + entityInterface.getVersion(), isAdd, request.getAssets(), null); + ChangeEvent changeEvent = + getChangeEvent(entityInterface, change, fromEntity, entityInterface.getVersion()); + Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent)); + } + return result; } + private void cleanupOldDomain(EntityReference ref, String fromEntity, Relationship relationship) { + EntityReference oldDomain = + getFromEntityRef(ref.getId(), ref.getType(), relationship, DOMAIN, false); + deleteTo(ref.getId(), ref.getType(), relationship, fromEntity); + LineageUtil.removeDomainLineage(ref.getId(), ref.getType(), oldDomain); + } + + private void cleanupDataProducts( + UUID entityId, EntityReference ref, Relationship relationship, boolean isAdd) { + List dataProducts = getDataProducts(ref.getId(), ref.getType()); + if (dataProducts.isEmpty()) return; + + // Map dataProduct -> domain + Map associatedDomains = + daoCollection + .relationshipDAO() + .findFromBatch( + dataProducts.stream().map(dp -> dp.getId().toString()).collect(Collectors.toList()), + relationship.ordinal(), + DOMAIN) + .stream() + .collect( + Collectors.toMap( + rec -> UUID.fromString(rec.getToId()), + rec -> UUID.fromString(rec.getFromId()))); + + // For isAdd, filter only those data products linked to a different domain. + // For isRemove, delete all data products. + List dataProductsToDelete = + isAdd + ? dataProducts.stream() + .filter( + dp -> { + UUID domainId = associatedDomains.get(dp.getId()); + return domainId != null && !domainId.equals(entityId); + }) + .collect(Collectors.toList()) + : dataProducts; + + if (!dataProductsToDelete.isEmpty()) { + daoCollection + .relationshipDAO() + .bulkRemoveFromRelationship( + dataProductsToDelete.stream() + .map(EntityReference::getId) + .collect(Collectors.toList()), + ref.getId(), + DATA_PRODUCT, + ref.getType(), + relationship.ordinal()); + LineageUtil.removeDataProductsLineage(ref.getId(), ref.getType(), dataProductsToDelete); + } + } + @Override public EntityRepository.EntityUpdater getUpdater( Domain original, Domain updated, Operation operation, ChangeSource changeSource) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index c284930569a..1f2012b2389 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -49,6 +49,7 @@ import static org.openmetadata.service.Entity.TEAM; import static org.openmetadata.service.Entity.USER; import static org.openmetadata.service.Entity.getEntityByName; import static org.openmetadata.service.Entity.getEntityFields; +import static org.openmetadata.service.Entity.getEntityReferenceById; import static org.openmetadata.service.exception.CatalogExceptionMessage.csvNotSupported; import static org.openmetadata.service.exception.CatalogExceptionMessage.entityNotFound; import static org.openmetadata.service.resources.tags.TagLabelUtil.addDerivedTags; @@ -56,6 +57,7 @@ import static org.openmetadata.service.resources.tags.TagLabelUtil.checkDisabled import static org.openmetadata.service.resources.tags.TagLabelUtil.checkMutuallyExclusive; import static org.openmetadata.service.resources.tags.TagLabelUtil.populateTagLabel; import static org.openmetadata.service.util.EntityUtil.compareTagLabel; +import static org.openmetadata.service.util.EntityUtil.entityReferenceListMatch; import static org.openmetadata.service.util.EntityUtil.entityReferenceMatch; import static org.openmetadata.service.util.EntityUtil.fieldAdded; import static org.openmetadata.service.util.EntityUtil.fieldDeleted; @@ -1577,7 +1579,7 @@ public abstract class EntityRepository { T entity = find(entityId, NON_DELETED); // Validate follower - EntityReference user = Entity.getEntityReferenceById(USER, userId, NON_DELETED); + EntityReference user = getEntityReferenceById(USER, userId, NON_DELETED); // Remove follower deleteRelationship(userId, USER, entityId, entityType, Relationship.FOLLOWS); @@ -2128,27 +2130,41 @@ public abstract class EntityRepository { return RestUtil.getHref(uriInfo, collectionPath, id); } - private void removeDomainDataProducts(EntityReference originalDomain, T entity) { - List dataProductIds = + private void removeCrossDomainDataProducts(EntityReference domain, T entity) { + if (!supportsDataProducts) { + return; + } + + List entityDataProducts = entity.getDataProducts(); + if (entityDataProducts == null) { + return; + } + + if (domain == null) { + entityDataProducts.clear(); + LOG.info( + "Removed all data products from entity {} as no domain is provided", + entity.getEntityReference().getType()); + return; + } + + // Fetch domain data products + List domainDataProductIds = daoCollection .relationshipDAO() - .findToIds( - originalDomain.getId(), DOMAIN, Relationship.HAS.ordinal(), Entity.DATA_PRODUCT); + .findToIds(domain.getId(), DOMAIN, Relationship.HAS.ordinal(), Entity.DATA_PRODUCT); - List updatedDataProducts = entity.getDataProducts(); - if (updatedDataProducts != null) { - updatedDataProducts.removeIf( - dataProduct -> { - boolean isDomainDataProduct = dataProductIds.contains(dataProduct.getId()); - if (isDomainDataProduct) { - LOG.info( - "Removing data product {} from entity {}", - dataProduct.getFullyQualifiedName(), - entity.getEntityReference().getType()); - } - return isDomainDataProduct; - }); - } + entityDataProducts.removeIf( + dataProduct -> { + boolean isNotDomainDataProduct = !domainDataProductIds.contains(dataProduct.getId()); + if (isNotDomainDataProduct) { + LOG.info( + "Removing data product {} from entity {}", + dataProduct.getFullyQualifiedName(), + entity.getEntityReference().getType()); + } + return isNotDomainDataProduct; + }); } @Transaction @@ -2290,6 +2306,21 @@ public abstract class EntityRepository { return getFromEntityRef(toId, Relationship.CONTAINS, fromEntityType, true); } + public final EntityReference getFromEntityRef( + UUID toId, + String toEntity, + Relationship relationship, + String fromEntityType, + boolean mustHaveRelationship) { + List records = + findFromRecords(toId, toEntity, relationship, fromEntityType); + ensureSingleRelationship( + toEntity, toId, records, relationship.value(), fromEntityType, mustHaveRelationship); + return !records.isEmpty() + ? Entity.getEntityReferenceById(records.get(0).getType(), records.get(0).getId(), ALL) + : null; + } + public final EntityReference getFromEntityRef( UUID toId, Relationship relationship, String fromEntityType, boolean mustHaveRelationship) { List records = @@ -2319,7 +2350,7 @@ public abstract class EntityRepository { ensureSingleRelationship( entityType, fromId, records, relationship.value(), toEntityType, mustHaveRelationship); return !records.isEmpty() - ? Entity.getEntityReferenceById(records.get(0).getType(), records.get(0).getId(), ALL) + ? getEntityReferenceById(records.get(0).getType(), records.get(0).getId(), ALL) : null; } @@ -2401,7 +2432,7 @@ public abstract class EntityRepository { for (EntityReference entityReference : entityReferences) { EntityReference ref = entityReference.getId() != null - ? Entity.getEntityReferenceById(USER, entityReference.getId(), ALL) + ? getEntityReferenceById(USER, entityReference.getId(), ALL) : Entity.getEntityReferenceByName( USER, entityReference.getFullyQualifiedName(), ALL); EntityUtil.copy(ref, entityReference); @@ -2426,7 +2457,7 @@ public abstract class EntityRepository { } else { EntityReference ref = entityReferences.get(0).getId() != null - ? Entity.getEntityReferenceById(TEAM, entityReferences.get(0).getId(), ALL) + ? getEntityReferenceById(TEAM, entityReferences.get(0).getId(), ALL) : Entity.getEntityReferenceByName( TEAM, entityReferences.get(0).getFullyQualifiedName(), ALL); EntityUtil.copy(ref, entityReferences.get(0)); @@ -2435,7 +2466,7 @@ public abstract class EntityRepository { for (EntityReference entityReference : entityReferences) { EntityReference ref = entityReference.getId() != null - ? Entity.getEntityReferenceById(USER, entityReference.getId(), ALL) + ? getEntityReferenceById(USER, entityReference.getId(), ALL) : Entity.getEntityReferenceByName( USER, entityReference.getFullyQualifiedName(), ALL); EntityUtil.copy(ref, entityReference); @@ -2451,8 +2482,7 @@ public abstract class EntityRepository { public final void validateRoles(List roles) { if (roles != null) { for (EntityReference entityReference : roles) { - EntityReference ref = - Entity.getEntityReferenceById(Entity.ROLE, entityReference.getId(), ALL); + EntityReference ref = getEntityReferenceById(Entity.ROLE, entityReference.getId(), ALL); EntityUtil.copy(ref, entityReference); } roles.sort(EntityUtil.compareEntityReference); @@ -2462,8 +2492,7 @@ public abstract class EntityRepository { final void validatePolicies(List policies) { if (policies != null) { for (EntityReference entityReference : policies) { - EntityReference ref = - Entity.getEntityReferenceById(Entity.POLICY, entityReference.getId(), ALL); + EntityReference ref = getEntityReferenceById(Entity.POLICY, entityReference.getId(), ALL); EntityUtil.copy(ref, entityReference); } policies.sort(EntityUtil.compareEntityReference); @@ -2620,6 +2649,7 @@ public abstract class EntityRepository { } } + @Transaction protected BulkOperationResult bulkAssetsOperation( UUID entityId, String fromEntity, @@ -2665,7 +2695,7 @@ public abstract class EntityRepository { return result; } - private ChangeDescription addBulkAddRemoveChangeDescription( + protected ChangeDescription addBulkAddRemoveChangeDescription( Double version, boolean isAdd, Object newValue, Object oldValue) { FieldChange fieldChange = new FieldChange().withName("assets").withNewValue(newValue).withOldValue(oldValue); @@ -2678,7 +2708,7 @@ public abstract class EntityRepository { return change; } - private ChangeEvent getChangeEvent( + protected ChangeEvent getChangeEvent( EntityInterface updated, ChangeDescription change, String entityType, Double prevVersion) { return new ChangeEvent() .withId(UUID.randomUUID()) @@ -2827,7 +2857,7 @@ public abstract class EntityRepository { throw new IllegalArgumentException( CatalogExceptionMessage.invalidOwnerType(owner.getType())); } - return Entity.getEntityReferenceById(owner.getType(), owner.getId(), ALL); + return getEntityReferenceById(owner.getType(), owner.getId(), ALL); }) .collect(Collectors.toList()); } @@ -2859,7 +2889,7 @@ public abstract class EntityRepository { if (!supportsDomain) { throw new IllegalArgumentException(CatalogExceptionMessage.invalidField(FIELD_DOMAIN)); } - Entity.getEntityReferenceById(DOMAIN, domain.getId(), NON_DELETED); + getEntityReferenceById(DOMAIN, domain.getId(), NON_DELETED); } public final void validateDataProducts(List dataProducts) { @@ -2869,7 +2899,7 @@ public abstract class EntityRepository { if (!nullOrEmpty(dataProducts)) { for (EntityReference dataProduct : dataProducts) { - Entity.getEntityReferenceById(DATA_PRODUCT, dataProduct.getId(), NON_DELETED); + getEntityReferenceById(DATA_PRODUCT, dataProduct.getId(), NON_DELETED); } } } @@ -3462,8 +3492,6 @@ public abstract class EntityRepository { removeDomainLineage(updated.getId(), entityType, origDomain); deleteRelationship( origDomain.getId(), DOMAIN, original.getId(), entityType, Relationship.HAS); - // Clean up data products associated with the old domain on domain update - removeDomainDataProducts(original.getDomain(), updated); } if (updatedDomain != null) { validateDomain(updatedDomain); @@ -3477,6 +3505,8 @@ public abstract class EntityRepository { addDomainLineage(updated.getId(), entityType, updatedDomain); } updated.setDomain(updatedDomain); + // Clean up data products associated not associated with the updated domain + removeCrossDomainDataProducts(updated.getDomain(), updated); } else { updated.setDomain(original.getDomain()); } @@ -3489,6 +3519,22 @@ public abstract class EntityRepository { List origDataProducts = listOrEmpty(original.getDataProducts()); List updatedDataProducts = listOrEmpty(updated.getDataProducts()); validateDataProducts(updatedDataProducts); + if (updated.getDomain() == null && !nullOrEmpty(updatedDataProducts)) { + throw new IllegalArgumentException( + "Domain cannot be empty when data products are provided."); + } + // Clean up data products associated with the old domain + if (original.getDomain() != null + && Objects.equals(original.getDomain(), updated.getDomain()) + && recordChange( + FIELD_DATA_PRODUCTS, + origDataProducts, + updatedDataProducts, + true, + entityReferenceListMatch)) { + removeCrossDomainDataProducts(updated.getDomain(), updated); + updatedDataProducts = listOrEmpty(updated.getDataProducts()); + } updateFromRelationships( FIELD_DATA_PRODUCTS, DATA_PRODUCT, @@ -4307,7 +4353,7 @@ public abstract class EntityRepository { for (CollectionDAO.EntityRelationshipObject rec : records) { UUID toId = UUID.fromString(rec.getToId()); EntityReference ownerRef = - Entity.getEntityReferenceById(rec.getFromEntity(), UUID.fromString(rec.getFromId()), ALL); + getEntityReferenceById(rec.getFromEntity(), UUID.fromString(rec.getFromId()), ALL); ownersMap.computeIfAbsent(toId, k -> new ArrayList<>()).add(ownerRef); } @@ -4349,7 +4395,7 @@ public abstract class EntityRepository { UUID fromId = UUID.fromString(rec.getFromId()); String fromEntity = rec.getFromEntity(); - EntityReference domainRef = Entity.getEntityReferenceById(fromEntity, fromId, ALL); + EntityReference domainRef = getEntityReferenceById(fromEntity, fromId, ALL); // Since each entity can have only one domain, we can directly put it in the map if (domainsMap.containsKey(toId)) { @@ -4376,7 +4422,7 @@ public abstract class EntityRepository { for (CollectionDAO.EntityRelationshipObject rec : records) { UUID entityId = UUID.fromString(rec.getToId()); EntityReference reviewerRef = - Entity.getEntityReferenceById(rec.getFromEntity(), UUID.fromString(rec.getFromId()), ALL); + getEntityReferenceById(rec.getFromEntity(), UUID.fromString(rec.getFromId()), ALL); reviewersMap.computeIfAbsent(entityId, k -> new ArrayList<>()).add(reviewerRef); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java index 5a0f1666738..c3443eb808d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java @@ -188,7 +188,7 @@ public class IngestionPipelineRepository extends EntityRepository entityReferenceMatch = (ref1, ref2) -> ref1.getId().equals(ref2.getId()) && ref1.getType().equals(ref2.getType()); - + public static final BiPredicate, List> + entityReferenceListMatch = + (list1, list2) -> { + if (list1 == null || list2 == null) { + return list1 == list2; + } + if (list1.size() != list2.size()) { + return false; + } + for (int i = 0; i < list1.size(); i++) { + EntityReference ref1 = list1.get(i); + EntityReference ref2 = list2.get(i); + if (ref1 == null || ref2 == null || !entityReferenceMatch.test(ref1, ref2)) { + return false; + } + } + return true; + }; public static final BiPredicate tagLabelMatch = (tag1, tag2) -> tag1.getTagFQN().equals(tag2.getTagFQN()) && tag1.getSource().equals(tag2.getSource());