mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-10 16:25:37 +00:00
Fix: Domain assets count mismatch between API and UI (#23620)
* Fix: Domain assets count mismatch between API and UI * exclude SearchDerivedFields from reindex * refactor fallback and have new field assetsCount * Update generated TypeScript types * refactor: clean up groupEntitiesByType function * add tests --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
This commit is contained in:
parent
e5cbb7a0e1
commit
c761ce9fbe
@ -17,6 +17,7 @@ import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDa
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import jakarta.ws.rs.core.Response;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@ -68,6 +69,8 @@ import org.openmetadata.service.search.RecreateIndexHandler.ReindexContext;
|
||||
import org.openmetadata.service.search.SearchClusterMetrics;
|
||||
import org.openmetadata.service.search.SearchRepository;
|
||||
import org.openmetadata.service.socket.WebSocketManager;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.EntityUtil.Fields;
|
||||
import org.openmetadata.service.util.FullyQualifiedName;
|
||||
import org.openmetadata.service.util.RestUtil;
|
||||
import org.openmetadata.service.workflows.interfaces.Source;
|
||||
@ -1376,8 +1379,20 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
||||
|
||||
private List<String> getSearchIndexFields(String entityType) {
|
||||
if (TIME_SERIES_ENTITIES.contains(entityType)) {
|
||||
return List.of(); // Empty list for time series
|
||||
return List.of();
|
||||
}
|
||||
|
||||
EntityRepository<?> repository = Entity.getEntityRepository(entityType);
|
||||
Set<String> searchDerivedFields = repository.getSearchDerivedFields();
|
||||
|
||||
// Excludes search-derived fields during reindexing to avoid circular dependencies.
|
||||
if (!searchDerivedFields.isEmpty()) {
|
||||
Fields fieldsWithExclusions =
|
||||
EntityUtil.Fields.createWithExcludedFields(
|
||||
repository.getAllowedFieldsCopy(), searchDerivedFields);
|
||||
return new ArrayList<>(fieldsWithExclusions.getFieldList());
|
||||
}
|
||||
|
||||
return List.of("*");
|
||||
}
|
||||
|
||||
|
@ -25,6 +25,7 @@ import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -46,6 +47,10 @@ import org.openmetadata.schema.utils.JsonUtils;
|
||||
import org.openmetadata.schema.utils.ResultList;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.resources.domains.DomainResource;
|
||||
import org.openmetadata.service.search.DefaultInheritedFieldEntitySearch;
|
||||
import org.openmetadata.service.search.InheritedFieldEntitySearch;
|
||||
import org.openmetadata.service.search.InheritedFieldEntitySearch.InheritedFieldQuery;
|
||||
import org.openmetadata.service.search.InheritedFieldEntitySearch.InheritedFieldResult;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.EntityUtil.Fields;
|
||||
import org.openmetadata.service.util.FullyQualifiedName;
|
||||
@ -54,6 +59,9 @@ import org.openmetadata.service.util.LineageUtil;
|
||||
@Slf4j
|
||||
public class DomainRepository extends EntityRepository<Domain> {
|
||||
private static final String UPDATE_FIELDS = "parent,children,experts";
|
||||
private static final String FIELD_ASSETS_COUNT = "assetsCount";
|
||||
|
||||
private InheritedFieldEntitySearch inheritedFieldEntitySearch;
|
||||
|
||||
public DomainRepository() {
|
||||
super(
|
||||
@ -65,15 +73,27 @@ public class DomainRepository extends EntityRepository<Domain> {
|
||||
UPDATE_FIELDS);
|
||||
supportsSearch = true;
|
||||
|
||||
// Initialize inherited field search
|
||||
if (searchRepository != null) {
|
||||
inheritedFieldEntitySearch = new DefaultInheritedFieldEntitySearch(searchRepository);
|
||||
}
|
||||
|
||||
// Register bulk field fetchers for efficient database operations
|
||||
fieldFetchers.put(FIELD_ASSETS, this::fetchAndSetAssets);
|
||||
fieldFetchers.put(FIELD_ASSETS_COUNT, this::fetchAndSetAssetsCount);
|
||||
fieldFetchers.put("parent", this::fetchAndSetParents);
|
||||
fieldFetchers.put("experts", this::fetchAndSetExperts);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getSearchDerivedFields() {
|
||||
return Set.of(FIELD_ASSETS, FIELD_ASSETS_COUNT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFields(Domain entity, Fields fields) {
|
||||
entity.withAssets(fields.contains(FIELD_ASSETS) ? getAssets(entity) : null);
|
||||
entity.withAssetsCount(fields.contains(FIELD_ASSETS_COUNT) ? getAssets(entity).size() : 0);
|
||||
entity.withParent(getParent(entity));
|
||||
}
|
||||
|
||||
@ -92,6 +112,13 @@ public class DomainRepository extends EntityRepository<Domain> {
|
||||
setFieldFromMap(true, domains, batchFetchAssets(domains), Domain::setAssets);
|
||||
}
|
||||
|
||||
private void fetchAndSetAssetsCount(List<Domain> domains, Fields fields) {
|
||||
if (!fields.contains(FIELD_ASSETS_COUNT) || domains == null || domains.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
setFieldFromMap(true, domains, batchFetchAssetsCount(domains), Domain::setAssetsCount);
|
||||
}
|
||||
|
||||
private void fetchAndSetParents(List<Domain> domains, Fields fields) {
|
||||
if (!fields.contains("parent") || domains == null || domains.isEmpty()) {
|
||||
return;
|
||||
@ -147,7 +174,22 @@ public class DomainRepository extends EntityRepository<Domain> {
|
||||
}
|
||||
|
||||
private List<EntityReference> getAssets(Domain entity) {
|
||||
return findTo(entity.getId(), DOMAIN, Relationship.HAS, null);
|
||||
if (inheritedFieldEntitySearch == null) {
|
||||
LOG.warn("Search is unavailable for domain assets. Returning empty list for consistency.");
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
InheritedFieldQuery query = InheritedFieldQuery.forDomain(entity.getFullyQualifiedName());
|
||||
InheritedFieldResult result =
|
||||
inheritedFieldEntitySearch.getEntitiesForField(
|
||||
query,
|
||||
() -> {
|
||||
LOG.warn(
|
||||
"Search fallback triggered for domain {}. Returning empty list for consistency.",
|
||||
entity.getFullyQualifiedName());
|
||||
return new InheritedFieldResult(new ArrayList<>(), 0);
|
||||
});
|
||||
return result.entities();
|
||||
}
|
||||
|
||||
public BulkOperationResult bulkAddAssets(String domainName, BulkAssets request) {
|
||||
@ -348,31 +390,31 @@ public class DomainRepository extends EntityRepository<Domain> {
|
||||
}
|
||||
|
||||
private Map<UUID, List<EntityReference>> batchFetchAssets(List<Domain> domains) {
|
||||
var assetsMap = new HashMap<UUID, List<EntityReference>>();
|
||||
if (domains == null || domains.isEmpty()) {
|
||||
return assetsMap;
|
||||
return new HashMap<>();
|
||||
}
|
||||
|
||||
// Initialize empty lists for all domains
|
||||
domains.forEach(domain -> assetsMap.put(domain.getId(), new ArrayList<>()));
|
||||
|
||||
// Single batch query to get all assets for all domains
|
||||
var records =
|
||||
daoCollection
|
||||
.relationshipDAO()
|
||||
.findToBatchAllTypes(
|
||||
entityListToStrings(domains), Relationship.HAS.ordinal(), Include.ALL);
|
||||
|
||||
// Group assets by domain ID
|
||||
records.forEach(
|
||||
record -> {
|
||||
var domainId = UUID.fromString(record.getFromId());
|
||||
var assetRef =
|
||||
getEntityReferenceById(
|
||||
record.getToEntity(), UUID.fromString(record.getToId()), NON_DELETED);
|
||||
assetsMap.get(domainId).add(assetRef);
|
||||
});
|
||||
if (inheritedFieldEntitySearch == null) {
|
||||
LOG.warn("Search is unavailable for domain assets. Returning empty lists for consistency.");
|
||||
var emptyMap = new HashMap<UUID, List<EntityReference>>();
|
||||
domains.forEach(domain -> emptyMap.put(domain.getId(), new ArrayList<>()));
|
||||
return emptyMap;
|
||||
}
|
||||
|
||||
var assetsMap = new HashMap<UUID, List<EntityReference>>();
|
||||
for (Domain domain : domains) {
|
||||
InheritedFieldQuery query = InheritedFieldQuery.forDomain(domain.getFullyQualifiedName());
|
||||
InheritedFieldResult result =
|
||||
inheritedFieldEntitySearch.getEntitiesForField(
|
||||
query,
|
||||
() -> {
|
||||
LOG.warn(
|
||||
"Search fallback triggered for domain {}. Returning empty list for consistency.",
|
||||
domain.getFullyQualifiedName());
|
||||
return new InheritedFieldResult(new ArrayList<>(), 0);
|
||||
});
|
||||
assetsMap.put(domain.getId(), result.entities());
|
||||
}
|
||||
return assetsMap;
|
||||
}
|
||||
|
||||
@ -426,4 +468,34 @@ public class DomainRepository extends EntityRepository<Domain> {
|
||||
|
||||
return expertsMap;
|
||||
}
|
||||
|
||||
private Map<UUID, Integer> batchFetchAssetsCount(List<Domain> domains) {
|
||||
if (domains == null || domains.isEmpty()) {
|
||||
return new HashMap<>();
|
||||
}
|
||||
|
||||
if (inheritedFieldEntitySearch == null) {
|
||||
LOG.warn(
|
||||
"Search is unavailable for domain asset counts. Returning 0 for all domains for consistency.");
|
||||
var emptyCountMap = new HashMap<UUID, Integer>();
|
||||
domains.forEach(domain -> emptyCountMap.put(domain.getId(), 0));
|
||||
return emptyCountMap;
|
||||
}
|
||||
|
||||
Map<UUID, Integer> countsById = new HashMap<>();
|
||||
for (Domain domain : domains) {
|
||||
InheritedFieldQuery query = InheritedFieldQuery.forDomain(domain.getFullyQualifiedName());
|
||||
Integer count =
|
||||
inheritedFieldEntitySearch.getCountForField(
|
||||
query,
|
||||
() -> {
|
||||
LOG.warn(
|
||||
"Search fallback triggered for domain {}. Returning 0 for consistency.",
|
||||
domain.getFullyQualifiedName());
|
||||
return 0;
|
||||
});
|
||||
countsById.put(domain.getId(), count);
|
||||
}
|
||||
return countsById;
|
||||
}
|
||||
}
|
||||
|
@ -264,6 +264,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
.expireAfterWrite(30, TimeUnit.SECONDS)
|
||||
.recordStats()
|
||||
.build(new EntityLoaderWithId());
|
||||
|
||||
private final String collectionPath;
|
||||
@Getter public final Class<T> entityClass;
|
||||
@Getter protected final String entityType;
|
||||
@ -3404,6 +3405,15 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
||||
return new HashSet<>(allowedFields);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns entity fields not stored in the database but derived from search operations.
|
||||
*
|
||||
* @return Set of field names to exclude during reindexing. Empty by default.
|
||||
*/
|
||||
public Set<String> getSearchDerivedFields() {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
protected String getCustomPropertyFQNPrefix(String entityType) {
|
||||
return FullyQualifiedName.build(entityType, "customProperties");
|
||||
}
|
||||
|
@ -0,0 +1,224 @@
|
||||
/*
|
||||
* Copyright 2024 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.service.search;
|
||||
|
||||
import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import jakarta.ws.rs.core.Response;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.search.SearchRequest;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.utils.JsonUtils;
|
||||
|
||||
@Slf4j
|
||||
public class DefaultInheritedFieldEntitySearch implements InheritedFieldEntitySearch {
|
||||
|
||||
private static final int MAX_PAGE_SIZE = 1000;
|
||||
private static final String EMPTY_QUERY = "";
|
||||
private static final String EMPTY_JSON = "{}";
|
||||
|
||||
// Elasticsearch/OpenSearch response field names
|
||||
private static final String HITS_KEY = "hits";
|
||||
private static final String SOURCE_KEY = "_source";
|
||||
private static final String TOTAL_KEY = "total";
|
||||
private static final String VALUE_KEY = "value";
|
||||
private static final String ENTITY_TYPE_KEY = "entityType";
|
||||
private static final String TYPE_KEY = "type";
|
||||
|
||||
private final SearchRepository searchRepository;
|
||||
|
||||
public DefaultInheritedFieldEntitySearch(SearchRepository searchRepository) {
|
||||
this.searchRepository = searchRepository;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InheritedFieldResult getEntitiesForField(
|
||||
InheritedFieldQuery query, Supplier<InheritedFieldResult> fallback) {
|
||||
try {
|
||||
if (isSearchUnavailable()) {
|
||||
return fallback.get();
|
||||
}
|
||||
|
||||
String queryFilter = getQueryFilter(query);
|
||||
|
||||
Integer totalCount = fetchTotalCount(queryFilter);
|
||||
|
||||
if (totalCount == 0) {
|
||||
return new InheritedFieldResult(Collections.emptyList(), 0);
|
||||
}
|
||||
|
||||
if (query.getSize() == 0) {
|
||||
return new InheritedFieldResult(Collections.emptyList(), totalCount);
|
||||
}
|
||||
|
||||
int entitiesToFetch =
|
||||
query.getSize() > 0 ? Math.min(query.getSize(), totalCount) : totalCount;
|
||||
|
||||
List<EntityReference> allEntities = new ArrayList<>();
|
||||
int currentFrom = query.getFrom();
|
||||
|
||||
while (allEntities.size() < entitiesToFetch) {
|
||||
int batchSize = Math.min(MAX_PAGE_SIZE, entitiesToFetch - allEntities.size());
|
||||
|
||||
SearchRequest searchRequest = buildSearchRequest(currentFrom, batchSize, queryFilter, true);
|
||||
|
||||
Response response = searchRepository.search(searchRequest, null);
|
||||
String responseBody = extractResponseBody(response);
|
||||
JsonNode searchResponse = JsonUtils.readTree(responseBody);
|
||||
|
||||
List<EntityReference> batchEntities =
|
||||
extractEntityReferencesFromSearchResponse(searchResponse);
|
||||
if (batchEntities.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
|
||||
allEntities.addAll(batchEntities);
|
||||
currentFrom += batchSize;
|
||||
}
|
||||
|
||||
return new InheritedFieldResult(allEntities, totalCount);
|
||||
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Failed to fetch entities for inherited field, using fallback", e);
|
||||
return fallback.get();
|
||||
}
|
||||
}
|
||||
|
||||
private Integer fetchTotalCount(String queryFilter) throws Exception {
|
||||
SearchRequest countRequest = buildSearchRequest(0, 0, queryFilter, false);
|
||||
|
||||
Response response = searchRepository.search(countRequest, null);
|
||||
String responseBody = extractResponseBody(response);
|
||||
JsonNode searchResponse = JsonUtils.readTree(responseBody);
|
||||
|
||||
return extractTotalCountFromSearchResponse(searchResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer getCountForField(InheritedFieldQuery query, Supplier<Integer> fallback) {
|
||||
try {
|
||||
if (isSearchUnavailable()) {
|
||||
return fallback.get();
|
||||
}
|
||||
|
||||
String queryFilter = getQueryFilter(query);
|
||||
SearchRequest searchRequest = buildSearchRequest(0, 0, queryFilter, false);
|
||||
|
||||
Response response = searchRepository.search(searchRequest, null);
|
||||
|
||||
String responseBody = extractResponseBody(response);
|
||||
JsonNode searchResponse = JsonUtils.readTree(responseBody);
|
||||
return extractTotalCountFromSearchResponse(searchResponse);
|
||||
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Failed to get count for inherited field, using fallback", e);
|
||||
return fallback.get();
|
||||
}
|
||||
}
|
||||
|
||||
private String getQueryFilter(InheritedFieldQuery query) {
|
||||
return switch (query.getFilterType()) {
|
||||
case DOMAIN_ASSETS -> QueryFilterBuilder.buildDomainAssetsFilter(query);
|
||||
case OWNER_ASSETS -> QueryFilterBuilder.buildOwnerAssetsFilter(query);
|
||||
case TAG_ASSETS -> QueryFilterBuilder.buildTagAssetsFilter(query);
|
||||
case GENERIC -> QueryFilterBuilder.buildGenericFilter(query);
|
||||
};
|
||||
}
|
||||
|
||||
private String extractResponseBody(Response response) {
|
||||
Object entity = response.getEntity();
|
||||
return entity != null ? entity.toString() : EMPTY_JSON;
|
||||
}
|
||||
|
||||
private List<EntityReference> extractEntityReferencesFromSearchResponse(JsonNode searchResponse) {
|
||||
List<EntityReference> entities = new ArrayList<>();
|
||||
|
||||
JsonNode searchResults = searchResponse.path(HITS_KEY).path(HITS_KEY);
|
||||
if (!searchResults.isArray()) {
|
||||
return entities;
|
||||
}
|
||||
|
||||
for (JsonNode searchHit : searchResults) {
|
||||
JsonNode documentSource = searchHit.path(SOURCE_KEY);
|
||||
if (documentSource.isMissingNode()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
EntityReference entityRef = extractEntityReferenceFromDocument(documentSource);
|
||||
entities.add(entityRef);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to extract EntityReference from document: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
return entities;
|
||||
}
|
||||
|
||||
private EntityReference extractEntityReferenceFromDocument(JsonNode document) throws Exception {
|
||||
ObjectNode modifiedDocument = document.deepCopy();
|
||||
if (modifiedDocument.has(ENTITY_TYPE_KEY) && !modifiedDocument.has(TYPE_KEY)) {
|
||||
modifiedDocument.set(TYPE_KEY, modifiedDocument.get(ENTITY_TYPE_KEY));
|
||||
modifiedDocument.remove(ENTITY_TYPE_KEY);
|
||||
}
|
||||
|
||||
ObjectMapper mapper = JsonUtils.getObjectMapper().copy();
|
||||
mapper.configure(
|
||||
com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
|
||||
return mapper.readValue(modifiedDocument.toString(), EntityReference.class);
|
||||
}
|
||||
|
||||
private Integer extractTotalCountFromSearchResponse(JsonNode searchResponse) {
|
||||
JsonNode total = searchResponse.path(HITS_KEY).path(TOTAL_KEY);
|
||||
if (total.has(VALUE_KEY)) {
|
||||
return total.get(VALUE_KEY).asInt();
|
||||
}
|
||||
return total.asInt(0);
|
||||
}
|
||||
|
||||
private boolean isSearchUnavailable() {
|
||||
try {
|
||||
if (searchRepository == null
|
||||
|| searchRepository.getSearchClient() == null
|
||||
|| !searchRepository.getSearchClient().isClientAvailable()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
String indexName = searchRepository.getIndexOrAliasName(GLOBAL_SEARCH_ALIAS);
|
||||
return indexName == null || indexName.isEmpty();
|
||||
} catch (Exception e) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private SearchRequest buildSearchRequest(
|
||||
int from, int size, String queryFilter, boolean fetchSource) {
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
searchRequest.setIndex(searchRepository.getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
|
||||
searchRequest.setQuery(EMPTY_QUERY);
|
||||
searchRequest.setFrom(from);
|
||||
searchRequest.setSize(size);
|
||||
searchRequest.setQueryFilter(queryFilter);
|
||||
searchRequest.setTrackTotalHits(true);
|
||||
searchRequest.setFetchSource(fetchSource);
|
||||
return searchRequest;
|
||||
}
|
||||
}
|
@ -0,0 +1,179 @@
|
||||
/*
|
||||
* Copyright 2024 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.service.search;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
|
||||
public interface InheritedFieldEntitySearch {
|
||||
|
||||
InheritedFieldResult getEntitiesForField(
|
||||
InheritedFieldQuery query, Supplier<InheritedFieldResult> fallback);
|
||||
|
||||
Integer getCountForField(InheritedFieldQuery query, Supplier<Integer> fallback);
|
||||
|
||||
enum QueryFilterType {
|
||||
DOMAIN_ASSETS,
|
||||
OWNER_ASSETS,
|
||||
TAG_ASSETS,
|
||||
GENERIC
|
||||
}
|
||||
|
||||
class InheritedFieldQuery {
|
||||
private final String fieldPath;
|
||||
private final String fieldValue;
|
||||
private final boolean supportsHierarchy;
|
||||
private final String entityTypeFilter;
|
||||
private final int from;
|
||||
private final int size;
|
||||
private final boolean includeDeleted;
|
||||
private final QueryFilterType filterType;
|
||||
|
||||
private InheritedFieldQuery(Builder builder) {
|
||||
this.fieldPath = builder.fieldPath;
|
||||
this.fieldValue = builder.fieldValue;
|
||||
this.supportsHierarchy = builder.supportsHierarchy;
|
||||
this.entityTypeFilter = builder.entityTypeFilter;
|
||||
this.from = builder.from;
|
||||
this.size = builder.size;
|
||||
this.includeDeleted = builder.includeDeleted;
|
||||
this.filterType = builder.filterType;
|
||||
}
|
||||
|
||||
public String getFieldPath() {
|
||||
return fieldPath;
|
||||
}
|
||||
|
||||
public String getFieldValue() {
|
||||
return fieldValue;
|
||||
}
|
||||
|
||||
public boolean isSupportsHierarchy() {
|
||||
return supportsHierarchy;
|
||||
}
|
||||
|
||||
public String getEntityTypeFilter() {
|
||||
return entityTypeFilter;
|
||||
}
|
||||
|
||||
public int getFrom() {
|
||||
return from;
|
||||
}
|
||||
|
||||
public int getSize() {
|
||||
return size;
|
||||
}
|
||||
|
||||
public boolean isIncludeDeleted() {
|
||||
return includeDeleted;
|
||||
}
|
||||
|
||||
public QueryFilterType getFilterType() {
|
||||
return filterType;
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private String fieldPath;
|
||||
private String fieldValue;
|
||||
private boolean supportsHierarchy = false;
|
||||
private String entityTypeFilter;
|
||||
private int from = 0;
|
||||
private int size = 100;
|
||||
private boolean includeDeleted = false;
|
||||
private QueryFilterType filterType = QueryFilterType.GENERIC;
|
||||
|
||||
public Builder fieldPath(String fieldPath) {
|
||||
this.fieldPath = fieldPath;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder fieldValue(String fieldValue) {
|
||||
this.fieldValue = fieldValue;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder supportsHierarchy(boolean supportsHierarchy) {
|
||||
this.supportsHierarchy = supportsHierarchy;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder entityTypeFilter(String entityTypeFilter) {
|
||||
this.entityTypeFilter = entityTypeFilter;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder from(int from) {
|
||||
this.from = from;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder size(int size) {
|
||||
this.size = size;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder includeDeleted(boolean includeDeleted) {
|
||||
this.includeDeleted = includeDeleted;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder filterType(QueryFilterType filterType) {
|
||||
this.filterType = filterType;
|
||||
return this;
|
||||
}
|
||||
|
||||
public InheritedFieldQuery build() {
|
||||
if (fieldPath == null || fieldValue == null) {
|
||||
throw new IllegalArgumentException("fieldPath and fieldValue are required");
|
||||
}
|
||||
return new InheritedFieldQuery(this);
|
||||
}
|
||||
}
|
||||
|
||||
public static InheritedFieldQuery forDomain(String domainFqn) {
|
||||
return builder()
|
||||
.fieldPath("domains.fullyQualifiedName")
|
||||
.fieldValue(domainFqn)
|
||||
.supportsHierarchy(true)
|
||||
.filterType(QueryFilterType.DOMAIN_ASSETS)
|
||||
.includeDeleted(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static InheritedFieldQuery forOwner(String ownerId) {
|
||||
return builder()
|
||||
.fieldPath("owners.id")
|
||||
.fieldValue(ownerId)
|
||||
.supportsHierarchy(false)
|
||||
.filterType(QueryFilterType.OWNER_ASSETS)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static InheritedFieldQuery forTag(String tagFqn) {
|
||||
return builder()
|
||||
.fieldPath("tags.tagFQN")
|
||||
.fieldValue(tagFqn)
|
||||
.supportsHierarchy(false)
|
||||
.filterType(QueryFilterType.TAG_ASSETS)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
record InheritedFieldResult(List<EntityReference> entities, Integer total) {}
|
||||
}
|
@ -0,0 +1,147 @@
|
||||
/*
|
||||
* Copyright 2024 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.openmetadata.service.search;
|
||||
|
||||
import static org.openmetadata.service.Entity.DATA_PRODUCT;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.utils.JsonUtils;
|
||||
import org.openmetadata.service.search.InheritedFieldEntitySearch.InheritedFieldQuery;
|
||||
|
||||
@Slf4j
|
||||
public class QueryFilterBuilder {
|
||||
|
||||
private static final String QUERY_KEY = "query";
|
||||
private static final String BOOL_KEY = "bool";
|
||||
private static final String MUST_KEY = "must";
|
||||
private static final String MUST_NOT_KEY = "must_not";
|
||||
private static final String SHOULD_KEY = "should";
|
||||
private static final String TERM_KEY = "term";
|
||||
private static final String PREFIX_KEY = "prefix";
|
||||
private static final String DELETED_KEY = "deleted";
|
||||
private static final String ENTITY_TYPE_KEY = "entityType";
|
||||
private static final String EMPTY_JSON = "{}";
|
||||
private static final String HIERARCHY_SEPARATOR = ".";
|
||||
|
||||
private static final ObjectMapper MAPPER = JsonUtils.getObjectMapper();
|
||||
|
||||
public static String buildDomainAssetsFilter(InheritedFieldQuery query) {
|
||||
ObjectNode queryFilter = MAPPER.createObjectNode();
|
||||
ObjectNode queryNode = queryFilter.putObject(QUERY_KEY);
|
||||
ObjectNode boolNode = queryNode.putObject(BOOL_KEY);
|
||||
ArrayNode mustArray = boolNode.putArray(MUST_KEY);
|
||||
|
||||
addHierarchyCondition(mustArray, query.getFieldPath(), query.getFieldValue());
|
||||
addCommonFilters(mustArray, query);
|
||||
|
||||
// Exclude data products from domain assets
|
||||
ArrayNode mustNotArray = boolNode.putArray(MUST_NOT_KEY);
|
||||
ObjectNode dataProductNode = MAPPER.createObjectNode();
|
||||
dataProductNode.putObject(TERM_KEY).put(ENTITY_TYPE_KEY, DATA_PRODUCT);
|
||||
mustNotArray.add(dataProductNode);
|
||||
|
||||
return serializeQuery(queryFilter);
|
||||
}
|
||||
|
||||
public static String buildOwnerAssetsFilter(InheritedFieldQuery query) {
|
||||
ObjectNode queryFilter = MAPPER.createObjectNode();
|
||||
ObjectNode queryNode = queryFilter.putObject(QUERY_KEY);
|
||||
ObjectNode boolNode = queryNode.putObject(BOOL_KEY);
|
||||
ArrayNode mustArray = boolNode.putArray(MUST_KEY);
|
||||
|
||||
addExactMatchCondition(mustArray, query.getFieldPath(), query.getFieldValue());
|
||||
addCommonFilters(mustArray, query);
|
||||
|
||||
return serializeQuery(queryFilter);
|
||||
}
|
||||
|
||||
public static String buildTagAssetsFilter(InheritedFieldQuery query) {
|
||||
ObjectNode queryFilter = MAPPER.createObjectNode();
|
||||
ObjectNode queryNode = queryFilter.putObject(QUERY_KEY);
|
||||
ObjectNode boolNode = queryNode.putObject(BOOL_KEY);
|
||||
ArrayNode mustArray = boolNode.putArray(MUST_KEY);
|
||||
|
||||
addExactMatchCondition(mustArray, query.getFieldPath(), query.getFieldValue());
|
||||
addCommonFilters(mustArray, query);
|
||||
|
||||
return serializeQuery(queryFilter);
|
||||
}
|
||||
|
||||
public static String buildGenericFilter(InheritedFieldQuery query) {
|
||||
ObjectNode queryFilter = MAPPER.createObjectNode();
|
||||
ObjectNode queryNode = queryFilter.putObject(QUERY_KEY);
|
||||
ObjectNode boolNode = queryNode.putObject(BOOL_KEY);
|
||||
ArrayNode mustArray = boolNode.putArray(MUST_KEY);
|
||||
|
||||
if (query.isSupportsHierarchy()) {
|
||||
addHierarchyCondition(mustArray, query.getFieldPath(), query.getFieldValue());
|
||||
} else {
|
||||
addExactMatchCondition(mustArray, query.getFieldPath(), query.getFieldValue());
|
||||
}
|
||||
addCommonFilters(mustArray, query);
|
||||
|
||||
return serializeQuery(queryFilter);
|
||||
}
|
||||
|
||||
private static void addHierarchyCondition(
|
||||
ArrayNode mustArray, String fieldPath, String fieldValue) {
|
||||
ObjectNode fieldCondition = MAPPER.createObjectNode();
|
||||
ObjectNode innerBool = fieldCondition.putObject(BOOL_KEY);
|
||||
ArrayNode shouldArray = innerBool.putArray(SHOULD_KEY);
|
||||
|
||||
ObjectNode termNode = MAPPER.createObjectNode();
|
||||
termNode.putObject(TERM_KEY).put(fieldPath, fieldValue);
|
||||
shouldArray.add(termNode);
|
||||
|
||||
ObjectNode prefixNode = MAPPER.createObjectNode();
|
||||
prefixNode.putObject(PREFIX_KEY).put(fieldPath, fieldValue + HIERARCHY_SEPARATOR);
|
||||
shouldArray.add(prefixNode);
|
||||
|
||||
mustArray.add(fieldCondition);
|
||||
}
|
||||
|
||||
private static void addExactMatchCondition(
|
||||
ArrayNode mustArray, String fieldPath, String fieldValue) {
|
||||
ObjectNode termNode = MAPPER.createObjectNode();
|
||||
termNode.putObject(TERM_KEY).put(fieldPath, fieldValue);
|
||||
mustArray.add(termNode);
|
||||
}
|
||||
|
||||
private static void addCommonFilters(ArrayNode mustArray, InheritedFieldQuery query) {
|
||||
if (!query.isIncludeDeleted()) {
|
||||
ObjectNode deletedNode = MAPPER.createObjectNode();
|
||||
deletedNode.putObject(TERM_KEY).put(DELETED_KEY, false);
|
||||
mustArray.add(deletedNode);
|
||||
}
|
||||
|
||||
if (query.getEntityTypeFilter() != null && !query.getEntityTypeFilter().isEmpty()) {
|
||||
ObjectNode entityTypeNode = MAPPER.createObjectNode();
|
||||
entityTypeNode.putObject(TERM_KEY).put(ENTITY_TYPE_KEY, query.getEntityTypeFilter());
|
||||
mustArray.add(entityTypeNode);
|
||||
}
|
||||
}
|
||||
|
||||
private static String serializeQuery(ObjectNode queryFilter) {
|
||||
try {
|
||||
return MAPPER.writeValueAsString(queryFilter);
|
||||
} catch (JsonProcessingException e) {
|
||||
LOG.warn("Failed to serialize query filter", e);
|
||||
return EMPTY_JSON;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
package org.openmetadata.service.search.indexes;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.entity.domains.Domain;
|
||||
import org.openmetadata.service.Entity;
|
||||
@ -8,6 +9,7 @@ import org.openmetadata.service.search.ParseTags;
|
||||
|
||||
@Slf4j
|
||||
public record DomainIndex(Domain domain) implements SearchIndex {
|
||||
private static final Set<String> excludeFields = Set.of("assets", "assetsCount");
|
||||
|
||||
@Override
|
||||
public Object getEntity() {
|
||||
@ -23,6 +25,11 @@ public record DomainIndex(Domain domain) implements SearchIndex {
|
||||
return doc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getExcludedFields() {
|
||||
return excludeFields;
|
||||
}
|
||||
|
||||
public static Map<String, Float> getFields() {
|
||||
return SearchIndex.getDefaultFields();
|
||||
}
|
||||
|
@ -30,16 +30,21 @@ import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInfo;
|
||||
import org.openmetadata.schema.api.domains.CreateDomain;
|
||||
import org.openmetadata.schema.api.domains.CreateDomain.DomainType;
|
||||
import org.openmetadata.schema.entity.data.DatabaseSchema;
|
||||
import org.openmetadata.schema.entity.data.EntityHierarchy;
|
||||
import org.openmetadata.schema.entity.data.Table;
|
||||
import org.openmetadata.schema.entity.domains.Domain;
|
||||
import org.openmetadata.schema.entity.type.Style;
|
||||
import org.openmetadata.schema.type.ChangeDescription;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.api.BulkAssets;
|
||||
import org.openmetadata.schema.utils.JsonUtils;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.EntityNotFoundException;
|
||||
import org.openmetadata.service.jdbi3.TableRepository;
|
||||
import org.openmetadata.service.resources.EntityResourceTest;
|
||||
import org.openmetadata.service.resources.databases.DatabaseSchemaResourceTest;
|
||||
import org.openmetadata.service.resources.databases.TableResourceTest;
|
||||
import org.openmetadata.service.resources.domains.DomainResource.DomainList;
|
||||
import org.openmetadata.service.util.EntityHierarchyList;
|
||||
import org.openmetadata.service.util.TestUtils;
|
||||
@ -421,6 +426,115 @@ public class DomainResourceTest extends EntityResourceTest<Domain, CreateDomain>
|
||||
return getDomain;
|
||||
}
|
||||
|
||||
@Test
|
||||
void test_domainAssetsAndAssetsCountWithSubdomainInheritance(TestInfo test) throws IOException {
|
||||
// Verify that domain.assets and domain.assetsCount include:
|
||||
// 1. Direct assets added to the domain
|
||||
// 2. Assets inherited from all subdomains in the hierarchy
|
||||
Domain domain = createEntity(createRequest(test), ADMIN_AUTH_HEADERS);
|
||||
Domain subDomain =
|
||||
createEntity(
|
||||
createRequest(getEntityName(test, 1)).withParent(domain.getFullyQualifiedName()),
|
||||
ADMIN_AUTH_HEADERS);
|
||||
|
||||
TableResourceTest tableTest = new TableResourceTest();
|
||||
Table table1 =
|
||||
tableTest.createEntity(tableTest.createRequest(getEntityName(test, 2)), ADMIN_AUTH_HEADERS);
|
||||
Table table2 =
|
||||
tableTest.createEntity(tableTest.createRequest(getEntityName(test, 3)), ADMIN_AUTH_HEADERS);
|
||||
|
||||
// Initially, domain should have no assets
|
||||
Domain fetchedDomain = getEntity(domain.getId(), "assets,assetsCount", ADMIN_AUTH_HEADERS);
|
||||
assertNotNull(fetchedDomain.getAssets());
|
||||
assertEquals(0, fetchedDomain.getAssets().size());
|
||||
assertNotNull(fetchedDomain.getAssetsCount());
|
||||
assertEquals(0, fetchedDomain.getAssetsCount());
|
||||
|
||||
// Add 1 direct asset to root domain
|
||||
bulkAddAssets(
|
||||
domain.getFullyQualifiedName(),
|
||||
new BulkAssets().withAssets(List.of(table1.getEntityReference())));
|
||||
|
||||
fetchedDomain = getEntity(domain.getId(), "assets,assetsCount", ADMIN_AUTH_HEADERS);
|
||||
assertEquals(1, fetchedDomain.getAssets().size());
|
||||
assertEquals(1, fetchedDomain.getAssetsCount());
|
||||
assertTrue(fetchedDomain.getAssets().stream().anyMatch(a -> a.getId().equals(table1.getId())));
|
||||
|
||||
// Add 1 asset to subdomain
|
||||
bulkAddAssets(
|
||||
subDomain.getFullyQualifiedName(),
|
||||
new BulkAssets().withAssets(List.of(table2.getEntityReference())));
|
||||
|
||||
// Verify root domain now shows 2 assets: 1 direct + 1 inherited from subdomain
|
||||
fetchedDomain = getEntity(domain.getId(), "assets,assetsCount", ADMIN_AUTH_HEADERS);
|
||||
assertEquals(
|
||||
2,
|
||||
fetchedDomain.getAssets().size(),
|
||||
"Domain should have 1 direct asset + 1 from subdomain");
|
||||
assertEquals(2, fetchedDomain.getAssetsCount());
|
||||
assertTrue(fetchedDomain.getAssets().stream().anyMatch(a -> a.getId().equals(table1.getId())));
|
||||
assertTrue(fetchedDomain.getAssets().stream().anyMatch(a -> a.getId().equals(table2.getId())));
|
||||
|
||||
// Verify subdomain shows only its direct asset
|
||||
Domain fetchedSubDomain =
|
||||
getEntity(subDomain.getId(), "assets,assetsCount", ADMIN_AUTH_HEADERS);
|
||||
assertEquals(1, fetchedSubDomain.getAssets().size());
|
||||
assertEquals(1, fetchedSubDomain.getAssetsCount());
|
||||
assertTrue(
|
||||
fetchedSubDomain.getAssets().stream().anyMatch(a -> a.getId().equals(table2.getId())));
|
||||
}
|
||||
|
||||
@Test
|
||||
void test_domainAssetsAndAssetsCountWithAssetInheritance(TestInfo test) throws IOException {
|
||||
// Verify that when a domain is assigned to a parent entity (like database schema),
|
||||
// the domain.assets and domain.assetsCount automatically include:
|
||||
// 1. The parent entity itself (schema)
|
||||
// 2. All child entities (tables) that belong to that parent
|
||||
Domain domain = createEntity(createRequest(test), ADMIN_AUTH_HEADERS);
|
||||
|
||||
DatabaseSchemaResourceTest schemaTest = new DatabaseSchemaResourceTest();
|
||||
TableResourceTest tableTest = new TableResourceTest();
|
||||
|
||||
// Create a schema with 2 tables
|
||||
DatabaseSchema schema =
|
||||
schemaTest.createEntity(
|
||||
schemaTest.createRequest(getEntityName(test, 1)), ADMIN_AUTH_HEADERS);
|
||||
|
||||
Table table1 =
|
||||
tableTest.createEntity(
|
||||
tableTest
|
||||
.createRequest(getEntityName(test, 2))
|
||||
.withDatabaseSchema(schema.getFullyQualifiedName()),
|
||||
ADMIN_AUTH_HEADERS);
|
||||
Table table2 =
|
||||
tableTest.createEntity(
|
||||
tableTest
|
||||
.createRequest(getEntityName(test, 3))
|
||||
.withDatabaseSchema(schema.getFullyQualifiedName()),
|
||||
ADMIN_AUTH_HEADERS);
|
||||
|
||||
// Assign domain to the schema (not using bulk assets API)
|
||||
String json = JsonUtils.pojoToJson(schema);
|
||||
schema.withDomains(List.of(domain.getEntityReference()));
|
||||
schemaTest.patchEntity(schema.getId(), json, schema, ADMIN_AUTH_HEADERS);
|
||||
|
||||
// Verify domain shows 3 assets: 1 schema + 2 tables inherited from that schema
|
||||
Domain fetchedDomain = getEntity(domain.getId(), "assets,assetsCount", ADMIN_AUTH_HEADERS);
|
||||
assertEquals(
|
||||
3,
|
||||
fetchedDomain.getAssets().size(),
|
||||
"Domain should have 1 schema + 2 inherited tables from schema");
|
||||
assertEquals(3, fetchedDomain.getAssetsCount());
|
||||
assertTrue(fetchedDomain.getAssets().stream().anyMatch(a -> a.getId().equals(schema.getId())));
|
||||
assertTrue(fetchedDomain.getAssets().stream().anyMatch(a -> a.getId().equals(table1.getId())));
|
||||
assertTrue(fetchedDomain.getAssets().stream().anyMatch(a -> a.getId().equals(table2.getId())));
|
||||
}
|
||||
|
||||
private void bulkAddAssets(String domainName, BulkAssets request) throws HttpResponseException {
|
||||
WebTarget target = getResource("domains/" + domainName + "/assets/add");
|
||||
TestUtils.put(target, request, Status.OK, ADMIN_AUTH_HEADERS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assertFieldChange(String fieldName, Object expected, Object actual) {
|
||||
if (expected == actual) {
|
||||
|
@ -83,6 +83,10 @@
|
||||
"description": "Data assets collection that is part of this domain.",
|
||||
"$ref" : "../../type/entityReferenceList.json"
|
||||
},
|
||||
"assetsCount": {
|
||||
"description": "Count of data assets that are part of this domain (including inherited from sub-domains).",
|
||||
"type": "integer"
|
||||
},
|
||||
"tags": {
|
||||
"description": "Tags associated with the Domain.",
|
||||
"type": "array",
|
||||
|
@ -19,6 +19,10 @@ export interface Domain {
|
||||
* Data assets collection that is part of this domain.
|
||||
*/
|
||||
assets?: EntityReference[];
|
||||
/**
|
||||
* Count of data assets that are part of this domain (including inherited from sub-domains).
|
||||
*/
|
||||
assetsCount?: number;
|
||||
/**
|
||||
* Change that lead to this version of the entity.
|
||||
*/
|
||||
|
Loading…
x
Reference in New Issue
Block a user