Use ProcessBuilder and redirected log file for HDFS Extract (#198)

* Use ProcessBuilder and redirected log file for HDFS Extract

* relax urn validation rule
Eric Sun committed 2016-08-08 14:02:34 -07:00 (committed by GitHub)
parent 39cec22e25
commit cd4853d0a5
6 changed files with 79 additions and 57 deletions
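
The core of the change: instead of building one long command string and launching it with Runtime.getRuntime().exec(), the HDFS extract job now uses a ProcessBuilder whose stdout and stderr are appended to a log file, so the parent process no longer has to drain the child's streams itself. A minimal, self-contained sketch of that pattern follows; the echo command and /tmp log path are placeholders, not the actual extractor invocation (the real call is the "hadoop jar ..." command shown in the diff below).

import java.io.File;
import java.io.IOException;

public class RedirectedProcessSketch {
  public static void main(String[] args) throws IOException, InterruptedException {
    // Placeholder command; the real job launches "hadoop jar <jar> -D..." instead.
    ProcessBuilder pb = new ProcessBuilder("echo", "hello from child process");
    File logFile = new File("/tmp/extract_demo.log");              // hypothetical log location
    pb.redirectErrorStream(true);                                  // merge stderr into stdout
    pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile));  // append all output to the log
    Process process = pb.start();
    int exitCode = process.waitFor();                              // no manual stream draining needed
    System.out.println("exit=" + exitCode + ", output logged to " + logFile);
  }
}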

View File

@ -24,25 +24,41 @@ import play.Logger;
*/
public class Urn {
public String urnString;
public String storageType;
public String datasetType;
public String schemaName;
public String abstractObjectName;
static final String[] stoList = new String[] {"teradata", "hdfs"};
static final Set<String> storageTypes = new HashSet<String>(Arrays.asList(stoList));
static final String[] stoList = new String[] {"teradata", "hdfs", "hive", "dalids", "oracle", "mysql", "pinot"};
static final Set<String> datasetTypes = new HashSet<String>(Arrays.asList(stoList));
/**
* A Urn can contain 3 parts:
*   (1) dataset_type :// (2) cluster:port / (3) parent/name
* The 2nd part is only used to identify a deployed dataset instance;
* for the dataset definition, we only use parts (1) + (3).
*/
public Urn(String urnString) {
this.urnString = urnString;
String[] splitResult = urnString.split(":///");
storageType = splitResult[0].toLowerCase();
datasetType = splitResult[0].toLowerCase();
Logger.debug(urnString);
switch (storageType) {
/* example: hdfs://data/tracking/PageViewEvent -> 'hdfs', '', 'data/tracking/PageViewEvent' */
switch (datasetType) {
/* example: hdfs:///data/tracking/PageViewEvent -> 'hdfs', '', 'data/tracking/PageViewEvent' */
case "hdfs": abstractObjectName = "/" + splitResult[1];
schemaName = "";
break;
/* example: teradata://dwh/dwh_dim/domain_name -> 'teradata', 'dwh/dwh_dim', 'domain_name' */
case "teradata": String[] split2 = splitResult[1].split("/");
/* example: teradata:///dwh_dim/dim_table_name -> 'teradata', 'dwh_dim', 'dim_table_name'
* hive:///db_name/table_name -> 'hive', 'db_name', 'table_name'
* */
case "teradata":
case "oracle":
case "mysql":
case "espresso":
case "pinot":
case "hive":
case "dalids":
String[] split2 = splitResult[1].split("/");
abstractObjectName = split2[split2.length-1];
StringBuffer sb = new StringBuffer();
if (split2.length > 1) {
@ -58,23 +74,31 @@ public class Urn {
}
}
public Urn(String storageType, String schemaName, String abstractObjectName) {
this.storageType = storageType.toLowerCase();
public Urn(String datasetType, String schemaName, String abstractObjectName) {
this.datasetType = datasetType.toLowerCase();
if (schemaName != null)
this.schemaName = schemaName.toLowerCase();
this.abstractObjectName = abstractObjectName;
switch (this.storageType) {
case "teradata" : urnString = "teradata:///" + schemaName + "/" + abstractObjectName;
switch (this.datasetType) {
case "teradata":
case "oracle":
case "mysql":
case "espresso":
case "pinot":
case "hive":
case "dalids":
urnString = this.datasetType + ":///" + schemaName + "/" + abstractObjectName;
break;
default: String trimName = abstractObjectName.startsWith("/") ? abstractObjectName.substring(1) : abstractObjectName;
urnString = this.storageType + ":///" + trimName;
urnString = this.datasetType + ":///" + trimName;
}
}
public static boolean validateUrn(String urnString) {
String[] splitResult = urnString.split(":///");
if (storageTypes.contains(splitResult[0]) && splitResult.length > 1)
if ((datasetTypes.contains(splitResult[0]) || splitResult[0].matches("\\w+")) &&
splitResult.length > 1)
return true;
return false;
}
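
For reference, a usage sketch of the revised Urn class under the new rules. It assumes the class above is on the classpath; the UrnDemo name and the "kafka" and malformed inputs are illustrative examples, not taken from the commit.

// Illustrative only; relies on the Urn class shown in the diff above.
public class UrnDemo {
  public static void main(String[] args) {
    Urn hdfs = new Urn("hdfs:///data/tracking/PageViewEvent");
    // datasetType="hdfs", schemaName="", abstractObjectName="/data/tracking/PageViewEvent"
    System.out.println(hdfs.datasetType + " | " + hdfs.schemaName + " | " + hdfs.abstractObjectName);

    Urn hive = new Urn("hive:///db_name/table_name");
    // datasetType="hive", schemaName="db_name", abstractObjectName="table_name"
    System.out.println(hive.datasetType + " | " + hive.schemaName + " | " + hive.abstractObjectName);

    // The relaxed validation accepts any word-character prefix, not only the known dataset types.
    System.out.println(Urn.validateUrn("kafka:///some/topic"));  // true: "kafka" matches \w+
    System.out.println(Urn.validateUrn("not-a-urn"));            // false: no ":///" separator
  }
}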

View File

@ -17,10 +17,11 @@ libraryDependencies ++= Seq(
"org.mockito" % "mockito-core" % "1.9.5",
"org.quartz-scheduler" % "quartz" % "2.2.1",
"org.quartz-scheduler" % "quartz-jobs" % "2.2.1",
"org.slf4j" % "slf4j-api" % "1.6.6",
"org.slf4j" % "slf4j-api" % "1.7.21",
"org.jasypt" % "jasypt" % "1.9.2",
"org.apache.kafka" % "kafka_2.10" % "0.10.0.0",
"org.apache.kafka" % "kafka-clients" % "0.10.0.0"
)
).map(_.exclude("log4j", "log4j"))
.map(_.exclude("org.slf4j", "slf4j-log4j12"))
play.Project.playJavaSettings

View File

@ -15,6 +15,7 @@ configurations {
dependencySubstitution {
substitute module('org.slf4j:slf4j-log4j12') with module('ch.qos.logback:logback-classic:1.1.7')
//prefer 'log4j-over-slf4j' over 'log4j'
substitute module('log4j:log4j') with module('org.slf4j:log4j-over-slf4j:1.7.21')
}
}
}

View File

@ -19,8 +19,8 @@ import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
@ -32,8 +32,11 @@ import java.io.StringWriter;
import java.net.URL;
import java.util.Properties;
import java.lang.reflect.*;
import metadata.etl.EtlJob;
import java.lang.ProcessBuilder;
import org.apache.commons.io.FileUtils;
import metadata.etl.EtlJob;
import wherehows.common.Constant;
@ -84,8 +87,8 @@ public class HdfsMetadataEtl extends EtlJob {
File dest = new File(remoteJarFile);
try {
FileUtils.copyURLToFile(localJarUrl, dest);
} catch(Exception e) {
logger.error(e.toString());
}
String outputSchemaFile = prop.getProperty(Constant.HDFS_SCHEMA_LOCAL_PATH_KEY);
@ -95,20 +98,23 @@ public class HdfsMetadataEtl extends EtlJob {
String numOfThread = prop.getProperty(Constant.HDFS_NUM_OF_THREAD_KEY, String.valueOf(1));
String hdfsUser = prop.getProperty(Constant.HDFS_REMOTE_USER_KEY);
String hdfsKeyTab = prop.getProperty(Constant.HDFS_REMOTE_KEYTAB_LOCATION_KEY);
String hdfsExtractLogFile = outputSchemaFile + ".log";
String execCmd =
"hadoop jar " + remoteJarFile
+ " -D " + Constant.HDFS_SCHEMA_REMOTE_PATH_KEY + "=" + outputSchemaFile
+ " -D " + Constant.HDFS_SAMPLE_REMOTE_PATH_KEY + "=" + outputSampleDataFile
+ " -D " + Constant.HDFS_CLUSTER_KEY + "=" + cluster
+ " -D " + Constant.HDFS_WHITE_LIST_KEY + "=" + whiteList
+ " -D " + Constant.HDFS_NUM_OF_THREAD_KEY + "=" + numOfThread
+ " -D " + Constant.HDFS_REMOTE_USER_KEY + "=" + hdfsUser
+ " -D log_file_name=hdfs_schema_fetch"
+ " -D " + Constant.HDFS_REMOTE_KEYTAB_LOCATION_KEY + "=" + hdfsKeyTab;
//logger.info("executue remote command : " + execCmd);
String[] hadoopCmd = {"hadoop", "jar", remoteJarFile,
"-D" + Constant.HDFS_SCHEMA_REMOTE_PATH_KEY + "=" + outputSchemaFile,
"-D" + Constant.HDFS_SAMPLE_REMOTE_PATH_KEY + "=" + outputSampleDataFile,
"-D" + Constant.HDFS_CLUSTER_KEY + "=" + cluster,
"-D" + Constant.HDFS_WHITE_LIST_KEY + "=" + whiteList,
"-D" + Constant.HDFS_NUM_OF_THREAD_KEY + "=" + numOfThread,
"-D" + Constant.HDFS_REMOTE_USER_KEY + "=" + hdfsUser,
"-D" + Constant.HDFS_REMOTE_KEYTAB_LOCATION_KEY + "=" + hdfsKeyTab,
"-Dlog.file.name=hdfs_schema_fetch" };
Process process = Runtime.getRuntime().exec(execCmd);
ProcessBuilder pb = new ProcessBuilder(hadoopCmd);
File logFile = new File(hdfsExtractLogFile);
pb.redirectErrorStream(true);
pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile));
Process process = pb.start();
int pid = -1;
if(process.getClass().getName().equals("java.lang.UNIXProcess")) {
/* get the PID on unix/linux systems */
@ -119,22 +125,7 @@ public class HdfsMetadataEtl extends EtlJob {
} catch (Throwable e) {
}
}
logger.info("executue remote command [PID=" + pid + "]: " + execCmd);
BufferedInputStream stdout = new BufferedInputStream(process.getInputStream());
byte[] bytes = new byte[4096];
while (stdout.read(bytes) != -1) {}
String line = null;
/* @need to redo this part using ProcessBuilder + redirection
InputStream stdout = process.getInputStream();
InputStreamReader isr = new InputStreamReader(stdout);
BufferedReader br = new BufferedReader(isr);
while ( (line = br.readLine()) != null) {
logger.info(line);
}*/
logger.info("executue command [PID=" + pid + "]: " + hadoopCmd);
// wait until this process finished.
int execResult = process.waitFor();
@ -142,7 +133,8 @@ public class HdfsMetadataEtl extends EtlJob {
// if the process failed, log the error and throw exception
if (execResult > 0) {
BufferedReader br = new BufferedReader(new InputStreamReader(process.getErrorStream()));
String errString = "Error Details:\n";
String errString = "HDFS Metadata Extract Error:\n";
String line = "";
while((line = br.readLine()) != null)
errString = errString.concat(line).concat("\n");
logger.error("*** Process failed, status: " + execResult);
@ -230,7 +222,7 @@ public class HdfsMetadataEtl extends EtlJob {
}
logger.info("ExecChannel exit-status: " + execChannel.getExitStatus());
execChannel.disconnect();
// scp back the result
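
Because both streams are redirected, diagnostics from the hadoop job land in the hdfsExtractLogFile created above rather than in the Process object's in-memory streams. A hedged sketch of reading that log back when building an error report; LogTail, tail, and the 50-line cap are hypothetical names and choices, while only the log-file naming convention (outputSchemaFile + ".log") comes from the diff.

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.List;

// Hypothetical helper, not part of the commit.
public final class LogTail {
  /** Returns the last maxLines lines of the redirected extractor log. */
  public static String tail(File logFile, int maxLines) throws IOException {
    List<String> lines = Files.readAllLines(logFile.toPath(), StandardCharsets.UTF_8);
    int from = Math.max(0, lines.size() - maxLines);
    StringBuilder sb = new StringBuilder("HDFS Metadata Extract Error:\n");
    for (String line : lines.subList(from, lines.size())) {
      sb.append(line).append('\n');
    }
    return sb.toString();
  }

  public static void main(String[] args) throws IOException {
    // e.g. outputSchemaFile + ".log", matching the naming used in the diff; path is a placeholder
    System.out.println(tail(new File("/tmp/hdfs_schema.json.log"), 50));
  }
}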

View File

@ -61,7 +61,7 @@ class HdfsLoad:
update stg_dict_dataset
set name = substring_index(urn, '/', -2)
where db_id = {db_id}
and name in ('1.0', '2.0', '3.0', '4.0', '0.1', '0.2', '0.3', '0.4', 'dedup', '1-day', '7-day');
and name regexp '[0-9]+\\.[0-9]+|dedup|dedupe|[0-9]+-day';
-- update parent name, this depends on the data from source system
update stg_dict_dataset
@ -109,7 +109,7 @@ class HdfsLoad:
wh_etl_exec_id={wh_etl_exec_id}, abstract_dataset_urn=s.urn, schema_text=s.schema;
-- insert into final table
INSERT INTO dict_dataset
INSERT IGNORE INTO dict_dataset
( `name`,
`schema`,
schema_type,
@ -145,11 +145,11 @@ class HdfsLoad:
on duplicate key update
`name`=s.name, `schema`=s.schema, schema_type=s.schema_type, fields=s.fields,
properties=s.properties, source=s.source, location_prefix=s.location_prefix, parent_name=s.parent_name,
storage_type=s.storage_type, ref_dataset_id=s.ref_dataset_id, status_id=s.status_id,
dataset_type=s.dataset_type, hive_serdes_class=s.hive_serdes_class, is_partitioned=s.is_partitioned,
partition_layout_pattern_id=s.partition_layout_pattern_id, sample_partition_full_path=s.sample_partition_full_path,
source_created_time=s.source_created_time, source_modified_time=s.source_modified_time,
modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
;
analyze table dict_dataset;
@ -159,7 +159,7 @@ class HdfsLoad:
and sdi.db_id = {db_id};
-- insert into final instance table
INSERT INTO dict_dataset_instance
INSERT IGNORE INTO dict_dataset_instance
( dataset_id,
db_id,
deployment_tier,
@ -195,11 +195,13 @@ class HdfsLoad:
instance_created_time=s.instance_created_time, created_time=s.created_time, wh_etl_exec_id=s.wh_etl_exec_id
;
'''.format(source_file=self.input_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
for state in load_cmd.split(";"):
self.logger.debug(state)
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()
self.logger.info("finish loading hdfs metadata db_id={db_id} to dict_dataset".format(db_id=self.db_id))
def load_field(self):
cursor = self.conn_mysql.cursor()
@ -359,6 +361,7 @@ class HdfsLoad:
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()
self.logger.info("finish loading hdfs metadata db_id={db_id} to dict_field_detail".format(db_id=self.db_id))
def load_sample(self):
cursor = self.conn_mysql.cursor()
@ -402,6 +405,7 @@ class HdfsLoad:
cursor.execute(state)
self.conn_mysql.commit()
cursor.close()
self.logger.info("finish loading hdfs sample data db_id={db_id} to dict_dataset_sample".format(db_id=self.db_id))
if __name__ == "__main__":

View File

@ -204,7 +204,7 @@ class HiveLoad:
);
insert into dict_field_detail (
insert ignore into dict_field_detail (
dataset_id, fields_layout_id, sort_id, parent_sort_id, parent_path,
field_name, namespace, data_type, data_size, is_nullable, default_value,
modified