Domain And Data products Lineage (#20468)

* Extended Lineage

* Extended Lineage for Data Products addition in bulk asset and cleanup
This commit is contained in:
Mohit Yadav 2025-03-27 16:19:33 +05:30 committed by GitHub
parent e480e6ddf7
commit c86f1ef0f7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 328 additions and 5 deletions

View File

@ -1003,6 +1003,41 @@ public interface CollectionDAO {
@Bind("fromEntity") String fromEntity,
@BindList("relation") List<Integer> relation);
/**
 * Finds the domain rows whose asset is the target of a lineage edge starting at the given entity.
 *
 * <p>{@code er2} matches edges with relation 13 that start at ({@code fromId}, {@code fromEntity});
 * {@code er1} matches rows with relation 10 and fromEntity 'domain' whose toId equals the edge's
 * toId. One row is returned per matching (er1, er2) pair, so the same domain can appear more than
 * once.
 *
 * <p>Only the {@code er1} columns are selected: with a bare {@code *} every column of the self-join
 * appears twice and a by-name row mapper would depend on which duplicate the driver resolves.
 * Each returned row's fromId/fromEntity therefore identify the domain.
 *
 * <p>NOTE(review): 10 and 13 are presumably the ordinals of Relationship.HAS and
 * Relationship.UPSTREAM; confirm against the enum.
 *
 * @param fromId id of the entity the lineage edges start from
 * @param fromEntity entity type of that entity
 * @return the matching domain relationship rows, possibly empty
 */
@SqlQuery(
    "SELECT er1.* FROM entity_relationship er1 JOIN entity_relationship er2 ON er1.toId = er2.toId WHERE er1.relation = 10 AND er1.fromEntity = 'domain' AND er2.fromId = :fromId AND er2.fromEntity = :fromEntity AND er2.relation = 13")
@RegisterRowMapper(RelationshipObjectMapper.class)
List<EntityRelationshipObject> findDownstreamDomains(
    @BindUUID("fromId") UUID fromId, @Bind("fromEntity") String fromEntity);
/**
 * Finds the domain rows whose asset is the source of a lineage edge ending at the given entity.
 *
 * <p>{@code er2} matches edges with relation 13 that end at ({@code toId}, {@code toEntity});
 * {@code er1} matches rows with relation 10 and fromEntity 'domain' whose toId equals the edge's
 * fromId. One row is returned per matching (er1, er2) pair, so the same domain can appear more
 * than once.
 *
 * <p>Only the {@code er1} columns are selected: with a bare {@code *} every column of the self-join
 * appears twice and a by-name row mapper would depend on which duplicate the driver resolves.
 * Each returned row's fromId/fromEntity therefore identify the domain.
 *
 * <p>NOTE(review): 10 and 13 are presumably the ordinals of Relationship.HAS and
 * Relationship.UPSTREAM; confirm against the enum.
 *
 * @param toId id of the entity the lineage edges end at
 * @param toEntity entity type of that entity
 * @return the matching domain relationship rows, possibly empty
 */
@SqlQuery(
    "SELECT er1.* FROM entity_relationship er1 JOIN entity_relationship er2 ON er1.toId = er2.fromId WHERE er1.relation = 10 AND er1.fromEntity = 'domain' AND er2.toId = :toId AND er2.toEntity = :toEntity AND er2.relation = 13")
@RegisterRowMapper(RelationshipObjectMapper.class)
List<EntityRelationshipObject> findUpstreamDomains(
    @BindUUID("toId") UUID toId, @Bind("toEntity") String toEntity);
/**
 * Counts the relation-13 edges that run from an asset of one domain to an asset of another.
 *
 * <p>An edge is counted when its fromId is the toId of a (fromEntity 'domain', relation 10) row of
 * {@code fromDomainId} and its toId is the toId of such a row of {@code toDomainId}.
 *
 * <p>NOTE(review): 10 and 13 are presumably the ordinals of Relationship.HAS and
 * Relationship.UPSTREAM; confirm against the enum.
 *
 * @param fromDomainId id of the domain on the source side of the edges
 * @param toDomainId id of the domain on the target side of the edges
 * @return number of matching edges
 */
@SqlQuery(
    "select count(*) from entity_relationship where fromId in (select toId from entity_relationship where fromId = :fromDomainId and fromEntity = 'domain' and relation = 10) AND toId in (select toId from entity_relationship where fromId = :toDomainId and fromEntity = 'domain' and relation = 10) and relation = 13")
Integer countDomainChildAssets(
    @BindUUID("fromDomainId") UUID fromDomainId, @BindUUID("toDomainId") UUID toDomainId);
/**
 * Finds the data product rows whose asset is the target of a lineage edge starting at the given
 * entity.
 *
 * <p>{@code er2} matches edges with relation 13 that start at ({@code fromId}, {@code fromEntity});
 * {@code er1} matches rows with relation 10 and fromEntity 'dataProduct' whose toId equals the
 * edge's toId. One row is returned per matching (er1, er2) pair, so the same data product can
 * appear more than once.
 *
 * <p>Only the {@code er1} columns are selected: with a bare {@code *} every column of the self-join
 * appears twice and a by-name row mapper would depend on which duplicate the driver resolves.
 * Each returned row's fromId/fromEntity therefore identify the data product.
 *
 * <p>NOTE(review): 10 and 13 are presumably the ordinals of Relationship.HAS and
 * Relationship.UPSTREAM; confirm against the enum.
 *
 * @param fromId id of the entity the lineage edges start from
 * @param fromEntity entity type of that entity
 * @return the matching data product relationship rows, possibly empty
 */
@SqlQuery(
    "SELECT er1.* FROM entity_relationship er1 JOIN entity_relationship er2 ON er1.toId = er2.toId WHERE er1.relation = 10 AND er1.fromEntity = 'dataProduct' AND er2.fromId = :fromId AND er2.fromEntity = :fromEntity AND er2.relation = 13")
@RegisterRowMapper(RelationshipObjectMapper.class)
List<EntityRelationshipObject> findDownstreamDataProducts(
    @BindUUID("fromId") UUID fromId, @Bind("fromEntity") String fromEntity);
/**
 * Finds the data product rows whose asset is the source of a lineage edge ending at the given
 * entity.
 *
 * <p>{@code er2} matches edges with relation 13 that end at ({@code toId}, {@code toEntity});
 * {@code er1} matches rows with relation 10 and fromEntity 'dataProduct' whose toId equals the
 * edge's fromId. One row is returned per matching (er1, er2) pair, so the same data product can
 * appear more than once.
 *
 * <p>Only the {@code er1} columns are selected: with a bare {@code *} every column of the self-join
 * appears twice and a by-name row mapper would depend on which duplicate the driver resolves.
 * Each returned row's fromId/fromEntity therefore identify the data product.
 *
 * <p>NOTE(review): 10 and 13 are presumably the ordinals of Relationship.HAS and
 * Relationship.UPSTREAM; confirm against the enum.
 *
 * @param toId id of the entity the lineage edges end at
 * @param toEntity entity type of that entity
 * @return the matching data product relationship rows, possibly empty
 */
@SqlQuery(
    "SELECT er1.* FROM entity_relationship er1 JOIN entity_relationship er2 ON er1.toId = er2.fromId WHERE er1.relation = 10 AND er1.fromEntity = 'dataProduct' AND er2.toId = :toId AND er2.toEntity = :toEntity AND er2.relation = 13")
@RegisterRowMapper(RelationshipObjectMapper.class)
List<EntityRelationshipObject> findUpstreamDataProducts(
    @BindUUID("toId") UUID toId, @Bind("toEntity") String toEntity);
/**
 * Counts the relation-13 edges that run from an asset of one data product to an asset of another.
 *
 * <p>An edge is counted when its fromId is the toId of a (fromEntity 'dataProduct', relation 10)
 * row of {@code fromDataProductId} and its toId is the toId of such a row of
 * {@code toDataProductId}.
 *
 * <p>NOTE(review): 10 and 13 are presumably the ordinals of Relationship.HAS and
 * Relationship.UPSTREAM; confirm against the enum.
 *
 * @param fromDataProductId id of the data product on the source side of the edges
 * @param toDataProductId id of the data product on the target side of the edges
 * @return number of matching edges
 */
@SqlQuery(
"select count(*) from entity_relationship where fromId in (select toId from entity_relationship where fromId = :fromDataProductId and fromEntity = 'dataProduct' and relation = 10) AND toId in (select toId from entity_relationship where fromId = :toDataProductId and fromEntity = 'dataProduct' and relation = 10) and relation = 13")
Integer countDataProductsChildAssets(
@BindUUID("fromDataProductId") UUID fromDataProductId,
@BindUUID("toDataProductId") UUID toDataProductId);
/**
 * Single-relation convenience overload: wraps {@code relation} in a one-element list and
 * delegates to the list-based {@code findTo}.
 */
default List<EntityRelationshipRecord> findTo(UUID fromId, String fromEntity, int relation) {
  List<Integer> singleRelation = List.of(relation);
  return findTo(fromId, fromEntity, singleRelation);
}

View File

@ -29,6 +29,7 @@ import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.domains.DataProduct;
import org.openmetadata.schema.entity.domains.Domain;
import org.openmetadata.schema.type.ApiStatus;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.Relationship;
@ -39,6 +40,7 @@ import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.domains.DataProductResource;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.EntityUtil.Fields;
import org.openmetadata.service.util.LineageUtil;
@Slf4j
public class DataProductRepository extends EntityRepository<DataProduct> {
@ -121,12 +123,28 @@ public class DataProductRepository extends EntityRepository<DataProduct> {
/**
 * Adds the requested assets to a data product and, when the bulk operation reports
 * {@code ApiStatus.SUCCESS}, records data-product lineage for each requested asset.
 *
 * <p>The superseded early {@code return bulkAssetsOperation(...)} is dropped so the lineage update
 * is actually reachable.
 *
 * <p>NOTE(review): lineage is skipped entirely for any status other than SUCCESS; confirm whether
 * partially successful requests should still update lineage for the assets that were added.
 *
 * @param domainName name used to look up the data product (despite the parameter name)
 * @param request bulk request carrying the asset references to add
 * @return the result of the underlying bulk operation
 */
public BulkOperationResult bulkAddAssets(String domainName, BulkAssets request) {
  DataProduct dataProduct = getByName(null, domainName, getFields("id"));
  BulkOperationResult result =
      bulkAssetsOperation(dataProduct.getId(), DATA_PRODUCT, Relationship.HAS, request, true);
  // Constant-first comparison stays safe if the result carries no status.
  if (ApiStatus.SUCCESS.equals(result.getStatus())) {
    for (EntityReference ref : listOrEmpty(request.getAssets())) {
      LineageUtil.addDataProductsLineage(
          ref.getId(), ref.getType(), List.of(dataProduct.getEntityReference()));
    }
  }
  return result;
}
/**
 * Removes the requested assets from a data product and, when the bulk operation reports
 * {@code ApiStatus.SUCCESS}, updates data-product lineage for each requested asset.
 *
 * <p>The superseded early {@code return bulkAssetsOperation(...)} is dropped so the lineage update
 * is actually reachable.
 *
 * <p>NOTE(review): lineage is skipped entirely for any status other than SUCCESS; confirm whether
 * partially successful requests should still update lineage for the assets that were removed.
 *
 * @param domainName name used to look up the data product (despite the parameter name)
 * @param request bulk request carrying the asset references to remove
 * @return the result of the underlying bulk operation
 */
public BulkOperationResult bulkRemoveAssets(String domainName, BulkAssets request) {
  DataProduct dataProduct = getByName(null, domainName, getFields("id"));
  BulkOperationResult result =
      bulkAssetsOperation(dataProduct.getId(), DATA_PRODUCT, Relationship.HAS, request, false);
  // Constant-first comparison stays safe if the result carries no status.
  if (ApiStatus.SUCCESS.equals(result.getStatus())) {
    for (EntityReference ref : listOrEmpty(request.getAssets())) {
      LineageUtil.removeDataProductsLineage(
          ref.getId(), ref.getType(), List.of(dataProduct.getEntityReference()));
    }
  }
  return result;
}
@Override

View File

@ -28,6 +28,7 @@ import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.data.EntityHierarchy;
import org.openmetadata.schema.entity.domains.Domain;
import org.openmetadata.schema.type.ApiStatus;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.Relationship;
@ -40,6 +41,7 @@ import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.EntityUtil.Fields;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.LineageUtil;
import org.openmetadata.service.util.ResultList;
@Slf4j
@ -109,12 +111,27 @@ public class DomainRepository extends EntityRepository<Domain> {
/**
 * Adds the requested assets to a domain and, when the bulk operation reports
 * {@code ApiStatus.SUCCESS}, records domain lineage for each requested asset.
 *
 * <p>The superseded early {@code return bulkAssetsOperation(...)} is dropped so the lineage update
 * is actually reachable.
 *
 * <p>NOTE(review): lineage is skipped entirely for any status other than SUCCESS; confirm whether
 * partially successful requests should still update lineage for the assets that were added.
 *
 * @param domainName name used to look up the domain
 * @param request bulk request carrying the asset references to add
 * @return the result of the underlying bulk operation
 */
public BulkOperationResult bulkAddAssets(String domainName, BulkAssets request) {
  Domain domain = getByName(null, domainName, getFields("id"));
  // Add assets to domain
  BulkOperationResult result =
      bulkAssetsOperation(domain.getId(), DOMAIN, Relationship.HAS, request, true);
  // Constant-first comparison stays safe if the result carries no status.
  if (ApiStatus.SUCCESS.equals(result.getStatus())) {
    for (EntityReference ref : listOrEmpty(request.getAssets())) {
      LineageUtil.addDomainLineage(ref.getId(), ref.getType(), domain.getEntityReference());
    }
  }
  return result;
}
/**
 * Removes the requested assets from a domain and, when the bulk operation reports
 * {@code ApiStatus.SUCCESS}, updates domain lineage for each requested asset.
 *
 * <p>The superseded early {@code return bulkAssetsOperation(...)} is dropped so the lineage update
 * is actually reachable.
 *
 * <p>NOTE(review): lineage is skipped entirely for any status other than SUCCESS; confirm whether
 * partially successful requests should still update lineage for the assets that were removed.
 *
 * @param domainName name used to look up the domain
 * @param request bulk request carrying the asset references to remove
 * @return the result of the underlying bulk operation
 */
public BulkOperationResult bulkRemoveAssets(String domainName, BulkAssets request) {
  Domain domain = getByName(null, domainName, getFields("id"));
  BulkOperationResult result =
      bulkAssetsOperation(domain.getId(), DOMAIN, Relationship.HAS, request, false);
  // Constant-first comparison stays safe if the result carries no status.
  if (ApiStatus.SUCCESS.equals(result.getStatus())) {
    for (EntityReference ref : listOrEmpty(request.getAssets())) {
      LineageUtil.removeDomainLineage(ref.getId(), ref.getType(), domain.getEntityReference());
    }
  }
  return result;
}
@Override

View File

@ -68,6 +68,10 @@ import static org.openmetadata.service.util.EntityUtil.nextMajorVersion;
import static org.openmetadata.service.util.EntityUtil.nextVersion;
import static org.openmetadata.service.util.EntityUtil.objectMatch;
import static org.openmetadata.service.util.EntityUtil.tagLabelMatch;
import static org.openmetadata.service.util.LineageUtil.addDataProductsLineage;
import static org.openmetadata.service.util.LineageUtil.addDomainLineage;
import static org.openmetadata.service.util.LineageUtil.removeDataProductsLineage;
import static org.openmetadata.service.util.LineageUtil.removeDomainLineage;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getAfterOffset;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getBeforeOffset;
import static org.openmetadata.service.util.jdbi.JdbiUtils.getOffset;
@ -2429,6 +2433,10 @@ public abstract class EntityRepository<T extends EntityInterface> {
: findFrom(entity.getId(), entityType, Relationship.HAS, DATA_PRODUCT);
}
/**
 * Looks up, via {@code findFrom}, the DATA_PRODUCT references linked to the given entity through
 * {@code Relationship.HAS}.
 *
 * @param entityId id of the entity
 * @param entityType type of the entity
 * @return the references returned by {@code findFrom}
 */
protected List<EntityReference> getDataProducts(UUID entityId, String entityType) {
  List<EntityReference> dataProducts =
      findFrom(entityId, entityType, Relationship.HAS, DATA_PRODUCT);
  return dataProducts;
}
/**
 * Returns the parent entity of {@code entity}. This implementation ignores both arguments and
 * always returns {@code null}.
 *
 * <p>NOTE(review): presumably a default meant to be overridden by repositories whose entities have
 * a parent; confirm against the subclasses.
 */
public EntityInterface getParentEntity(T entity, String fields) {
return null;
}
@ -3265,6 +3273,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
"Removing domain {} for entity {}",
origDomain.getFullyQualifiedName(),
original.getFullyQualifiedName());
removeDomainLineage(updated.getId(), entityType, origDomain);
deleteRelationship(
origDomain.getId(), DOMAIN, original.getId(), entityType, Relationship.HAS);
}
@ -3277,6 +3286,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
original.getFullyQualifiedName());
addRelationship(
updatedDomain.getId(), original.getId(), DOMAIN, entityType, Relationship.HAS);
addDomainLineage(updated.getId(), entityType, updatedDomain);
}
updated.setDomain(updatedDomain);
} else {
@ -3299,6 +3309,8 @@ public abstract class EntityRepository<T extends EntityInterface> {
Relationship.HAS,
entityType,
original.getId());
removeDataProductsLineage(original.getId(), entityType, origDataProducts);
addDataProductsLineage(original.getId(), entityType, updatedDataProducts);
}
private void updateExperts() {

View File

@ -401,7 +401,7 @@ public class LineageRepository {
}
}
private static String getDocumentUniqueId(
public static String getDocumentUniqueId(
EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) {
if (lineageDetails != null && !nullOrEmpty(lineageDetails.getPipeline())) {
EntityReference ref = lineageDetails.getPipeline();

View File

@ -0,0 +1,241 @@
package org.openmetadata.service.util;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.ADMIN_USER_NAME;
import static org.openmetadata.service.jdbi3.LineageRepository.buildEntityLineageData;
import static org.openmetadata.service.jdbi3.LineageRepository.getDocumentUniqueId;
import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS;
import static org.openmetadata.service.search.SearchClient.REMOVE_LINEAGE_SCRIPT;
import java.util.List;
import java.util.UUID;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.openmetadata.schema.api.lineage.EsLineageData;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.LineageDetails;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.search.models.IndexMapping;
public class LineageUtil {
// Private no-op constructor: the class only exposes static helpers, so instantiation is blocked.
private LineageUtil() {}
/**
 * Creates domain-to-domain lineage implied by the given entity belonging to {@code updatedDomain}.
 *
 * <p>For every row returned by {@code findDownstreamDomains} an edge from {@code updatedDomain} to
 * that row's domain is inserted; for every row returned by {@code findUpstreamDomains} an edge from
 * that row's domain to {@code updatedDomain} is inserted. Does nothing when {@code updatedDomain}
 * is null or empty.
 *
 * @param entityId id of the entity whose lineage edges are inspected
 * @param entityType type of that entity
 * @param updatedDomain domain the entity now belongs to
 */
public static void addDomainLineage(
    UUID entityId, String entityType, EntityReference updatedDomain) {
  if (nullOrEmpty(updatedDomain)) {
    return;
  }

  List<CollectionDAO.EntityRelationshipObject> downstreamRows =
      Entity.getCollectionDAO().relationshipDAO().findDownstreamDomains(entityId, entityType);
  List<CollectionDAO.EntityRelationshipObject> upstreamRows =
      Entity.getCollectionDAO().relationshipDAO().findUpstreamDomains(entityId, entityType);

  // updatedDomain -> every domain found downstream of the entity
  for (CollectionDAO.EntityRelationshipObject row : downstreamRows) {
    String targetType = row.getFromEntity();
    UUID targetId = UUID.fromString(row.getFromId());
    EntityReference targetDomain =
        Entity.getEntityReferenceById(targetType, targetId, Include.ALL);
    insertDomainLineage(updatedDomain, targetDomain);
  }

  // every domain found upstream of the entity -> updatedDomain
  for (CollectionDAO.EntityRelationshipObject row : upstreamRows) {
    String sourceType = row.getFromEntity();
    UUID sourceId = UUID.fromString(row.getFromId());
    EntityReference sourceDomain =
        Entity.getEntityReferenceById(sourceType, sourceId, Include.ALL);
    insertDomainLineage(sourceDomain, updatedDomain);
  }
}
/**
 * Rolls back domain-to-domain lineage implied by the given entity belonging to
 * {@code updatedDomain}.
 *
 * <p>For every row returned by {@code findDownstreamDomains} the edge from {@code updatedDomain} to
 * that row's domain is passed to {@code updateLineage}; for every row returned by
 * {@code findUpstreamDomains} the edge from that row's domain to {@code updatedDomain} is. Does
 * nothing when {@code updatedDomain} is null or empty.
 *
 * @param entityId id of the entity whose lineage edges are inspected
 * @param entityType type of that entity
 * @param updatedDomain domain the entity is being detached from
 */
public static void removeDomainLineage(
    UUID entityId, String entityType, EntityReference updatedDomain) {
  if (nullOrEmpty(updatedDomain)) {
    return;
  }

  List<CollectionDAO.EntityRelationshipObject> downstreamRows =
      Entity.getCollectionDAO().relationshipDAO().findDownstreamDomains(entityId, entityType);
  List<CollectionDAO.EntityRelationshipObject> upstreamRows =
      Entity.getCollectionDAO().relationshipDAO().findUpstreamDomains(entityId, entityType);

  // updatedDomain -> every domain found downstream of the entity
  for (CollectionDAO.EntityRelationshipObject row : downstreamRows) {
    String targetType = row.getFromEntity();
    UUID targetId = UUID.fromString(row.getFromId());
    EntityReference targetDomain =
        Entity.getEntityReferenceById(targetType, targetId, Include.ALL);
    updateLineage(updatedDomain, targetDomain);
  }

  // every domain found upstream of the entity -> updatedDomain
  for (CollectionDAO.EntityRelationshipObject row : upstreamRows) {
    String sourceType = row.getFromEntity();
    UUID sourceId = UUID.fromString(row.getFromId());
    EntityReference sourceDomain =
        Entity.getEntityReferenceById(sourceType, sourceId, Include.ALL);
    updateLineage(sourceDomain, updatedDomain);
  }
}
/**
 * Decrements the asset-edge counter stored on the UPSTREAM relationship between two references,
 * deleting the relationship (and its search entry) once the counter would drop below one.
 *
 * <p>Nothing happens when either reference is null, when no UPSTREAM record exists between them,
 * or when the stored details are missing or carry no asset-edge counter. The last case previously
 * failed with a NullPointerException; such a record was not written by {@code insertLineage},
 * which always sets the counter, so it is left untouched.
 *
 * @param fromRef source side of the lineage edge
 * @param toRef target side of the lineage edge
 */
private static void updateLineage(EntityReference fromRef, EntityReference toRef) {
  if (fromRef == null || toRef == null) {
    return;
  }

  CollectionDAO.EntityRelationshipObject relation =
      Entity.getCollectionDAO()
          .relationshipDAO()
          .getRecord(fromRef.getId(), toRef.getId(), Relationship.UPSTREAM.ordinal());
  if (relation == null) {
    return;
  }

  LineageDetails lineageDetails = JsonUtils.readValue(relation.getJson(), LineageDetails.class);
  Integer currentEdges = lineageDetails == null ? null : lineageDetails.getAssetEdges();
  if (currentEdges == null) {
    // No counter to decrement: leave the record as it is instead of throwing.
    return;
  }

  int remainingEdges = currentEdges - 1;
  if (remainingEdges < 1) {
    // Last contributing asset edge is gone: drop the relationship and its search entry.
    Entity.getCollectionDAO()
        .relationshipDAO()
        .delete(
            fromRef.getId(),
            fromRef.getType(),
            toRef.getId(),
            toRef.getType(),
            Relationship.UPSTREAM.ordinal());
    deleteLineageFromSearch(fromRef, toRef, lineageDetails);
  } else {
    // Other asset edges still contribute: store the decremented counter and refresh search.
    lineageDetails.withAssetEdges(remainingEdges);
    Entity.getCollectionDAO()
        .relationshipDAO()
        .insert(
            fromRef.getId(),
            toRef.getId(),
            fromRef.getType(),
            toRef.getType(),
            Relationship.UPSTREAM.ordinal(),
            JsonUtils.pojoToJson(lineageDetails));
    addLineageToSearch(fromRef, toRef, lineageDetails);
  }
}
/**
 * Removes one lineage entry from the search documents: computes the edge's document id with
 * {@code getDocumentUniqueId} and calls {@code updateChildren} on the global search alias with the
 * remove-lineage script formatted for that id.
 */
private static void deleteLineageFromSearch(
    EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) {
  String docUniqueId = getDocumentUniqueId(fromEntity, toEntity, lineageDetails);
  String removalScript = String.format(REMOVE_LINEAGE_SCRIPT, docUniqueId);
  Entity.getSearchRepository()
      .getSearchClient()
      .updateChildren(
          GLOBAL_SEARCH_ALIAS,
          new ImmutablePair<>("upstreamLineage.docUniqueId.keyword", docUniqueId),
          new ImmutablePair<>(removalScript, null));
}
/**
 * Counts the asset edges running from {@code fromDomain} to {@code toDomain} and hands the count
 * to {@code insertLineage}, which decides whether an edge is stored.
 */
private static void insertDomainLineage(EntityReference fromDomain, EntityReference toDomain) {
  UUID sourceId = fromDomain.getId();
  UUID targetId = toDomain.getId();
  int assetEdgeCount =
      Entity.getCollectionDAO().relationshipDAO().countDomainChildAssets(sourceId, targetId);
  insertLineage(assetEdgeCount, fromDomain, toDomain);
}
/**
 * Counts the asset edges running from {@code fromDataProduct} to {@code toDataProduct} and hands
 * the count to {@code insertLineage}, which decides whether an edge is stored.
 */
private static void insertDataProductLineage(
    EntityReference fromDataProduct, EntityReference toDataProduct) {
  UUID sourceId = fromDataProduct.getId();
  UUID targetId = toDataProduct.getId();
  int assetEdgeCount =
      Entity.getCollectionDAO().relationshipDAO().countDataProductsChildAssets(sourceId, targetId);
  insertLineage(assetEdgeCount, fromDataProduct, toDataProduct);
}
/**
 * Stores an UPSTREAM relationship from {@code fromRef} to {@code toRef} carrying {@code count} as
 * its asset-edge counter, and pushes the same details to search. Does nothing when {@code count}
 * is not positive.
 *
 * <p>The clock is read once so that createdAt and updatedAt of the new details are identical.
 *
 * @param count number of asset edges backing this edge
 * @param fromRef source side of the lineage edge (a domain or a data product)
 * @param toRef target side of the lineage edge
 */
private static void insertLineage(int count, EntityReference fromRef, EntityReference toRef) {
  if (count <= 0) {
    return;
  }

  long now = System.currentTimeMillis();
  LineageDetails lineageDetails =
      new LineageDetails()
          .withCreatedAt(now)
          .withUpdatedAt(now)
          .withCreatedBy(ADMIN_USER_NAME)
          .withUpdatedBy(ADMIN_USER_NAME)
          .withSource(LineageDetails.Source.CHILD_ASSETS)
          .withAssetEdges(count);
  Entity.getCollectionDAO()
      .relationshipDAO()
      .insert(
          fromRef.getId(),
          toRef.getId(),
          fromRef.getType(),
          toRef.getType(),
          Relationship.UPSTREAM.ordinal(),
          JsonUtils.pojoToJson(lineageDetails));
  addLineageToSearch(fromRef, toRef, lineageDetails);
}
/**
 * Writes a lineage entry onto the search document of {@code toEntity}.
 *
 * <p>The index name is resolved from the mapping for the target's entity type and the cluster
 * alias; the document is addressed by {@code _id} equal to the target's id.
 */
private static void addLineageToSearch(
    EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) {
  IndexMapping targetMapping = Entity.getSearchRepository().getIndexMapping(toEntity.getType());
  String targetIndex = targetMapping.getIndexName(Entity.getSearchRepository().getClusterAlias());

  // The target document is the toEntity itself, so the toEntity field is cleared before storing.
  EsLineageData edgeData = buildEntityLineageData(fromEntity, toEntity, lineageDetails);
  EsLineageData storedData = edgeData.withToEntity(null);

  Pair<String, String> docSelector = new ImmutablePair<>("_id", toEntity.getId().toString());
  Entity.getSearchRepository().getSearchClient().updateLineage(targetIndex, docSelector, storedData);
}
/**
 * Creates data-product-to-data-product lineage implied by the given entity belonging to each of
 * {@code updatedDataProducts}.
 *
 * <p>For every data product in the list, an edge is inserted to each data product returned by
 * {@code findDownstreamDataProducts} and from each one returned by {@code findUpstreamDataProducts}.
 * Both lookups depend only on the entity, so they run once rather than once per data product. A
 * null or empty list is a no-op and issues no queries.
 *
 * <p>NOTE(review): hoisting assumes the edges inserted here never show up in those lookups, which
 * holds unless the entity is itself a data product; confirm.
 *
 * @param entityId id of the entity whose lineage edges are inspected
 * @param entityType type of that entity
 * @param updatedDataProducts data products the entity now belongs to; may be null
 */
public static void addDataProductsLineage(
    UUID entityId, String entityType, List<EntityReference> updatedDataProducts) {
  if (updatedDataProducts == null || updatedDataProducts.isEmpty()) {
    return;
  }

  List<CollectionDAO.EntityRelationshipObject> downstreamDataProducts =
      Entity.getCollectionDAO()
          .relationshipDAO()
          .findDownstreamDataProducts(entityId, entityType);
  List<CollectionDAO.EntityRelationshipObject> upstreamDataProducts =
      Entity.getCollectionDAO().relationshipDAO().findUpstreamDataProducts(entityId, entityType);

  for (EntityReference ref : updatedDataProducts) {
    for (CollectionDAO.EntityRelationshipObject downstreamDataProduct : downstreamDataProducts) {
      insertDataProductLineage(
          ref,
          Entity.getEntityReferenceById(
              downstreamDataProduct.getFromEntity(),
              UUID.fromString(downstreamDataProduct.getFromId()),
              Include.ALL));
    }
    for (CollectionDAO.EntityRelationshipObject upstreamDataProduct : upstreamDataProducts) {
      insertDataProductLineage(
          Entity.getEntityReferenceById(
              upstreamDataProduct.getFromEntity(),
              UUID.fromString(upstreamDataProduct.getFromId()),
              Include.ALL),
          ref);
    }
  }
}
/**
 * Rolls back data-product-to-data-product lineage implied by the given entity belonging to each of
 * {@code updatedDataProducts}.
 *
 * <p>For every data product in the list, {@code updateLineage} is applied to the edge towards each
 * data product returned by {@code findDownstreamDataProducts} and from each one returned by
 * {@code findUpstreamDataProducts}. Both lookups depend only on the entity, so they run once
 * rather than once per data product. A null or empty list is a no-op and issues no queries.
 *
 * <p>NOTE(review): hoisting assumes the edges changed here never show up in those lookups, which
 * holds unless the entity is itself a data product; confirm.
 *
 * @param entityId id of the entity whose lineage edges are inspected
 * @param entityType type of that entity
 * @param updatedDataProducts data products the entity is being detached from; may be null
 */
public static void removeDataProductsLineage(
    UUID entityId, String entityType, List<EntityReference> updatedDataProducts) {
  if (updatedDataProducts == null || updatedDataProducts.isEmpty()) {
    return;
  }

  List<CollectionDAO.EntityRelationshipObject> downstreamDataProducts =
      Entity.getCollectionDAO()
          .relationshipDAO()
          .findDownstreamDataProducts(entityId, entityType);
  List<CollectionDAO.EntityRelationshipObject> upstreamDataProducts =
      Entity.getCollectionDAO().relationshipDAO().findUpstreamDataProducts(entityId, entityType);

  for (EntityReference ref : updatedDataProducts) {
    for (CollectionDAO.EntityRelationshipObject downstreamDataProduct : downstreamDataProducts) {
      updateLineage(
          ref,
          Entity.getEntityReferenceById(
              downstreamDataProduct.getFromEntity(),
              UUID.fromString(downstreamDataProduct.getFromId()),
              Include.ALL));
    }
    for (CollectionDAO.EntityRelationshipObject upstreamDataProduct : upstreamDataProducts) {
      updateLineage(
          Entity.getEntityReferenceById(
              upstreamDataProduct.getFromEntity(),
              UUID.fromString(upstreamDataProduct.getFromId()),
              Include.ALL),
          ref);
    }
  }
}
}