From a8d94ec9792b8c415d95b89c77ca678d05e10e0c Mon Sep 17 00:00:00 2001 From: sonika-shah <58761340+sonika-shah@users.noreply.github.com> Date: Fri, 21 Nov 2025 08:17:39 +0530 Subject: [PATCH] Fix owner propagation for Domain children (subdomains & data products) in search (#24472) * Fix owner propagation for Domain children (subdomains & data products) in search * add feedback changes * minor improvements --- .../service/search/SearchConstants.java | 2 + .../service/search/SearchRepository.java | 93 ++++++++---- .../SearchPropagationIntegrationTest.java | 134 ++++++++++++++++++ 3 files changed, 199 insertions(+), 30 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchConstants.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchConstants.java index 2cbcfe6304e..450ac14f885 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchConstants.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchConstants.java @@ -10,6 +10,8 @@ public class SearchConstants { public static final String DATABASE_ID = "database.id"; public static final String DATABASE_SCHEMA_ID = "databaseSchema.id"; + public static final String PARENT_ID = "parent.id"; + public static final String DOMAINS_ID = "domains.id"; public static final String SENDING_REQUEST_TO_ELASTIC_SEARCH = "Sending request to ElasticSearch {}"; public static final String TAGS_FQN = "tags.tagFQN"; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 3cf9897f897..0886384ccee 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -41,12 +41,14 @@ import static org.openmetadata.service.search.SearchClient.UPDATE_ADDED_DELETE_G import static org.openmetadata.service.search.SearchClient.UPDATE_CERTIFICATION_SCRIPT; import static org.openmetadata.service.search.SearchClient.UPDATE_PROPAGATED_ENTITY_REFERENCE_FIELD_SCRIPT; import static org.openmetadata.service.search.SearchClient.UPDATE_TAGS_FIELD_SCRIPT; +import static org.openmetadata.service.search.SearchConstants.DOMAINS_ID; import static org.openmetadata.service.search.SearchConstants.ENTITY_TYPE; import static org.openmetadata.service.search.SearchConstants.FAILED_TO_CREATE_INDEX_MESSAGE; import static org.openmetadata.service.search.SearchConstants.FULLY_QUALIFIED_NAME; import static org.openmetadata.service.search.SearchConstants.HITS; import static org.openmetadata.service.search.SearchConstants.ID; import static org.openmetadata.service.search.SearchConstants.PARENT; +import static org.openmetadata.service.search.SearchConstants.PARENT_ID; import static org.openmetadata.service.search.SearchConstants.SEARCH_SOURCE; import static org.openmetadata.service.search.SearchConstants.SERVICE_ID; import static org.openmetadata.service.search.SearchConstants.TAGS_FQN; @@ -144,6 +146,19 @@ public class SearchRepository { @Getter @Setter public SearchIndexFactory searchIndexFactory = new SearchIndexFactory(); + private static final Set SERVICE_ENTITY_SET = + Set.of( + Entity.DATABASE_SERVICE, + Entity.DASHBOARD_SERVICE, + Entity.MESSAGING_SERVICE, + Entity.PIPELINE_SERVICE, + Entity.MLMODEL_SERVICE, + Entity.STORAGE_SERVICE, + Entity.SEARCH_SERVICE, + Entity.SECURITY_SERVICE, + Entity.API_SERVICE, + Entity.DRIVE_SERVICE); + private final List inheritableFields = List.of( FIELD_OWNERS, @@ -732,38 +747,56 @@ public class SearchRepository { IndexMapping indexMapping, EntityInterface entity) throws IOException { - if (changeDescription != null) { - Pair> updates = - getInheritedFieldChanges(changeDescription, entity); - Pair parentMatch; - if (!updates.getValue().isEmpty() - // domains can be updatedDomains or deletedDomains and need to be propagated from the - // service level - && (updates.getValue().keySet().stream() - .anyMatch(key -> key.toLowerCase().contains(FIELD_DOMAINS)) - || updates.getValue().containsKey(FIELD_DISPLAY_NAME))) { - if (entityType.equalsIgnoreCase(Entity.DATABASE_SERVICE) - || entityType.equalsIgnoreCase(Entity.DASHBOARD_SERVICE) - || entityType.equalsIgnoreCase(Entity.MESSAGING_SERVICE) - || entityType.equalsIgnoreCase(Entity.PIPELINE_SERVICE) - || entityType.equalsIgnoreCase(Entity.MLMODEL_SERVICE) - || entityType.equalsIgnoreCase(Entity.STORAGE_SERVICE) - || entityType.equalsIgnoreCase(Entity.SEARCH_SERVICE) - || entityType.equalsIgnoreCase(Entity.SECURITY_SERVICE) - || entityType.equalsIgnoreCase(Entity.API_SERVICE) - || entityType.equalsIgnoreCase(Entity.DRIVE_SERVICE)) { - parentMatch = new ImmutablePair<>(SERVICE_ID, entityId); - } else { - parentMatch = new ImmutablePair<>(entityType + ".id", entityId); - } - } else { - parentMatch = new ImmutablePair<>(entityType + ".id", entityId); - } - List childAliases = indexMapping.getChildAliases(clusterAlias); - if (updates.getKey() != null && !updates.getKey().isEmpty() && !nullOrEmpty(childAliases)) { - searchClient.updateChildren(childAliases, parentMatch, updates); + if (changeDescription == null) { + return; + } + + Pair> updates = getInheritedFieldChanges(changeDescription, entity); + if (updates.getKey() == null || updates.getKey().isEmpty()) { + return; + } + + List childAliases = indexMapping.getChildAliases(clusterAlias); + if (nullOrEmpty(childAliases)) { + return; + } + + // Domain has subdomains (parent.id) and data products (domains.id) - handle separately + if (entityType.equalsIgnoreCase(Entity.DOMAIN)) { + propagateToDomainChildren(entityId, indexMapping, updates); + return; + } + + // Other entities: resolve parent field name and propagate to children + String parentFieldName = resolveParentFieldName(entityType, updates); + Pair parentMatch = new ImmutablePair<>(parentFieldName, entityId); + searchClient.updateChildren(childAliases, parentMatch, updates); + } + + private String resolveParentFieldName( + String entityType, Pair> updates) { + if (!updates.getValue().isEmpty() + && (updates.getValue().keySet().stream() + .anyMatch(key -> key.toLowerCase().contains(FIELD_DOMAINS)) + || updates.getValue().containsKey(FIELD_DISPLAY_NAME))) { + if (SERVICE_ENTITY_SET.stream().anyMatch(s -> s.equalsIgnoreCase(entityType))) { + return SERVICE_ID; } } + return entityType + ".id"; + } + + private void propagateToDomainChildren( + String domainId, IndexMapping indexMapping, Pair> updates) + throws IOException { + searchClient.updateChildren( + List.of(indexMapping.getIndexName(clusterAlias)), + new ImmutablePair<>(PARENT_ID, domainId), + updates); + searchClient.updateChildren( + List.of(entityIndexMap.get(Entity.DATA_PRODUCT).getIndexName(clusterAlias)), + new ImmutablePair<>(DOMAINS_ID, domainId), + updates); } public void propagateGlossaryTags( diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchPropagationIntegrationTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchPropagationIntegrationTest.java index 0517c2a67ff..502b1a044d1 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchPropagationIntegrationTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/search/SearchPropagationIntegrationTest.java @@ -22,11 +22,13 @@ import org.junit.jupiter.api.TestMethodOrder; import org.openmetadata.schema.api.data.CreateDatabase; import org.openmetadata.schema.api.data.CreateDatabaseSchema; import org.openmetadata.schema.api.data.CreateTable; +import org.openmetadata.schema.api.domains.CreateDataProduct; import org.openmetadata.schema.api.domains.CreateDomain; import org.openmetadata.schema.api.teams.CreateUser; import org.openmetadata.schema.entity.data.Database; import org.openmetadata.schema.entity.data.DatabaseSchema; import org.openmetadata.schema.entity.data.Table; +import org.openmetadata.schema.entity.domains.DataProduct; import org.openmetadata.schema.entity.domains.Domain; import org.openmetadata.schema.entity.services.DatabaseService; import org.openmetadata.schema.entity.teams.User; @@ -41,6 +43,7 @@ import org.openmetadata.service.OpenMetadataApplicationTest; import org.openmetadata.service.resources.databases.DatabaseResourceTest; import org.openmetadata.service.resources.databases.DatabaseSchemaResourceTest; import org.openmetadata.service.resources.databases.TableResourceTest; +import org.openmetadata.service.resources.domains.DataProductResourceTest; import org.openmetadata.service.resources.domains.DomainResourceTest; import org.openmetadata.service.resources.services.DatabaseServiceResourceTest; import org.openmetadata.service.resources.teams.UserResourceTest; @@ -642,4 +645,135 @@ public class SearchPropagationIntegrationTest extends OpenMetadataApplicationTes LOG.info( "Search propagation metrics test completed - conditional propagation in search layer working as expected"); } + + @Test + @Order(9) + void testOwnerPropagationFromDomainToSubdomain() throws IOException, InterruptedException { + LOG.info("Testing owner propagation from parent domain to subdomain in search index"); + + CreateUser createDomainOwner = + new CreateUser() + .withName("test_domain_owner_propagation") + .withEmail("test_domain_owner_propagation@openmetadata.org") + .withDisplayName("Test Domain Owner for Propagation"); + User domainOwner = userResourceTest.createEntity(createDomainOwner, ADMIN_AUTH_HEADERS); + + CreateDomain createParentDomain = + new CreateDomain() + .withName("test_parent_domain_propagation") + .withDisplayName("Test Parent Domain for Propagation") + .withDescription("Parent domain to test owner propagation") + .withDomainType(CreateDomain.DomainType.AGGREGATE); + Domain parentDomain = domainResourceTest.createEntity(createParentDomain, ADMIN_AUTH_HEADERS); + + CreateDomain createSubDomain = + new CreateDomain() + .withName("test_subdomain_propagation") + .withDisplayName("Test Subdomain for Propagation") + .withDescription("Subdomain to test owner propagation from parent") + .withDomainType(CreateDomain.DomainType.AGGREGATE) + .withParent(parentDomain.getFullyQualifiedName()); + Domain subDomain = domainResourceTest.createEntity(createSubDomain, ADMIN_AUTH_HEADERS); + + simulateWork(2000); + + EntityReference ownerRef = + new EntityReference() + .withId(domainOwner.getId()) + .withType("user") + .withName(domainOwner.getName()) + .withFullyQualifiedName(domainOwner.getFullyQualifiedName()); + + String jsonPatch = + JsonUtils.pojoToJson( + List.of(Map.of("op", "add", "path", "/owners", "value", List.of(ownerRef)))); + + Domain patchedParentDomain = + domainResourceTest.patchEntity( + parentDomain.getId(), JsonUtils.readTree(jsonPatch), ADMIN_AUTH_HEADERS); + + assertNotNull(patchedParentDomain.getOwners()); + assertEquals(1, patchedParentDomain.getOwners().size()); + + simulateWork(3000); + + WebTarget subDomainSearchTarget = + getResource("search/query") + .queryParam("q", "fullyQualifiedName:" + subDomain.getFullyQualifiedName()) + .queryParam("index", "domain_search_index"); + + String subDomainSearchResponse = + TestUtils.get(subDomainSearchTarget, String.class, ADMIN_AUTH_HEADERS); + assertTrue( + subDomainSearchResponse.contains(domainOwner.getId().toString()), + "Subdomain search result should contain owner ID propagated from parent domain"); + + LOG.info( + "Owner propagation from domain to subdomain test passed - owner propagated in search index"); + } + + @Test + @Order(10) + void testOwnerPropagationFromDomainToDataProduct() throws IOException, InterruptedException { + LOG.info("Testing owner propagation from domain to data product in search index"); + + CreateUser createDomainOwner = + new CreateUser() + .withName("test_domain_owner_dp_propagation") + .withEmail("test_domain_owner_dp_propagation@openmetadata.org") + .withDisplayName("Test Domain Owner for DP Propagation"); + User domainOwner = userResourceTest.createEntity(createDomainOwner, ADMIN_AUTH_HEADERS); + + CreateDomain createDomain = + new CreateDomain() + .withName("test_domain_dp_propagation") + .withDisplayName("Test Domain for DP Propagation") + .withDescription("Domain to test owner propagation to data product") + .withDomainType(CreateDomain.DomainType.AGGREGATE); + Domain domain = domainResourceTest.createEntity(createDomain, ADMIN_AUTH_HEADERS); + + EntityReference ownerRef = + new EntityReference() + .withId(domainOwner.getId()) + .withType("user") + .withName(domainOwner.getName()) + .withFullyQualifiedName(domainOwner.getFullyQualifiedName()); + + String jsonPatch = + JsonUtils.pojoToJson( + List.of(Map.of("op", "add", "path", "/owners", "value", List.of(ownerRef)))); + + Domain patchedDomain = + domainResourceTest.patchEntity( + domain.getId(), JsonUtils.readTree(jsonPatch), ADMIN_AUTH_HEADERS); + + assertNotNull(patchedDomain.getOwners()); + assertEquals(1, patchedDomain.getOwners().size()); + + DataProductResourceTest dataProductResourceTest = new DataProductResourceTest(); + CreateDataProduct createDataProduct = + new CreateDataProduct() + .withName("test_data_product_propagation") + .withDisplayName("Test Data Product for Propagation") + .withDescription("Data product to test owner propagation from domain") + .withDomains(List.of(domain.getFullyQualifiedName())); + DataProduct dataProduct = + dataProductResourceTest.createEntity(createDataProduct, ADMIN_AUTH_HEADERS); + + simulateWork(3000); + + WebTarget dataProductSearchTarget = + getResource("search/query") + .queryParam("q", "fullyQualifiedName:" + dataProduct.getFullyQualifiedName()) + .queryParam("index", "data_product_search_index"); + + String dataProductSearchResponse = + TestUtils.get(dataProductSearchTarget, String.class, ADMIN_AUTH_HEADERS); + assertTrue( + dataProductSearchResponse.contains(domainOwner.getId().toString()), + "Data product search result should contain owner ID propagated from domain"); + + LOG.info( + "Owner propagation from domain to data product test passed - owner propagated in search index"); + } }