diff --git a/metadata-builders/src/main/java/com/linkedin/metadata/builders/search/RegisteredIndexBuilders.java b/metadata-builders/src/main/java/com/linkedin/metadata/builders/search/RegisteredIndexBuilders.java deleted file mode 100644 index 8f94a831c9c..00000000000 --- a/metadata-builders/src/main/java/com/linkedin/metadata/builders/search/RegisteredIndexBuilders.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.linkedin.metadata.builders.search; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - - -/** - * A class that holds all the registered {@link BaseIndexBuilder}. - * - * Register new type of index builders by adding them to {@link #REGISTERED_INDEX_BUILDERS}. - */ -public class RegisteredIndexBuilders { - - public static final Set REGISTERED_INDEX_BUILDERS = - Collections.unmodifiableSet(new HashSet() { - { - add(new ChartIndexBuilder()); - add(new CorpUserInfoIndexBuilder()); - add(new DashboardIndexBuilder()); - add(new DataProcessIndexBuilder()); - add(new DatasetIndexBuilder()); - } - }); - - private RegisteredIndexBuilders() { - // Util class - } -} \ No newline at end of file diff --git a/metadata-builders/src/main/java/com/linkedin/metadata/builders/search/SnapshotProcessor.java b/metadata-builders/src/main/java/com/linkedin/metadata/builders/search/SnapshotProcessor.java index f5ab2347cf4..b05a4842030 100644 --- a/metadata-builders/src/main/java/com/linkedin/metadata/builders/search/SnapshotProcessor.java +++ b/metadata-builders/src/main/java/com/linkedin/metadata/builders/search/SnapshotProcessor.java @@ -20,33 +20,26 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public final class SnapshotProcessor { - // Set of document index builders that are interested in parsing the snapshot - private final Set _registeredBuilders; + /** + * Mapping of metadata snapshot type to the list of document index builders that subscribe to it. + */ + private final Map>> _snapshotTypeToIndexBuilders; /** * Constructor. * - * @param registerdBuilders Set of document index builders who are interested in parsing metadata snapshot + * @param builders Set of document index builders who are interested in parsing metadata snapshot */ - public SnapshotProcessor(@Nonnull Set registerdBuilders) { - _registeredBuilders = registerdBuilders; - } + public SnapshotProcessor(@Nonnull Set builders) { + _snapshotTypeToIndexBuilders = new HashMap<>(); - /** - * Constructs mapping of metadata snapshot type to the list of document index builders that subscribe to it. - */ - @Nonnull - private Map>> getSnapshotBuildersMap() { - Map>> snapshotBuilderMap = new HashMap<>(); - for (BaseIndexBuilder builder : _registeredBuilders) { - @SuppressWarnings("unchecked") + for (BaseIndexBuilder builder : builders) { List> snapshotsSubscribed = builder._snapshotsInterested; snapshotsSubscribed.forEach(snapshot -> { - snapshotBuilderMap.putIfAbsent(snapshot.getName(), new ArrayList<>()); - snapshotBuilderMap.get(snapshot.getName()).add(builder.getClass()); + _snapshotTypeToIndexBuilders.putIfAbsent(snapshot.getName(), new ArrayList<>()); + _snapshotTypeToIndexBuilders.get(snapshot.getName()).add(builder); }); } - return snapshotBuilderMap; } /** @@ -69,21 +62,18 @@ public final class SnapshotProcessor { */ @Nonnull public List getDocumentsToUpdate(@Nonnull Snapshot snapshot) { - Map>> snapshotBuilderMap = getSnapshotBuildersMap(); - List docsList = new ArrayList<>(); - DataMap snapshotData = (DataMap) snapshot.data(); + final List docsList = new ArrayList<>(); + final DataMap snapshotData = (DataMap) snapshot.data(); for (String clazz : snapshotData.keySet()) { Class snapshotClass = ModelUtils.getMetadataSnapshotClassFromName(clazz); - if (!snapshotBuilderMap.containsKey(clazz)) { + if (!_snapshotTypeToIndexBuilders.containsKey(clazz)) { continue; } - List> builders = snapshotBuilderMap.get(clazz); - for (Class builderClass : builders) { + final List> builders = _snapshotTypeToIndexBuilders.get(clazz); + for (BaseIndexBuilder builder : builders) { try { - Object obj = snapshotClass.getConstructor(DataMap.class).newInstance((DataMap) snapshotData.get(clazz)); - @SuppressWarnings("unchecked") - List records = - builderClass.getConstructor().newInstance().getDocumentsToUpdate((RecordTemplate) obj); + final Object obj = snapshotClass.getConstructor(DataMap.class).newInstance((DataMap) snapshotData.get(clazz)); + List records = builder.getDocumentsToUpdate((RecordTemplate) obj); docsList.addAll(records); } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { log.error("Failed to get documents due to error ", e); diff --git a/metadata-jobs/mae-consumer-job/build.gradle b/metadata-jobs/mae-consumer-job/build.gradle index 6973ea3808b..e711e932345 100644 --- a/metadata-jobs/mae-consumer-job/build.gradle +++ b/metadata-jobs/mae-consumer-job/build.gradle @@ -18,6 +18,7 @@ dependencies { } compile project(':metadata-utils') compile project(':metadata-builders') + compile project(':metadata-dao-impl:restli-dao') compile project(':metadata-events:mxe-schemas') compile project(':metadata-events:mxe-avro-1.7') compile project(':metadata-events:mxe-registration') diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java index cdb00a80491..abe8ebc8edb 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java +++ b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/MetadataAuditEventsProcessor.java @@ -1,18 +1,5 @@ package com.linkedin.metadata.kafka; -import java.io.UnsupportedEncodingException; -import java.lang.reflect.InvocationTargetException; -import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.stereotype.Component; -import org.springframework.kafka.annotation.KafkaListener; - import com.linkedin.data.template.RecordTemplate; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.EventUtils; @@ -20,7 +7,6 @@ import com.linkedin.metadata.builders.graph.BaseGraphBuilder; import com.linkedin.metadata.builders.graph.GraphBuilder; import com.linkedin.metadata.builders.graph.RegisteredGraphBuilders; import com.linkedin.metadata.builders.search.BaseIndexBuilder; -import com.linkedin.metadata.builders.search.RegisteredIndexBuilders; import com.linkedin.metadata.builders.search.SnapshotProcessor; import com.linkedin.metadata.dao.internal.BaseGraphWriterDAO; import com.linkedin.metadata.dao.utils.RecordUtils; @@ -28,10 +14,20 @@ import com.linkedin.metadata.snapshot.Snapshot; import com.linkedin.metadata.utils.elasticsearch.ElasticsearchConnector; import com.linkedin.metadata.utils.elasticsearch.MCEElasticEvent; import com.linkedin.mxe.MetadataAuditEvent; - -import lombok.extern.slf4j.Slf4j; - import com.linkedin.mxe.Topics; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.InvocationTargetException; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; @Slf4j @@ -44,12 +40,16 @@ public class MetadataAuditEventsProcessor { private ElasticsearchConnector elasticSearchConnector; private SnapshotProcessor snapshotProcessor; private BaseGraphWriterDAO graphWriterDAO; + private Set> indexBuilders; public MetadataAuditEventsProcessor(ElasticsearchConnector elasticSearchConnector, - SnapshotProcessor snapshotProcessor, BaseGraphWriterDAO graphWriterDAO) { + SnapshotProcessor snapshotProcessor, BaseGraphWriterDAO graphWriterDAO, + Set> indexBuilders) { this.elasticSearchConnector = elasticSearchConnector; this.snapshotProcessor = snapshotProcessor; this.graphWriterDAO = graphWriterDAO; + this.indexBuilders = indexBuilders; + log.info("registered index builders {}", indexBuilders); } @KafkaListener(id = "mae-consumer-job-client", topics = "${KAFKA_TOPIC_NAME:" + Topics.METADATA_AUDIT_EVENT + "}") @@ -111,7 +111,7 @@ public class MetadataAuditEventsProcessor { for (RecordTemplate doc : docs) { MCEElasticEvent elasticEvent = new MCEElasticEvent(doc); BaseIndexBuilder indexBuilderForDoc = null; - for (BaseIndexBuilder indexBuilder : RegisteredIndexBuilders.REGISTERED_INDEX_BUILDERS) { + for (BaseIndexBuilder indexBuilder : indexBuilders) { Class docType = indexBuilder.getDocumentType(); if (docType.isInstance(doc)) { indexBuilderForDoc = indexBuilder; diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/ElasticSearchConfig.java b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/ElasticSearchConfig.java index c7f1e1669d5..20c73326b6a 100644 --- a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/ElasticSearchConfig.java +++ b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/ElasticSearchConfig.java @@ -1,40 +1,40 @@ package com.linkedin.metadata.kafka.config; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.gms.factory.common.RestHighLevelClientFactory; +import com.linkedin.metadata.builders.search.BaseIndexBuilder; +import com.linkedin.metadata.builders.search.SnapshotProcessor; +import com.linkedin.metadata.utils.elasticsearch.ElasticsearchConnector; +import com.linkedin.metadata.utils.elasticsearch.ElasticsearchConnectorFactory; +import java.util.Set; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; -import com.linkedin.gms.factory.common.RestHighLevelClientFactory; -import com.linkedin.metadata.builders.search.RegisteredIndexBuilders; -import com.linkedin.metadata.builders.search.SnapshotProcessor; -import com.linkedin.metadata.utils.elasticsearch.ElasticsearchConnector; -import com.linkedin.metadata.utils.elasticsearch.ElasticsearchConnectorFactory; - -import lombok.extern.slf4j.Slf4j; @Slf4j @Configuration -@Import({RestHighLevelClientFactory.class}) +@Import({RestHighLevelClientFactory.class, IndexBuildersConfig.class}) public class ElasticSearchConfig { - @Value("${ELASTICSEARCH_HOST:localhost}") - private String elasticSearchHost; - @Value("${ELASTICSEARCH_PORT:9200}") - private int elasticSearchPort; + @Value("${ELASTICSEARCH_HOST:localhost}") + private String elasticSearchHost; + @Value("${ELASTICSEARCH_PORT:9200}") + private int elasticSearchPort; - @Bean - public ElasticsearchConnector elasticSearchConnector() { - ElasticsearchConnector elasticSearchConnector = ElasticsearchConnectorFactory.createInstance( - elasticSearchHost, - elasticSearchPort - ); - log.info("ElasticSearchConnector built successfully"); - return elasticSearchConnector; - } + @Bean + public ElasticsearchConnector elasticSearchConnector() { + final ElasticsearchConnector elasticSearchConnector = + ElasticsearchConnectorFactory.createInstance(elasticSearchHost, elasticSearchPort); + log.info("ElasticSearchConnector built successfully"); + return elasticSearchConnector; + } - @Bean - public SnapshotProcessor snapshotProcessor() { - return new SnapshotProcessor(RegisteredIndexBuilders.REGISTERED_INDEX_BUILDERS); - } + @Bean + public SnapshotProcessor snapshotProcessor(@Nonnull Set> indexBuilders) { + return new SnapshotProcessor(indexBuilders); + } } diff --git a/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/IndexBuildersConfig.java b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/IndexBuildersConfig.java new file mode 100644 index 00000000000..c88d3aafae6 --- /dev/null +++ b/metadata-jobs/mae-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/IndexBuildersConfig.java @@ -0,0 +1,59 @@ +package com.linkedin.metadata.kafka.config; + +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.builders.search.BaseIndexBuilder; +import com.linkedin.metadata.builders.search.ChartIndexBuilder; +import com.linkedin.metadata.builders.search.CorpGroupIndexBuilder; +import com.linkedin.metadata.builders.search.DashboardIndexBuilder; +import com.linkedin.metadata.builders.search.DataProcessIndexBuilder; +import com.linkedin.metadata.builders.search.DatasetIndexBuilder; +import com.linkedin.metadata.restli.DefaultRestliClientFactory; +import com.linkedin.restli.client.Client; +import java.util.HashSet; +import java.util.Set; +import javax.annotation.Nonnull; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +/** + * Configurations for search index builders + */ +@Slf4j +@Configuration +@RequiredArgsConstructor +public class IndexBuildersConfig { + + @Value("${GMS_HOST:localhost}") + private String gmsHost; + @Value("${GMS_PORT:8080}") + private int gmsPort; + + /** + * Registered index builders powering GMA search + * + * @param restliClient Rest.li client to interact with GMS + */ + @Bean + public Set> indexBuilders(@Nonnull Client restliClient) { + log.debug("restli client {}", restliClient); + final Set> builders = new HashSet<>(); + builders.add(new CorpGroupIndexBuilder()); + builders.add(new ChartIndexBuilder()); + builders.add(new DatasetIndexBuilder()); + builders.add(new DataProcessIndexBuilder()); + builders.add(new DashboardIndexBuilder()); + return builders; + } + + /** + * Rest.li client to interact with GMS + */ + @Bean + public Client restliClient() { + return DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort); + } +}