mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-13 17:58:36 +00:00
Fix - 21204 inconsistent domain reassignment in bulkAsset API (#21205)
This commit is contained in:
parent
b1edb96469
commit
f33cf42290
@ -975,6 +975,13 @@ public interface CollectionDAO {
|
|||||||
bulkRemoveTo(fromId, toIdsAsString, fromEntity, toEntity, relation);
|
bulkRemoveTo(fromId, toIdsAsString, fromEntity, toEntity, relation);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
default void bulkRemoveFromRelationship(
|
||||||
|
List<UUID> fromIds, UUID toId, String fromEntity, String toEntity, int relation) {
|
||||||
|
|
||||||
|
List<String> fromIdsAsString = fromIds.stream().map(UUID::toString).toList();
|
||||||
|
bulkRemoveFrom(fromIdsAsString, toId, fromEntity, toEntity, relation);
|
||||||
|
}
|
||||||
|
|
||||||
@ConnectionAwareSqlUpdate(
|
@ConnectionAwareSqlUpdate(
|
||||||
value =
|
value =
|
||||||
"INSERT INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation, json) "
|
"INSERT INTO entity_relationship(fromId, toId, fromEntity, toEntity, relation, json) "
|
||||||
@ -1022,6 +1029,20 @@ public interface CollectionDAO {
|
|||||||
@Bind("toEntity") String toEntity,
|
@Bind("toEntity") String toEntity,
|
||||||
@Bind("relation") int relation);
|
@Bind("relation") int relation);
|
||||||
|
|
||||||
|
@SqlUpdate(
|
||||||
|
"DELETE FROM entity_relationship "
|
||||||
|
+ "WHERE fromEntity = :fromEntity "
|
||||||
|
+ "AND fromId IN (<fromIds>) "
|
||||||
|
+ "AND toEntity = :toEntity "
|
||||||
|
+ "AND relation = :relation "
|
||||||
|
+ "AND toId = :toId")
|
||||||
|
void bulkRemoveFrom(
|
||||||
|
@BindList("fromIds") List<String> fromIds,
|
||||||
|
@BindUUID("toId") UUID toId,
|
||||||
|
@Bind("fromEntity") String fromEntity,
|
||||||
|
@Bind("toEntity") String toEntity,
|
||||||
|
@Bind("relation") int relation);
|
||||||
|
|
||||||
//
|
//
|
||||||
// Find to operations
|
// Find to operations
|
||||||
//
|
//
|
||||||
|
@ -24,22 +24,28 @@ import java.util.ArrayList;
|
|||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
import org.jdbi.v3.sqlobject.transaction.Transaction;
|
||||||
import org.openmetadata.schema.EntityInterface;
|
import org.openmetadata.schema.EntityInterface;
|
||||||
import org.openmetadata.schema.entity.domains.DataProduct;
|
import org.openmetadata.schema.entity.domains.DataProduct;
|
||||||
import org.openmetadata.schema.entity.domains.Domain;
|
import org.openmetadata.schema.entity.domains.Domain;
|
||||||
import org.openmetadata.schema.type.ApiStatus;
|
import org.openmetadata.schema.type.ApiStatus;
|
||||||
|
import org.openmetadata.schema.type.ChangeDescription;
|
||||||
|
import org.openmetadata.schema.type.ChangeEvent;
|
||||||
import org.openmetadata.schema.type.EntityReference;
|
import org.openmetadata.schema.type.EntityReference;
|
||||||
import org.openmetadata.schema.type.Include;
|
import org.openmetadata.schema.type.Include;
|
||||||
import org.openmetadata.schema.type.Relationship;
|
import org.openmetadata.schema.type.Relationship;
|
||||||
import org.openmetadata.schema.type.api.BulkAssets;
|
import org.openmetadata.schema.type.api.BulkAssets;
|
||||||
import org.openmetadata.schema.type.api.BulkOperationResult;
|
import org.openmetadata.schema.type.api.BulkOperationResult;
|
||||||
|
import org.openmetadata.schema.type.api.BulkResponse;
|
||||||
import org.openmetadata.schema.type.change.ChangeSource;
|
import org.openmetadata.schema.type.change.ChangeSource;
|
||||||
import org.openmetadata.service.Entity;
|
import org.openmetadata.service.Entity;
|
||||||
import org.openmetadata.service.resources.domains.DataProductResource;
|
import org.openmetadata.service.resources.domains.DataProductResource;
|
||||||
import org.openmetadata.service.util.EntityUtil;
|
import org.openmetadata.service.util.EntityUtil;
|
||||||
import org.openmetadata.service.util.EntityUtil.Fields;
|
import org.openmetadata.service.util.EntityUtil.Fields;
|
||||||
|
import org.openmetadata.service.util.JsonUtils;
|
||||||
import org.openmetadata.service.util.LineageUtil;
|
import org.openmetadata.service.util.LineageUtil;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -147,6 +153,100 @@ public class DataProductRepository extends EntityRepository<DataProduct> {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Transaction
|
||||||
|
@Override
|
||||||
|
protected BulkOperationResult bulkAssetsOperation(
|
||||||
|
UUID entityId,
|
||||||
|
String fromEntity,
|
||||||
|
Relationship relationship,
|
||||||
|
BulkAssets request,
|
||||||
|
boolean isAdd) {
|
||||||
|
BulkOperationResult result =
|
||||||
|
new BulkOperationResult().withStatus(ApiStatus.SUCCESS).withDryRun(false);
|
||||||
|
List<BulkResponse> success = new ArrayList<>();
|
||||||
|
|
||||||
|
EntityUtil.populateEntityReferences(request.getAssets());
|
||||||
|
|
||||||
|
for (EntityReference ref : request.getAssets()) {
|
||||||
|
result.setNumberOfRowsProcessed(result.getNumberOfRowsProcessed() + 1);
|
||||||
|
|
||||||
|
removeCrossDomainDataProducts(ref, relationship);
|
||||||
|
|
||||||
|
if (isAdd) {
|
||||||
|
addRelationship(entityId, ref.getId(), fromEntity, ref.getType(), relationship);
|
||||||
|
} else {
|
||||||
|
deleteRelationship(entityId, fromEntity, ref.getId(), ref.getType(), relationship);
|
||||||
|
}
|
||||||
|
|
||||||
|
success.add(new BulkResponse().withRequest(ref));
|
||||||
|
result.setNumberOfRowsPassed(result.getNumberOfRowsPassed() + 1);
|
||||||
|
|
||||||
|
searchRepository.updateEntity(ref);
|
||||||
|
}
|
||||||
|
|
||||||
|
result.withSuccessRequest(success);
|
||||||
|
|
||||||
|
// Create a Change Event on successful addition/removal of assets
|
||||||
|
if (result.getStatus().equals(ApiStatus.SUCCESS)) {
|
||||||
|
EntityInterface entityInterface = Entity.getEntity(fromEntity, entityId, "id", ALL);
|
||||||
|
ChangeDescription change =
|
||||||
|
addBulkAddRemoveChangeDescription(
|
||||||
|
entityInterface.getVersion(), isAdd, request.getAssets(), null);
|
||||||
|
ChangeEvent changeEvent =
|
||||||
|
getChangeEvent(entityInterface, change, fromEntity, entityInterface.getVersion());
|
||||||
|
Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeCrossDomainDataProducts(EntityReference ref, Relationship relationship) {
|
||||||
|
EntityReference domain =
|
||||||
|
getFromEntityRef(ref.getId(), ref.getType(), relationship, DOMAIN, false);
|
||||||
|
List<EntityReference> dataProducts = getDataProducts(ref.getId(), ref.getType());
|
||||||
|
|
||||||
|
if (!dataProducts.isEmpty() && domain != null) {
|
||||||
|
// Map dataProduct -> domain
|
||||||
|
Map<UUID, UUID> associatedDomains =
|
||||||
|
daoCollection
|
||||||
|
.relationshipDAO()
|
||||||
|
.findFromBatch(
|
||||||
|
dataProducts.stream()
|
||||||
|
.map(dp -> dp.getId().toString())
|
||||||
|
.collect(Collectors.toList()),
|
||||||
|
relationship.ordinal(),
|
||||||
|
DOMAIN)
|
||||||
|
.stream()
|
||||||
|
.collect(
|
||||||
|
Collectors.toMap(
|
||||||
|
rec -> UUID.fromString(rec.getToId()),
|
||||||
|
rec -> UUID.fromString(rec.getFromId())));
|
||||||
|
|
||||||
|
List<EntityReference> dataProductsToDelete =
|
||||||
|
dataProducts.stream()
|
||||||
|
.filter(
|
||||||
|
dataProduct -> {
|
||||||
|
UUID associatedDomainId = associatedDomains.get(dataProduct.getId());
|
||||||
|
return associatedDomainId != null && !associatedDomainId.equals(domain.getId());
|
||||||
|
})
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
if (!dataProductsToDelete.isEmpty()) {
|
||||||
|
daoCollection
|
||||||
|
.relationshipDAO()
|
||||||
|
.bulkRemoveFromRelationship(
|
||||||
|
dataProductsToDelete.stream()
|
||||||
|
.map(EntityReference::getId)
|
||||||
|
.collect(Collectors.toList()),
|
||||||
|
ref.getId(),
|
||||||
|
DATA_PRODUCT,
|
||||||
|
ref.getType(),
|
||||||
|
relationship.ordinal());
|
||||||
|
LineageUtil.removeDataProductsLineage(ref.getId(), ref.getType(), dataProductsToDelete);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void restorePatchAttributes(DataProduct original, DataProduct updated) {
|
public void restorePatchAttributes(DataProduct original, DataProduct updated) {
|
||||||
super.restorePatchAttributes(original, updated);
|
super.restorePatchAttributes(original, updated);
|
||||||
|
@ -15,8 +15,10 @@ package org.openmetadata.service.jdbi3;
|
|||||||
|
|
||||||
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
|
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
|
||||||
import static org.openmetadata.schema.type.Include.ALL;
|
import static org.openmetadata.schema.type.Include.ALL;
|
||||||
|
import static org.openmetadata.service.Entity.DATA_PRODUCT;
|
||||||
import static org.openmetadata.service.Entity.DOMAIN;
|
import static org.openmetadata.service.Entity.DOMAIN;
|
||||||
import static org.openmetadata.service.Entity.FIELD_ASSETS;
|
import static org.openmetadata.service.Entity.FIELD_ASSETS;
|
||||||
|
import static org.openmetadata.service.Entity.getEntityReferenceById;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -29,11 +31,14 @@ import org.openmetadata.schema.EntityInterface;
|
|||||||
import org.openmetadata.schema.entity.data.EntityHierarchy;
|
import org.openmetadata.schema.entity.data.EntityHierarchy;
|
||||||
import org.openmetadata.schema.entity.domains.Domain;
|
import org.openmetadata.schema.entity.domains.Domain;
|
||||||
import org.openmetadata.schema.type.ApiStatus;
|
import org.openmetadata.schema.type.ApiStatus;
|
||||||
|
import org.openmetadata.schema.type.ChangeDescription;
|
||||||
|
import org.openmetadata.schema.type.ChangeEvent;
|
||||||
import org.openmetadata.schema.type.EntityReference;
|
import org.openmetadata.schema.type.EntityReference;
|
||||||
import org.openmetadata.schema.type.Include;
|
import org.openmetadata.schema.type.Include;
|
||||||
import org.openmetadata.schema.type.Relationship;
|
import org.openmetadata.schema.type.Relationship;
|
||||||
import org.openmetadata.schema.type.api.BulkAssets;
|
import org.openmetadata.schema.type.api.BulkAssets;
|
||||||
import org.openmetadata.schema.type.api.BulkOperationResult;
|
import org.openmetadata.schema.type.api.BulkOperationResult;
|
||||||
|
import org.openmetadata.schema.type.api.BulkResponse;
|
||||||
import org.openmetadata.schema.type.change.ChangeSource;
|
import org.openmetadata.schema.type.change.ChangeSource;
|
||||||
import org.openmetadata.service.Entity;
|
import org.openmetadata.service.Entity;
|
||||||
import org.openmetadata.service.resources.domains.DomainResource;
|
import org.openmetadata.service.resources.domains.DomainResource;
|
||||||
@ -111,29 +116,116 @@ public class DomainRepository extends EntityRepository<Domain> {
|
|||||||
|
|
||||||
public BulkOperationResult bulkAddAssets(String domainName, BulkAssets request) {
|
public BulkOperationResult bulkAddAssets(String domainName, BulkAssets request) {
|
||||||
Domain domain = getByName(null, domainName, getFields("id"));
|
Domain domain = getByName(null, domainName, getFields("id"));
|
||||||
BulkOperationResult result =
|
return bulkAssetsOperation(domain.getId(), DOMAIN, Relationship.HAS, request, true);
|
||||||
bulkAssetsOperation(domain.getId(), DOMAIN, Relationship.HAS, request, true);
|
|
||||||
// Add assets to domain
|
|
||||||
if (result.getStatus().equals(ApiStatus.SUCCESS)) {
|
|
||||||
for (EntityReference ref : listOrEmpty(request.getAssets())) {
|
|
||||||
LineageUtil.addDomainLineage(ref.getId(), ref.getType(), domain.getEntityReference());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public BulkOperationResult bulkRemoveAssets(String domainName, BulkAssets request) {
|
public BulkOperationResult bulkRemoveAssets(String domainName, BulkAssets request) {
|
||||||
Domain domain = getByName(null, domainName, getFields("id"));
|
Domain domain = getByName(null, domainName, getFields("id"));
|
||||||
|
return bulkAssetsOperation(domain.getId(), DOMAIN, Relationship.HAS, request, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transaction
|
||||||
|
@Override
|
||||||
|
protected BulkOperationResult bulkAssetsOperation(
|
||||||
|
UUID entityId,
|
||||||
|
String fromEntity,
|
||||||
|
Relationship relationship,
|
||||||
|
BulkAssets request,
|
||||||
|
boolean isAdd) {
|
||||||
BulkOperationResult result =
|
BulkOperationResult result =
|
||||||
bulkAssetsOperation(domain.getId(), DOMAIN, Relationship.HAS, request, false);
|
new BulkOperationResult().withStatus(ApiStatus.SUCCESS).withDryRun(false);
|
||||||
if (result.getStatus().equals(ApiStatus.SUCCESS)) {
|
List<BulkResponse> success = new ArrayList<>();
|
||||||
for (EntityReference ref : listOrEmpty(request.getAssets())) {
|
|
||||||
LineageUtil.removeDomainLineage(ref.getId(), ref.getType(), domain.getEntityReference());
|
EntityUtil.populateEntityReferences(request.getAssets());
|
||||||
|
|
||||||
|
for (EntityReference ref : request.getAssets()) {
|
||||||
|
result.setNumberOfRowsProcessed(result.getNumberOfRowsProcessed() + 1);
|
||||||
|
|
||||||
|
cleanupOldDomain(ref, fromEntity, relationship);
|
||||||
|
cleanupDataProducts(entityId, ref, relationship, isAdd);
|
||||||
|
|
||||||
|
if (isAdd) {
|
||||||
|
addRelationship(entityId, ref.getId(), fromEntity, ref.getType(), relationship);
|
||||||
|
EntityReference domainRef = getEntityReferenceById(DOMAIN, entityId, ALL);
|
||||||
|
LineageUtil.addDomainLineage(entityId, ref.getType(), domainRef);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
success.add(new BulkResponse().withRequest(ref));
|
||||||
|
result.setNumberOfRowsPassed(result.getNumberOfRowsPassed() + 1);
|
||||||
|
|
||||||
|
searchRepository.updateEntity(ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
result.withSuccessRequest(success);
|
||||||
|
|
||||||
|
// Create a Change Event on successful addition/removal of assets
|
||||||
|
if (result.getStatus().equals(ApiStatus.SUCCESS)) {
|
||||||
|
EntityInterface entityInterface = Entity.getEntity(fromEntity, entityId, "id", ALL);
|
||||||
|
ChangeDescription change =
|
||||||
|
addBulkAddRemoveChangeDescription(
|
||||||
|
entityInterface.getVersion(), isAdd, request.getAssets(), null);
|
||||||
|
ChangeEvent changeEvent =
|
||||||
|
getChangeEvent(entityInterface, change, fromEntity, entityInterface.getVersion());
|
||||||
|
Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
|
||||||
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void cleanupOldDomain(EntityReference ref, String fromEntity, Relationship relationship) {
|
||||||
|
EntityReference oldDomain =
|
||||||
|
getFromEntityRef(ref.getId(), ref.getType(), relationship, DOMAIN, false);
|
||||||
|
deleteTo(ref.getId(), ref.getType(), relationship, fromEntity);
|
||||||
|
LineageUtil.removeDomainLineage(ref.getId(), ref.getType(), oldDomain);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cleanupDataProducts(
|
||||||
|
UUID entityId, EntityReference ref, Relationship relationship, boolean isAdd) {
|
||||||
|
List<EntityReference> dataProducts = getDataProducts(ref.getId(), ref.getType());
|
||||||
|
if (dataProducts.isEmpty()) return;
|
||||||
|
|
||||||
|
// Map dataProduct -> domain
|
||||||
|
Map<UUID, UUID> associatedDomains =
|
||||||
|
daoCollection
|
||||||
|
.relationshipDAO()
|
||||||
|
.findFromBatch(
|
||||||
|
dataProducts.stream().map(dp -> dp.getId().toString()).collect(Collectors.toList()),
|
||||||
|
relationship.ordinal(),
|
||||||
|
DOMAIN)
|
||||||
|
.stream()
|
||||||
|
.collect(
|
||||||
|
Collectors.toMap(
|
||||||
|
rec -> UUID.fromString(rec.getToId()),
|
||||||
|
rec -> UUID.fromString(rec.getFromId())));
|
||||||
|
|
||||||
|
// For isAdd, filter only those data products linked to a different domain.
|
||||||
|
// For isRemove, delete all data products.
|
||||||
|
List<EntityReference> dataProductsToDelete =
|
||||||
|
isAdd
|
||||||
|
? dataProducts.stream()
|
||||||
|
.filter(
|
||||||
|
dp -> {
|
||||||
|
UUID domainId = associatedDomains.get(dp.getId());
|
||||||
|
return domainId != null && !domainId.equals(entityId);
|
||||||
|
})
|
||||||
|
.collect(Collectors.toList())
|
||||||
|
: dataProducts;
|
||||||
|
|
||||||
|
if (!dataProductsToDelete.isEmpty()) {
|
||||||
|
daoCollection
|
||||||
|
.relationshipDAO()
|
||||||
|
.bulkRemoveFromRelationship(
|
||||||
|
dataProductsToDelete.stream()
|
||||||
|
.map(EntityReference::getId)
|
||||||
|
.collect(Collectors.toList()),
|
||||||
|
ref.getId(),
|
||||||
|
DATA_PRODUCT,
|
||||||
|
ref.getType(),
|
||||||
|
relationship.ordinal());
|
||||||
|
LineageUtil.removeDataProductsLineage(ref.getId(), ref.getType(), dataProductsToDelete);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public EntityRepository<Domain>.EntityUpdater getUpdater(
|
public EntityRepository<Domain>.EntityUpdater getUpdater(
|
||||||
Domain original, Domain updated, Operation operation, ChangeSource changeSource) {
|
Domain original, Domain updated, Operation operation, ChangeSource changeSource) {
|
||||||
|
@ -49,6 +49,7 @@ import static org.openmetadata.service.Entity.TEAM;
|
|||||||
import static org.openmetadata.service.Entity.USER;
|
import static org.openmetadata.service.Entity.USER;
|
||||||
import static org.openmetadata.service.Entity.getEntityByName;
|
import static org.openmetadata.service.Entity.getEntityByName;
|
||||||
import static org.openmetadata.service.Entity.getEntityFields;
|
import static org.openmetadata.service.Entity.getEntityFields;
|
||||||
|
import static org.openmetadata.service.Entity.getEntityReferenceById;
|
||||||
import static org.openmetadata.service.exception.CatalogExceptionMessage.csvNotSupported;
|
import static org.openmetadata.service.exception.CatalogExceptionMessage.csvNotSupported;
|
||||||
import static org.openmetadata.service.exception.CatalogExceptionMessage.entityNotFound;
|
import static org.openmetadata.service.exception.CatalogExceptionMessage.entityNotFound;
|
||||||
import static org.openmetadata.service.resources.tags.TagLabelUtil.addDerivedTags;
|
import static org.openmetadata.service.resources.tags.TagLabelUtil.addDerivedTags;
|
||||||
@ -56,6 +57,7 @@ import static org.openmetadata.service.resources.tags.TagLabelUtil.checkDisabled
|
|||||||
import static org.openmetadata.service.resources.tags.TagLabelUtil.checkMutuallyExclusive;
|
import static org.openmetadata.service.resources.tags.TagLabelUtil.checkMutuallyExclusive;
|
||||||
import static org.openmetadata.service.resources.tags.TagLabelUtil.populateTagLabel;
|
import static org.openmetadata.service.resources.tags.TagLabelUtil.populateTagLabel;
|
||||||
import static org.openmetadata.service.util.EntityUtil.compareTagLabel;
|
import static org.openmetadata.service.util.EntityUtil.compareTagLabel;
|
||||||
|
import static org.openmetadata.service.util.EntityUtil.entityReferenceListMatch;
|
||||||
import static org.openmetadata.service.util.EntityUtil.entityReferenceMatch;
|
import static org.openmetadata.service.util.EntityUtil.entityReferenceMatch;
|
||||||
import static org.openmetadata.service.util.EntityUtil.fieldAdded;
|
import static org.openmetadata.service.util.EntityUtil.fieldAdded;
|
||||||
import static org.openmetadata.service.util.EntityUtil.fieldDeleted;
|
import static org.openmetadata.service.util.EntityUtil.fieldDeleted;
|
||||||
@ -1577,7 +1579,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
T entity = find(entityId, NON_DELETED);
|
T entity = find(entityId, NON_DELETED);
|
||||||
|
|
||||||
// Validate follower
|
// Validate follower
|
||||||
EntityReference user = Entity.getEntityReferenceById(USER, userId, NON_DELETED);
|
EntityReference user = getEntityReferenceById(USER, userId, NON_DELETED);
|
||||||
|
|
||||||
// Remove follower
|
// Remove follower
|
||||||
deleteRelationship(userId, USER, entityId, entityType, Relationship.FOLLOWS);
|
deleteRelationship(userId, USER, entityId, entityType, Relationship.FOLLOWS);
|
||||||
@ -2128,27 +2130,41 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
return RestUtil.getHref(uriInfo, collectionPath, id);
|
return RestUtil.getHref(uriInfo, collectionPath, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeDomainDataProducts(EntityReference originalDomain, T entity) {
|
private void removeCrossDomainDataProducts(EntityReference domain, T entity) {
|
||||||
List<UUID> dataProductIds =
|
if (!supportsDataProducts) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<EntityReference> entityDataProducts = entity.getDataProducts();
|
||||||
|
if (entityDataProducts == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (domain == null) {
|
||||||
|
entityDataProducts.clear();
|
||||||
|
LOG.info(
|
||||||
|
"Removed all data products from entity {} as no domain is provided",
|
||||||
|
entity.getEntityReference().getType());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch domain data products
|
||||||
|
List<UUID> domainDataProductIds =
|
||||||
daoCollection
|
daoCollection
|
||||||
.relationshipDAO()
|
.relationshipDAO()
|
||||||
.findToIds(
|
.findToIds(domain.getId(), DOMAIN, Relationship.HAS.ordinal(), Entity.DATA_PRODUCT);
|
||||||
originalDomain.getId(), DOMAIN, Relationship.HAS.ordinal(), Entity.DATA_PRODUCT);
|
|
||||||
|
|
||||||
List<EntityReference> updatedDataProducts = entity.getDataProducts();
|
entityDataProducts.removeIf(
|
||||||
if (updatedDataProducts != null) {
|
dataProduct -> {
|
||||||
updatedDataProducts.removeIf(
|
boolean isNotDomainDataProduct = !domainDataProductIds.contains(dataProduct.getId());
|
||||||
dataProduct -> {
|
if (isNotDomainDataProduct) {
|
||||||
boolean isDomainDataProduct = dataProductIds.contains(dataProduct.getId());
|
LOG.info(
|
||||||
if (isDomainDataProduct) {
|
"Removing data product {} from entity {}",
|
||||||
LOG.info(
|
dataProduct.getFullyQualifiedName(),
|
||||||
"Removing data product {} from entity {}",
|
entity.getEntityReference().getType());
|
||||||
dataProduct.getFullyQualifiedName(),
|
}
|
||||||
entity.getEntityReference().getType());
|
return isNotDomainDataProduct;
|
||||||
}
|
});
|
||||||
return isDomainDataProduct;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Transaction
|
@Transaction
|
||||||
@ -2290,6 +2306,21 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
return getFromEntityRef(toId, Relationship.CONTAINS, fromEntityType, true);
|
return getFromEntityRef(toId, Relationship.CONTAINS, fromEntityType, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public final EntityReference getFromEntityRef(
|
||||||
|
UUID toId,
|
||||||
|
String toEntity,
|
||||||
|
Relationship relationship,
|
||||||
|
String fromEntityType,
|
||||||
|
boolean mustHaveRelationship) {
|
||||||
|
List<EntityRelationshipRecord> records =
|
||||||
|
findFromRecords(toId, toEntity, relationship, fromEntityType);
|
||||||
|
ensureSingleRelationship(
|
||||||
|
toEntity, toId, records, relationship.value(), fromEntityType, mustHaveRelationship);
|
||||||
|
return !records.isEmpty()
|
||||||
|
? Entity.getEntityReferenceById(records.get(0).getType(), records.get(0).getId(), ALL)
|
||||||
|
: null;
|
||||||
|
}
|
||||||
|
|
||||||
public final EntityReference getFromEntityRef(
|
public final EntityReference getFromEntityRef(
|
||||||
UUID toId, Relationship relationship, String fromEntityType, boolean mustHaveRelationship) {
|
UUID toId, Relationship relationship, String fromEntityType, boolean mustHaveRelationship) {
|
||||||
List<EntityRelationshipRecord> records =
|
List<EntityRelationshipRecord> records =
|
||||||
@ -2319,7 +2350,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
ensureSingleRelationship(
|
ensureSingleRelationship(
|
||||||
entityType, fromId, records, relationship.value(), toEntityType, mustHaveRelationship);
|
entityType, fromId, records, relationship.value(), toEntityType, mustHaveRelationship);
|
||||||
return !records.isEmpty()
|
return !records.isEmpty()
|
||||||
? Entity.getEntityReferenceById(records.get(0).getType(), records.get(0).getId(), ALL)
|
? getEntityReferenceById(records.get(0).getType(), records.get(0).getId(), ALL)
|
||||||
: null;
|
: null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2401,7 +2432,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
for (EntityReference entityReference : entityReferences) {
|
for (EntityReference entityReference : entityReferences) {
|
||||||
EntityReference ref =
|
EntityReference ref =
|
||||||
entityReference.getId() != null
|
entityReference.getId() != null
|
||||||
? Entity.getEntityReferenceById(USER, entityReference.getId(), ALL)
|
? getEntityReferenceById(USER, entityReference.getId(), ALL)
|
||||||
: Entity.getEntityReferenceByName(
|
: Entity.getEntityReferenceByName(
|
||||||
USER, entityReference.getFullyQualifiedName(), ALL);
|
USER, entityReference.getFullyQualifiedName(), ALL);
|
||||||
EntityUtil.copy(ref, entityReference);
|
EntityUtil.copy(ref, entityReference);
|
||||||
@ -2426,7 +2457,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
} else {
|
} else {
|
||||||
EntityReference ref =
|
EntityReference ref =
|
||||||
entityReferences.get(0).getId() != null
|
entityReferences.get(0).getId() != null
|
||||||
? Entity.getEntityReferenceById(TEAM, entityReferences.get(0).getId(), ALL)
|
? getEntityReferenceById(TEAM, entityReferences.get(0).getId(), ALL)
|
||||||
: Entity.getEntityReferenceByName(
|
: Entity.getEntityReferenceByName(
|
||||||
TEAM, entityReferences.get(0).getFullyQualifiedName(), ALL);
|
TEAM, entityReferences.get(0).getFullyQualifiedName(), ALL);
|
||||||
EntityUtil.copy(ref, entityReferences.get(0));
|
EntityUtil.copy(ref, entityReferences.get(0));
|
||||||
@ -2435,7 +2466,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
for (EntityReference entityReference : entityReferences) {
|
for (EntityReference entityReference : entityReferences) {
|
||||||
EntityReference ref =
|
EntityReference ref =
|
||||||
entityReference.getId() != null
|
entityReference.getId() != null
|
||||||
? Entity.getEntityReferenceById(USER, entityReference.getId(), ALL)
|
? getEntityReferenceById(USER, entityReference.getId(), ALL)
|
||||||
: Entity.getEntityReferenceByName(
|
: Entity.getEntityReferenceByName(
|
||||||
USER, entityReference.getFullyQualifiedName(), ALL);
|
USER, entityReference.getFullyQualifiedName(), ALL);
|
||||||
EntityUtil.copy(ref, entityReference);
|
EntityUtil.copy(ref, entityReference);
|
||||||
@ -2451,8 +2482,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
public final void validateRoles(List<EntityReference> roles) {
|
public final void validateRoles(List<EntityReference> roles) {
|
||||||
if (roles != null) {
|
if (roles != null) {
|
||||||
for (EntityReference entityReference : roles) {
|
for (EntityReference entityReference : roles) {
|
||||||
EntityReference ref =
|
EntityReference ref = getEntityReferenceById(Entity.ROLE, entityReference.getId(), ALL);
|
||||||
Entity.getEntityReferenceById(Entity.ROLE, entityReference.getId(), ALL);
|
|
||||||
EntityUtil.copy(ref, entityReference);
|
EntityUtil.copy(ref, entityReference);
|
||||||
}
|
}
|
||||||
roles.sort(EntityUtil.compareEntityReference);
|
roles.sort(EntityUtil.compareEntityReference);
|
||||||
@ -2462,8 +2492,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
final void validatePolicies(List<EntityReference> policies) {
|
final void validatePolicies(List<EntityReference> policies) {
|
||||||
if (policies != null) {
|
if (policies != null) {
|
||||||
for (EntityReference entityReference : policies) {
|
for (EntityReference entityReference : policies) {
|
||||||
EntityReference ref =
|
EntityReference ref = getEntityReferenceById(Entity.POLICY, entityReference.getId(), ALL);
|
||||||
Entity.getEntityReferenceById(Entity.POLICY, entityReference.getId(), ALL);
|
|
||||||
EntityUtil.copy(ref, entityReference);
|
EntityUtil.copy(ref, entityReference);
|
||||||
}
|
}
|
||||||
policies.sort(EntityUtil.compareEntityReference);
|
policies.sort(EntityUtil.compareEntityReference);
|
||||||
@ -2620,6 +2649,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Transaction
|
||||||
protected BulkOperationResult bulkAssetsOperation(
|
protected BulkOperationResult bulkAssetsOperation(
|
||||||
UUID entityId,
|
UUID entityId,
|
||||||
String fromEntity,
|
String fromEntity,
|
||||||
@ -2665,7 +2695,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ChangeDescription addBulkAddRemoveChangeDescription(
|
protected ChangeDescription addBulkAddRemoveChangeDescription(
|
||||||
Double version, boolean isAdd, Object newValue, Object oldValue) {
|
Double version, boolean isAdd, Object newValue, Object oldValue) {
|
||||||
FieldChange fieldChange =
|
FieldChange fieldChange =
|
||||||
new FieldChange().withName("assets").withNewValue(newValue).withOldValue(oldValue);
|
new FieldChange().withName("assets").withNewValue(newValue).withOldValue(oldValue);
|
||||||
@ -2678,7 +2708,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
return change;
|
return change;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ChangeEvent getChangeEvent(
|
protected ChangeEvent getChangeEvent(
|
||||||
EntityInterface updated, ChangeDescription change, String entityType, Double prevVersion) {
|
EntityInterface updated, ChangeDescription change, String entityType, Double prevVersion) {
|
||||||
return new ChangeEvent()
|
return new ChangeEvent()
|
||||||
.withId(UUID.randomUUID())
|
.withId(UUID.randomUUID())
|
||||||
@ -2827,7 +2857,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
CatalogExceptionMessage.invalidOwnerType(owner.getType()));
|
CatalogExceptionMessage.invalidOwnerType(owner.getType()));
|
||||||
}
|
}
|
||||||
return Entity.getEntityReferenceById(owner.getType(), owner.getId(), ALL);
|
return getEntityReferenceById(owner.getType(), owner.getId(), ALL);
|
||||||
})
|
})
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
@ -2859,7 +2889,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
if (!supportsDomain) {
|
if (!supportsDomain) {
|
||||||
throw new IllegalArgumentException(CatalogExceptionMessage.invalidField(FIELD_DOMAIN));
|
throw new IllegalArgumentException(CatalogExceptionMessage.invalidField(FIELD_DOMAIN));
|
||||||
}
|
}
|
||||||
Entity.getEntityReferenceById(DOMAIN, domain.getId(), NON_DELETED);
|
getEntityReferenceById(DOMAIN, domain.getId(), NON_DELETED);
|
||||||
}
|
}
|
||||||
|
|
||||||
public final void validateDataProducts(List<EntityReference> dataProducts) {
|
public final void validateDataProducts(List<EntityReference> dataProducts) {
|
||||||
@ -2869,7 +2899,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
|
|
||||||
if (!nullOrEmpty(dataProducts)) {
|
if (!nullOrEmpty(dataProducts)) {
|
||||||
for (EntityReference dataProduct : dataProducts) {
|
for (EntityReference dataProduct : dataProducts) {
|
||||||
Entity.getEntityReferenceById(DATA_PRODUCT, dataProduct.getId(), NON_DELETED);
|
getEntityReferenceById(DATA_PRODUCT, dataProduct.getId(), NON_DELETED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3462,8 +3492,6 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
removeDomainLineage(updated.getId(), entityType, origDomain);
|
removeDomainLineage(updated.getId(), entityType, origDomain);
|
||||||
deleteRelationship(
|
deleteRelationship(
|
||||||
origDomain.getId(), DOMAIN, original.getId(), entityType, Relationship.HAS);
|
origDomain.getId(), DOMAIN, original.getId(), entityType, Relationship.HAS);
|
||||||
// Clean up data products associated with the old domain on domain update
|
|
||||||
removeDomainDataProducts(original.getDomain(), updated);
|
|
||||||
}
|
}
|
||||||
if (updatedDomain != null) {
|
if (updatedDomain != null) {
|
||||||
validateDomain(updatedDomain);
|
validateDomain(updatedDomain);
|
||||||
@ -3477,6 +3505,8 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
addDomainLineage(updated.getId(), entityType, updatedDomain);
|
addDomainLineage(updated.getId(), entityType, updatedDomain);
|
||||||
}
|
}
|
||||||
updated.setDomain(updatedDomain);
|
updated.setDomain(updatedDomain);
|
||||||
|
// Clean up data products associated not associated with the updated domain
|
||||||
|
removeCrossDomainDataProducts(updated.getDomain(), updated);
|
||||||
} else {
|
} else {
|
||||||
updated.setDomain(original.getDomain());
|
updated.setDomain(original.getDomain());
|
||||||
}
|
}
|
||||||
@ -3489,6 +3519,22 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
List<EntityReference> origDataProducts = listOrEmpty(original.getDataProducts());
|
List<EntityReference> origDataProducts = listOrEmpty(original.getDataProducts());
|
||||||
List<EntityReference> updatedDataProducts = listOrEmpty(updated.getDataProducts());
|
List<EntityReference> updatedDataProducts = listOrEmpty(updated.getDataProducts());
|
||||||
validateDataProducts(updatedDataProducts);
|
validateDataProducts(updatedDataProducts);
|
||||||
|
if (updated.getDomain() == null && !nullOrEmpty(updatedDataProducts)) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Domain cannot be empty when data products are provided.");
|
||||||
|
}
|
||||||
|
// Clean up data products associated with the old domain
|
||||||
|
if (original.getDomain() != null
|
||||||
|
&& Objects.equals(original.getDomain(), updated.getDomain())
|
||||||
|
&& recordChange(
|
||||||
|
FIELD_DATA_PRODUCTS,
|
||||||
|
origDataProducts,
|
||||||
|
updatedDataProducts,
|
||||||
|
true,
|
||||||
|
entityReferenceListMatch)) {
|
||||||
|
removeCrossDomainDataProducts(updated.getDomain(), updated);
|
||||||
|
updatedDataProducts = listOrEmpty(updated.getDataProducts());
|
||||||
|
}
|
||||||
updateFromRelationships(
|
updateFromRelationships(
|
||||||
FIELD_DATA_PRODUCTS,
|
FIELD_DATA_PRODUCTS,
|
||||||
DATA_PRODUCT,
|
DATA_PRODUCT,
|
||||||
@ -4307,7 +4353,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
for (CollectionDAO.EntityRelationshipObject rec : records) {
|
for (CollectionDAO.EntityRelationshipObject rec : records) {
|
||||||
UUID toId = UUID.fromString(rec.getToId());
|
UUID toId = UUID.fromString(rec.getToId());
|
||||||
EntityReference ownerRef =
|
EntityReference ownerRef =
|
||||||
Entity.getEntityReferenceById(rec.getFromEntity(), UUID.fromString(rec.getFromId()), ALL);
|
getEntityReferenceById(rec.getFromEntity(), UUID.fromString(rec.getFromId()), ALL);
|
||||||
ownersMap.computeIfAbsent(toId, k -> new ArrayList<>()).add(ownerRef);
|
ownersMap.computeIfAbsent(toId, k -> new ArrayList<>()).add(ownerRef);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4349,7 +4395,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
UUID fromId = UUID.fromString(rec.getFromId());
|
UUID fromId = UUID.fromString(rec.getFromId());
|
||||||
String fromEntity = rec.getFromEntity();
|
String fromEntity = rec.getFromEntity();
|
||||||
|
|
||||||
EntityReference domainRef = Entity.getEntityReferenceById(fromEntity, fromId, ALL);
|
EntityReference domainRef = getEntityReferenceById(fromEntity, fromId, ALL);
|
||||||
|
|
||||||
// Since each entity can have only one domain, we can directly put it in the map
|
// Since each entity can have only one domain, we can directly put it in the map
|
||||||
if (domainsMap.containsKey(toId)) {
|
if (domainsMap.containsKey(toId)) {
|
||||||
@ -4376,7 +4422,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||||||
for (CollectionDAO.EntityRelationshipObject rec : records) {
|
for (CollectionDAO.EntityRelationshipObject rec : records) {
|
||||||
UUID entityId = UUID.fromString(rec.getToId());
|
UUID entityId = UUID.fromString(rec.getToId());
|
||||||
EntityReference reviewerRef =
|
EntityReference reviewerRef =
|
||||||
Entity.getEntityReferenceById(rec.getFromEntity(), UUID.fromString(rec.getFromId()), ALL);
|
getEntityReferenceById(rec.getFromEntity(), UUID.fromString(rec.getFromId()), ALL);
|
||||||
reviewersMap.computeIfAbsent(entityId, k -> new ArrayList<>()).add(reviewerRef);
|
reviewersMap.computeIfAbsent(entityId, k -> new ArrayList<>()).add(reviewerRef);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,7 +188,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
|
|||||||
return Entity.getEntity(entity.getService(), fields, Include.ALL);
|
return Entity.getEntity(entity.getService(), fields, Include.ALL);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ChangeEvent getChangeEvent(
|
protected ChangeEvent getChangeEvent(
|
||||||
EntityInterface updated, ChangeDescription change, String entityType, Double prevVersion) {
|
EntityInterface updated, ChangeDescription change, String entityType, Double prevVersion) {
|
||||||
return new ChangeEvent()
|
return new ChangeEvent()
|
||||||
.withId(UUID.randomUUID())
|
.withId(UUID.randomUUID())
|
||||||
|
@ -103,7 +103,24 @@ public final class EntityUtil {
|
|||||||
|
|
||||||
public static final BiPredicate<EntityReference, EntityReference> entityReferenceMatch =
|
public static final BiPredicate<EntityReference, EntityReference> entityReferenceMatch =
|
||||||
(ref1, ref2) -> ref1.getId().equals(ref2.getId()) && ref1.getType().equals(ref2.getType());
|
(ref1, ref2) -> ref1.getId().equals(ref2.getId()) && ref1.getType().equals(ref2.getType());
|
||||||
|
public static final BiPredicate<List<EntityReference>, List<EntityReference>>
|
||||||
|
entityReferenceListMatch =
|
||||||
|
(list1, list2) -> {
|
||||||
|
if (list1 == null || list2 == null) {
|
||||||
|
return list1 == list2;
|
||||||
|
}
|
||||||
|
if (list1.size() != list2.size()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
for (int i = 0; i < list1.size(); i++) {
|
||||||
|
EntityReference ref1 = list1.get(i);
|
||||||
|
EntityReference ref2 = list2.get(i);
|
||||||
|
if (ref1 == null || ref2 == null || !entityReferenceMatch.test(ref1, ref2)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
};
|
||||||
public static final BiPredicate<TagLabel, TagLabel> tagLabelMatch =
|
public static final BiPredicate<TagLabel, TagLabel> tagLabelMatch =
|
||||||
(tag1, tag2) ->
|
(tag1, tag2) ->
|
||||||
tag1.getTagFQN().equals(tag2.getTagFQN()) && tag1.getSource().equals(tag2.getSource());
|
tag1.getTagFQN().equals(tag2.getTagFQN()) && tag1.getSource().equals(tag2.getSource());
|
||||||
|
Loading…
x
Reference in New Issue
Block a user