diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 1fe08898cbe..55c3cceac84 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -1003,6 +1003,41 @@ public interface CollectionDAO { @Bind("fromEntity") String fromEntity, @BindList("relation") List relation); + @SqlQuery( + "SELECT * FROM entity_relationship er1 JOIN entity_relationship er2 ON er1.toId = er2.toId WHERE er1.relation = 10 AND er1.fromEntity = 'domain' AND er2.fromId = :fromId AND er2.fromEntity = :fromEntity AND er2.relation = 13") + @RegisterRowMapper(RelationshipObjectMapper.class) + List findDownstreamDomains( + @BindUUID("fromId") UUID fromId, @Bind("fromEntity") String fromEntity); + + @SqlQuery( + "SELECT * FROM entity_relationship er1 JOIN entity_relationship er2 ON er1.toId = er2.fromId WHERE er1.relation = 10 AND er1.fromEntity = 'domain' AND er2.toId = :toId AND er2.toEntity = :toEntity AND er2.relation = 13") + @RegisterRowMapper(RelationshipObjectMapper.class) + List findUpstreamDomains( + @BindUUID("toId") UUID toId, @Bind("toEntity") String toEntity); + + @SqlQuery( + "select count(*) from entity_relationship where fromId in (select toId from entity_relationship where fromId = :fromDomainId and fromEntity = 'domain' and relation = 10) AND toId in (select toId from entity_relationship where fromId = :toDomainId and fromEntity = 'domain' and relation = 10) and relation = 13") + Integer countDomainChildAssets( + @BindUUID("fromDomainId") UUID fromDomainId, @BindUUID("toDomainId") UUID toId); + + @SqlQuery( + "SELECT * FROM entity_relationship er1 JOIN entity_relationship er2 ON er1.toId = er2.toId WHERE er1.relation = 10 AND er1.fromEntity = 'dataProduct' AND er2.fromId = :fromId AND er2.fromEntity = :fromEntity AND er2.relation = 13") + @RegisterRowMapper(RelationshipObjectMapper.class) + List findDownstreamDataProducts( + @BindUUID("fromId") UUID fromId, @Bind("fromEntity") String fromEntity); + + @SqlQuery( + "SELECT * FROM entity_relationship er1 JOIN entity_relationship er2 ON er1.toId = er2.fromId WHERE er1.relation = 10 AND er1.fromEntity = 'dataProduct' AND er2.toId = :toId AND er2.toEntity = :toEntity AND er2.relation = 13") + @RegisterRowMapper(RelationshipObjectMapper.class) + List findUpstreamDataProducts( + @BindUUID("toId") UUID toId, @Bind("toEntity") String toEntity); + + @SqlQuery( + "select count(*) from entity_relationship where fromId in (select toId from entity_relationship where fromId = :fromDataProductId and fromEntity = 'dataProduct' and relation = 10) AND toId in (select toId from entity_relationship where fromId = :toDataProductId and fromEntity = 'dataProduct' and relation = 10) and relation = 13") + Integer countDataProductsChildAssets( + @BindUUID("fromDataProductId") UUID fromDataProductId, + @BindUUID("toDataProductId") UUID toDataProductId); + default List findTo(UUID fromId, String fromEntity, int relation) { return findTo(fromId, fromEntity, List.of(relation)); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataProductRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataProductRepository.java index d3a6cbb4cc9..cca08f6dac0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataProductRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataProductRepository.java @@ -29,6 +29,7 @@ import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.entity.domains.DataProduct; import org.openmetadata.schema.entity.domains.Domain; +import org.openmetadata.schema.type.ApiStatus; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.Relationship; @@ -39,6 +40,7 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.resources.domains.DataProductResource; import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.EntityUtil.Fields; +import org.openmetadata.service.util.LineageUtil; @Slf4j public class DataProductRepository extends EntityRepository { @@ -121,12 +123,28 @@ public class DataProductRepository extends EntityRepository { public BulkOperationResult bulkAddAssets(String domainName, BulkAssets request) { DataProduct dataProduct = getByName(null, domainName, getFields("id")); - return bulkAssetsOperation(dataProduct.getId(), DATA_PRODUCT, Relationship.HAS, request, true); + BulkOperationResult result = + bulkAssetsOperation(dataProduct.getId(), DATA_PRODUCT, Relationship.HAS, request, true); + if (result.getStatus().equals(ApiStatus.SUCCESS)) { + for (EntityReference ref : listOrEmpty(request.getAssets())) { + LineageUtil.addDataProductsLineage( + ref.getId(), ref.getType(), List.of(dataProduct.getEntityReference())); + } + } + return result; } public BulkOperationResult bulkRemoveAssets(String domainName, BulkAssets request) { DataProduct dataProduct = getByName(null, domainName, getFields("id")); - return bulkAssetsOperation(dataProduct.getId(), DATA_PRODUCT, Relationship.HAS, request, false); + BulkOperationResult result = + bulkAssetsOperation(dataProduct.getId(), DATA_PRODUCT, Relationship.HAS, request, false); + if (result.getStatus().equals(ApiStatus.SUCCESS)) { + for (EntityReference ref : listOrEmpty(request.getAssets())) { + LineageUtil.removeDataProductsLineage( + ref.getId(), ref.getType(), List.of(dataProduct.getEntityReference())); + } + } + return result; } @Override diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DomainRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DomainRepository.java index 4114d1503b4..8e308918890 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DomainRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DomainRepository.java @@ -28,6 +28,7 @@ import org.jdbi.v3.sqlobject.transaction.Transaction; import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.entity.data.EntityHierarchy; import org.openmetadata.schema.entity.domains.Domain; +import org.openmetadata.schema.type.ApiStatus; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.Relationship; @@ -40,6 +41,7 @@ import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.EntityUtil.Fields; import org.openmetadata.service.util.FullyQualifiedName; import org.openmetadata.service.util.JsonUtils; +import org.openmetadata.service.util.LineageUtil; import org.openmetadata.service.util.ResultList; @Slf4j @@ -109,12 +111,27 @@ public class DomainRepository extends EntityRepository { public BulkOperationResult bulkAddAssets(String domainName, BulkAssets request) { Domain domain = getByName(null, domainName, getFields("id")); - return bulkAssetsOperation(domain.getId(), DOMAIN, Relationship.HAS, request, true); + BulkOperationResult result = + bulkAssetsOperation(domain.getId(), DOMAIN, Relationship.HAS, request, true); + // Add assets to domain + if (result.getStatus().equals(ApiStatus.SUCCESS)) { + for (EntityReference ref : listOrEmpty(request.getAssets())) { + LineageUtil.addDomainLineage(ref.getId(), ref.getType(), domain.getEntityReference()); + } + } + return result; } public BulkOperationResult bulkRemoveAssets(String domainName, BulkAssets request) { Domain domain = getByName(null, domainName, getFields("id")); - return bulkAssetsOperation(domain.getId(), DOMAIN, Relationship.HAS, request, false); + BulkOperationResult result = + bulkAssetsOperation(domain.getId(), DOMAIN, Relationship.HAS, request, false); + if (result.getStatus().equals(ApiStatus.SUCCESS)) { + for (EntityReference ref : listOrEmpty(request.getAssets())) { + LineageUtil.removeDomainLineage(ref.getId(), ref.getType(), domain.getEntityReference()); + } + } + return result; } @Override diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index 1a21b192051..d882b14fa32 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -68,6 +68,10 @@ import static org.openmetadata.service.util.EntityUtil.nextMajorVersion; import static org.openmetadata.service.util.EntityUtil.nextVersion; import static org.openmetadata.service.util.EntityUtil.objectMatch; import static org.openmetadata.service.util.EntityUtil.tagLabelMatch; +import static org.openmetadata.service.util.LineageUtil.addDataProductsLineage; +import static org.openmetadata.service.util.LineageUtil.addDomainLineage; +import static org.openmetadata.service.util.LineageUtil.removeDataProductsLineage; +import static org.openmetadata.service.util.LineageUtil.removeDomainLineage; import static org.openmetadata.service.util.jdbi.JdbiUtils.getAfterOffset; import static org.openmetadata.service.util.jdbi.JdbiUtils.getBeforeOffset; import static org.openmetadata.service.util.jdbi.JdbiUtils.getOffset; @@ -2429,6 +2433,10 @@ public abstract class EntityRepository { : findFrom(entity.getId(), entityType, Relationship.HAS, DATA_PRODUCT); } + protected List getDataProducts(UUID entityId, String entityType) { + return findFrom(entityId, entityType, Relationship.HAS, DATA_PRODUCT); + } + public EntityInterface getParentEntity(T entity, String fields) { return null; } @@ -3265,6 +3273,7 @@ public abstract class EntityRepository { "Removing domain {} for entity {}", origDomain.getFullyQualifiedName(), original.getFullyQualifiedName()); + removeDomainLineage(updated.getId(), entityType, origDomain); deleteRelationship( origDomain.getId(), DOMAIN, original.getId(), entityType, Relationship.HAS); } @@ -3277,6 +3286,7 @@ public abstract class EntityRepository { original.getFullyQualifiedName()); addRelationship( updatedDomain.getId(), original.getId(), DOMAIN, entityType, Relationship.HAS); + addDomainLineage(updated.getId(), entityType, updatedDomain); } updated.setDomain(updatedDomain); } else { @@ -3299,6 +3309,8 @@ public abstract class EntityRepository { Relationship.HAS, entityType, original.getId()); + removeDataProductsLineage(original.getId(), entityType, origDataProducts); + addDataProductsLineage(original.getId(), entityType, updatedDataProducts); } private void updateExperts() { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java index ef0016589fc..d2b28f2be20 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java @@ -401,7 +401,7 @@ public class LineageRepository { } } - private static String getDocumentUniqueId( + public static String getDocumentUniqueId( EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) { if (lineageDetails != null && !nullOrEmpty(lineageDetails.getPipeline())) { EntityReference ref = lineageDetails.getPipeline(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/LineageUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/LineageUtil.java new file mode 100644 index 00000000000..bf467ad2253 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/LineageUtil.java @@ -0,0 +1,241 @@ +package org.openmetadata.service.util; + +import static org.openmetadata.common.utils.CommonUtil.listOrEmpty; +import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; +import static org.openmetadata.service.Entity.ADMIN_USER_NAME; +import static org.openmetadata.service.jdbi3.LineageRepository.buildEntityLineageData; +import static org.openmetadata.service.jdbi3.LineageRepository.getDocumentUniqueId; +import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS; +import static org.openmetadata.service.search.SearchClient.REMOVE_LINEAGE_SCRIPT; + +import java.util.List; +import java.util.UUID; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.openmetadata.schema.api.lineage.EsLineageData; +import org.openmetadata.schema.type.EntityReference; +import org.openmetadata.schema.type.Include; +import org.openmetadata.schema.type.LineageDetails; +import org.openmetadata.schema.type.Relationship; +import org.openmetadata.service.Entity; +import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.search.models.IndexMapping; + +public class LineageUtil { + + private LineageUtil() {} + + public static void addDomainLineage( + UUID entityId, String entityType, EntityReference updatedDomain) { + if (!nullOrEmpty(updatedDomain)) { + List downstreamDomains = + Entity.getCollectionDAO().relationshipDAO().findDownstreamDomains(entityId, entityType); + List upstreamDomains = + Entity.getCollectionDAO().relationshipDAO().findUpstreamDomains(entityId, entityType); + for (CollectionDAO.EntityRelationshipObject downstreamDomain : downstreamDomains) { + insertDomainLineage( + updatedDomain, + Entity.getEntityReferenceById( + downstreamDomain.getFromEntity(), + UUID.fromString(downstreamDomain.getFromId()), + Include.ALL)); + } + for (CollectionDAO.EntityRelationshipObject upstreamDomain : upstreamDomains) { + insertDomainLineage( + Entity.getEntityReferenceById( + upstreamDomain.getFromEntity(), + UUID.fromString(upstreamDomain.getFromId()), + Include.ALL), + updatedDomain); + } + } + } + + public static void removeDomainLineage( + UUID entityId, String entityType, EntityReference updatedDomain) { + if (!nullOrEmpty(updatedDomain)) { + List downstreamDomains = + Entity.getCollectionDAO().relationshipDAO().findDownstreamDomains(entityId, entityType); + List upstreamDomains = + Entity.getCollectionDAO().relationshipDAO().findUpstreamDomains(entityId, entityType); + for (CollectionDAO.EntityRelationshipObject downstreamDomain : downstreamDomains) { + updateLineage( + updatedDomain, + Entity.getEntityReferenceById( + downstreamDomain.getFromEntity(), + UUID.fromString(downstreamDomain.getFromId()), + Include.ALL)); + } + for (CollectionDAO.EntityRelationshipObject upstreamDomain : upstreamDomains) { + updateLineage( + Entity.getEntityReferenceById( + upstreamDomain.getFromEntity(), + UUID.fromString(upstreamDomain.getFromId()), + Include.ALL), + updatedDomain); + } + } + } + + private static void updateLineage(EntityReference fromRef, EntityReference toRef) { + if (fromRef == null || toRef == null) return; + + CollectionDAO.EntityRelationshipObject relation = + Entity.getCollectionDAO() + .relationshipDAO() + .getRecord(fromRef.getId(), toRef.getId(), Relationship.UPSTREAM.ordinal()); + + if (relation == null) return; + + LineageDetails lineageDetails = JsonUtils.readValue(relation.getJson(), LineageDetails.class); + if (lineageDetails.getAssetEdges() - 1 < 1) { + Entity.getCollectionDAO() + .relationshipDAO() + .delete( + fromRef.getId(), + fromRef.getType(), + toRef.getId(), + toRef.getType(), + Relationship.UPSTREAM.ordinal()); + deleteLineageFromSearch(fromRef, toRef, lineageDetails); + } else { + lineageDetails.withAssetEdges(lineageDetails.getAssetEdges() - 1); + Entity.getCollectionDAO() + .relationshipDAO() + .insert( + fromRef.getId(), + toRef.getId(), + fromRef.getType(), + toRef.getType(), + Relationship.UPSTREAM.ordinal(), + JsonUtils.pojoToJson(lineageDetails)); + addLineageToSearch(fromRef, toRef, lineageDetails); + } + } + + private static void deleteLineageFromSearch( + EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) { + String uniqueValue = getDocumentUniqueId(fromEntity, toEntity, lineageDetails); + Entity.getSearchRepository() + .getSearchClient() + .updateChildren( + GLOBAL_SEARCH_ALIAS, + new ImmutablePair<>("upstreamLineage.docUniqueId.keyword", uniqueValue), + new ImmutablePair<>(String.format(REMOVE_LINEAGE_SCRIPT, uniqueValue), null)); + } + + private static void insertDomainLineage(EntityReference fromDomain, EntityReference toDomain) { + int count = + Entity.getCollectionDAO() + .relationshipDAO() + .countDomainChildAssets(fromDomain.getId(), toDomain.getId()); + insertLineage(count, fromDomain, toDomain); + } + + private static void insertDataProductLineage( + EntityReference fromDataProduct, EntityReference toDataProduct) { + int count = + Entity.getCollectionDAO() + .relationshipDAO() + .countDataProductsChildAssets(fromDataProduct.getId(), toDataProduct.getId()); + insertLineage(count, fromDataProduct, toDataProduct); + } + + private static void insertLineage(int count, EntityReference fromRef, EntityReference toRef) { + if (count > 0) { + LineageDetails domainLineageDetails = + new LineageDetails() + .withCreatedAt(System.currentTimeMillis()) + .withUpdatedAt(System.currentTimeMillis()) + .withCreatedBy(ADMIN_USER_NAME) + .withUpdatedBy(ADMIN_USER_NAME) + .withSource(LineageDetails.Source.CHILD_ASSETS) + .withAssetEdges(count); + Entity.getCollectionDAO() + .relationshipDAO() + .insert( + fromRef.getId(), + toRef.getId(), + fromRef.getType(), + toRef.getType(), + Relationship.UPSTREAM.ordinal(), + JsonUtils.pojoToJson(domainLineageDetails)); + addLineageToSearch(fromRef, toRef, domainLineageDetails); + } + } + + private static void addLineageToSearch( + EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) { + IndexMapping destinationIndexMapping = + Entity.getSearchRepository().getIndexMapping(toEntity.getType()); + String destinationIndexName = + destinationIndexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()); + // For lineage from -> to (not stored) since the doc itself is the toEntity + EsLineageData lineageData = + buildEntityLineageData(fromEntity, toEntity, lineageDetails).withToEntity(null); + Pair to = new ImmutablePair<>("_id", toEntity.getId().toString()); + Entity.getSearchRepository() + .getSearchClient() + .updateLineage(destinationIndexName, to, lineageData); + } + + public static void addDataProductsLineage( + UUID entityId, String entityType, List updatedDataProducts) { + for (EntityReference ref : listOrEmpty(updatedDataProducts)) { + List downstreamDataProducts = + Entity.getCollectionDAO() + .relationshipDAO() + .findDownstreamDataProducts(entityId, entityType); + List upstreamDataProducts = + Entity.getCollectionDAO() + .relationshipDAO() + .findUpstreamDataProducts(entityId, entityType); + for (CollectionDAO.EntityRelationshipObject downstreamDataProduct : downstreamDataProducts) { + insertDataProductLineage( + ref, + Entity.getEntityReferenceById( + downstreamDataProduct.getFromEntity(), + UUID.fromString(downstreamDataProduct.getFromId()), + Include.ALL)); + } + for (CollectionDAO.EntityRelationshipObject upstreamDataProduct : upstreamDataProducts) { + insertDataProductLineage( + Entity.getEntityReferenceById( + upstreamDataProduct.getFromEntity(), + UUID.fromString(upstreamDataProduct.getFromId()), + Include.ALL), + ref); + } + } + } + + public static void removeDataProductsLineage( + UUID entityId, String entityType, List updatedDataProducts) { + for (EntityReference ref : listOrEmpty(updatedDataProducts)) { + List downstreamDataProducts = + Entity.getCollectionDAO() + .relationshipDAO() + .findDownstreamDataProducts(entityId, entityType); + List upstreamDataProducts = + Entity.getCollectionDAO() + .relationshipDAO() + .findUpstreamDataProducts(entityId, entityType); + for (CollectionDAO.EntityRelationshipObject downstreamDataProduct : downstreamDataProducts) { + updateLineage( + ref, + Entity.getEntityReferenceById( + downstreamDataProduct.getFromEntity(), + UUID.fromString(downstreamDataProduct.getFromId()), + Include.ALL)); + } + for (CollectionDAO.EntityRelationshipObject upstreamDataProduct : upstreamDataProducts) { + updateLineage( + Entity.getEntityReferenceById( + upstreamDataProduct.getFromEntity(), + UUID.fromString(upstreamDataProduct.getFromId()), + Include.ALL), + ref); + } + } + } +}