From 6ece2d646980cd69af1143e80dd382ff3a967a7d Mon Sep 17 00:00:00 2001 From: John Plaisted Date: Fri, 11 Sep 2020 13:04:21 -0700 Subject: [PATCH] Start adding java ETL examples, starting with kafka etl. (#1805) Start adding java ETL examples, starting with kafka etl. We've had a few requests to start providing Java examples rather than Python due to type safety. I've also started to add these to metadata-ingestion-examples to make it clearer these are *examples*. They can be used directly or as a basis for other things. As we port to Java we'll move examples to contrib. --- README.md | 2 +- build.gradle | 9 +- .../{ => haskell}/README.md | 0 .../{ => haskell}/bin/datahub-producer.hs | 0 .../{ => haskell}/bin/datahub-producer.hs.nix | 0 .../bin/dataset-hive-generator.py | 0 .../bin/dataset-hive-generator.py.nix | 0 .../bin/dataset-jdbc-generator.hs | 0 .../bin/dataset-jdbc-generator.hs.nix | 0 .../bin/lineage_hive_generator.hs | 0 .../bin/lineage_hive_generator.hs.nix | 0 .../config/MetadataChangeEvent.avsc | 0 .../{ => haskell}/config/datahub-config.nix | 0 .../{ => haskell}/sample/hive_1.sql | 0 .../{ => haskell}/sample/mce.json.dat | 0 contrib/metadata-ingestion/python/README.md | 23 ++++ .../python/kafka-etl/README.md | 17 +++ .../python}/kafka-etl/kafka_etl.py | 0 .../python}/kafka-etl/requirements.txt | 0 .../{ => python}/openldap-etl/README.md | 0 .../openldap-etl/docker-compose.yml | 0 .../{ => python}/openldap-etl/openldap-etl.py | 0 .../openldap-etl/requirements.txt | 0 .../{ => python}/openldap-etl/sample-ldif.txt | 0 metadata-events/mxe-avro-1.7/build.gradle | 15 ++- .../com/linkedin/metadata/EventUtils.java | 2 +- metadata-ingestion-examples/README.md | 47 +++++++ .../common/build.gradle | 20 +++ .../examples/configs/KafkaConfig.java | 42 +++++++ .../configs/SchemaRegistryConfig.java | 19 +++ .../examples/configs/ZooKeeperConfig.java | 26 ++++ .../kafka-etl/README.md | 40 ++++++ .../kafka-etl/build.gradle | 29 +++++ .../metadata/examples/kafka/KafkaEtl.java | 115 ++++++++++++++++++ .../examples/kafka/KafkaEtlApplication.java | 16 +++ .../kafka-etl/src/main/resources/logback.xml | 40 ++++++ metadata-ingestion/README.md | 16 --- settings.gradle | 2 + 38 files changed, 453 insertions(+), 27 deletions(-) rename contrib/metadata-ingestion/{ => haskell}/README.md (100%) rename contrib/metadata-ingestion/{ => haskell}/bin/datahub-producer.hs (100%) rename contrib/metadata-ingestion/{ => haskell}/bin/datahub-producer.hs.nix (100%) rename contrib/metadata-ingestion/{ => haskell}/bin/dataset-hive-generator.py (100%) rename contrib/metadata-ingestion/{ => haskell}/bin/dataset-hive-generator.py.nix (100%) rename contrib/metadata-ingestion/{ => haskell}/bin/dataset-jdbc-generator.hs (100%) rename contrib/metadata-ingestion/{ => haskell}/bin/dataset-jdbc-generator.hs.nix (100%) rename contrib/metadata-ingestion/{ => haskell}/bin/lineage_hive_generator.hs (100%) rename contrib/metadata-ingestion/{ => haskell}/bin/lineage_hive_generator.hs.nix (100%) rename contrib/metadata-ingestion/{ => haskell}/config/MetadataChangeEvent.avsc (100%) rename contrib/metadata-ingestion/{ => haskell}/config/datahub-config.nix (100%) rename contrib/metadata-ingestion/{ => haskell}/sample/hive_1.sql (100%) rename contrib/metadata-ingestion/{ => haskell}/sample/mce.json.dat (100%) create mode 100644 contrib/metadata-ingestion/python/README.md create mode 100644 contrib/metadata-ingestion/python/kafka-etl/README.md rename {metadata-ingestion => contrib/metadata-ingestion/python}/kafka-etl/kafka_etl.py (100%) rename 
{metadata-ingestion => contrib/metadata-ingestion/python}/kafka-etl/requirements.txt (100%) rename contrib/metadata-ingestion/{ => python}/openldap-etl/README.md (100%) rename contrib/metadata-ingestion/{ => python}/openldap-etl/docker-compose.yml (100%) rename contrib/metadata-ingestion/{ => python}/openldap-etl/openldap-etl.py (100%) rename contrib/metadata-ingestion/{ => python}/openldap-etl/requirements.txt (100%) rename contrib/metadata-ingestion/{ => python}/openldap-etl/sample-ldif.txt (100%) create mode 100644 metadata-ingestion-examples/README.md create mode 100644 metadata-ingestion-examples/common/build.gradle create mode 100644 metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/KafkaConfig.java create mode 100644 metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/SchemaRegistryConfig.java create mode 100644 metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/ZooKeeperConfig.java create mode 100644 metadata-ingestion-examples/kafka-etl/README.md create mode 100644 metadata-ingestion-examples/kafka-etl/build.gradle create mode 100644 metadata-ingestion-examples/kafka-etl/src/main/java/com/linkedin/metadata/examples/kafka/KafkaEtl.java create mode 100644 metadata-ingestion-examples/kafka-etl/src/main/java/com/linkedin/metadata/examples/kafka/KafkaEtlApplication.java create mode 100644 metadata-ingestion-examples/kafka-etl/src/main/resources/logback.xml diff --git a/README.md b/README.md index 23b9ac5a4a..ce05454a76 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ Please follow the [DataHub Quickstart Guide](docs/quickstart.md) to get a copy o * [Frontend](datahub-frontend) * [Web App](datahub-web) * [Generalized Metadata Service](gms) -* [Metadata Ingestion](metadata-ingestion) +* [Metadata Ingestion](metadata-ingestion-examples) * [Metadata Processing Jobs](metadata-jobs) ## Releases diff --git a/build.gradle b/build.gradle index 11952eaa4e..e6e639779c 100644 --- a/build.gradle +++ b/build.gradle @@ -44,7 +44,8 @@ project.ext.externalDependency = [ 'httpClient': 'org.apache.httpcomponents:httpclient:4.5.9', 'jacksonCore': 'com.fasterxml.jackson.core:jackson-core:2.9.7', 'jacksonDataBind': 'com.fasterxml.jackson.core:jackson-databind:2.9.7', - "javatuples": "org.javatuples:javatuples:1.2", + 'javatuples': 'org.javatuples:javatuples:1.2', + 'javaxInject' : 'javax.inject:javax.inject:1', 'jerseyCore': 'org.glassfish.jersey.core:jersey-client:2.25.1', 'jerseyGuava': 'org.glassfish.jersey.bundles.repackaged:jersey-guava:2.25.1', 'jsonSimple': 'com.googlecode.json-simple:json-simple:1.1.1', @@ -57,8 +58,8 @@ project.ext.externalDependency = [ 'mariadbConnector': 'org.mariadb.jdbc:mariadb-java-client:2.6.0', 'mockito': 'org.mockito:mockito-core:3.0.0', 'mysqlConnector': 'mysql:mysql-connector-java:5.1.47', - "neo4jHarness": "org.neo4j.test:neo4j-harness:3.4.11", - "neo4jJavaDriver": "org.neo4j.driver:neo4j-java-driver:4.0.0", + 'neo4jHarness': 'org.neo4j.test:neo4j-harness:3.4.11', + 'neo4jJavaDriver': 'org.neo4j.driver:neo4j-java-driver:4.0.0', 'parseqTest': 'com.linkedin.parseq:parseq:3.0.7:test', 'playDocs': 'com.typesafe.play:play-docs_2.11:2.6.18', 'playGuice': 'com.typesafe.play:play-guice_2.11:2.6.18', @@ -66,7 +67,7 @@ project.ext.externalDependency = [ 'playTest': 'com.typesafe.play:play-test_2.11:2.6.18', 'postgresql': 'org.postgresql:postgresql:42.2.14', 'reflections': 'org.reflections:reflections:0.9.11', - "rythmEngine": "org.rythmengine:rythm-engine:1.3.0", 
+ 'rythmEngine': 'org.rythmengine:rythm-engine:1.3.0', 'servletApi': 'javax.servlet:javax.servlet-api:3.1.0', 'springBeans': 'org.springframework:spring-beans:5.2.3.RELEASE', 'springContext': 'org.springframework:spring-context:5.2.3.RELEASE', diff --git a/contrib/metadata-ingestion/README.md b/contrib/metadata-ingestion/haskell/README.md similarity index 100% rename from contrib/metadata-ingestion/README.md rename to contrib/metadata-ingestion/haskell/README.md diff --git a/contrib/metadata-ingestion/bin/datahub-producer.hs b/contrib/metadata-ingestion/haskell/bin/datahub-producer.hs similarity index 100% rename from contrib/metadata-ingestion/bin/datahub-producer.hs rename to contrib/metadata-ingestion/haskell/bin/datahub-producer.hs diff --git a/contrib/metadata-ingestion/bin/datahub-producer.hs.nix b/contrib/metadata-ingestion/haskell/bin/datahub-producer.hs.nix similarity index 100% rename from contrib/metadata-ingestion/bin/datahub-producer.hs.nix rename to contrib/metadata-ingestion/haskell/bin/datahub-producer.hs.nix diff --git a/contrib/metadata-ingestion/bin/dataset-hive-generator.py b/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py similarity index 100% rename from contrib/metadata-ingestion/bin/dataset-hive-generator.py rename to contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py diff --git a/contrib/metadata-ingestion/bin/dataset-hive-generator.py.nix b/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py.nix similarity index 100% rename from contrib/metadata-ingestion/bin/dataset-hive-generator.py.nix rename to contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py.nix diff --git a/contrib/metadata-ingestion/bin/dataset-jdbc-generator.hs b/contrib/metadata-ingestion/haskell/bin/dataset-jdbc-generator.hs similarity index 100% rename from contrib/metadata-ingestion/bin/dataset-jdbc-generator.hs rename to contrib/metadata-ingestion/haskell/bin/dataset-jdbc-generator.hs diff --git a/contrib/metadata-ingestion/bin/dataset-jdbc-generator.hs.nix b/contrib/metadata-ingestion/haskell/bin/dataset-jdbc-generator.hs.nix similarity index 100% rename from contrib/metadata-ingestion/bin/dataset-jdbc-generator.hs.nix rename to contrib/metadata-ingestion/haskell/bin/dataset-jdbc-generator.hs.nix diff --git a/contrib/metadata-ingestion/bin/lineage_hive_generator.hs b/contrib/metadata-ingestion/haskell/bin/lineage_hive_generator.hs similarity index 100% rename from contrib/metadata-ingestion/bin/lineage_hive_generator.hs rename to contrib/metadata-ingestion/haskell/bin/lineage_hive_generator.hs diff --git a/contrib/metadata-ingestion/bin/lineage_hive_generator.hs.nix b/contrib/metadata-ingestion/haskell/bin/lineage_hive_generator.hs.nix similarity index 100% rename from contrib/metadata-ingestion/bin/lineage_hive_generator.hs.nix rename to contrib/metadata-ingestion/haskell/bin/lineage_hive_generator.hs.nix diff --git a/contrib/metadata-ingestion/config/MetadataChangeEvent.avsc b/contrib/metadata-ingestion/haskell/config/MetadataChangeEvent.avsc similarity index 100% rename from contrib/metadata-ingestion/config/MetadataChangeEvent.avsc rename to contrib/metadata-ingestion/haskell/config/MetadataChangeEvent.avsc diff --git a/contrib/metadata-ingestion/config/datahub-config.nix b/contrib/metadata-ingestion/haskell/config/datahub-config.nix similarity index 100% rename from contrib/metadata-ingestion/config/datahub-config.nix rename to contrib/metadata-ingestion/haskell/config/datahub-config.nix diff --git 
a/contrib/metadata-ingestion/sample/hive_1.sql b/contrib/metadata-ingestion/haskell/sample/hive_1.sql similarity index 100% rename from contrib/metadata-ingestion/sample/hive_1.sql rename to contrib/metadata-ingestion/haskell/sample/hive_1.sql diff --git a/contrib/metadata-ingestion/sample/mce.json.dat b/contrib/metadata-ingestion/haskell/sample/mce.json.dat similarity index 100% rename from contrib/metadata-ingestion/sample/mce.json.dat rename to contrib/metadata-ingestion/haskell/sample/mce.json.dat diff --git a/contrib/metadata-ingestion/python/README.md b/contrib/metadata-ingestion/python/README.md new file mode 100644 index 0000000000..5c6769cd13 --- /dev/null +++ b/contrib/metadata-ingestion/python/README.md @@ -0,0 +1,23 @@ +# Python ETL examples + +ETL scripts written in Python. + +## Prerequisites + +1. Before running any Python metadata ingestion job, you should make sure that the DataHub backend services are all running. The easiest way to do that is through [Docker images](../../docker). +2. You also need to build the `mxe-schemas` module as below. + ``` + ./gradlew :metadata-events:mxe-schemas:build + ``` + This is needed to generate `MetadataChangeEvent.avsc`, which is the schema for the `MetadataChangeEvent` Kafka topic. +3. All the scripts are written using Python 3 and most likely won't work with Python 2.x interpreters. + You can verify your Python version using the following command. + ``` + python --version + ``` + We recommend using [pyenv](https://github.com/pyenv/pyenv) to install and manage your Python environment. +4. Before launching each ETL ingestion pipeline, you can install/verify the library versions as below. + ``` + pip install --user -r requirements.txt + ``` \ No newline at end of file diff --git a/contrib/metadata-ingestion/python/kafka-etl/README.md b/contrib/metadata-ingestion/python/kafka-etl/README.md new file mode 100644 index 0000000000..abdc861ba0 --- /dev/null +++ b/contrib/metadata-ingestion/python/kafka-etl/README.md @@ -0,0 +1,17 @@ +# Kafka ETL + +## Ingest metadata from Kafka to DataHub +The kafka_etl script provides an ETL channel to communicate with your Kafka cluster. +``` +➜ Configure your ZooKeeper environment variable in the file. + ZOOKEEPER # Your ZooKeeper host. + +➜ Configure your Kafka environment variables in the file. + AVROLOADPATH # Your model event in Avro format. + KAFKATOPIC # Your event topic. + BOOTSTRAP # Kafka bootstrap server. + SCHEMAREGISTRY # Kafka schema registry host. + +➜ python kafka_etl.py +``` +This will bootstrap DataHub with the metadata in your Kafka cluster as dataset entities.
diff --git a/metadata-ingestion/kafka-etl/kafka_etl.py b/contrib/metadata-ingestion/python/kafka-etl/kafka_etl.py similarity index 100% rename from metadata-ingestion/kafka-etl/kafka_etl.py rename to contrib/metadata-ingestion/python/kafka-etl/kafka_etl.py diff --git a/metadata-ingestion/kafka-etl/requirements.txt b/contrib/metadata-ingestion/python/kafka-etl/requirements.txt similarity index 100% rename from metadata-ingestion/kafka-etl/requirements.txt rename to contrib/metadata-ingestion/python/kafka-etl/requirements.txt diff --git a/contrib/metadata-ingestion/openldap-etl/README.md b/contrib/metadata-ingestion/python/openldap-etl/README.md similarity index 100% rename from contrib/metadata-ingestion/openldap-etl/README.md rename to contrib/metadata-ingestion/python/openldap-etl/README.md diff --git a/contrib/metadata-ingestion/openldap-etl/docker-compose.yml b/contrib/metadata-ingestion/python/openldap-etl/docker-compose.yml similarity index 100% rename from contrib/metadata-ingestion/openldap-etl/docker-compose.yml rename to contrib/metadata-ingestion/python/openldap-etl/docker-compose.yml diff --git a/contrib/metadata-ingestion/openldap-etl/openldap-etl.py b/contrib/metadata-ingestion/python/openldap-etl/openldap-etl.py similarity index 100% rename from contrib/metadata-ingestion/openldap-etl/openldap-etl.py rename to contrib/metadata-ingestion/python/openldap-etl/openldap-etl.py diff --git a/contrib/metadata-ingestion/openldap-etl/requirements.txt b/contrib/metadata-ingestion/python/openldap-etl/requirements.txt similarity index 100% rename from contrib/metadata-ingestion/openldap-etl/requirements.txt rename to contrib/metadata-ingestion/python/openldap-etl/requirements.txt diff --git a/contrib/metadata-ingestion/openldap-etl/sample-ldif.txt b/contrib/metadata-ingestion/python/openldap-etl/sample-ldif.txt similarity index 100% rename from contrib/metadata-ingestion/openldap-etl/sample-ldif.txt rename to contrib/metadata-ingestion/python/openldap-etl/sample-ldif.txt diff --git a/metadata-events/mxe-avro-1.7/build.gradle b/metadata-events/mxe-avro-1.7/build.gradle index bf201ca631..a51191865a 100644 --- a/metadata-events/mxe-avro-1.7/build.gradle +++ b/metadata-events/mxe-avro-1.7/build.gradle @@ -12,19 +12,24 @@ dependencies { avsc project(':metadata-events:mxe-schemas') } +def genDir = file("src/generated/java") + task avroCodeGen(type: com.commercehub.gradle.plugin.avro.GenerateAvroJavaTask, dependsOn: configurations.avsc) { source("$rootDir/metadata-events/mxe-schemas/src/renamed/avro") - outputDir = file("src/generated/java") + outputDir = genDir + dependsOn(':metadata-events:mxe-schemas:renameNamespace') } compileJava.source(avroCodeGen.outputs) -build.dependsOn avroCodeGen -clean { - project.delete('src/generated') +idea { + module { + sourceDirs += genDir + generatedSourceDirs += genDir + } } -avroCodeGen.dependsOn(':metadata-events:mxe-schemas:renameNamespace') +project.rootProject.tasks.idea.dependsOn(avroCodeGen) // Exclude classes from avro-schemas jar { diff --git a/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java b/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java index af3ef7d0cc..0b68b4a1db 100644 --- a/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java +++ b/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java @@ -53,7 +53,7 @@ public class EventUtils { @Nonnull private static Schema getAvroSchemaFromResource(@Nonnull String 
resourcePath) { - URL url = Resources.getResource(resourcePath); + URL url = EventUtils.class.getClassLoader().getResource(resourcePath); try { return Schema.parse(Resources.toString(url, Charsets.UTF_8)); } catch (IOException e) { diff --git a/metadata-ingestion-examples/README.md b/metadata-ingestion-examples/README.md new file mode 100644 index 0000000000..8b65ab583b --- /dev/null +++ b/metadata-ingestion-examples/README.md @@ -0,0 +1,47 @@ +# Metadata Ingestion + +This directory contains example apps for ingesting data into DataHub. + +You are more than welcome to use these examples directly, or use them as a reference for your own jobs. + +See the READMEs of each example for more information on each. + +### Common themes + +All these examples ingest by firing MetadataChangeEvent Kafka events. They do not ingest directly into DataHub, though this is possible. Instead, the mce-consumer-job should be running, listening for these events, and performing the ingestion for us (see the sketch at the end of this README). + +### A note on languages + +We initially wrote these examples in Python (they still exist in `metadata-ingestion`; TODO to delete them once they're all ported). The idea was that these were very small example scripts that should've been easy to use. However, upon reflection, not all developers are familiar with Python, and the lack of types can hinder development. So the decision was made to port the examples to Java. + +You're more than welcome to extrapolate these examples into whatever languages you like. At LinkedIn, we primarily use Java. + +### Ingestion at LinkedIn + +It is worth noting that we do not use any of these examples directly (in Java, Python, or anything else) at LinkedIn. We have several different pipelines for ingesting data; it all depends on the source. + +- Some pipelines are based on other Kafka events, where we'll transform some existing Kafka event to a metadata event. + - For example, we get Kafka events for Hive changes. We make MCEs out of those Hive events to ingest Hive data. +- For others, we've directly instrumented existing pipelines / apps / jobs to also emit metadata events. + - For example, TODO? Gobblin? +- For others still, we've created a series of offline jobs to ingest data. + - For example, we have an Azkaban job to process our HDFS datasets. + +For some sources of data, one of these example scripts may work fine. For others, it may make more sense to have some custom logic, like the above list. Notably, all these examples today are one-off (they run, fire events, and then stop); you may wish to build continuous ingestion pipelines instead. + +### "Real" Ingestion Applications + +We appreciate any contributions of apps you may wish to make to ingest data from other sources. + +TODO this section feels a little weird. Are our ingestion apps not really real apps? :p LDAP is real, as is Kafka. Granted, these are just one-off apps to ingest. Maybe we should provide a library for these, then expose the one-off apps as examples?
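### Firing an MCE (sketch)

The snippet below is a minimal sketch, not part of any example app, of how a snapshot-based MetadataChangeEvent is fired with the `KafkaMetadataEventProducer` from `metadata-dao-impl:kafka-producer`. The types mirror the kafka-etl example; the already-configured Kafka producer and the `SchemaMetadata` aspect are assumed to come from elsewhere.

```java
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.metadata.aspect.DatasetAspect;
import com.linkedin.metadata.dao.producer.KafkaMetadataEventProducer;
import com.linkedin.metadata.snapshot.DatasetSnapshot;
import com.linkedin.schema.SchemaMetadata;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.Producer;

public final class MceSketch {
  /** Fires one MCE that attaches the given SchemaMetadata aspect to a Kafka dataset. */
  public static void fireDatasetMce(Producer<String, IndexedRecord> producer, SchemaMetadata schemaMetadata) {
    // Kafka topics are modeled as datasets on the "kafka" data platform.
    final DatasetUrn urn =
        new DatasetUrn(new DataPlatformUrn("kafka"), schemaMetadata.getSchemaName(), FabricType.PROD);
    final KafkaMetadataEventProducer eventProducer =
        new KafkaMetadataEventProducer<>(DatasetSnapshot.class, DatasetAspect.class, producer);
    eventProducer.produceSnapshotBasedMetadataChangeEvent(urn, schemaMetadata);
    producer.flush();
  }
}
```

The mce-consumer-job picks this event up from the MetadataChangeEvent topic and writes the metadata into DataHub.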
diff --git a/metadata-ingestion-examples/common/build.gradle b/metadata-ingestion-examples/common/build.gradle new file mode 100644 index 0000000000..dc633503ef --- /dev/null +++ b/metadata-ingestion-examples/common/build.gradle @@ -0,0 +1,20 @@ +plugins { + id 'java' +} + +dependencies { + compile project(':metadata-dao-impl:kafka-producer') + + compile externalDependency.javaxInject + compile externalDependency.kafkaAvroSerde + compile externalDependency.kafkaSerializers + compile externalDependency.lombok + compile externalDependency.springBeans + compile externalDependency.springBootAutoconfigure + compile externalDependency.springCore + compile externalDependency.springKafka + + annotationProcessor externalDependency.lombok + + runtime externalDependency.logbackClassic +} \ No newline at end of file diff --git a/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/KafkaConfig.java b/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/KafkaConfig.java new file mode 100644 index 0000000000..6877d4f407 --- /dev/null +++ b/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/KafkaConfig.java @@ -0,0 +1,42 @@ +package com.linkedin.metadata.examples.configs; + +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import java.util.Arrays; +import java.util.Map; +import org.apache.avro.generic.IndexedRecord; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@Configuration +public class KafkaConfig { + @Value("${KAFKA_BOOTSTRAP_SERVER:localhost:29092}") + private String kafkaBootstrapServers; + + @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}") + private String kafkaSchemaRegistryUrl; + + @Bean(name = "kafkaEventProducer") + public Producer<String, IndexedRecord> kafkaListenerContainerFactory(KafkaProperties properties) { + KafkaProperties.Producer producerProps = properties.getProducer(); + + producerProps.setKeySerializer(StringSerializer.class); + producerProps.setValueSerializer(KafkaAvroSerializer.class); + + // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS + if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) { + producerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(","))); + } // else we rely on KafkaProperties which defaults to localhost:9092 + + Map<String, Object> props = properties.buildProducerProperties(); + props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl); + + return new KafkaProducer<>(props); + } +} diff --git a/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/SchemaRegistryConfig.java b/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/SchemaRegistryConfig.java new file mode 100644 index 0000000000..ceb1d05d6a --- /dev/null +++ b/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/SchemaRegistryConfig.java @@ -0,0 +1,19 @@ +package com.linkedin.metadata.examples.configs; + +import
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@Configuration +public class SchemaRegistryConfig { + @Value("${SCHEMAREGISTRY_URL:http://localhost:8081}") + private String schemaRegistryUrl; + + @Bean(name = "schemaRegistryClient") + public SchemaRegistryClient schemaRegistryFactory() { + return new CachedSchemaRegistryClient(schemaRegistryUrl, 512); + } +} diff --git a/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/ZooKeeperConfig.java b/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/ZooKeeperConfig.java new file mode 100644 index 0000000000..2923cd1a7e --- /dev/null +++ b/metadata-ingestion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/ZooKeeperConfig.java @@ -0,0 +1,26 @@ +package com.linkedin.metadata.examples.configs; + +import java.io.IOException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@Configuration +public class ZooKeeperConfig { + @Value("${ZOOKEEPER:localhost:2181}") + private String zookeeper; + + @Value("${ZOOKEEPER_TIMEOUT_MILLIS:3000}") + private int timeoutMillis; + + @Bean(name = "zooKeeper") + public ZooKeeper zooKeeperFactory() throws IOException { + Watcher noopWatcher = event -> { + }; + + return new ZooKeeper(zookeeper, timeoutMillis, noopWatcher); + } +} diff --git a/metadata-ingestion-examples/kafka-etl/README.md b/metadata-ingestion-examples/kafka-etl/README.md new file mode 100644 index 0000000000..2d5aa3d07b --- /dev/null +++ b/metadata-ingestion-examples/kafka-etl/README.md @@ -0,0 +1,40 @@ +# Kafka ETL + +A small application which reads existing Kafka topics from ZooKeeper, retrieves their schema from the schema registry, +and then fires an MCE for each schema. + +## Running the Application + +First, ensure that services this depends on, like schema registry / zookeeper / mce-consumer-job / gms / etc, are all +running. + +This application can be run via gradle: + +``` +./gradlew :metadata-ingestion-examples:kafka-etl:bootRun +``` + +Or by building and running the jar: + +``` +./gradlew :metadata-ingestion-examples:kafka-etl:build + +java -jar metadata-ingestion-examples/kafka-etl/build/libs/kafka-etl.jar +``` + +### Environment Variables + +See the files under `src/main/java/com/linkedin/metadata/examples/kafka/config` for a list of customizable spring +environment variables. + +### Common pitfalls + +For events to be fired correctly, schemas must exist in the schema registry. If a topic was newly created, but no schema +has been registered for it yet, this application will fail to retrieve the schema for that topic. Check the output of +the application to see if this happens. If you see a message like + +``` +io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401 +``` + +Then the odds are good that you need to register the schema for this topic. 
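One way to do that, as a rough sketch, assuming the Confluent `CachedSchemaRegistryClient` (the same client class wired up in `SchemaRegistryConfig`) and a hypothetical topic and value schema of your own:

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;

public final class RegisterTopicSchema {
  public static void main(String[] args) throws Exception {
    // Assumed values for illustration; point these at your own registry, topic, and schema.
    final SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 512);
    final Schema valueSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"MyTopicValue\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

    // Value schemas live under the "<topic>-value" subject, which is the subject this app looks up.
    final int id = client.register("MyTopic-value", valueSchema);
    System.out.println("Registered schema id " + id + " for subject MyTopic-value");
  }
}
```

Alternatively, producing an Avro message to the topic with the Confluent Avro serializer will typically register the value schema automatically.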
\ No newline at end of file diff --git a/metadata-ingestion-examples/kafka-etl/build.gradle b/metadata-ingestion-examples/kafka-etl/build.gradle new file mode 100644 index 0000000000..9ab77481b4 --- /dev/null +++ b/metadata-ingestion-examples/kafka-etl/build.gradle @@ -0,0 +1,29 @@ +plugins { + id 'org.springframework.boot' + id 'java' +} + +dependencies { + compile project(':metadata-utils') + compile project(':metadata-builders') + compile project(':metadata-dao-impl:kafka-producer') + compile project(':metadata-events:mxe-schemas') + compile project(':metadata-ingestion-examples:common') + + compile externalDependency.javaxInject + compile externalDependency.kafkaAvroSerde + compile externalDependency.kafkaSerializers + compile externalDependency.lombok + compile externalDependency.springBeans + compile externalDependency.springBootAutoconfigure + compile externalDependency.springCore + compile externalDependency.springKafka + + annotationProcessor externalDependency.lombok + + runtime externalDependency.logbackClassic +} + +bootJar { + mainClassName = 'com.linkedin.metadata.examples.kafka.KafkaEtlApplication' +} \ No newline at end of file diff --git a/metadata-ingestion-examples/kafka-etl/src/main/java/com/linkedin/metadata/examples/kafka/KafkaEtl.java b/metadata-ingestion-examples/kafka-etl/src/main/java/com/linkedin/metadata/examples/kafka/KafkaEtl.java new file mode 100644 index 0000000000..d441ab2e2e --- /dev/null +++ b/metadata-ingestion-examples/kafka-etl/src/main/java/com/linkedin/metadata/examples/kafka/KafkaEtl.java @@ -0,0 +1,115 @@ +package com.linkedin.metadata.examples.kafka; + +import com.linkedin.common.AuditStamp; +import com.linkedin.common.FabricType; +import com.linkedin.common.urn.CorpuserUrn; +import com.linkedin.common.urn.DataPlatformUrn; +import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.metadata.aspect.DatasetAspect; +import com.linkedin.metadata.dao.producer.KafkaMetadataEventProducer; +import com.linkedin.metadata.snapshot.DatasetSnapshot; +import com.linkedin.mxe.MetadataChangeEvent; +import com.linkedin.schema.KafkaSchema; +import com.linkedin.schema.SchemaField; +import com.linkedin.schema.SchemaFieldArray; +import com.linkedin.schema.SchemaFieldDataType; +import com.linkedin.schema.SchemaMetadata; +import com.linkedin.schema.StringType; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import java.util.List; +import javax.inject.Inject; +import javax.inject.Named; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.IndexedRecord; +import org.apache.kafka.clients.producer.Producer; +import org.apache.zookeeper.ZooKeeper; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +/** + * Gathers Kafka topics from the local zookeeper instance and schemas from the schema registry, and then fires + * MetadataChangeEvents for their schemas. + * + *

This should cause DataHub to be populated with this information, assuming it and the mce-consumer-job are running + * locally. + * + *

Can be run with {@code ./gradlew :metadata-ingestion-examples:kafka-etl:bootRun}. + */ +@Slf4j +@Component +public final class KafkaEtl implements CommandLineRunner { + private static final DataPlatformUrn KAFKA_URN = new DataPlatformUrn("kafka"); + + @Inject + @Named("kafkaEventProducer") + private Producer<String, IndexedRecord> _producer; + + @Inject + @Named("zooKeeper") + private ZooKeeper _zooKeeper; + + @Inject + @Named("schemaRegistryClient") + private SchemaRegistryClient _schemaRegistryClient; + + private SchemaMetadata buildDatasetSchema(String datasetName, String schema, int schemaVersion) { + final AuditStamp auditStamp = new AuditStamp(); + auditStamp.setTime(System.currentTimeMillis()); + auditStamp.setActor(new CorpuserUrn(System.getenv("USER"))); + final SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema(); + platformSchema.setKafkaSchema(new KafkaSchema().setDocumentSchema(schema)); + return new SchemaMetadata().setSchemaName(datasetName) + .setPlatform(KAFKA_URN) + .setCreated(auditStamp) + .setLastModified(auditStamp) + .setVersion(schemaVersion) + .setHash("") + .setPlatformSchema(platformSchema) + .setFields(new SchemaFieldArray(new SchemaField().setFieldPath("") + .setDescription("") + .setNativeDataType("string") + .setType(new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new StringType())))))); + } + + private void produceKafkaDatasetMce(SchemaMetadata schemaMetadata) { + MetadataChangeEvent.class.getClassLoader().getResource("avro/com/linkedin/mxe/MetadataChangeEvent.avsc"); + + // Kafka topics are considered datasets in the current DataHub metadata ecosystem. + final KafkaMetadataEventProducer eventProducer = + new KafkaMetadataEventProducer<>(DatasetSnapshot.class, DatasetAspect.class, _producer); + eventProducer.produceSnapshotBasedMetadataChangeEvent( + new DatasetUrn(KAFKA_URN, schemaMetadata.getSchemaName(), FabricType.PROD), schemaMetadata); + _producer.flush(); + } + + @Override + public void run(String...
args) throws Exception { + log.info("Starting up"); + + final List topics = _zooKeeper.getChildren("/brokers/topics", false); + for (String datasetName : topics) { + if (datasetName.startsWith("_")) { + continue; + } + + final String topic = datasetName + "-value"; + io.confluent.kafka.schemaregistry.client.SchemaMetadata schemaMetadata; + try { + schemaMetadata = _schemaRegistryClient.getLatestSchemaMetadata(topic); + } catch (Throwable t) { + log.error("Failed to get schema for topic " + datasetName, t); + log.error("Common failure: does this event schema exist in the schema registry?"); + continue; + } + + if (schemaMetadata == null) { + log.warn(String.format("Skipping topic without schema: %s", topic)); + continue; + } + log.trace(topic); + + produceKafkaDatasetMce(buildDatasetSchema(datasetName, schemaMetadata.getSchema(), schemaMetadata.getVersion())); + log.info("Successfully fired MCE for " + datasetName); + } + } +} diff --git a/metadata-ingestion-examples/kafka-etl/src/main/java/com/linkedin/metadata/examples/kafka/KafkaEtlApplication.java b/metadata-ingestion-examples/kafka-etl/src/main/java/com/linkedin/metadata/examples/kafka/KafkaEtlApplication.java new file mode 100644 index 0000000000..d03e13f604 --- /dev/null +++ b/metadata-ingestion-examples/kafka-etl/src/main/java/com/linkedin/metadata/examples/kafka/KafkaEtlApplication.java @@ -0,0 +1,16 @@ +package com.linkedin.metadata.examples.kafka; + +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.elasticsearch.rest.RestClientAutoConfiguration; +import org.springframework.boot.builder.SpringApplicationBuilder; + + +@SuppressWarnings("checkstyle:HideUtilityClassConstructor") +@SpringBootApplication(exclude = {RestClientAutoConfiguration.class}, scanBasePackages = { + "com.linkedin.metadata.examples.configs", "com.linkedin.metadata.examples.kafka"}) +public class KafkaEtlApplication { + public static void main(String[] args) { + new SpringApplicationBuilder(KafkaEtlApplication.class).web(WebApplicationType.NONE).run(args); + } +} diff --git a/metadata-ingestion-examples/kafka-etl/src/main/resources/logback.xml b/metadata-ingestion-examples/kafka-etl/src/main/resources/logback.xml new file mode 100644 index 0000000000..2c389931ec --- /dev/null +++ b/metadata-ingestion-examples/kafka-etl/src/main/resources/logback.xml @@ -0,0 +1,40 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + ${LOG_DIR}/kafka-etl-java.log + true + + %d{HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n + + + ${LOG_DIR}/kafka-etl.%i.log + 1 + 3 + + + 100MB + + + + + + + + + + + + + + + + + + diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index c66b57f399..25421c5fca 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -90,22 +90,6 @@ The ldap_etl provides you ETL channel to communicate with your LDAP server. ``` This will bootstrap DataHub with your metadata in the LDAP server as an user entity. -## Ingest metadata from Kafka to DataHub -The kafka_etl provides you ETL channel to communicate with your kafka. -``` -➜ Config your kafka environmental variable in the file. - ZOOKEEPER # Your zookeeper host. - -➜ Config your Kafka broker environmental variable in the file. - AVROLOADPATH # Your model event in avro format. - KAFKATOPIC # Your event topic. - BOOTSTRAP # Kafka bootstrap server. - SCHEMAREGISTRY # Kafka schema registry host. 
- -➜ python kafka_etl.py -``` -This will bootstrap DataHub with your metadata in the kafka as a dataset entity. - ## Ingest metadata from MySQL to DataHub The mysql_etl provides you ETL channel to communicate with your MySQL. ``` diff --git a/settings.gradle b/settings.gradle index b9238f82c6..108eceb673 100644 --- a/settings.gradle +++ b/settings.gradle @@ -18,6 +18,8 @@ include 'metadata-events:mxe-avro-1.7' include 'metadata-events:mxe-registration' include 'metadata-events:mxe-schemas' include 'metadata-events:mxe-utils-avro-1.7' +include 'metadata-ingestion-examples:common' +include 'metadata-ingestion-examples:kafka-etl' include 'metadata-jobs:mae-consumer-job' include 'metadata-jobs:mce-consumer-job' include 'metadata-models'