mirror of https://github.com/datahub-project/datahub.git (synced 2025-12-26 17:37:33 +00:00)
Add local mode for hdfs extract
This commit is contained in:
parent c1e8baf1c5
commit bec1c5cee0
@@ -17,14 +17,21 @@ import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URL;
import java.util.Properties;
import metadata.etl.EtlJob;
import org.apache.commons.io.FileUtils;
import wherehows.common.Constant;
@@ -57,6 +64,73 @@ public class HdfsMetadataEtl extends EtlJob {

  public void extract()
      throws Exception {
    logger.info("Begin hdfs metadata extract!");
    boolean isRemote = Boolean.valueOf(prop.getProperty(Constant.HDFS_REMOTE, "false"));
    if (isRemote) {
      extractRemote();
    } else {
      extractLocal();
    }
  }

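For context, a minimal self-contained sketch of the dispatch above, assuming the job properties come from a plain java.util.Properties object (the real job loads them from the wh_etl_job_property table); the class name and main method are illustrative only:

    import java.util.Properties;

    // Sketch only: shows that hdfs.remote.mode selects the extract path and
    // that an absent property falls back to "false", i.e. local mode.
    public class ExtractModeSketch {
      public static void main(String[] args) {
        Properties prop = new Properties();
        prop.setProperty("hdfs.remote.mode", "false");  // value of Constant.HDFS_REMOTE

        boolean isRemote = Boolean.valueOf(prop.getProperty("hdfs.remote.mode", "false"));
        System.out.println(isRemote ? "would call extractRemote()" : "would call extractLocal()");
      }
    }
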
  private void extractLocal()
      throws Exception {

    URL localJarUrl = classLoader.getResource("jar/schemaFetch.jar");
    String homeDir = System.getProperty("user.home");
    String remoteJarFile = homeDir + "/.wherehows/schemaFetch.jar";
    File dest = new File(remoteJarFile);
    FileUtils.copyURLToFile(localJarUrl, dest);

    String outputSchemaFile = prop.getProperty(Constant.HDFS_SCHEMA_LOCAL_PATH_KEY);
    String outputSampleDataFile = prop.getProperty(Constant.HDFS_SAMPLE_LOCAL_PATH_KEY);
    String cluster = prop.getProperty(Constant.HDFS_CLUSTER_KEY);
    String whiteList = prop.getProperty(Constant.HDFS_WHITE_LIST_KEY);
    String numOfThread = prop.getProperty(Constant.HDFS_NUM_OF_THREAD_KEY, String.valueOf(1));
    String hdfsUser = prop.getProperty(Constant.HDFS_REMOTE_USER_KEY);
    String hdfsKeyTab = prop.getProperty(Constant.HDFS_REMOTE_KEYTAB_LOCATION_KEY);

    String execCmd =
        "hadoop jar " + remoteJarFile
            + " -D " + Constant.HDFS_SCHEMA_REMOTE_PATH_KEY + "=" + outputSchemaFile
            + " -D " + Constant.HDFS_SAMPLE_REMOTE_PATH_KEY + "=" + outputSampleDataFile
            + " -D " + Constant.HDFS_CLUSTER_KEY + "=" + cluster
            + " -D " + Constant.HDFS_WHITE_LIST_KEY + "=" + whiteList
            + " -D " + Constant.HDFS_NUM_OF_THREAD_KEY + "=" + numOfThread
            + " -D " + Constant.HDFS_REMOTE_USER_KEY + "=" + hdfsUser
            + " -D " + Constant.HDFS_REMOTE_KEYTAB_LOCATION_KEY + "=" + hdfsKeyTab;
    logger.info("execute local command : " + execCmd);

    Process process = Runtime.getRuntime().exec(execCmd);
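One caveat worth noting: Runtime.getRuntime().exec(String) tokenizes the command on whitespace, so a whitelist or path value containing spaces would be split into separate hadoop arguments. A hedged alternative sketch using ProcessBuilder (not what this commit uses), passing each argument explicitly; the helper name and parameters are illustrative:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative launcher: the argument-list form survives values with
    // spaces, and redirectErrorStream(true) folds stderr into stdout so a
    // single reader can drain everything the child writes.
    public class HadoopJarLauncherSketch {
      public static Process launch(String jarPath, String schemaKey, String schemaFile)
          throws IOException {
        List<String> cmd = new ArrayList<String>();
        cmd.add("hadoop");
        cmd.add("jar");
        cmd.add(jarPath);
        cmd.add("-D");
        cmd.add(schemaKey + "=" + schemaFile);  // e.g. Constant.HDFS_SCHEMA_REMOTE_PATH_KEY; repeat for the other -D pairs
        return new ProcessBuilder(cmd).redirectErrorStream(true).start();
      }
    }
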

    InputStream stdout = process.getInputStream();
    InputStreamReader isr = new InputStreamReader(stdout);
    BufferedReader br = new BufferedReader(isr);
    String line = null;
    while ((line = br.readLine()) != null) {
      logger.info(line);
    }

    // wait until this process finishes.
    int execResult = process.waitFor();

    // if the process failed, log the error and throw an exception
    if (execResult > 0) {
      br = new BufferedReader(new InputStreamReader(process.getErrorStream()));
      String errString = "Error Details:\n";
      while ((line = br.readLine()) != null) {
        errString = errString.concat(line).concat("\n");
      }
      logger.error("*** Process failed, status: " + execResult);
      logger.error(errString);
      throw new Exception("Process failed");
    }
  }

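The method above drains stdout fully before waitFor() and only reads stderr after a failure; a child that fills its stderr pipe first can block both sides. A minimal sketch of draining a stream on its own thread (an assumption-level helper, not part of this commit); callers would join() the returned thread before reading the sink:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    // Illustrative helper: consume a process stream on a separate thread so
    // the child can never stall on a full pipe while the parent reads the
    // other stream.
    public class StreamDrainerSketch {
      public static Thread drain(final InputStream in, final StringBuilder sink) {
        Thread t = new Thread(new Runnable() {
          public void run() {
            try {
              BufferedReader br = new BufferedReader(new InputStreamReader(in));
              String line;
              while ((line = br.readLine()) != null) {
                sink.append(line).append('\n');
              }
            } catch (IOException e) {
              // stream closed; the drainer simply ends
            }
          }
        });
        t.start();
        return t;  // join() this before reading sink
      }
    }
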
  private void extractRemote()
      throws JSchException, SftpException, IOException {
    logger.info("Remote mode!");
    JSch jsch = new JSch();
    //jsch.setLogger(logger);
    final Log4JOutputStream log4JOutputStream = new Log4JOutputStream();

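extractRemote() (shown truncated here) drives the same hadoop jar command on a gateway host over SSH via JSch. A minimal connection sketch, assuming placeholder host, user, and key path; only standard JSch calls are used:

    import com.jcraft.jsch.ChannelExec;
    import com.jcraft.jsch.JSch;
    import com.jcraft.jsch.JSchException;
    import com.jcraft.jsch.Session;

    // Sketch only: open an SSH session and run a command, the way a remote
    // extract would launch `hadoop jar` on a gateway. Host/user/key are
    // assumptions, not values from this commit.
    public class RemoteExecSketch {
      public static void run(String user, String host, String keyFile, String command)
          throws JSchException {
        JSch jsch = new JSch();
        jsch.addIdentity(keyFile);                        // private-key auth
        Session session = jsch.getSession(user, host, 22);
        session.setConfig("StrictHostKeyChecking", "no"); // demo only; verify hosts in production
        session.connect();
        try {
          ChannelExec channel = (ChannelExec) session.openChannel("exec");
          channel.setCommand(command);
          channel.connect();                              // command starts here
          // ... read channel.getInputStream() until EOF, then:
          channel.disconnect();
        } finally {
          session.disconnect();
        }
      }
    }
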
@@ -92,6 +92,8 @@ public class Constant {
  public static final String TD_LOAD_SAMPLE = "teradata.load_sample";

  // Hdfs
  /** The property_name field in wh_etl_job_property table. Whether to use remote mode or not */
  public static final String HDFS_REMOTE = "hdfs.remote.mode";
  /** The property_name field in wh_etl_job_property table. The HDFS remote user that runs the hadoop job on the gateway */
  public static final String HDFS_REMOTE_USER_KEY = "hdfs.remote.user";
  /** The property_name field in wh_etl_job_property table. The gateway machine name */
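To show how these keys line up with the extract code, a hedged example of seeding them into a Properties object; the values are illustrative only, and the real job reads them from the wh_etl_job_property table:

    import java.util.Properties;

    // Example values only; the property names are the constants defined above.
    public class HdfsJobPropertySketch {
      public static Properties remoteModeExample() {
        Properties prop = new Properties();
        prop.setProperty("hdfs.remote.mode", "true");      // Constant.HDFS_REMOTE
        prop.setProperty("hdfs.remote.user", "wherehows"); // Constant.HDFS_REMOTE_USER_KEY (example user)
        return prop;
      }
    }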