diff --git a/contrib/metadata-ingestion/README.md b/contrib/metadata-ingestion/README.md
index c53fcfef4e..0342d69302 100644
--- a/contrib/metadata-ingestion/README.md
+++ b/contrib/metadata-ingestion/README.md
@@ -13,9 +13,9 @@ i split the ingestion procedure to two part: [datahub-producer] and different [m
 - [X] datahub-producer load json avro data.
 - [X] add lineage-hive generator
 - [X] add dataset-jdbc generator[include [mysql, mssql, postgresql, oracle] driver]
-- [ ] enhance dataset-jdbc generator [hive-driver]
+- [X] add dataset-hive generator
+- [ ] add lineage-oracle generator
 - [ ] enhance lineage-jdbc generator to lazy iterator mode.
-- [ ] add lineage-oracle generator
 - [ ] enchance avro parser to show error information
 
 
@@ -31,22 +31,35 @@ i split the ingestion procedure to two part: [datahub-producer] and different [m
     nix-channel --update nixpkgs
 ```
 
-2. load json data to datahub
+2. [optional] you can download the specified dependencies in advance, or they will be downloaded automatically at run time.
+
+```
+    nix-shell bin/[datahub-producer].hs.nix
+    nix-shell bin/[datahub-producer].py.nix
+    ...
+```
+
+3. load json data to datahub
 ```
     cat sample/mce.json.dat | bin/datahub-producer.hs config
 ```
 
-3. parse hive sql to datahub
+4. parse hive sql to datahub
 ```
     ls sample/hive_*.sql | bin/lineage_hive_generator.hs | bin/datahub-producer.hs config
 ```
 
-4. load jdbc schema(mysql, mssql, postgresql, oracle) to datahub
+5. load jdbc schema (mysql, mssql, postgresql, oracle) to datahub
 ```
     bin/dataset-jdbc-generator.hs | bin/datahub-producer.hs config
 ```
 
+6. load hive schema to datahub
+```
+    bin/dataset-hive-generator.py | bin/datahub-producer.hs config
+```
+
 ## Reference
 
 - hive/presto/vertica SQL Parser
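Every generator in this directory, including the new `bin/dataset-hive-generator.py` added below, feeds the same producer: it writes one JSON-encoded MetadataChangeEvent per line to stdout, and `bin/datahub-producer.hs config` reads those lines from stdin. The contents of `sample/mce.json.dat` are not reproduced in this diff; the following is only a minimal sketch of a record in the same snapshot/aspect layout that `build_hive_dataset_mce` builds further down (the dataset URN and actor are placeholders, and a real record may need more aspects than the single Ownership aspect shown here):

```
#!/usr/bin/env python3
# Minimal sketch of one MetadataChangeEvent-shaped record, printed as a single
# JSON line so it could be piped into `bin/datahub-producer.hs config`.
import json
import time

actor = "urn:li:corpuser:datahub"  # placeholder owner
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,default.example_table,PROD)"  # placeholder
now = int(time.time())

mce = {
    "auditHeader": None,
    "proposedSnapshot": {
        "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
            "urn": dataset_urn,
            "aspects": [
                {"com.linkedin.pegasus2avro.common.Ownership": {
                    "owners": [{"owner": actor, "type": "DATAOWNER"}],
                    "lastModified": {"time": now, "actor": actor}}}
            ],
        }
    },
    "proposedDelta": None,
}

print(json.dumps(mce))
```

Whether a single aspect is accepted depends on the MetadataChangeEvent Avro schema referenced by the producer config; the hive generator below always emits Ownership, UpstreamLineage, InstitutionalMemory, and SchemaMetadata together.
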
diff --git a/contrib/metadata-ingestion/bin/dataset-hive-generator.py b/contrib/metadata-ingestion/bin/dataset-hive-generator.py
new file mode 100755
index 0000000000..8067644d39
--- /dev/null
+++ b/contrib/metadata-ingestion/bin/dataset-hive-generator.py
@@ -0,0 +1,66 @@
+#! /usr/bin/env nix-shell
+#! nix-shell dataset-hive-generator.py.nix -i python
+
+import sys
+import time
+from pyhive import hive
+from TCLIService.ttypes import TOperationState
+
+import simplejson as json
+
+HIVESTORE='localhost'
+
+AVROLOADPATH = '../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc'
+KAFKATOPIC = 'MetadataChangeEvent'
+BOOTSTRAP = 'localhost:9092'
+SCHEMAREGISTRY = 'http://localhost:8081'
+
+def hive_query(query):
+    """
+    Execute the query to the HiveStore.
+    """
+    cursor = hive.connect(HIVESTORE).cursor()
+    cursor.execute(query, async_=True)
+    status = cursor.poll().operationState
+    while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
+        logs = cursor.fetch_logs()
+        for message in logs:
+            sys.stdout.write(message)
+        status = cursor.poll().operationState
+    results = cursor.fetchall()
+    return results
+
+def build_hive_dataset_mce(dataset_name, schema, metadata):
+    """
+    Create the MetadataChangeEvent via dataset_name and schema.
+    """
+    actor, type, created_time, upstreams_dataset, sys_time = "urn:li:corpuser:" + metadata[2][7:], str(metadata[-1][11:-1]), int(metadata[3][12:]), metadata[-28][10:], int(time.time())
+    owners = {"owners":[{"owner":actor,"type":"DATAOWNER"}],"lastModified":{"time":sys_time,"actor":actor}}
+    upstreams = {"upstreams":[{"auditStamp":{"time":sys_time,"actor":actor},"dataset":"urn:li:dataset:(urn:li:dataPlatform:hive," + upstreams_dataset + ",PROD)","type":"TRANSFORMED"}]}
+    elements = {"elements":[{"url":HIVESTORE,"description":"sample doc to describe upstreams","createStamp":{"time":sys_time,"actor":actor}}]}
+    schema_name = {"schemaName":dataset_name,"platform":"urn:li:dataPlatform:hive","version":0,"created":{"time":created_time,"actor":actor},
+        "lastModified":{"time":sys_time,"actor":actor},"hash":"","platformSchema":{"com.linkedin.pegasus2avro.schema.OtherSchema": {"rawSchema": schema}},
+        "fields":[{"fieldPath":"","description":{"string":""},"nativeDataType":"string","type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}}}]}
+
+    mce = {"auditHeader": None,
+           "proposedSnapshot":{"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot":
+               {"urn": "urn:li:dataset:(urn:li:dataPlatform:hive,"+ dataset_name +",PROD)"
+               ,"aspects": [
+                   {"com.linkedin.pegasus2avro.common.Ownership": owners}
+                 , {"com.linkedin.pegasus2avro.dataset.UpstreamLineage": upstreams}
+                 , {"com.linkedin.pegasus2avro.common.InstitutionalMemory": elements}
+                 , {"com.linkedin.pegasus2avro.schema.SchemaMetadata": schema_name}
+               ]}},
+           "proposedDelta": None}
+
+    print(json.dumps(mce))
+
+databases = hive_query('show databases')
+for database in databases:
+    tables = hive_query('show tables in ' + database[0])
+    for table in tables:
+        dataset_name = database[0] + '.' + table[0]
+        description = hive_query('describe extended ' + dataset_name)
+        build_hive_dataset_mce(dataset_name, str(description[:-1][:-1]), description[-1][1].split(','))
+
+sys.exit(0)
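The fixed offsets in `build_hive_dataset_mce` (`metadata[2][7:]`, `metadata[3][12:]`, `metadata[-28][10:]`, `metadata[-1][11:-1]`) appear to slice the owner, creation time, table location (used as the upstream dataset), and table type out of the comma-split "Detailed Table Information" row returned by `describe extended`, so they silently depend on the exact field order and prefixes printed by one particular Hive version. A hedged alternative, shown only as a sketch with a made-up detail string and a hypothetical `parse_table_detail` helper, is to pull those fields out by key name:

```
# Hypothetical helper (not part of this patch): extract fields from the
# "Detailed Table Information" string by key instead of by position.
# `detail` corresponds to description[-1][1] from `describe extended <table>`;
# the exact layout varies across Hive versions, so treat this as a sketch only.
import re

def parse_table_detail(detail):
    fields = {}
    for key in ("owner", "createTime", "location", "tableType"):
        match = re.search(key + r":([^,)]+)", detail)
        if match:
            fields[key] = match.group(1).strip()
    return fields

# Made-up example string in the general shape Hive prints:
detail = ("Table(tableName:example, dbName:default, owner:hive, "
          "createTime:1571875457, location:hdfs://namenode/warehouse/example, "
          "tableType:MANAGED_TABLE)")
print(parse_table_detail(detail))
# {'owner': 'hive', 'createTime': '1571875457',
#  'location': 'hdfs://namenode/warehouse/example', 'tableType': 'MANAGED_TABLE'}
```

Real `describe extended` output nests parentheses inside the StorageDescriptor, so a production version would need a more careful tokenizer; the point here is only to make the implicit field positions in the generator explicit.
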
diff --git a/contrib/metadata-ingestion/bin/dataset-hive-generator.py.nix b/contrib/metadata-ingestion/bin/dataset-hive-generator.py.nix
new file mode 100644
index 0000000000..9bfce06fdd
--- /dev/null
+++ b/contrib/metadata-ingestion/bin/dataset-hive-generator.py.nix
@@ -0,0 +1,61 @@
+with import <nixpkgs> {} ;
+let
+  avro-python3-1_8 = python3Packages.buildPythonPackage rec {
+    pname = "avro-python3" ;
+    version = "1.8.2" ;
+
+    src = python3Packages.fetchPypi {
+      inherit pname version ;
+      sha256 = "f82cf0d66189600b1e6b442f650ad5aca6c189576723dcbf6f9ce096eab81bd6" ;
+    } ;
+    doCheck = false;
+  } ;
+
+  sasl = python3Packages.buildPythonPackage rec {
+    pname = "sasl" ;
+    version = "0.2.1" ;
+
+    src = python3Packages.fetchPypi {
+      inherit pname version ;
+      sha256 = "04f22e17bbebe0cd42471757a48c2c07126773c38741b1dad8d9fe724c16289d" ;
+    } ;
+    doCheck = false;
+    propagatedBuildInputs = [ cyrus_sasl ] ++ (with python3Packages ; [six]) ;
+  } ;
+
+  thrift_sasl = python3Packages.buildPythonPackage rec {
+    pname = "thrift_sasl" ;
+    version = "0.4.2" ;
+
+    src = python3Packages.fetchPypi {
+      inherit pname version ;
+      sha256 = "6a1c54731cb3ce61bdc041d9dc36e21f0f56db0163bb7b57be84de3fda70922f" ;
+    } ;
+    doCheck = false;
+    propagatedBuildInputs = with python3Packages; [ thrift sasl ] ;
+  } ;
+
+  PyHive = python3Packages.buildPythonPackage rec {
+    pname = "PyHive" ;
+    version = "0.6.1" ;
+
+    src = python3Packages.fetchPypi {
+      inherit pname version ;
+      sha256 = "a5f2b2f8bcd85a8cd80ab64ff8fbfe1c09515d266650a56f789a8d89ad66d7f4" ;
+    } ;
+    doCheck = false;
+    propagatedBuildInputs = with python3Packages ; [ dateutil future thrift sasl thrift_sasl ];
+  } ;
+
+in
+mkShell {
+  buildInputs = (with python3Packages ;[
+    python
+    requests
+    PyHive
+
+    simplejson
+    # avro-python3-1_8
+    # confluent-kafka
+  ]) ;
+}
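`AVROLOADPATH`, `KAFKATOPIC`, `BOOTSTRAP`, and `SCHEMAREGISTRY` are defined in `dataset-hive-generator.py` but never used: the script hands its JSON lines to `bin/datahub-producer.hs`, and the nix expression above accordingly leaves `avro-python3` and `confluent-kafka` commented out. If one wanted the generator to publish straight to Kafka instead, those two dependencies would have to be enabled and the constants wired up roughly as follows. This is an untested sketch against confluent-kafka's Avro producer, and whether the plain dicts built by `build_hive_dataset_mce` serialize cleanly against the union types in the MetadataChangeEvent schema would still need to be verified:

```
# Hypothetical direct-to-Kafka path (not part of this patch); requires
# confluent-kafka and avro-python3 to be uncommented in dataset-hive-generator.py.nix.
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Same constants as defined (but unused) in dataset-hive-generator.py:
AVROLOADPATH = '../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc'
KAFKATOPIC = 'MetadataChangeEvent'
BOOTSTRAP = 'localhost:9092'
SCHEMAREGISTRY = 'http://localhost:8081'

value_schema = avro.load(AVROLOADPATH)
producer = AvroProducer(
    {"bootstrap.servers": BOOTSTRAP, "schema.registry.url": SCHEMAREGISTRY},
    default_value_schema=value_schema,
)

def emit_mce(mce):
    # `mce` is the dict built by build_hive_dataset_mce; instead of printing it,
    # publish it through the schema-registry-aware Avro producer.
    producer.produce(topic=KAFKATOPIC, value=mce)
    producer.flush()
```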