Merge pull request #9 from SunZhaonan/master

Add Hive metadata ETL process
Zhen Chen 2015-12-21 12:11:02 -08:00
commit 348385fa26
13 changed files with 789 additions and 2 deletions

View File

@@ -16,6 +16,7 @@ package actors;
import java.util.Properties;
import metadata.etl.EtlJob;
import metadata.etl.dataset.hdfs.HdfsMetadataEtl;
import metadata.etl.dataset.hive.HiveMetadataEtl;
import metadata.etl.dataset.teradata.TeradataMetadataEtl;
import metadata.etl.git.GitMetadataEtl;
import metadata.etl.lineage.AzLineageMetadataEtl;
@@ -49,6 +50,8 @@ public class EtlJobFactory {
return new LdapEtl(refId, whExecId, properties);
case GIT_MEDATA_ETL:
return new GitMetadataEtl(refId, whExecId, properties);
case HIVE_DATASET_METADATA_ETL:
return new HiveMetadataEtl(refId, whExecId, properties);
default:
throw new UnsupportedOperationException("Unsupported job type: " + etlJobName);
}

View File

@@ -25,6 +25,7 @@ public enum EtlJobName {
HADOOP_DATASET_OWNER_ETL(EtlType.OWNER, RefIdType.DB),
LDAP_USER_ETL(EtlType.LDAP, RefIdType.APP),
GIT_MEDATA_ETL(EtlType.VCS, RefIdType.APP),
HIVE_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.DB),
;
EtlType etlType;

View File

@@ -69,5 +69,7 @@ subprojects {
"play" : "com.typesafe.play:play_2.10:2.2.4",
"play_ebean" : "com.typesafe.play:play-java-ebean_2.10:2.2.4",
"play_java_jdbc" : "com.typesafe.play:play-java-jdbc_2.10:2.2.4",
"play_cache" : "com.typesafe.play:play-cache_2.10:2.2.4"]
"play_cache" : "com.typesafe.play:play-cache_2.10:2.2.4",
"hive_exec" : "org.apache.hive:hive-exec:1.2.1"
]
}

View File

@@ -22,6 +22,7 @@ dependencies {
compile externalDependency.akka
compile externalDependency.slf4j_api
compile externalDependency.slf4j_log4j
compile externalDependency.hive_exec
compile files("extralibs/terajdbc4-15.00.00.20.jar")
compile files("extralibs/tdgssconfig-15.00.00.20.jar")
// compile externalDependency.jython

View File

@@ -0,0 +1,50 @@
package metadata.etl.dataset.hive;
import java.io.InputStream;
import java.util.Properties;
import metadata.etl.EtlJob;
/**
* Created by zsun on 11/16/15.
*/
public class HiveMetadataEtl extends EtlJob {
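// Orchestrates the three Jython scripts added in this change: HiveExtract.py (metastore -> JSON),
// HiveTransform.py (JSON -> CSV), HiveLoad.py (CSV -> WhereHows staging and final tables).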
@Deprecated
public HiveMetadataEtl(int dbId, long whExecId) {
super(null, dbId, whExecId);
}
public HiveMetadataEtl(int dbId, long whExecId, Properties prop) {
super(null, dbId, whExecId, prop);
}
@Override
public void extract()
throws Exception {
logger.info("In Hive metadata ETL, launch extract jython scripts");
InputStream inputStream = classLoader.getResourceAsStream("jython/HiveExtract.py");
//logger.info("before call scripts " + interpreter.getSystemState().argv);
interpreter.execfile(inputStream);
inputStream.close();
}
@Override
public void transform()
throws Exception {
logger.info("In Hive metadata ETL, launch transform jython scripts");
InputStream inputStream = classLoader.getResourceAsStream("jython/HiveTransform.py");
interpreter.execfile(inputStream);
inputStream.close();
}
@Override
public void load()
throws Exception {
logger.info("In Hive metadata ETL, launch load jython scripts");
InputStream inputStream = classLoader.getResourceAsStream("jython/HiveLoad.py");
interpreter.execfile(inputStream);
inputStream.close();
}
}

View File

@@ -0,0 +1,30 @@
package metadata.etl.dataset.hive;
import java.util.TreeSet;
import org.apache.hadoop.hive.ql.tools.LineageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created by zsun on 12/14/15.
*/
public class HiveViewDependency {
final Logger logger = LoggerFactory.getLogger(getClass());
LineageInfo lineageInfoTool;
public HiveViewDependency() {
lineageInfoTool = new LineageInfo();
}
public String[] getViewDependency(String hiveQl) {
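// Hive's LineageInfo parses the statement and collects the tables it reads from;
// those input tables are returned as the view's dependencies.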
try {
lineageInfoTool.getLineageInfo(hiveQl);
TreeSet<String> inputs = lineageInfoTool.getInputTableList();
return inputs.toArray(new String[inputs.size()]);
} catch (Exception e) {
logger.error("SQL statement parse error, returning an empty String array. Statement:\n" + hiveQl);
logger.error(String.valueOf(e.getCause()));
return new String[]{};
}
}
}

View File

@@ -106,3 +106,13 @@ ldap.group.search.return.attributes=
git.host=
git.project.whitelist=
# hive metastore connection and intermediate output files for the Hive dataset ETL
hive.metastore.jdbc.url=
hive.metastore.jdbc.driver=
hive.metstore.username=
hive.metastore.password=
hive.schema_json_file=
#hive.sample_csv=
hive.schema_csv_file=
hive.field_metadata=
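For context, the Jython scripts added below read these values from the job properties that the ETL wrapper hands to them. A minimal sketch, assuming (as the scripts do) that the properties arrive as sys.argv[1] and that the Constant names map onto the keys above:

# Sketch only: how the Hive ETL scripts consume this configuration.
import sys
from wherehows.common import Constant

args = sys.argv[1]  # job properties mapping injected by the ETL wrapper (assumption)
jdbc_url = args[Constant.HIVE_METASTORE_JDBC_URL]            # assumed to map to hive.metastore.jdbc.url
schema_json_file = args[Constant.HIVE_SCHEMA_JSON_FILE_KEY]  # assumed to map to hive.schema_json_file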

View File

@@ -30,7 +30,7 @@ class HdfsLoad:
LOAD DATA LOCAL INFILE '{source_file}'
INTO TABLE stg_dict_dataset
FIELDS TERMINATED BY '\Z' ESCAPED BY '\0'
FIELDS TERMINATED BY '\Z' ESCAPED BY '\\'
(`name`, `schema`, properties, fields, urn, source, sample_partition_full_path, source_created_time, source_modified_time)
SET db_id = {db_id},
-- TODO storage_type = 'Avro',

View File

@@ -0,0 +1,310 @@
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
from com.ziclix.python.sql import zxJDBC
import sys, os, re, json
import datetime
from wherehows.common import Constant
class TableInfo:
""" Property-name constants shared by the Hive extract and transform steps """
table_name = 'name'
type = 'type'
serialization_format = 'serialization_format'
create_time = 'create_time'
schema_url = 'schema_url'
field_delimiter = 'field_delimiter'
db_id = 'DB_ID'
table_id = 'TBL_ID'
serde_id = 'SD_ID'
table_type = 'tbl_type'
location = 'location'
view_expended_text = 'view_expanded_text'
input_format = 'input_format'
output_format = 'output_format'
is_compressed = 'is_compressed'
is_storedassubdirectories = 'is_storedassubdirectories'
etl_source = 'etl_source'
field_list = 'field_list'
schema_literal = 'schema_literal'
optional_prop = [create_time, serialization_format, field_delimiter, schema_url, db_id, table_id, serde_id,
table_type, location, view_expended_text, input_format, output_format, is_compressed,
is_storedassubdirectories, etl_source]
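# these property names double as the JSON keys written by HiveExtract.run() and read back by HiveTransform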
class HiveExtract:
"""
Extract Hive metadata from the Hive metastore and store it in a JSON file
"""
conn_hms = None
db_dict = {} # name : index
table_dict = {} # fullname : index
serde_param_columns = []
def get_table_info_from_v2(self, database_name):
"""
get table and column info from the COLUMNS_V2 table
:param database_name:
:return: (DB_NAME, TBL_NAME, SERDE_FORMAT, TBL_CREATE_TIME, INTEGER_IDX, COLUMN_NAME, TYPE_NAME, COMMENT
DB_ID, TBL_ID, SD_ID, LOCATION, VIEW_EXPANDED_TEXT, TBL_TYPE, INPUT_FORMAT)
"""
curs = self.conn_hms.cursor()
tbl_info_sql = """select d.NAME DB_NAME, t.TBL_NAME TBL_NAME,
case when s.INPUT_FORMAT like '%.TextInput%' then 'Text'
when s.INPUT_FORMAT like '%.Avro%' then 'Avro'
when s.INPUT_FORMAT like '%.RCFile%' then 'RC'
when s.INPUT_FORMAT like '%.Orc%' then 'ORC'
when s.INPUT_FORMAT like '%.Sequence%' then 'Sequence'
when s.INPUT_FORMAT like '%.Parquet%' then 'Parquet'
else s.INPUT_FORMAT
end SerializationFormat,
t.CREATE_TIME TableCreateTime,
t.DB_ID, t.TBL_ID, s.SD_ID,
substr(s.LOCATION, length(substring_index(s.LOCATION, '/', 3))+1) Location,
t.TBL_TYPE, t.VIEW_EXPANDED_TEXT, s.INPUT_FORMAT, s.OUTPUT_FORMAT, s.IS_COMPRESSED, s.IS_STOREDASSUBDIRECTORIES,
c.INTEGER_IDX, c.COLUMN_NAME, c.TYPE_NAME, c.COMMENT
from TBLS t join DBS d on t.DB_ID=d.DB_ID
join SDS s on t.SD_ID = s.SD_ID
join COLUMNS_V2 c on s.CD_ID = c.CD_ID
where
d.NAME in ('{db_name}')
order by 1,2
""".format(db_name=database_name)
curs.execute(tbl_info_sql)
rows = curs.fetchall()
curs.close()
return rows
def get_table_info_from_serde_params(self, database_name):
"""
get table and column info (MANAGED and EXTERNAL tables) from the Avro schema serde parameters
:param database_name:
:return: (DatabaseName, TableName, SerializationFormat, Create_Time, SchemaLiteral, SchemaUrl, FieldDelimiter, DB_ID, TBL_ID, SD_ID
TBL_TYPE, INPUT_FORMAT, OUTPUT_FORMAT, IS_COMPRESSED, IS_STOREDASSUBDIRECTORIES, LOCATION, VIEW_EXPANDED_TEXT)
"""
curs_et = self.conn_hms.cursor()
tbl_info_sql = """select d.NAME DatabaseName, et.TBL_NAME TableName,
case when s.INPUT_FORMAT like '%.TextInput%' then 'Text'
when s.INPUT_FORMAT like '%.Avro%' then 'Avro'
when s.INPUT_FORMAT like '%.RCFile%' then 'RC'
when s.INPUT_FORMAT like '%.Orc%' then 'ORC'
when s.INPUT_FORMAT like '%.Sequence%' then 'Sequence'
when s.INPUT_FORMAT like '%.Parquet%' then 'Parquet'
else s.INPUT_FORMAT
end SerializationFormat,
et.CREATE_TIME TableCreateTime, et.DB_ID, et.TBL_ID, s.SD_ID,
substr(s.LOCATION, length(substring_index(s.LOCATION, '/', 3))+1) Location,
et.TBL_TYPE, et.VIEW_EXPANDED_TEXT, s.INPUT_FORMAT, s.OUTPUT_FORMAT, s.IS_COMPRESSED, s.IS_STOREDASSUBDIRECTORIES,
et.schema_literal SchemaLiteral, et.schema_url SchemaUrl, et.field_delim FieldDelimiter
from (
select t.DB_ID, t.TBL_ID, sp.SERDE_ID,
t.TBL_NAME, t.CREATE_TIME, t.TBL_TYPE, t.VIEW_EXPANDED_TEXT,
replace(max( case when param_key in ('avro.schema.literal', 'schema.literal')
then param_value
end), '\\n', ' ') schema_literal,
max( case when param_key in ('avro.schema.url', 'schema.url')
then param_value
end) schema_url,
max( case when param_key in ('field.delim')
then param_value
end) field_delim
from SERDE_PARAMS sp join TBLS t on sp.SERDE_ID = t.SD_ID
where sp.PARAM_KEY regexp 'schema.literal|schema.url|field.delim'
and sp.PARAM_VALUE regexp """ + r" '^(,|{|\\\\|\\|)' " + """
group by 1,2,3,4,5 ) et
JOIN DBS d on et.DB_ID = d.DB_ID
JOIN SDS s on et.SERDE_ID = s.SD_ID
where d.NAME = '{db_name}'
order by 1,2 """.format(db_name=database_name)
curs_et.execute(tbl_info_sql)
rows = curs_et.fetchall()
curs_et.close()
return rows
def format_table_metadata_v2(self, rows, schema):
"""
process rows fetched from COLUMNS_V2 into the schema list; several consecutive rows form one table's info
:param rows: the rows fetched from COLUMNS_V2
:param schema: {database : _, type : _, tables : [{}, {} ...] }
:return:
"""
db_idx = len(schema) - 1
table_idx = -1
previous_db_name = ''
previous_tb_name = ''
field_list = []
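# rows are ordered by database and table; accumulate the current table's columns in field_list
# and emit one table record when the table changes (or at the last row)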
for row_index, row_value in enumerate(rows):
if row_index != len(rows) - 1 and (row_index == 0 or (
row_index != len(rows) - 1 and row_value[0] == previous_db_name and row_value[1] == previous_tb_name)):
field_list.append(
{'IntegerIndex': row_value[14], 'ColumnName': row_value[15], 'TypeName': row_value[16], 'Comment': row_value[17]})
else: # add new record into result
if row_index == len(rows) - 1: # edge case for last row
field_list.append({'IntegerIndex': row_value[14], 'ColumnName': row_value[15], 'TypeName': row_value[16],
'Comment': row_value[17]})
table_record = {TableInfo.table_name: row_value[1], TableInfo.type: 'Table', TableInfo.serialization_format: row_value[2],
TableInfo.create_time: row_value[3], TableInfo.db_id: row_value[4], TableInfo.table_id: row_value[5],
TableInfo.serde_id: row_value[6], TableInfo.location: row_value[7], TableInfo.table_type: row_value[8],
TableInfo.view_expended_text: row_value[9], TableInfo.input_format: row_value[10], TableInfo.output_format: row_value[11],
TableInfo.is_compressed: row_value[12], TableInfo.is_storedassubdirectories: row_value[13],
TableInfo.etl_source: 'COLUMN_V2',
TableInfo.field_list: field_list}
field_list = []
if row_value[0] not in self.db_dict:
schema.append({'database': row_value[0], 'type': 'Hive', 'tables': []})
db_idx += 1
self.db_dict[row_value[0]] = db_idx
full_name = ''
if row_value[0]:
full_name = row_value[0]
if row_value[1]:
full_name += '.' + row_value[1]
elif row_value[1]:
full_name = row_value[1]
# put in schema result
if full_name not in self.table_dict:
schema[db_idx]['tables'].append(table_record)
table_idx += 1
self.table_dict[full_name] = table_idx
# print "%6d: %s: %s" % (table_idx, full_name, str(schema[db_idx]['tables'][table_idx]))
previous_db_name = row_value[0]
previous_tb_name = row_value[1]
print "%s %6d tables processed for database %12s from COLUMN_V2" % (
datetime.datetime.now(), table_idx + 1, row_value[0])
def format_table_metadata_serde(self, rows, schema):
"""
add table info from rows into schema;
also add the serde-level extras (schema literal, schema url, field delimiter)
:param rows: DatabaseName, TableName, SerializationFormat, Create_Time, SchemaLiteral, SchemaUrl, FieldDelimiter, DB_ID, TBL_ID, SD_ID
:param schema: {database : _, type : _, tables : ['name' : _, ... '' : _] }
:return:
"""
db_idx = len(schema) - 1
table_idx = -1
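# the serde query returns one row per table (schema already aggregated in SQL), so no per-column accumulation is needed here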
for row_value in rows:
if row_value[0] not in self.db_dict:
schema.append({'database': row_value[0], 'type': 'Hive', 'tables': []})
db_idx += 1
self.db_dict[row_value[0]] = db_idx
else:
db_idx = self.db_dict[row_value[0]]
full_name = ''
if row_value[0]:
full_name = row_value[0]
if row_value[1]:
full_name += '.' + row_value[1]
elif row_value[1]:
full_name = row_value[1]
# put in schema result
if full_name not in self.table_dict:
schema[db_idx]['tables'].append(
{TableInfo.table_name: row_value[1], TableInfo.type: 'Table', TableInfo.serialization_format: row_value[2],
TableInfo.create_time: row_value[3], TableInfo.db_id: row_value[4], TableInfo.table_id: row_value[5],
TableInfo.serde_id: row_value[6], TableInfo.location: row_value[7], TableInfo.table_type: row_value[8],
TableInfo.view_expended_text: row_value[9], TableInfo.input_format: row_value[10],
TableInfo.output_format: row_value[11], TableInfo.is_compressed: row_value[12],
TableInfo.is_storedassubdirectories: row_value[13],
TableInfo.etl_source: 'SERDE_PARAMS',
TableInfo.schema_literal: row_value[14], TableInfo.schema_url: row_value[15],
TableInfo.field_delimiter: row_value[16]}
)
table_idx += 1
self.table_dict[full_name] = table_idx
print "%s %6d tables processed for database %12s from SERDE_PARAM" % (
datetime.datetime.now(), table_idx + 1, row_value[0])
def run(self, schema_output_file, sample_output_file):
"""
The entry point of the class: extract the schema, one database at a time
:param schema_output_file: output file
:return:
"""
cur = self.conn_hms.cursor()
schema = []
schema_json_file = open(schema_output_file, 'wb')
os.chmod(schema_output_file, 0666)
# open(sample_output_file, 'wb')
# os.chmod(sample_output_file, 0666)
# sample_file_writer = FileWriter(sample_output_file)
for database_name in self.databases:
print "Collecting hive tables in database : " + database_name
# tables from schemaLiteral
rows = []
begin = datetime.datetime.now().strftime("%H:%M:%S")
rows.extend(self.get_table_info_from_serde_params(database_name))
if len(rows) > 0:
self.format_table_metadata_serde(rows, schema)
end = datetime.datetime.now().strftime("%H:%M:%S")
print "Get table info from Serde %12s [%s -> %s]\n" % (database_name, str(begin), str(end))
# tables from Column V2
rows = []
begin = datetime.datetime.now().strftime("%H:%M:%S")
rows.extend(self.get_table_info_from_v2(database_name))
if len(rows) > 0:
self.format_table_metadata_v2(rows, schema)
end = datetime.datetime.now().strftime("%H:%M:%S")
print "Get table info from COLUMN_V2 %12s [%s -> %s]\n" % (database_name, str(begin), str(end))
schema_json_file.write(json.dumps(schema, indent=None) + '\n')
cur.close()
schema_json_file.close()
def get_all_databases(self):
"""
Fetch all database names from the DBS table
:return:
"""
fetch_all_database_names = "SELECT `NAME` FROM DBS WHERE NAME NOT LIKE 'u_%'"
curs = self.conn_hms.cursor()
curs.execute(fetch_all_database_names)
rows = [item[0] for item in curs.fetchall()]
curs.close()
return rows
if __name__ == "__main__":
args = sys.argv[1]
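# sys.argv[1] carries the job properties supplied by the ETL wrapper, keyed by the wherehows Constant names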
# connection
username = args[Constant.HIVE_METASTORE_USERNAME]
password = args[Constant.HIVE_METASTORE_PASSWORD]
jdbc_driver = args[Constant.HIVE_METASTORE_JDBC_DRIVER]
jdbc_url = args[Constant.HIVE_METASTORE_JDBC_URL]
e = HiveExtract()
e.conn_hms = zxJDBC.connect(jdbc_url, username, password, jdbc_driver)
try:
e.databases = e.get_all_databases()
print 'Process databases : '
print e.databases
e.run(args[Constant.HIVE_SCHEMA_JSON_FILE_KEY], None)
finally:
e.conn_hms.close()
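For reference, a made-up illustration of one entry in the JSON that run() writes and HiveTransform.py reads back (keys follow the TableInfo constants above; all values are invented):

# Illustrative only -- not real output.
[{"database": "example_db", "type": "Hive",
  "tables": [{"name": "example_table", "type": "Table", "serialization_format": "Avro",
              "create_time": 1450000000, "DB_ID": 1, "TBL_ID": 10, "SD_ID": 100,
              "etl_source": "SERDE_PARAMS", "schema_literal": "{...}"}]}]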

View File

@@ -0,0 +1,165 @@
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
import sys
from com.ziclix.python.sql import zxJDBC
from wherehows.common import Constant
class HiveLoad:
def load_metadata(self):
cursor = self.conn_mysql.cursor()
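# staged load: refresh stg_dict_dataset for this db_id from the CSV, drop empty and temporary names,
# derive location_prefix and parent_name from the urn, then upsert into dict_dataset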
load_cmd = """
DELETE FROM stg_dict_dataset WHERE db_id = {db_id};
LOAD DATA LOCAL INFILE '{source_file}'
INTO TABLE stg_dict_dataset
FIELDS TERMINATED BY '\Z' ESCAPED BY '\0'
(`name`, `schema`, properties, fields, urn, source, @sample_partition_full_path, source_created_time, @source_modified_time)
SET db_id = {db_id},
storage_type = 'Table', dataset_type = 'hive',
source_modified_time=nullif(@source_modified_time,''),
sample_partition_full_path=nullif(@sample_partition_full_path,''),
wh_etl_exec_id = {wh_etl_exec_id};
-- SELECT COUNT(*) FROM stg_dict_dataset;
-- clear
DELETE FROM stg_dict_dataset where db_id = {db_id}
AND (length(`name`) = 0
OR `name` like 'tmp\_%'
OR `name` like 't\_%')
;
update stg_dict_dataset
set location_prefix = substring_index(substring_index(urn, '/', 4), '/', -2) /* hive location_prefix is its schema name */
WHERE db_id = {db_id} and location_prefix is null;
update stg_dict_dataset
set parent_name = substring_index(substring_index(urn, '/', 4), '/', -1) /* hive parent_name is its schema name */
where db_id = {db_id} and parent_name is null;
-- insert into final table
INSERT INTO dict_dataset
( `name`,
`schema`,
schema_type,
fields,
properties,
urn,
source,
location_prefix,
parent_name,
storage_type,
ref_dataset_id,
status_id,
dataset_type,
hive_serdes_class,
is_partitioned,
partition_layout_pattern_id,
sample_partition_full_path,
source_created_time,
source_modified_time,
created_time,
wh_etl_exec_id
)
select s.name, s.schema, s.schema_type, s.fields,
s.properties, s.urn,
s.source, s.location_prefix, s.parent_name,
s.storage_type, s.ref_dataset_id, s.status_id,
s.dataset_type, s.hive_serdes_class, s.is_partitioned,
s.partition_layout_pattern_id, s.sample_partition_full_path,
s.source_created_time, s.source_modified_time, UNIX_TIMESTAMP(now()),
s.wh_etl_exec_id
from stg_dict_dataset s
where s.db_id = {db_id}
on duplicate key update
`name`=s.name, `schema`=s.schema, schema_type=s.schema_type, fields=s.fields,
properties=s.properties, source=s.source, location_prefix=s.location_prefix, parent_name=s.parent_name,
storage_type=s.storage_type, ref_dataset_id=s.ref_dataset_id, status_id=s.status_id,
dataset_type=s.dataset_type, hive_serdes_class=s.hive_serdes_class, is_partitioned=s.is_partitioned,
partition_layout_pattern_id=s.partition_layout_pattern_id, sample_partition_full_path=s.sample_partition_full_path,
source_created_time=s.source_created_time, source_modified_time=s.source_modified_time,
modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
;
analyze table dict_dataset;
""".format(source_file=self.input_schema_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
for state in load_cmd.split(";"):
print state
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()
def load_field(self):
"""
TODO: load_field is not used for now; nested structure types still need to be expanded
:return:
"""
cursor = self.conn_mysql.cursor()
load_cmd = """
DELETE FROM stg_dict_field_detail where db_id = {db_id};
LOAD DATA LOCAL INFILE '{source_file}'
INTO TABLE stg_dict_field_detail
FIELDS TERMINATED BY '\Z'
(urn, sort_id, parent_sort_id, @parent_path, field_name, field_label, data_type,
@data_size, @precision, @scale, @is_nullable, @is_indexed, @is_partitioned, @default_value, @namespace, description,
@dummy
)
set
parent_path=nullif(@parent_path,'null')
, data_size=nullif(@data_size,'null')
, data_precision=nullif(@precision,'null')
, data_scale=nullif(@scale,'null')
, is_nullable=nullif(@is_nullable,'null')
, is_indexed=nullif(@is_indexed,'null')
, is_partitioned=nullif(@is_partitioned,'null')
, default_value=nullif(@default_value,'null')
, namespace=nullif(@namespace,'null')
, db_id = {db_id}
;
analyze table stg_dict_field_detail;
""".format(source_file=self.input_field_file, db_id=self.db_id)
# didn't load into final table for now
for state in load_cmd.split(";"):
print state
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()
if __name__ == "__main__":
args = sys.argv[1]
l = HiveLoad()
# set up connection
username = args[Constant.WH_DB_USERNAME_KEY]
password = args[Constant.WH_DB_PASSWORD_KEY]
JDBC_DRIVER = args[Constant.WH_DB_DRIVER_KEY]
JDBC_URL = args[Constant.WH_DB_URL_KEY]
l.input_schema_file = args[Constant.HIVE_SCHEMA_CSV_FILE_KEY]
l.input_field_file = args[Constant.HIVE_FIELD_METADATA_KEY]
l.db_id = args[Constant.DB_ID_KEY]
l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
try:
l.load_metadata()
# l.load_field()
finally:
l.conn_mysql.close()

View File

@@ -0,0 +1,145 @@
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
import json
import pprint, datetime
import sys, os
import time
from wherehows.common.writers import FileWriter
from wherehows.common.schemas import DatasetSchemaRecord, DatasetFieldRecord
from wherehows.common import Constant
from HiveExtract import TableInfo
from org.apache.hadoop.hive.ql.tools import LineageInfo
from metadata.etl.dataset.hive import HiveViewDependency
class HiveTransform:
def transform(self, input, hive_metadata, hive_field_metadata):
"""
convert the extracted JSON into CSV files for loading
:param input: input json file
:param hive_metadata: output data file for hive table metadata
:param hive_field_metadata: output data file for hive field metadata
:return:
"""
pp = pprint.PrettyPrinter(indent=1)
f_json = open(input)
all_data = json.load(f_json)
f_json.close()
schema_file_writer = FileWriter(hive_metadata)
field_file_writer = FileWriter(hive_field_metadata)
lineageInfo = LineageInfo()
# one db info : 'type', 'database', 'tables'
# one table info : required : 'name' , 'type', 'serializationFormat' ,'createTime', 'DB_ID', 'TBL_ID', 'SD_ID'
# optional : 'schemaLiteral', 'schemaUrl', 'fieldDelimiter', 'fieldList'
for one_db_info in all_data:
i = 0
for table in one_db_info['tables']:
i += 1
schema_json = {}
prop_json = {} # set the prop json
for prop_name in TableInfo.optional_prop:
if prop_name in table and table[prop_name] is not None:
prop_json[prop_name] = table[prop_name]
if TableInfo.view_expended_text in prop_json:
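# strip backtick quoting, then ask HiveViewDependency which tables the view reads from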
text = prop_json[TableInfo.view_expended_text].replace('`', '')
array = HiveViewDependency.getViewDependency(text)
l = []
for a in array:
l.append(a)
prop_json['view_depends_on'] = l
# process whichever schema source is present: an Avro schema literal (from SERDE_PARAMS) or a plain field list (from COLUMNS_V2)
flds = {}
field_detail_list = []
if TableInfo.schema_literal in table and table[TableInfo.schema_literal] is not None:
sort_id = 0
try:
schema_data = json.loads(table[TableInfo.schema_literal])
except ValueError:
print "Schema json error for table : "
print table
schema_data = {'fields': []}  # fall back to an empty schema so schema_data stays defined
schema_json = schema_data
# process each field
for field in schema_data['fields']:
field_name = field['name']
type = field['type'] # could be a list
default_value = field['default'] if 'default' in field else None
doc = field['doc'] if 'doc' in field else None
attributes_json = json.loads(field['attributes_json']) if 'attributes_json' in field else None
pk = delta = is_nullable = is_indexed = is_partitioned = inside_type = format = data_size = None
if attributes_json:
pk = attributes_json['pk'] if 'pk' in attributes_json else None
delta = attributes_json['delta'] if 'delta' in attributes_json else None
is_nullable = attributes_json['nullable'] if 'nullable' in attributes_json else None
inside_type = attributes_json['type'] if 'type' in attributes_json else None
format = attributes_json['format'] if 'format' in attributes_json else None
flds[field_name] = {'type': type}
# String urn, Integer sortId, Integer parentSortId, String parentPath, String fieldName,
#String dataType, String isNullable, String defaultValue, Integer dataSize, String namespace, String description
sort_id += 1
field_detail_list.append(
["hive:///%s/%s" % (one_db_info['database'], table['name']), str(sort_id), '0', None, field_name, '',
type, data_size, None, None, is_nullable, is_indexed, is_partitioned, default_value, None,
json.dumps(attributes_json)])
elif TableInfo.field_list in table:
schema_json = {'type': 'record', 'name': table['name'],
'fields': table[TableInfo.field_list]} # construct a schema for data came from COLUMN_V2
for field in table[TableInfo.field_list]:
field_name = field['ColumnName']
type = field['TypeName']
# ColumnName, IntegerIndex, TypeName, Comment
flds[field_name] = {'type': type}
pk = delta = is_nullable = is_indexed = is_partitioned = inside_type = format = data_size = default_value = None # TODO ingest
field_detail_list.append(
["hive:///%s/%s" % (one_db_info['database'], table['name']), field['IntegerIndex'], '0', None, field_name,
'', field['TypeName'], None, None, None, is_nullable, is_indexed, is_partitioned, default_value, None,
None])
dataset_schema_record = DatasetSchemaRecord(table['name'], json.dumps(schema_json), json.dumps(prop_json),
json.dumps(flds),
"hive:///%s/%s" % (one_db_info['database'], table['name']), 'Hive',
'', (table[TableInfo.create_time] if table.has_key(
TableInfo.create_time) else None), (table["lastAlterTime"]) if table.has_key("lastAlterTime") else None)
schema_file_writer.append(dataset_schema_record)
for fields in field_detail_list:
field_record = DatasetFieldRecord(fields)
field_file_writer.append(field_record)
schema_file_writer.flush()
field_file_writer.flush()
print "%20s contains %6d tables" % (one_db_info['database'], i)
schema_file_writer.close()
field_file_writer.close()
def convert_timestamp(self, time_string):
return int(time.mktime(time.strptime(time_string, "%Y-%m-%d %H:%M:%S")))
if __name__ == "__main__":
args = sys.argv[1]
t = HiveTransform()
t.transform(args[Constant.HIVE_SCHEMA_JSON_FILE_KEY], args[Constant.HIVE_SCHEMA_CSV_FILE_KEY], args[Constant.HIVE_FIELD_METADATA_KEY])

View File

@@ -0,0 +1,48 @@
package metadata.etl.dataset.hive;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
/**
* Created by zsun on 11/13/15.
*/
public class HiveTest {
HiveMetadataEtl hm;
@BeforeTest
public void setUp()
throws Exception {
hm = new HiveMetadataEtl(0, 0L);
}
@Test
public void extractTest()
throws Exception {
hm.extract();
// check the json file
}
@Test
public void transformTest()
throws Exception {
hm.transform();
// check the csv file
}
@Test
public void loadTest()
throws Exception {
hm.load();
// check in database
}
@Test
public void runTest()
throws Exception {
extractTest();
transformTest();
loadTest();
}
}

View File

@@ -0,0 +1,22 @@
package metadata.etl.dataset.hive;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.parse.ParseException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
* Created by zsun on 12/9/15.
*/
public class HiveViewDependencyParserTest {
@Test
public void parseTest()
throws CommandNeedRetryException, SemanticException, ParseException {
String hiveQl = "select t1.c2 from (select t2.column2 c2, t3.column3 from db1.table2 t2 join db2.table3 t3 on t2.x = t3.y) t1";
HiveViewDependency hiveViewDependency = new HiveViewDependency();
String[] result = hiveViewDependency.getViewDependency(hiveQl);
String[] expectedResult = new String[]{"db1.table2", "db2.table3"};
Assert.assertEquals(result, expectedResult);
}
}