Remove old ingestion scripts
parent b91d0cf63b
commit 95faca45e2
@ -1,29 +0,0 @@
# SQL-Based Metadata Ingestion

This directory contains example ETL scripts that use [SQLAlchemy](https://www.sqlalchemy.org/) to ingest basic metadata
from a wide range of [commonly used SQL-based data systems](https://docs.sqlalchemy.org/en/13/dialects/index.html),
including MySQL, PostgreSQL, Oracle, MS SQL, Redshift, BigQuery, Snowflake, etc.

## Requirements

You'll need to install both the common requirements (`common.txt`) and the system-specific driver for the script (e.g.
`mysql_etl.txt` for `mysql_etl.py`). Some drivers also require additional dependencies to be installed, so please check
the driver's official project page for more details.

## Example

Here's an example of how to ingest metadata from MySQL.

Install the requirements
```
pip install --user -r common.txt -r mysql_etl.txt
```

Modify these variables in `mysql_etl.py` to match your environment
```
URL      # Connection URL in the form of mysql+pymysql://username:password@hostname:port
OPTIONS  # Additional connection options for the driver
```
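
For instance, pointing the script at a local MySQL instance might look like the following (the hostname, port,
credentials, and option values here are purely illustrative):
```
URL = 'mysql+pymysql://datahub:datahub@localhost:3306'
OPTIONS = {"encoding": "utf8"}
```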

Run the ETL script
```
python mysql_etl.py
```
@ -1,8 +0,0 @@
from common import run

# See https://github.com/mxmzdlv/pybigquery/ for more details
URL = ''  # e.g. bigquery://project_id
OPTIONS = {}  # e.g. {"credentials_path": "/path/to/keyfile.json"}
PLATFORM = 'bigquery'

run(URL, OPTIONS, PLATFORM)
@ -1 +0,0 @@
pybigquery==0.4.15
@ -1,111 +0,0 @@
#! /usr/bin/python
import time

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
from dataclasses import dataclass
from sqlalchemy import create_engine
from sqlalchemy import types
from sqlalchemy.engine import reflection


@dataclass
class KafkaConfig:
    avsc_path = '../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc'
    kafka_topic = 'MetadataChangeEvent_v4'
    bootstrap_server = 'localhost:9092'
    schema_registry = 'http://localhost:8081'


def get_column_type(column_type):
    """
    Maps SQLAlchemy types (https://docs.sqlalchemy.org/en/13/core/type_basics.html) to corresponding schema types
    """
    if isinstance(column_type, (types.Integer, types.Numeric)):
        return ("com.linkedin.pegasus2avro.schema.NumberType", {})

    if isinstance(column_type, (types.Boolean)):
        return ("com.linkedin.pegasus2avro.schema.BooleanType", {})

    if isinstance(column_type, (types.Enum)):
        return ("com.linkedin.pegasus2avro.schema.EnumType", {})

    if isinstance(column_type, (types._Binary, types.PickleType)):
        return ("com.linkedin.pegasus2avro.schema.BytesType", {})

    if isinstance(column_type, (types.ARRAY)):
        return ("com.linkedin.pegasus2avro.schema.ArrayType", {})

    if isinstance(column_type, (types.String)):
        return ("com.linkedin.pegasus2avro.schema.StringType", {})

    return ("com.linkedin.pegasus2avro.schema.NullType", {})


def build_dataset_mce(platform, dataset_name, columns):
    """
    Creates MetadataChangeEvent for the dataset.
    """
    actor, sys_time = "urn:li:corpuser:etl", int(time.time()) * 1000

    fields = []
    for column in columns:
        fields.append({
            "fieldPath": column["name"],
            "nativeDataType": repr(column["type"]),
            "type": { "type": get_column_type(column["type"]) },
            "description": column.get("comment", None)
        })

    schema_metadata = {
        "schemaName": dataset_name,
        "platform": f"urn:li:dataPlatform:{platform}",
        "version": 0,
        "created": { "time": sys_time, "actor": actor },
        "lastModified": { "time": sys_time, "actor": actor },
        "hash": "",
        "platformSchema": { "tableSchema": "" },
        "fields": fields
    }

    return {
        "auditHeader": None,
        "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot", {
            "urn": f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},PROD)",
            "aspects": [("com.linkedin.pegasus2avro.schema.SchemaMetadata", schema_metadata)]
        }),
        "proposedDelta": None
    }


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


def produce_dataset_mce(mce, kafka_config):
    """
    Produces a MetadataChangeEvent to Kafka
    """
    conf = {'bootstrap.servers': kafka_config.bootstrap_server,
            'on_delivery': delivery_report,
            'schema.registry.url': kafka_config.schema_registry}
    key_schema = avro.loads('{"type": "string"}')
    record_schema = avro.load(kafka_config.avsc_path)
    producer = AvroProducer(conf, default_key_schema=key_schema, default_value_schema=record_schema)

    producer.produce(topic=kafka_config.kafka_topic, key=mce['proposedSnapshot'][1]['urn'], value=mce)
    producer.flush()


def run(url, options, platform, kafka_config=KafkaConfig()):
    engine = create_engine(url, **options)
    inspector = reflection.Inspector.from_engine(engine)
    for schema in inspector.get_schema_names():
        for table in inspector.get_table_names(schema):
            columns = inspector.get_columns(table, schema)
            mce = build_dataset_mce(platform, f'{schema}.{table}', columns)
            produce_dataset_mce(mce, kafka_config)
@ -1,3 +0,0 @@
avro-python3==1.8.2
confluent-kafka[avro]>=1.5
SQLAlchemy==1.3.17
@ -1,30 +0,0 @@
HIVE_SITE_CONF_javax_jdo_option_ConnectionURL=jdbc:postgresql://hive-metastore-postgresql/metastore
HIVE_SITE_CONF_javax_jdo_option_ConnectionDriverName=org.postgresql.Driver
HIVE_SITE_CONF_javax_jdo_option_ConnectionUserName=hive
HIVE_SITE_CONF_javax_jdo_option_ConnectionPassword=hive
HIVE_SITE_CONF_datanucleus_autoCreateSchema=false
HIVE_SITE_CONF_hive_metastore_uris=thrift://hive-metastore:9083
HDFS_CONF_dfs_namenode_datanode_registration_ip___hostname___check=false

CORE_CONF_fs_defaultFS=hdfs://namenode:8020
CORE_CONF_hadoop_http_staticuser_user=root
CORE_CONF_hadoop_proxyuser_hue_hosts=*
CORE_CONF_hadoop_proxyuser_hue_groups=*

HDFS_CONF_dfs_webhdfs_enabled=true
HDFS_CONF_dfs_permissions_enabled=false

YARN_CONF_yarn_log___aggregation___enable=true
YARN_CONF_yarn_resourcemanager_recovery_enabled=true
YARN_CONF_yarn_resourcemanager_store_class=org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
YARN_CONF_yarn_resourcemanager_fs_state___store_uri=/rmstate
YARN_CONF_yarn_nodemanager_remote___app___log___dir=/app-logs
YARN_CONF_yarn_log_server_url=http://historyserver:8188/applicationhistory/logs/
YARN_CONF_yarn_timeline___service_enabled=true
YARN_CONF_yarn_timeline___service_generic___application___history_enabled=true
YARN_CONF_yarn_resourcemanager_system___metrics___publisher_enabled=true
YARN_CONF_yarn_resourcemanager_hostname=resourcemanager
YARN_CONF_yarn_timeline___service_hostname=historyserver
YARN_CONF_yarn_resourcemanager_address=resourcemanager:8032
YARN_CONF_yarn_resourcemanager_scheduler_address=resourcemanager:8030
YARN_CONF_yarn_resourcemanager_resource__tracker_address=resourcemanager:8031
@ -1,52 +0,0 @@
# Based on https://github.com/big-data-europe/docker-hive
version: "3"

services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
    volumes:
      - namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hive.env
    ports:
      - "50070:50070"
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
    volumes:
      - datanode:/hadoop/dfs/data
    env_file:
      - ./hive.env
    environment:
      SERVICE_PRECONDITION: "namenode:50070"
    ports:
      - "50075:50075"
  hive-server:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hive.env
    environment:
      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
      SERVICE_PRECONDITION: "hive-metastore:9083"
    ports:
      - "10000:10000"
  hive-metastore:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hive.env
    command: /opt/hive/bin/hive --service metastore
    environment:
      SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
    ports:
      - "9083:9083"
  hive-metastore-postgresql:
    image: bde2020/hive-metastore-postgresql:2.3.0
  presto-coordinator:
    image: shawnzhu/prestodb:0.181
    ports:
      - "8080:8080"

volumes:
  namenode:
  datanode:
@ -1,8 +0,0 @@
from common import run

# See https://github.com/dropbox/PyHive for more details
URL = ''  # e.g. hive://username:password@hostname:port
OPTIONS = {}  # e.g. {"connect_args": {"configuration": {"hive.exec.reducers.max": "123"}}}
PLATFORM = 'hive'

run(URL, OPTIONS, PLATFORM)
@ -1 +0,0 @@
pyhive[hive]==0.6.1
@ -1,12 +0,0 @@
version: '3.1'

services:

  mssql:
    image: mcr.microsoft.com/mssql/server
    restart: always
    environment:
      ACCEPT_EULA: "Y"
      SA_PASSWORD: DatahubR0cks
    ports:
      - "1433:1433"
@ -1,8 +0,0 @@
from common import run

# See https://github.com/m32/sqlalchemy-tds for more details
URL = ''  # e.g. mssql+pytds://username:password@hostname:port
OPTIONS = {}
PLATFORM = 'mssql'

run(URL, OPTIONS, PLATFORM)
@ -1 +0,0 @@
sqlalchemy-pytds==0.3
@ -1,8 +0,0 @@
from common import run

# See https://github.com/PyMySQL/PyMySQL for more details
URL = ''  # e.g. mysql+pymysql://username:password@hostname:port
OPTIONS = {}  # e.g. {"encoding": "latin1"}
PLATFORM = 'mysql'

run(URL, OPTIONS, PLATFORM)
@ -1 +0,0 @@
PyMySQL==0.9.3
@ -1,12 +0,0 @@
version: '3.1'

services:

  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: datahub
      POSTGRES_PASSWORD: datahub
    ports:
      - "5432:5432"
@ -1,8 +0,0 @@
from common import run

# See https://docs.sqlalchemy.org/en/13/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.psycopg2 for more details
URL = ''  # e.g. postgresql+psycopg2://user:password@host:port
OPTIONS = {}  # e.g. {"client_encoding": "utf8"}
PLATFORM = 'postgresql'

run(URL, OPTIONS, PLATFORM)
@ -1 +0,0 @@
psycopg2-binary==2.8.5
@ -1 +0,0 @@
snowflake-sqlalchemy==1.2.3
@ -1,8 +0,0 @@
from common import run

# See https://github.com/snowflakedb/snowflake-sqlalchemy for more details
URL = ''  # e.g. snowflake://<user_login_name>:<password>@<account_name>
OPTIONS = {}  # e.g. {"connect_args": {"timezone": "America/Los_Angeles"}}
PLATFORM = 'snowflake'

run(URL, OPTIONS, PLATFORM)