feature(ingest): add bigquery ETL script (#1711)

Also fix minor issues in the common script
This commit is contained in:
Mars Lan 2020-06-25 15:28:13 -07:00 committed by GitHub
parent fa9fe5e110
commit 221c9af220
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 15 additions and 5 deletions

View File

@ -0,0 +1,8 @@
# ETL entry-point script for BigQuery metadata ingestion.
# Fill in URL/OPTIONS below, then execute this module directly; it delegates
# all extraction and publishing work to the shared `run` helper.
# NOTE(review): `run` lives in the project-local `common` module — per the
# surrounding diff it reflects schemas/tables via SQLAlchemy and produces
# dataset MCEs to Kafka, but that behavior is not visible from this file.
from common import run
# See https://github.com/mxmzdlv/pybigquery/ for more details
# SQLAlchemy connection URL for the pybigquery dialect (must be set before running).
URL = '' # e.g. bigquery://project_id
# Extra keyword arguments forwarded to SQLAlchemy's create_engine by `run`.
OPTIONS = {} # e.g. {"credentials_path": "/path/to/keyfile.json"}
# Platform identifier stamped on every emitted dataset record.
PLATFORM = 'bigquery'
# Module-level side effect: running this file performs the full ETL pass.
run(URL, OPTIONS, PLATFORM)

View File

@ -0,0 +1 @@
pybigquery==0.4.15

View File

@ -55,7 +55,7 @@ def build_dataset_mce(platform, dataset_name, columns):
for column in columns:
fields.append({
"fieldPath": column["name"],
"nativeDataType": str(column["type"]),
"nativeDataType": repr(column["type"]),
"type": { "type":get_column_type(column["type"]) }
})
@ -102,7 +102,7 @@ def run(url, options, platform, kafka_config = KafkaConfig()):
engine = create_engine(url, **options)
inspector = reflection.Inspector.from_engine(engine)
for schema in inspector.get_schema_names():
for table in inspector.get_table_names(schema):
columns = inspector.get_columns(table, schema)
mce = build_dataset_mce(platform, f'{schema}.{table}', columns)
produce_dataset_mce(mce, kafka_config)
for table in inspector.get_table_names(schema):
columns = inspector.get_columns(table, schema)
mce = build_dataset_mce(platform, f'{schema}.{table}', columns)
produce_dataset_mce(mce, kafka_config)

View File

@ -1,5 +1,6 @@
from common import run
# See https://github.com/PyMySQL/PyMySQL for more details
URL = '' # e.g. mysql+pymysql://username:password@hostname:port
OPTIONS = {} # e.g. {"encoding": "latin1"}
PLATFORM = 'mysql'