feat(Search): Inject restli client into index builders (#2024)

* feat(Search): Inject restli client into index builders
Inject restli client into index builders
This commit is contained in:
Nagarjuna Kanamarlapudi 2020-12-03 11:43:48 -08:00 committed by GitHub
parent 838f964114
commit f103998dcb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 121 additions and 100 deletions

View File

@ -1,29 +0,0 @@
package com.linkedin.metadata.builders.search;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* A class that holds all the registered {@link BaseIndexBuilder}.
*
* Register new type of index builders by adding them to {@link #REGISTERED_INDEX_BUILDERS}.
*/
public class RegisteredIndexBuilders {
public static final Set<BaseIndexBuilder> REGISTERED_INDEX_BUILDERS =
Collections.unmodifiableSet(new HashSet<BaseIndexBuilder>() {
{
add(new ChartIndexBuilder());
add(new CorpUserInfoIndexBuilder());
add(new DashboardIndexBuilder());
add(new DataProcessIndexBuilder());
add(new DatasetIndexBuilder());
}
});
private RegisteredIndexBuilders() {
// Util class
}
}

View File

@ -20,33 +20,26 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public final class SnapshotProcessor {
// Set of document index builders that are interested in parsing the snapshot
private final Set<? extends BaseIndexBuilder> _registeredBuilders;
/**
* Mapping of metadata snapshot type to the list of document index builders that subscribe to it.
*/
private final Map<String, List<BaseIndexBuilder<?>>> _snapshotTypeToIndexBuilders;
/**
* Constructor.
*
* @param registerdBuilders Set of document index builders who are interested in parsing metadata snapshot
* @param builders Set of document index builders who are interested in parsing metadata snapshot
*/
public SnapshotProcessor(@Nonnull Set<? extends BaseIndexBuilder> registerdBuilders) {
_registeredBuilders = registerdBuilders;
}
public SnapshotProcessor(@Nonnull Set<? extends BaseIndexBuilder> builders) {
_snapshotTypeToIndexBuilders = new HashMap<>();
/**
* Constructs mapping of metadata snapshot type to the list of document index builders that subscribe to it.
*/
@Nonnull
private Map<String, List<Class<? extends BaseIndexBuilder>>> getSnapshotBuildersMap() {
Map<String, List<Class<? extends BaseIndexBuilder>>> snapshotBuilderMap = new HashMap<>();
for (BaseIndexBuilder builder : _registeredBuilders) {
@SuppressWarnings("unchecked")
for (BaseIndexBuilder<?> builder : builders) {
List<Class<? extends RecordTemplate>> snapshotsSubscribed = builder._snapshotsInterested;
snapshotsSubscribed.forEach(snapshot -> {
snapshotBuilderMap.putIfAbsent(snapshot.getName(), new ArrayList<>());
snapshotBuilderMap.get(snapshot.getName()).add(builder.getClass());
_snapshotTypeToIndexBuilders.putIfAbsent(snapshot.getName(), new ArrayList<>());
_snapshotTypeToIndexBuilders.get(snapshot.getName()).add(builder);
});
}
return snapshotBuilderMap;
}
/**
@ -69,21 +62,18 @@ public final class SnapshotProcessor {
*/
@Nonnull
public List<RecordTemplate> getDocumentsToUpdate(@Nonnull Snapshot snapshot) {
Map<String, List<Class<? extends BaseIndexBuilder>>> snapshotBuilderMap = getSnapshotBuildersMap();
List<RecordTemplate> docsList = new ArrayList<>();
DataMap snapshotData = (DataMap) snapshot.data();
final List<RecordTemplate> docsList = new ArrayList<>();
final DataMap snapshotData = (DataMap) snapshot.data();
for (String clazz : snapshotData.keySet()) {
Class<? extends RecordTemplate> snapshotClass = ModelUtils.getMetadataSnapshotClassFromName(clazz);
if (!snapshotBuilderMap.containsKey(clazz)) {
if (!_snapshotTypeToIndexBuilders.containsKey(clazz)) {
continue;
}
List<Class<? extends BaseIndexBuilder>> builders = snapshotBuilderMap.get(clazz);
for (Class<? extends BaseIndexBuilder> builderClass : builders) {
final List<? extends BaseIndexBuilder<?>> builders = _snapshotTypeToIndexBuilders.get(clazz);
for (BaseIndexBuilder<?> builder : builders) {
try {
Object obj = snapshotClass.getConstructor(DataMap.class).newInstance((DataMap) snapshotData.get(clazz));
@SuppressWarnings("unchecked")
List<? extends RecordTemplate> records =
builderClass.getConstructor().newInstance().getDocumentsToUpdate((RecordTemplate) obj);
final Object obj = snapshotClass.getConstructor(DataMap.class).newInstance((DataMap) snapshotData.get(clazz));
List<? extends RecordTemplate> records = builder.getDocumentsToUpdate((RecordTemplate) obj);
docsList.addAll(records);
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
log.error("Failed to get documents due to error ", e);

View File

@ -18,6 +18,7 @@ dependencies {
}
compile project(':metadata-utils')
compile project(':metadata-builders')
compile project(':metadata-dao-impl:restli-dao')
compile project(':metadata-events:mxe-schemas')
compile project(':metadata-events:mxe-avro-1.7')
compile project(':metadata-events:mxe-registration')

View File

@ -1,18 +1,5 @@
package com.linkedin.metadata.kafka;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Component;
import org.springframework.kafka.annotation.KafkaListener;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.EventUtils;
@ -20,7 +7,6 @@ import com.linkedin.metadata.builders.graph.BaseGraphBuilder;
import com.linkedin.metadata.builders.graph.GraphBuilder;
import com.linkedin.metadata.builders.graph.RegisteredGraphBuilders;
import com.linkedin.metadata.builders.search.BaseIndexBuilder;
import com.linkedin.metadata.builders.search.RegisteredIndexBuilders;
import com.linkedin.metadata.builders.search.SnapshotProcessor;
import com.linkedin.metadata.dao.internal.BaseGraphWriterDAO;
import com.linkedin.metadata.dao.utils.RecordUtils;
@ -28,10 +14,20 @@ import com.linkedin.metadata.snapshot.Snapshot;
import com.linkedin.metadata.utils.elasticsearch.ElasticsearchConnector;
import com.linkedin.metadata.utils.elasticsearch.MCEElasticEvent;
import com.linkedin.mxe.MetadataAuditEvent;
import lombok.extern.slf4j.Slf4j;
import com.linkedin.mxe.Topics;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@ -44,12 +40,16 @@ public class MetadataAuditEventsProcessor {
private ElasticsearchConnector elasticSearchConnector;
private SnapshotProcessor snapshotProcessor;
private BaseGraphWriterDAO graphWriterDAO;
private Set<BaseIndexBuilder<? extends RecordTemplate>> indexBuilders;
public MetadataAuditEventsProcessor(ElasticsearchConnector elasticSearchConnector,
SnapshotProcessor snapshotProcessor, BaseGraphWriterDAO graphWriterDAO) {
SnapshotProcessor snapshotProcessor, BaseGraphWriterDAO graphWriterDAO,
Set<BaseIndexBuilder<? extends RecordTemplate>> indexBuilders) {
this.elasticSearchConnector = elasticSearchConnector;
this.snapshotProcessor = snapshotProcessor;
this.graphWriterDAO = graphWriterDAO;
this.indexBuilders = indexBuilders;
log.info("registered index builders {}", indexBuilders);
}
@KafkaListener(id = "mae-consumer-job-client", topics = "${KAFKA_TOPIC_NAME:" + Topics.METADATA_AUDIT_EVENT + "}")
@ -111,7 +111,7 @@ public class MetadataAuditEventsProcessor {
for (RecordTemplate doc : docs) {
MCEElasticEvent elasticEvent = new MCEElasticEvent(doc);
BaseIndexBuilder indexBuilderForDoc = null;
for (BaseIndexBuilder indexBuilder : RegisteredIndexBuilders.REGISTERED_INDEX_BUILDERS) {
for (BaseIndexBuilder indexBuilder : indexBuilders) {
Class docType = indexBuilder.getDocumentType();
if (docType.isInstance(doc)) {
indexBuilderForDoc = indexBuilder;

View File

@ -1,40 +1,40 @@
package com.linkedin.metadata.kafka.config;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.gms.factory.common.RestHighLevelClientFactory;
import com.linkedin.metadata.builders.search.BaseIndexBuilder;
import com.linkedin.metadata.builders.search.SnapshotProcessor;
import com.linkedin.metadata.utils.elasticsearch.ElasticsearchConnector;
import com.linkedin.metadata.utils.elasticsearch.ElasticsearchConnectorFactory;
import java.util.Set;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import com.linkedin.gms.factory.common.RestHighLevelClientFactory;
import com.linkedin.metadata.builders.search.RegisteredIndexBuilders;
import com.linkedin.metadata.builders.search.SnapshotProcessor;
import com.linkedin.metadata.utils.elasticsearch.ElasticsearchConnector;
import com.linkedin.metadata.utils.elasticsearch.ElasticsearchConnectorFactory;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Configuration
@Import({RestHighLevelClientFactory.class})
@Import({RestHighLevelClientFactory.class, IndexBuildersConfig.class})
public class ElasticSearchConfig {
@Value("${ELASTICSEARCH_HOST:localhost}")
private String elasticSearchHost;
@Value("${ELASTICSEARCH_PORT:9200}")
private int elasticSearchPort;
@Value("${ELASTICSEARCH_HOST:localhost}")
private String elasticSearchHost;
@Value("${ELASTICSEARCH_PORT:9200}")
private int elasticSearchPort;
@Bean
public ElasticsearchConnector elasticSearchConnector() {
ElasticsearchConnector elasticSearchConnector = ElasticsearchConnectorFactory.createInstance(
elasticSearchHost,
elasticSearchPort
);
log.info("ElasticSearchConnector built successfully");
return elasticSearchConnector;
}
@Bean
public ElasticsearchConnector elasticSearchConnector() {
final ElasticsearchConnector elasticSearchConnector =
ElasticsearchConnectorFactory.createInstance(elasticSearchHost, elasticSearchPort);
log.info("ElasticSearchConnector built successfully");
return elasticSearchConnector;
}
@Bean
public SnapshotProcessor snapshotProcessor() {
return new SnapshotProcessor(RegisteredIndexBuilders.REGISTERED_INDEX_BUILDERS);
}
@Bean
public SnapshotProcessor snapshotProcessor(@Nonnull Set<BaseIndexBuilder<? extends RecordTemplate>> indexBuilders) {
return new SnapshotProcessor(indexBuilders);
}
}

View File

@ -0,0 +1,59 @@
package com.linkedin.metadata.kafka.config;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.builders.search.BaseIndexBuilder;
import com.linkedin.metadata.builders.search.ChartIndexBuilder;
import com.linkedin.metadata.builders.search.CorpGroupIndexBuilder;
import com.linkedin.metadata.builders.search.DashboardIndexBuilder;
import com.linkedin.metadata.builders.search.DataProcessIndexBuilder;
import com.linkedin.metadata.builders.search.DatasetIndexBuilder;
import com.linkedin.metadata.restli.DefaultRestliClientFactory;
import com.linkedin.restli.client.Client;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Configurations for search index builders
*/
@Slf4j
@Configuration
@RequiredArgsConstructor
public class IndexBuildersConfig {
@Value("${GMS_HOST:localhost}")
private String gmsHost;
@Value("${GMS_PORT:8080}")
private int gmsPort;
/**
* Registered index builders powering GMA search
*
* @param restliClient Rest.li client to interact with GMS
*/
@Bean
public Set<BaseIndexBuilder<? extends RecordTemplate>> indexBuilders(@Nonnull Client restliClient) {
log.debug("restli client {}", restliClient);
final Set<BaseIndexBuilder<? extends RecordTemplate>> builders = new HashSet<>();
builders.add(new CorpGroupIndexBuilder());
builders.add(new ChartIndexBuilder());
builders.add(new DatasetIndexBuilder());
builders.add(new DataProcessIndexBuilder());
builders.add(new DashboardIndexBuilder());
return builders;
}
/**
* Rest.li client to interact with GMS
*/
@Bean
public Client restliClient() {
return DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort);
}
}