Mirror of https://github.com/datahub-project/datahub.git (synced 2025-10-13 18:04:55 +00:00)

Merge pull request #138 from jerrybai2009/master
Dali Metadata integration - combine dali versions into one node

Commit 96e0522e4a
@@ -13,6 +13,8 @@
#

from org.slf4j import LoggerFactory
from wherehows.common.writers import FileWriter
from wherehows.common.schemas import DatasetInstanceRecord
from com.ziclix.python.sql import zxJDBC
import sys, os, re, json
import datetime
@@ -46,7 +48,6 @@ class TableInfo:
table_type, location, view_expended_text, input_format, output_format, is_compressed,
is_storedassubdirectories, etl_source]


class HiveExtract:
"""
Extract hive metadata from hive metastore. store it in a json file
@@ -54,12 +55,14 @@ class HiveExtract:
conn_hms = None
db_dict = {} # name : index
table_dict = {} # fullname : index
dataset_dict = {} # name : index
instance_dict = {} # name : index
serde_param_columns = []

def __init__(self):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)

def get_table_info_from_v2(self, database_name):
def get_table_info_from_v2(self, database_name, is_dali=False):
"""
get table, column info from table columns_v2
:param database_name:
@@ -68,27 +71,66 @@ class HiveExtract:
13IS_COMPRESSED, 14 IS_STOREDASSUBDIRECTORIES, 15 INTEGER_IDX, 16 COLUMN_NAME, 17 TYPE_NAME, 18 COMMENT)
"""
curs = self.conn_hms.cursor()
tbl_info_sql = """select d.NAME DB_NAME, t.TBL_NAME TBL_NAME,
case when s.INPUT_FORMAT like '%.TextInput%' then 'Text'
when s.INPUT_FORMAT like '%.Avro%' then 'Avro'
when s.INPUT_FORMAT like '%.RCFile%' then 'RC'
when s.INPUT_FORMAT like '%.Orc%' then 'ORC'
when s.INPUT_FORMAT like '%.Sequence%' then 'Sequence'
when s.INPUT_FORMAT like '%.Parquet%' then 'Parquet'
else s.INPUT_FORMAT
end SerializationFormat,
t.CREATE_TIME TableCreateTime,
t.DB_ID, t.TBL_ID, s.SD_ID,
substr(s.LOCATION, length(substring_index(s.LOCATION, '/', 3))+1) Location,
t.TBL_TYPE, t.VIEW_EXPANDED_TEXT, s.INPUT_FORMAT, s.OUTPUT_FORMAT, s.IS_COMPRESSED, s.IS_STOREDASSUBDIRECTORIES,
c.INTEGER_IDX, c.COLUMN_NAME, c.TYPE_NAME, c.COMMENT
from TBLS t join DBS d on t.DB_ID=d.DB_ID
join SDS s on t.SD_ID = s.SD_ID
join COLUMNS_V2 c on s.CD_ID = c.CD_ID
where
d.NAME in ('{db_name}')
order by 1,2
""".format(db_name=database_name)
if is_dali:
tbl_info_sql = """select d.NAME DB_NAME, t.TBL_NAME TBL_NAME,
case when s.INPUT_FORMAT like '%.TextInput%' then 'Text'
when s.INPUT_FORMAT like '%.Avro%' then 'Avro'
when s.INPUT_FORMAT like '%.RCFile%' then 'RC'
when s.INPUT_FORMAT like '%.Orc%' then 'ORC'
when s.INPUT_FORMAT like '%.Sequence%' then 'Sequence'
when s.INPUT_FORMAT like '%.Parquet%' then 'Parquet'
else s.INPUT_FORMAT
end SerializationFormat,
t.CREATE_TIME TableCreateTime,
t.DB_ID, t.TBL_ID, s.SD_ID,
substr(s.LOCATION, length(substring_index(s.LOCATION, '/', 3))+1) Location,
t.TBL_TYPE, t.VIEW_EXPANDED_TEXT, s.INPUT_FORMAT, s.OUTPUT_FORMAT, s.IS_COMPRESSED, s.IS_STOREDASSUBDIRECTORIES,
c.INTEGER_IDX, c.COLUMN_NAME, c.TYPE_NAME, c.COMMENT,
case when t.TBL_NAME regexp '_[0-9]+_[0-9]+_[0-9]+$'
then concat(substring(t.TBL_NAME, 1, length(t.TBL_NAME) - length(substring_index(t.TBL_NAME, '_', -3)) - 1),'_{version}')
else t.TBL_NAME
end dataset_name,
case when t.TBL_NAME regexp '_[0-9]+_[0-9]+_[0-9]+$'
then replace(substring_index(t.TBL_NAME, '_', -3), '_', '.')
end version, 'Dalids' TYPE, 'View' storage_type, concat(d.NAME, '.', t.TBL_NAME) native_name,
case when t.TBL_NAME regexp '_[0-9]+_[0-9]+_[0-9]+$'
then substring(t.TBL_NAME, 1, length(t.TBL_NAME) - length(substring_index(t.TBL_NAME, '_', -3)) - 1)
else t.TBL_NAME
end logical_name, unix_timestamp(now()) created_time, concat('dalids:///', d.NAME, '/', t.TBL_NAME) dataset_urn
from TBLS t join DBS d on t.DB_ID=d.DB_ID
join SDS s on t.SD_ID = s.SD_ID
join COLUMNS_V2 c on s.CD_ID = c.CD_ID
where
d.NAME in ('{db_name}') and (d.NAME like '%\_mp' or d.NAME like '%\_mp\_versioned') and d.NAME not like 'dalitest%' and t.TBL_TYPE = 'VIRTUAL_VIEW'
order by DB_NAME, dataset_name, version DESC
""".format(version='{version}', db_name=database_name)
else:
tbl_info_sql = """select d.NAME DB_NAME, t.TBL_NAME TBL_NAME,
case when s.INPUT_FORMAT like '%.TextInput%' then 'Text'
when s.INPUT_FORMAT like '%.Avro%' then 'Avro'
when s.INPUT_FORMAT like '%.RCFile%' then 'RC'
when s.INPUT_FORMAT like '%.Orc%' then 'ORC'
when s.INPUT_FORMAT like '%.Sequence%' then 'Sequence'
when s.INPUT_FORMAT like '%.Parquet%' then 'Parquet'
else s.INPUT_FORMAT
end SerializationFormat,
t.CREATE_TIME TableCreateTime,
t.DB_ID, t.TBL_ID, s.SD_ID,
substr(s.LOCATION, length(substring_index(s.LOCATION, '/', 3))+1) Location,
t.TBL_TYPE, t.VIEW_EXPANDED_TEXT, s.INPUT_FORMAT, s.OUTPUT_FORMAT, s.IS_COMPRESSED, s.IS_STOREDASSUBDIRECTORIES,
c.INTEGER_IDX, c.COLUMN_NAME, c.TYPE_NAME, c.COMMENT, t.TBL_NAME dataset_name, 0 version, 'Hive' TYPE,
case when LOCATE('view', LOWER(t.TBL_TYPE)) > 0 then 'View'
when LOCATE('index', LOWER(t.TBL_TYPE)) > 0 then 'Index'
else 'Table'
end storage_type, concat(d.NAME, '.', t.TBL_NAME) native_name, t.TBL_NAME logical_name,
unix_timestamp(now()) created_time, concat('hive:///', d.NAME, '/', t.TBL_NAME) dataset_urn
from TBLS t join DBS d on t.DB_ID=d.DB_ID
join SDS s on t.SD_ID = s.SD_ID
join COLUMNS_V2 c on s.CD_ID = c.CD_ID
where
d.NAME in ('{db_name}') and not ((d.NAME like '%\_mp' or d.NAME like '%\_mp\_versioned') and t.TBL_TYPE = 'VIRTUAL_VIEW')
order by 1,2
""".format(db_name=database_name)
curs.execute(tbl_info_sql)
rows = curs.fetchall()
curs.close()
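The regexp CASE expressions in the Dali branch above are the heart of the change: a versioned Dali view such as foo_view_1_2_3 is reported under a single dataset name (foo_view_{version}), the trailing digits become a dotted version string, and the prefix becomes the logical name. A minimal Python sketch of the same split, using a hypothetical table name (the real work is done by the MySQL expressions above):

import re

# Mirrors the SQL CASE logic for versioned Dali view names; illustrative only.
VERSION_SUFFIX = re.compile(r'_([0-9]+)_([0-9]+)_([0-9]+)$')

def split_dali_view_name(tbl_name):
    m = VERSION_SUFFIX.search(tbl_name)
    if m is None:
        return tbl_name, None, tbl_name            # dataset_name, version, logical_name
    logical_name = tbl_name[:m.start()]
    version = '.'.join(m.groups())                 # '1_2_3' -> '1.2.3'
    return logical_name + '_{version}', version, logical_name

# split_dali_view_name('foo_view_1_2_3') -> ('foo_view_{version}', '1.2.3', 'foo_view')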
@@ -148,6 +190,8 @@ class HiveExtract:
"""
db_idx = len(schema) - 1
table_idx = -1
dataset_idx = -1
dataset_urn_idx = -1

field_list = []
for row_index, row_value in enumerate(rows):
@@ -157,16 +201,44 @@ class HiveExtract:
# sort the field_list by IntegerIndex
field_list = sorted(field_list, key=lambda k: k['IntegerIndex'])
# process the record of table
table_record = {TableInfo.table_name: row_value[1], TableInfo.type: 'Table', TableInfo.serialization_format: row_value[2],

if row_value[20].lower() == 'dalids':
urn = 'dalids:///' + row_value[0] + '/' + row_value[18]
instance_record = DatasetInstanceRecord(row_value[25],
long(self.db_id),
'grid',
'eat1',
'eat1-nertz',
'*',
0,
row_value[22],
row_value[23],
str(row_value[19]),
long(row_value[3]),
long(row_value[24]),
self.wh_exec_id,
'dalids:///' + row_value[0] + '/' + row_value[18])
self.instance_writer.append(instance_record)
dataset_urn_idx += 1
self.instance_dict[row_value[25]] = dataset_urn_idx
else:
urn = 'hive:///' + row_value[0] + '/' + row_value[18]

if urn in self.dataset_dict:
continue

table_record = {TableInfo.table_name: row_value[18], TableInfo.type: row_value[21], TableInfo.serialization_format: row_value[2],
TableInfo.create_time: row_value[3], TableInfo.db_id: row_value[4], TableInfo.table_id: row_value[5],
TableInfo.serde_id: row_value[6], TableInfo.location: row_value[7], TableInfo.table_type: row_value[8],
TableInfo.view_expended_text: row_value[9], TableInfo.input_format: row_value[10], TableInfo.output_format: row_value[11],
TableInfo.is_compressed: row_value[12], TableInfo.is_storedassubdirectories: row_value[13],
TableInfo.etl_source: 'COLUMN_V2', TableInfo.field_list: field_list[:]}
dataset_idx += 1
self.dataset_dict[urn] = dataset_idx
field_list = [] # empty it

if row_value[0] not in self.db_dict:
schema.append({'database': row_value[0], 'type': 'Hive', 'tables': []})
schema.append({'database': row_value[0], 'type': row_value[20], 'tables': []})
db_idx += 1
self.db_dict[row_value[0]] = db_idx

@@ -178,7 +250,7 @@ class HiveExtract:
table_idx += 1
self.table_dict[full_name] = table_idx


self.instance_writer.flush()
self.logger.info("%s %6d tables processed for database %12s from COLUMN_V2" % (
datetime.datetime.now(), table_idx + 1, row_value[0]))
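This is where multiple Dali versions collapse into one node: the Dali query orders rows by dataset_name and version DESC, and the dedup urn is built from dataset_name (the '_{version}' placeholder form) rather than the physical table name, so every version of a view maps to the same urn. The "if urn in self.dataset_dict: continue" check then keeps a single dataset record (from the newest version) while a DatasetInstanceRecord is still written for each physical version. A rough sketch of that grouping, with made-up rows already sorted newest first:

# Hypothetical (urn, physical_table, version) rows in the order the Dali SQL returns them.
rows = [
    ('dalids:///db_mp/foo_view_{version}', 'foo_view_1_2_3', '1.2.3'),
    ('dalids:///db_mp/foo_view_{version}', 'foo_view_1_2_0', '1.2.0'),
]

dataset_dict = {}   # one entry per combined dataset node
instances = []      # one entry per physical version

for urn, physical_table, version in rows:
    instances.append((urn, physical_table, version))   # every version keeps an instance record
    if urn in dataset_dict:
        continue                                        # older versions reuse the existing node
    dataset_dict[urn] = len(dataset_dict)               # only the newest version creates the node

# len(dataset_dict) == 1, len(instances) == 2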
@@ -256,11 +328,19 @@ class HiveExtract:
# tables from Column V2
rows = []
begin = datetime.datetime.now().strftime("%H:%M:%S")
rows.extend(self.get_table_info_from_v2(database_name))
rows.extend(self.get_table_info_from_v2(database_name, False))
if len(rows) > 0:
self.format_table_metadata_v2(rows, schema)
end = datetime.datetime.now().strftime("%H:%M:%S")
self.logger.info("Get table info from COLUMN_V2 %12s [%s -> %s]\n" % (database_name, str(begin), str(end)))
self.logger.info("Get Hive table info from COLUMN_V2 %12s [%s -> %s]\n" % (database_name, str(begin), str(end)))

rows = []
begin = datetime.datetime.now().strftime("%H:%M:%S")
rows.extend(self.get_table_info_from_v2(database_name, True))
if len(rows) > 0:
self.format_table_metadata_v2(rows, schema)
end = datetime.datetime.now().strftime("%H:%M:%S")
self.logger.info("Get Dalids table info from COLUMN_V2 %12s [%s -> %s]\n" % (database_name, str(begin), str(end)))

schema_json_file.write(json.dumps(schema, indent=None) + '\n')
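Extraction now runs in two passes per database, plain Hive tables first and Dali views second, and both land in the same schema list that is serialized above. For orientation, the outer shape of one serialized database entry (built by the schema.append call in format_table_metadata_v2) looks roughly like this; the values are hypothetical:

# Hypothetical, trimmed example of one entry in the schema JSON written by run() above.
one_db_entry = {
    'database': 'db_mp',   # row_value[0]
    'type': 'Dalids',      # row_value[20]: 'Dalids' for Dali views, 'Hive' otherwise
    'tables': [],          # one table_record per combined dataset, keyed by the TableInfo constants
}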
@@ -298,6 +378,10 @@ if __name__ == "__main__":
database_white_list = ''

e = HiveExtract()
e.dataset_instance_file = args[Constant.HIVE_INSTANCE_CSV_FILE_KEY]
e.instance_writer = FileWriter(e.dataset_instance_file)
e.wh_exec_id = long(args[Constant.WH_EXEC_ID_KEY])
e.db_id = args[Constant.DB_ID_KEY]
e.conn_hms = zxJDBC.connect(jdbc_url, username, password, jdbc_driver)

try:
@@ -305,3 +389,4 @@ if __name__ == "__main__":
e.run(args[Constant.HIVE_SCHEMA_JSON_FILE_KEY], None)
finally:
e.conn_hms.close()
e.instance_writer.close()
@@ -30,9 +30,8 @@ class HiveLoad:
LOAD DATA LOCAL INFILE '{source_file}'
INTO TABLE stg_dict_dataset
FIELDS TERMINATED BY '\Z' ESCAPED BY '\0'
(`name`, `schema`, properties, fields, urn, source, @sample_partition_full_path, source_created_time, @source_modified_time)
(`name`, `schema`, properties, fields, urn, source, dataset_type, storage_type, @sample_partition_full_path, source_created_time, @source_modified_time)
SET db_id = {db_id},
storage_type = 'Table', dataset_type = 'hive',
source_modified_time=nullif(@source_modified_time,''),
sample_partition_full_path=nullif(@sample_partition_full_path,''),
wh_etl_exec_id = {wh_etl_exec_id};
@@ -270,11 +269,72 @@ class HiveLoad:
# didn't load into final table for now

for state in load_cmd.split(";"):
self.logger.debug(state)
self.logger.info(state)
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()

def load_dataset_instance(self):
"""
Load dataset instance
:return:
"""
cursor = self.conn_mysql.cursor()
load_cmd = """
DELETE FROM stg_dict_dataset_instance WHERE db_id = {db_id};

LOAD DATA LOCAL INFILE '{source_file}'
INTO TABLE stg_dict_dataset_instance
FIELDS TERMINATED BY '\x1a' ESCAPED BY '\0'
(dataset_urn, db_id, deployment_tier, data_center, server_cluster, slice,
status_id, native_name, logical_name, version, instance_created_time,
created_time, wh_etl_exec_id, abstracted_dataset_urn);

-- update dataset_id
update stg_dict_dataset_instance sdi, dict_dataset d
set sdi.dataset_id = d.id where sdi.abstracted_dataset_urn = d.urn
and sdi.db_id = {db_id};

""".format(source_file=self.input_instance_file, db_id=self.db_id)

# didn't load into final table for now

for state in load_cmd.split(";"):
self.logger.info(state)
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()
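The column list in the LOAD DATA statement above has to line up with the field order the extractor writes per instance record (see DatasetInstanceRecord.fillAllFields() later in this diff). A small, purely illustrative sketch of one such 0x1A-delimited line, with made-up values:

# Illustrative only: one hand-built line of the instance CSV consumed by load_dataset_instance().
# Field order mirrors the LOAD DATA column list above; all values are hypothetical.
FIELD_SEP = '\x1a'

record = [
    'dalids:///db_mp/foo_view_1_2_3',       # dataset_urn
    '3',                                    # db_id
    'grid', 'eat1', 'eat1-nertz', '*',      # deployment_tier, data_center, server_cluster, slice
    '0',                                    # status_id
    'db_mp.foo_view_1_2_3',                 # native_name
    'foo_view',                             # logical_name
    '1.2.3',                                # version
    '1443600000',                           # instance_created_time
    '1443600000',                           # created_time
    '12345',                                # wh_etl_exec_id
    'dalids:///db_mp/foo_view_{version}',   # abstracted_dataset_urn
]

line = FIELD_SEP.join(record) + '\n'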
def load_dataset_dependencies(self):
"""
Load dataset instance
:return:
"""
cursor = self.conn_mysql.cursor()
load_cmd = """
LOAD DATA LOCAL INFILE '{source_file}'
INTO TABLE stg_cfg_object_name_map
FIELDS TERMINATED BY '\x1a' ESCAPED BY '\0'
(object_type, object_sub_type, object_name, object_urn, map_phrase, map_phrase_reversed,
mapped_object_type, mapped_object_sub_type, mapped_object_name, mapped_object_urn, description);

-- update source dataset_id
update stg_cfg_object_name_map s, dict_dataset d
set s.object_dataset_id = d.id where s.object_urn = d.urn;

-- update mapped dataset_id
update stg_cfg_object_name_map s, dict_dataset d
set s.mapped_object_dataset_id = d.id where s.mapped_object_urn = d.urn;
""".format(source_file=self.input_dependency_file)

# didn't load into final table for now

for state in load_cmd.split(";"):
self.logger.info(state)
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()


if __name__ == "__main__":
args = sys.argv[1]
@@ -289,6 +349,8 @@ if __name__ == "__main__":

l.input_schema_file = args[Constant.HIVE_SCHEMA_CSV_FILE_KEY]
l.input_field_file = args[Constant.HIVE_FIELD_METADATA_KEY]
l.input_instance_file = args[Constant.HIVE_INSTANCE_CSV_FILE_KEY]
l.input_dependency_file = args[Constant.HIVE_DEPENDENCY_CSV_FILE_KEY]
l.db_id = args[Constant.DB_ID_KEY]
l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
@@ -300,5 +362,7 @@ if __name__ == "__main__":
try:
l.load_metadata()
l.load_field()
l.load_dataset_instance()
l.load_dataset_dependencies()
finally:
l.conn_mysql.close()
@@ -16,9 +16,10 @@ import json
import datetime
import sys, os
import time
from com.ziclix.python.sql import zxJDBC
from org.slf4j import LoggerFactory
from wherehows.common.writers import FileWriter
from wherehows.common.schemas import DatasetSchemaRecord, DatasetFieldRecord
from wherehows.common.schemas import DatasetSchemaRecord, DatasetFieldRecord, HiveDependencyInstanceRecord
from wherehows.common import Constant
from HiveExtract import TableInfo
from org.apache.hadoop.hive.ql.tools import LineageInfo
@@ -30,6 +31,14 @@ from AvroColumnParser import AvroColumnParser
class HiveTransform:
def __init__(self):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
username = args[Constant.HIVE_METASTORE_USERNAME]
password = args[Constant.HIVE_METASTORE_PASSWORD]
jdbc_driver = args[Constant.HIVE_METASTORE_JDBC_DRIVER]
jdbc_url = args[Constant.HIVE_METASTORE_JDBC_URL]
self.conn_hms = zxJDBC.connect(jdbc_url, username, password, jdbc_driver)
self.curs = self.conn_hms.cursor()
dependency_instance_file = args[Constant.HIVE_DEPENDENCY_CSV_FILE_KEY]
self.instance_writer = FileWriter(dependency_instance_file)

def transform(self, input, hive_metadata, hive_field_metadata):
"""
@@ -47,6 +56,31 @@ class HiveTransform:
field_file_writer = FileWriter(hive_field_metadata)

lineageInfo = LineageInfo()
depends_sql = """
SELECT d.NAME DB_NAME, case when t.TBL_NAME regexp '_[0-9]+_[0-9]+_[0-9]+$'
then concat(substring(t.TBL_NAME, 1, length(t.TBL_NAME) - length(substring_index(t.TBL_NAME, '_', -3)) - 1),'_{version}')
else t.TBL_NAME
end dataset_name,
concat('/', d.NAME, '/', t.TBL_NAME) object_name,
case when (d.NAME like '%\_mp' or d.NAME like '%\_mp\_versioned') and d.NAME not like 'dalitest%' and t.TBL_TYPE = 'VIRTUAL_VIEW'
then 'Dali'
else 'Hive'
end object_type,
case when (d.NAME like '%\_mp' or d.NAME like '%\_mp\_versioned') and d.NAME not like 'dalitest%' and t.TBL_TYPE = 'VIRTUAL_VIEW'
then 'View'
else
case when LOCATE('view', LOWER(t.TBL_TYPE)) > 0 then 'View'
when LOCATE('index', LOWER(t.TBL_TYPE)) > 0 then 'Index'
else 'Table'
end
end object_sub_type,
case when (d.NAME like '%\_mp' or d.NAME like '%\_mp\_versioned') and t.TBL_TYPE = 'VIRTUAL_VIEW'
then 'dalids'
else 'hive'
end prefix
FROM TBLS t JOIN DBS d on t.DB_ID = d.DB_ID
WHERE d.NAME = '{db_name}' and t.TBL_NAME = '{table_name}'
"""

# one db info : 'type', 'database', 'tables'
# one table info : required : 'name' , 'type', 'serializationFormat' ,'createTime', 'DB_ID', 'TBL_ID', 'SD_ID'
@@ -68,7 +102,32 @@ class HiveTransform:
l = []
for a in array:
l.append(a)
names = str(a).split('.')
if names and len(names) >= 2:
db_name = names[0]
table_name = names[1]
if db_name and table_name:
rows = []
self.curs.execute(depends_sql.format(db_name=db_name, table_name=table_name, version='{version}'))
rows = self.curs.fetchall()
if rows and len(rows) > 0:
for row_index, row_value in enumerate(rows):
dependent_record = HiveDependencyInstanceRecord(
one_db_info['type'],
table['type'],
"/%s/%s" % (one_db_info['database'], table['name']),
'dalids:///' + one_db_info['database'] + '/' + table['name']
if one_db_info['type'].lower() == 'dalids'
else 'hive:///' + one_db_info['database'] + '/' + table['name'],
'depends on',
'is used by',
row_value[3],
row_value[4],
row_value[2],
row_value[5] + ':///' + row_value[0] + '/' + row_value[1], '')
self.instance_writer.append(dependent_record)
prop_json['view_depends_on'] = l
self.instance_writer.flush()

# process either schema
flds = {}
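The CASE expressions in depends_sql classify each upstream object that LineageInfo reports: tables living in a database named like '..._mp' or '..._mp_versioned' (and not 'dalitest...') that are stored as VIRTUAL_VIEW count as Dali views, everything else as plain Hive, and the final prefix column picks the urn scheme. A rough Python rendering of that classification, for a hypothetical (db_name, tbl_type) pair; the authoritative logic is the SQL above:

# Rough equivalent of the object_type / prefix CASE logic in depends_sql; illustrative only.
def classify_dependency(db_name, tbl_type):
    is_dali = ((db_name.endswith('_mp') or db_name.endswith('_mp_versioned'))
               and not db_name.startswith('dalitest')
               and tbl_type == 'VIRTUAL_VIEW')
    if is_dali:
        return 'Dali', 'dalids'   # object_type, urn prefix
    return 'Hive', 'hive'

# classify_dependency('tracking_mp_versioned', 'VIRTUAL_VIEW') -> ('Dali', 'dalids')
# classify_dependency('default', 'MANAGED_TABLE')              -> ('Hive', 'hive')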
@@ -85,16 +144,26 @@ class HiveTransform:
field_detail_list += result
except ValueError:
self.logger.error("Schema json error for table : \n" + str(table))

elif TableInfo.field_list in table:
# Convert to avro
uri = "hive:///%s/%s" % (one_db_info['database'], table['name'])
hcp = HiveColumnParser(table, urn = uri)
if one_db_info['type'].lower() == 'dali':
uri = "dalids:///%s/%s" % (one_db_info['database'], table['name'])
else:
uri = "hive:///%s/%s" % (one_db_info['database'], table['name'])
schema_json = {'fields' : hcp.column_type_dict['fields'], 'type' : 'record', 'name' : table['name'], 'uri' : uri}
field_detail_list += hcp.column_type_list

if one_db_info['type'].lower() == 'dali':
dataset_urn = "dalids:///%s/%s" % (one_db_info['database'], table['name'])
else:
dataset_urn = "hive:///%s/%s" % (one_db_info['database'], table['name'])
dataset_scehma_record = DatasetSchemaRecord(table['name'], json.dumps(schema_json), json.dumps(prop_json),
json.dumps(flds),
"hive:///%s/%s" % (one_db_info['database'], table['name']), 'Hive',
dataset_urn,
'Hive', one_db_info['type'], table['type'],
'', (table[TableInfo.create_time] if table.has_key(
TableInfo.create_time) else None), (table["lastAlterTime"]) if table.has_key("lastAlterTime") else None)
schema_file_writer.append(dataset_scehma_record)
@@ -117,6 +186,14 @@ class HiveTransform:
if __name__ == "__main__":
args = sys.argv[1]
t = HiveTransform()
try:
t.transform(args[Constant.HIVE_SCHEMA_JSON_FILE_KEY],
args[Constant.HIVE_SCHEMA_CSV_FILE_KEY],
args[Constant.HIVE_FIELD_METADATA_KEY])
finally:
t.curs.close()
t.conn_hms.close()
t.instance_writer.close()


t.transform(args[Constant.HIVE_SCHEMA_JSON_FILE_KEY], args[Constant.HIVE_SCHEMA_CSV_FILE_KEY], args[Constant.HIVE_FIELD_METADATA_KEY])
@@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import models.DatasetColumn;
import models.DatasetDependency;
import models.ImpactDataset;
import play.api.libs.json.JsValue;
import play.libs.Json;
@@ -27,6 +28,7 @@ import play.Logger;
import org.apache.commons.lang3.StringUtils;
import dao.DatasetsDAO;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@@ -747,4 +749,14 @@ public class Dataset extends Controller
result.put("similar", Json.toJson(DatasetsDAO.similarColumns(datasetId, columnId)));
return ok(result);
}

public static Result getDependViews(Long datasetId)
{
ObjectNode result = Json.newObject();
List<DatasetDependency> depends = new ArrayList<DatasetDependency>();
DatasetsDAO.getDatasetDependencies(datasetId, 1, 0, depends);
result.put("status", "ok");
result.put("depends", Json.toJson(depends));
return ok(result);
}
}
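The new controller action is exposed in conf/routes later in this diff as GET /api/v1/datasets/:id/depends. A quick client-side smoke test of the response shape, assuming a locally running WhereHows web instance and the third-party requests package (host, port and dataset id are placeholders, not part of this commit):

# Hypothetical check of the new depends endpoint; host/port, dataset id and the
# 'requests' dependency are assumptions.
import requests

resp = requests.get('http://localhost:9000/api/v1/datasets/42/depends')
payload = resp.json()

if payload.get('status') == 'ok':
    for dep in payload.get('depends', []):
        # Keys should mirror the public fields of models.DatasetDependency serialized by Json.toJson
        print('%s (%s/%s) level %s' % (dep.get('objectName'), dep.get('objectType'),
                                       dep.get('objectSubType'), dep.get('level')))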
@@ -314,6 +314,11 @@ public class DatasetsDAO extends AbstractMySQLOpenSourceDAO
private final static String GET_DATASET_OWNER_TYPES = "SELECT DISTINCT owner_type " +
"FROM dataset_owner WHERE owner_type is not null";

private final static String GET_DATASET_DEPENDS_VIEW = "SELECT object_type, object_sub_type, " +
"object_name, object_urn, map_phrase, map_phrase_reversed, mapped_object_dataset_id, " +
"mapped_object_type, mapped_object_sub_type, mapped_object_name, mapped_object_urn " +
"FROM stg_cfg_object_name_map WHERE object_dataset_id = ?";

public static List<String> getDatasetOwnerTypes()
{
return getJdbcTemplate().queryForList(GET_DATASET_OWNER_TYPES, String.class);
@@ -1713,4 +1718,51 @@ public class DatasetsDAO extends AbstractMySQLOpenSourceDAO

return result;
}

public static void getDatasetDependencies(
Long datasetId,
int level,
int parent,
List<DatasetDependency> depends)
{
if (depends == null)
{
depends = new ArrayList<DatasetDependency>();
}

List<Map<String, Object>> rows = null;
rows = getJdbcTemplate().queryForList(
GET_DATASET_DEPENDS_VIEW,
datasetId);

if (rows != null)
{
for (Map row : rows) {
DatasetDependency dd = new DatasetDependency();
dd.datasetId = (Long) row.get("mapped_object_dataset_id");
dd.objectName = (String) row.get("mapped_object_name");
dd.objectType = (String) row.get("mapped_object_type");
dd.objectSubType = (String) row.get("mapped_object_sub_type");
dd.datasetUrn = (String) row.get("mapped_object_urn");
if (dd.datasetId != null && dd.datasetId > 0)
{
dd.isValidDataset = true;
dd.datasetLink = "#/datasets/" + Long.toString(dd.datasetId);
}
else
{
dd.isValidDataset = false;
}
dd.level = level;
dd.sortId = depends.size() + 1;
dd.treeGridClass = "treegrid-" + Integer.toString(dd.sortId);
if (parent != 0)
{
dd.treeGridClass += " treegrid-parent-" + Integer.toString(parent);
}
depends.add(dd);
getDatasetDependencies(dd.datasetId, level + 1, dd.sortId, depends);
}
}
}
}
web/app/models/DatasetDependency.java (new file, 28 lines)
@@ -0,0 +1,28 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package models;

public class DatasetDependency {

public Long datasetId;
public String datasetUrn;
public String datasetLink;
public String objectType;
public String objectSubType;
public String objectName;
public String treeGridClass;
public int sortId;
public int level;
public boolean isValidDataset;
}
@@ -462,6 +462,41 @@
{{/if}}
</script>

<script type="text/x-handlebars" data-template-name="depend">
{{#if hasDepends}}
<table id="json-table" class="columntreegrid tree table table-bordered dataset-detail-table">
<thead>
<tr class="results-header">
<th class="span2">Dataset</th>
<th class="span1">Object Type</th>
<th class="span1">Object Sub Type</th>
<th class="span2">Level</th>
</tr>
</thead>
<tbody>
{{#each depend in depends}}
<tr {{bind-attr class="depend.treeGridClass"}}>
<td>
{{#if depend.isValidDataset}}
<a {{bind-attr href=depend.datasetLink}}>
{{depend.objectName}}
</a>
{{else}}
{{depend.objectName}}
{{/if}}
</td>
<td>{{depend.objectType}}</td>
<td>{{depend.objectSubType}}</td>
<td>{{depend.level}}</td>
</tr>
{{/each}}
</tbody>
</table>
{{else}}
<p>Depends view is not available</p>
{{/if}}
</script>

<script type="text/x-handlebars" data-template-name="detail">
<div id="metric" class="container-fluid">
<div class="row-fluid">
@@ -803,11 +838,18 @@
{{/unless}}
<li id="impacts">
<a data-toggle="tab"
title="Down Stream Impact Analysis"
title="Downstream"
href="#impacttab">
Down Stream Impact
</a>
</li>
<li id="depends">
<a data-toggle="tab"
title="Depend On"
href="#dependtab">
Depend On
</a>
</li>
</ul>
<div class="tab-content">
{{#unless isPinot}}
@@ -837,6 +879,9 @@
<div id="impacttab" class="tab-pane">
{{view "impact"}}
</div>
<div id="dependtab" class="tab-pane">
{{view "depend"}}
</div>
</div>
{{else}}
<div class="panel-group" id="accordion" role="tablist" aria-multiselectable="true">
@@ -931,6 +976,22 @@
</div>
</div>
</div>

<div class="panel panel-default">
<div class="panel-heading" role="tab" id="dependsHeading">
<h4 class="panel-title">
<a class="collapsed" data-toggle="collapse" data-parent="#accordion"
href="#dependsview" aria-expanded="false" aria-controls="sampleData">
Depends On
</a>
</h4>
</div>
<div id="dependsview" class="panel-collapse collapse" role="tabpanel" aria-labelledby="impactsHeading">
<div class="panel-body">
{{view "depend"}}
</div>
</div>
</div>
</div>
{{/if}}
</script>
@@ -53,6 +53,8 @@ POST /api/v1/datasets/:id/owners controllers.api.v1.Dataset.updateDat

GET /api/v1/datasets/:id/columns controllers.api.v1.Dataset.getDatasetColumnsByID(id:Int)

GET /api/v1/datasets/:id/depends controllers.api.v1.Dataset.getDependViews(id:Long)

GET /api/v1/datasets/:id/properties controllers.api.v1.Dataset.getDatasetPropertiesByID(id:Int)

GET /api/v1/datasets/:id/sample controllers.api.v1.Dataset.getDatasetSampleDataByID(id:Int)
@@ -118,7 +118,7 @@ function convertPropertiesToArray(properties)
{
for (var key in properties)
{
if (key.toLowerCase() != 'elements')
if ((key.toLowerCase() != 'elements') && (key.toLowerCase() != 'view_depends_on'))
{
if (typeof properties[key] !== 'object')
{
@@ -399,6 +399,23 @@ App.DatasetRoute = Ember.Route.extend({
}
});

var datasetDependsUrl = 'api/v1/datasets/' + id + "/depends";
$.get(datasetDependsUrl, function(data) {
if (data && data.status == "ok")
{
if (data.depends && (data.depends.length > 0))
{
controller.set("hasDepends", true);
controller.set("depends", data.depends);
setTimeout(initializeColumnTreeGrid, 500);
}
else
{
controller.set("hasDepends", false);
}
}
});

var ownershipUrl = 'api/v1/datasets/' + id + "/owners";
$.get(ownershipUrl, function(data) {
if (data && data.status == "ok")
@@ -18,6 +18,10 @@ App.OwnerView = Ember.View.extend({
templateName: 'owner'
});

App.DependView = Ember.View.extend({
templateName: 'depend'
});

App.DetailView = Ember.View.extend({
templateName: 'detail',
didInsertElement: function() {
@@ -168,6 +168,8 @@ public class Constant {

public static final String HIVE_DATABASE_WHITELIST_KEY = "hive.database_white_list";
public static final String HIVE_SCHEMA_JSON_FILE_KEY = "hive.schema_json_file";
public static final String HIVE_DEPENDENCY_CSV_FILE_KEY = "hive.dependency_csv_file";
public static final String HIVE_INSTANCE_CSV_FILE_KEY = "hive.instance_csv_file";
// public static final String HIVE_SAMPLE_CSV_FILE_KEY = "hive.sample_csv";
public static final String HIVE_SCHEMA_CSV_FILE_KEY = "hive.schema_csv_file";
public static final String HIVE_FIELD_METADATA_KEY = "hive.field_metadata";
@@ -0,0 +1,81 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;

import java.util.ArrayList;
import java.util.List;


/**
* Used to generate one record in the dataset schema data file. Load into staging table.
* Created by zsun on 8/25/15.
*/
public class DatasetInstanceRecord extends AbstractRecord {
String datasetUrn;
Integer dbId;
String deploymentTier;
String dataCenter;
String serverCluster;
String slice;
Integer statusId;
String nativeName;
String logicalName;
String version;
Long instanceCreatedUnixtime;
Long createdUnixtime;
Long whExecId;
String abstractedDatasetUrn;

public DatasetInstanceRecord(String datasetUrn, Integer dbId, String deploymentTier, String dataCenter,
String serverCluster, String slice, Integer statusId, String nativeName,
String logicalName, String version, Long instanceCreatedUnixtime,
Long createdUnixtime, Long whExecId, String abstractedDatasetUrn) {
this.datasetUrn = datasetUrn;
this.dbId = dbId;
this.deploymentTier = deploymentTier;
this.dataCenter = dataCenter;
this.serverCluster = serverCluster;
this.slice = slice;
this.statusId = statusId;
this.nativeName = nativeName;
this.logicalName = logicalName;
this.version = version;
this.instanceCreatedUnixtime = instanceCreatedUnixtime;
this.createdUnixtime = createdUnixtime;
this.whExecId = whExecId;
this.abstractedDatasetUrn = abstractedDatasetUrn;
}

@Override
public List<Object> fillAllFields() {
List<Object> allFields = new ArrayList<>();
allFields.add(datasetUrn);
allFields.add(dbId);
allFields.add(deploymentTier);
allFields.add(dataCenter);
allFields.add(serverCluster);
allFields.add(slice);
allFields.add(statusId);
allFields.add(nativeName);
allFields.add(logicalName);
allFields.add(version);
allFields.add(instanceCreatedUnixtime);
allFields.add(createdUnixtime);
allFields.add(whExecId);
allFields.add(abstractedDatasetUrn);
return allFields;
}


}
@@ -28,11 +28,14 @@ public class DatasetSchemaRecord extends AbstractRecord {
String fields;
String urn;
String source;
String datasetType;
String storageType;
String samplePartitionFullPath;
Integer sourceCreated;
Integer sourceModified;

public DatasetSchemaRecord(String name, String schema, String properties, String fields, String urn, String source, String samplePartitionFullPath,
public DatasetSchemaRecord(String name, String schema, String properties, String fields, String urn, String source,
String datasetType, String storageType, String samplePartitionFullPath,
Integer sourceCreated, Integer sourceModified) {
this.name = name;
this.schema = schema;
@@ -40,6 +43,8 @@ class DatasetSchemaRecord extends AbstractRecord {
this.fields = fields;
this.urn = urn;
this.source = source;
this.datasetType = datasetType;
this.storageType = storageType;
this.samplePartitionFullPath = samplePartitionFullPath;
this.sourceCreated = sourceCreated;
this.sourceModified = sourceModified;
@@ -54,6 +59,8 @@ class DatasetSchemaRecord extends AbstractRecord {
allFields.add(fields);
allFields.add(urn);
allFields.add(source);
allFields.add(datasetType);
allFields.add(storageType);
allFields.add(samplePartitionFullPath);
allFields.add(sourceCreated);
allFields.add(sourceModified);
@@ -0,0 +1,72 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;

import java.util.ArrayList;
import java.util.List;


/**
* Used to generate one record in the dataset schema data file. Load into staging table.
* Created by zsun on 8/25/15.
*/
public class HiveDependencyInstanceRecord extends AbstractRecord {
String objectType;
String objectSubType;
String objectName;
String mapPhrase;
String mapPhraseReversed;
String mappedObjectType;
String mappedObjectSubType;
String mappedObjectName;
String description;
String objectUrn;
String mappedObjectUrn;

public HiveDependencyInstanceRecord(String objectType, String objectSubType, String objectName, String objectUrn,
String mapPhrase, String mapPhraseReversed, String mappedObjectType,
String mappedObjectSubType, String mappedObjectName,
String mappedObjectUrn, String description) {
this.objectType = objectType;
this.objectSubType = objectSubType;
this.objectName = objectName;
this.objectUrn = objectUrn;
this.mapPhrase = mapPhrase;
this.mapPhraseReversed = mapPhraseReversed;
this.mappedObjectType = mappedObjectType;
this.mappedObjectSubType = mappedObjectSubType;
this.mappedObjectName = mappedObjectName;
this.mappedObjectUrn = mappedObjectUrn;
this.description = description;
}

@Override
public List<Object> fillAllFields() {
List<Object> allFields = new ArrayList<>();
allFields.add(objectType);
allFields.add(objectSubType);
allFields.add(objectName);
allFields.add(objectUrn);
allFields.add(mapPhrase);
allFields.add(mapPhraseReversed);
allFields.add(mappedObjectType);
allFields.add(mappedObjectSubType);
allFields.add(mappedObjectName);
allFields.add(mappedObjectUrn);
allFields.add(description);
return allFields;
}


}