mirror of https://github.com/datahub-project/datahub.git
parent c97d3fc102
commit c7851b7df4
@@ -13,9 +13,9 @@ i split the ingestion procedure to two part: [datahub-producer] and different [m

- [X] datahub-producer load json avro data.
- [X] add lineage-hive generator
- [X] add dataset-jdbc generator [include [mysql, mssql, postgresql, oracle] driver]
- [ ] enhance dataset-jdbc generator [hive-driver]
- [X] add dataset-hive generator
- [ ] enhance lineage-jdbc generator to lazy iterator mode.
- [ ] add lineage-oracle generator
- [ ] enhance avro parser to show error information
@@ -31,22 +31,35 @@ i split the ingestion procedure to two part: [datahub-producer] and different [m

nix-channel --update nixpkgs
```

2. [optional] you can download the specified dependencies in advance, or they will be downloaded automatically at run time.

```
nix-shell bin/[datahub-producer].hs.nix
nix-shell bin/[datahub-producer].py.nix
...
```

3. load json data to datahub (an illustrative MCE record is sketched after these steps)

```
cat sample/mce.json.dat | bin/datahub-producer.hs config
```
4. parse hive sql to datahub

```
ls sample/hive_*.sql | bin/lineage_hive_generator.hs | bin/datahub-producer.hs config
```

5. load jdbc schema (mysql, mssql, postgresql, oracle) to datahub

```
bin/dataset-jdbc-generator.hs | bin/datahub-producer.hs config
```

6. load hive schema to datahub

```
bin/dataset-hive-generator.py | bin/datahub-producer.hs config
```
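
For reference, each line fed to bin/datahub-producer.hs (whether from sample/mce.json.dat or from one of the generators above) is a single MetadataChangeEvent serialized as JSON. Below is a minimal, illustrative sketch of such a record in Python, mirroring the DatasetSnapshot structure that dataset-hive-generator.py (added in this commit) emits; the urn, owner, and timestamp are placeholder values, not taken from the sample file.

```
# Minimal, illustrative MCE record; all values are placeholders, not real metadata.
import json

mce = {
    "auditHeader": None,
    "proposedSnapshot": {
        "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
            "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,demo_db.demo_table,PROD)",
            "aspects": [
                {"com.linkedin.pegasus2avro.common.Ownership": {
                    "owners": [{"owner": "urn:li:corpuser:datahub", "type": "DATAOWNER"}],
                    "lastModified": {"time": 0, "actor": "urn:li:corpuser:datahub"}}}
            ]
        }
    },
    "proposedDelta": None
}

# Emit one record per line, which is the format datahub-producer.hs consumes.
print(json.dumps(mce))
```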
## Reference

- hive/presto/vertica SQL Parser
contrib/metadata-ingestion/bin/dataset-hive-generator.py (new executable file, 66 lines)
@@ -0,0 +1,66 @@
#! /usr/bin/env nix-shell
#! nix-shell dataset-hive-generator.py.nix -i python

import sys
import time
from pyhive import hive
from TCLIService.ttypes import TOperationState

import simplejson as json

HIVESTORE='localhost'

AVROLOADPATH = '../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc'
KAFKATOPIC = 'MetadataChangeEvent'
BOOTSTRAP = 'localhost:9092'
SCHEMAREGISTRY = 'http://localhost:8081'

def hive_query(query):
    """
    Execute the query against the HiveStore and return all result rows.
    """
    cursor = hive.connect(HIVESTORE).cursor()
    cursor.execute(query, async_=True)
    status = cursor.poll().operationState
    # Stream Hive execution logs to stdout while the query is still running.
    while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
        logs = cursor.fetch_logs()
        for message in logs:
            sys.stdout.write(message)
        status = cursor.poll().operationState
    results = cursor.fetchall()
    return results

def build_hive_dataset_mce(dataset_name, schema, metadata):
    """
    Create the MetadataChangeEvent via dataset_name and schema.
    """
    # metadata is the comma-split "Detailed Table Information" row of `describe extended`;
    # the slices strip the field labels (e.g. " owner:", " createTime:") from each entry.
    actor, type, created_time, upstreams_dataset, sys_time = "urn:li:corpuser:" + metadata[2][7:], str(metadata[-1][11:-1]), int(metadata[3][12:]), metadata[-28][10:], int(time.time())

    owners = {"owners":[{"owner":actor,"type":"DATAOWNER"}],"lastModified":{"time":sys_time,"actor":actor}}
    upstreams = {"upstreams":[{"auditStamp":{"time":sys_time,"actor":actor},"dataset":"urn:li:dataset:(urn:li:dataPlatform:hive," + upstreams_dataset + ",PROD)","type":"TRANSFORMED"}]}
    elements = {"elements":[{"url":HIVESTORE,"description":"sample doc to describe upstreams","createStamp":{"time":sys_time,"actor":actor}}]}
    schema_name = {"schemaName":dataset_name,"platform":"urn:li:dataPlatform:hive","version":0,"created":{"time":created_time,"actor":actor},
                   "lastModified":{"time":sys_time,"actor":actor},"hash":"","platformSchema":{"com.linkedin.pegasus2avro.schema.OtherSchema": {"rawSchema": schema}},
                   "fields":[{"fieldPath":"","description":{"string":""},"nativeDataType":"string","type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}}}]}

    mce = {"auditHeader": None,
           "proposedSnapshot":{"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot":
               {"urn": "urn:li:dataset:(urn:li:dataPlatform:hive,"+ dataset_name +",PROD)"
               ,"aspects": [
                   {"com.linkedin.pegasus2avro.common.Ownership": owners}
                 , {"com.linkedin.pegasus2avro.dataset.UpstreamLineage": upstreams}
                 , {"com.linkedin.pegasus2avro.common.InstitutionalMemory": elements}
                 , {"com.linkedin.pegasus2avro.schema.SchemaMetadata": schema_name}
               ]}},
           "proposedDelta": None}

    # One JSON-serialized MCE per line on stdout, ready to be piped into datahub-producer.hs.
    print(json.dumps(mce))

# Walk every table of every database and emit one MCE per table.
databases = hive_query('show databases')
for database in databases:
    tables = hive_query('show tables in ' + database[0])
    for table in tables:
        dataset_name = database[0] + '.' + table[0]
        description = hive_query('describe extended ' + dataset_name)
        # The last row holds the table details; the preceding rows are the raw column schema.
        build_hive_dataset_mce(dataset_name, str(description[:-1][:-1]), description[-1][1].split(','))

sys.exit(0)
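
Note that AVROLOADPATH, KAFKATOPIC, BOOTSTRAP, and SCHEMAREGISTRY are defined above but not yet used; the script only prints JSON to stdout for datahub-producer.hs to consume. The following is a hypothetical sketch of how those constants could be wired to confluent-kafka (still commented out in the .nix file below) to publish each MCE directly to Kafka. It is an illustration, not part of the commit.

```
# Hypothetical sketch only: publish the MCE straight to Kafka instead of printing it.
# Assumes confluent-kafka (commented out in dataset-hive-generator.py.nix) is available
# and reuses the AVROLOADPATH / KAFKATOPIC / BOOTSTRAP / SCHEMAREGISTRY constants above.
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

def produce_mce_to_kafka(mce):
    # Load the MetadataChangeEvent Avro schema shipped with datahub.
    record_schema = avro.load(AVROLOADPATH)
    producer = AvroProducer(
        {'bootstrap.servers': BOOTSTRAP, 'schema.registry.url': SCHEMAREGISTRY},
        default_value_schema=record_schema)
    producer.produce(topic=KAFKATOPIC, value=mce)
    producer.flush()
```

If direct publishing were enabled, a call to produce_mce_to_kafka(mce) would take the place of the print(json.dumps(mce)) line in build_hive_dataset_mce.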
contrib/metadata-ingestion/bin/dataset-hive-generator.py.nix (new file, 61 lines)
@@ -0,0 +1,61 @@
with import <nixpkgs> {} ;
let
  avro-python3-1_8 = python3Packages.buildPythonPackage rec {
    pname = "avro-python3" ;
    version = "1.8.2" ;

    src = python3Packages.fetchPypi {
      inherit pname version ;
      sha256 = "f82cf0d66189600b1e6b442f650ad5aca6c189576723dcbf6f9ce096eab81bd6" ;
    } ;
    doCheck = false;
  } ;

  sasl = python3Packages.buildPythonPackage rec {
    pname = "sasl" ;
    version = "0.2.1" ;

    src = python3Packages.fetchPypi {
      inherit pname version ;
      sha256 = "04f22e17bbebe0cd42471757a48c2c07126773c38741b1dad8d9fe724c16289d" ;
    } ;
    doCheck = false;
    propagatedBuildInputs = [ cyrus_sasl ] ++ (with python3Packages ; [six]) ;
  } ;

  thrift_sasl = python3Packages.buildPythonPackage rec {
    pname = "thrift_sasl" ;
    version = "0.4.2" ;

    src = python3Packages.fetchPypi {
      inherit pname version ;
      sha256 = "6a1c54731cb3ce61bdc041d9dc36e21f0f56db0163bb7b57be84de3fda70922f" ;
    } ;
    doCheck = false;
    propagatedBuildInputs = with python3Packages; [ thrift sasl ] ;
  } ;

  PyHive = python3Packages.buildPythonPackage rec {
    pname = "PyHive" ;
    version = "0.6.1" ;

    src = python3Packages.fetchPypi {
      inherit pname version ;
      sha256 = "a5f2b2f8bcd85a8cd80ab64ff8fbfe1c09515d266650a56f789a8d89ad66d7f4" ;
    } ;
    doCheck = false;
    propagatedBuildInputs = with python3Packages ; [ dateutil future thrift sasl thrift_sasl ];
  } ;

in
mkShell {
  buildInputs = (with python3Packages ;[
    python
    requests
    PyHive

    simplejson
    # avro-python3-1_8
    # confluent-kafka
  ]) ;
}