Add BaseJob in wherehows-common, make ETLjob extends from it (#681)

This commit is contained in:
Yi (Alan) Wang 2017-08-16 21:24:38 -07:00 committed by GitHub
parent cc1699761e
commit 28b83b8e7b
13 changed files with 108 additions and 104 deletions

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import wherehows.common.Constant;
import wherehows.common.jobs.Launcher;
import static org.apache.commons.lang3.StringUtils.*;
@ -74,7 +75,7 @@ class ConfigUtil {
.add("-DCONTEXT=" + etlJobName)
.add("-Dlogback.configurationFile=etl_logback.xml")
.add("-DLOG_DIR=" + outDir)
.add("metadata.etl.Launcher")
.add(Launcher.class.getCanonicalName())
.build());
pb.redirectOutput(ProcessBuilder.Redirect.to(new File(outDir + "/" + etlJobName + ".stdout")));
pb.redirectError(ProcessBuilder.Redirect.to(new File(outDir + "/" + etlJobName + ".stderr")));

View File

@ -18,18 +18,16 @@ import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import metadata.etl.Launcher;
import metadata.etl.models.EtlJobStatus;
import models.daos.EtlJobDao;
import msgs.EtlJobMessage;
import play.Logger;
import play.Play;
import shared.Global;
import wherehows.common.Constant;
import wherehows.common.jobs.JobStatus;
import wherehows.common.jobs.Launcher;
/**
@ -123,7 +121,7 @@ public class EtlJobActor extends UntypedActor {
throw new Exception("Process + " + getPid(process) + " failed");
}
EtlJobDao.endRun(msg.getWhEtlExecId(), EtlJobStatus.SUCCEEDED, "Job succeed!");
EtlJobDao.endRun(msg.getWhEtlExecId(), JobStatus.SUCCEEDED, "Job succeed!");
Logger.info("ETL job {} finished", msg.toDebugString());
if (props.getProperty(Constant.REBUILD_TREE_DATASET) != null) {
@ -139,11 +137,10 @@ public class EtlJobActor extends UntypedActor {
if (process.isAlive()) {
process.destroy();
}
EtlJobDao.endRun(msg.getWhEtlExecId(), EtlJobStatus.ERROR, e.getMessage());
EtlJobDao.endRun(msg.getWhEtlExecId(), JobStatus.ERROR, e.getMessage());
} finally {
Global.removeRunningJob(msg.getEtlJobName());
if (!Logger.isDebugEnabled()) // if debug enable, won't delete the config files.
{
if (!Logger.isDebugEnabled()) { // if debug enable, won't delete the config files.
ConfigUtil.deletePropertiesFile(msg.getWhEtlExecId(), configDir);
}
}

View File

@ -18,7 +18,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import metadata.etl.models.EtlJobStatus;
import wherehows.common.jobs.JobStatus;
import models.daos.EtlJobDao;
import msgs.EtlJobMessage;
import play.Logger;
@ -96,7 +96,7 @@ public class SchedulerActor extends UntypedActor {
if (Global.getCurrentRunningJob().contains(etlJobName)) {
Logger.error("The previous job is still running! Abort this job : " + etlMsg.toDebugString());
EtlJobDao.endRun(etlMsg.getWhEtlExecId(), EtlJobStatus.ERROR, "Previous is still running, Aborted!");
EtlJobDao.endRun(etlMsg.getWhEtlExecId(), JobStatus.ERROR, "Previous is still running, Aborted!");
} else {
Global.getCurrentRunningJob().add(etlJobName);
Logger.info("Send message : " + etlMsg.toDebugString());

View File

@ -17,7 +17,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import metadata.etl.models.EtlJobStatus;
import wherehows.common.jobs.JobStatus;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.support.KeyHolder;
import play.libs.Time;
@ -75,7 +75,7 @@ public class EtlJobDao {
public static long insertNewRun(String etlJobName) {
Map<String, Object> params = new HashMap<>();
params.put("whEtlJobName", etlJobName);
params.put("status", EtlJobStatus.REQUESTED.toString());
params.put("status", JobStatus.REQUESTED.toString());
params.put("requestTime", System.currentTimeMillis() / 1000);
KeyHolder keyHolder = JdbcUtil.insertRow(INSERT_NEW_RUN, params);
return (Long) keyHolder.getKey();
@ -84,13 +84,13 @@ public class EtlJobDao {
public static void startRun(long whEtlExecId, String message) {
Map<String, Object> params = new HashMap<>();
params.put("whEtlExecId", whEtlExecId);
params.put("status", EtlJobStatus.STARTED.toString());
params.put("status", JobStatus.STARTED.toString());
params.put("startTime", System.currentTimeMillis() / 1000);
params.put("message", message);
JdbcUtil.wherehowsNamedJdbcTemplate.update(START_RUN, params);
}
public static void endRun(long whEtlExecId, EtlJobStatus status, String message) {
public static void endRun(long whEtlExecId, JobStatus status, String message) {
Map<String, Object> params = new HashMap<>();
params.put("whEtlExecId", whEtlExecId);
params.put("status", status.toString());

View File

@ -36,7 +36,7 @@ public class ConfigUtilTest {
// then:
assertThat(pb.command()).contains("java", "-cp", System.getProperty("java.class.path"),
"-Dconfig=/var/tmp/wherehows/exec/0.properties", "-DCONTEXT=hdfs_metadata_etl",
"-Dlogback.configurationFile=etl_logback.xml", "metadata.etl.Launcher");
"-Dlogback.configurationFile=etl_logback.xml", "wherehows.common.jobs.Launcher");
}
@Test
@ -54,7 +54,7 @@ public class ConfigUtilTest {
assertThat(pb.command()).contains("java", "-a", "-b", "-cp", System.getProperty("java.class.path"),
"-Dconfig=" + applicationDirectory + "/exec/1.properties", "-DCONTEXT=ldap_user_etl",
"-DLOG_DIR=" + applicationDirectory, "-Dlogback.configurationFile=etl_logback.xml",
"metadata.etl.Launcher");
"wherehows.common.jobs.Launcher");
assertThat(pb.redirectError().file().getPath().equals("./temp/LDAP_USER_ETL.stderr"));
assertThat(pb.redirectOutput().file().getPath().equals("./temp/LDAP_USER_ETL.stdout"));
}

View File

@ -0,0 +1,34 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.jobs;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import wherehows.common.Constant;
/**
 * Abstract base class for all WhereHows runnable jobs; subclasses implement {@link #run()}.
 * Introduced so ETL jobs (and future non-ETL jobs) share a common launch contract.
 */
public abstract class BaseJob {
// Logger named after the concrete subclass, so log output identifies the actual job type.
protected final Logger logger = LoggerFactory.getLogger(getClass());
// Job configuration. Held by reference (not defensively copied), so the caller's
// Properties object is mutated by the constructor below — see note there.
public final Properties prop;
/**
 * @param whExecId WhereHows ETL execution id; recorded into {@code prop} under
 *                 {@code Constant.WH_EXEC_ID_KEY} as a decimal string
 * @param properties job configuration; NOTE(review): stored and mutated in place,
 *                   not copied — callers should not assume their Properties stays untouched
 */
public BaseJob(long whExecId, Properties properties) {
this.prop = properties;
this.prop.setProperty(Constant.WH_EXEC_ID_KEY, String.valueOf(whExecId));
}
/** Executes the job. Implementations may throw any exception on failure. */
public abstract void run() throws Exception;
}

View File

@ -11,21 +11,20 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.models;
package wherehows.common.jobs;
import java.lang.reflect.Constructor;
import java.util.Properties;
import metadata.etl.EtlJob;
/**
* Created by zechen on 10/21/15.
*/
public class EtlJobFactory {
public static EtlJob getEtlJob(String etlClassName, int refId, long whExecId, Properties properties)
throws Exception {
Class etlClass = Class.forName(etlClassName);
Constructor<?> ctor = etlClass.getConstructor(int.class, long.class, Properties.class);
return (EtlJob) ctor.newInstance(refId, whExecId, properties);
public class JobFactory {
private JobFactory() {
}
public static BaseJob getJob(String jobClassName, int refId, long whExecId, Properties properties) throws Exception {
Class jobClass = Class.forName(jobClassName);
Constructor<?> ctor = jobClass.getConstructor(int.class, long.class, Properties.class);
return (BaseJob) ctor.newInstance(refId, whExecId, properties);
}
}

View File

@ -11,15 +11,8 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.models;
package wherehows.common.jobs;
/**
* Created by zechen on 9/24/15.
*/
public enum EtlJobStatus {
REQUESTED,
STARTED,
PROCESSING,
ERROR,
SUCCEEDED
public enum JobStatus {
REQUESTED, STARTED, PROCESSING, ERROR, SUCCEEDED
}

View File

@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl;
package wherehows.common.jobs;
import java.io.FileInputStream;
import java.io.IOException;
@ -19,7 +19,6 @@ import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Properties;
import metadata.etl.models.EtlJobFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import wherehows.common.Constant;
@ -42,48 +41,47 @@ public class Launcher {
/**
* Read config file location from command line. Read all configuration from command line, execute the job.
* Example command line : java -Dconfig=/path/to/config/file -cp "lib/*" metadata.etl.Launcher
* Example command line : java -Dconfig=/path/to/config/file -cp "lib/*" wherehows.common.jobs.Launcher
* @param args contain the config file location parameter 'config'
* @throws Exception
*/
public static void main(String[] args)
throws Exception {
public static void main(String[] args) throws Exception {
String property_file = System.getProperty(CONFIG_FILE_LOCATION_KEY, null);
String etlClassName = null;
String propertyFile = System.getProperty(CONFIG_FILE_LOCATION_KEY, null);
String jobClassName = null;
int refId = 0;
long whEtlExecId = 0;
Properties props = new Properties();
try (InputStream propFile = new FileInputStream(property_file)) {
try (InputStream propFile = new FileInputStream(propertyFile)) {
props.load(propFile);
etlClassName = props.getProperty(Constant.JOB_CLASS_KEY);
jobClassName = props.getProperty(Constant.JOB_CLASS_KEY);
refId = Integer.valueOf(props.getProperty(Constant.JOB_REF_ID, "0"));
whEtlExecId = Integer.valueOf(props.getProperty(WH_ETL_EXEC_ID_KEY));
System.setProperty(LOGGER_CONTEXT_NAME_KEY, etlClassName);
System.setProperty(LOGGER_CONTEXT_NAME_KEY, jobClassName);
} catch (IOException e) {
//logger.error("property file '{}' not found" , property_file);
//logger.error("property file '{}' not found" , property_file);
e.printStackTrace();
System.exit(1);
}
if (etlClassName == null) {
if (jobClassName == null) {
logger.error("Must specify {} in properties file", Constant.JOB_CLASS_KEY);
System.exit(1);
}
// create the etl job
EtlJob etlJob = null;
BaseJob job = null;
try {
etlJob = EtlJobFactory.getEtlJob(etlClassName, refId, whEtlExecId, props);
job = JobFactory.getJob(jobClassName, refId, whEtlExecId, props);
} catch (Exception e) {
logger.error("Failed to create ETL job {}: {}", etlClassName, e.getMessage());
logger.error("Failed to create ETL job {}: {}", jobClassName, e.getMessage());
System.exit(1);
}
try {
etlJob.run();
job.run();
} catch (Exception e) {
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
@ -98,5 +96,4 @@ public class Launcher {
logger.info("whEtlExecId=" + whEtlExecId + " finished.");
System.exit(0);
}
}

View File

@ -1,5 +1,4 @@
apply plugin: 'application'
mainClassName = 'metadata.etl.Launcher'
apply plugin: 'java'
configurations {
//Libraries needed at compilation time but not to be

View File

@ -13,20 +13,15 @@
*/
package metadata.etl;
import java.io.FileInputStream;
import java.io.File;
import java.net.URL;
import java.util.Properties;
import org.python.core.PyDictionary;
import org.python.core.PyString;
import org.python.core.PySystemState;
import org.python.util.PythonInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import wherehows.common.Constant;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Properties;
import wherehows.common.jobs.BaseJob;
/**
@ -35,15 +30,11 @@ import java.util.Properties;
* Each ETL process that implement this interface will have their own extract, transform, load function.
* Created by zsun on 7/29/15.
*/
public abstract class EtlJob {
public abstract class EtlJob extends BaseJob {
public final ClassLoader classLoader = getClass().getClassLoader();
public PythonInterpreter interpreter;
public Properties prop;
public ClassLoader classLoader = getClass().getClassLoader();
protected final Logger logger = LoggerFactory.getLogger(getClass());
// default location of local test configuration file
private final static String DEFAULT_CONFIG_FILE_LOCATION = System.getProperty("user.home") + "/.wherehows/local_test.properties";
/**
* Used by backend service
@ -53,7 +44,16 @@ public abstract class EtlJob {
* @param properties
*/
public EtlJob(Integer appId, Integer dbId, Long whExecId, Properties properties) {
PySystemState sys = configFromProperties(appId, dbId, whExecId, properties);
super(whExecId, properties);
if (appId != null) {
prop.setProperty(Constant.APP_ID_KEY, String.valueOf(appId));
}
if (dbId != null) {
prop.setProperty(Constant.DB_ID_KEY, String.valueOf(dbId));
}
PySystemState sys = configFromProperties();
addJythonToPath(sys);
interpreter = new PythonInterpreter(null, sys);
}
@ -71,52 +71,36 @@ public abstract class EtlJob {
}
/**
* Copy all properties into jython envirenment
* @param appId
* @param whExecId
* @param properties
* Copy all properties into jython environment
* @return PySystemState A PySystemState that contain all the arguments.
*/
private PySystemState configFromProperties(Integer appId, Integer dbId, Long whExecId, Properties properties) {
this.prop = properties;
if (appId != null)
prop.setProperty(Constant.APP_ID_KEY, String.valueOf(appId));
if (dbId != null)
prop.setProperty(Constant.DB_ID_KEY, String.valueOf(dbId));
prop.setProperty(Constant.WH_EXEC_ID_KEY, String.valueOf(whExecId));
PyDictionary config = new PyDictionary();
private PySystemState configFromProperties() {
final PyDictionary config = new PyDictionary();
for (String key : prop.stringPropertyNames()) {
String value = prop.getProperty(key);
config.put(new PyString(key), new PyString(value));
config.put(new PyString(key), new PyString(prop.getProperty(key)));
}
PySystemState sys = new PySystemState();
sys.argv.append(config);
return sys;
}
public abstract void extract()
throws Exception;
public abstract void extract() throws Exception;
public abstract void transform()
throws Exception;
public abstract void transform() throws Exception;
public abstract void load()
throws Exception;
public abstract void load() throws Exception;
public void setup()
throws Exception {
public void setup() throws Exception {
// redirect error to out
System.setErr(System.out);
}
public void close()
throws Exception {
public void close() throws Exception {
interpreter.cleanup();
interpreter.close();
}
public void run()
throws Exception {
public void run() throws Exception {
setup();
logger.info("PySystem path: " + interpreter.getSystemState().path.toString());
extract();

View File

@ -15,14 +15,13 @@ package metadata.etl;
import java.io.InputStream;
import java.util.Properties;
import metadata.etl.EtlJob;
import wherehows.common.Constant;
public class JythonEtlJob extends EtlJob {
public JythonEtlJob(int dbId, long whExecId, Properties prop) {
super(dbId, dbId, whExecId, prop);
public JythonEtlJob(int refId, long whExecId, Properties prop) {
super(refId, refId, whExecId, prop);
}
@Override

View File

@ -16,6 +16,7 @@ package metadata.etl.models;
import java.util.Properties;
import org.testng.Assert;
import org.testng.annotations.Test;
import wherehows.common.jobs.JobFactory;
public class EtlJobFactoryTest {
@ -23,7 +24,7 @@ public class EtlJobFactoryTest {
@Test
public void testGetEtlJob() throws Exception {
Properties properties = new Properties();
DummyEtlJob job = (DummyEtlJob) EtlJobFactory.getEtlJob(DummyEtlJob.class.getCanonicalName(), 1, 2L, properties);
DummyEtlJob job = (DummyEtlJob) JobFactory.getJob(DummyEtlJob.class.getCanonicalName(), 1, 2L, properties);
Assert.assertEquals(job.refId, 1);
Assert.assertEquals(job.whExecId, 2L);