Fix owner propagation for Domain children (subdomains & data products) in search (#24472)

* Fix owner propagation for Domain children (subdomains & data products) in search

* add feedback changes

* minor improvements
This commit is contained in:
sonika-shah 2025-11-21 08:17:39 +05:30 committed by GitHub
parent 59dd2d57be
commit a8d94ec979
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 199 additions and 30 deletions

View File

@ -10,6 +10,8 @@ public class SearchConstants {
public static final String DATABASE_ID = "database.id";
public static final String DATABASE_SCHEMA_ID = "databaseSchema.id";
public static final String PARENT_ID = "parent.id";
public static final String DOMAINS_ID = "domains.id";
public static final String SENDING_REQUEST_TO_ELASTIC_SEARCH =
"Sending request to ElasticSearch {}";
public static final String TAGS_FQN = "tags.tagFQN";

View File

@ -41,12 +41,14 @@ import static org.openmetadata.service.search.SearchClient.UPDATE_ADDED_DELETE_G
import static org.openmetadata.service.search.SearchClient.UPDATE_CERTIFICATION_SCRIPT;
import static org.openmetadata.service.search.SearchClient.UPDATE_PROPAGATED_ENTITY_REFERENCE_FIELD_SCRIPT;
import static org.openmetadata.service.search.SearchClient.UPDATE_TAGS_FIELD_SCRIPT;
import static org.openmetadata.service.search.SearchConstants.DOMAINS_ID;
import static org.openmetadata.service.search.SearchConstants.ENTITY_TYPE;
import static org.openmetadata.service.search.SearchConstants.FAILED_TO_CREATE_INDEX_MESSAGE;
import static org.openmetadata.service.search.SearchConstants.FULLY_QUALIFIED_NAME;
import static org.openmetadata.service.search.SearchConstants.HITS;
import static org.openmetadata.service.search.SearchConstants.ID;
import static org.openmetadata.service.search.SearchConstants.PARENT;
import static org.openmetadata.service.search.SearchConstants.PARENT_ID;
import static org.openmetadata.service.search.SearchConstants.SEARCH_SOURCE;
import static org.openmetadata.service.search.SearchConstants.SERVICE_ID;
import static org.openmetadata.service.search.SearchConstants.TAGS_FQN;
@ -144,6 +146,19 @@ public class SearchRepository {
@Getter @Setter public SearchIndexFactory searchIndexFactory = new SearchIndexFactory();
private static final Set<String> SERVICE_ENTITY_SET =
Set.of(
Entity.DATABASE_SERVICE,
Entity.DASHBOARD_SERVICE,
Entity.MESSAGING_SERVICE,
Entity.PIPELINE_SERVICE,
Entity.MLMODEL_SERVICE,
Entity.STORAGE_SERVICE,
Entity.SEARCH_SERVICE,
Entity.SECURITY_SERVICE,
Entity.API_SERVICE,
Entity.DRIVE_SERVICE);
private final List<String> inheritableFields =
List.of(
FIELD_OWNERS,
@ -732,38 +747,56 @@ public class SearchRepository {
IndexMapping indexMapping,
EntityInterface entity)
throws IOException {
if (changeDescription != null) {
Pair<String, Map<String, Object>> updates =
getInheritedFieldChanges(changeDescription, entity);
Pair<String, String> parentMatch;
if (!updates.getValue().isEmpty()
// domains can be updatedDomains or deletedDomains and need to be propagated from the
// service level
&& (updates.getValue().keySet().stream()
.anyMatch(key -> key.toLowerCase().contains(FIELD_DOMAINS))
|| updates.getValue().containsKey(FIELD_DISPLAY_NAME))) {
if (entityType.equalsIgnoreCase(Entity.DATABASE_SERVICE)
|| entityType.equalsIgnoreCase(Entity.DASHBOARD_SERVICE)
|| entityType.equalsIgnoreCase(Entity.MESSAGING_SERVICE)
|| entityType.equalsIgnoreCase(Entity.PIPELINE_SERVICE)
|| entityType.equalsIgnoreCase(Entity.MLMODEL_SERVICE)
|| entityType.equalsIgnoreCase(Entity.STORAGE_SERVICE)
|| entityType.equalsIgnoreCase(Entity.SEARCH_SERVICE)
|| entityType.equalsIgnoreCase(Entity.SECURITY_SERVICE)
|| entityType.equalsIgnoreCase(Entity.API_SERVICE)
|| entityType.equalsIgnoreCase(Entity.DRIVE_SERVICE)) {
parentMatch = new ImmutablePair<>(SERVICE_ID, entityId);
} else {
parentMatch = new ImmutablePair<>(entityType + ".id", entityId);
}
} else {
parentMatch = new ImmutablePair<>(entityType + ".id", entityId);
}
List<String> childAliases = indexMapping.getChildAliases(clusterAlias);
if (updates.getKey() != null && !updates.getKey().isEmpty() && !nullOrEmpty(childAliases)) {
searchClient.updateChildren(childAliases, parentMatch, updates);
if (changeDescription == null) {
return;
}
Pair<String, Map<String, Object>> updates = getInheritedFieldChanges(changeDescription, entity);
if (updates.getKey() == null || updates.getKey().isEmpty()) {
return;
}
List<String> childAliases = indexMapping.getChildAliases(clusterAlias);
if (nullOrEmpty(childAliases)) {
return;
}
// Domain has subdomains (parent.id) and data products (domains.id) - handle separately
if (entityType.equalsIgnoreCase(Entity.DOMAIN)) {
propagateToDomainChildren(entityId, indexMapping, updates);
return;
}
// Other entities: resolve parent field name and propagate to children
String parentFieldName = resolveParentFieldName(entityType, updates);
Pair<String, String> parentMatch = new ImmutablePair<>(parentFieldName, entityId);
searchClient.updateChildren(childAliases, parentMatch, updates);
}
private String resolveParentFieldName(
String entityType, Pair<String, Map<String, Object>> updates) {
if (!updates.getValue().isEmpty()
&& (updates.getValue().keySet().stream()
.anyMatch(key -> key.toLowerCase().contains(FIELD_DOMAINS))
|| updates.getValue().containsKey(FIELD_DISPLAY_NAME))) {
if (SERVICE_ENTITY_SET.stream().anyMatch(s -> s.equalsIgnoreCase(entityType))) {
return SERVICE_ID;
}
}
return entityType + ".id";
}
private void propagateToDomainChildren(
String domainId, IndexMapping indexMapping, Pair<String, Map<String, Object>> updates)
throws IOException {
searchClient.updateChildren(
List.of(indexMapping.getIndexName(clusterAlias)),
new ImmutablePair<>(PARENT_ID, domainId),
updates);
searchClient.updateChildren(
List.of(entityIndexMap.get(Entity.DATA_PRODUCT).getIndexName(clusterAlias)),
new ImmutablePair<>(DOMAINS_ID, domainId),
updates);
}
public void propagateGlossaryTags(

View File

@ -22,11 +22,13 @@ import org.junit.jupiter.api.TestMethodOrder;
import org.openmetadata.schema.api.data.CreateDatabase;
import org.openmetadata.schema.api.data.CreateDatabaseSchema;
import org.openmetadata.schema.api.data.CreateTable;
import org.openmetadata.schema.api.domains.CreateDataProduct;
import org.openmetadata.schema.api.domains.CreateDomain;
import org.openmetadata.schema.api.teams.CreateUser;
import org.openmetadata.schema.entity.data.Database;
import org.openmetadata.schema.entity.data.DatabaseSchema;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.domains.DataProduct;
import org.openmetadata.schema.entity.domains.Domain;
import org.openmetadata.schema.entity.services.DatabaseService;
import org.openmetadata.schema.entity.teams.User;
@ -41,6 +43,7 @@ import org.openmetadata.service.OpenMetadataApplicationTest;
import org.openmetadata.service.resources.databases.DatabaseResourceTest;
import org.openmetadata.service.resources.databases.DatabaseSchemaResourceTest;
import org.openmetadata.service.resources.databases.TableResourceTest;
import org.openmetadata.service.resources.domains.DataProductResourceTest;
import org.openmetadata.service.resources.domains.DomainResourceTest;
import org.openmetadata.service.resources.services.DatabaseServiceResourceTest;
import org.openmetadata.service.resources.teams.UserResourceTest;
@ -642,4 +645,135 @@ public class SearchPropagationIntegrationTest extends OpenMetadataApplicationTes
LOG.info(
"Search propagation metrics test completed - conditional propagation in search layer working as expected");
}
@Test
@Order(9)
void testOwnerPropagationFromDomainToSubdomain() throws IOException, InterruptedException {
LOG.info("Testing owner propagation from parent domain to subdomain in search index");
CreateUser createDomainOwner =
new CreateUser()
.withName("test_domain_owner_propagation")
.withEmail("test_domain_owner_propagation@openmetadata.org")
.withDisplayName("Test Domain Owner for Propagation");
User domainOwner = userResourceTest.createEntity(createDomainOwner, ADMIN_AUTH_HEADERS);
CreateDomain createParentDomain =
new CreateDomain()
.withName("test_parent_domain_propagation")
.withDisplayName("Test Parent Domain for Propagation")
.withDescription("Parent domain to test owner propagation")
.withDomainType(CreateDomain.DomainType.AGGREGATE);
Domain parentDomain = domainResourceTest.createEntity(createParentDomain, ADMIN_AUTH_HEADERS);
CreateDomain createSubDomain =
new CreateDomain()
.withName("test_subdomain_propagation")
.withDisplayName("Test Subdomain for Propagation")
.withDescription("Subdomain to test owner propagation from parent")
.withDomainType(CreateDomain.DomainType.AGGREGATE)
.withParent(parentDomain.getFullyQualifiedName());
Domain subDomain = domainResourceTest.createEntity(createSubDomain, ADMIN_AUTH_HEADERS);
simulateWork(2000);
EntityReference ownerRef =
new EntityReference()
.withId(domainOwner.getId())
.withType("user")
.withName(domainOwner.getName())
.withFullyQualifiedName(domainOwner.getFullyQualifiedName());
String jsonPatch =
JsonUtils.pojoToJson(
List.of(Map.of("op", "add", "path", "/owners", "value", List.of(ownerRef))));
Domain patchedParentDomain =
domainResourceTest.patchEntity(
parentDomain.getId(), JsonUtils.readTree(jsonPatch), ADMIN_AUTH_HEADERS);
assertNotNull(patchedParentDomain.getOwners());
assertEquals(1, patchedParentDomain.getOwners().size());
simulateWork(3000);
WebTarget subDomainSearchTarget =
getResource("search/query")
.queryParam("q", "fullyQualifiedName:" + subDomain.getFullyQualifiedName())
.queryParam("index", "domain_search_index");
String subDomainSearchResponse =
TestUtils.get(subDomainSearchTarget, String.class, ADMIN_AUTH_HEADERS);
assertTrue(
subDomainSearchResponse.contains(domainOwner.getId().toString()),
"Subdomain search result should contain owner ID propagated from parent domain");
LOG.info(
"Owner propagation from domain to subdomain test passed - owner propagated in search index");
}
@Test
@Order(10)
void testOwnerPropagationFromDomainToDataProduct() throws IOException, InterruptedException {
LOG.info("Testing owner propagation from domain to data product in search index");
CreateUser createDomainOwner =
new CreateUser()
.withName("test_domain_owner_dp_propagation")
.withEmail("test_domain_owner_dp_propagation@openmetadata.org")
.withDisplayName("Test Domain Owner for DP Propagation");
User domainOwner = userResourceTest.createEntity(createDomainOwner, ADMIN_AUTH_HEADERS);
CreateDomain createDomain =
new CreateDomain()
.withName("test_domain_dp_propagation")
.withDisplayName("Test Domain for DP Propagation")
.withDescription("Domain to test owner propagation to data product")
.withDomainType(CreateDomain.DomainType.AGGREGATE);
Domain domain = domainResourceTest.createEntity(createDomain, ADMIN_AUTH_HEADERS);
EntityReference ownerRef =
new EntityReference()
.withId(domainOwner.getId())
.withType("user")
.withName(domainOwner.getName())
.withFullyQualifiedName(domainOwner.getFullyQualifiedName());
String jsonPatch =
JsonUtils.pojoToJson(
List.of(Map.of("op", "add", "path", "/owners", "value", List.of(ownerRef))));
Domain patchedDomain =
domainResourceTest.patchEntity(
domain.getId(), JsonUtils.readTree(jsonPatch), ADMIN_AUTH_HEADERS);
assertNotNull(patchedDomain.getOwners());
assertEquals(1, patchedDomain.getOwners().size());
DataProductResourceTest dataProductResourceTest = new DataProductResourceTest();
CreateDataProduct createDataProduct =
new CreateDataProduct()
.withName("test_data_product_propagation")
.withDisplayName("Test Data Product for Propagation")
.withDescription("Data product to test owner propagation from domain")
.withDomains(List.of(domain.getFullyQualifiedName()));
DataProduct dataProduct =
dataProductResourceTest.createEntity(createDataProduct, ADMIN_AUTH_HEADERS);
simulateWork(3000);
WebTarget dataProductSearchTarget =
getResource("search/query")
.queryParam("q", "fullyQualifiedName:" + dataProduct.getFullyQualifiedName())
.queryParam("index", "data_product_search_index");
String dataProductSearchResponse =
TestUtils.get(dataProductSearchTarget, String.class, ADMIN_AUTH_HEADERS);
assertTrue(
dataProductSearchResponse.contains(domainOwner.getId().toString()),
"Data product search result should contain owner ID propagated from domain");
LOG.info(
"Owner propagation from domain to data product test passed - owner propagated in search index");
}
}