diff --git a/backend-service/app/models/utils/Urn.java b/backend-service/app/models/utils/Urn.java
index 2831b0db22..658cba47dd 100644
--- a/backend-service/app/models/utils/Urn.java
+++ b/backend-service/app/models/utils/Urn.java
@@ -24,25 +24,41 @@ import play.Logger;
  */
 public class Urn {
   public String urnString;
-  public String storageType;
+  public String datasetType;
   public String schemaName;
   public String abstractObjectName;
-  static final String[] stoList = new String[] {"teradata", "hdfs"};
-  static final Set storageTypes = new HashSet(Arrays.asList(stoList));
+  static final String[] stoList = new String[] {"teradata", "hdfs", "hive", "dalids", "oracle", "mysql", "pinot"};
+  static final Set datasetTypes = new HashSet(Arrays.asList(stoList));
 
+  /**
+   * Urn can contain 3 parts
+   *      (1)           (2)          (3)
+   * dataset_type://cluster:port/parent/name
+   * the 2nd part is only used to identify deployed dataset instance
+   * for dataset definition, we only use part (1) + (3)
+   */
   public Urn(String urnString) {
     this.urnString = urnString;
     String[] splitResult = urnString.split(":///");
-    storageType = splitResult[0].toLowerCase();
+    datasetType = splitResult[0].toLowerCase();
     Logger.debug(urnString);
-    switch (storageType) {
-      /* example: hdfs://data/tracking/PageViewEvent -> 'hdfs', '', 'data/tracking/PageViewEvent' */
+    switch (datasetType) {
+      /* example: hdfs:///data/tracking/PageViewEvent -> 'hdfs', '', 'data/tracking/PageViewEvent' */
       case "hdfs":
         abstractObjectName = "/" + splitResult[1];
         schemaName = "";
         break;
-      /* example: teradata://dwh/dwh_dim/domain_name -> 'teradata', 'dwh/dwh_dim', 'domain_name' */
-      case "teradata": String[] split2 = splitResult[1].split("/");
+      /* example: teradata:///dwh_dim/dim_table_name -> 'teradata', 'dwh_dim', 'dim_table_name'
+       *          hive:///db_name/table_name -> 'hive', 'db_name', 'table_name'
+       * */
+      case "teradata":
+      case "oracle":
+      case "mysql":
+      case "espresso":
+      case "pinot":
+      case "hive":
+      case "dalids":
+        String[] split2 = splitResult[1].split("/");
         abstractObjectName = split2[split2.length-1];
         StringBuffer sb = new StringBuffer();
         if (split2.length > 1) {
@@ -58,23 +74,31 @@ public class Urn {
     }
   }
 
-  public Urn(String storageType, String schemaName, String abstractObjectName) {
-    this.storageType = storageType.toLowerCase();
+  public Urn(String datasetType, String schemaName, String abstractObjectName) {
+    this.datasetType = datasetType.toLowerCase();
     if (schemaName != null) this.schemaName = schemaName.toLowerCase();
     this.abstractObjectName = abstractObjectName;
-    switch (this.storageType) {
-      case "teradata" : urnString = "teradata:///" + schemaName + "/" + abstractObjectName;
+    switch (this.datasetType) {
+      case "teradata":
+      case "oracle":
+      case "mysql":
+      case "espresso":
+      case "pinot":
+      case "hive":
+      case "dalids":
+        urnString = this.datasetType + ":///" + schemaName + "/" + abstractObjectName;
         break;
       default:
         String trimName = abstractObjectName.startsWith("/") ?
             abstractObjectName.substring(1) : abstractObjectName;
-        urnString = this.storageType + ":///" + trimName;
+        urnString = this.datasetType + ":///" + trimName;
     }
   }
 
   public static boolean validateUrn(String urnString) {
     String[] splitResult = urnString.split(":///");
-    if (storageTypes.contains(splitResult[0]) && splitResult.length > 1)
+    if ((datasetTypes.contains(splitResult[0]) || splitResult[0].matches("\\w+")) &&
+        splitResult.length > 1)
       return true;
     return false;
   }
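Note on the Urn change above: parsing now recognizes several RDBMS/warehouse dataset types in addition to hdfs, and validateUrn falls back to accepting any word-character scheme. A minimal usage sketch of that behavior, based only on the constructor and comments in this diff (the models.utils package path is assumed from the file location):

import models.utils.Urn;  // package assumed from backend-service/app/models/utils/Urn.java

public class UrnParseSketch {
  public static void main(String[] args) {
    Urn hive = new Urn("hive:///db_name/table_name");
    // hive.datasetType = "hive", hive.schemaName = "db_name", hive.abstractObjectName = "table_name"

    Urn hdfs = new Urn("hdfs:///data/tracking/PageViewEvent");
    // hdfs.datasetType = "hdfs", hdfs.schemaName = "", hdfs.abstractObjectName = "/data/tracking/PageViewEvent"

    // validateUrn now also accepts any word-character scheme followed by ":///",
    // so a type that is not in stoList (e.g. espresso) still validates.
    boolean ok = Urn.validateUrn("espresso:///db/table");  // true
  }
}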
diff --git a/backend-service/build.sbt b/backend-service/build.sbt
index 6f2d44b8c5..9763184a71 100644
--- a/backend-service/build.sbt
+++ b/backend-service/build.sbt
@@ -17,10 +17,11 @@ libraryDependencies ++= Seq(
   "org.mockito" % "mockito-core" % "1.9.5",
   "org.quartz-scheduler" % "quartz" % "2.2.1",
   "org.quartz-scheduler" % "quartz-jobs" % "2.2.1",
-  "org.slf4j" % "slf4j-api" % "1.6.6",
+  "org.slf4j" % "slf4j-api" % "1.7.21",
   "org.jasypt" % "jasypt" % "1.9.2",
   "org.apache.kafka" % "kafka_2.10" % "0.10.0.0",
   "org.apache.kafka" % "kafka-clients" % "0.10.0.0"
-)
+).map(_.exclude("log4j", "log4j"))
+  .map(_.exclude("org.slf4j", "slf4j-log4j12"))
 
 play.Project.playJavaSettings
diff --git a/metadata-etl/build.gradle b/metadata-etl/build.gradle
index c3365b94b8..db07b8b1b7 100644
--- a/metadata-etl/build.gradle
+++ b/metadata-etl/build.gradle
@@ -15,6 +15,7 @@ configurations {
     dependencySubstitution {
       substitute module('org.slf4j:slf4j-log4j12') with module('ch.qos.logback:logback-classic:1.1.7')
       //prefer 'log4j-over-slf4j' over 'log4j'
+      substitute module('log4j:log4j') with module('org.slf4j:log4j-over-slf4j:1.7.21')
     }
   }
 }
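Taken together, the two build changes above route everything through SLF4J: the backend-service build excludes log4j and the slf4j-log4j12 binding from transitive dependencies, and metadata-etl substitutes log4j-over-slf4j for log4j, so legacy Log4j 1.x calls from third-party jars are bridged to the SLF4J backend (Logback here) instead of writing to a separate log4j configuration. A minimal sketch of what the bridge does (class name is illustrative only):

// With log4j:log4j swapped for org.slf4j:log4j-over-slf4j, this import still
// resolves, but the call below is delegated to the active SLF4J backend.
import org.apache.log4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingBridgeSketch {
  public static void main(String[] args) {
    // Both lines end up in the same SLF4J-configured appenders.
    Logger.getLogger(LoggingBridgeSketch.class).info("via the legacy Log4j 1.x API");
    LoggerFactory.getLogger(LoggingBridgeSketch.class).info("via the SLF4J API");
  }
}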
diff --git a/metadata-etl/src/main/java/metadata/etl/dataset/hdfs/HdfsMetadataEtl.java b/metadata-etl/src/main/java/metadata/etl/dataset/hdfs/HdfsMetadataEtl.java
index d4038b0d36..86b3647793 100644
--- a/metadata-etl/src/main/java/metadata/etl/dataset/hdfs/HdfsMetadataEtl.java
+++ b/metadata-etl/src/main/java/metadata/etl/dataset/hdfs/HdfsMetadataEtl.java
@@ -19,8 +19,8 @@ import com.jcraft.jsch.ChannelSftp;
 import com.jcraft.jsch.JSch;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
-
 import com.jcraft.jsch.SftpException;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
@@ -32,8 +32,11 @@ import java.io.StringWriter;
 import java.net.URL;
 import java.util.Properties;
 import java.lang.reflect.*;
-import metadata.etl.EtlJob;
+import java.lang.ProcessBuilder;
+
 import org.apache.commons.io.FileUtils;
+
+import metadata.etl.EtlJob;
 
 import wherehows.common.Constant;
 
@@ -84,8 +87,8 @@ public class HdfsMetadataEtl extends EtlJob {
     File dest = new File(remoteJarFile);
     try {
       FileUtils.copyURLToFile(localJarUrl, dest);
-    } catch(Exception e) {
-      logger.error(e.toString());
+    } catch(Exception e) {
+      logger.error(e.toString());
     }
 
     String outputSchemaFile = prop.getProperty(Constant.HDFS_SCHEMA_LOCAL_PATH_KEY);
@@ -95,20 +98,23 @@
     String numOfThread = prop.getProperty(Constant.HDFS_NUM_OF_THREAD_KEY, String.valueOf(1));
     String hdfsUser = prop.getProperty(Constant.HDFS_REMOTE_USER_KEY);
     String hdfsKeyTab = prop.getProperty(Constant.HDFS_REMOTE_KEYTAB_LOCATION_KEY);
+    String hdfsExtractLogFile = outputSchemaFile + ".log";
 
-    String execCmd =
-        "hadoop jar " + remoteJarFile
-            + " -D " + Constant.HDFS_SCHEMA_REMOTE_PATH_KEY + "=" + outputSchemaFile
-            + " -D " + Constant.HDFS_SAMPLE_REMOTE_PATH_KEY + "=" + outputSampleDataFile
-            + " -D " + Constant.HDFS_CLUSTER_KEY + "=" + cluster
-            + " -D " + Constant.HDFS_WHITE_LIST_KEY + "=" + whiteList
-            + " -D " + Constant.HDFS_NUM_OF_THREAD_KEY + "=" + numOfThread
-            + " -D " + Constant.HDFS_REMOTE_USER_KEY + "=" + hdfsUser
-            + " -D log_file_name=hdfs_schema_fetch"
-            + " -D " + Constant.HDFS_REMOTE_KEYTAB_LOCATION_KEY + "=" + hdfsKeyTab;
-    //logger.info("executue remote command : " + execCmd);
+    String[] hadoopCmd = {"hadoop", "jar", remoteJarFile,
+        "-D" + Constant.HDFS_SCHEMA_REMOTE_PATH_KEY + "=" + outputSchemaFile,
+        "-D" + Constant.HDFS_SAMPLE_REMOTE_PATH_KEY + "=" + outputSampleDataFile,
+        "-D" + Constant.HDFS_CLUSTER_KEY + "=" + cluster,
+        "-D" + Constant.HDFS_WHITE_LIST_KEY + "=" + whiteList,
+        "-D" + Constant.HDFS_NUM_OF_THREAD_KEY + "=" + numOfThread,
+        "-D" + Constant.HDFS_REMOTE_USER_KEY + "=" + hdfsUser,
+        "-D" + Constant.HDFS_REMOTE_KEYTAB_LOCATION_KEY + "=" + hdfsKeyTab,
+        "-Dlog.file.name=hdfs_schema_fetch" };
 
-    Process process = Runtime.getRuntime().exec(execCmd);
+    ProcessBuilder pb = new ProcessBuilder(hadoopCmd);
+    File logFile = new File(hdfsExtractLogFile);
+    pb.redirectErrorStream(true);
+    pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile));
+    Process process = pb.start();
     int pid = -1;
     if(process.getClass().getName().equals("java.lang.UNIXProcess")) {
       /* get the PID on unix/linux systems */
@@ -119,22 +125,7 @@
       } catch (Throwable e) {
       }
     }
-    logger.info("executue remote command [PID=" + pid + "]: " + execCmd);
-
-    BufferedInputStream stdout = new BufferedInputStream(process.getInputStream());
-    byte[] bytes = new byte[4096];
-    while (stdout.read(bytes) != -1) {}
-
-    String line = null;
-    /* @need to redo this part using ProcessBuilder + redirection
-    InputStream stdout = process.getInputStream();
-    InputStreamReader isr = new InputStreamReader(stdout);
-    BufferedReader br = new BufferedReader(isr);
-
-
-    while ( (line = br.readLine()) != null) {
-      logger.info(line);
-    }*/
+    logger.info("execute command [PID=" + pid + "]: " + String.join(" ", hadoopCmd));
 
     // wait until this process finished.
     int execResult = process.waitFor();
@@ -142,7 +133,8 @@
     // if the process failed, log the error and throw exception
     if (execResult > 0) {
       BufferedReader br = new BufferedReader(new InputStreamReader(process.getErrorStream()));
-      String errString = "Error Details:\n";
+      String errString = "HDFS Metadata Extract Error:\n";
+      String line = "";
       while((line = br.readLine()) != null)
         errString = errString.concat(line).concat("\n");
       logger.error("*** Process failed, status: " + execResult);
@@ -230,7 +222,7 @@
     }
     logger.info("ExecChannel exit-status: " + execChannel.getExitStatus());
-
+
     execChannel.disconnect();
 
     // scp back the result
 
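The main change in HdfsMetadataEtl above replaces the single Runtime.exec() command string with a ProcessBuilder argument array and redirects the hadoop job's merged stdout/stderr to a log file, so the parent JVM no longer has to drain the streams itself. A stripped-down sketch of that pattern, with placeholder command and paths rather than the job's real values:

import java.io.File;
import java.io.IOException;

public class ProcessBuilderSketch {
  public static void main(String[] args) throws IOException, InterruptedException {
    // Arguments as an array: no shell word-splitting of values that contain spaces.
    String[] cmd = {"hadoop", "jar", "/tmp/schema-fetch.jar", "-Dkey=value"};

    ProcessBuilder pb = new ProcessBuilder(cmd);
    pb.redirectErrorStream(true);  // merge stderr into stdout
    pb.redirectOutput(ProcessBuilder.Redirect.appendTo(new File("/tmp/schema-fetch.log")));

    Process process = pb.start();
    int exitCode = process.waitFor();  // block until the job finishes
    if (exitCode != 0) {
      throw new RuntimeException("command exited with status " + exitCode
          + ", see /tmp/schema-fetch.log");
    }
  }
}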
diff --git a/metadata-etl/src/main/resources/jython/HdfsLoad.py b/metadata-etl/src/main/resources/jython/HdfsLoad.py
index 753dea8d01..4c627a17aa 100644
--- a/metadata-etl/src/main/resources/jython/HdfsLoad.py
+++ b/metadata-etl/src/main/resources/jython/HdfsLoad.py
@@ -61,7 +61,7 @@
     update stg_dict_dataset
     set name = substring_index(urn, '/', -2)
     where db_id = {db_id}
-      and name in ('1.0', '2.0', '3.0', '4.0', '0.1', '0.2', '0.3', '0.4', 'dedup', '1-day', '7-day');
+      and name regexp '[0-9]+\\.[0-9]+|dedup|dedupe|[0-9]+-day';
 
     -- update parent name, this depends on the data from source system
     update stg_dict_dataset
@@ -109,7 +109,7 @@
       wh_etl_exec_id={wh_etl_exec_id}, abstract_dataset_urn=s.urn, schema_text=s.schema;
 
     -- insert into final table
-    INSERT INTO dict_dataset
+    INSERT IGNORE INTO dict_dataset
     ( `name`,
       `schema`,
       schema_type,
@@ -145,11 +145,11 @@
     on duplicate key update
       `name`=s.name, `schema`=s.schema, schema_type=s.schema_type, fields=s.fields,
       properties=s.properties, source=s.source, location_prefix=s.location_prefix, parent_name=s.parent_name,
-      storage_type=s.storage_type, ref_dataset_id=s.ref_dataset_id, status_id=s.status_id,
-      dataset_type=s.dataset_type, hive_serdes_class=s.hive_serdes_class, is_partitioned=s.is_partitioned,
+      storage_type=s.storage_type, ref_dataset_id=s.ref_dataset_id, status_id=s.status_id,
+      dataset_type=s.dataset_type, hive_serdes_class=s.hive_serdes_class, is_partitioned=s.is_partitioned,
       partition_layout_pattern_id=s.partition_layout_pattern_id, sample_partition_full_path=s.sample_partition_full_path,
       source_created_time=s.source_created_time, source_modified_time=s.source_modified_time,
-      modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
+      modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
     ;
 
     analyze table dict_dataset;
@@ -159,7 +159,7 @@
       and sdi.db_id = {db_id};
 
     -- insert into final instance table
-    INSERT INTO dict_dataset_instance
+    INSERT IGNORE INTO dict_dataset_instance
     ( dataset_id,
       db_id,
       deployment_tier,
@@ -195,11 +195,13 @@
       instance_created_time=s.instance_created_time, created_time=s.created_time, wh_etl_exec_id=s.wh_etl_exec_id
     ;
     '''.format(source_file=self.input_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
+
     for state in load_cmd.split(";"):
       self.logger.debug(state)
       cursor.execute(state)
     self.conn_mysql.commit()
     cursor.close()
+    self.logger.info("finish loading hdfs metadata db_id={db_id} to dict_dataset".format(db_id=self.db_id))
 
   def load_field(self):
     cursor = self.conn_mysql.cursor()
@@ -359,6 +361,7 @@
       cursor.execute(state)
     self.conn_mysql.commit()
     cursor.close()
+    self.logger.info("finish loading hdfs metadata db_id={db_id} to dict_field_detail".format(db_id=self.db_id))
 
   def load_sample(self):
     cursor = self.conn_mysql.cursor()
@@ -402,6 +405,7 @@
       cursor.execute(state)
     self.conn_mysql.commit()
     cursor.close()
+    self.logger.info("finish loading hdfs sample data db_id={db_id} to dict_dataset_sample".format(db_id=self.db_id))
 
 
 if __name__ == "__main__":
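Two things change in the HdfsLoad step above: the hard-coded list of version-style leaf names is replaced by a regular expression, and the final loads into dict_dataset and dict_dataset_instance add IGNORE, so a bad row is skipped with a warning instead of failing the whole multi-row insert. MySQL REGEXP performs an unanchored match, which Matcher.find() mimics in this quick check (the sample names are made up):

import java.util.Arrays;
import java.util.regex.Pattern;

public class NameNormalizationSketch {
  public static void main(String[] args) {
    // Same pattern the load now uses; '\\.' in the Python source reaches MySQL as '\.'.
    Pattern leaf = Pattern.compile("[0-9]+\\.[0-9]+|dedup|dedupe|[0-9]+-day");

    for (String name : Arrays.asList("1.0", "2.1", "dedup", "dedupe", "30-day", "PageViewEvent")) {
      // Prints true for everything except PageViewEvent; matching rows get
      // name = substring_index(urn, '/', -2), i.e. parent directory + leaf.
      System.out.println(name + " -> " + leaf.matcher(name).find());
    }
  }
}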
diff --git a/metadata-etl/src/main/resources/jython/HiveLoad.py b/metadata-etl/src/main/resources/jython/HiveLoad.py
index ea4191022d..5254ee69f2 100644
--- a/metadata-etl/src/main/resources/jython/HiveLoad.py
+++ b/metadata-etl/src/main/resources/jython/HiveLoad.py
@@ -204,7 +204,7 @@
     );
 
 
-    insert into dict_field_detail (
+    insert ignore into dict_field_detail (
       dataset_id, fields_layout_id, sort_id, parent_sort_id, parent_path,
       field_name, namespace, data_type, data_size, is_nullable, default_value, modified