Enable datahub-mae-consumer job to build graph as well

Kerem Sahin 2019-11-26 22:19:46 -08:00
parent 22eefc966b
commit 7e2e6b5c93
9 changed files with 105 additions and 18 deletions

View File

@ -10,6 +10,9 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- NEO4J_URI=bolt://neo4j
- NEO4J_USERNAME=neo4j
- NEO4J_PASSWORD=datahub
command: "bash -c './mae-consumer-job/bin/mae-consumer-job'"
networks:

View File

@ -18,6 +18,7 @@ how to change your exposed port settings.
```
ports:
- "7474:7474"
- "7687:7687"
```
### Docker Network
@ -31,4 +32,4 @@ networks:
## Neo4j Browser
To be able to debug and run Cypher queries against your Neo4j image, you can open up `Neo4j Browser`, which is running at
[http://localhost:7474/browser/](http://localhost:7474/browser/). Default username and password for it is `neo4j`.
[http://localhost:7474/browser/](http://localhost:7474/browser/). Default username is `neo4j` and password is `datahub`.
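For a quick programmatic check of what the MAE consumer has written, a minimal sketch like the one below can run the same kind of Cypher query from Java. It is illustrative only: the class name and query are made up for the example, and it assumes the default connection settings above (`bolt://localhost`, user `neo4j`, password `datahub`) while reusing the `Neo4jDriverFactory` introduced in this commit.
```
import com.linkedin.metadata.neo4j.Neo4jDriverFactory;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.Session;

public class Neo4jSmokeTest {
  public static void main(String[] args) {
    // Connect over bolt with the assumed default credentials (neo4j/datahub).
    Driver driver = Neo4jDriverFactory.createInstance("bolt://localhost", "neo4j", "datahub");
    try (Session session = driver.session()) {
      // Count all nodes currently in the graph.
      long nodes = session.run("MATCH (n) RETURN count(n) AS c").single().get("c").asLong();
      System.out.println("Nodes in graph: " + nodes);
    }
    driver.close();
  }
}
```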

View File

@ -5,6 +5,8 @@ services:
image: neo4j:3.5.7
hostname: neo4j
container_name: neo4j
environment:
NEO4J_AUTH: 'neo4j/datahub'
ports:
- "7474:7474"
- "7687:7687"

View File

@ -114,6 +114,8 @@ services:
image: neo4j:3.5.7
hostname: neo4j
container_name: neo4j
environment:
NEO4J_AUTH: 'neo4j/datahub'
ports:
- "7474:7474"
- "7687:7687"
@ -179,9 +181,13 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- NEO4J_URI=bolt://neo4j
- NEO4J_USERNAME=neo4j
- NEO4J_PASSWORD=datahub
depends_on:
- kafka-setup
- elasticsearch
- neo4j
command: "bash -c 'while ping -c1 kafka-setup &>/dev/null; do echo waiting for kafka-setup... && sleep 1; done; \
echo kafka-setup done! && ./mae-consumer-job/bin/mae-consumer-job'"

View File

@ -7,4 +7,4 @@ Data Hub uses Kafka as the pub-sub message queue in the backend. There are 2 Kaf
To be able to consume from these two topics, there are two [Kafka Streams](https://kafka.apache.org/documentation/streams/)
jobs that Data Hub uses:
* [MCE Consumer Job](mce-consumer-job): Writes to [Data Hub GMS](../gms)
* [MAE Consumer Job](mae-consumer-job): Writes to [Elasticsearch](../docker/elasticsearch)
* [MAE Consumer Job](mae-consumer-job): Writes to [Elasticsearch](../docker/elasticsearch) & [Neo4j](../docker/neo4j)
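Both jobs follow the same shape: a small Kafka Streams topology that reads one topic and pushes each record to the relevant sink. The sketch below is a simplified illustration of the MAE side, not the actual `MaeStreamTask` source; the topic name, addresses, and the `println` standing in for the real Elasticsearch/Neo4j updates are assumptions made for the example.
```
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class MaeConsumerSketch {
  public static void main(String[] args) {
    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "mae-consumer-sketch");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
    config.put("schema.registry.url", "http://localhost:8081");

    final StreamsBuilder builder = new StreamsBuilder();
    // Read the MAE topic as generic Avro records (topic name assumed for illustration).
    final KStream<String, GenericRecord> maeStream = builder.stream("MetadataAuditEvent");
    // In the real job each record is deserialized into a snapshot and used to
    // update the search index and, with this change, the Neo4j graph.
    maeStream.foreach((key, record) -> System.out.println("Got MAE: " + record));

    final KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}
```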

View File

@ -1,5 +1,6 @@
apply plugin: 'application'
apply plugin: 'java'
apply plugin: 'pegasus'
configurations {
avro
@ -15,7 +16,10 @@ dependencies {
compile project(':metadata-events:mxe-avro-1.7')
compile project(':metadata-events:mxe-registration')
compile project(':metadata-events:mxe-utils-avro-1.7')
compile project(':metadata-dao')
compile project(':metadata-dao-impl:neo4j-dao')
compile externalDependency.elasticSearchRest
compile externalDependency.neo4jJavaDriver
compile externalDependency.kafkaAvroSerde
compile externalDependency.kafkaStreams

View File

@ -4,6 +4,7 @@ import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.EventUtils;
import com.linkedin.metadata.builders.search.*;
import com.linkedin.metadata.neo4j.Neo4jDriverFactory;
import com.linkedin.metadata.snapshot.Snapshot;
import com.linkedin.metadata.utils.elasticsearch.ElasticsearchConnector;
import com.linkedin.metadata.utils.elasticsearch.ElasticsearchConnectorFactory;
@ -14,13 +15,19 @@ import com.linkedin.util.Configuration;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import com.linkedin.metadata.builders.graph.BaseGraphBuilder;
import com.linkedin.metadata.builders.graph.GraphBuilder;
import com.linkedin.metadata.builders.graph.RegisteredGraphBuilders;
import com.linkedin.metadata.dao.internal.BaseGraphWriterDAO;
import com.linkedin.metadata.dao.internal.Neo4jGraphWriterDAO;
import com.linkedin.metadata.dao.utils.RecordUtils;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.net.URLEncoder;
@ -31,22 +38,24 @@ public class MaeStreamTask {
private static final String DOC_TYPE = "doc";
private static final String DEFAULT_KAFKA_TOPIC_NAME = Topics.METADATA_AUDIT_EVENT;
private static final String DEFAULT_ELASTICSEARCH_HOST = "localhost";
private static final String DEFAULT_ELASTICSEARCH_PORT = "9200";
private static final String DEFAULT_KAFKA_BOOTSTRAP_SERVER = "localhost:9092";
private static final String DEFAULT_KAFKA_SCHEMAREGISTRY_URL = "http://localhost:8081";
private static final String DEFAULT_ELASTICSEARCH_HOST = "localhost";
private static final String DEFAULT_ELASTICSEARCH_PORT = "9200";
private static final String DEFAULT_NEO4J_URI = "bolt://localhost";
private static final String DEFAULT_NEO4J_USERNAME = "neo4j";
private static final String DEFAULT_NEO4J_PASSWORD = "datahub";
private static ElasticsearchConnector _elasticSearchConnector;
private static SnapshotProcessor _snapshotProcessor;
private static BaseGraphWriterDAO _graphWriterDAO;
public static void main(final String[] args) {
// Initialize ElasticSearch connector and Snapshot processor
_elasticSearchConnector = ElasticsearchConnectorFactory.createInstance(
Configuration.getEnvironmentVariable("ELASTICSEARCH_HOST", DEFAULT_ELASTICSEARCH_HOST),
Integer.valueOf(Configuration.getEnvironmentVariable("ELASTICSEARCH_PORT", DEFAULT_ELASTICSEARCH_PORT))
);
_snapshotProcessor = new SnapshotProcessor(RegisteredIndexBuilders.REGISTERED_INDEX_BUILDERS);
log.info("ElasticSearchConnector built successfully");
initializateES();
initializateNeo4j();
// Configure the Streams application.
final Properties streamsConfiguration = getStreamsConfiguration();
@ -66,6 +75,24 @@ public class MaeStreamTask {
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
static void initializateNeo4j() {
_graphWriterDAO = new Neo4jGraphWriterDAO(Neo4jDriverFactory.createInstance(
Configuration.getEnvironmentVariable("NEO4J_URI", DEFAULT_NEO4J_URI),
Configuration.getEnvironmentVariable("NEO4J_USERNAME", DEFAULT_NEO4J_USERNAME),
Configuration.getEnvironmentVariable("NEO4J_PASSWORD", DEFAULT_NEO4J_PASSWORD)));
log.info("Neo4jDriver built successfully");
}
static void initializateES() {
// Initialize ElasticSearch connector and Snapshot processor
_elasticSearchConnector = ElasticsearchConnectorFactory.createInstance(
Configuration.getEnvironmentVariable("ELASTICSEARCH_HOST", DEFAULT_ELASTICSEARCH_HOST),
Integer.valueOf(Configuration.getEnvironmentVariable("ELASTICSEARCH_PORT", DEFAULT_ELASTICSEARCH_PORT))
);
_snapshotProcessor = new SnapshotProcessor(RegisteredIndexBuilders.REGISTERED_INDEX_BUILDERS);
log.info("ElasticSearchConnector built successfully");
}
/**
* Configure the Streams application.
*
@ -106,26 +133,56 @@ public class MaeStreamTask {
}
/**
* Process MAE and do reindexing in ES
* Process MAE and update Elasticsearch & Neo4j
*
* @param record single MAE message
*/
static void processSingleMAE(final GenericData.Record record) {
log.debug("Got MAE");
Snapshot snapshot = null;
try {
final MetadataAuditEvent event = EventUtils.avroToPegasusMAE(record);
snapshot = event.getNewSnapshot();
if (event.hasNewSnapshot()) {
final Snapshot snapshot = event.getNewSnapshot();
log.info(snapshot.toString());
updateElasticsearch(snapshot);
updateNeo4j(RecordUtils.getSelectedRecordTemplateFromUnion(snapshot));
}
} catch (Exception e) {
log.error("Error deserializing message: {}", e.toString());
log.error("Message: {}", record.toString());
}
}
if (snapshot == null) {
return;
/**
* Process snapshot and update Neo4j
*
* @param snapshot Snapshot
*/
static void updateNeo4j(final RecordTemplate snapshot) {
try {
final BaseGraphBuilder graphBuilder = RegisteredGraphBuilders.getGraphBuilder(snapshot.getClass()).get();
final GraphBuilder.GraphUpdates updates = graphBuilder.build(snapshot);
if (!updates.getEntities().isEmpty()) {
_graphWriterDAO.addEntities(updates.getEntities());
}
for (GraphBuilder.RelationshipUpdates update : updates.getRelationshipUpdates()) {
_graphWriterDAO.addRelationships(update.getRelationships(), update.getPreUpdateOperation());
}
} catch (Exception ex) {
}
log.info(snapshot.toString());
}
/**
* Process snapshot and update Elasticsearch
*
* @param snapshot Snapshot
*/
static void updateElasticsearch(final Snapshot snapshot) {
List<RecordTemplate> docs = new ArrayList<>();
try {
docs = _snapshotProcessor.getDocumentsToUpdate(snapshot);

View File

@ -6,6 +6,7 @@ dependencies {
compile externalDependency.elasticSearchRest
compile externalDependency.httpClient
compile externalDependency.logbackClassic
compile externalDependency.neo4jJavaDriver
compile spec.product.pegasus.restliClient
compile spec.product.pegasus.restliCommon

View File

@ -0,0 +1,13 @@
package com.linkedin.metadata.neo4j;
import javax.annotation.Nonnull;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
public class Neo4jDriverFactory {
public static Driver createInstance(@Nonnull String uri, @Nonnull String username, @Nonnull String password) {
return GraphDatabase.driver(uri, AuthTokens.basic(username, password));
}
}
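A minimal usage sketch, mirroring how `MaeStreamTask` wires the factory into the graph writer DAO at startup; the connection values are the same defaults assumed elsewhere in this change, and the surrounding class is only an example.
```
import com.linkedin.metadata.dao.internal.Neo4jGraphWriterDAO;
import com.linkedin.metadata.neo4j.Neo4jDriverFactory;
import org.neo4j.driver.v1.Driver;

public class GraphWriterExample {
  public static void main(String[] args) {
    // Build a bolt driver and hand it to the DAO, as the MAE consumer job does.
    Driver driver = Neo4jDriverFactory.createInstance("bolt://localhost", "neo4j", "datahub");
    Neo4jGraphWriterDAO graphWriterDAO = new Neo4jGraphWriterDAO(driver);

    // ... write entities and relationships via graphWriterDAO ...

    // The driver owns a connection pool; close it when the job shuts down.
    driver.close();
  }
}
```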