diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md
index 6a38587d8a..4e6688feec 100644
--- a/metadata-ingestion/README.md
+++ b/metadata-ingestion/README.md
@@ -4,14 +4,20 @@
 1. Before running any metadata ingestion job, you should make sure that DataHub backend services are all running. Easiest way to do that is through [Docker images](../docker).
 2. You also need to build the `mxe-schemas` module as below.
-   ```
-   ./gradlew :metadata-events:mxe-schemas:build
-   ```
-   This is needed to generate `MetadataChangeEvent.avsc` which is the schema for `MetadataChangeEvent` Kafka topic.
-3. Before launching each ETL ingestion pipeline, you can install/verify the library versions as below.
-   ```
-   pip install --user -r requirements.txt
-   ```
+   ```
+   ./gradlew :metadata-events:mxe-schemas:build
+   ```
+   This is needed to generate `MetadataChangeEvent.avsc`, which is the schema for the `MetadataChangeEvent` Kafka topic.
+3. All the scripts are written in Python 3 and will most likely not work with a Python 2.x interpreter.
+   You can verify your Python version with the following command.
+   ```
+   python --version
+   ```
+   We recommend using [pyenv](https://github.com/pyenv/pyenv) to install and manage your Python environment.
+4. Before launching each ETL ingestion pipeline, you can install/verify the required library versions as below.
+   ```
+   pip install --user -r requirements.txt
+   ```
 
 ## MCE Producer/Consumer CLI
 `mce_cli.py` script provides a convenient way to produce a list of MCEs from a data file.
diff --git a/metadata-ingestion/hive-etl/hive_etl.py b/metadata-ingestion/hive-etl/hive_etl.py
index f2207d27f9..7a772e7b28 100644
--- a/metadata-ingestion/hive-etl/hive_etl.py
+++ b/metadata-ingestion/hive-etl/hive_etl.py
@@ -1,6 +1,8 @@
 #! /usr/bin/python
 import sys
 import time
+from confluent_kafka import avro
+from confluent_kafka.avro import AvroProducer
 from pyhive import hive
 from TCLIService.ttypes import TOperationState
 
@@ -16,7 +18,7 @@ def hive_query(query):
     Execute the query to the HiveStore.
     """
     cursor = hive.connect(HIVESTORE).cursor()
-    cursor.execute(query, async=True)
+    cursor.execute(query, async_=True)
     status = cursor.poll().operationState
     while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
         logs = cursor.fetch_logs()
@@ -49,9 +51,6 @@ def produce_hive_dataset_mce(mce):
     """
     Produce MetadataChangeEvent records.
     """
-    from confluent_kafka import avro
-    from confluent_kafka.avro import AvroProducer
-
     conf = {'bootstrap.servers': BOOTSTRAP,
             'schema.registry.url': SCHEMAREGISTRY}
     record_schema = avro.load(AVROLOADPATH)
diff --git a/metadata-ingestion/hive-etl/requirements.txt b/metadata-ingestion/hive-etl/requirements.txt
index 8f224207f0..dfa41875b2 100644
--- a/metadata-ingestion/hive-etl/requirements.txt
+++ b/metadata-ingestion/hive-etl/requirements.txt
@@ -1,2 +1,4 @@
-confluent-kafka[avro]==1.1.0
-pyhive==0.6.1
\ No newline at end of file
+avro-python3==1.8.2
+confluent-kafka==1.4.0
+pyhive==0.6.1
+thrift-sasl==0.4.2
diff --git a/metadata-ingestion/kafka-etl/kafka_etl.py b/metadata-ingestion/kafka-etl/kafka_etl.py
index cbf998d055..53eb5cfd2d 100644
--- a/metadata-ingestion/kafka-etl/kafka_etl.py
+++ b/metadata-ingestion/kafka-etl/kafka_etl.py
@@ -2,7 +2,9 @@ import sys
 import time
 from kazoo.client import KazooClient
-from confluent.schemaregistry.client import CachedSchemaRegistryClient
+from confluent_kafka import avro
+from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
+from confluent_kafka.avro import AvroProducer
 
 ZOOKEEPER='localhost:2181'
 AVROLOADPATH = '../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc'
@@ -15,7 +17,7 @@ def build_kafka_dataset_mce(dataset_name, schema, schema_version):
     """
     Create the MetadataChangeEvent via dataset_name and schema.
     """
-    actor, sys_time = "urn:li:corpuser:", long(time.time())
+    actor, sys_time = "urn:li:corpuser:", int(time.time())
     schema_name = {"schemaName":dataset_name,"platform":"urn:li:dataPlatform:kafka","version":schema_version,"created":{"time":sys_time,"actor":actor},
                    "lastModified":{"time":sys_time,"actor":actor},"hash":"","platformSchema":{"documentSchema": schema},
                    "fields":[{"fieldPath":"","description":"","nativeDataType":"string","type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}}}]}
@@ -31,9 +33,6 @@ def produce_kafka_dataset_mce(mce):
     """
     Produce MetadataChangeEvent records.
     """
-    from confluent_kafka import avro
-    from confluent_kafka.avro import AvroProducer
-
     conf = {'bootstrap.servers': BOOTSTRAP,
             'schema.registry.url': SCHEMAREGISTRY}
     record_schema = avro.load(AVROLOADPATH)
@@ -58,7 +57,11 @@ for dataset_name in topics:
         continue
     topic = dataset_name + '-value'
     schema_id, schema, schema_version = client.get_latest_schema(topic)
-    print topic
+    if schema_id is None:
+        print(f"Skipping topic without schema: {topic}")
+        continue
+
+    print(topic)
     build_kafka_dataset_mce(dataset_name, str(schema), int(schema_version))
 
 sys.exit(0)
\ No newline at end of file
diff --git a/metadata-ingestion/kafka-etl/requirements.txt b/metadata-ingestion/kafka-etl/requirements.txt
index a47b16bc64..53ed252f9d 100644
--- a/metadata-ingestion/kafka-etl/requirements.txt
+++ b/metadata-ingestion/kafka-etl/requirements.txt
@@ -1,3 +1,3 @@
-confluent-kafka[avro]==1.1.0
-python-schema-registry-client==1.2.1
+avro-python3==1.8.2
+confluent-kafka==1.4.0
 kazoo==2.5.0
\ No newline at end of file
diff --git a/metadata-ingestion/ldap-etl/ldap_etl.py b/metadata-ingestion/ldap-etl/ldap_etl.py
index 1c700d7d19..2892bb65cb 100644
--- a/metadata-ingestion/ldap-etl/ldap_etl.py
+++ b/metadata-ingestion/ldap-etl/ldap_etl.py
@@ -1,6 +1,8 @@
 #! /usr/bin/python
 import sys
 import ldap
+from confluent_kafka import avro
+from confluent_kafka.avro import AvroProducer
 from ldap.controls import SimplePagedResultsControl
 from distutils.version import LooseVersion
 
@@ -10,7 +12,7 @@ LDAPSERVER ='LDAPSERVER'
 BASEDN ='BASEDN'
 LDAPUSER = 'LDAPUSER'
 LDAPPASSWORD = 'LDAPPASSWORD'
-PAGESIZE = PAGESIZE
+PAGESIZE = 20
 ATTRLIST = ['cn', 'title', 'mail', 'sAMAccountName', 'department','manager']
 SEARCHFILTER='SEARCHFILTER'
 
@@ -81,9 +83,6 @@ def produce_corp_user_mce(mce):
     """
     Produce MetadataChangeEvent records
     """
-    from confluent_kafka import avro
-    from confluent_kafka.avro import AvroProducer
-
     conf = {'bootstrap.servers': BOOTSTRAP,
             'schema.registry.url': SCHEMAREGISTRY}
     record_schema = avro.load(AVROLOADPATH)
diff --git a/metadata-ingestion/ldap-etl/requirements.txt b/metadata-ingestion/ldap-etl/requirements.txt
index 3baa66925b..d27e41dee4 100644
--- a/metadata-ingestion/ldap-etl/requirements.txt
+++ b/metadata-ingestion/ldap-etl/requirements.txt
@@ -1,2 +1,3 @@
-confluent-kafka[avro]==1.1.0
+avro-python3==1.8.2
+confluent-kafka==1.4.0
 python-ldap==3.2.0
\ No newline at end of file
diff --git a/metadata-ingestion/mce-cli/mce_cli.py b/metadata-ingestion/mce-cli/mce_cli.py
index 67d757ac08..57c4c07b80 100644
--- a/metadata-ingestion/mce-cli/mce_cli.py
+++ b/metadata-ingestion/mce-cli/mce_cli.py
@@ -1,6 +1,11 @@
 #! /usr/bin/python
 import argparse
+import ast
 from confluent_kafka import avro
+from confluent_kafka.avro import AvroConsumer
+from confluent_kafka.avro import AvroProducer
+from confluent_kafka.avro.serializer import SerializerError
+
 
 topic = "MetadataChangeEvent"
 
@@ -13,9 +18,6 @@ def produce(conf, data_file, schema_record):
     """
     Produce MetadataChangeEvent records
     """
-    from confluent_kafka.avro import AvroProducer
-    import ast
-
     producer = AvroProducer(conf, default_value_schema=avro.load(schema_record))
 
     print("Producing MetadataChangeEvent records to topic {}. ^c to exit.".format(topic))
@@ -36,7 +38,7 @@ def produce(conf, data_file, schema_record):
                 break
         except ValueError as e:
             print ("Message serialization failed {}".format(e))
-            continue
+            break
 
     print("Flushing records...")
     producer.flush()
@@ -46,9 +48,6 @@ def consume(conf, schema_record):
     """
     Consume MetadataChangeEvent records
     """
-    from confluent_kafka.avro import AvroConsumer
-    from confluent_kafka.avro.serializer import SerializerError
-
     print("Consuming MetadataChangeEvent records from topic {} with group {}. ^c to exit.".format(topic, conf["group.id"]))
 
     c = AvroConsumer(conf, reader_value_schema=avro.load(schema_record))
diff --git a/metadata-ingestion/mce-cli/requirements.txt b/metadata-ingestion/mce-cli/requirements.txt
index 6e0d1bb837..c94a21f046 100644
--- a/metadata-ingestion/mce-cli/requirements.txt
+++ b/metadata-ingestion/mce-cli/requirements.txt
@@ -1,3 +1,2 @@
-avro-python3==1.8.2; python_version == '3.7'
-confluent-kafka==1.1.0; python_version == '3.7'
-confluent-kafka[avro]==1.1.0; python_version < '3.7'
+avro-python3==1.8.2
+confluent-kafka==1.4.0
diff --git a/metadata-ingestion/mysql-etl/mysql_etl.py b/metadata-ingestion/mysql-etl/mysql_etl.py
index 18feb60084..bbc3541264 100644
--- a/metadata-ingestion/mysql-etl/mysql_etl.py
+++ b/metadata-ingestion/mysql-etl/mysql_etl.py
@@ -3,6 +3,8 @@ import sys
 import time
 import mysql.connector
 from mysql.connector import Error
+from confluent_kafka import avro
+from confluent_kafka.avro import AvroProducer
 
 HOST = 'HOST'
 DATABASE = 'DATABASE'
@@ -19,7 +21,7 @@ def build_mysql_dataset_mce(dataset_name, schema, schema_version):
     """
     Create the MetadataChangeEvent via dataset_name and schema.
     """
-    actor, fields, sys_time = "urn:li:corpuser:datahub", [], long(time.time())
+    actor, fields, sys_time = "urn:li:corpuser:datahub", [], int(time.time())
 
     owner = {"owners":[{"owner":actor,"type":"DATAOWNER"}],"lastModified":{"time":0,"actor":actor}}
 
@@ -41,9 +43,6 @@ def produce_mysql_dataset_mce(mce):
     """
     Produce MetadataChangeEvent records.
     """
-    from confluent_kafka import avro
-    from confluent_kafka.avro import AvroProducer
-
     conf = {'bootstrap.servers': BOOTSTRAP,
             'schema.registry.url': SCHEMAREGISTRY}
     record_schema = avro.load(AVROLOADPATH)
diff --git a/metadata-ingestion/mysql-etl/requirements.txt b/metadata-ingestion/mysql-etl/requirements.txt
index aa5c30229d..3d4c48569d 100644
--- a/metadata-ingestion/mysql-etl/requirements.txt
+++ b/metadata-ingestion/mysql-etl/requirements.txt
@@ -1,2 +1,3 @@
-confluent-kafka[avro]==1.1.0
+avro-python3==1.8.2
+confluent-kafka==1.4.0
 mysql-connector==2.2.9
\ No newline at end of file
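
These scripts all share the same produce-to-Kafka pattern, and hoisting the `confluent_kafka` imports to module level makes a missing dependency fail at startup rather than midway through a run. For reference, below is a minimal standalone sketch of that shared pattern under the updated requirements. The `localhost` broker and schema-registry addresses are assumptions for a local quickstart (not values from these scripts), the topic name follows `mce_cli.py`, and `AVROLOADPATH` is the schema generated by the `mxe-schemas` Gradle build.

```
#! /usr/bin/python
# Minimal sketch of the MCE producer pattern shared by the ETL scripts.
# BOOTSTRAP and SCHEMAREGISTRY are assumed local-quickstart addresses.
import sys

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

AVROLOADPATH = '../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc'
KAFKATOPIC = 'MetadataChangeEvent'
BOOTSTRAP = 'localhost:9092'
SCHEMAREGISTRY = 'http://localhost:8081'

def produce_mce(mce):
    """
    Serialize one MetadataChangeEvent dict against the MCE schema and send it.
    """
    conf = {'bootstrap.servers': BOOTSTRAP,
            'schema.registry.url': SCHEMAREGISTRY}
    record_schema = avro.load(AVROLOADPATH)
    producer = AvroProducer(conf, default_value_schema=record_schema)

    try:
        producer.produce(topic=KAFKATOPIC, value=mce)
        producer.flush()
    except ValueError as e:
        print("Message serialization failed {}".format(e))
        sys.exit(1)
```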