Brought up ingestion docker image.

This commit is contained in:
Chris Lee 2019-12-16 12:05:42 -08:00
parent 07a6e8b085
commit 00612524b4
7 changed files with 152 additions and 1 deletions

View File

@ -17,7 +17,7 @@ cd docker/quickstart && docker-compose pull && docker-compose up --build
```
4. Once all the Docker containers are up and running on your machine, run the command below to ingest the provided sample data into Data Hub:
```
./gradlew :metadata-events:mxe-schemas:build && cd metadata-ingestion/mce-cli && pip install --user -r requirements.txt && python mce_cli.py produce -d bootstrap_mce.dat
docker build -t ingestion -f docker/ingestion/Dockerfile . && docker run --network host ingestion
```
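If the rest of the pipeline is up, the ingestion container prints each record it produces and then exits. A rough sketch of the expected output, based on the log messages in `mce_cli.py` from this change (record contents abridged):
```
Producing MetadataChangeEvent records to topic MetadataChangeEvent. ^c to exit.
  MCE1: {"auditHeader": None, "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot", {...}), "proposedDelta": None}
Flushing records...
```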
Note: Make sure that you're using Java 8; the build has a strict dependency on Java 8.

View File

@ -15,6 +15,10 @@ on below Docker images to be able to run:
* [**Elasticsearch**](elasticsearch)
* [**MySQL**](mysql)
The locally built ingestion image lets you produce `MetadataChangeEvent`s on an ad-hoc basis with a Python script; it requires all of the above images to be composed up and running first (see the sketch after this list):
* [**Ingestion**](ingestion)
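For illustration, a minimal end-to-end sequence could look like the sketch below (commands taken from the quickstart and ingestion docs in this change; the compose step is detached here for brevity):
```
# bring up the Data Hub pipeline first
(cd docker/quickstart && docker-compose pull && docker-compose up --build -d)

# then build and run the ingestion image from the repository root
docker build -t ingestion -f docker/ingestion/Dockerfile .
docker run --network host ingestion
```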
## Prerequisites
You need to install [docker](https://docs.docker.com/install/) and [docker-compose](https://docs.docker.com/compose/install/).

View File

@ -0,0 +1,12 @@
# Stage 1: build the renamed Avro schema (MetadataChangeEvent.avsc) with Gradle
FROM openjdk:8
COPY . datahub-src
RUN cd datahub-src && ./gradlew :metadata-events:mxe-schemas:build

# Stage 2: package the MCE CLI together with the generated schema
FROM python:2.7-slim
WORKDIR /ingestion
COPY docker/ingestion/mce-cli/requirements.txt ./
RUN pip install -r requirements.txt
COPY docker/ingestion/mce-cli /ingestion
# The .avsc is generated by the Gradle build in the first stage, so copy it from there
COPY --from=0 datahub-src/metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc /ingestion
CMD [ "python", "mce_cli.py", "produce", "-d", "bootstrap_mce.dat" ]

View File

@ -0,0 +1,22 @@
# Data Hub MetadataChangeEvent (MCE) Ingestion Docker Image
Refer to [Data Hub Metadata Ingestion](../../metadata-ingestion/mce-cli) for a quick overview of the architecture and
the responsibility of this service within Data Hub.
## Build
```
docker build -t ingestion -f docker/ingestion/Dockerfile .
```
This command builds the image and tags it as `ingestion` in your local Docker image store.
## Run container
```
docker run --network host ingestion
```
This command starts a container from the `ingestion` image in your local store. If the image is not present locally,
build it first using the command above.
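The image's default command is `python mce_cli.py produce -d bootstrap_mce.dat` (see the Dockerfile). You can override it at run time, for example to consume from the same topic instead of producing to it, a sketch:
```
docker run --network host ingestion python mce_cli.py consume
```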
### Container configuration
#### Kafka and Data Hub GMS Containers
Before starting the `ingestion` container, the `datahub-gms`, `kafka`, and `datahub-mce-consumer` containers should already be up and running.
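For example, one quick way to check that they are up before launching ingestion (assuming the container names listed above):
```
docker ps --format '{{.Names}}' | grep -E 'datahub-gms|kafka|datahub-mce-consumer'
```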

View File

@ -0,0 +1,5 @@
{"auditHeader": None, "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot", {"urn": "urn:li:corpuser:datahub", "aspects": [{"active": True, "displayName": "Data Hub", "fullName": "Data Hub", "email": "datahub@linkedin.com"}, {}]}), "proposedDelta": None}
{"auditHeader": None, "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot", {"urn": "urn:li:corpuser:ksahin", "aspects": [{"active": True, "displayName": "Kerem Sahin", "fullName": "Kerem Sahin", "email": "ksahin@linkedin.com"}, {}]}), "proposedDelta": None}
{"auditHeader": None, "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot", {"urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,pageViewsKafka,PROD)", "aspects": [{"owners":[{"owner":"urn:li:corpuser:ksahin","type":"DATAOWNER"}, {"owner":"urn:li:corpuser:datahub","type":"DATAOWNER"}],"lastModified":{"time":0,"actor":"urn:li:corpuser:ksahin"}},{"upstreams":[]},{"elements":[{"url":"https://www.linkedin.com","description":"Sample doc","createStamp":{"time":0,"actor":"urn:li:corpuser:ksahin"}}]},{"schemaName":"PageViewEvent","platform":"urn:li:dataPlatform:kafka","version":0,"created":{"time":0,"actor":"urn:li:corpuser:ksahin"},"lastModified":{"time":0,"actor":"urn:li:corpuser:ksahin"},"hash":"","platformSchema":{"documentSchema":"{\"type\":\"record\",\"name\":\"MetadataChangeEvent\",\"namespace\":\"com.linkedin.pegasus2avro.mxe\",\"doc\":\"Kafka event for proposing a metadata change for an entity.\",\"fields\":[{\"name\":\"auditHeader\",\"type\":{\"type\":\"record\",\"name\":\"KafkaAuditHeader\",\"namespace\":\"com.linkedin.pegasus2avro.avro2pegasus.events\",\"doc\":\"Header\"}}]}"},"fields":[{"fieldPath":"foo","description":"Bar","nativeDataType":"string","type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}}}]}]}), "proposedDelta": None}
{"auditHeader": None, "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot", {"urn": "urn:li:dataset:(urn:li:dataPlatform:hdfs,pageViewsHdfs,PROD)", "aspects": [{"owners":[{"owner":"urn:li:corpuser:ksahin","type":"DATAOWNER"}, {"owner":"urn:li:corpuser:datahub","type":"DATAOWNER"}],"lastModified":{"time":0,"actor":"urn:li:corpuser:ksahin"}},{"upstreams":[{"auditStamp":{"time":0,"actor":"urn:li:corpuser:ksahin"},"dataset":"urn:li:dataset:(urn:li:dataPlatform:kafka,pageViewsKafka,PROD)","type":"TRANSFORMED"}]},{"elements":[{"url":"https://www.linkedin.com","description":"Sample doc","createStamp":{"time":0,"actor":"urn:li:corpuser:ksahin"}}]},{"schemaName":"PageViewEvent","platform":"urn:li:dataPlatform:kafka","version":0,"created":{"time":0,"actor":"urn:li:corpuser:ksahin"},"lastModified":{"time":0,"actor":"urn:li:corpuser:ksahin"},"hash":"","platformSchema":{"documentSchema":"{\"type\":\"record\",\"name\":\"MetadataChangeEvent\",\"namespace\":\"com.linkedin.pegasus2avro.mxe\",\"doc\":\"Kafka event for proposing a metadata change for an entity.\",\"fields\":[{\"name\":\"auditHeader\",\"type\":{\"type\":\"record\",\"name\":\"KafkaAuditHeader\",\"namespace\":\"com.linkedin.pegasus2avro.avro2pegasus.events\",\"doc\":\"Header\"}}]}"},"fields":[{"fieldPath":"foo","description":"Bar","nativeDataType":"string","type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}}}]}]}), "proposedDelta": None}
{"auditHeader": None, "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot", {"urn": "urn:li:dataset:(urn:li:dataPlatform:hive,pageViewsHive,PROD)", "aspects": [{"owners":[{"owner":"urn:li:corpuser:ksahin","type":"DATAOWNER"}, {"owner":"urn:li:corpuser:datahub","type":"DATAOWNER"}],"lastModified":{"time":0,"actor":"urn:li:corpuser:ksahin"}},{"upstreams":[{"auditStamp":{"time":0,"actor":"urn:li:corpuser:ksahin"},"dataset":"urn:li:dataset:(urn:li:dataPlatform:hdfs,pageViewsHdfs,PROD)","type":"TRANSFORMED"}]},{"elements":[{"url":"https://www.linkedin.com","description":"Sample doc","createStamp":{"time":0,"actor":"urn:li:corpuser:ksahin"}}]},{"schemaName":"PageViewEvent","platform":"urn:li:dataPlatform:kafka","version":0,"created":{"time":0,"actor":"urn:li:corpuser:ksahin"},"lastModified":{"time":0,"actor":"urn:li:corpuser:ksahin"},"hash":"","platformSchema":{"documentSchema":"{\"type\":\"record\",\"name\":\"MetadataChangeEvent\",\"namespace\":\"com.linkedin.pegasus2avro.mxe\",\"doc\":\"Kafka event for proposing a metadata change for an entity.\",\"fields\":[{\"name\":\"auditHeader\",\"type\":{\"type\":\"record\",\"name\":\"KafkaAuditHeader\",\"namespace\":\"com.linkedin.pegasus2avro.avro2pegasus.events\",\"doc\":\"Header\"}}]}"},"fields":[{"fieldPath":"foo","description":"Bar","nativeDataType":"string","type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}}}]}]}), "proposedDelta": None}

View File

@ -0,0 +1,107 @@
#! /usr/bin/python
"""Simple CLI for producing and consuming Avro-encoded MetadataChangeEvent (MCE) records."""
import argparse

from confluent_kafka import avro

# Schema and topic shared by both the producer and the consumer
record_schema = avro.load("MetadataChangeEvent.avsc")
topic = "MetadataChangeEvent"


class MetadataChangeEvent(object):

    def __init__(self, avro_event=None):
        self.value = avro_event


def produce(conf, data_file):
    """
    Produce MetadataChangeEvent records
    """
    from confluent_kafka.avro import AvroProducer
    import ast

    producer = AvroProducer(conf, default_value_schema=record_schema)

    print("Producing MetadataChangeEvent records to topic {}. ^c to exit.".format(topic))

    with open(data_file) as fp:
        cnt = 0
        while True:
            sample = fp.readline()
            cnt += 1
            if not sample:
                break
            try:
                # Each line of the data file is a Python literal representing one MCE
                content = ast.literal_eval(sample.strip())
                producer.produce(topic=topic, value=content)
                producer.poll(0)
                print("  MCE{}: {}".format(cnt, sample))
            except KeyboardInterrupt:
                break
            except ValueError as e:
                print("Message serialization failed {}".format(e))
                continue

    print("Flushing records...")
    producer.flush()


def consume(conf):
    """
    Consume MetadataChangeEvent records
    """
    from confluent_kafka.avro import AvroConsumer
    from confluent_kafka.avro.serializer import SerializerError

    print("Consuming MetadataChangeEvent records from topic {} with group {}. ^c to exit.".format(topic, conf["group.id"]))

    c = AvroConsumer(conf, reader_value_schema=record_schema)
    c.subscribe([topic])

    while True:
        try:
            msg = c.poll(1)

            # There were no messages on the queue, continue polling
            if msg is None:
                continue

            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue

            record = MetadataChangeEvent(msg.value())
            print("avro_event: {}\n\t".format(record.value))
        except SerializerError as e:
            # Report malformed record, discard results, continue polling
            print("Message deserialization failed {}".format(e))
            continue
        except KeyboardInterrupt:
            break

    print("Shutting down consumer..")
    c.close()


def main(args):
    # Handle common configs
    conf = {'bootstrap.servers': args.bootstrap_servers,
            'schema.registry.url': args.schema_registry}

    if args.mode == "produce":
        produce(conf, args.data_file)
    else:
        # Fallback to earliest to ensure all messages are consumed
        conf['group.id'] = topic
        conf['auto.offset.reset'] = "earliest"
        consume(conf)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Client for producing/consuming MetadataChangeEvent")
    parser.add_argument('-b', dest="bootstrap_servers", default="localhost:9092",
                        help="Kafka broker(s) (localhost[:port])")
    parser.add_argument('-s', dest="schema_registry", default="http://localhost:8081",
                        help="Schema Registry (http(s)://localhost[:port])")
    parser.add_argument('mode', choices=['produce', 'consume'],
                        help="Execution mode (produce | consume)")
    parser.add_argument('-d', dest="data_file", default="bootstrap_mce.dat",
                        help="MCE data file; required if running in 'produce' mode")

    main(parser.parse_args())
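For reference, the CLI can also be run directly outside of Docker, a sketch assuming the defaults above (broker at `localhost:9092`, schema registry at `http://localhost:8081`) and `MetadataChangeEvent.avsc` present in the working directory:
```
pip install -r requirements.txt
python mce_cli.py produce -d bootstrap_mce.dat
python mce_cli.py consume
```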

View File

@ -0,0 +1 @@
confluent-kafka[avro]==1.1.0