feat(entity-client): enable client side cache for entity-client and usage-client (#8877)

This commit is contained in:
david-leifker 2023-09-21 22:00:14 -05:00 committed by GitHub
parent 4be8fd0905
commit aef49b8fb2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
100 changed files with 951 additions and 298 deletions

View File

@ -11,16 +11,19 @@ import com.datahub.authentication.Authentication;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.metadata.restli.DefaultRestliClientFactory;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import com.linkedin.util.Configuration;
import config.ConfigurationProvider;
import controllers.SsoCallbackController;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
@ -34,6 +37,7 @@ import org.pac4j.play.store.PlayCacheSessionStore;
import org.pac4j.play.store.PlayCookieSessionStore;
import org.pac4j.play.store.PlaySessionStore;
import org.pac4j.play.store.ShiroAesDataEncrypter;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import play.Environment;
import play.cache.SyncCacheApi;
import utils.ConfigUtil;
@ -104,7 +108,7 @@ public class AuthModule extends AbstractModule {
bind(SsoCallbackController.class).toConstructor(SsoCallbackController.class.getConstructor(
SsoManager.class,
Authentication.class,
EntityClient.class,
SystemEntityClient.class,
AuthServiceClient.class,
com.typesafe.config.Config.class));
} catch (NoSuchMethodException | SecurityException e) {
@ -161,10 +165,19 @@ public class AuthModule extends AbstractModule {
@Provides
@Singleton
protected EntityClient provideEntityClient() {
return new RestliEntityClient(buildRestliClient(),
/**
 * Builds the {@link ConfigurationProvider} used by this module.
 *
 * <p>Creates a Spring {@link AnnotationConfigApplicationContext} with
 * {@link ConfigurationProvider} as its only registered component class and
 * returns the {@link ConfigurationProvider} bean looked up from that context.
 *
 * <p>NOTE(review): the application context is never closed here; presumably it is
 * intended to live for the lifetime of the application - confirm.
 *
 * @return the {@link ConfigurationProvider} bean obtained from the newly created context
 */
protected ConfigurationProvider provideConfigurationProvider() {
  return new AnnotationConfigApplicationContext(ConfigurationProvider.class)
      .getBean(ConfigurationProvider.class);
}
@Provides
@Singleton
protected SystemEntityClient provideEntityClient(final Authentication systemAuthentication,
final ConfigurationProvider configurationProvider) {
return new SystemRestliEntityClient(buildRestliClient(),
new ExponentialBackoff(_configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL)),
_configs.getInt(ENTITY_CLIENT_NUM_RETRIES));
_configs.getInt(ENTITY_CLIENT_NUM_RETRIES), systemAuthentication,
configurationProvider.getCache().getClient().getEntityClient());
}
@Provides

View File

@ -13,7 +13,7 @@ import com.linkedin.common.urn.CorpuserUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.SetMode;
import com.linkedin.entity.Entity;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.identity.CorpGroupInfo;
import com.linkedin.identity.CorpUserEditableInfo;
@ -78,13 +78,14 @@ import static play.mvc.Results.internalServerError;
public class OidcCallbackLogic extends DefaultCallbackLogic<Result, PlayWebContext> {
private final SsoManager _ssoManager;
private final EntityClient _entityClient;
private final SystemEntityClient _entityClient;
private final Authentication _systemAuthentication;
private final AuthServiceClient _authClient;
private final CookieConfigs _cookieConfigs;
public OidcCallbackLogic(final SsoManager ssoManager, final Authentication systemAuthentication,
final EntityClient entityClient, final AuthServiceClient authClient, final CookieConfigs cookieConfigs) {
final SystemEntityClient entityClient, final AuthServiceClient authClient,
final CookieConfigs cookieConfigs) {
_ssoManager = ssoManager;
_systemAuthentication = systemAuthentication;
_entityClient = entityClient;

View File

@ -0,0 +1,27 @@
package config;
import com.linkedin.metadata.config.cache.CacheConfiguration;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.PropertySource;
/**
 * Minimal configuration holder shared between metadata-service and the frontend.
 *
 * <p>Initially introduced so the frontend can read the client caching configuration.
 * It deliberately does not use the factories module, to avoid that module's
 * transitive dependencies.
 *
 * <p>Properties are loaded from {@code application.yml} through
 * {@link YamlPropertySourceFactory} and, because {@code @ConfigurationProperties}
 * declares no prefix, bound onto this class's fields by name. Lombok's {@code @Data}
 * generates the accessors (e.g. {@code getCache()}).
 */
@EnableConfigurationProperties
@PropertySource(value = "application.yml", factory = YamlPropertySourceFactory.class)
@ConfigurationProperties
@Data
public class ConfigurationProvider {
/**
 * Configuration for caching.
 * NOTE(review): the structure of {@link CacheConfiguration} is defined elsewhere;
 * see that class for the available settings.
 */
private CacheConfiguration cache;
}

View File

@ -3,7 +3,7 @@ package controllers;
import auth.CookieConfigs;
import client.AuthServiceClient;
import com.datahub.authentication.Authentication;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.SystemEntityClient;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
@ -40,7 +40,7 @@ public class SsoCallbackController extends CallbackController {
public SsoCallbackController(
@Nonnull SsoManager ssoManager,
@Nonnull Authentication systemAuthentication,
@Nonnull EntityClient entityClient,
@Nonnull SystemEntityClient entityClient,
@Nonnull AuthServiceClient authClient,
@Nonnull com.typesafe.config.Config configs) {
_ssoManager = ssoManager;
@ -79,7 +79,7 @@ public class SsoCallbackController extends CallbackController {
private final OidcCallbackLogic _oidcCallbackLogic;
SsoCallbackLogic(final SsoManager ssoManager, final Authentication systemAuthentication,
final EntityClient entityClient, final AuthServiceClient authClient, final CookieConfigs cookieConfigs) {
final SystemEntityClient entityClient, final AuthServiceClient authClient, final CookieConfigs cookieConfigs) {
_oidcCallbackLogic = new OidcCallbackLogic(ssoManager, systemAuthentication, entityClient, authClient, cookieConfigs);
}

View File

@ -16,9 +16,6 @@ dependencies {
implementation project(':datahub-web-react')
constraints {
play(externalDependency.springCore)
play(externalDependency.springBeans)
play(externalDependency.springContext)
play(externalDependency.jacksonDataBind)
play('com.nimbusds:oauth2-oidc-sdk:8.36.2')
play('com.nimbusds:nimbus-jose-jwt:8.18')
@ -35,7 +32,12 @@ dependencies {
implementation project(":metadata-service:restli-client")
implementation project(":metadata-service:auth-config")
implementation project(":metadata-service:configuration")
implementation externalDependency.springCore
implementation externalDependency.springBeans
implementation externalDependency.springContext
implementation externalDependency.springBootAutoconfigure
implementation externalDependency.jettyJaas
implementation externalDependency.graphqlJava
implementation externalDependency.antlr4Runtime

View File

@ -302,6 +302,7 @@ import com.linkedin.datahub.graphql.types.tag.TagType;
import com.linkedin.datahub.graphql.types.test.TestType;
import com.linkedin.datahub.graphql.types.view.DataHubViewType;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.metadata.config.DataHubConfiguration;
import com.linkedin.metadata.config.IngestionConfiguration;
import com.linkedin.metadata.config.TestsConfiguration;
@ -364,6 +365,7 @@ import static graphql.scalars.ExtendedScalars.*;
public class GmsGraphQLEngine {
private final EntityClient entityClient;
private final SystemEntityClient systemEntityClient;
private final GraphClient graphClient;
private final UsageClient usageClient;
private final SiblingGraphService siblingGraphService;
@ -476,6 +478,7 @@ public class GmsGraphQLEngine {
this.graphQLPlugins.forEach(plugin -> plugin.init(args));
this.entityClient = args.entityClient;
this.systemEntityClient = args.systemEntityClient;
this.graphClient = args.graphClient;
this.usageClient = args.usageClient;
this.siblingGraphService = args.siblingGraphService;

View File

@ -11,6 +11,7 @@ import com.datahub.authorization.role.RoleService;
import com.linkedin.datahub.graphql.analytics.service.AnalyticsService;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.metadata.config.DataHubConfiguration;
import com.linkedin.metadata.config.IngestionConfiguration;
import com.linkedin.metadata.config.TestsConfiguration;
@ -38,6 +39,7 @@ import lombok.Data;
@Data
public class GmsGraphQLEngineArgs {
EntityClient entityClient;
SystemEntityClient systemEntityClient;
GraphClient graphClient;
UsageClient usageClient;
AnalyticsService analyticsService;

View File

@ -1,13 +1,16 @@
package com.linkedin.datahub.graphql.resolvers.dataset;
import com.datahub.authorization.ResourceSpec;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.datahub.graphql.generated.CorpUser;
import com.linkedin.datahub.graphql.generated.DatasetStatsSummary;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.usage.UsageClient;
import com.linkedin.usage.UsageTimeRange;
import com.linkedin.usage.UserUsageCounts;
@ -15,6 +18,7 @@ import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -55,8 +59,15 @@ public class DatasetStatsSummaryResolver implements DataFetcher<CompletableFutur
try {
if (!isAuthorized(resourceUrn, context)) {
log.debug("User {} is not authorized to view profile information for dataset {}",
context.getActorUrn(),
resourceUrn.toString());
return null;
}
com.linkedin.usage.UsageQueryResult
usageQueryResult = usageClient.getUsageStats(resourceUrn.toString(), UsageTimeRange.MONTH, context.getAuthentication());
usageQueryResult = usageClient.getUsageStats(resourceUrn.toString(), UsageTimeRange.MONTH);
final DatasetStatsSummary result = new DatasetStatsSummary();
result.setQueryCountLast30Days(usageQueryResult.getAggregations().getTotalSqlQueries());
@ -90,4 +101,10 @@ public class DatasetStatsSummaryResolver implements DataFetcher<CompletableFutur
result.setUrn(userUrn.toString());
return result;
}
/**
 * Checks whether the given query context is authorized for
 * {@link PoliciesConfig#VIEW_DATASET_USAGE_PRIVILEGE} on the given resource.
 *
 * @param resourceUrn urn of the resource; its entity type and string form are used
 *     to build the {@link ResourceSpec} handed to the authorization check
 * @param context query context forwarded to {@link AuthorizationUtils}
 * @return whatever {@code AuthorizationUtils.isAuthorized} returns for this
 *     context, resource spec and privilege
 */
private boolean isAuthorized(final Urn resourceUrn, final QueryContext context) {
  final ResourceSpec targetSpec = new ResourceSpec(resourceUrn.getEntityType(), resourceUrn.toString());
  return AuthorizationUtils.isAuthorized(
      context,
      Optional.of(targetSpec),
      PoliciesConfig.VIEW_DATASET_USAGE_PRIVILEGE);
}
}

View File

@ -9,12 +9,10 @@ import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.UsageQueryResult;
import com.linkedin.datahub.graphql.types.usage.UsageQueryResultMapper;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.usage.UsageClient;
import com.linkedin.usage.UsageTimeRange;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
@ -44,10 +42,10 @@ public class DatasetUsageStatsResolver implements DataFetcher<CompletableFuture<
}
try {
com.linkedin.usage.UsageQueryResult
usageQueryResult = usageClient.getUsageStats(resourceUrn.toString(), range, context.getAuthentication());
usageQueryResult = usageClient.getUsageStats(resourceUrn.toString(), range);
return UsageQueryResultMapper.map(usageQueryResult);
} catch (RemoteInvocationException | URISyntaxException e) {
throw new RuntimeException(String.format("Failed to load Usage Stats for resource %s", resourceUrn.toString()), e);
} catch (Exception e) {
throw new RuntimeException(String.format("Failed to load Usage Stats for resource %s", resourceUrn), e);
}
});
}

View File

@ -117,8 +117,7 @@ public class DashboardStatsSummaryTest {
UsageClient mockClient = Mockito.mock(UsageClient.class);
Mockito.when(mockClient.getUsageStats(
Mockito.eq(TEST_DASHBOARD_URN),
Mockito.eq(UsageTimeRange.MONTH),
Mockito.any(Authentication.class)
Mockito.eq(UsageTimeRange.MONTH)
)).thenThrow(RuntimeException.class);
// Execute resolver

View File

@ -1,6 +1,8 @@
package com.linkedin.datahub.graphql.resolvers.dataset;
import com.datahub.authentication.Authentication;
import com.datahub.authorization.AuthorizationResult;
import com.datahub.plugins.auth.authorization.Authorizer;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
@ -53,13 +55,18 @@ public class DatasetStatsSummaryResolverTest {
UsageClient mockClient = Mockito.mock(UsageClient.class);
Mockito.when(mockClient.getUsageStats(
Mockito.eq(TEST_DATASET_URN),
Mockito.eq(UsageTimeRange.MONTH),
Mockito.any(Authentication.class)
Mockito.eq(UsageTimeRange.MONTH)
)).thenReturn(testResult);
// Execute resolver
DatasetStatsSummaryResolver resolver = new DatasetStatsSummaryResolver(mockClient);
QueryContext mockContext = Mockito.mock(QueryContext.class);
Mockito.when(mockContext.getActorUrn()).thenReturn("urn:li:corpuser:test");
Authorizer mockAuthorizer = Mockito.mock(Authorizer.class);
AuthorizationResult mockAuthorizerResult = Mockito.mock(AuthorizationResult.class);
Mockito.when(mockAuthorizerResult.getType()).thenReturn(AuthorizationResult.Type.ALLOW);
Mockito.when(mockAuthorizer.authorize(Mockito.any())).thenReturn(mockAuthorizerResult);
Mockito.when(mockContext.getAuthorizer()).thenReturn(mockAuthorizer);
Mockito.when(mockContext.getAuthentication()).thenReturn(Mockito.mock(Authentication.class));
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
Mockito.when(mockEnv.getSource()).thenReturn(TEST_SOURCE);
@ -79,8 +86,7 @@ public class DatasetStatsSummaryResolverTest {
newResult.setAggregations(new UsageQueryResultAggregations());
Mockito.when(mockClient.getUsageStats(
Mockito.eq(TEST_DATASET_URN),
Mockito.eq(UsageTimeRange.MONTH),
Mockito.any(Authentication.class)
Mockito.eq(UsageTimeRange.MONTH)
)).thenReturn(newResult);
// Then verify that the new result is _not_ returned (cache hit)
@ -116,8 +122,7 @@ public class DatasetStatsSummaryResolverTest {
UsageClient mockClient = Mockito.mock(UsageClient.class);
Mockito.when(mockClient.getUsageStats(
Mockito.eq(TEST_DATASET_URN),
Mockito.eq(UsageTimeRange.MONTH),
Mockito.any(Authentication.class)
Mockito.eq(UsageTimeRange.MONTH)
)).thenThrow(RuntimeException.class);
// Execute resolver

View File

@ -1,11 +1,10 @@
package com.linkedin.datahub.upgrade.common.steps;
import com.datahub.authentication.Authentication;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
@ -13,8 +12,7 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class GMSDisableWriteModeStep implements UpgradeStep {
private final Authentication _systemAuthentication;
private final RestliEntityClient _entityClient;
private final SystemRestliEntityClient _entityClient;
@Override
public String id() {
@ -30,7 +28,7 @@ public class GMSDisableWriteModeStep implements UpgradeStep {
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
try {
_entityClient.setWritable(false, _systemAuthentication);
_entityClient.setWritable(false);
} catch (Exception e) {
e.printStackTrace();
context.report().addLine("Failed to turn write mode off in GMS");

View File

@ -1,20 +1,17 @@
package com.linkedin.datahub.upgrade.common.steps;
import com.datahub.authentication.Authentication;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class GMSEnableWriteModeStep implements UpgradeStep {
private final Authentication _systemAuthentication;
private final RestliEntityClient _entityClient;
private final SystemRestliEntityClient _entityClient;
@Override
public String id() {
@ -30,7 +27,7 @@ public class GMSEnableWriteModeStep implements UpgradeStep {
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
try {
_entityClient.setWritable(true, _systemAuthentication);
_entityClient.setWritable(true);
} catch (Exception e) {
e.printStackTrace();
context.report().addLine("Failed to turn write mode back on in GMS");

View File

@ -1,8 +1,7 @@
package com.linkedin.datahub.upgrade.config;
import com.datahub.authentication.Authentication;
import com.linkedin.datahub.upgrade.nocode.NoCodeUpgrade;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import io.ebean.Database;
@ -21,15 +20,14 @@ public class NoCodeUpgradeConfig {
ApplicationContext applicationContext;
@Bean(name = "noCodeUpgrade")
@DependsOn({"ebeanServer", "entityService", "systemAuthentication", "restliEntityClient", "entityRegistry"})
@DependsOn({"ebeanServer", "entityService", "systemRestliEntityClient", "entityRegistry"})
@Nonnull
public NoCodeUpgrade createInstance() {
final Database ebeanServer = applicationContext.getBean(Database.class);
final EntityService entityService = applicationContext.getBean(EntityService.class);
final Authentication systemAuthentication = applicationContext.getBean(Authentication.class);
final RestliEntityClient entityClient = applicationContext.getBean(RestliEntityClient.class);
final SystemRestliEntityClient entityClient = applicationContext.getBean(SystemRestliEntityClient.class);
final EntityRegistry entityRegistry = applicationContext.getBean(EntityRegistry.class);
return new NoCodeUpgrade(ebeanServer, entityService, entityRegistry, systemAuthentication, entityClient);
return new NoCodeUpgrade(ebeanServer, entityService, entityRegistry, entityClient);
}
}

View File

@ -1,8 +1,7 @@
package com.linkedin.datahub.upgrade.config;
import com.datahub.authentication.Authentication;
import com.linkedin.datahub.upgrade.restorebackup.RestoreBackup;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
@ -22,19 +21,18 @@ public class RestoreBackupConfig {
ApplicationContext applicationContext;
@Bean(name = "restoreBackup")
@DependsOn({"ebeanServer", "entityService", "systemAuthentication", "restliEntityClient", "graphService",
@DependsOn({"ebeanServer", "entityService", "systemRestliEntityClient", "graphService",
"searchService", "entityRegistry"})
@Nonnull
public RestoreBackup createInstance() {
final Database ebeanServer = applicationContext.getBean(Database.class);
final EntityService entityService = applicationContext.getBean(EntityService.class);
final Authentication systemAuthentication = applicationContext.getBean(Authentication.class);
final RestliEntityClient entityClient = applicationContext.getBean(RestliEntityClient.class);
final SystemRestliEntityClient entityClient = applicationContext.getBean(SystemRestliEntityClient.class);
final GraphService graphClient = applicationContext.getBean(GraphService.class);
final EntitySearchService searchClient = applicationContext.getBean(EntitySearchService.class);
final EntityRegistry entityRegistry = applicationContext.getBean(EntityRegistry.class);
return new RestoreBackup(ebeanServer, entityService, entityRegistry, systemAuthentication, entityClient,
return new RestoreBackup(ebeanServer, entityService, entityRegistry, entityClient,
graphClient, searchClient);
}
}

View File

@ -1,13 +1,12 @@
package com.linkedin.datahub.upgrade.nocode;
import com.datahub.authentication.Authentication;
import com.google.common.collect.ImmutableMap;
import com.linkedin.datahub.upgrade.Upgrade;
import com.linkedin.datahub.upgrade.UpgradeCleanupStep;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.common.steps.GMSEnableWriteModeStep;
import com.linkedin.datahub.upgrade.common.steps.GMSQualificationStep;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import io.ebean.Database;
@ -30,12 +29,10 @@ public class NoCodeUpgrade implements Upgrade {
final Database server,
final EntityService entityService,
final EntityRegistry entityRegistry,
final Authentication systemAuthentication,
final RestliEntityClient entityClient) {
final SystemRestliEntityClient entityClient) {
_steps = buildUpgradeSteps(
server, entityService,
entityRegistry,
systemAuthentication,
entityClient);
_cleanupSteps = buildCleanupSteps();
}
@ -63,15 +60,14 @@ public class NoCodeUpgrade implements Upgrade {
final Database server,
final EntityService entityService,
final EntityRegistry entityRegistry,
final Authentication systemAuthentication,
final RestliEntityClient entityClient) {
final SystemRestliEntityClient entityClient) {
final List<UpgradeStep> steps = new ArrayList<>();
steps.add(new RemoveAspectV2TableStep(server));
steps.add(new GMSQualificationStep(ImmutableMap.of("noCode", "true")));
steps.add(new UpgradeQualificationStep(server));
steps.add(new CreateAspectTableStep(server));
steps.add(new DataMigrationStep(server, entityService, entityRegistry));
steps.add(new GMSEnableWriteModeStep(systemAuthentication, entityClient));
steps.add(new GMSEnableWriteModeStep(entityClient));
return steps;
}
}

View File

@ -1,6 +1,5 @@
package com.linkedin.datahub.upgrade.restorebackup;
import com.datahub.authentication.Authentication;
import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.Upgrade;
import com.linkedin.datahub.upgrade.UpgradeCleanupStep;
@ -9,7 +8,7 @@ import com.linkedin.datahub.upgrade.common.steps.ClearGraphServiceStep;
import com.linkedin.datahub.upgrade.common.steps.ClearSearchServiceStep;
import com.linkedin.datahub.upgrade.common.steps.GMSDisableWriteModeStep;
import com.linkedin.datahub.upgrade.common.steps.GMSEnableWriteModeStep;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
@ -27,11 +26,10 @@ public class RestoreBackup implements Upgrade {
final Database server,
final EntityService entityService,
final EntityRegistry entityRegistry,
final Authentication systemAuthentication,
final RestliEntityClient entityClient,
final SystemRestliEntityClient entityClient,
final GraphService graphClient,
final EntitySearchService searchClient) {
_steps = buildSteps(server, entityService, entityRegistry, systemAuthentication, entityClient, graphClient, searchClient);
_steps = buildSteps(server, entityService, entityRegistry, entityClient, graphClient, searchClient);
}
@Override
@ -48,17 +46,16 @@ public class RestoreBackup implements Upgrade {
final Database server,
final EntityService entityService,
final EntityRegistry entityRegistry,
final Authentication systemAuthentication,
final RestliEntityClient entityClient,
final SystemRestliEntityClient entityClient,
final GraphService graphClient,
final EntitySearchService searchClient) {
final List<UpgradeStep> steps = new ArrayList<>();
steps.add(new GMSDisableWriteModeStep(systemAuthentication, entityClient));
steps.add(new GMSDisableWriteModeStep(entityClient));
steps.add(new ClearSearchServiceStep(searchClient, true));
steps.add(new ClearGraphServiceStep(graphClient, true));
steps.add(new ClearAspectV2TableStep(server));
steps.add(new RestoreStorageStep(entityService, entityRegistry));
steps.add(new GMSEnableWriteModeStep(systemAuthentication, entityClient));
steps.add(new GMSEnableWriteModeStep(entityClient));
return steps;
}

View File

@ -0,0 +1,39 @@
package com.linkedin.metadata.client;
import com.datahub.authentication.Authentication;
import com.linkedin.entity.client.EntityClientCache;
import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.metadata.entity.DeleteEntityService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.LineageSearchService;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.search.client.CachingEntitySearchService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import lombok.Getter;
/**
 * Java backed SystemEntityClient.
 *
 * <p>Extends {@link JavaEntityClient} and additionally holds a system
 * {@link Authentication} and an {@link EntityClientCache} that is built once in the
 * constructor. Lombok's {@code @Getter} generates a getter for each of the two fields.
 */
@Getter
public class SystemJavaEntityClient extends JavaEntityClient implements SystemEntityClient {
// Cache instance produced by buildEntityClientCache(...) during construction.
private final EntityClientCache entityClientCache;
// Authentication passed in by the caller; also handed to buildEntityClientCache(...).
private final Authentication systemAuthentication;
/**
 * Creates the client.
 *
 * <p>The first nine arguments ({@code entityService} through {@code restliEntityClient})
 * are forwarded unchanged to the {@link JavaEntityClient} constructor.
 *
 * @param systemAuthentication authentication stored on this client and used when building the cache
 * @param cacheConfig cache configuration passed to {@code buildEntityClientCache}
 */
public SystemJavaEntityClient(EntityService entityService, DeleteEntityService deleteEntityService,
EntitySearchService entitySearchService, CachingEntitySearchService cachingEntitySearchService,
SearchService searchService, LineageSearchService lineageSearchService,
TimeseriesAspectService timeseriesAspectService, EventProducer eventProducer,
RestliEntityClient restliEntityClient, Authentication systemAuthentication,
EntityClientCacheConfig cacheConfig) {
super(entityService, deleteEntityService, entitySearchService, cachingEntitySearchService, searchService,
lineageSearchService, timeseriesAspectService, eventProducer, restliEntityClient);
// The authentication field is assigned before the cache is built.
// NOTE(review): buildEntityClientCache is defined outside this file - confirm whether it
// relies on this ordering before rearranging these statements.
this.systemAuthentication = systemAuthentication;
this.entityClientCache = buildEntityClientCache(SystemJavaEntityClient.class, systemAuthentication, cacheConfig);
}
}

View File

@ -1,6 +1,5 @@
package com.linkedin.metadata.timeline.eventgenerator;
import com.datahub.authentication.Authentication;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.dataprocess.DataProcessInstanceRelationships;
@ -8,7 +7,7 @@ import com.linkedin.dataprocess.DataProcessInstanceRunEvent;
import com.linkedin.dataprocess.DataProcessRunStatus;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.metadata.timeline.data.ChangeOperation;
@ -27,9 +26,8 @@ public class DataProcessInstanceRunEventChangeEventGenerator
private static final String COMPLETED_STATUS = "COMPLETED";
private static final String STARTED_STATUS = "STARTED";
public DataProcessInstanceRunEventChangeEventGenerator(@Nonnull final EntityClient entityClient, @Nonnull final
Authentication authentication) {
super(entityClient, authentication);
public DataProcessInstanceRunEventChangeEventGenerator(@Nonnull final SystemEntityClient entityClient) {
super(entityClient);
}
@Override
@ -108,8 +106,8 @@ public class DataProcessInstanceRunEventChangeEventGenerator
EntityResponse entityResponse;
try {
entityUrn = Urn.createFromString(entityUrnString);
entityResponse = _entityClient.getV2(DATA_PROCESS_INSTANCE_ENTITY_NAME, entityUrn,
Collections.singleton(DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME), _authentication);
entityResponse = _entityClient.getV2(entityUrn,
Collections.singleton(DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME));
} catch (Exception e) {
return null;
}

View File

@ -5,7 +5,7 @@ import com.github.fge.jsonpatch.JsonPatch;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.metadata.entity.EntityAspect;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeEvent;
@ -19,16 +19,14 @@ import javax.annotation.Nonnull;
*/
public abstract class EntityChangeEventGenerator<T extends RecordTemplate> {
// TODO: Add a check for supported aspects
protected EntityClient _entityClient;
protected SystemEntityClient _entityClient;
protected Authentication _authentication;
public EntityChangeEventGenerator() {
}
public EntityChangeEventGenerator(@Nonnull final EntityClient entityClient,
@Nonnull final Authentication authentication) {
public EntityChangeEventGenerator(@Nonnull final SystemEntityClient entityClient) {
_entityClient = entityClient;
_authentication = authentication;
}
@Deprecated

View File

@ -1,6 +1,6 @@
package com.linkedin.metadata.kafka;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
import com.linkedin.metadata.entity.EntityServiceImpl;
@ -24,7 +24,7 @@ public class MaeConsumerApplicationTestConfiguration {
private EntityServiceImpl _entityServiceImpl;
@MockBean
private RestliEntityClient restliEntityClient;
private SystemRestliEntityClient restliEntityClient;
@MockBean
private Database ebeanServer;

View File

@ -1,10 +1,10 @@
package com.linkedin.metadata.kafka.config;
import com.datahub.authentication.Authentication;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
import com.google.common.collect.ImmutableSet;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.gms.factory.entity.RestliEntityClientFactory;
import com.linkedin.metadata.kafka.hydrator.EntityHydrator;
import com.linkedin.metadata.models.registry.EntityRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
@ -13,19 +13,25 @@ import org.springframework.context.annotation.Import;
@Configuration
@Import({RestliEntityClientFactory.class, SystemAuthenticationFactory.class})
@Import({RestliEntityClientFactory.class})
public class EntityHydratorConfig {
@Autowired
@Qualifier("systemAuthentication")
private Authentication _systemAuthentication;
@Qualifier("systemRestliEntityClient")
private SystemRestliEntityClient _entityClient;
@Autowired
@Qualifier("restliEntityClient")
private RestliEntityClient _entityClient;
private EntityRegistry _entityRegistry;
public final static ImmutableSet<String> EXCLUDED_ASPECTS = ImmutableSet.<String>builder()
.add("datasetUpstreamLineage", "upstreamLineage")
.add("dataJobInputOutput")
.add("dataProcessInstanceRelationships", "dataProcessInstanceInput", "dataProcessInstanceOutput")
.add("inputFields")
.build();
@Bean
public EntityHydrator getEntityHydrator() {
return new EntityHydrator(_systemAuthentication, _entityClient);
return new EntityHydrator(_entityRegistry, _entityClient);
}
}

View File

@ -1,15 +1,12 @@
package com.linkedin.metadata.kafka.hook.event;
import com.datahub.authentication.Authentication;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.gms.factory.entity.RestliEntityClientFactory;
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
import com.linkedin.metadata.Constants;
@ -46,8 +43,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
@Import({EntityChangeEventGeneratorRegistry.class, EntityRegistryFactory.class, RestliEntityClientFactory.class,
SystemAuthenticationFactory.class})
@Import({EntityChangeEventGeneratorRegistry.class, EntityRegistryFactory.class, RestliEntityClientFactory.class})
public class EntityChangeEventGeneratorHook implements MetadataChangeLogHook {
/**
@ -83,20 +79,18 @@ public class EntityChangeEventGeneratorHook implements MetadataChangeLogHook {
*/
private static final Set<String> SUPPORTED_OPERATIONS = ImmutableSet.of("CREATE", "UPSERT", "DELETE");
private final EntityChangeEventGeneratorRegistry _entityChangeEventGeneratorRegistry;
private final EntityClient _entityClient;
private final Authentication _systemAuthentication;
private final SystemRestliEntityClient _entityClient;
private final EntityRegistry _entityRegistry;
private final Boolean _isEnabled;
@Autowired
public EntityChangeEventGeneratorHook(
@Nonnull final EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry,
@Nonnull final RestliEntityClient entityClient, @Nonnull final Authentication systemAuthentication,
@Nonnull final SystemRestliEntityClient entityClient,
@Nonnull final EntityRegistry entityRegistry,
@Nonnull @Value("${entityChangeEvents.enabled:true}") Boolean isEnabled) {
_entityChangeEventGeneratorRegistry = Objects.requireNonNull(entityChangeEventGeneratorRegistry);
_entityClient = Objects.requireNonNull(entityClient);
_systemAuthentication = Objects.requireNonNull(systemAuthentication);
_entityRegistry = Objects.requireNonNull(entityRegistry);
_isEnabled = isEnabled;
}
@ -189,8 +183,7 @@ public class EntityChangeEventGeneratorHook implements MetadataChangeLogHook {
_entityClient.producePlatformEvent(
Constants.CHANGE_EVENT_PLATFORM_EVENT_NAME,
partitioningKey,
event,
_systemAuthentication
event
);
}

View File

@ -1,6 +1,5 @@
package com.linkedin.metadata.kafka.hook.siblings;
import com.datahub.authentication.Authentication;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@ -13,9 +12,8 @@ import com.linkedin.common.urn.Urn;
import com.linkedin.dataset.UpstreamArray;
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
import com.linkedin.gms.factory.entity.RestliEntityClientFactory;
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
import com.linkedin.gms.factory.search.EntitySearchServiceFactory;
@ -60,7 +58,7 @@ import static com.linkedin.metadata.Constants.*;
@Slf4j
@Component
@Singleton
@Import({EntityRegistryFactory.class, RestliEntityClientFactory.class, EntitySearchServiceFactory.class, SystemAuthenticationFactory.class})
@Import({EntityRegistryFactory.class, RestliEntityClientFactory.class, EntitySearchServiceFactory.class})
public class SiblingAssociationHook implements MetadataChangeLogHook {
public static final String SIBLING_ASSOCIATION_SYSTEM_ACTOR = "urn:li:corpuser:__datahub_system_sibling_hook";
@ -73,23 +71,20 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
public static final String SOURCE_SUBTYPE_V2 = "Source";
private final EntityRegistry _entityRegistry;
private final RestliEntityClient _entityClient;
private final SystemRestliEntityClient _entityClient;
private final EntitySearchService _searchService;
private final Authentication _systemAuthentication;
private final boolean _isEnabled;
@Autowired
public SiblingAssociationHook(
@Nonnull final EntityRegistry entityRegistry,
@Nonnull final RestliEntityClient entityClient,
@Nonnull final SystemRestliEntityClient entityClient,
@Nonnull final EntitySearchService searchService,
@Nonnull final Authentication systemAuthentication,
@Nonnull @Value("${siblings.enabled:true}") Boolean isEnabled
) {
_entityRegistry = entityRegistry;
_entityClient = entityClient;
_searchService = searchService;
_systemAuthentication = systemAuthentication;
_isEnabled = isEnabled;
}
@ -251,9 +246,9 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
dbtSiblingProposal.setEntityUrn(dbtUrn);
try {
_entityClient.ingestProposal(dbtSiblingProposal, _systemAuthentication);
_entityClient.ingestProposal(dbtSiblingProposal, true);
} catch (RemoteInvocationException e) {
log.error("Error while associating {} with {}: {}", dbtUrn.toString(), sourceUrn.toString(), e.toString());
log.error("Error while associating {} with {}: {}", dbtUrn, sourceUrn, e.toString());
throw new RuntimeException("Error ingesting sibling proposal. Skipping processing.", e);
}
@ -274,9 +269,9 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
List<Urn> filteredNewSiblingsArray =
newSiblingsUrnArray.stream().filter(urn -> {
try {
return _entityClient.exists(urn, _systemAuthentication);
return _entityClient.exists(urn);
} catch (RemoteInvocationException e) {
log.error("Error while checking existence of {}: {}", urn.toString(), e.toString());
log.error("Error while checking existence of {}: {}", urn, e.toString());
throw new RuntimeException("Error checking existence. Skipping processing.", e);
}
}).collect(Collectors.toList());
@ -294,9 +289,9 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
sourceSiblingProposal.setEntityUrn(sourceUrn);
try {
_entityClient.ingestProposal(sourceSiblingProposal, _systemAuthentication);
_entityClient.ingestProposal(sourceSiblingProposal, true);
} catch (RemoteInvocationException e) {
log.error("Error while associating {} with {}: {}", dbtUrn.toString(), sourceUrn.toString(), e.toString());
log.error("Error while associating {} with {}: {}", dbtUrn, sourceUrn, e.toString());
throw new RuntimeException("Error ingesting sibling proposal. Skipping processing.", e);
}
}
@ -406,11 +401,8 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
) {
try {
EntityResponse entityResponse = _entityClient.getV2(
DATASET_ENTITY_NAME,
urn,
ImmutableSet.of(SUB_TYPES_ASPECT_NAME),
_systemAuthentication
);
ImmutableSet.of(SUB_TYPES_ASPECT_NAME));
if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.SUB_TYPES_ASPECT_NAME)) {
return new SubTypes(entityResponse.getAspects().get(Constants.SUB_TYPES_ASPECT_NAME).getValue().data());
@ -427,10 +419,8 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
) {
try {
EntityResponse entityResponse = _entityClient.getV2(
DATASET_ENTITY_NAME,
urn,
ImmutableSet.of(UPSTREAM_LINEAGE_ASPECT_NAME),
_systemAuthentication
ImmutableSet.of(UPSTREAM_LINEAGE_ASPECT_NAME)
);
if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) {
@ -448,10 +438,8 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
) {
try {
EntityResponse entityResponse = _entityClient.getV2(
DATASET_ENTITY_NAME,
urn,
ImmutableSet.of(SIBLINGS_ASPECT_NAME),
_systemAuthentication
ImmutableSet.of(SIBLINGS_ASPECT_NAME)
);
if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.SIBLINGS_ASPECT_NAME)) {

View File

@ -1,28 +1,32 @@
package com.linkedin.metadata.kafka.hydrator;
import com.datahub.authentication.Authentication;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.common.urn.Urn;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.r2.RemoteInvocationException;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.kafka.config.EntityHydratorConfig.EXCLUDED_ASPECTS;
@Slf4j
@RequiredArgsConstructor
public class EntityHydrator {
private final Authentication _systemAuthentication;
private final EntityClient _entityClient;
private final EntityRegistry _entityRegistry;
private final SystemRestliEntityClient _entityClient;
private final ChartHydrator _chartHydrator = new ChartHydrator();
private final CorpUserHydrator _corpUserHydrator = new CorpUserHydrator();
private final DashboardHydrator _dashboardHydrator = new DashboardHydrator();
@ -43,8 +47,12 @@ public class EntityHydrator {
// Hydrate fields from snapshot
EntityResponse entityResponse;
try {
entityResponse = _entityClient.batchGetV2(entityTypeName, Collections.singleton(urnObj), null,
this._systemAuthentication).get(urnObj);
Set<String> aspectNames = Optional.ofNullable(_entityRegistry.getEntitySpecs().get(urnObj.getEntityType()))
.map(spec -> spec.getAspectSpecs().stream().map(AspectSpec::getName)
.filter(aspectName -> !EXCLUDED_ASPECTS.contains(aspectName))
.collect(Collectors.toSet()))
.orElse(Set.of());
entityResponse = _entityClient.batchGetV2(Collections.singleton(urnObj), aspectNames).get(urnObj);
} catch (RemoteInvocationException | URISyntaxException e) {
log.error("Error while calling GMS to hydrate entity for urn {}", urn);
return Optional.empty();

View File

@ -1,6 +1,5 @@
package com.linkedin.metadata.kafka.hook.event;
import com.datahub.authentication.Authentication;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.linkedin.assertion.AssertionResult;
@ -38,8 +37,7 @@ import com.linkedin.entity.Aspect;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.key.DatasetKey;
@ -66,6 +64,7 @@ import com.linkedin.platform.event.v1.EntityChangeEvent;
import com.linkedin.platform.event.v1.Parameters;
import java.net.URISyntaxException;
import java.util.Map;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@ -92,22 +91,19 @@ public class EntityChangeEventGeneratorHookTest {
private static final String TEST_DATA_FLOW_URN = "urn:li:dataFlow:flow";
private static final String TEST_DATA_JOB_URN = "urn:li:dataJob:job";
private Urn actorUrn;
private Authentication _mockAuthentication;
private RestliEntityClient _mockClient;
private SystemRestliEntityClient _mockClient;
private EntityService _mockEntityService;
private EntityChangeEventGeneratorHook _entityChangeEventHook;
@BeforeMethod
public void setupTest() throws URISyntaxException {
actorUrn = Urn.createFromString(TEST_ACTOR_URN);
_mockAuthentication = Mockito.mock(Authentication.class);
_mockClient = Mockito.mock(RestliEntityClient.class);
_mockClient = Mockito.mock(SystemRestliEntityClient.class);
_mockEntityService = Mockito.mock(EntityService.class);
EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry = createEntityChangeEventGeneratorRegistry();
_entityChangeEventHook =
new EntityChangeEventGeneratorHook(entityChangeEventGeneratorRegistry, _mockClient, _mockAuthentication,
createMockEntityRegistry(), true);
new EntityChangeEventGeneratorHook(entityChangeEventGeneratorRegistry, _mockClient, createMockEntityRegistry(), true);
}
@Test
@ -498,8 +494,7 @@ public class EntityChangeEventGeneratorHookTest {
final EntityResponse entityResponse =
buildEntityResponse(ImmutableMap.of(DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME, relationships));
Mockito.when(_mockClient.getV2(eq(DATA_PROCESS_INSTANCE_ENTITY_NAME), eq(dataProcessInstanceUrn),
any(), eq(_mockAuthentication))).thenReturn(entityResponse);
Mockito.when(_mockClient.getV2(eq(dataProcessInstanceUrn), any())).thenReturn(entityResponse);
_entityChangeEventHook.invoke(event);
@ -540,8 +535,7 @@ public class EntityChangeEventGeneratorHookTest {
final EntityResponse entityResponse =
buildEntityResponse(ImmutableMap.of(DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME, relationships));
Mockito.when(_mockClient.getV2(eq(DATA_PROCESS_INSTANCE_ENTITY_NAME), eq(dataProcessInstanceUrn),
any(), eq(_mockAuthentication))).thenReturn(entityResponse);
Mockito.when(_mockClient.getV2(eq(dataProcessInstanceUrn), any())).thenReturn(entityResponse);
_entityChangeEventHook.invoke(event);
@ -618,7 +612,7 @@ public class EntityChangeEventGeneratorHookTest {
// Run change event generators
registry.register(ASSERTION_RUN_EVENT_ASPECT_NAME, new AssertionRunEventChangeEventGenerator());
registry.register(DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME,
new DataProcessInstanceRunEventChangeEventGenerator(_mockClient, _mockAuthentication));
new DataProcessInstanceRunEventChangeEventGenerator(_mockClient));
return registry;
}
@ -668,14 +662,14 @@ public class EntityChangeEventGeneratorHookTest {
return registry;
}
private void verifyProducePlatformEvent(EntityClient mockClient, PlatformEvent platformEvent) throws Exception {
private void verifyProducePlatformEvent(SystemRestliEntityClient mockClient, PlatformEvent platformEvent) throws Exception {
verifyProducePlatformEvent(mockClient, platformEvent, true);
}
private void verifyProducePlatformEvent(EntityClient mockClient, PlatformEvent platformEvent, boolean noMoreInteractions) throws Exception {
private void verifyProducePlatformEvent(SystemRestliEntityClient mockClient, PlatformEvent platformEvent, boolean noMoreInteractions) throws Exception {
// Verify event has been emitted.
verify(mockClient, Mockito.times(1)).producePlatformEvent(eq(CHANGE_EVENT_PLATFORM_EVENT_NAME), Mockito.anyString(),
argThat(new PlatformEventMatcher(platformEvent)), Mockito.any(Authentication.class));
argThat(new PlatformEventMatcher(platformEvent)));
if (noMoreInteractions) {
Mockito.verifyNoMoreInteractions(_mockClient);

View File

@ -1,6 +1,5 @@
package com.linkedin.metadata.kafka.hook.siblings;
import com.datahub.authentication.Authentication;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.FabricType;
@ -19,7 +18,7 @@ import com.linkedin.entity.Aspect;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.key.DatasetKey;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
@ -44,19 +43,16 @@ import static org.mockito.ArgumentMatchers.*;
public class SiblingAssociationHookTest {
private SiblingAssociationHook _siblingAssociationHook;
RestliEntityClient _mockEntityClient;
SystemRestliEntityClient _mockEntityClient;
EntitySearchService _mockSearchService;
Authentication _mockAuthentication;
@BeforeMethod
public void setupTest() {
EntityRegistry registry = new ConfigEntityRegistry(
SiblingAssociationHookTest.class.getClassLoader().getResourceAsStream("test-entity-registry-siblings.yml"));
_mockEntityClient = Mockito.mock(RestliEntityClient.class);
_mockEntityClient = Mockito.mock(SystemRestliEntityClient.class);
_mockSearchService = Mockito.mock(EntitySearchService.class);
_mockAuthentication = Mockito.mock(Authentication.class);
_siblingAssociationHook = new SiblingAssociationHook(registry, _mockEntityClient, _mockSearchService, _mockAuthentication,
true);
_siblingAssociationHook = new SiblingAssociationHook(registry, _mockEntityClient, _mockSearchService, true);
_siblingAssociationHook.setEnabled(true);
}
@ -69,15 +65,13 @@ public class SiblingAssociationHookTest {
EntityResponse mockResponse = new EntityResponse();
mockResponse.setAspects(mockResponseMap);
Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true);
Mockito.when(_mockEntityClient.exists(Mockito.any())).thenReturn(true);
Mockito.when(
_mockEntityClient.getV2(
DATASET_ENTITY_NAME,
Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)"),
ImmutableSet.of(SUB_TYPES_ASPECT_NAME),
_mockAuthentication
ImmutableSet.of(SUB_TYPES_ASPECT_NAME)
)).thenReturn(mockResponse);
@ -105,10 +99,7 @@ public class SiblingAssociationHookTest {
proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect));
proposal.setChangeType(ChangeType.UPSERT);
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal),
Mockito.eq(_mockAuthentication)
);
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(Mockito.eq(proposal), eq(true));
final Siblings sourceSiblingsAspect = new Siblings()
.setSiblings(new UrnArray(ImmutableList.of(Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)"))))
@ -121,10 +112,7 @@ public class SiblingAssociationHookTest {
proposal2.setAspect(GenericRecordUtils.serializeAspect(sourceSiblingsAspect));
proposal2.setChangeType(ChangeType.UPSERT);
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal2),
Mockito.eq(_mockAuthentication)
);
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(Mockito.eq(proposal2), eq(true));
}
@Test
@ -132,23 +120,20 @@ public class SiblingAssociationHookTest {
SubTypes mockSourceSubtypesAspect = new SubTypes();
mockSourceSubtypesAspect.setTypeNames(new StringArray(ImmutableList.of("model")));
Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true);
Mockito.when(_mockEntityClient.exists(Mockito.any())).thenReturn(true);
EnvelopedAspectMap mockResponseMap = new EnvelopedAspectMap();
mockResponseMap.put(SUB_TYPES_ASPECT_NAME, new EnvelopedAspect().setValue(new Aspect(mockSourceSubtypesAspect.data())));
EntityResponse mockResponse = new EntityResponse();
mockResponse.setAspects(mockResponseMap);
Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true);
Mockito.when(_mockEntityClient.exists(Mockito.any())).thenReturn(true);
Mockito.when(
_mockEntityClient.getV2(
DATASET_ENTITY_NAME,
Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)"),
ImmutableSet.of(SUB_TYPES_ASPECT_NAME),
_mockAuthentication
)).thenReturn(mockResponse);
ImmutableSet.of(SUB_TYPES_ASPECT_NAME))).thenReturn(mockResponse);
MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT);
Upstream upstream = createUpstream("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)", DatasetLineageType.TRANSFORMED);
@ -174,15 +159,12 @@ public class SiblingAssociationHookTest {
proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect));
proposal.setChangeType(ChangeType.UPSERT);
Mockito.verify(_mockEntityClient, Mockito.times(0)).ingestProposal(
Mockito.eq(proposal),
Mockito.eq(_mockAuthentication)
);
Mockito.verify(_mockEntityClient, Mockito.times(0)).ingestProposal(Mockito.eq(proposal), eq(true));
}
@Test
public void testInvokeWhenThereIsAPairWithBigqueryDownstreamNode() throws Exception {
Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true);
Mockito.when(_mockEntityClient.exists(Mockito.any())).thenReturn(true);
MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT);
@ -208,10 +190,7 @@ public class SiblingAssociationHookTest {
proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect));
proposal.setChangeType(ChangeType.UPSERT);
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal),
Mockito.eq(_mockAuthentication)
);
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(Mockito.eq(proposal), eq(true));
final Siblings sourceSiblingsAspect = new Siblings()
.setSiblings(new UrnArray(ImmutableList.of(Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)"))))
@ -224,15 +203,12 @@ public class SiblingAssociationHookTest {
proposal2.setAspect(GenericRecordUtils.serializeAspect(sourceSiblingsAspect));
proposal2.setChangeType(ChangeType.UPSERT);
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal2),
Mockito.eq(_mockAuthentication)
);
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(Mockito.eq(proposal2), eq(true));
}
@Test
public void testInvokeWhenThereIsAKeyBeingReingested() throws Exception {
Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true);
Mockito.when(_mockEntityClient.exists(Mockito.any())).thenReturn(true);
SearchResult returnSearchResult = new SearchResult();
SearchEntityArray returnEntityArray = new SearchEntityArray();
@ -271,10 +247,7 @@ public class SiblingAssociationHookTest {
proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect));
proposal.setChangeType(ChangeType.UPSERT);
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal),
Mockito.eq(_mockAuthentication)
);
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(Mockito.eq(proposal), eq(true));
final Siblings sourceSiblingsAspect = new Siblings()
.setSiblings(new UrnArray(ImmutableList.of(Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)"))))
@ -287,10 +260,7 @@ public class SiblingAssociationHookTest {
proposal2.setAspect(GenericRecordUtils.serializeAspect(sourceSiblingsAspect));
proposal2.setChangeType(ChangeType.UPSERT);
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(
Mockito.eq(proposal2),
Mockito.eq(_mockAuthentication)
);
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(Mockito.eq(proposal2), eq(true));
}
@Test
public void testInvokeWhenSourceUrnHasTwoDbtUpstreams() throws Exception {
@ -309,10 +279,7 @@ public class SiblingAssociationHookTest {
_siblingAssociationHook.invoke(event);
Mockito.verify(_mockEntityClient, Mockito.times(0)).ingestProposal(
Mockito.any(),
Mockito.eq(_mockAuthentication)
);
Mockito.verify(_mockEntityClient, Mockito.times(0)).ingestProposal(Mockito.any(), eq(true));
}
@ -335,12 +302,7 @@ public class SiblingAssociationHookTest {
_siblingAssociationHook.invoke(event);
Mockito.verify(_mockEntityClient, Mockito.times(2)).ingestProposal(
Mockito.any(),
Mockito.eq(_mockAuthentication)
);
Mockito.verify(_mockEntityClient, Mockito.times(2)).ingestProposal(Mockito.any(), eq(true));
}
private MetadataChangeLog createEvent(String entityType, String aspectName, ChangeType changeType) {

View File

@ -2,7 +2,7 @@ package com.linkedin.metadata.kafka.hook.spring;
import com.datahub.authentication.Authentication;
import com.datahub.metadata.ingestion.IngestionScheduler;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
@ -44,8 +44,8 @@ public class MCLSpringTestConfiguration {
@MockBean
public IngestionScheduler ingestionScheduler;
@MockBean
public RestliEntityClient entityClient;
@MockBean(name = "systemRestliEntityClient")
public SystemRestliEntityClient entityClient;
@MockBean
public ElasticSearchService searchService;

View File

@ -1,8 +1,8 @@
package com.linkedin.metadata.kafka;
import com.linkedin.gms.factory.entity.RestliEntityClientFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.gms.factory.telemetry.ScheduledAnalyticsFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.actuate.autoconfigure.solr.SolrHealthContributorAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;

View File

@ -4,8 +4,7 @@ import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.datahub.authentication.Authentication;
import com.linkedin.entity.Entity;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.gms.factory.entity.RestliEntityClientFactory;
import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory;
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
@ -40,15 +39,14 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@Conditional(MetadataChangeProposalProcessorCondition.class)
@Import({RestliEntityClientFactory.class, SystemAuthenticationFactory.class, KafkaEventConsumerFactory.class,
DataHubKafkaProducerFactory.class})
@Import({RestliEntityClientFactory.class, KafkaEventConsumerFactory.class, DataHubKafkaProducerFactory.class})
@EnableKafka
@RequiredArgsConstructor
public class MetadataChangeEventsProcessor {
@NonNull
private final Authentication systemAuthentication;
private final RestliEntityClient entityClient;
private final SystemRestliEntityClient entityClient;
private final Producer<String, IndexedRecord> kafkaProducer;
private final Histogram kafkaLagStats = MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag"));

View File

@ -2,9 +2,7 @@ package com.linkedin.metadata.kafka;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.datahub.authentication.Authentication;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.gms.factory.entity.RestliEntityClientFactory;
import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory;
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
@ -35,15 +33,13 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@Import({RestliEntityClientFactory.class, SystemAuthenticationFactory.class, KafkaEventConsumerFactory.class,
DataHubKafkaProducerFactory.class})
@Import({RestliEntityClientFactory.class, KafkaEventConsumerFactory.class, DataHubKafkaProducerFactory.class})
@Conditional(MetadataChangeProposalProcessorCondition.class)
@EnableKafka
@RequiredArgsConstructor
public class MetadataChangeProposalsProcessor {
private final Authentication systemAuthentication;
private final RestliEntityClient entityClient;
private final SystemRestliEntityClient entityClient;
private final Producer<String, IndexedRecord> kafkaProducer;
private final Histogram kafkaLagStats = MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag"));
@ -64,7 +60,7 @@ public class MetadataChangeProposalsProcessor {
event = EventUtils.avroToPegasusMCP(record);
log.debug("MetadataChangeProposal {}", event);
// TODO: Get this from the event itself.
entityClient.ingestProposal(event, this.systemAuthentication, false);
entityClient.ingestProposal(event, false);
} catch (Throwable throwable) {
log.error("MCP Processor Error", throwable);
log.error("Message: {}", record);

View File

@ -7,6 +7,7 @@ dependencies {
implementation externalDependency.slf4jApi
implementation externalDependency.springCore
implementation externalDependency.springBeans
compileOnly externalDependency.lombok

View File

@ -1,5 +1,6 @@
package com.linkedin.metadata.config.cache;
import com.linkedin.metadata.config.cache.client.ClientCacheConfiguration;
import lombok.Data;
@ -8,4 +9,5 @@ public class CacheConfiguration {
PrimaryCacheConfiguration primary;
HomepageCacheConfiguration homepage;
SearchCacheConfiguration search;
ClientCacheConfiguration client;
}

View File

@ -0,0 +1,10 @@
package com.linkedin.metadata.config.cache.client;
/**
 * Read-only view of the settings shared by the client-side cache configurations
 * (implemented in this package by the entity-client and usage-client config classes).
 */
public interface ClientCacheConfig {

  // --- cache behaviour ---

  /** @return {@code true} when the client-side cache is switched on */
  boolean isEnabled();

  /** @return the default time-to-live of a cache entry, in seconds */
  int getDefaultTTLSeconds();

  /** @return the size limit configured for the cache, in bytes */
  int getMaxBytes();

  // --- cache statistics ---

  /** @return {@code true} when cache statistics are switched on */
  boolean isStatsEnabled();

  /** @return the statistics interval, in seconds */
  int getStatsIntervalSeconds();
}

View File

@ -0,0 +1,9 @@
package com.linkedin.metadata.config.cache.client;
import lombok.Data;
/**
 * Groups the client-side cache settings: one entry for the entity client and one for the
 * usage client. Lombok's {@code @Data} supplies the accessors, equals/hashCode and toString.
 */
@Data
public class ClientCacheConfiguration {
// Cache settings applied to the entity client.
EntityClientCacheConfig entityClient;
// Cache settings applied to the usage client.
UsageClientCacheConfig usageClient;
}

View File

@ -0,0 +1,17 @@
package com.linkedin.metadata.config.cache.client;
import lombok.Data;
import java.util.Map;
/**
 * Client-side cache settings for the entity client. The {@link ClientCacheConfig} getters are
 * generated by Lombok's {@code @Data} from the fields below.
 */
@Data
public class EntityClientCacheConfig implements ClientCacheConfig {
// Whether the cache is switched on.
private boolean enabled;
// Whether cache statistics are switched on.
private boolean statsEnabled;
// Statistics interval, in seconds.
private int statsIntervalSeconds;
// TTL, in seconds, used when no per-aspect override below applies.
private int defaultTTLSeconds;
// Size limit for the cache, in bytes.
private int maxBytes;
// entityName -> aspectName -> cache ttl override
private Map<String, Map<String, Integer>> entityAspectTTLSeconds;
}

View File

@ -0,0 +1,12 @@
package com.linkedin.metadata.config.cache.client;
import lombok.Data;
/**
 * Client-side cache settings for the usage client. The {@link ClientCacheConfig} getters are
 * generated by Lombok's {@code @Data} from the fields below.
 */
@Data
public class UsageClientCacheConfig implements ClientCacheConfig {
// Whether the cache is switched on.
private boolean enabled;
// Whether cache statistics are switched on.
private boolean statsEnabled;
// Statistics interval, in seconds.
private int statsIntervalSeconds;
// Default TTL for cache entries, in seconds.
private int defaultTTLSeconds;
// Size limit for the cache, in bytes.
private int maxBytes;
}

View File

@ -1,14 +1,18 @@
package com.linkedin.gms.factory.spring;
package com.linkedin.metadata.spring;
import java.io.IOException;
import java.util.Properties;
import org.springframework.beans.factory.config.YamlPropertiesFactoryBean;
import org.springframework.core.env.PropertiesPropertySource;
import org.springframework.core.env.PropertySource;
import org.springframework.core.io.support.EncodedResource;
import org.springframework.core.io.support.PropertySourceFactory;
import java.io.IOException;
import java.util.Properties;
/**
* Required for Spring to parse the application.yml provided by this module
*/
public class YamlPropertySourceFactory implements PropertySourceFactory {
@Override

View File

@ -327,3 +327,27 @@ cache:
lineage:
ttlSeconds: ${CACHE_SEARCH_LINEAGE_TTL_SECONDS:86400} # 1 day
lightningThreshold: ${CACHE_SEARCH_LINEAGE_LIGHTNING_THRESHOLD:300}
client:
# client-side cache for the usage client; each value can be overridden with the named environment variable
usageClient:
enabled: ${CACHE_CLIENT_USAGE_CLIENT_ENABLED:true}
statsEnabled: ${CACHE_CLIENT_USAGE_CLIENT_STATS_ENABLED:true}
statsIntervalSeconds: ${CACHE_CLIENT_USAGE_CLIENT_STATS_INTERVAL_SECONDS:120}
defaultTTLSeconds: ${CACHE_CLIENT_USAGE_CLIENT_TTL_SECONDS:86400} # 1 day
maxBytes: ${CACHE_CLIENT_USAGE_CLIENT_MAX_BYTES:52428800} # 50MB
# client-side cache for the entity client; each value can be overridden with the named environment variable
entityClient:
enabled: ${CACHE_CLIENT_ENTITY_CLIENT_ENABLED:true}
statsEnabled: ${CACHE_CLIENT_ENTITY_CLIENT_STATS_ENABLED:true}
statsIntervalSeconds: ${CACHE_CLIENT_ENTITY_CLIENT_STATS_INTERVAL_SECONDS:120}
defaultTTLSeconds: ${CACHE_CLIENT_ENTITY_CLIENT_TTL_SECONDS:0} # do not cache entity/aspects by default
# CACHE_CLIENT_ENTITY_CLIENT_MAX_BYTES matches the prefix of the sibling keys; the misnamed
# CACHE_CLIENT_USAGE_ENTITY_MAX_BYTES is still honored as a fallback for existing deployments
maxBytes: ${CACHE_CLIENT_ENTITY_CLIENT_MAX_BYTES:${CACHE_CLIENT_USAGE_ENTITY_MAX_BYTES:104857600}} # 100MB
entityAspectTTLSeconds:
# cache user aspects for 20s
corpuser:
corpUserKey: 20
corpUserInfo: 20
corpUserEditableInfo: 20
corpUserStatus: 20
globalTags: 20
status: 20
corpUserCredentials: 20
corpUserSettings: 20

View File

@ -19,7 +19,7 @@ import com.datahub.plugins.loader.IsolatedClassLoader;
import com.datahub.plugins.loader.PluginPermissionManagerImpl;
import com.google.common.collect.ImmutableMap;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.client.JavaEntityClient;
import java.nio.file.Path;
import java.nio.file.Paths;

View File

@ -4,7 +4,7 @@ import com.datahub.authentication.Authentication;
import com.datahub.authorization.DataHubAuthorizer;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.gms.factory.entity.RestliEntityClientFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import javax.annotation.Nonnull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

View File

@ -1,9 +1,10 @@
package com.linkedin.gms.factory.auth;
import com.datahub.authentication.token.StatefulTokenService;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.entity.EntityService;
import javax.annotation.Nonnull;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;

View File

@ -4,7 +4,7 @@ package com.linkedin.gms.factory.auth;
import com.datahub.authentication.group.GroupService;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphClient;
import javax.annotation.Nonnull;

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.auth;
import com.datahub.authentication.invite.InviteTokenService;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.metadata.secret.SecretService;
import javax.annotation.Nonnull;

View File

@ -4,7 +4,7 @@ package com.linkedin.gms.factory.auth;
import com.datahub.authentication.user.NativeUserService;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.secret.SecretService;
import javax.annotation.Nonnull;

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.auth;
import com.datahub.authentication.post.PostService;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.client.JavaEntityClient;
import javax.annotation.Nonnull;
import org.springframework.beans.factory.annotation.Autowired;

View File

@ -3,7 +3,7 @@
package com.linkedin.gms.factory.auth;
import com.datahub.authorization.role.RoleService;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.client.JavaEntityClient;
import javax.annotation.Nonnull;
import org.springframework.beans.factory.annotation.Autowired;

View File

@ -3,7 +3,7 @@ package com.linkedin.gms.factory.auth;
import com.datahub.authentication.Actor;
import com.datahub.authentication.ActorType;
import com.datahub.authentication.Authentication;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import javax.annotation.Nonnull;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;

View File

@ -3,7 +3,7 @@ package com.linkedin.gms.factory.common;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.models.registry.LineageRegistry;
import com.linkedin.metadata.graph.elastic.ESGraphQueryDAO;
import com.linkedin.metadata.graph.elastic.ESGraphWriteDAO;

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.common;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.systemmetadata.ESSystemMetadataDAO;
import com.linkedin.metadata.systemmetadata.ElasticSearchSystemMetadataService;
import javax.annotation.Nonnull;

View File

@ -1,6 +1,6 @@
package com.linkedin.gms.factory.common;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import org.apache.http.ssl.SSLContextBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;

View File

@ -1,6 +1,6 @@
package com.linkedin.gms.factory.common;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.graph.neo4j.Neo4jGraphService;
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;

View File

@ -1,6 +1,6 @@
package com.linkedin.gms.factory.common;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl;
import org.springframework.beans.factory.annotation.Value;

View File

@ -1,6 +1,6 @@
package com.linkedin.gms.factory.common;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import io.ebean.config.ServerConfig;
import io.ebean.datasource.DataSourceConfig;

View File

@ -1,6 +1,6 @@
package com.linkedin.gms.factory.common;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import java.util.concurrent.TimeUnit;
import org.neo4j.driver.AuthTokens;

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.common;
import com.linkedin.gms.factory.auth.AwsRequestSigningApacheInterceptor;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import java.io.IOException;
import javax.annotation.Nonnull;
import javax.net.ssl.HostnameVerifier;

View File

@ -12,7 +12,7 @@ import com.linkedin.metadata.config.cache.CacheConfiguration;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.config.search.ElasticSearchConfiguration;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.config.telemetry.TelemetryConfiguration;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

View File

@ -1,6 +1,6 @@
package com.linkedin.gms.factory.dataproduct;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.service.DataProductService;

View File

@ -1,8 +1,11 @@
package com.linkedin.gms.factory.entity;
import com.datahub.authentication.Authentication;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
import com.linkedin.metadata.client.SystemJavaEntityClient;
import com.linkedin.metadata.entity.DeleteEntityService;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.event.EventProducer;
@ -53,12 +56,8 @@ public class JavaEntityClientFactory {
@Qualifier("kafkaEventProducer")
private EventProducer _eventProducer;
@Autowired
@Qualifier("restliEntityClient")
private RestliEntityClient _restliEntityClient;
@Bean("javaEntityClient")
public JavaEntityClient getJavaEntityClient() {
public JavaEntityClient getJavaEntityClient(@Qualifier("restliEntityClient") final RestliEntityClient restliEntityClient) {
return new JavaEntityClient(
_entityService,
_deleteEntityService,
@ -68,6 +67,24 @@ public class JavaEntityClientFactory {
_lineageSearchService,
_timeseriesAspectService,
_eventProducer,
_restliEntityClient);
restliEntityClient);
}
/**
 * Creates the {@code systemJavaEntityClient} bean: a {@link SystemJavaEntityClient} built from this
 * factory's injected services, the system authentication and the entity-client cache settings.
 *
 * @param configurationProvider source of the {@code cache.client.entityClient} settings
 * @param systemAuthentication authentication handed to the client
 * @param restliEntityClient the bean qualified as {@code systemRestliEntityClient}
 * @return the configured system entity client
 */
@Bean("systemJavaEntityClient")
public SystemJavaEntityClient systemJavaEntityClient(@Qualifier("configurationProvider") final ConfigurationProvider configurationProvider,
@Qualifier("systemAuthentication") final Authentication systemAuthentication,
@Qualifier("systemRestliEntityClient") final RestliEntityClient restliEntityClient) {
return new SystemJavaEntityClient(
_entityService,
_deleteEntityService,
_entitySearchService,
_cachingEntitySearchService,
_searchService,
_lineageSearchService,
_timeseriesAspectService,
_eventProducer,
restliEntityClient,
systemAuthentication,
// entity-client section of the client cache configuration
configurationProvider.getCache().getClient().getEntityClient());
}
}

View File

@ -1,10 +1,14 @@
package com.linkedin.gms.factory.entity;
import com.datahub.authentication.Authentication;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.restli.DefaultRestliClientFactory;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import com.linkedin.restli.client.Client;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -48,4 +52,17 @@ public class RestliEntityClientFactory {
}
return new RestliEntityClient(restClient, new ExponentialBackoff(retryInterval), numRetries);
}
/**
 * Creates the {@code systemRestliEntityClient} bean: a {@link SystemRestliEntityClient} that carries
 * the system authentication and the entity-client cache settings.
 *
 * @param configurationProvider source of the {@code cache.client.entityClient} settings
 * @param systemAuthentication authentication handed to the client
 * @return the configured system Rest.li entity client
 */
@Bean("systemRestliEntityClient")
public SystemRestliEntityClient systemRestliEntityClient(@Qualifier("configurationProvider") final ConfigurationProvider configurationProvider,
    @Qualifier("systemAuthentication") final Authentication systemAuthentication) {
  // Use the full GMS URI when one is configured; otherwise fall back to the host/port/SSL settings.
  final Client restClient = gmsUri == null
      ? DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol)
      : DefaultRestliClientFactory.getRestLiClient(URI.create(gmsUri), gmsSslProtocol);
  final ExponentialBackoff backoffPolicy = new ExponentialBackoff(retryInterval);
  return new SystemRestliEntityClient(restClient, backoffPolicy, numRetries, systemAuthentication,
      configurationProvider.getCache().getClient().getEntityClient());
}
}

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.entity;
import com.datastax.oss.driver.api.core.CqlSession;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.RetentionService;
import com.linkedin.metadata.entity.cassandra.CassandraRetentionService;

View File

@ -1,6 +1,6 @@
package com.linkedin.gms.factory.entityregistry;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistryException;
import java.io.IOException;

View File

@ -1,6 +1,6 @@
package com.linkedin.gms.factory.entityregistry;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.models.registry.PluginEntityRegistryLoader;
import java.io.FileNotFoundException;
import java.net.MalformedURLException;

View File

@ -20,6 +20,7 @@ import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
import com.linkedin.gms.factory.entity.RestliEntityClientFactory;
import com.linkedin.gms.factory.recommendation.RecommendationServiceFactory;
import com.linkedin.metadata.client.SystemJavaEntityClient;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.GraphService;
@ -65,6 +66,10 @@ public class GraphQLEngineFactory {
@Qualifier("javaEntityClient")
private JavaEntityClient _entityClient;
@Autowired
@Qualifier("systemJavaEntityClient")
private SystemJavaEntityClient _systemEntityClient;
@Autowired
@Qualifier("graphClient")
private GraphClient _graphClient;
@ -170,6 +175,7 @@ public class GraphQLEngineFactory {
protected GraphQLEngine getInstance() {
GmsGraphQLEngineArgs args = new GmsGraphQLEngineArgs();
args.setEntityClient(_entityClient);
args.setSystemEntityClient(_systemEntityClient);
args.setGraphClient(_graphClient);
args.setUsageClient(_usageClient);
if (isAnalyticsEnabled) {

View File

@ -6,7 +6,7 @@ import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.entity.RestliEntityClientFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import javax.annotation.Nonnull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.kafka;
import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
import com.linkedin.mxe.TopicConvention;

View File

@ -6,7 +6,7 @@ import com.linkedin.gms.factory.kafka.schemaregistry.AwsGlueSchemaRegistryFactor
import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.KafkaSchemaRegistryFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import java.util.Arrays;
import java.util.Map;
import org.apache.avro.generic.IndexedRecord;

View File

@ -5,7 +5,7 @@ import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafka
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.kafka.schemaregistry;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

View File

@ -1,6 +1,6 @@
package com.linkedin.gms.factory.lineage;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.client.JavaEntityClient;
import javax.annotation.Nonnull;

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.ownership;
import com.datahub.authentication.Authentication;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.metadata.service.OwnershipTypeService;
import javax.annotation.Nonnull;

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.query;
import com.datahub.authentication.Authentication;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.metadata.service.QueryService;
import javax.annotation.Nonnull;

View File

@ -2,7 +2,7 @@ package com.linkedin.gms.factory.search;
import com.linkedin.gms.factory.common.IndexConventionFactory;
import com.linkedin.gms.factory.common.RestHighLevelClientFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;

View File

@ -1,6 +1,6 @@
package com.linkedin.gms.factory.search;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.client.CachingEntitySearchService;
import javax.annotation.Nonnull;

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.search;
import com.linkedin.gms.factory.common.RestHighLevelClientFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import javax.annotation.Nonnull;
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;

View File

@ -6,7 +6,7 @@ import com.linkedin.gms.factory.common.GitVersionFactory;
import com.linkedin.gms.factory.common.IndexConventionFactory;
import com.linkedin.gms.factory.common.RestHighLevelClientFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.version.GitVersion;
import javax.annotation.Nonnull;

View File

@ -7,7 +7,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;

View File

@ -2,7 +2,7 @@ package com.linkedin.gms.factory.search;
import com.linkedin.gms.factory.common.GraphServiceFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.search.LineageSearchService;
import com.linkedin.metadata.search.SearchService;

View File

@ -1,6 +1,6 @@
package com.linkedin.gms.factory.search;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.search;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.SearchService;

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.search;
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder;
import org.springframework.beans.factory.annotation.Autowired;

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.search.views;
import com.datahub.authentication.Authentication;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.metadata.service.ViewService;
import javax.annotation.Nonnull;

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.settings;
import com.datahub.authentication.Authentication;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.metadata.service.SettingsService;
import javax.annotation.Nonnull;

View File

@ -1,6 +1,6 @@
package com.linkedin.gms.factory.telemetry;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.mixpanel.mixpanelapi.MixpanelAPI;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;

View File

@ -1,6 +1,6 @@
package com.linkedin.gms.factory.telemetry;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.mixpanel.mixpanelapi.MessageBuilder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.telemetry;
import com.datahub.telemetry.TrackingService;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.secret.SecretService;
import com.linkedin.metadata.version.GitVersion;

View File

@ -1,7 +1,7 @@
package com.linkedin.gms.factory.timeline;
import com.datahub.authentication.Authentication;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.metadata.timeline.eventgenerator.AssertionRunEventChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.DataProcessInstanceRunEventChangeEventGenerator;
import com.linkedin.metadata.timeline.eventgenerator.DatasetPropertiesChangeEventGenerator;
@ -38,7 +38,7 @@ public class EntityChangeEventGeneratorRegistryFactory {
@Singleton
@Nonnull
protected com.linkedin.metadata.timeline.eventgenerator.EntityChangeEventGeneratorRegistry entityChangeEventGeneratorRegistry() {
final RestliEntityClient entityClient = applicationContext.getBean(RestliEntityClient.class);
final SystemRestliEntityClient entityClient = applicationContext.getBean(SystemRestliEntityClient.class);
final Authentication systemAuthentication = applicationContext.getBean(Authentication.class);
final com.linkedin.metadata.timeline.eventgenerator.EntityChangeEventGeneratorRegistry registry =
@ -74,7 +74,7 @@ public class EntityChangeEventGeneratorRegistryFactory {
// Data Process Instance differs
registry.register(DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME,
new DataProcessInstanceRunEventChangeEventGenerator(entityClient, systemAuthentication));
new DataProcessInstanceRunEventChangeEventGenerator(entityClient));
// TODO: Add ML models.

View File

@ -1,6 +1,6 @@
package com.linkedin.gms.factory.timeline;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.timeline.TimelineService;

View File

@ -2,7 +2,7 @@ package com.linkedin.gms.factory.timeseries;
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.timeseries.elastic.ElasticSearchTimeseriesAspectService;
import com.linkedin.metadata.timeseries.elastic.indexbuilder.TimeseriesAspectIndexBuilders;

View File

@ -1,10 +1,14 @@
package com.linkedin.gms.factory.usage;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.datahub.authentication.Authentication;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.restli.DefaultRestliClientFactory;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import com.linkedin.restli.client.Client;
import com.linkedin.usage.UsageClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -33,10 +37,15 @@ public class UsageClientFactory {
@Value("${usageClient.numRetries:3}")
private int numRetries;
@Autowired
@Qualifier("configurationProvider")
private ConfigurationProvider configurationProvider;
@Bean("usageClient")
public UsageClient getUsageClient() {
public UsageClient getUsageClient(@Qualifier("systemAuthentication") final Authentication systemAuthentication) {
Client restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
return new UsageClient(restClient, new ExponentialBackoff(retryInterval), numRetries);
return new UsageClient(restClient, new ExponentialBackoff(retryInterval), numRetries, systemAuthentication,
configurationProvider.getCache().getClient().getUsageClient());
}
}

View File

@ -1,7 +1,7 @@
package com.linkedin.metadata.boot.factories;
import com.linkedin.gms.factory.entity.RetentionServiceFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.boot.steps.IngestRetentionPoliciesStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.RetentionService;

View File

@ -1,7 +1,7 @@
package io.datahubproject.openapi.util;
import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.models.registry.EntityRegistry;
import io.datahubproject.openapi.config.OpenAPIEntityTestConfiguration;
import io.datahubproject.openapi.dto.UpsertAspectRequest;

View File

@ -7,6 +7,7 @@ dependencies {
api project(path: ':metadata-service:restli-api', configuration: 'restClient')
api project(':metadata-events:mxe-schemas')
api project(':metadata-utils')
implementation project(':metadata-service:configuration')
implementation externalDependency.slf4jApi
compileOnly externalDependency.lombok

View File

@ -0,0 +1,134 @@
package com.linkedin.common.client;
import com.codahale.metrics.Gauge;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Weigher;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.linkedin.metadata.config.cache.client.ClientCacheConfig;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import lombok.Builder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* Generic cache with common configuration for limited weight, per item expiry, and batch loading
* @param <K> key
* @param <V> value
* @param <C> cache configuration type
*/
@Slf4j
@Builder
public class ClientCache<K, V, C extends ClientCacheConfig> {
@NonNull
protected final C config;
@NonNull
protected final LoadingCache<K, V> cache;
@NonNull
private final Function<Iterable<? extends K>, Map<K, V>> loadFunction;
@NonNull
private final Weigher<K, V> weigher;
@NonNull
private final BiFunction<C, K, Integer> ttlSecondsFunction;
/**
 * Looks up a single key by delegating to the underlying {@link LoadingCache}.
 *
 * @param key key to look up
 * @return the value for {@code key}, or {@code null} when the cache yields none
 */
public @Nullable V get(@NonNull K key) {
return cache.get(key);
}
/**
 * Looks up several keys at once by delegating to the underlying {@link LoadingCache}.
 *
 * @param keys keys to look up
 * @return map of key to value as returned by the cache
 */
public @NonNull Map<@NonNull K, @NonNull V> getAll(@NonNull Iterable<? extends @NonNull K> keys) {
return cache.getAll(keys);
}
/**
 * Asks the underlying {@link LoadingCache} to refresh the entry for the given key.
 *
 * @param key key whose entry should be refreshed
 */
public void refresh(@NonNull K key) {
cache.refresh(key);
}
public static class ClientCacheBuilder<K, V, C extends ClientCacheConfig> {
// Private no-op stub that always returns null.
// NOTE(review): presumably declared so Lombok's @Builder does not generate a public cache(...)
// setter, since build(Class) creates the LoadingCache itself — confirm.
private ClientCacheBuilder<K, V, C> cache(LoadingCache<K, V> cache) {
return null;
}
// Private no-op stub that always returns null.
// NOTE(review): presumably declared so Lombok's @Builder does not generate a public no-arg build(),
// leaving build(Class) as the only public way to create the cache — confirm.
private ClientCache<K, V, C> build() {
return null;
}
public ClientCache<K, V, C> build(Class<?> metricClazz) {
// loads data from entity client
CacheLoader<K, V> loader = new CacheLoader<>() {
@Override
public V load(@NonNull K key) {
return loadAll(List.of(key)).get(key);
}
@Override
@NonNull
public Map<K, V> loadAll(@NonNull Iterable<? extends K> keys) {
return loadFunction.apply(keys);
}
};
// build cache
Caffeine<K, V> caffeine = Caffeine.newBuilder()
.maximumWeight(config.getMaxBytes())
// limit total size
.weigher(weigher)
.softValues()
// define per entity/aspect ttls
.expireAfter(new Expiry<K, V>() {
public long expireAfterCreate(@NonNull K key, @NonNull V aspect, long currentTime) {
int ttlSeconds = ttlSecondsFunction.apply(config, key);
if (ttlSeconds < 0) {
ttlSeconds = Integer.MAX_VALUE;
}
return TimeUnit.SECONDS.toNanos(ttlSeconds);
}
public long expireAfterUpdate(@NonNull K key, @NonNull V aspect,
long currentTime, long currentDuration) {
return currentDuration;
}
public long expireAfterRead(@NonNull K key, @NonNull V aspect,
long currentTime, long currentDuration) {
return currentDuration;
}
});
if (config.isStatsEnabled()) {
caffeine.recordStats();
}
LoadingCache<K, V> cache = caffeine.build(loader);
if (config.isStatsEnabled()) {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
executor.scheduleAtFixedRate(() -> {
CacheStats cacheStats = cache.stats();
MetricUtils.gauge(metricClazz, "hitRate", () -> (Gauge<Double>) cacheStats::hitRate);
MetricUtils.gauge(metricClazz, "loadFailureRate", () ->
(Gauge<Double>) cacheStats::loadFailureRate);
MetricUtils.gauge(metricClazz, "evictionCount", () ->
(Gauge<Long>) cacheStats::evictionCount);
MetricUtils.gauge(metricClazz, "loadFailureCount", () ->
(Gauge<Long>) cacheStats::loadFailureCount);
MetricUtils.gauge(metricClazz, "averageLoadPenalty", () ->
(Gauge<Double>) cacheStats::averageLoadPenalty);
MetricUtils.gauge(metricClazz, "evictionWeight", () ->
(Gauge<Long>) cacheStats::evictionWeight);
log.debug(metricClazz.getSimpleName() + ": " + cacheStats);
}, 0, config.getStatsIntervalSeconds(), TimeUnit.SECONDS);
}
return new ClientCache<>(config, cache, loadFunction, weigher, ttlSecondsFunction);
}
}
}

View File

@ -0,0 +1,141 @@
package com.linkedin.entity.client;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Weigher;
import com.linkedin.common.client.ClientCache;
import com.linkedin.common.urn.Urn;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
import com.linkedin.util.Pair;
import lombok.Builder;
import lombok.Data;
import lombok.NonNull;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static com.linkedin.metadata.utils.PegasusUtils.urnToEntityName;
/**
 * Per-aspect cache in front of an entity load function. Responses are cached per (urn, aspect name)
 * and re-assembled into {@link EntityResponse} objects on read. When the cache is disabled by
 * configuration every call goes straight to the load function.
 */
@Builder
public class EntityClientCache {

  @NonNull
  private EntityClientCacheConfig config;
  @NonNull
  private final ClientCache<Key, EnvelopedAspect, EntityClientCacheConfig> cache;
  @NonNull
  private BiFunction<Set<Urn>, Set<String>, Map<Urn, EntityResponse>> loadFunction;

  /**
   * Gets a single entity with the given aspects.
   *
   * @param urn the entity urn
   * @param aspectNames the aspects to fetch
   * @return the entry for the urn from {@link #batchGetV2}, or null when the result has none
   */
  public EntityResponse getV2(@Nonnull final Urn urn, @Nonnull final Set<String> aspectNames) {
    return batchGetV2(Set.of(urn), aspectNames).get(urn);
  }

  /**
   * Gets the given aspects for the given urns, through the cache when it is enabled.
   *
   * @param urns the entity urns
   * @param aspectNames the aspects to fetch for every urn
   * @return map of urn to response; with the cache enabled, only urns for which the cache
   *     returned at least one aspect are present
   */
  public Map<Urn, EntityResponse> batchGetV2(@Nonnull final Set<Urn> urns, @Nonnull final Set<String> aspectNames) {
    final Map<Urn, EntityResponse> response;

    if (config.isEnabled()) {
      // one cache key per (urn, aspect) combination
      Set<Key> keys = urns.stream()
          .flatMap(urn -> aspectNames.stream()
              .map(a -> Key.builder().urn(urn).aspectName(a).build()))
          .collect(Collectors.toSet());
      Map<Key, EnvelopedAspect> envelopedAspects = cache.getAll(keys);

      // regroup the cached aspects by urn and rebuild one response per urn
      Set<EntityResponse> responses = envelopedAspects.entrySet().stream()
          .map(entry -> Pair.of(entry.getKey().getUrn(), entry.getValue()))
          .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())))
          .entrySet().stream().map(e -> toEntityResponse(e.getKey(), e.getValue()))
          .collect(Collectors.toSet());

      response = responses.stream().collect(Collectors.toMap(EntityResponse::getUrn, Function.identity()));
    } else {
      response = loadFunction.apply(urns, aspectNames);
    }

    return response;
  }

  /** Assembles an entity response for the urn from its individual aspects, keyed by aspect name. */
  private static EntityResponse toEntityResponse(Urn urn, Collection<EnvelopedAspect> envelopedAspects) {
    final EntityResponse response = new EntityResponse();
    response.setUrn(urn);
    response.setEntityName(urnToEntityName(urn));
    response.setAspects(new EnvelopedAspectMap(
        envelopedAspects.stream()
            .collect(Collectors.toMap(EnvelopedAspect::getName, aspect -> aspect))
    ));
    return response;
  }

  public static class EntityClientCacheBuilder {

    // Private on purpose: the cache is always constructed in build(Class) below.
    private EntityClientCacheBuilder cache(LoadingCache<Key, EnvelopedAspect> cache) {
      return this;
    }

    /**
     * Builds the entity client cache and its underlying {@link ClientCache}.
     *
     * @param metricClazz class passed through to {@link ClientCache} for metric naming
     * @return the entity client cache
     */
    public EntityClientCache build(Class<?> metricClazz) {
      // estimate size
      Weigher<Key, EnvelopedAspect> weighByEstimatedSize = (key, value) ->
          value.getValue().data().values().parallelStream()
              .mapToInt(o -> o.toString().getBytes().length)
              .sum();

      // batch loads data from entity client (restli or java)
      Function<Iterable<? extends Key>, Map<Key, EnvelopedAspect>> loader = (Iterable<? extends Key> keys) -> {
        // the load function is invoked once per entity type
        Map<String, Set<Key>> keysByEntity = StreamSupport.stream(keys.spliterator(), true)
            .collect(Collectors.groupingBy(Key::getEntityName, Collectors.toSet()));

        Stream<Map.Entry<Key, EnvelopedAspect>> results = keysByEntity.entrySet().parallelStream()
            .flatMap(entry -> {
              Set<Urn> urns = entry.getValue().stream()
                  .map(Key::getUrn)
                  .collect(Collectors.toSet());
              // the aspects to load are the aspect names of the keys, not the entity name
              Set<String> aspects = entry.getValue().stream()
                  .map(Key::getAspectName)
                  .collect(Collectors.toSet());
              return loadFunction.apply(urns, aspects).entrySet().stream();
            })
            // flatten every returned entity response into one cache entry per aspect
            .flatMap(resp -> resp.getValue().getAspects().values().stream()
                .map(envAspect -> {
                  Key key = Key.builder().urn(resp.getKey()).aspectName(envAspect.getName()).build();
                  return Map.entry(key, envAspect);
                }));

        return results.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
      };

      // ideally the cache time comes from caching headers from service, but configuration driven for now
      BiFunction<EntityClientCacheConfig, Key, Integer> ttlSeconds = (config, key) ->
          Optional.ofNullable(config.getEntityAspectTTLSeconds()).orElse(Map.of())
              .getOrDefault(key.getEntityName(), Map.of())
              .getOrDefault(key.getAspectName(), config.getDefaultTTLSeconds());

      cache = ClientCache.<Key, EnvelopedAspect, EntityClientCacheConfig>builder()
          .weigher(weighByEstimatedSize)
          .config(config)
          .loadFunction(loader)
          .ttlSecondsFunction(ttlSeconds)
          .build(metricClazz);

      return new EntityClientCache(config, cache, loadFunction);
    }
  }

  /** Cache key: one entry per entity urn and aspect name. */
  @Data
  @Builder
  protected static class Key {
    private final Urn urn;
    private final String aspectName;

    /** Returns the entity type of the urn. */
    public String getEntityName() {
      return urn.getEntityType();
    }
  }
}

View File

@ -0,0 +1,91 @@
package com.linkedin.entity.client;
import com.datahub.authentication.Authentication;
import com.linkedin.common.urn.Urn;
import com.linkedin.entity.EntityResponse;
import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.PlatformEvent;
import com.linkedin.r2.RemoteInvocationException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Set;
/**
 * Adds entity/aspect cache and assumes system authentication
 */
public interface SystemEntityClient extends EntityClient {

  EntityClientCache getEntityClientCache();

  Authentication getSystemAuthentication();

  /**
   * Builds the cache
   * @param metricClazz class passed through to the cache builder
   * @param systemAuthentication system authentication
   * @param cacheConfig cache configuration
   * @return the cache
   */
  default EntityClientCache buildEntityClientCache(Class<?> metricClazz, Authentication systemAuthentication, EntityClientCacheConfig cacheConfig) {
    return EntityClientCache.builder()
        .config(cacheConfig)
        .loadFunction((Set<Urn> urns, Set<String> aspectNames) -> {
          // nothing requested: there is no entity type to derive and nothing to fetch
          if (urns.isEmpty()) {
            return Map.of();
          }
          try {
            String entityName = urns.iterator().next().getEntityType();

            if (urns.stream().anyMatch(urn -> !urn.getEntityType().equals(entityName))) {
              throw new IllegalArgumentException("Urns must be of the same entity type. RestliEntityClient API limitation.");
            }

            return batchGetV2(entityName, urns, aspectNames, systemAuthentication);
          } catch (RemoteInvocationException | URISyntaxException e) {
            // the load function cannot throw checked exceptions; keep the cause
            throw new RuntimeException(e);
          }
        }).build(metricClazz);
  }

  /**
   * Get an entity by urn with the given aspects
   * @param urn the id of the entity
   * @param aspectNames aspects of the entity
   * @return response object
   * @throws RemoteInvocationException
   * @throws URISyntaxException
   */
  @Nullable
  default EntityResponse getV2(@Nonnull Urn urn, @Nonnull Set<String> aspectNames)
      throws RemoteInvocationException, URISyntaxException {
    return getEntityClientCache().getV2(urn, aspectNames);
  }

  /**
   * Batch get a set of aspects for a single entity type, multiple ids with the given aspects.
   *
   * @param urns the urns of the entities to batch get
   * @param aspectNames the aspect names to batch get
   * @throws RemoteInvocationException
   */
  @Nonnull
  default Map<Urn, EntityResponse> batchGetV2(@Nonnull Set<Urn> urns, @Nonnull Set<String> aspectNames)
      throws RemoteInvocationException, URISyntaxException {
    return getEntityClientCache().batchGetV2(urns, aspectNames);
  }

  /** Produces a platform event using the system authentication. */
  default void producePlatformEvent(@Nonnull String name, @Nullable String key, @Nonnull PlatformEvent event) throws Exception {
    producePlatformEvent(name, key, event, getSystemAuthentication());
  }

  /** Checks existence of the urn using the system authentication. */
  default boolean exists(@Nonnull Urn urn) throws RemoteInvocationException {
    return exists(urn, getSystemAuthentication());
  }

  /** Ingests the proposal using the system authentication. */
  default String ingestProposal(@Nonnull final MetadataChangeProposal metadataChangeProposal, final boolean async) throws RemoteInvocationException {
    return ingestProposal(metadataChangeProposal, getSystemAuthentication(), async);
  }

  /** Sets the writable flag using the system authentication. */
  default void setWritable(boolean canWrite) throws RemoteInvocationException {
    setWritable(canWrite, getSystemAuthentication());
  }
}

View File

@ -0,0 +1,25 @@
package com.linkedin.entity.client;
import com.datahub.authentication.Authentication;
import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
import com.linkedin.parseq.retry.backoff.BackoffPolicy;
import com.linkedin.restli.client.Client;
import lombok.Getter;
import javax.annotation.Nonnull;
/**
 * {@link SystemEntityClient} implementation on top of {@link RestliEntityClient}.
 * Holds the system authentication and the entity client cache built from it.
 */
@Getter
public class SystemRestliEntityClient extends RestliEntityClient implements SystemEntityClient {

  private final EntityClientCache entityClientCache;
  private final Authentication systemAuthentication;

  /**
   * @param restliClient passed to the parent constructor
   * @param backoffPolicy passed to the parent constructor
   * @param retryCount passed to the parent constructor
   * @param systemAuthentication authentication stored on this client and handed to the cache builder
   * @param cacheConfig configuration handed to the cache builder
   */
  public SystemRestliEntityClient(
      @Nonnull final Client restliClient,
      @Nonnull final BackoffPolicy backoffPolicy,
      int retryCount,
      Authentication systemAuthentication,
      EntityClientCacheConfig cacheConfig) {
    super(restliClient, backoffPolicy, retryCount);
    // the cache builder receives the constructor argument, so it does not depend on field order
    this.entityClientCache =
        buildEntityClientCache(SystemRestliEntityClient.class, systemAuthentication, cacheConfig);
    this.systemAuthentication = systemAuthentication;
  }
}

View File

@ -5,6 +5,7 @@ import com.linkedin.common.EntityRelationships;
import com.linkedin.common.WindowDuration;
import com.linkedin.common.client.BaseClient;
import com.linkedin.metadata.config.cache.client.UsageClientCacheConfig;
import com.linkedin.parseq.retry.backoff.BackoffPolicy;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.restli.client.Client;
@ -17,19 +18,39 @@ public class UsageClient extends BaseClient {
private static final UsageStatsRequestBuilders USAGE_STATS_REQUEST_BUILDERS =
new UsageStatsRequestBuilders();
public UsageClient(@Nonnull final Client restliClient, @Nonnull final BackoffPolicy backoffPolicy, int retryCount) {
private final UsageClientCache usageClientCache;
/**
* Creates the client and its usage cache.
*
* @param restliClient passed to the parent constructor
* @param backoffPolicy passed to the parent constructor
* @param retryCount passed to the parent constructor
* @param systemAuthentication authentication used by the cache load function for every lookup
* @param cacheConfig configuration for the usage cache
*/
public UsageClient(@Nonnull final Client restliClient, @Nonnull final BackoffPolicy backoffPolicy, int retryCount,
Authentication systemAuthentication, UsageClientCacheConfig cacheConfig) {
super(restliClient, backoffPolicy, retryCount);
// the cache loads values through the authenticated getUsageStats overload
this.usageClientCache = UsageClientCache.builder()
.config(cacheConfig)
.loadFunction((String resource, UsageTimeRange range) -> {
try {
return getUsageStats(resource, range, systemAuthentication);
} catch (RemoteInvocationException | URISyntaxException e) {
// the load function cannot throw checked exceptions; the cause is preserved
throw new RuntimeException(e);
}
}).build();
}
/**
* Gets usage statistics for the given resource and time range through {@code usageClientCache}.
* Using cache and system authentication.
* Validate permissions before use!
*
* @param resource the resource to query usage for
* @param range the time range to query
* @return the result returned by the usage client cache
*/
@Nonnull
public UsageQueryResult getUsageStats(@Nonnull String resource, @Nonnull UsageTimeRange range) {
return usageClientCache.getUsageStats(resource, range);
}
/**
* Gets usage statistics for the given resource and time range using the supplied authentication.
*/
@Nonnull
public UsageQueryResult getUsageStats(
@Nonnull String resource,
@Nonnull UsageTimeRange range,
@Nonnull Authentication authentication
) throws RemoteInvocationException, URISyntaxException {
private UsageQueryResult getUsageStats(@Nonnull String resource, @Nonnull UsageTimeRange range,
@Nonnull Authentication authentication)
throws RemoteInvocationException, URISyntaxException {
final UsageStatsDoQueryRangeRequestBuilder requestBuilder = USAGE_STATS_REQUEST_BUILDERS.actionQueryRange()
.resourceParam(resource)
.durationParam(WindowDuration.DAY)

View File

@ -0,0 +1,75 @@
package com.linkedin.usage;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Weigher;
import com.linkedin.common.client.ClientCache;
import com.linkedin.metadata.config.cache.client.UsageClientCacheConfig;
import lombok.Builder;
import lombok.Data;
import lombok.NonNull;
import javax.annotation.Nonnull;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
 * Cache of usage query results keyed by resource and time range. When the cache is disabled by
 * configuration, every call goes straight to the load function.
 */
@Builder
public class UsageClientCache {

  @NonNull
  private UsageClientCacheConfig config;
  @NonNull
  private final ClientCache<Key, UsageQueryResult, UsageClientCacheConfig> cache;
  @NonNull
  private BiFunction<String, UsageTimeRange, UsageQueryResult> loadFunction;

  /**
   * Returns the usage statistics for the resource and range.
   *
   * @param resource the resource to look up
   * @param range the time range to look up
   * @return the cached value when the cache is enabled, otherwise the load function result
   */
  public UsageQueryResult getUsageStats(@Nonnull String resource, @Nonnull UsageTimeRange range) {
    if (!config.isEnabled()) {
      return loadFunction.apply(resource, range);
    }
    final Key cacheKey = Key.builder().resource(resource).range(range).build();
    return cache.get(cacheKey);
  }

  public static class UsageClientCacheBuilder {

    // Private on purpose: the cache is always constructed in build() below.
    private UsageClientCacheBuilder cache(LoadingCache<Key, UsageQueryResult> cache) {
      return this;
    }

    /**
     * Builds the usage client cache and its underlying {@link ClientCache}.
     *
     * @return the usage client cache
     */
    public UsageClientCache build() {
      // weight of an entry: summed byte length of the string form of the result's data values
      final Weigher<Key, UsageQueryResult> estimatedSizeWeigher = (cacheKey, result) -> {
        return result.data().values().parallelStream()
            .mapToInt(field -> field.toString().getBytes().length)
            .sum();
      };

      // batch load: one load function call per requested key
      final Function<Iterable<? extends Key>, Map<Key, UsageQueryResult>> batchLoader =
          (Iterable<? extends Key> requested) ->
              StreamSupport.stream(requested.spliterator(), true)
                  .map(cacheKey ->
                      Map.entry(cacheKey, loadFunction.apply(cacheKey.getResource(), cacheKey.getRange())))
                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

      // every key uses the configured default ttl
      final BiFunction<UsageClientCacheConfig, Key, Integer> defaultTtl =
          (cfg, cacheKey) -> cfg.getDefaultTTLSeconds();

      this.cache = ClientCache.<Key, UsageQueryResult, UsageClientCacheConfig>builder()
          .config(config)
          .loadFunction(batchLoader)
          .weigher(estimatedSizeWeigher)
          .ttlSecondsFunction(defaultTtl)
          .build(UsageClientCache.class);

      return new UsageClientCache(config, cache, loadFunction);
    }
  }

  /** Cache key: resource plus time range. */
  @Data
  @Builder
  protected static class Key {
    private final String resource;
    private final UsageTimeRange range;
  }
}

View File

@ -1,6 +1,7 @@
package com.linkedin.metadata.utils.metrics;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
@ -48,4 +49,8 @@ public class MetricUtils {
/**
 * Looks up a timer in the shared registry.
 *
 * @param metricName name of the timer
 * @return the timer returned by the registry for that name
 */
public static Timer timer(String metricName) {
  final String registryName = MetricRegistry.name(metricName);
  return REGISTRY.timer(registryName);
}
/**
 * Looks up a gauge in the shared registry under a name built from the class and metric name.
 * NOTE(review): presumably the registry keeps an already registered gauge and only calls the
 * supplier for a new name - confirm against the registry implementation.
 *
 * @param clazz class used as the name prefix
 * @param metricName name of the gauge
 * @param supplier supplier handed to the registry
 * @return the gauge returned by the registry
 */
public static <T extends Gauge<?>> T gauge(Class<?> clazz, String metricName, MetricRegistry.MetricSupplier<T> supplier) {
  final String registryName = MetricRegistry.name(clazz, metricName);
  return REGISTRY.gauge(registryName, supplier);
}
}