refactor(py3): Refactor all ETL scripts to use Python 3 exclusively (#1710)

* refactor(py3): Refactor all ETL scripts to use Python 3 exclusively

Fix https://github.com/linkedin/datahub/issues/1688

* Update requirements.txt
Author: Mars Lan (committed via GitHub)
Date: 2020-06-25 15:16:04 -07:00
Commit: fa9fe5e110 (parent: 60b7c63b26)
11 changed files with 50 additions and 42 deletions

View File

@@ -8,7 +8,13 @@ way to do that is through [Docker images](../docker).
 ./gradlew :metadata-events:mxe-schemas:build
 ```
 This is needed to generate `MetadataChangeEvent.avsc` which is the schema for `MetadataChangeEvent` Kafka topic.
-3. Before launching each ETL ingestion pipeline, you can install/verify the library versions as below.
+3. All the scripts are written using Python 3 and most likely won't work with Python 2.x interpreters.
+   You can verify the version of your Python using the following command.
+   ```
+   python --version
+   ```
+   We recommend using [pyenv](https://github.com/pyenv/pyenv) to install and manage your Python environment.
+4. Before launching each ETL ingestion pipeline, you can install/verify the library versions as below.
 ```
 pip install --user -r requirements.txt
 ```
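
Since the scripts now assume a Python 3 interpreter, a version guard at the top of a script is one way to fail fast instead of crashing later on Python 3-only syntax. This is a minimal sketch, not part of this change; the 3.6 threshold is an assumption based on the f-strings introduced elsewhere in the commit.
```
import sys

# Abort early on interpreters older than 3.6; the refactored scripts use
# f-strings, which are a Python 3.6+ feature.
if sys.version_info < (3, 6):
    sys.exit("These ETL scripts require Python 3.6+; found {}".format(sys.version.split()[0]))
```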

View File

@@ -1,6 +1,8 @@
 #! /usr/bin/python
 import sys
 import time
+from confluent_kafka import avro
+from confluent_kafka.avro import AvroProducer
 from pyhive import hive
 from TCLIService.ttypes import TOperationState
@@ -16,7 +18,7 @@ def hive_query(query):
     Execute the query to the HiveStore.
     """
     cursor = hive.connect(HIVESTORE).cursor()
-    cursor.execute(query, async=True)
+    cursor.execute(query, async_=True)
     status = cursor.poll().operationState
     while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
         logs = cursor.fetch_logs()
@@ -49,9 +51,6 @@ def produce_hive_dataset_mce(mce):
     """
     Produce MetadataChangeEvent records.
     """
-    from confluent_kafka import avro
-    from confluent_kafka.avro import AvroProducer
-
     conf = {'bootstrap.servers': BOOTSTRAP,
             'schema.registry.url': SCHEMAREGISTRY}
     record_schema = avro.load(AVROLOADPATH)
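
The `async=True` to `async_=True` change is forced by the language itself: `async` became a reserved keyword in Python 3.7, so PyHive renamed the keyword argument. A minimal sketch of the non-blocking query pattern the hunk above migrates, with a placeholder host and query that are not taken from the script:
```
from pyhive import hive
from TCLIService.ttypes import TOperationState

# Placeholder host and query; hive_etl.py uses its own HIVESTORE constant.
cursor = hive.connect('localhost').cursor()
cursor.execute('SHOW TABLES', async_=True)

# Poll until the asynchronous query leaves the INITIALIZED/RUNNING states.
status = cursor.poll().operationState
while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
    status = cursor.poll().operationState

print(cursor.fetchall())
```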

View File

@@ -1,2 +1,4 @@
-confluent-kafka[avro]==1.1.0
+avro-python3==1.8.2
+confluent-kafka==1.4.0
 pyhive==0.6.1
+thrift-sasl==0.4.2

View File

@@ -2,7 +2,9 @@
 import sys
 import time
 from kazoo.client import KazooClient
-from confluent.schemaregistry.client import CachedSchemaRegistryClient
+from confluent_kafka import avro
+from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
+from confluent_kafka.avro import AvroProducer
 ZOOKEEPER='localhost:2181'
 AVROLOADPATH = '../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc'
@@ -15,7 +17,7 @@ def build_kafka_dataset_mce(dataset_name, schema, schema_version):
     """
     Create the MetadataChangeEvent via dataset_name and schema.
     """
-    actor, sys_time = "urn:li:corpuser:", long(time.time())
+    actor, sys_time = "urn:li:corpuser:", time.time()
     schema_name = {"schemaName":dataset_name,"platform":"urn:li:dataPlatform:kafka","version":schema_version,"created":{"time":sys_time,"actor":actor},
                    "lastModified":{"time":sys_time,"actor":actor},"hash":"","platformSchema":{"documentSchema": schema},
                    "fields":[{"fieldPath":"","description":"","nativeDataType":"string","type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}}}]}
@@ -31,9 +33,6 @@ def produce_kafka_dataset_mce(mce):
     """
     Produce MetadataChangeEvent records.
     """
-    from confluent_kafka import avro
-    from confluent_kafka.avro import AvroProducer
-
     conf = {'bootstrap.servers': BOOTSTRAP,
             'schema.registry.url': SCHEMAREGISTRY}
     record_schema = avro.load(AVROLOADPATH)
@@ -58,7 +57,11 @@ for dataset_name in topics:
         continue
     topic = dataset_name + '-value'
     schema_id, schema, schema_version = client.get_latest_schema(topic)
-    print topic
+    if schema_id is None:
+        print(f"Skipping topic without schema: {topic}")
+        continue
+    print(topic)
     build_kafka_dataset_mce(dataset_name, str(schema), int(schema_version))
 sys.exit(0)
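
The new `schema_id is None` guard skips topics that have no value schema registered instead of failing on them. A rough sketch of how the ZooKeeper topic listing and the registry lookup fit together, using placeholder endpoints rather than the script's constants:
```
from kazoo.client import KazooClient
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

# Placeholder endpoints; kafka_etl.py defines ZOOKEEPER and SCHEMAREGISTRY constants.
zk = KazooClient(hosts='localhost:2181')
zk.start()
client = CachedSchemaRegistryClient({'url': 'http://localhost:8081'})

for dataset_name in zk.get_children('/brokers/topics'):
    schema_id, schema, schema_version = client.get_latest_schema(dataset_name + '-value')
    if schema_id is None:
        # Topic has no value schema registered; skip it instead of failing.
        continue
    print(dataset_name, schema_version)

zk.stop()
```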

View File

@@ -1,3 +1,3 @@
-confluent-kafka[avro]==1.1.0
-python-schema-registry-client==1.2.1
+avro-python3==1.8.2
+confluent-kafka==1.4.0
 kazoo==2.5.0

View File

@@ -1,6 +1,8 @@
 #! /usr/bin/python
 import sys
 import ldap
+from confluent_kafka import avro
+from confluent_kafka.avro import AvroProducer
 from ldap.controls import SimplePagedResultsControl
 from distutils.version import LooseVersion
@@ -10,7 +12,7 @@ LDAPSERVER ='LDAPSERVER'
 BASEDN ='BASEDN'
 LDAPUSER = 'LDAPUSER'
 LDAPPASSWORD = 'LDAPPASSWORD'
-PAGESIZE = PAGESIZE
+PAGESIZE = 20
 ATTRLIST = ['cn', 'title', 'mail', 'sAMAccountName', 'department','manager']
 SEARCHFILTER='SEARCHFILTER'
@@ -81,9 +83,6 @@ def produce_corp_user_mce(mce):
     """
     Produce MetadataChangeEvent records
     """
-    from confluent_kafka import avro
-    from confluent_kafka.avro import AvroProducer
-
     conf = {'bootstrap.servers': BOOTSTRAP,
             'schema.registry.url': SCHEMAREGISTRY}
     record_schema = avro.load(AVROLOADPATH)
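
With `PAGESIZE` now a real integer, the `SimplePagedResultsControl` import above can drive a paged directory search. A hedged sketch of that pattern; server, credentials, base DN, and filter below are placeholders, none of them taken from the script:
```
import ldap
from ldap.controls import SimplePagedResultsControl

# Placeholder server and credentials; ldap_etl.py reads its own constants.
conn = ldap.initialize('ldap://ldap.example.com')
conn.simple_bind_s('cn=reader,dc=example,dc=com', 'password')

page_control = SimplePagedResultsControl(True, size=20, cookie='')
while True:
    msgid = conn.search_ext('dc=example,dc=com', ldap.SCOPE_SUBTREE,
                            '(objectClass=person)', ['cn', 'mail'],
                            serverctrls=[page_control])
    _, results, _, serverctrls = conn.result3(msgid)
    for dn, attrs in results:
        print(dn)
    # Continue only while the server returns a non-empty paging cookie.
    controls = [c for c in serverctrls
                if c.controlType == SimplePagedResultsControl.controlType]
    if not controls or not controls[0].cookie:
        break
    page_control.cookie = controls[0].cookie
```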

View File

@@ -1,2 +1,3 @@
-confluent-kafka[avro]==1.1.0
+avro-python3==1.8.2
+confluent-kafka==1.4.0
 python-ldap==3.2.0

View File

@@ -1,6 +1,11 @@
 #! /usr/bin/python
 import argparse
+import ast
 from confluent_kafka import avro
+from confluent_kafka.avro import AvroConsumer
+from confluent_kafka.avro import AvroProducer
+from confluent_kafka.avro.serializer import SerializerError
 topic = "MetadataChangeEvent"
@@ -13,9 +18,6 @@ def produce(conf, data_file, schema_record):
     """
     Produce MetadataChangeEvent records
     """
-    from confluent_kafka.avro import AvroProducer
-    import ast
-
     producer = AvroProducer(conf, default_value_schema=avro.load(schema_record))
     print("Producing MetadataChangeEvent records to topic {}. ^c to exit.".format(topic))
@@ -36,7 +38,7 @@ def produce(conf, data_file, schema_record):
                 break
             except ValueError as e:
                 print ("Message serialization failed {}".format(e))
-                continue
+                break
     print("Flushing records...")
     producer.flush()
@@ -46,9 +48,6 @@ def consume(conf, schema_record):
     """
     Consume MetadataChangeEvent records
     """
-    from confluent_kafka.avro import AvroConsumer
-    from confluent_kafka.avro.serializer import SerializerError
-
     print("Consuming MetadataChangeEvent records from topic {} with group {}. ^c to exit.".format(topic, conf["group.id"]))
     c = AvroConsumer(conf, reader_value_schema=avro.load(schema_record))
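
On the consumer side, the imports moved to module level are exactly what a poll loop needs; `AvroConsumer.poll()` raises `SerializerError` when an Avro payload cannot be decoded. A rough sketch with placeholder broker, registry, group id, and schema path (the real script assembles `conf` from its command-line arguments):
```
from confluent_kafka import avro
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

# Placeholder connection settings and schema path.
conf = {'bootstrap.servers': 'localhost:9092',
        'schema.registry.url': 'http://localhost:8081',
        'group.id': 'mce-cli-demo'}
consumer = AvroConsumer(conf, reader_value_schema=avro.load('MetadataChangeEvent.avsc'))
consumer.subscribe(['MetadataChangeEvent'])

while True:
    try:
        msg = consumer.poll(1)
    except SerializerError as e:
        print("Message deserialization failed {}".format(e))
        break
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    print(msg.value())

consumer.close()
```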

View File

@@ -1,3 +1,2 @@
-avro-python3==1.8.2; python_version == '3.7'
-confluent-kafka==1.1.0; python_version == '3.7'
-confluent-kafka[avro]==1.1.0; python_version < '3.7'
+avro-python3==1.8.2
+confluent-kafka==1.4.0

View File

@@ -3,6 +3,8 @@ import sys
 import time
 import mysql.connector
 from mysql.connector import Error
+from confluent_kafka import avro
+from confluent_kafka.avro import AvroProducer
 HOST = 'HOST'
 DATABASE = 'DATABASE'
@@ -19,7 +21,7 @@ def build_mysql_dataset_mce(dataset_name, schema, schema_version):
     """
     Create the MetadataChangeEvent via dataset_name and schema.
     """
-    actor, fields, sys_time = "urn:li:corpuser:datahub", [], long(time.time())
+    actor, fields, sys_time = "urn:li:corpuser:datahub", [], time.time()
     owner = {"owners":[{"owner":actor,"type":"DATAOWNER"}],"lastModified":{"time":0,"actor":actor}}
@@ -41,9 +43,6 @@ def produce_mysql_dataset_mce(mce):
     """
     Produce MetadataChangeEvent records.
     """
-    from confluent_kafka import avro
-    from confluent_kafka.avro import AvroProducer
-
     conf = {'bootstrap.servers': BOOTSTRAP,
             'schema.registry.url': SCHEMAREGISTRY}
     record_schema = avro.load(AVROLOADPATH)
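
A hedged sketch of how the `mysql.connector` connection and the `Error` import above are typically used to walk table schemas; host, database, and credentials are placeholders, and the real script turns each column into an MCE field rather than printing it:
```
import mysql.connector
from mysql.connector import Error

# Placeholder connection settings; mysql_etl.py defines HOST, DATABASE, etc. as constants.
connection = None
try:
    connection = mysql.connector.connect(host='localhost', database='datahub',
                                         user='user', password='password')
    cursor = connection.cursor()
    cursor.execute("SHOW TABLES")
    for (table_name,) in cursor.fetchall():
        # DESCRIBE yields one row per column: (name, type, null, key, default, extra).
        cursor.execute("DESCRIBE `{}`".format(table_name))
        for column in cursor.fetchall():
            print(table_name, column)
except Error as e:
    print("Error while connecting to MySQL", e)
finally:
    if connection is not None and connection.is_connected():
        connection.close()
```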

View File

@@ -1,2 +1,3 @@
-confluent-kafka[avro]==1.1.0
+avro-python3==1.8.2
+confluent-kafka==1.4.0
 mysql-connector==2.2.9