diff --git a/metadata-etl/src/main/resources/jython/HiveExtract.py b/metadata-etl/src/main/resources/jython/HiveExtract.py index 0fb331a1ae..cced4a4163 100644 --- a/metadata-etl/src/main/resources/jython/HiveExtract.py +++ b/metadata-etl/src/main/resources/jython/HiveExtract.py @@ -13,6 +13,8 @@ # from org.slf4j import LoggerFactory +from wherehows.common.writers import FileWriter +from wherehows.common.schemas import DatasetInstanceRecord from com.ziclix.python.sql import zxJDBC import sys, os, re, json import datetime @@ -46,7 +48,6 @@ class TableInfo: table_type, location, view_expended_text, input_format, output_format, is_compressed, is_storedassubdirectories, etl_source] - class HiveExtract: """ Extract hive metadata from hive metastore. store it in a json file @@ -54,12 +55,14 @@ class HiveExtract: conn_hms = None db_dict = {} # name : index table_dict = {} # fullname : index + dataset_dict = {} # name : index + instance_dict = {} # name : index serde_param_columns = [] def __init__(self): self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__) - def get_table_info_from_v2(self, database_name): + def get_table_info_from_v2(self, database_name, is_dali=False): """ get table, column info from table columns_v2 :param database_name: @@ -68,27 +71,66 @@ class HiveExtract: 13IS_COMPRESSED, 14 IS_STOREDASSUBDIRECTORIES, 15 INTEGER_IDX, 16 COLUMN_NAME, 17 TYPE_NAME, 18 COMMENT) """ curs = self.conn_hms.cursor() - tbl_info_sql = """select d.NAME DB_NAME, t.TBL_NAME TBL_NAME, - case when s.INPUT_FORMAT like '%.TextInput%' then 'Text' - when s.INPUT_FORMAT like '%.Avro%' then 'Avro' - when s.INPUT_FORMAT like '%.RCFile%' then 'RC' - when s.INPUT_FORMAT like '%.Orc%' then 'ORC' - when s.INPUT_FORMAT like '%.Sequence%' then 'Sequence' - when s.INPUT_FORMAT like '%.Parquet%' then 'Parquet' - else s.INPUT_FORMAT - end SerializationFormat, - t.CREATE_TIME TableCreateTime, - t.DB_ID, t.TBL_ID, s.SD_ID, - substr(s.LOCATION, length(substring_index(s.LOCATION, '/', 3))+1) Location, - t.TBL_TYPE, t.VIEW_EXPANDED_TEXT, s.INPUT_FORMAT, s.OUTPUT_FORMAT, s.IS_COMPRESSED, s.IS_STOREDASSUBDIRECTORIES, - c.INTEGER_IDX, c.COLUMN_NAME, c.TYPE_NAME, c.COMMENT - from TBLS t join DBS d on t.DB_ID=d.DB_ID - join SDS s on t.SD_ID = s.SD_ID - join COLUMNS_V2 c on s.CD_ID = c.CD_ID - where - d.NAME in ('{db_name}') - order by 1,2 - """.format(db_name=database_name) + if is_dali: + tbl_info_sql = """select d.NAME DB_NAME, t.TBL_NAME TBL_NAME, + case when s.INPUT_FORMAT like '%.TextInput%' then 'Text' + when s.INPUT_FORMAT like '%.Avro%' then 'Avro' + when s.INPUT_FORMAT like '%.RCFile%' then 'RC' + when s.INPUT_FORMAT like '%.Orc%' then 'ORC' + when s.INPUT_FORMAT like '%.Sequence%' then 'Sequence' + when s.INPUT_FORMAT like '%.Parquet%' then 'Parquet' + else s.INPUT_FORMAT + end SerializationFormat, + t.CREATE_TIME TableCreateTime, + t.DB_ID, t.TBL_ID, s.SD_ID, + substr(s.LOCATION, length(substring_index(s.LOCATION, '/', 3))+1) Location, + t.TBL_TYPE, t.VIEW_EXPANDED_TEXT, s.INPUT_FORMAT, s.OUTPUT_FORMAT, s.IS_COMPRESSED, s.IS_STOREDASSUBDIRECTORIES, + c.INTEGER_IDX, c.COLUMN_NAME, c.TYPE_NAME, c.COMMENT, + case when t.TBL_NAME regexp '_[0-9]+_[0-9]+_[0-9]+$' + then concat(substring(t.TBL_NAME, 1, length(t.TBL_NAME) - length(substring_index(t.TBL_NAME, '_', -3)) - 1),'_{version}') + else t.TBL_NAME + end dataset_name, + case when t.TBL_NAME regexp '_[0-9]+_[0-9]+_[0-9]+$' + then replace(substring_index(t.TBL_NAME, '_', -3), '_', '.') + end version, 'Dalids' TYPE, 'View' storage_type, 
concat(d.NAME, '.', t.TBL_NAME) native_name, + case when t.TBL_NAME regexp '_[0-9]+_[0-9]+_[0-9]+$' + then substring(t.TBL_NAME, 1, length(t.TBL_NAME) - length(substring_index(t.TBL_NAME, '_', -3)) - 1) + else t.TBL_NAME + end logical_name, unix_timestamp(now()) created_time, concat('dalids:///', d.NAME, '/', t.TBL_NAME) dataset_urn + from TBLS t join DBS d on t.DB_ID=d.DB_ID + join SDS s on t.SD_ID = s.SD_ID + join COLUMNS_V2 c on s.CD_ID = c.CD_ID + where + d.NAME in ('{db_name}') and (d.NAME like '%\_mp' or d.NAME like '%\_mp\_versioned') and d.NAME not like 'dalitest%' and t.TBL_TYPE = 'VIRTUAL_VIEW' + order by DB_NAME, dataset_name, version DESC + """.format(version='{version}', db_name=database_name) + else: + tbl_info_sql = """select d.NAME DB_NAME, t.TBL_NAME TBL_NAME, + case when s.INPUT_FORMAT like '%.TextInput%' then 'Text' + when s.INPUT_FORMAT like '%.Avro%' then 'Avro' + when s.INPUT_FORMAT like '%.RCFile%' then 'RC' + when s.INPUT_FORMAT like '%.Orc%' then 'ORC' + when s.INPUT_FORMAT like '%.Sequence%' then 'Sequence' + when s.INPUT_FORMAT like '%.Parquet%' then 'Parquet' + else s.INPUT_FORMAT + end SerializationFormat, + t.CREATE_TIME TableCreateTime, + t.DB_ID, t.TBL_ID, s.SD_ID, + substr(s.LOCATION, length(substring_index(s.LOCATION, '/', 3))+1) Location, + t.TBL_TYPE, t.VIEW_EXPANDED_TEXT, s.INPUT_FORMAT, s.OUTPUT_FORMAT, s.IS_COMPRESSED, s.IS_STOREDASSUBDIRECTORIES, + c.INTEGER_IDX, c.COLUMN_NAME, c.TYPE_NAME, c.COMMENT, t.TBL_NAME dataset_name, 0 version, 'Hive' TYPE, + case when LOCATE('view', LOWER(t.TBL_TYPE)) > 0 then 'View' + when LOCATE('index', LOWER(t.TBL_TYPE)) > 0 then 'Index' + else 'Table' + end storage_type, concat(d.NAME, '.', t.TBL_NAME) native_name, t.TBL_NAME logical_name, + unix_timestamp(now()) created_time, concat('hive:///', d.NAME, '/', t.TBL_NAME) dataset_urn + from TBLS t join DBS d on t.DB_ID=d.DB_ID + join SDS s on t.SD_ID = s.SD_ID + join COLUMNS_V2 c on s.CD_ID = c.CD_ID + where + d.NAME in ('{db_name}') and not ((d.NAME like '%\_mp' or d.NAME like '%\_mp\_versioned') and t.TBL_TYPE = 'VIRTUAL_VIEW') + order by 1,2 + """.format(db_name=database_name) curs.execute(tbl_info_sql) rows = curs.fetchall() curs.close() @@ -148,6 +190,8 @@ class HiveExtract: """ db_idx = len(schema) - 1 table_idx = -1 + dataset_idx = -1 + dataset_urn_idx = -1 field_list = [] for row_index, row_value in enumerate(rows): @@ -157,16 +201,44 @@ class HiveExtract: # sort the field_list by IntegerIndex field_list = sorted(field_list, key=lambda k: k['IntegerIndex']) # process the record of table - table_record = {TableInfo.table_name: row_value[1], TableInfo.type: 'Table', TableInfo.serialization_format: row_value[2], + + if row_value[20].lower() == 'dalids': + urn = 'dalids:///' + row_value[0] + '/' + row_value[18] + instance_record = DatasetInstanceRecord(row_value[25], + long(self.db_id), + 'grid', + 'eat1', + 'eat1-nertz', + '*', + 0, + row_value[22], + row_value[23], + str(row_value[19]), + long(row_value[3]), + long(row_value[24]), + self.wh_exec_id, + 'dalids:///' + row_value[0] + '/' + row_value[18]) + self.instance_writer.append(instance_record) + dataset_urn_idx += 1 + self.instance_dict[row_value[25]] = dataset_urn_idx + else: + urn = 'hive:///' + row_value[0] + '/' + row_value[18] + + if urn in self.dataset_dict: + continue + + table_record = {TableInfo.table_name: row_value[18], TableInfo.type: row_value[21], TableInfo.serialization_format: row_value[2], TableInfo.create_time: row_value[3], TableInfo.db_id: row_value[4], TableInfo.table_id: row_value[5], 
TableInfo.serde_id: row_value[6], TableInfo.location: row_value[7], TableInfo.table_type: row_value[8], TableInfo.view_expended_text: row_value[9], TableInfo.input_format: row_value[10], TableInfo.output_format: row_value[11], TableInfo.is_compressed: row_value[12], TableInfo.is_storedassubdirectories: row_value[13], TableInfo.etl_source: 'COLUMN_V2', TableInfo.field_list: field_list[:]} + dataset_idx += 1 + self.dataset_dict[urn] = dataset_idx field_list = [] # empty it if row_value[0] not in self.db_dict: - schema.append({'database': row_value[0], 'type': 'Hive', 'tables': []}) + schema.append({'database': row_value[0], 'type': row_value[20], 'tables': []}) db_idx += 1 self.db_dict[row_value[0]] = db_idx @@ -178,7 +250,7 @@ class HiveExtract: table_idx += 1 self.table_dict[full_name] = table_idx - + self.instance_writer.flush() self.logger.info("%s %6d tables processed for database %12s from COLUMN_V2" % ( datetime.datetime.now(), table_idx + 1, row_value[0])) @@ -256,11 +328,19 @@ class HiveExtract: # tables from Column V2 rows = [] begin = datetime.datetime.now().strftime("%H:%M:%S") - rows.extend(self.get_table_info_from_v2(database_name)) + rows.extend(self.get_table_info_from_v2(database_name, False)) if len(rows) > 0: self.format_table_metadata_v2(rows, schema) end = datetime.datetime.now().strftime("%H:%M:%S") - self.logger.info("Get table info from COLUMN_V2 %12s [%s -> %s]\n" % (database_name, str(begin), str(end))) + self.logger.info("Get Hive table info from COLUMN_V2 %12s [%s -> %s]\n" % (database_name, str(begin), str(end))) + + rows = [] + begin = datetime.datetime.now().strftime("%H:%M:%S") + rows.extend(self.get_table_info_from_v2(database_name, True)) + if len(rows) > 0: + self.format_table_metadata_v2(rows, schema) + end = datetime.datetime.now().strftime("%H:%M:%S") + self.logger.info("Get Dalids table info from COLUMN_V2 %12s [%s -> %s]\n" % (database_name, str(begin), str(end))) schema_json_file.write(json.dumps(schema, indent=None) + '\n') @@ -298,6 +378,10 @@ if __name__ == "__main__": database_white_list = '' e = HiveExtract() + e.dataset_instance_file = args[Constant.HIVE_INSTANCE_CSV_FILE_KEY] + e.instance_writer = FileWriter(e.dataset_instance_file) + e.wh_exec_id = long(args[Constant.WH_EXEC_ID_KEY]) + e.db_id = args[Constant.DB_ID_KEY] e.conn_hms = zxJDBC.connect(jdbc_url, username, password, jdbc_driver) try: @@ -305,3 +389,4 @@ if __name__ == "__main__": e.run(args[Constant.HIVE_SCHEMA_JSON_FILE_KEY], None) finally: e.conn_hms.close() + e.instance_writer.close() diff --git a/metadata-etl/src/main/resources/jython/HiveLoad.py b/metadata-etl/src/main/resources/jython/HiveLoad.py index d62b49164a..88feba01fb 100644 --- a/metadata-etl/src/main/resources/jython/HiveLoad.py +++ b/metadata-etl/src/main/resources/jython/HiveLoad.py @@ -30,9 +30,8 @@ class HiveLoad: LOAD DATA LOCAL INFILE '{source_file}' INTO TABLE stg_dict_dataset FIELDS TERMINATED BY '\Z' ESCAPED BY '\0' - (`name`, `schema`, properties, fields, urn, source, @sample_partition_full_path, source_created_time, @source_modified_time) + (`name`, `schema`, properties, fields, urn, source, dataset_type, storage_type, @sample_partition_full_path, source_created_time, @source_modified_time) SET db_id = {db_id}, - storage_type = 'Table', dataset_type = 'hive', source_modified_time=nullif(@source_modified_time,''), sample_partition_full_path=nullif(@sample_partition_full_path,''), wh_etl_exec_id = {wh_etl_exec_id}; @@ -270,11 +269,72 @@ class HiveLoad: # didn't load into final table for now for state in 
load_cmd.split(";"): - self.logger.debug(state) + self.logger.info(state) cursor.execute(state) self.conn_mysql.commit() cursor.close() + def load_dataset_instance(self): + """ + Load dataset instance + :return: + """ + cursor = self.conn_mysql.cursor() + load_cmd = """ + DELETE FROM stg_dict_dataset_instance WHERE db_id = {db_id}; + + LOAD DATA LOCAL INFILE '{source_file}' + INTO TABLE stg_dict_dataset_instance + FIELDS TERMINATED BY '\x1a' ESCAPED BY '\0' + (dataset_urn, db_id, deployment_tier, data_center, server_cluster, slice, + status_id, native_name, logical_name, version, instance_created_time, + created_time, wh_etl_exec_id, abstracted_dataset_urn); + + -- update dataset_id + update stg_dict_dataset_instance sdi, dict_dataset d + set sdi.dataset_id = d.id where sdi.abstracted_dataset_urn = d.urn + and sdi.db_id = {db_id}; + + """.format(source_file=self.input_instance_file, db_id=self.db_id) + + # didn't load into final table for now + + for state in load_cmd.split(";"): + self.logger.info(state) + cursor.execute(state) + self.conn_mysql.commit() + cursor.close() + + def load_dataset_dependencies(self): + """ + Load dataset dependencies + :return: + """ + cursor = self.conn_mysql.cursor() + load_cmd = """ + LOAD DATA LOCAL INFILE '{source_file}' + INTO TABLE stg_cfg_object_name_map + FIELDS TERMINATED BY '\x1a' ESCAPED BY '\0' + (object_type, object_sub_type, object_name, object_urn, map_phrase, map_phrase_reversed, + mapped_object_type, mapped_object_sub_type, mapped_object_name, mapped_object_urn, description); + + -- update source dataset_id + update stg_cfg_object_name_map s, dict_dataset d + set s.object_dataset_id = d.id where s.object_urn = d.urn; + + -- update mapped dataset_id + update stg_cfg_object_name_map s, dict_dataset d + set s.mapped_object_dataset_id = d.id where s.mapped_object_urn = d.urn; + """.format(source_file=self.input_dependency_file) + + # didn't load into final table for now + + for state in load_cmd.split(";"): + self.logger.info(state) + cursor.execute(state) + self.conn_mysql.commit() + cursor.close() + if __name__ == "__main__": args = sys.argv[1] @@ -289,6 +349,8 @@ if __name__ == "__main__": l.input_schema_file = args[Constant.HIVE_SCHEMA_CSV_FILE_KEY] l.input_field_file = args[Constant.HIVE_FIELD_METADATA_KEY] + l.input_instance_file = args[Constant.HIVE_INSTANCE_CSV_FILE_KEY] + l.input_dependency_file = args[Constant.HIVE_DEPENDENCY_CSV_FILE_KEY] l.db_id = args[Constant.DB_ID_KEY] l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY] l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER) @@ -300,5 +362,7 @@ if __name__ == "__main__": try: l.load_metadata() l.load_field() + l.load_dataset_instance() + l.load_dataset_dependencies() finally: l.conn_mysql.close() diff --git a/metadata-etl/src/main/resources/jython/HiveTransform.py b/metadata-etl/src/main/resources/jython/HiveTransform.py index 88912b8784..d83ceeda9a 100644 --- a/metadata-etl/src/main/resources/jython/HiveTransform.py +++ b/metadata-etl/src/main/resources/jython/HiveTransform.py @@ -16,9 +16,10 @@ import json import datetime import sys, os import time +from com.ziclix.python.sql import zxJDBC from org.slf4j import LoggerFactory from wherehows.common.writers import FileWriter -from wherehows.common.schemas import DatasetSchemaRecord, DatasetFieldRecord +from wherehows.common.schemas import DatasetSchemaRecord, DatasetFieldRecord, HiveDependencyInstanceRecord from wherehows.common import Constant from HiveExtract import TableInfo from org.apache.hadoop.hive.ql.tools import
LineageInfo @@ -30,6 +31,14 @@ from AvroColumnParser import AvroColumnParser class HiveTransform: def __init__(self): self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__) + username = args[Constant.HIVE_METASTORE_USERNAME] + password = args[Constant.HIVE_METASTORE_PASSWORD] + jdbc_driver = args[Constant.HIVE_METASTORE_JDBC_DRIVER] + jdbc_url = args[Constant.HIVE_METASTORE_JDBC_URL] + self.conn_hms = zxJDBC.connect(jdbc_url, username, password, jdbc_driver) + self.curs = self.conn_hms.cursor() + dependency_instance_file = args[Constant.HIVE_DEPENDENCY_CSV_FILE_KEY] + self.instance_writer = FileWriter(dependency_instance_file) def transform(self, input, hive_metadata, hive_field_metadata): """ @@ -47,6 +56,31 @@ class HiveTransform: field_file_writer = FileWriter(hive_field_metadata) lineageInfo = LineageInfo() + depends_sql = """ + SELECT d.NAME DB_NAME, case when t.TBL_NAME regexp '_[0-9]+_[0-9]+_[0-9]+$' + then concat(substring(t.TBL_NAME, 1, length(t.TBL_NAME) - length(substring_index(t.TBL_NAME, '_', -3)) - 1),'_{version}') + else t.TBL_NAME + end dataset_name, + concat('/', d.NAME, '/', t.TBL_NAME) object_name, + case when (d.NAME like '%\_mp' or d.NAME like '%\_mp\_versioned') and d.NAME not like 'dalitest%' and t.TBL_TYPE = 'VIRTUAL_VIEW' + then 'Dali' + else 'Hive' + end object_type, + case when (d.NAME like '%\_mp' or d.NAME like '%\_mp\_versioned') and d.NAME not like 'dalitest%' and t.TBL_TYPE = 'VIRTUAL_VIEW' + then 'View' + else + case when LOCATE('view', LOWER(t.TBL_TYPE)) > 0 then 'View' + when LOCATE('index', LOWER(t.TBL_TYPE)) > 0 then 'Index' + else 'Table' + end + end object_sub_type, + case when (d.NAME like '%\_mp' or d.NAME like '%\_mp\_versioned') and t.TBL_TYPE = 'VIRTUAL_VIEW' + then 'dalids' + else 'hive' + end prefix + FROM TBLS t JOIN DBS d on t.DB_ID = d.DB_ID + WHERE d.NAME = '{db_name}' and t.TBL_NAME = '{table_name}' + """ # one db info : 'type', 'database', 'tables' # one table info : required : 'name' , 'type', 'serializationFormat' ,'createTime', 'DB_ID', 'TBL_ID', 'SD_ID' @@ -68,7 +102,32 @@ class HiveTransform: l = [] for a in array: l.append(a) + names = str(a).split('.') + if names and len(names) >= 2: + db_name = names[0] + table_name = names[1] + if db_name and table_name: + rows = [] + self.curs.execute(depends_sql.format(db_name=db_name, table_name=table_name, version='{version}')) + rows = self.curs.fetchall() + if rows and len(rows) > 0: + for row_index, row_value in enumerate(rows): + dependent_record = HiveDependencyInstanceRecord( + one_db_info['type'], + table['type'], + "/%s/%s" % (one_db_info['database'], table['name']), + 'dalids:///' + one_db_info['database'] + '/' + table['name'] + if one_db_info['type'].lower() == 'dalids' + else 'hive:///' + one_db_info['database'] + '/' + table['name'], + 'depends on', + 'is used by', + row_value[3], + row_value[4], + row_value[2], + row_value[5] + ':///' + row_value[0] + '/' + row_value[1], '') + self.instance_writer.append(dependent_record) prop_json['view_depends_on'] = l + self.instance_writer.flush() # process either schema flds = {} @@ -85,16 +144,26 @@ class HiveTransform: field_detail_list += result except ValueError: self.logger.error("Schema json error for table : \n" + str(table)) + elif TableInfo.field_list in table: # Convert to avro uri = "hive:///%s/%s" % (one_db_info['database'], table['name']) hcp = HiveColumnParser(table, urn = uri) + if one_db_info['type'].lower() == 'dalids': + uri = "dalids:///%s/%s" % (one_db_info['database'], table['name']) + else: +
uri = "hive:///%s/%s" % (one_db_info['database'], table['name']) schema_json = {'fields' : hcp.column_type_dict['fields'], 'type' : 'record', 'name' : table['name'], 'uri' : uri} field_detail_list += hcp.column_type_list + if one_db_info['type'].lower() == 'dali': + dataset_urn = "dalids:///%s/%s" % (one_db_info['database'], table['name']) + else: + dataset_urn = "hive:///%s/%s" % (one_db_info['database'], table['name']) dataset_scehma_record = DatasetSchemaRecord(table['name'], json.dumps(schema_json), json.dumps(prop_json), json.dumps(flds), - "hive:///%s/%s" % (one_db_info['database'], table['name']), 'Hive', + dataset_urn, + 'Hive', one_db_info['type'], table['type'], '', (table[TableInfo.create_time] if table.has_key( TableInfo.create_time) else None), (table["lastAlterTime"]) if table.has_key("lastAlterTime") else None) schema_file_writer.append(dataset_scehma_record) @@ -117,6 +186,14 @@ class HiveTransform: if __name__ == "__main__": args = sys.argv[1] t = HiveTransform() + try: + t.transform(args[Constant.HIVE_SCHEMA_JSON_FILE_KEY], + args[Constant.HIVE_SCHEMA_CSV_FILE_KEY], + args[Constant.HIVE_FIELD_METADATA_KEY]) + finally: + t.curs.close() + t.conn_hms.close() + t.instance_writer.close() + - t.transform(args[Constant.HIVE_SCHEMA_JSON_FILE_KEY], args[Constant.HIVE_SCHEMA_CSV_FILE_KEY], args[Constant.HIVE_FIELD_METADATA_KEY]) diff --git a/web/app/controllers/api/v1/Dataset.java b/web/app/controllers/api/v1/Dataset.java index 785880a4db..b8f5241da8 100644 --- a/web/app/controllers/api/v1/Dataset.java +++ b/web/app/controllers/api/v1/Dataset.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import models.DatasetColumn; +import models.DatasetDependency; import models.ImpactDataset; import play.api.libs.json.JsValue; import play.libs.Json; @@ -27,6 +28,7 @@ import play.Logger; import org.apache.commons.lang3.StringUtils; import dao.DatasetsDAO; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -747,4 +749,14 @@ public class Dataset extends Controller result.put("similar", Json.toJson(DatasetsDAO.similarColumns(datasetId, columnId))); return ok(result); } + + public static Result getDependViews(Long datasetId) + { + ObjectNode result = Json.newObject(); + List depends = new ArrayList(); + DatasetsDAO.getDatasetDependencies(datasetId, 1, 0, depends); + result.put("status", "ok"); + result.put("depends", Json.toJson(depends)); + return ok(result); + } } diff --git a/web/app/dao/DatasetsDAO.java b/web/app/dao/DatasetsDAO.java index 8328030dc7..93a27c5678 100644 --- a/web/app/dao/DatasetsDAO.java +++ b/web/app/dao/DatasetsDAO.java @@ -314,6 +314,11 @@ public class DatasetsDAO extends AbstractMySQLOpenSourceDAO private final static String GET_DATASET_OWNER_TYPES = "SELECT DISTINCT owner_type " + "FROM dataset_owner WHERE owner_type is not null"; + private final static String GET_DATASET_DEPENDS_VIEW = "SELECT object_type, object_sub_type, " + + "object_name, object_urn, map_phrase, map_phrase_reversed, mapped_object_dataset_id, " + + "mapped_object_type, mapped_object_sub_type, mapped_object_name, mapped_object_urn " + + "FROM stg_cfg_object_name_map WHERE object_dataset_id = ?"; + public static List getDatasetOwnerTypes() { return getJdbcTemplate().queryForList(GET_DATASET_OWNER_TYPES, String.class); @@ -1713,4 +1718,51 @@ public class DatasetsDAO extends AbstractMySQLOpenSourceDAO return result; } + + public static void 
getDatasetDependencies( + Long datasetId, + int level, + int parent, + List<DatasetDependency> depends) + { + if (depends == null) + { + depends = new ArrayList<DatasetDependency>(); + } + + List<Map<String, Object>> rows = null; + rows = getJdbcTemplate().queryForList( + GET_DATASET_DEPENDS_VIEW, + datasetId); + + if (rows != null) + { + for (Map<String, Object> row : rows) { + DatasetDependency dd = new DatasetDependency(); + dd.datasetId = (Long) row.get("mapped_object_dataset_id"); + dd.objectName = (String) row.get("mapped_object_name"); + dd.objectType = (String) row.get("mapped_object_type"); + dd.objectSubType = (String) row.get("mapped_object_sub_type"); + dd.datasetUrn = (String) row.get("mapped_object_urn"); + if (dd.datasetId != null && dd.datasetId > 0) + { + dd.isValidDataset = true; + dd.datasetLink = "#/datasets/" + Long.toString(dd.datasetId); + } + else + { + dd.isValidDataset = false; + } + dd.level = level; + dd.sortId = depends.size() + 1; + dd.treeGridClass = "treegrid-" + Integer.toString(dd.sortId); + if (parent != 0) + { + dd.treeGridClass += " treegrid-parent-" + Integer.toString(parent); + } + depends.add(dd); + getDatasetDependencies(dd.datasetId, level + 1, dd.sortId, depends); + } + } + } } diff --git a/web/app/models/DatasetDependency.java b/web/app/models/DatasetDependency.java new file mode 100644 index 0000000000..30770b74fc --- /dev/null +++ b/web/app/models/DatasetDependency.java @@ -0,0 +1,28 @@ +/** + * Copyright 2015 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */ +package models; + +public class DatasetDependency { + + public Long datasetId; + public String datasetUrn; + public String datasetLink; + public String objectType; + public String objectSubType; + public String objectName; + public String treeGridClass; + public int sortId; + public int level; + public boolean isValidDataset; +} diff --git a/web/app/views/index.scala.html b/web/app/views/index.scala.html index 852677d4d5..aa91a3567b 100644 --- a/web/app/views/index.scala.html +++ b/web/app/views/index.scala.html @@ -462,6 +462,41 @@ {{/if}} + + diff --git a/web/conf/routes b/web/conf/routes index 7d1d025867..3e2f9a8927 100644 --- a/web/conf/routes +++ b/web/conf/routes @@ -53,6 +53,8 @@ POST /api/v1/datasets/:id/owners controllers.api.v1.Dataset.updateDat GET /api/v1/datasets/:id/columns controllers.api.v1.Dataset.getDatasetColumnsByID(id:Int) +GET /api/v1/datasets/:id/depends controllers.api.v1.Dataset.getDependViews(id:Long) + GET /api/v1/datasets/:id/properties controllers.api.v1.Dataset.getDatasetPropertiesByID(id:Int) GET /api/v1/datasets/:id/sample controllers.api.v1.Dataset.getDatasetSampleDataByID(id:Int) diff --git a/web/public/javascripts/routers/datasets.js b/web/public/javascripts/routers/datasets.js index 31397225bc..bc43730d40 100644 --- a/web/public/javascripts/routers/datasets.js +++ b/web/public/javascripts/routers/datasets.js @@ -118,7 +118,7 @@ function convertPropertiesToArray(properties) { for (var key in properties) { - if (key.toLowerCase() != 'elements') + if ((key.toLowerCase() != 'elements') && (key.toLowerCase() != 'view_depends_on')) { if (typeof properties[key] !== 'object') { @@ -399,6 +399,23 @@ App.DatasetRoute = Ember.Route.extend({ } }); + var datasetDependsUrl = 'api/v1/datasets/' + id + "/depends"; + $.get(datasetDependsUrl, function(data) { + if (data && data.status == "ok") + { + if (data.depends && (data.depends.length > 0)) + { + controller.set("hasDepends", true); + controller.set("depends", data.depends); + setTimeout(initializeColumnTreeGrid, 500); + } + else + { + controller.set("hasDepends", false); + } + } + }); + var ownershipUrl = 'api/v1/datasets/' + id + "/owners"; $.get(ownershipUrl, function(data) { if (data && data.status == "ok") diff --git a/web/public/javascripts/views/views.js b/web/public/javascripts/views/views.js index 1e3601b9d7..1740e75c33 100644 --- a/web/public/javascripts/views/views.js +++ b/web/public/javascripts/views/views.js @@ -18,6 +18,10 @@ App.OwnerView = Ember.View.extend({ templateName: 'owner' }); +App.DependView = Ember.View.extend({ + templateName: 'depend' +}); + App.DetailView = Ember.View.extend({ templateName: 'detail', didInsertElement: function() { diff --git a/wherehows-common/src/main/java/wherehows/common/Constant.java b/wherehows-common/src/main/java/wherehows/common/Constant.java index 5ca00fe435..3ba8f16b5b 100644 --- a/wherehows-common/src/main/java/wherehows/common/Constant.java +++ b/wherehows-common/src/main/java/wherehows/common/Constant.java @@ -168,6 +168,8 @@ public class Constant { public static final String HIVE_DATABASE_WHITELIST_KEY = "hive.database_white_list"; public static final String HIVE_SCHEMA_JSON_FILE_KEY = "hive.schema_json_file"; + public static final String HIVE_DEPENDENCY_CSV_FILE_KEY = "hive.dependency_csv_file"; + public static final String HIVE_INSTANCE_CSV_FILE_KEY = "hive.instance_csv_file"; // public static final String HIVE_SAMPLE_CSV_FILE_KEY = "hive.sample_csv"; public static final String HIVE_SCHEMA_CSV_FILE_KEY = "hive.schema_csv_file"; public static final 
String HIVE_FIELD_METADATA_KEY = "hive.field_metadata"; diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetInstanceRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetInstanceRecord.java new file mode 100644 index 0000000000..d8db31cc14 --- /dev/null +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetInstanceRecord.java @@ -0,0 +1,81 @@ +/** + * Copyright 2015 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package wherehows.common.schemas; + +import java.util.ArrayList; +import java.util.List; + + +/** + * Used to generate one record in the dataset instance data file. Load into staging table. + * Created by zsun on 8/25/15. + */ +public class DatasetInstanceRecord extends AbstractRecord { + String datasetUrn; + Integer dbId; + String deploymentTier; + String dataCenter; + String serverCluster; + String slice; + Integer statusId; + String nativeName; + String logicalName; + String version; + Long instanceCreatedUnixtime; + Long createdUnixtime; + Long whExecId; + String abstractedDatasetUrn; + + public DatasetInstanceRecord(String datasetUrn, Integer dbId, String deploymentTier, String dataCenter, + String serverCluster, String slice, Integer statusId, String nativeName, + String logicalName, String version, Long instanceCreatedUnixtime, + Long createdUnixtime, Long whExecId, String abstractedDatasetUrn) { + this.datasetUrn = datasetUrn; + this.dbId = dbId; + this.deploymentTier = deploymentTier; + this.dataCenter = dataCenter; + this.serverCluster = serverCluster; + this.slice = slice; + this.statusId = statusId; + this.nativeName = nativeName; + this.logicalName = logicalName; + this.version = version; + this.instanceCreatedUnixtime = instanceCreatedUnixtime; + this.createdUnixtime = createdUnixtime; + this.whExecId = whExecId; + this.abstractedDatasetUrn = abstractedDatasetUrn; + } + + @Override + public List<Object> fillAllFields() { + List<Object> allFields = new ArrayList<>(); + allFields.add(datasetUrn); + allFields.add(dbId); + allFields.add(deploymentTier); + allFields.add(dataCenter); + allFields.add(serverCluster); + allFields.add(slice); + allFields.add(statusId); + allFields.add(nativeName); + allFields.add(logicalName); + allFields.add(version); + allFields.add(instanceCreatedUnixtime); + allFields.add(createdUnixtime); + allFields.add(whExecId); + allFields.add(abstractedDatasetUrn); + return allFields; + } + + +} diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetSchemaRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetSchemaRecord.java index c2439ebbfe..72e4ced24f 100644 --- a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetSchemaRecord.java +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetSchemaRecord.java @@ -28,11 +28,14 @@ public class DatasetSchemaRecord extends AbstractRecord { String fields; String urn; String source; + String datasetType; + String storageType; String samplePartitionFullPath; Integer sourceCreated; Integer sourceModified; - public DatasetSchemaRecord(String name, String
schema, String properties, String fields, String urn, String source, String samplePartitionFullPath, + public DatasetSchemaRecord(String name, String schema, String properties, String fields, String urn, String source, + String datasetType, String storageType, String samplePartitionFullPath, Integer sourceCreated, Integer sourceModified) { this.name = name; this.schema = schema; @@ -40,6 +43,8 @@ public class DatasetSchemaRecord extends AbstractRecord { this.fields = fields; this.urn = urn; this.source = source; + this.datasetType = datasetType; + this.storageType = storageType; this.samplePartitionFullPath = samplePartitionFullPath; this.sourceCreated = sourceCreated; this.sourceModified = sourceModified; @@ -54,6 +59,8 @@ public class DatasetSchemaRecord extends AbstractRecord { allFields.add(fields); allFields.add(urn); allFields.add(source); + allFields.add(datasetType); + allFields.add(storageType); allFields.add(samplePartitionFullPath); allFields.add(sourceCreated); allFields.add(sourceModified); diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/HiveDependencyInstanceRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/HiveDependencyInstanceRecord.java new file mode 100644 index 0000000000..c062b070db --- /dev/null +++ b/wherehows-common/src/main/java/wherehows/common/schemas/HiveDependencyInstanceRecord.java @@ -0,0 +1,72 @@ +/** + * Copyright 2015 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package wherehows.common.schemas; + +import java.util.ArrayList; +import java.util.List; + + +/** + * Used to generate one record in the hive dependency data file. Load into staging table. + * Created by zsun on 8/25/15.
+ */ +public class HiveDependencyInstanceRecord extends AbstractRecord { + String objectType; + String objectSubType; + String objectName; + String mapPhrase; + String mapPhraseReversed; + String mappedObjectType; + String mappedObjectSubType; + String mappedObjectName; + String description; + String objectUrn; + String mappedObjectUrn; + + public HiveDependencyInstanceRecord(String objectType, String objectSubType, String objectName, String objectUrn, + String mapPhrase, String mapPhraseReversed, String mappedObjectType, + String mappedObjectSubType, String mappedObjectName, + String mappedObjectUrn, String description) { + this.objectType = objectType; + this.objectSubType = objectSubType; + this.objectName = objectName; + this.objectUrn = objectUrn; + this.mapPhrase = mapPhrase; + this.mapPhraseReversed = mapPhraseReversed; + this.mappedObjectType = mappedObjectType; + this.mappedObjectSubType = mappedObjectSubType; + this.mappedObjectName = mappedObjectName; + this.mappedObjectUrn = mappedObjectUrn; + this.description = description; + } + + @Override + public List<Object> fillAllFields() { + List<Object> allFields = new ArrayList<>(); + allFields.add(objectType); + allFields.add(objectSubType); + allFields.add(objectName); + allFields.add(objectUrn); + allFields.add(mapPhrase); + allFields.add(mapPhraseReversed); + allFields.add(mappedObjectType); + allFields.add(mappedObjectSubType); + allFields.add(mappedObjectName); + allFields.add(mappedObjectUrn); + allFields.add(description); + return allFields; + } + + +}
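A minimal usage sketch (not part of the patch itself) of the two record classes added above, assuming wherehows-common is on the classpath: the order of values returned by fillAllFields() is what ends up in the CSV files, so it has to line up with the column lists in HiveLoad's LOAD DATA statements for stg_dict_dataset_instance and stg_cfg_object_name_map. All sample values below are hypothetical.

import wherehows.common.schemas.DatasetInstanceRecord;
import wherehows.common.schemas.HiveDependencyInstanceRecord;

public class RecordFieldOrderSketch {
  public static void main(String[] args) {
    // Field order mirrors stg_dict_dataset_instance's LOAD DATA column list:
    // (dataset_urn, db_id, deployment_tier, data_center, server_cluster, slice,
    //  status_id, native_name, logical_name, version, instance_created_time,
    //  created_time, wh_etl_exec_id, abstracted_dataset_urn)
    DatasetInstanceRecord instance = new DatasetInstanceRecord(
        "dalids:///some_db_mp/some_view_1_2_3",   // hypothetical dataset_urn
        1, "grid", "eat1", "eat1-nertz", "*", 0,
        "some_db_mp.some_view_1_2_3",             // native_name
        "some_view",                              // logical_name
        "1.2.3",                                  // version
        1440460800L, 1440460800L, 12345L,
        "dalids:///some_db_mp/some_view_{version}");

    // Field order mirrors stg_cfg_object_name_map's LOAD DATA column list:
    // (object_type, object_sub_type, object_name, object_urn, map_phrase, map_phrase_reversed,
    //  mapped_object_type, mapped_object_sub_type, mapped_object_name, mapped_object_urn, description)
    HiveDependencyInstanceRecord dep = new HiveDependencyInstanceRecord(
        "Dalids", "View", "/some_db_mp/some_view_1_2_3", "dalids:///some_db_mp/some_view_1_2_3",
        "depends on", "is used by",
        "Hive", "Table", "/src_db/src_table", "hive:///src_db/src_table", "");

    // fillAllFields() lists the values in the order the staging tables expect.
    System.out.println(instance.fillAllFields());
    System.out.println(dep.fillAllFields());
  }
}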