fix: search cache invalidation for iceberg entities (#12805)

This commit is contained in:
Chakru 2025-03-07 15:24:14 +05:30 committed by GitHub
parent a101c27388
commit f053188bb6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 717 additions and 15 deletions

View File

@ -41,6 +41,7 @@ dependencies {
implementation externalDependency.slf4jApi
runtimeOnly externalDependency.logbackClassic
compileOnly externalDependency.lombok
compileOnly externalDependency.hazelcast
implementation externalDependency.commonsCollections
api externalDependency.datastaxOssNativeProtocol
api(externalDependency.datastaxOssCore) {
@ -98,7 +99,6 @@ dependencies {
testImplementation spec.product.pegasus.restliServer
testImplementation externalDependency.ebeanTest
testImplementation externalDependency.opentelemetrySdk
// logback >=1.3 required due to `testcontainers` only
testImplementation 'ch.qos.logback:logback-classic:1.4.7'
testImplementation 'net.datafaker:datafaker:1.9.0'

View File

@ -0,0 +1,122 @@
package com.linkedin.metadata.search.client;
import com.hazelcast.map.IMap;
import com.linkedin.common.urn.Urn;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
/**
 * A framework to enable search cache invalidation. The cache keys in the search cache are queries
 * of different forms, and when an entity is modified there isn't a simple direct correlation of
 * that entity to the queries in the cache (short of fully evaluating those searches). This service
 * provides a mechanism, via a {@link CacheKeyMatcher} implementation, to apply approximations that
 * check whether a cache key is likely related to an entity that was updated, and to clear those
 * entries. The evict method can be called when entities are updated and it is important for those
 * updates to be visible in the UI without waiting for cache expiration. The eviction is disabled by
 * default and enabled via the spring application property searchService.enableEviction.
 */
@RequiredArgsConstructor
@Slf4j
public class CacheEvictionService {
  private final CacheManager cacheManager;
  // Mirrors searchService.enableCache; when false, all methods below are no-ops.
  private final Boolean cachingEnabled;
  // Mirrors searchService.enableEviction; when false, all methods below are no-ops.
  private final Boolean enableEviction;

  // Invalidates (clears) all caches known to the cache manager.
  public void invalidateAll() {
    if (cachingEnabled && enableEviction) {
      cacheManager.getCacheNames().forEach(this::invalidate);
    }
  }

  // Invalidates (clears) a specific cache. Throws AssertionError if the cache name is unknown.
  public void invalidate(String cacheName) {
    if (cachingEnabled && enableEviction) {
      Cache cache = cacheManager.getCache(cacheName);
      if (cache != null) {
        cache.invalidate();
      } else {
        throw new AssertionError(String.format("Invalid cache name %s supplied", cacheName));
      }
    }
  }

  // Runs all cache keys through the supplied matcher implementation and clears the cache keys
  // identified by the matcher.
  public void evict(CacheKeyMatcher matcher) {
    if (cachingEnabled && enableEviction) {
      Collection<String> cacheNames = cacheManager.getCacheNames();
      for (String cacheName : cacheNames) {
        long evictCount = 0;
        if (matcher.supportsCache(cacheName)) {
          Cache cache = cacheManager.getCache(cacheName);
          assert (cache != null);
          Set<Object> keys = getKeys(cacheName);
          for (Object key : keys) {
            if (matcher.match(cacheName, key)) {
              cache.evict(key);
              evictCount++;
              log.debug("From cache '{}' evicting key {}", cacheName, key);
            }
          }
          if (evictCount > 0) {
            log.info("Evicted {} keys from cache {}", evictCount, cacheName);
          }
        }
      }
    }
  }

  // Uses a UrnCacheKeyMatcher to evict cache keys that are likely related to the supplied
  // list of urns.
  public void evict(List<Urn> urns) {
    log.info("Attempting eviction of search cache due to updates to {}", urns);
    UrnCacheKeyMatcher matcher = new UrnCacheKeyMatcher(urns);
    evict(matcher);
  }

  // Returns all keys currently present in the named cache.
  private Set<Object> getKeys(String cacheName) {
    // Enumerating cache keys is not part of the standard spring Cache interface; it is native
    // cache implementation dependent and so must be implemented for all cache implementations we
    // may use (currently Caffeine and Hazelcast IMap).
    Cache springCache = cacheManager.getCache(cacheName);
    assert (springCache != null);
    Object nativeCache = springCache.getNativeCache();
    if (nativeCache instanceof com.github.benmanes.caffeine.cache.Cache) {
      com.github.benmanes.caffeine.cache.Cache<Object, Object> caffeineCache =
          (com.github.benmanes.caffeine.cache.Cache<Object, Object>) nativeCache;
      return caffeineCache.asMap().keySet();
    } else if (nativeCache instanceof IMap) {
      IMap<Object, Object> hazelCache = (IMap<Object, Object>) nativeCache;
      return hazelCache.keySet();
    }
    log.warn("Unhandled cache type {} of type {}", cacheName, nativeCache.getClass());
    return Collections.emptySet();
  }

  // Logs every key/value of every cache. Useful during matcher debugging, but voluminous.
  void dumpCache(String message) {
    log.debug("Begin Dump {}", message);
    cacheManager
        .getCacheNames()
        .forEach(
            cacheName -> {
              log.debug("Dump cache: {}", cacheName);
              Cache cache = cacheManager.getCache(cacheName);
              getKeys(cacheName)
                  .forEach(
                      key -> {
                        log.debug(" key {} : {}", key, cache.get(key));
                      });
            });
  }
}

View File

@ -0,0 +1,8 @@
package com.linkedin.metadata.search.client;
/**
 * Strategy interface used by {@link CacheEvictionService} to decide which cache entries should be
 * evicted. Implementations interpret the (cache-specific) key structure of the caches they support.
 */
public interface CacheKeyMatcher {
  // Returns true if this matcher knows how to interpret keys of the named cache.
  boolean supportsCache(String cacheName);

  // Called for each supported cache, with each key; returns true if the entry should be evicted.
  boolean match(String cacheName, Object key);
}

View File

@ -30,10 +30,10 @@ import org.springframework.cache.CacheManager;
@RequiredArgsConstructor
public class CachingEntitySearchService {
private static final String ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME = "entitySearchServiceSearch";
private static final String ENTITY_SEARCH_SERVICE_AUTOCOMPLETE_CACHE_NAME =
public static final String ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME = "entitySearchServiceSearch";
public static final String ENTITY_SEARCH_SERVICE_AUTOCOMPLETE_CACHE_NAME =
"entitySearchServiceAutoComplete";
private static final String ENTITY_SEARCH_SERVICE_BROWSE_CACHE_NAME = "entitySearchServiceBrowse";
public static final String ENTITY_SEARCH_SERVICE_BROWSE_CACHE_NAME = "entitySearchServiceBrowse";
public static final String ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME = "entitySearchServiceScroll";
private final CacheManager cacheManager;

View File

@ -0,0 +1,123 @@
package com.linkedin.metadata.search.client;
import static com.linkedin.metadata.search.client.CachingEntitySearchService.*;
import com.linkedin.common.urn.Urn;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.javatuples.Octet;
import org.javatuples.Septet;
/**
 * A {@link CacheKeyMatcher} that flags search/scroll cache entries likely impacted by updates to
 * the supplied urns. An entry matches when its entity-type list intersects the entity types of the
 * urns, and its filter/facets either do not target any specific urn or mention one of the supplied
 * urns.
 */
class UrnCacheKeyMatcher implements CacheKeyMatcher {
  private final List<Urn> urns;
  private final Set<String> entityTypes;

  private static final List<String> SUPPORTED_CACHE_NAMES =
      Arrays.asList(
          ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME, ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME);

  UrnCacheKeyMatcher(List<Urn> urns) {
    this.urns = urns;
    // Precompute the distinct entity types so the per-key check is a cheap set lookup.
    this.entityTypes = new HashSet<>();
    urns.forEach(urn -> this.entityTypes.add(urn.getEntityType()));
  }

  @Override
  public boolean supportsCache(String cacheName) {
    return SUPPORTED_CACHE_NAMES.contains(cacheName);
  }

  @Override
  public boolean match(String cacheName, Object key) {
    switch (cacheName) {
      case ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME:
        return matchSearchServiceCacheKey(key);
      case ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME:
        return matchSearchServiceScrollCacheKey(key);
    }
    // supportsCache should prevent other cache names from reaching this point.
    return false;
  }

  @SuppressWarnings("unchecked")
  private boolean matchSearchServiceScrollCacheKey(Object key) {
    // Scroll cache key contents, by tuple index:
    //   0: OperationContext, 1: List<String> entities, 2: String query,
    //   3: Filter (as json string), 4: List<SortCriterion>, 5: String scrollId,
    //   6: List<String> facets, 7: int size
    Octet<?, List<String>, String, String, ?, ?, List<String>, ?> cacheKey =
        (Octet<?, List<String>, String, String, ?, ?, List<String>, ?>) key;

    List<String> entitiesInCacheKey = (List<String>) cacheKey.getValue(1);
    String query = (String) cacheKey.getValue(2);
    String filter =
        combineFilterAndFacets((String) cacheKey.getValue(3), (List<String>) cacheKey.getValue(6));
    return isKeyImpactedByEntity(entitiesInCacheKey, query, filter);
  }

  @SuppressWarnings("unchecked")
  private boolean matchSearchServiceCacheKey(Object key) {
    // Search cache key contents, by tuple index:
    //   0: OperationContext, 1: List<String> entityNames, 2: String query,
    //   3: Filter (as json string), 4: List<SortCriterion>, 5: List<String> facets, 6: querySize
    Septet<?, List<String>, ?, String, ?, ?, ?> cacheKey =
        (Septet<?, List<String>, ?, String, ?, ?, ?>) key;

    List<String> entitiesInCacheKey = (List<String>) cacheKey.getValue(1);
    String query = (String) cacheKey.getValue(2);
    String filter =
        combineFilterAndFacets((String) cacheKey.getValue(3), (List<String>) cacheKey.getValue(5));
    return isKeyImpactedByEntity(entitiesInCacheKey, query, filter);
  }

  // Facets may contain urns. Since the check for urns in filters is textual, the facets can simply
  // be appended to the filter and checked in one pass. A null filter is treated as empty.
  private static String combineFilterAndFacets(String filter, List<String> facets) {
    return (filter == null ? "" : filter) + " " + String.join(" ", facets);
  }

  // Decides whether a cache entry (described by its entity list, query, and combined filter text)
  // could be affected by updates to any of the urns supplied at construction.
  boolean isKeyImpactedByEntity(List<String> entitiesInCacheKey, String query, String filter) {
    boolean entityMatch = entitiesInCacheKey.stream().anyMatch(entityTypes::contains);
    if (!entityMatch) {
      return false;
    }
    // Ignoring query for now. A query could make this cache entry more targeted, but till there is
    // a quick way to evaluate if the entities that were updated are affected by this query,
    // ignoring it may mean some cache entries are invalidated even if they may not be a match,
    // and an uncached query result will still be fetched.
    boolean containsUrn = filter.contains("urn:li");
    if (!containsUrn) {
      // Entity type matches and the filter does not target any specific urn: evict. This may be
      // suboptimal (the entry might not actually be affected), but it is safe.
      return true;
    }
    // If the filter names one of the updated urns, this entry is to be evicted. If it was for some
    // other urn, keep it.
    return urns.stream().anyMatch(urn -> filter.contains(urn.toString()));
  }
}

View File

@ -0,0 +1,351 @@
package com.linkedin.metadata.search.client;
import static com.linkedin.metadata.search.client.CachingEntitySearchService.*;
import static org.testng.Assert.*;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.linkedin.common.urn.Urn;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.javatuples.Octet;
import org.javatuples.Septet;
import org.jetbrains.annotations.NotNull;
import org.springframework.cache.CacheManager;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
 * Unit tests for {@link CacheEvictionService}, using a hand-rolled spring CacheManager backed by
 * Caffeine native caches.
 */
public class CacheEvictionServiceTest {

  CacheEvictionService evictionService;
  CacheManager cacheManager;
  CacheManager cacheManagerWithCaffeine;
  CacheManager cacheManagerWithHazelCast;
  // HazelcastInstance hazelcastInstance;

  // Number of entries seeded into the search (and unsupported) cache by setupCacheManager.
  int cacheKeyCount;

  // We can't use the spring Caffeine cache manager in metadata-io due to a java 11 dependency --
  // this is not a problem with the gms, but an issue with just the metadata-io jar and the
  // associated unit tests.
  final Map<String, Cache<Object, Object>> nativeCacheMapForCaffeine = new HashMap<>();

  final String UNSUPPORTED_CACHE_NAME = "SampleUnsupportedCacheName";

  @BeforeClass
  void setupCacheManagers() {
    // hazelcastInstance = Hazelcast.newHazelcastInstance();
    // this.cacheManagerWithHazelCast = new HazelcastCacheManager(hazelcastInstance);

    this.cacheManagerWithCaffeine =
        new CacheManager() {
          {
            Caffeine<Object, Object> caffeine =
                Caffeine.newBuilder().expireAfterWrite(60, TimeUnit.MINUTES).maximumSize(2000);
            nativeCacheMapForCaffeine.put(
                ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME, caffeine.build());
            nativeCacheMapForCaffeine.put(UNSUPPORTED_CACHE_NAME, caffeine.build());
            nativeCacheMapForCaffeine.put(
                ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME, caffeine.build());
          }

          @Override
          public org.springframework.cache.Cache getCache(String name) {
            if (name.equals("missingcache")) {
              return null;
            } else {
              return new org.springframework.cache.Cache() {
                @Override
                public String getName() {
                  return name;
                }

                @Override
                public Object getNativeCache() {
                  return nativeCacheMapForCaffeine.get(name);
                }

                // Not using the remaining cache methods in the unit tests.
                @Override
                public ValueWrapper get(Object key) {
                  return null;
                }

                @Override
                public <T> T get(Object key, Class<T> type) {
                  return null;
                }

                @Override
                public <T> T get(Object key, Callable<T> valueLoader) {
                  return null;
                }

                @Override
                public void put(Object key, Object value) {}

                @Override
                public void evict(Object key) {
                  nativeCacheMapForCaffeine.get(name).invalidate(key);
                }

                @Override
                public void clear() {
                  nativeCacheMapForCaffeine.get(name).invalidateAll();
                }
              };
            }
          }

          @Override
          public Collection<String> getCacheNames() {
            return nativeCacheMapForCaffeine.keySet();
          }
        };
  }

  @BeforeMethod
  void setupCacheManager() {
    this.cacheManager = cacheManagerWithCaffeine;

    // Prepare some cached results. All tuple fields that we don't care about in this test are
    // initialised with null.
    Map<
            @NotNull Septet<Object, List<String>, String, String, Object, Object, Object>,
            @NotNull String>
        searchCacheData =
            Map.of(
                Septet.with(
                    null, // opContext
                    Arrays.asList("container", "dataset"), // entity matches but no urn in filter.
                    "*", // query
                    "{\"or\":[{\"and\":[{\"condition\":\"EQUAL\",\"negated\":false,\"field\":\"_entityType\",\"value\":\"\",\"values\":[\"CONTAINER\"]}]}]}",
                    // filters
                    null, // sort criteria
                    Arrays.asList("some facet json"),
                    null /* querySize*/),
                "allcontainers",
                Septet.with(
                    null,
                    Arrays.asList("dashboard", "container"),
                    "*",
                    "some json that contains container urn:li:container:foo",
                    null,
                    Arrays.asList("some facet json"),
                    null),
                "container.foo",
                Septet.with(
                    null,
                    Arrays.asList("dashboard", "container"),
                    "*",
                    "some json that contains unknown urn:li:container:bar",
                    null,
                    Arrays.asList("some facet json"),
                    null),
                "container.bar",
                Septet.with(
                    null,
                    Arrays.asList(
                        "dashboard", "container"), // entity match, but URN not a match in filter
                    "*",
                    "some json that contains unknown urn:li:dashboard:foobar",
                    null,
                    Arrays.asList("some facet json"),
                    null),
                "dashboard.foobar",
                Septet.with(
                    null,
                    Arrays.asList("structuredproperty"), // entity not matching
                    "*",
                    "{\"or\":[{\"and\":[{\"condition\":\"EQUAL\",\"negated\":false,\"field\":\"_entityType\",\"value\":\"\",\"values\":[\"CONTAINER\"]}]}]}",
                    null,
                    Arrays.asList("some facet json"),
                    null),
                "structuredproperty",
                Septet.with(
                    null,
                    Arrays.asList("container"), // entity match, with no filter at all
                    "*",
                    null, // filter
                    null,
                    Arrays.asList("some facet json"),
                    null),
                "containeronly");

    Cache<Object, Object> cache =
        (Cache<Object, Object>)
            cacheManager.getCache(ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME).getNativeCache();
    cache.invalidateAll();
    for (Map.Entry<?, ?> entry : searchCacheData.entrySet()) {
      cache.put(entry.getKey(), entry.getValue());
    }

    Map<
            @NotNull Octet<
                    Object, List<String>, String, String, Object, String, List<String>, Integer>,
            @NotNull String>
        scrollCacheData =
            Map.of(
                Octet.with(
                    null, // opContext
                    Arrays.asList("container", "dataset"), // entity matches but no urn in filter.
                    "*", // query
                    "{\"or\":[{\"and\":[{\"condition\":\"EQUAL\",\"negated\":false,\"field\":\"_entityType\",\"value\":\"\",\"values\":[\"CONTAINER\"]}]}]}",
                    // filters
                    null, // sort criteria
                    "scrollid",
                    Arrays.asList("some facet json"),
                    1 /* querySize*/),
                "allcontainers",
                Octet.with(
                    null, // opContext
                    Arrays.asList("container", "dataset"), // entity matches but no urn in filter.
                    "*", // query
                    null, // filters
                    null, // sort criteria
                    "scrollid",
                    Arrays.asList("some facet json"),
                    1 /* querySize*/),
                "allcontainers-null-filter");

    cacheKeyCount = searchCacheData.size();

    cache =
        (Cache<Object, Object>)
            cacheManager.getCache(ENTITY_SEARCH_SERVICE_SCROLL_CACHE_NAME).getNativeCache();
    cache.invalidateAll();
    for (Map.Entry<?, ?> entry : scrollCacheData.entrySet()) {
      cache.put(entry.getKey(), entry.getValue());
    }

    cache = (Cache<Object, Object>) cacheManager.getCache(UNSUPPORTED_CACHE_NAME).getNativeCache();
    cache.invalidateAll();
    for (Map.Entry<?, ?> entry : searchCacheData.entrySet()) {
      // OK to have the same values; this cache shouldn't even be looked up.
      cache.put(entry.getKey(), entry.getValue());
    }

    evictionService = new CacheEvictionService(cacheManager, true, true);
  }

  // Inspects the named cache's native contents, for the test assertions.
  Map<Object, Object> getAsMap(String cacheName) {
    com.github.benmanes.caffeine.cache.Cache<Object, Object> cache =
        (com.github.benmanes.caffeine.cache.Cache<Object, Object>)
            cacheManager.getCache(cacheName).getNativeCache();
    return cache.asMap();
  }

  @Test
  void testEntityTypeNotInCache() throws URISyntaxException {
    evictionService.evict(List.of(Urn.createFromString("urn:li:platform:foo")));

    Map<Object, Object> cacheAsMap = getAsMap(ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME);
    assertEquals(cacheAsMap.size(), cacheKeyCount); // no evictions
    assertEquals(getAsMap(UNSUPPORTED_CACHE_NAME).size(), cacheKeyCount); // no evictions
  }

  @Test
  void testEntityTypeMatched() throws URISyntaxException {
    // Type in cache, but not urn
    evictionService.evict(List.of(Urn.createFromString("urn:li:container:dontmatch")));

    Map<Object, Object> cacheAsMap = getAsMap(ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME);
    assertEquals(cacheAsMap.size(), cacheKeyCount - 2); // evictions
    assertEquals(getAsMap(UNSUPPORTED_CACHE_NAME).size(), cacheKeyCount);
    assertFalse(cacheAsMap.values().contains("allcontainers")); // should be evicted
    assertFalse(cacheAsMap.values().contains("containeronly")); // should be evicted
    assertEquals(getAsMap(UNSUPPORTED_CACHE_NAME).size(), cacheKeyCount); // no evictions
  }

  @Test
  void testEntityTypeAndUrnMatched() throws URISyntaxException {
    // type and urn in cache
    evictionService.evict(List.of(Urn.createFromString("urn:li:container:bar")));

    Map<Object, Object> cacheAsMap = getAsMap(ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME);
    assertEquals(cacheAsMap.size(), cacheKeyCount - 3); // evictions
    assertFalse(cacheAsMap.values().contains("allcontainers")); // evicted
    assertFalse(cacheAsMap.values().contains("container.bar")); // evicted
    assertFalse(cacheAsMap.values().contains("containeronly")); // evicted
    assertEquals(getAsMap(UNSUPPORTED_CACHE_NAME).size(), cacheKeyCount); // no evictions
  }

  @Test
  void testPerfWithLargeCache() throws URISyntaxException {
    Cache<Object, Object> cache =
        (Cache<Object, Object>)
            cacheManager.getCache(ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME).getNativeCache();
    // Pad the cache with non-matching entries so the eviction pass has to scan 1000+ keys.
    for (int i = 0; i < 1000; i++) {
      Septet key =
          Septet.with(
              null,
              Arrays.asList("Non-matching-entity" + i),
              "*",
              "{\"or\":[{\"and\":[{\"condition\":\"EQUAL\",\"negated\":false,\"field\":\"_entityType\",\"value\":\"\",\"values\":[\"CONTAINER\"]}]}]}",
              null,
              Arrays.asList("some facet json"),
              null);
      String value = "structuredproperty" + i;
      cache.put(key, value);
    }

    evictionService.evict(List.of(Urn.createFromString("urn:li:container:bar")));

    Map<Object, Object> cacheAsMap = getAsMap(ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME);
    assertEquals(cacheAsMap.size(), cacheKeyCount + 1000 - 3);
    assertFalse(cacheAsMap.values().contains("allcontainers")); // evicted
    assertFalse(cacheAsMap.values().contains("container.bar")); // evicted
    assertFalse(cacheAsMap.values().contains("containeronly")); // evicted
    assertEquals(getAsMap(UNSUPPORTED_CACHE_NAME).size(), cacheKeyCount);
    // Note, this was just to check timing
  }

  @Test
  void testInvalidateCache() {
    evictionService.invalidate(ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME);
    Map<Object, Object> cacheAsMap = getAsMap(ENTITY_SEARCH_SERVICE_SEARCH_CACHE_NAME);
    assertEquals(cacheAsMap.size(), 0);
    assertEquals(getAsMap(UNSUPPORTED_CACHE_NAME).size(), cacheKeyCount); // no evictions

    evictionService.invalidateAll();
    assertEquals(getAsMap(UNSUPPORTED_CACHE_NAME).size(), 0);
  }

  @Test
  void testDumpCache() {
    evictionService.dumpCache("test");
  }

  @Test(expectedExceptions = AssertionError.class)
  void testInvalidCache() {
    evictionService.invalidate("missingcache");
  }

  @Test
  void testDisabledFlags() throws URISyntaxException {
    // With either flag off, every method should be a no-op (null cacheManager would otherwise NPE).
    CacheEvictionService service = new CacheEvictionService(null, true, false);
    service.invalidateAll(); // should be no op
    service.invalidate("anycache"); // should be no op
    service.evict(List.of(Urn.createFromString("urn:li:container:bar")));

    service = new CacheEvictionService(null, false, true);
    service.invalidateAll(); // should be no op
    service.invalidate("anycache"); // should be no op
    service.evict(List.of(Urn.createFromString("urn:li:container:bar")));
  }
}

View File

@ -115,6 +115,7 @@ graphService:
searchService:
resultBatchSize: ${SEARCH_SERVICE_BATCH_SIZE:100}
enableCache: ${SEARCH_SERVICE_ENABLE_CACHE:false}
enableEviction: ${SEARCH_SERVICE_ENABLE_CACHE_EVICTION:false}
cacheImplementation: ${SEARCH_SERVICE_CACHE_IMPLEMENTATION:caffeine}
cache:
hazelcast:

View File

@ -0,0 +1,29 @@
package com.linkedin.gms.factory.search;
import com.linkedin.metadata.search.client.CacheEvictionService;
import javax.annotation.Nonnull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class CachingEvictionServiceFactory {
@Autowired private CacheManager cacheManager;
@Value("${searchService.enableCache}")
private Boolean cachingEnabled;
@Value("${searchService.enableEviction}")
private Boolean enableEviction;
@Bean(name = "cachingEvictionService")
@Primary
@Nonnull
protected CacheEvictionService getInstance() {
return new CacheEvictionService(cacheManager, cachingEnabled, enableEviction);
}
}

View File

@ -17,6 +17,7 @@ import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.IngestResult;
import com.linkedin.metadata.search.client.CacheEvictionService;
import com.linkedin.platformresource.PlatformResourceInfo;
import com.linkedin.secret.DataHubSecretValue;
import com.linkedin.util.Pair;
@ -48,24 +49,36 @@ public class DataHubIcebergWarehouse {
@Getter private final String platformInstance;
private final CacheEvictionService cacheEvictionService;
// When evicting an iceberg entity urn, these are additional urns that need to be evicted, since
// they are a way to get to the newly modified iceberg entity.
private final List<Urn> commonUrnsToEvict;
@VisibleForTesting
DataHubIcebergWarehouse(
String platformInstance,
IcebergWarehouseInfo icebergWarehouse,
EntityService entityService,
SecretService secretService,
CacheEvictionService cacheEvictionService,
OperationContext operationContext) {
this.platformInstance = platformInstance;
this.icebergWarehouse = icebergWarehouse;
this.entityService = entityService;
this.secretService = secretService;
this.cacheEvictionService = cacheEvictionService;
this.operationContext = operationContext;
commonUrnsToEvict = List.of(Utils.platformInstanceUrn(platformInstance), Utils.platformUrn());
}
public static DataHubIcebergWarehouse of(
String platformInstance,
EntityService entityService,
SecretService secretService,
CacheEvictionService cacheEvictionService,
OperationContext operationContext) {
Urn platformInstanceUrn = Utils.platformInstanceUrn(platformInstance);
RecordTemplate warehouseAspect =
@ -80,7 +93,12 @@ public class DataHubIcebergWarehouse {
IcebergWarehouseInfo icebergWarehouse = new IcebergWarehouseInfo(warehouseAspect.data());
return new DataHubIcebergWarehouse(
platformInstance, icebergWarehouse, entityService, secretService, operationContext);
platformInstance,
icebergWarehouse,
entityService,
secretService,
cacheEvictionService,
operationContext);
}
public CredentialProvider.StorageProviderCredentials getStorageProviderCredentials() {
@ -270,7 +288,7 @@ public class DataHubIcebergWarehouse {
entityService.deleteUrn(operationContext, resourceUrn);
entityService.deleteUrn(operationContext, datasetUrn.get());
invalidateCacheEntries(List.of(datasetUrn.get()));
return result;
}
@ -281,9 +299,17 @@ public class DataHubIcebergWarehouse {
createResource(datasetUrn, tableIdentifier, view, icebergBatch);
Urn namespaceUrn = containerUrn(getPlatformInstance(), tableIdentifier.namespace());
invalidateCacheEntries(List.of(datasetUrn, namespaceUrn));
return datasetUrn;
}
void invalidateCacheEntries(List<Urn> urns) {
ArrayList<Urn> urnsToEvict = new ArrayList<>(urns);
urnsToEvict.addAll(commonUrnsToEvict);
cacheEvictionService.evict(urnsToEvict);
}
public void renameDataset(TableIdentifier fromTableId, TableIdentifier toTableId, boolean view) {
Optional<DatasetUrn> optDatasetUrn = getDatasetUrn(fromTableId);
@ -332,6 +358,15 @@ public class DataHubIcebergWarehouse {
}
entityService.deleteUrn(operationContext, resourceUrn(fromTableId));
Urn fromNamespaceUrn = containerUrn(getPlatformInstance(), fromTableId.namespace());
List<Urn> urnsToInvalidate = new ArrayList<>(List.of(datasetUrn, fromNamespaceUrn));
if (!fromTableId.namespace().equals(toTableId.namespace())) {
  // Bug fix: the destination namespace urn must be derived from toTableId, not fromTableId;
  // otherwise a cross-namespace rename adds fromNamespaceUrn again and never invalidates
  // cache entries for the target namespace.
  Urn toNamespaceUrn = containerUrn(getPlatformInstance(), toTableId.namespace());
  urnsToInvalidate.add(toNamespaceUrn);
}
invalidateCacheEntries(urnsToInvalidate);
}
private RuntimeException noSuchEntity(boolean view, TableIdentifier tableIdentifier) {

View File

@ -177,9 +177,10 @@ public class DataHubRestCatalog extends BaseMetastoreViewCatalog implements Supp
containerProperties(namespace, properties));
int nLevels = namespace.length();
Urn parentContainerUrn = null;
if (nLevels > 1) {
String[] parentLevels = Arrays.copyOfRange(namespace.levels(), 0, nLevels - 1);
Urn parentContainerUrn = containerUrn(platformInstance(), parentLevels);
parentContainerUrn = containerUrn(platformInstance(), parentLevels);
if (!entityService.exists(operationContext, parentContainerUrn)) {
throw new NoSuchNamespaceException(
"Parent namespace %s does not exist in platformInstance-catalog %s",
@ -196,6 +197,12 @@ public class DataHubRestCatalog extends BaseMetastoreViewCatalog implements Supp
SUB_TYPES_ASPECT_NAME, new SubTypes().setTypeNames(new StringArray(CONTAINER_SUB_TYPE)));
ingestBatch(icebergBatch);
List<Urn> urnsToInvalidate = new ArrayList<>(List.of(containerUrn));
if (parentContainerUrn != null) {
urnsToInvalidate.add(parentContainerUrn);
}
warehouse.invalidateCacheEntries(urnsToInvalidate);
}
@Override
@ -260,6 +267,7 @@ public class DataHubRestCatalog extends BaseMetastoreViewCatalog implements Supp
if (searchIsEmpty(filter, CONTAINER_ENTITY_NAME, DATASET_ENTITY_NAME)) {
// TODO handle race conditions
entityService.deleteUrn(operationContext, containerUrn);
warehouse.invalidateCacheEntries(List.of(containerUrn));
return true;
} else {
throw new NamespaceNotEmptyException("Namespace %s is not empty", namespace);

View File

@ -14,6 +14,7 @@ import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.client.CacheEvictionService;
import io.datahubproject.iceberg.catalog.DataHubIcebergWarehouse;
import io.datahubproject.iceberg.catalog.DataHubRestCatalog;
import io.datahubproject.iceberg.catalog.DataOperation;
@ -40,6 +41,7 @@ public class AbstractIcebergController {
@Autowired protected EntityService entityService;
@Autowired private EntitySearchService searchService;
@Autowired private SecretService secretService;
@Autowired private CacheEvictionService cacheEvictionService;
@Inject
@Named("cachingCredentialProvider")
@ -136,7 +138,11 @@ public class AbstractIcebergController {
String platformInstance, Function<DataHubRestCatalog, R> function) {
DataHubIcebergWarehouse warehouse =
DataHubIcebergWarehouse.of(
platformInstance, entityService, secretService, systemOperationContext);
platformInstance,
entityService,
secretService,
cacheEvictionService,
systemOperationContext);
return catalogOperation(warehouse, systemOperationContext, function);
}
@ -178,7 +184,7 @@ public class AbstractIcebergController {
protected DataHubIcebergWarehouse warehouse(
String platformInstance, OperationContext operationContext) {
return DataHubIcebergWarehouse.of(
platformInstance, entityService, secretService, operationContext);
platformInstance, entityService, secretService, cacheEvictionService, operationContext);
}
protected RuntimeException noSuchEntityException(

View File

@ -22,6 +22,7 @@ import com.linkedin.entity.Aspect;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.client.CacheEvictionService;
import com.linkedin.platformresource.PlatformResourceInfo;
import com.linkedin.secret.DataHubSecretValue;
import com.linkedin.util.Pair;
@ -44,6 +45,8 @@ public class DataHubIcebergWarehouseTest {
@Mock private SecretService secretService;
@Mock private CacheEvictionService cacheEvictionService;
@Mock private OperationContext operationContext;
private IcebergWarehouseInfo icebergWarehouse;
@ -113,7 +116,7 @@ public class DataHubIcebergWarehouseTest {
DataHubIcebergWarehouse warehouse =
DataHubIcebergWarehouse.of(
platformInstance, entityService, secretService, operationContext);
platformInstance, entityService, secretService, cacheEvictionService, operationContext);
CredentialProvider.StorageProviderCredentials credentials =
warehouse.getStorageProviderCredentials();
@ -140,7 +143,7 @@ public class DataHubIcebergWarehouseTest {
DataHubIcebergWarehouse warehouse =
DataHubIcebergWarehouse.of(
platformInstance, entityService, secretService, operationContext);
platformInstance, entityService, secretService, cacheEvictionService, operationContext);
assertNotNull(warehouse);
assertEquals(warehouse.getPlatformInstance(), platformInstance);
@ -157,7 +160,8 @@ public class DataHubIcebergWarehouseTest {
eq(DataHubIcebergWarehouse.DATAPLATFORM_INSTANCE_ICEBERG_WAREHOUSE_ASPECT_NAME)))
.thenReturn(null);
DataHubIcebergWarehouse.of(platformInstance, entityService, secretService, operationContext);
DataHubIcebergWarehouse.of(
platformInstance, entityService, secretService, cacheEvictionService, operationContext);
}
@Test
@ -175,7 +179,7 @@ public class DataHubIcebergWarehouseTest {
DataHubIcebergWarehouse warehouse =
DataHubIcebergWarehouse.of(
platformInstance, entityService, secretService, operationContext);
platformInstance, entityService, secretService, cacheEvictionService, operationContext);
String result = warehouse.getDataRoot();
@ -210,6 +214,7 @@ public class DataHubIcebergWarehouseTest {
new IcebergWarehouseInfo(),
entityService,
secretService,
cacheEvictionService,
operationContext) {
@Override
IcebergBatch newIcebergBatch(OperationContext operationContext) {
@ -262,6 +267,7 @@ public class DataHubIcebergWarehouseTest {
new IcebergWarehouseInfo(),
entityService,
secretService,
cacheEvictionService,
operationContext) {
@Override
IcebergBatch newIcebergBatch(OperationContext operationContext) {
@ -314,6 +320,7 @@ public class DataHubIcebergWarehouseTest {
new IcebergWarehouseInfo(),
entityService,
secretService,
cacheEvictionService,
operationContext) {
@Override
IcebergBatch newIcebergBatch(OperationContext operationContext) {
@ -371,6 +378,7 @@ public class DataHubIcebergWarehouseTest {
new IcebergWarehouseInfo(),
entityService,
secretService,
cacheEvictionService,
operationContext) {
@Override
IcebergBatch newIcebergBatch(OperationContext operationContext) {
@ -430,6 +438,7 @@ public class DataHubIcebergWarehouseTest {
new IcebergWarehouseInfo(),
entityService,
secretService,
cacheEvictionService,
operationContext) {
@Override
IcebergBatch newIcebergBatch(OperationContext operationContext) {
@ -475,6 +484,7 @@ public class DataHubIcebergWarehouseTest {
new IcebergWarehouseInfo(),
entityService,
secretService,
cacheEvictionService,
operationContext) {
@Override
IcebergBatch newIcebergBatch(OperationContext operationContext) {
@ -501,6 +511,7 @@ public class DataHubIcebergWarehouseTest {
new IcebergWarehouseInfo().setEnv(FabricType.PROD),
entityService,
secretService,
cacheEvictionService,
operationContext) {
@Override
IcebergBatch newIcebergBatch(OperationContext operationContext) {
@ -571,6 +582,7 @@ public class DataHubIcebergWarehouseTest {
new IcebergWarehouseInfo(),
entityService,
secretService,
cacheEvictionService,
operationContext) {
@Override
IcebergBatch newIcebergBatch(OperationContext operationContext) {
@ -619,6 +631,7 @@ public class DataHubIcebergWarehouseTest {
new IcebergWarehouseInfo(),
entityService,
secretService,
cacheEvictionService,
operationContext) {
@Override
IcebergBatch newIcebergBatch(OperationContext operationContext) {

View File

@ -24,6 +24,7 @@ import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchEntityArray;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.metadata.search.client.CacheEvictionService;
import io.datahubproject.iceberg.catalog.credentials.CredentialProvider;
import io.datahubproject.metadata.context.ActorContext;
import io.datahubproject.metadata.context.OperationContext;
@ -47,6 +48,8 @@ public class DataHubRestCatalogTest {
@Mock private EntitySearchService searchService;
@Mock private CacheEvictionService cacheEvictionService;
@Mock private OperationContext operationContext;
@Mock private DataHubIcebergWarehouse warehouse;

View File

@ -17,6 +17,7 @@ import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataplatforminstance.IcebergWarehouseInfo;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.client.CacheEvictionService;
import com.linkedin.secret.DataHubSecretValue;
import io.datahubproject.iceberg.catalog.DataHubIcebergWarehouse;
import io.datahubproject.iceberg.catalog.Utils;
@ -47,6 +48,7 @@ public abstract class AbstractControllerTest<T extends AbstractIcebergController
@Mock protected HttpServletRequest request;
@Mock protected SecretService secretService;
@Mock protected EntitySearchService entitySearchService;
@Mock protected CacheEvictionService cacheEvictionService;
private OperationContext systemOperationContext;
private Authentication authentication;
@ -87,6 +89,7 @@ public abstract class AbstractControllerTest<T extends AbstractIcebergController
injectField("authorizer", authorizer);
injectField("systemOperationContext", systemOperationContext);
injectField("cachingCredentialProvider", credentialProvider);
injectField("cacheEvictionService", cacheEvictionService);
}
protected void setupDefaultAuthorization(boolean isAuthorized) {

View File

@ -21,7 +21,7 @@ public class IcebergConfigApiControllerTest
try (MockedStatic<DataHubIcebergWarehouse> warehouseMock =
Mockito.mockStatic(DataHubIcebergWarehouse.class)) {
warehouseMock
.when(() -> DataHubIcebergWarehouse.of(eq(warehouseName), any(), any(), any()))
.when(() -> DataHubIcebergWarehouse.of(eq(warehouseName), any(), any(), any(), any()))
.thenThrow(new NotFoundException(""));
controller.getConfig(request, warehouseName);
}
@ -34,7 +34,7 @@ public class IcebergConfigApiControllerTest
try (MockedStatic<DataHubIcebergWarehouse> warehouseMock =
Mockito.mockStatic(DataHubIcebergWarehouse.class)) {
warehouseMock
.when(() -> DataHubIcebergWarehouse.of(eq(warehouseName), any(), any(), any()))
.when(() -> DataHubIcebergWarehouse.of(eq(warehouseName), any(), any(), any(), any()))
.thenReturn(null);
ConfigResponse response = controller.getConfig(request, warehouseName);