Add Data Product Lineage (#20400)

* Add Data Product Lineage

* Migrate Data Products

* add null check for entity

* add data product layer

* Add index specific conditions

* add tests

* update nlp icon

* Add missing conditions

---------

Co-authored-by: karanh37 <karanh37@gmail.com>
This commit is contained in:
Mohit Yadav 2025-03-26 10:02:00 +05:30 committed by GitHub
parent 3245b2d98e
commit 7e731648ab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 415 additions and 211 deletions

View File

@ -50,6 +50,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import javax.json.JsonPatch;
import javax.ws.rs.core.Response;
import lombok.Getter;
@ -196,6 +197,7 @@ public class LineageRepository {
addServiceLineage(fromEntity, toEntity, lineageDetails, childRelationExists);
addDomainLineage(fromEntity, toEntity, lineageDetails, childRelationExists);
addDataProductsLineage(fromEntity, toEntity, lineageDetails, childRelationExists);
}
private void addServiceLineage(
@ -203,44 +205,17 @@ public class LineageRepository {
EntityInterface toEntity,
LineageDetails entityLineageDetails,
boolean childRelationExists) {
boolean addService =
Entity.entityHasField(fromEntity.getEntityReference().getType(), FIELD_SERVICE)
&& Entity.entityHasField(toEntity.getEntityReference().getType(), FIELD_SERVICE);
if (!shouldAddServiceLineage(fromEntity, toEntity)) {
return;
}
// Add Service Level Lineage
if (addService && fromEntity.getService() != null && toEntity.getService() != null) {
EntityReference fromService = fromEntity.getService();
EntityReference toService = toEntity.getService();
if (Boolean.FALSE.equals(fromService.getId().equals(toService.getId()))) {
CollectionDAO.EntityRelationshipObject serviceRelation =
dao.relationshipDAO()
.getRecord(fromService.getId(), toService.getId(), Relationship.UPSTREAM.ordinal());
LineageDetails serviceLineageDetails;
if (serviceRelation != null) {
serviceLineageDetails =
JsonUtils.readValue(serviceRelation.getJson(), LineageDetails.class);
if (!childRelationExists) {
serviceLineageDetails.withAssetEdges(serviceLineageDetails.getAssetEdges() + 1);
}
} else {
serviceLineageDetails =
new LineageDetails()
.withCreatedAt(entityLineageDetails.getCreatedAt())
.withCreatedBy(entityLineageDetails.getCreatedBy())
.withUpdatedAt(entityLineageDetails.getUpdatedAt())
.withUpdatedBy(entityLineageDetails.getUpdatedBy())
.withSource(LineageDetails.Source.CHILD_ASSETS)
.withAssetEdges(1);
}
dao.relationshipDAO()
.insert(
fromService.getId(),
toService.getId(),
fromService.getType(),
toService.getType(),
Relationship.UPSTREAM.ordinal(),
JsonUtils.pojoToJson(serviceLineageDetails));
addLineageToSearch(fromService, toService, serviceLineageDetails);
}
EntityReference fromService = fromEntity.getService();
EntityReference toService = toEntity.getService();
if (Boolean.FALSE.equals(fromService.getId().equals(toService.getId()))) {
LineageDetails serviceLineageDetails =
getOrCreateLineageDetails(
fromService.getId(), toService.getId(), entityLineageDetails, childRelationExists);
insertLineage(fromService, toService, serviceLineageDetails);
}
}
@ -249,47 +224,106 @@ public class LineageRepository {
EntityInterface toEntity,
LineageDetails entityLineageDetails,
boolean childRelationExists) {
boolean addDomain =
Entity.entityHasField(fromEntity.getEntityReference().getType(), FIELD_DOMAIN)
&& Entity.entityHasField(toEntity.getEntityReference().getType(), FIELD_DOMAIN);
// Add Service Level Lineage
if (addDomain && fromEntity.getDomain() != null && toEntity.getDomain() != null) {
EntityReference fromDomain = fromEntity.getDomain();
EntityReference toDomain = toEntity.getDomain();
if (Boolean.FALSE.equals(fromDomain.getId().equals(toDomain.getId()))) {
CollectionDAO.EntityRelationshipObject serviceRelation =
dao.relationshipDAO()
.getRecord(fromDomain.getId(), toDomain.getId(), Relationship.UPSTREAM.ordinal());
LineageDetails domainLineageDetails;
if (serviceRelation != null) {
domainLineageDetails =
JsonUtils.readValue(serviceRelation.getJson(), LineageDetails.class);
if (!childRelationExists) {
domainLineageDetails.withAssetEdges(domainLineageDetails.getAssetEdges() + 1);
}
} else {
domainLineageDetails =
new LineageDetails()
.withCreatedAt(entityLineageDetails.getCreatedAt())
.withCreatedBy(entityLineageDetails.getCreatedBy())
.withUpdatedAt(entityLineageDetails.getUpdatedAt())
.withUpdatedBy(entityLineageDetails.getUpdatedBy())
.withSource(LineageDetails.Source.CHILD_ASSETS)
.withAssetEdges(1);
if (!shouldAddDomainsLineage(fromEntity, toEntity)) {
return;
}
EntityReference fromDomain = fromEntity.getDomain();
EntityReference toDomain = toEntity.getDomain();
if (Boolean.FALSE.equals(fromDomain.getId().equals(toDomain.getId()))) {
LineageDetails domainLineageDetails =
getOrCreateLineageDetails(
fromDomain.getId(), toDomain.getId(), entityLineageDetails, childRelationExists);
insertLineage(fromDomain, toDomain, domainLineageDetails);
}
}
private void addDataProductsLineage(
EntityInterface fromEntity,
EntityInterface toEntity,
LineageDetails entityLineageDetails,
boolean childRelationExists) {
if (!shouldAddDataProductLineage(fromEntity, toEntity)) {
return;
}
for (EntityReference fromEntityRef : fromEntity.getDataProducts()) {
for (EntityReference toEntityRef : toEntity.getDataProducts()) {
if (!fromEntityRef.getId().equals(toEntityRef.getId())) {
LineageDetails dataProductsLineageDetails =
getOrCreateLineageDetails(
fromEntityRef.getId(),
toEntityRef.getId(),
entityLineageDetails,
childRelationExists);
insertLineage(fromEntityRef, toEntityRef, dataProductsLineageDetails);
}
dao.relationshipDAO()
.insert(
fromDomain.getId(),
toDomain.getId(),
fromDomain.getType(),
toDomain.getType(),
Relationship.UPSTREAM.ordinal(),
JsonUtils.pojoToJson(domainLineageDetails));
addLineageToSearch(fromDomain, toDomain, domainLineageDetails);
}
}
}
private boolean shouldAddDataProductLineage(
EntityInterface fromEntity, EntityInterface toEntity) {
return Entity.entityHasField(fromEntity.getEntityReference().getType(), FIELD_DATA_PRODUCTS)
&& Entity.entityHasField(toEntity.getEntityReference().getType(), FIELD_DATA_PRODUCTS)
&& !nullOrEmpty(fromEntity.getDataProducts())
&& !nullOrEmpty(toEntity.getDataProducts());
}
private boolean shouldAddDomainsLineage(EntityInterface fromEntity, EntityInterface toEntity) {
return Entity.entityHasField(fromEntity.getEntityReference().getType(), FIELD_DOMAIN)
&& Entity.entityHasField(toEntity.getEntityReference().getType(), FIELD_DOMAIN)
&& fromEntity.getDomain() != null
&& toEntity.getDomain() != null;
}
private boolean shouldAddServiceLineage(EntityInterface fromEntity, EntityInterface toEntity) {
return Entity.entityHasField(fromEntity.getEntityReference().getType(), FIELD_SERVICE)
&& Entity.entityHasField(toEntity.getEntityReference().getType(), FIELD_SERVICE)
&& fromEntity.getService() != null
&& toEntity.getService() != null;
}
private LineageDetails getOrCreateLineageDetails(
UUID fromId, UUID toId, LineageDetails entityLineageDetails, boolean childRelationExists) {
CollectionDAO.EntityRelationshipObject existingRelation =
dao.relationshipDAO().getRecord(fromId, toId, Relationship.UPSTREAM.ordinal());
if (existingRelation != null) {
LineageDetails lineageDetails =
JsonUtils.readValue(existingRelation.getJson(), LineageDetails.class);
if (!childRelationExists) {
lineageDetails.withAssetEdges(lineageDetails.getAssetEdges() + 1);
}
return lineageDetails;
}
return new LineageDetails()
.withCreatedAt(entityLineageDetails.getCreatedAt())
.withCreatedBy(entityLineageDetails.getCreatedBy())
.withUpdatedAt(entityLineageDetails.getUpdatedAt())
.withUpdatedBy(entityLineageDetails.getUpdatedBy())
.withSource(LineageDetails.Source.CHILD_ASSETS)
.withAssetEdges(1);
}
private void insertLineage(
EntityReference from, EntityReference to, LineageDetails lineageDetails) {
dao.relationshipDAO()
.insert(
from.getId(),
to.getId(),
from.getType(),
to.getType(),
Relationship.UPSTREAM.ordinal(),
JsonUtils.pojoToJson(lineageDetails));
addLineageToSearch(from, to, lineageDetails);
}
private String getExtendedLineageFields(boolean service, boolean domain, boolean dataProducts) {
StringBuilder fieldsBuilder = new StringBuilder();
@ -535,6 +569,12 @@ public class LineageRepository {
JsonNode fromEntity = entityMap.getOrDefault(fromEntityId, null);
JsonNode toEntity = entityMap.getOrDefault(toEntityId, null);
if (fromEntity == null || toEntity == null) {
LOG.error(
"Entity not found for IDs: fromEntityId={}, toEntityId={}", fromEntityId, toEntityId);
return;
}
Map<String, String> baseRow = new HashMap<>();
baseRow.put("fromEntityFQN", getText(fromEntity, FIELD_FULLY_QUALIFIED_NAME));
baseRow.put("fromServiceName", getText(fromEntity.path(FIELD_SERVICE), FIELD_NAME));
@ -906,99 +946,88 @@ public class LineageRepository {
}
private void cleanUpExtendedLineage(EntityReference from, EntityReference to) {
boolean addService =
Entity.entityHasField(from.getType(), FIELD_SERVICE)
&& Entity.entityHasField(to.getType(), FIELD_SERVICE);
boolean addDomain =
Entity.entityHasField(from.getType(), FIELD_DOMAIN)
&& Entity.entityHasField(to.getType(), FIELD_DOMAIN);
boolean addService = hasField(from, FIELD_SERVICE) && hasField(to, FIELD_SERVICE);
boolean addDomain = hasField(from, FIELD_DOMAIN) && hasField(to, FIELD_DOMAIN);
boolean addDataProduct =
Entity.entityHasField(from.getType(), FIELD_DATA_PRODUCTS)
&& Entity.entityHasField(to.getType(), FIELD_DATA_PRODUCTS);
hasField(from, FIELD_DATA_PRODUCTS) && hasField(to, FIELD_DATA_PRODUCTS);
String fields = getExtendedLineageFields(addService, addDomain, addDataProduct);
EntityInterface fromEntity =
Entity.getEntity(from.getType(), from.getId(), fields, Include.ALL);
EntityInterface toEntity = Entity.getEntity(to.getType(), to.getId(), fields, Include.ALL);
cleanUpServiceLineage(fromEntity, toEntity);
cleanUpDomainLineage(fromEntity, toEntity);
cleanUpLineage(fromEntity, toEntity, FIELD_SERVICE, EntityInterface::getService);
cleanUpLineage(fromEntity, toEntity, FIELD_DOMAIN, EntityInterface::getDomain);
cleanUpLineageForDataProducts(
fromEntity, toEntity, FIELD_DATA_PRODUCTS, EntityInterface::getDataProducts);
}
private void cleanUpServiceLineage(EntityInterface fromEntity, EntityInterface toEntity) {
boolean hasServiceField =
Entity.entityHasField(fromEntity.getEntityReference().getType(), FIELD_SERVICE)
&& Entity.entityHasField(toEntity.getEntityReference().getType(), FIELD_SERVICE);
if (hasServiceField && fromEntity.getService() != null && toEntity.getService() != null) {
EntityReference fromService = fromEntity.getService();
EntityReference toService = toEntity.getService();
CollectionDAO.EntityRelationshipObject serviceRelation =
dao.relationshipDAO()
.getRecord(fromService.getId(), toService.getId(), Relationship.UPSTREAM.ordinal());
LineageDetails serviceLineageDetails;
if (serviceRelation != null) {
serviceLineageDetails =
JsonUtils.readValue(serviceRelation.getJson(), LineageDetails.class);
if (serviceLineageDetails.getAssetEdges() - 1 < 1) {
dao.relationshipDAO()
.delete(
fromService.getId(),
fromService.getType(),
toService.getId(),
toService.getType(),
Relationship.UPSTREAM.ordinal());
deleteLineageFromSearch(fromService, toService, serviceLineageDetails);
} else {
serviceLineageDetails.withAssetEdges(serviceLineageDetails.getAssetEdges() - 1);
dao.relationshipDAO()
.insert(
fromService.getId(),
toService.getId(),
fromService.getType(),
toService.getType(),
Relationship.UPSTREAM.ordinal(),
JsonUtils.pojoToJson(serviceLineageDetails));
addLineageToSearch(fromService, toService, serviceLineageDetails);
}
private boolean hasField(EntityReference entity, String field) {
return Entity.entityHasField(entity.getType(), field);
}
private void cleanUpLineage(
EntityInterface fromEntity,
EntityInterface toEntity,
String field,
Function<EntityInterface, EntityReference> getter) {
boolean hasField =
hasField(fromEntity.getEntityReference(), field)
&& hasField(toEntity.getEntityReference(), field);
if (!hasField) return;
EntityReference fromRef = getter.apply(fromEntity);
EntityReference toRef = getter.apply(toEntity);
processExtendedLineageCleanup(fromRef, toRef);
}
private void cleanUpLineageForDataProducts(
EntityInterface fromEntity,
EntityInterface toEntity,
String field,
Function<EntityInterface, List<EntityReference>> getter) {
boolean hasField =
hasField(fromEntity.getEntityReference(), field)
&& hasField(toEntity.getEntityReference(), field);
if (!hasField) return;
for (EntityReference fromRef : getter.apply(fromEntity)) {
for (EntityReference toRef : getter.apply(toEntity)) {
processExtendedLineageCleanup(fromRef, toRef);
}
}
}
private void cleanUpDomainLineage(EntityInterface fromEntity, EntityInterface toEntity) {
boolean hasDomainField =
Entity.entityHasField(fromEntity.getEntityReference().getType(), FIELD_DOMAIN)
&& Entity.entityHasField(toEntity.getEntityReference().getType(), FIELD_DOMAIN);
if (hasDomainField && fromEntity.getDomain() != null && toEntity.getDomain() != null) {
EntityReference fromDomain = fromEntity.getDomain();
EntityReference toDomain = toEntity.getDomain();
CollectionDAO.EntityRelationshipObject domainRelation =
dao.relationshipDAO()
.getRecord(fromDomain.getId(), toDomain.getId(), Relationship.UPSTREAM.ordinal());
LineageDetails domainLineageDetails;
if (domainRelation != null) {
domainLineageDetails = JsonUtils.readValue(domainRelation.getJson(), LineageDetails.class);
if (domainLineageDetails.getAssetEdges() - 1 < 1) {
dao.relationshipDAO()
.delete(
fromDomain.getId(),
fromDomain.getType(),
toDomain.getId(),
toDomain.getType(),
Relationship.UPSTREAM.ordinal());
deleteLineageFromSearch(fromDomain, toDomain, domainLineageDetails);
} else {
domainLineageDetails.withAssetEdges(domainLineageDetails.getAssetEdges() - 1);
dao.relationshipDAO()
.insert(
fromDomain.getId(),
toDomain.getId(),
fromDomain.getType(),
toDomain.getType(),
Relationship.UPSTREAM.ordinal(),
JsonUtils.pojoToJson(domainLineageDetails));
addLineageToSearch(fromDomain, toDomain, domainLineageDetails);
}
}
private void processExtendedLineageCleanup(EntityReference fromRef, EntityReference toRef) {
if (fromRef == null || toRef == null) return;
CollectionDAO.EntityRelationshipObject relation =
dao.relationshipDAO()
.getRecord(fromRef.getId(), toRef.getId(), Relationship.UPSTREAM.ordinal());
if (relation == null) return;
LineageDetails lineageDetails = JsonUtils.readValue(relation.getJson(), LineageDetails.class);
if (lineageDetails.getAssetEdges() - 1 < 1) {
dao.relationshipDAO()
.delete(
fromRef.getId(),
fromRef.getType(),
toRef.getId(),
toRef.getType(),
Relationship.UPSTREAM.ordinal());
deleteLineageFromSearch(fromRef, toRef, lineageDetails);
} else {
lineageDetails.withAssetEdges(lineageDetails.getAssetEdges() - 1);
dao.relationshipDAO()
.insert(
fromRef.getId(),
toRef.getId(),
fromRef.getType(),
toRef.getType(),
Relationship.UPSTREAM.ordinal(),
JsonUtils.pojoToJson(lineageDetails));
addLineageToSearch(fromRef, toRef, lineageDetails);
}
}

View File

@ -3,6 +3,7 @@ package org.openmetadata.service.migration.mysql.v170;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.createServiceCharts;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.runLineageMigrationForNonNullColumn;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.runLineageMigrationForNullColumn;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.runMigrationForDataProductsLineage;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.runMigrationForDomainLineage;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.runMigrationServiceLineage;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.updateDataInsightsApplication;
@ -32,6 +33,7 @@ public class Migration extends MigrationProcessImpl {
runLineageMigrationForNonNullColumn(handle);
runMigrationServiceLineage(handle);
runMigrationForDomainLineage(handle);
runMigrationForDataProductsLineage(handle);
// DI
createServiceCharts();

View File

@ -3,6 +3,7 @@ package org.openmetadata.service.migration.postgres.v170;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.createServiceCharts;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.runLineageMigrationForNonNullColumn;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.runLineageMigrationForNullColumn;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.runMigrationForDataProductsLineage;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.runMigrationForDomainLineage;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.runMigrationServiceLineage;
import static org.openmetadata.service.migration.utils.v170.MigrationUtil.updateDataInsightsApplication;
@ -32,6 +33,7 @@ public class Migration extends MigrationProcessImpl {
runLineageMigrationForNonNullColumn(handle);
runMigrationServiceLineage(handle);
runMigrationForDomainLineage(handle);
runMigrationForDataProductsLineage(handle);
// DI
createServiceCharts();

View File

@ -19,12 +19,14 @@ import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart;
import org.openmetadata.schema.dataInsight.custom.LineChart;
import org.openmetadata.schema.dataInsight.custom.LineChartMetric;
import org.openmetadata.schema.entity.domains.DataProduct;
import org.openmetadata.schema.entity.domains.Domain;
import org.openmetadata.schema.entity.policies.Policy;
import org.openmetadata.schema.entity.policies.accessControl.Rule;
import org.openmetadata.schema.governance.workflows.WorkflowConfiguration;
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.LineageDetails;
import org.openmetadata.schema.type.MetadataOperation;
@ -36,6 +38,7 @@ import org.openmetadata.service.governance.workflows.flowable.MainWorkflow;
import org.openmetadata.service.jdbi3.AppMarketPlaceRepository;
import org.openmetadata.service.jdbi3.AppRepository;
import org.openmetadata.service.jdbi3.DataInsightSystemChartRepository;
import org.openmetadata.service.jdbi3.DataProductRepository;
import org.openmetadata.service.jdbi3.DomainRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.PolicyRepository;
@ -63,7 +66,7 @@ public class MigrationUtil {
private MigrationUtil() {}
public static final String DOMAIN_LINEAGE =
public static final String DOMAIN_AND_PRODUCTS_LINEAGE =
"select count(*) from entity_relationship where fromId in (select toId from entity_relationship where fromId = '%s' and relation = 10) AND toId in (select toId from entity_relationship where fromId = '%s' and relation = 10) and relation = 13";
public static final String SERVICE_ENTITY_MIGRATION =
@ -383,36 +386,11 @@ public class MigrationUtil {
public static void runMigrationForDomainLineage(Handle handle) {
try {
LOG.info("MIGRATION 1.7.0 - STARTING MIGRATION FOR DOMAIN LINEAGE");
List<Domain> allDomains = getAllDomains();
for (Domain fromDomain : allDomains) {
for (Domain toDomain : allDomains) {
if (fromDomain.getId().equals(toDomain.getId())) {
continue;
}
String sql =
String.format(
DOMAIN_LINEAGE, fromDomain.getId().toString(), toDomain.getId().toString());
int count = handle.createQuery(sql).mapTo(Integer.class).one();
if (count > 0) {
LineageDetails domainLineageDetails =
new LineageDetails()
.withCreatedAt(System.currentTimeMillis())
.withUpdatedAt(System.currentTimeMillis())
.withCreatedBy(ADMIN_USER_NAME)
.withUpdatedBy(ADMIN_USER_NAME)
.withSource(LineageDetails.Source.CHILD_ASSETS)
.withAssetEdges(count);
Entity.getCollectionDAO()
.relationshipDAO()
.insert(
fromDomain.getId(),
toDomain.getId(),
fromDomain.getEntityReference().getType(),
toDomain.getEntityReference().getType(),
Relationship.UPSTREAM.ordinal(),
JsonUtils.pojoToJson(domainLineageDetails));
}
insertDomainAndDataProductLineage(
handle, fromDomain.getEntityReference(), toDomain.getEntityReference());
}
}
@ -423,6 +401,57 @@ public class MigrationUtil {
}
}
public static void runMigrationForDataProductsLineage(Handle handle) {
try {
List<DataProduct> allDataProducts = getAllDataProducts();
for (DataProduct fromDataProduct : allDataProducts) {
for (DataProduct toDataProduct : allDataProducts) {
insertDomainAndDataProductLineage(
handle, fromDataProduct.getEntityReference(), toDataProduct.getEntityReference());
}
}
} catch (Exception ex) {
LOG.error(
"Error while updating null json rows with createdAt, createdBy, updatedAt and updatedBy for lineage.",
ex);
}
}
private static void insertDomainAndDataProductLineage(
Handle handle, EntityReference fromRef, EntityReference toRef) {
LOG.info(
"MIGRATION 1.7.0 - STARTING MIGRATION FOR DOMAIN/DATA_PRODUCT LINEAGE, FROM: {} TO: {}",
fromRef.getFullyQualifiedName(),
toRef.getFullyQualifiedName());
if (fromRef.getId().equals(toRef.getId())) {
return;
}
String sql =
String.format(
DOMAIN_AND_PRODUCTS_LINEAGE, fromRef.getId().toString(), toRef.getId().toString());
int count = handle.createQuery(sql).mapTo(Integer.class).one();
if (count > 0) {
LineageDetails domainLineageDetails =
new LineageDetails()
.withCreatedAt(System.currentTimeMillis())
.withUpdatedAt(System.currentTimeMillis())
.withCreatedBy(ADMIN_USER_NAME)
.withUpdatedBy(ADMIN_USER_NAME)
.withSource(LineageDetails.Source.CHILD_ASSETS)
.withAssetEdges(count);
Entity.getCollectionDAO()
.relationshipDAO()
.insert(
fromRef.getId(),
toRef.getId(),
fromRef.getType(),
toRef.getType(),
Relationship.UPSTREAM.ordinal(),
JsonUtils.pojoToJson(domainLineageDetails));
}
}
public static void runMigrationServiceLineage(Handle handle) {
try {
List<ServiceEntityInterface> allServices = getAllServicesForLineage();
@ -441,7 +470,10 @@ public class MigrationUtil {
private static void insertServiceLineageDetails(
Handle handle, ServiceEntityInterface fromService, ServiceEntityInterface toService) {
try {
LOG.info("MIGRATION 1.7.0 - STARTING MIGRATION FOR SERVICES LINEAGE");
LOG.info(
"MIGRATION 1.7.0 - STARTING MIGRATION FOR SERVICES LINEAGE , FROM: {} TO: {}",
fromService.getFullyQualifiedName(),
toService.getFullyQualifiedName());
if (fromService.getId().equals(toService.getId())
&& fromService
@ -510,6 +542,12 @@ public class MigrationUtil {
return repository.listAll(repository.getFields("id"), new ListFilter(Include.ALL));
}
private static List<DataProduct> getAllDataProducts() {
DataProductRepository repository =
(DataProductRepository) Entity.getEntityRepository(Entity.DATA_PRODUCT);
return repository.listAll(repository.getFields("id"), new ListFilter(Include.ALL));
}
public static void updateLineageBotPolicy() {
PolicyRepository policyRepository =
(PolicyRepository) Entity.getEntityRepository(Entity.POLICY);

View File

@ -261,7 +261,7 @@ public class LineageResource {
@Parameter(description = "view (service or domain)")
@QueryParam("view")
@Pattern(
regexp = "service|domain|all",
regexp = "service|domain|dataProduct|all",
message = "Invalid type. Allowed values: service, domain.")
String view,
@Parameter(
@ -273,6 +273,12 @@ public class LineageResource {
@QueryParam("includeDeleted")
boolean deleted)
throws IOException {
if (Entity.getSearchRepository().getIndexMapping(view) != null) {
view =
Entity.getSearchRepository()
.getIndexMapping(view)
.getIndexName(Entity.getSearchRepository().getClusterAlias());
}
return Entity.getSearchRepository().searchPlatformLineage(view, queryFilter, deleted);
}

View File

@ -28,6 +28,7 @@ public record DataProductIndex(DataProduct dataProduct) implements SearchIndex {
ParseTags parseTags = new ParseTags(Entity.getEntityTags(Entity.DATA_PRODUCT, dataProduct));
doc.put("tags", parseTags.getTags());
doc.putAll(commonAttributes);
doc.put("upstreamLineage", SearchIndex.getLineageData(dataProduct.getEntityReference()));
return doc;
}

View File

@ -44,7 +44,6 @@ export const SIDEBAR_LIST_ITEMS = {
[SidebarItem.GLOSSARY]: [SidebarItem.GOVERNANCE, SidebarItem.GLOSSARY],
[SidebarItem.TAGS]: [SidebarItem.GOVERNANCE, SidebarItem.TAGS],
[SidebarItem.METRICS]: [SidebarItem.GOVERNANCE, SidebarItem.METRICS],
[SidebarItem.LINEAGE]: [SidebarItem.GOVERNANCE, SidebarItem.LINEAGE],
// Profile Dropdown
'user-name': ['dropdown-profile', 'user-name'],

View File

@ -0,0 +1,50 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import test, { expect } from '@playwright/test';
import { SidebarItem } from '../../constant/sidebar';
import { redirectToHomePage } from '../../utils/common';
import { sidebarClick } from '../../utils/sidebar';
test.use({
storageState: 'playwright/.auth/admin.json',
});
test('Verify Platform Lineage View', async ({ page }) => {
await redirectToHomePage(page);
const lineageRes = page.waitForResponse(
'/api/v1/lineage/getPlatformLineage?view=service*'
);
await sidebarClick(page, SidebarItem.LINEAGE);
await lineageRes;
await expect(page.getByTestId('lineage-export')).not.toBeVisible();
await page.getByTestId('lineage-layer-btn').click();
await page.waitForSelector(
'[data-testid="lineage-layer-domain-btn"]:not(.active)'
);
const domainRes = page.waitForResponse(
'/api/v1/lineage/getPlatformLineage?view=domain*'
);
await page.getByTestId('lineage-layer-domain-btn').click();
await domainRes;
await page.getByTestId('lineage-layer-btn').click();
const dataProductRes = page.waitForResponse(
'/api/v1/lineage/getPlatformLineage?view=dataProduct*'
);
await page.getByTestId('lineage-layer-data-product-btn').click();
await dataProductRes;
});

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 5.8 KiB

View File

@ -354,7 +354,7 @@ const Suggestions = ({
className="m-b-md w-100 text-left d-flex items-center p-0"
data-testid="nlp-suggestions-button"
icon={
<div className="nlp-button active w-6 h-6 flex-center m-r-md">
<div className="nlp-button w-6 h-6 flex-center m-r-md">
<IconSuggestionsBlue />
</div>
}

View File

@ -135,19 +135,21 @@ const LineageControlButtons: FC<LineageControlButtonsProps> = ({
/>
)}
<Button
className="lineage-button"
data-testid="lineage-export"
disabled={isEditMode}
icon={
<span className="anticon">
<ExportIcon height={18} width={18} />
</span>
}
title={t('label.export-entity', { entity: t('label.lineage') })}
type="text"
onClick={onExportClick}
/>
{entityType && (
<Button
className="lineage-button"
data-testid="lineage-export"
disabled={isEditMode}
icon={
<span className="anticon">
<ExportIcon height={18} width={18} />
</span>
}
title={t('label.export-entity', { entity: t('label.lineage') })}
type="text"
onClick={onExportClick}
/>
)}
{handleFullScreenViewClick && (
<Button

View File

@ -16,6 +16,7 @@ import classNames from 'classnames';
import { t } from 'i18next';
import React from 'react';
import { ReactComponent as DataQualityIcon } from '../../../../assets/svg/ic-data-contract.svg';
import { ReactComponent as DataProductIcon } from '../../../../assets/svg/ic-data-product.svg';
import { ReactComponent as DomainIcon } from '../../../../assets/svg/ic-domain.svg';
import { ReactComponent as Layers } from '../../../../assets/svg/ic-layers.svg';
import { ReactComponent as ServiceView } from '../../../../assets/svg/services.svg';
@ -23,6 +24,7 @@ import { SERVICE_TYPES } from '../../../../constants/Services.constant';
import { useLineageProvider } from '../../../../context/LineageProvider/LineageProvider';
import { LineagePlatformView } from '../../../../context/LineageProvider/LineageProvider.interface';
import { EntityType } from '../../../../enums/entity.enum';
import { Table } from '../../../../generated/entity/data/table';
import { LineageLayer } from '../../../../generated/settings/settings';
import searchClassBase from '../../../../utils/SearchClassBase';
import { AssetsUnion } from '../../../DataAssets/AssetsSelectionModal/AssetSelectionModal.interface';
@ -48,7 +50,7 @@ const LayerButton: React.FC<LayerButtonProps> = React.memo(
)
);
const LineageLayers = ({ entityType }: LineageLayersProps) => {
const LineageLayers = ({ entityType, entity }: LineageLayersProps) => {
const {
activeLayer,
onUpdateLayerView,
@ -116,7 +118,9 @@ const LineageLayers = ({ entityType }: LineageLayersProps) => {
)}
{(isPlatformLineage ||
(entityType && entityType !== EntityType.DOMAIN)) && (
(entityType &&
entityType !== EntityType.DOMAIN &&
entity?.domain)) && (
<LayerButton
icon={<DomainIcon />}
isActive={platformView === LineagePlatformView.Domain}
@ -125,6 +129,21 @@ const LineageLayers = ({ entityType }: LineageLayersProps) => {
onClick={() => handlePlatformViewChange(LineagePlatformView.Domain)}
/>
)}
{(isPlatformLineage ||
(entityType &&
entityType !== EntityType.DOMAIN &&
((entity as Table)?.dataProducts ?? [])?.length > 0)) && (
<LayerButton
icon={<DataProductIcon />}
isActive={platformView === LineagePlatformView.DataProduct}
label={t('label.data-product')}
testId="lineage-layer-data-product-btn"
onClick={() =>
handlePlatformViewChange(LineagePlatformView.DataProduct)
}
/>
)}
</ButtonGroup>
),
[

View File

@ -25,6 +25,7 @@ import React, {
import { useTranslation } from 'react-i18next';
import { useHistory } from 'react-router-dom';
import { ReactComponent as IconCloseCircleOutlined } from '../../assets/svg/close-circle-outlined.svg';
import { ReactComponent as IconSuggestionsActive } from '../../assets/svg/ic-suggestions-active.svg';
import { ReactComponent as IconSuggestionsBlue } from '../../assets/svg/ic-suggestions-blue.svg';
import { ReactComponent as IconSearch } from '../../assets/svg/search.svg';
import { TOUR_SEARCH_TERM } from '../../constants/constants';
@ -185,7 +186,13 @@ export const GlobalSearchBar = () => {
active: isNLPActive,
})}
data-testid="nlp-suggestions-button"
icon={<Icon component={IconSuggestionsBlue} />}
icon={
<Icon
component={
isNLPActive ? IconSuggestionsActive : IconSuggestionsBlue
}
/>
}
type="text"
onClick={() => setNLPActive(!isNLPActive)}
/>

View File

@ -13,6 +13,8 @@
@import (reference) '../../styles/variables.less';
@nlp-border-color: #b9e6fe;
.search-container {
border: 1px solid #eaecf5;
border-radius: 12px;
@ -20,8 +22,9 @@
padding: 6px 20px;
.nlp-button {
border: 0.5px solid @border-color !important;
border: 0.5px solid @nlp-border-color !important;
border-radius: 8px;
background-color: @blue-11 !important;
svg {
width: 14px;
@ -29,8 +32,17 @@
}
&.active {
background-color: @blue-11 !important;
border: 0.5px solid #b9e6fe !important;
padding: 0;
display: flex;
align-items: center;
justify-content: center;
border: none !important;
svg {
width: 24px;
height: 24px;
fill: none;
}
}
}
}

View File

@ -662,7 +662,7 @@ const AssetsTabs = forwardRef(
activeEntity &&
permissions.Create &&
data.length > 0 && (
<div className="w-full d-flex justify-between items-center">
<div className="w-full d-flex justify-between items-center m-b-sm">
<Checkbox
className="assets-checkbox p-x-sm"
onChange={(e) => onSelectAll(e.target.checked)}>

View File

@ -196,7 +196,7 @@ const Lineage = ({
<MiniMap pannable zoomable position="bottom-right" />
<Panel position="bottom-left">
<LineageLayers entityType={entityType} />
<LineageLayers entity={entity} entityType={entityType} />
</Panel>
</ReactFlow>
</ReactFlowProvider>

View File

@ -47,6 +47,7 @@ export enum LineagePlatformView {
None = 'None',
Service = 'Service',
Domain = 'Domain',
DataProduct = 'DataProduct',
}
export interface LineageContextType {

View File

@ -65,6 +65,7 @@ import { EntityLineageNodeType, EntityType } from '../../enums/entity.enum';
import { AddLineage } from '../../generated/api/lineage/addLineage';
import { LineageDirection } from '../../generated/api/lineage/lineageDirection';
import { LineageSettings } from '../../generated/configuration/lineageSettings';
import { Table } from '../../generated/entity/data/table';
import { LineageLayer } from '../../generated/settings/settings';
import {
ColumnLineage,
@ -94,6 +95,7 @@ import {
getConnectedNodesEdges,
getEdgeDataFromEdge,
getELKLayoutedElements,
getEntityTypeFromPlatformView,
getLineageEdge,
getLineageEdgeForAPI,
getLoadingStatusValue,
@ -346,7 +348,7 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
);
const fetchPlatformLineage = useCallback(
async (view: 'service' | 'domain', config?: LineageConfig) => {
async (view: string, config?: LineageConfig) => {
try {
setLoading(true);
setInit(false);
@ -1329,17 +1331,26 @@ const LineageProvider = ({ children }: LineageProviderProps) => {
entity?.domain.type,
lineageConfig
);
} else if (
platformView === LineagePlatformView.DataProduct &&
((entity as Table)?.dataProducts ?? [])?.length > 0
) {
fetchLineageData(
(entity as Table)?.dataProducts?.[0]?.fullyQualifiedName ?? '',
(entity as Table)?.dataProducts?.[0]?.type ?? '',
lineageConfig
);
} else if (platformView === LineagePlatformView.None) {
fetchLineageData(decodedFqn, entityType, lineageConfig);
} else if (isPlatformLineage) {
fetchPlatformLineage(
platformView === LineagePlatformView.Domain ? 'domain' : 'service',
getEntityTypeFromPlatformView(platformView),
lineageConfig
);
}
} else if (isPlatformLineage) {
fetchPlatformLineage(
platformView === LineagePlatformView.Domain ? 'domain' : 'service',
getEntityTypeFromPlatformView(platformView),
lineageConfig
);
}

View File

@ -94,7 +94,7 @@ export const getPlatformLineage = async ({
}: {
config?: LineageConfig;
queryFilter?: string;
view: 'service' | 'domain';
view: string;
}) => {
const { upstreamDepth = 1, downstreamDepth = 1 } = config ?? {};
const API_PATH = `lineage/getPlatformLineage`;

View File

@ -168,7 +168,7 @@
@text-highlighter: #ffc34e40;
@team-avatar-bg: #0950c51a;
@om-navbar-height: ~'var(--ant-navbar-height)';
@sidebar-width: 60px;
@sidebar-width: 84px;
@alert-text-color: @text-color-tertiary;
@alert-info-bg: @blue-14;
@alert-success-bg: #f6fef9;

View File

@ -63,6 +63,7 @@ import {
ZOOM_TRANSITION_DURATION,
ZOOM_VALUE,
} from '../constants/Lineage.constants';
import { LineagePlatformView } from '../context/LineageProvider/LineageProvider.interface';
import {
EntityLineageDirection,
EntityLineageNodeType,
@ -1748,3 +1749,16 @@ export const getLineageEntityExclusionFilter = () => {
},
};
};
export const getEntityTypeFromPlatformView = (
platformView: LineagePlatformView
): string => {
switch (platformView) {
case LineagePlatformView.DataProduct:
return EntityType.DATA_PRODUCT;
case LineagePlatformView.Domain:
return EntityType.DOMAIN;
default:
return 'service';
}
};