Add optional config of ETL job white list

SunZhaonan 2016-05-12 16:27:45 -07:00
parent a4734ab406
commit 9d6a1b2649
9 changed files with 33 additions and 84 deletions

View File

@@ -14,6 +14,7 @@
package actors;
import akka.actor.UntypedActor;
import java.util.Set;
import metadata.etl.models.EtlJobStatus;
import shared.Global;
import java.util.Date;
@@ -45,8 +46,12 @@ public class SchedulerActor extends UntypedActor {
if (message.equals("checking")) {
List<Map<String, Object>> dueJobs = EtlJobDao.getDueJobs();
Logger.info("running " + dueJobs.size() + " jobs");
Set<Integer> whiteList = Global.getWhiteList();
for (Map<String, Object> dueJob : dueJobs) {
Integer whEtlJobId = ((Long) dueJob.get("wh_etl_job_id")).intValue();
if (whiteList != null && !whiteList.contains(whEtlJobId)) {
continue; // a white list is configured and this job is not in it, so skip it
}
EtlJobName etlJobName = EtlJobName.valueOf((String) dueJob.get("wh_etl_job_name"));
EtlType etlType = EtlType.valueOf((String) dueJob.get("wh_etl_type"));
Integer refId = (Integer) dueJob.get("ref_id");
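
The intent of the new check: a null white list (the config key is absent) leaves scheduling unrestricted, while a configured list limits this instance to the listed job ids. A minimal restatement of that predicate, where shouldSchedule is a hypothetical helper used only for illustration:

import java.util.Set;

final class WhiteListCheck {
  // Mirrors the loop above: schedule when no white list is configured,
  // or when the job id is explicitly listed.
  static boolean shouldSchedule(Set<Integer> whiteList, int whEtlJobId) {
    return whiteList == null || whiteList.contains(whEtlJobId);
  }
}

Note that under this reading an empty list ([]) skips every due job, since only the null case bypasses the membership check.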

View File

@@ -12,10 +12,12 @@ package shared; /**
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import play.Application;
import play.GlobalSettings;
import play.Logger;
import play.Play;
import utils.SchedulerUtil;
@@ -24,15 +26,28 @@ import utils.SchedulerUtil;
*/
public class Global extends GlobalSettings {
// the job ids that are allowed to run on this instance
private static Set<Integer> whiteList;
private static Set<Integer> currentRunningJob;
@Override
public void onStart(Application arg0) {
Logger.info("on start---===");
List<Integer> whiteListList = Play.application().configuration().getIntList("scheduler.jobid.whitelist", null);
if (whiteListList != null) {
whiteList = new HashSet<>(whiteListList);
} else {
whiteList = null;
}
SchedulerUtil.start();
currentRunningJob = new HashSet<>();
}
public static Set<Integer> getWhiteList() {
return whiteList;
}
public static Set<Integer> getCurrentRunningJob() {
return currentRunningJob;
}
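
For context on the read above: Play's Configuration.getIntList returns the supplied default (null here) when the key is missing, so an unset scheduler.jobid.whitelist leaves the white list null and every due job runs. A hedged sketch of the same load path, with the class name chosen only for illustration:

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import play.Play;

final class WhiteListLoader {
  // Sketch of Global.onStart(): null when the key is absent (no filtering),
  // otherwise a HashSet for O(1) membership checks in the scheduler loop.
  static Set<Integer> load() {
    List<Integer> ids =
        Play.application().configuration().getIntList("scheduler.jobid.whitelist", null);
    return ids == null ? null : new HashSet<>(ids);
  }
}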

View File

@@ -62,5 +62,8 @@ logger.play=INFO
# Logger provided to your application:
logger.application=DEBUG
# if this variable is not set, every job will run
# if this variable is set, only the job ids in this list will be scheduled
# scheduler.jobid.whitelist=[1,2,3,4,5,6,7,8,9]
scheduler.check.interval=10
application.global=shared.Global

View File

@@ -27,7 +27,7 @@ public class AzExecMessage {
public Properties prop;
public AzServiceCommunicator asc;
public HadoopNameNodeExtractor hnne;
public HadoopJobHistoryNodeExtractor hnne;
public AzDbCommunicator adc;
public DatabaseWriter databaseWriter;
public Connection connection;

View File

@@ -45,7 +45,7 @@ import wherehows.common.writers.DatabaseWriter;
public class AzLineageExtractorMaster {
Properties prop;
private static final Logger logger = LoggerFactory.getLogger(AzLineageExtractorActor.class);
private static final Logger logger = LoggerFactory.getLogger(AzLineageExtractorMaster.class);
public AzLineageExtractorMaster(Properties prop)
throws Exception {
this.prop = prop;
@@ -88,7 +88,7 @@ public class AzLineageExtractorMaster {
// initialize
//AzkabanServiceCommunicator asc = new AzkabanServiceCommunicator(prop);
HadoopNameNodeExtractor hnne = new HadoopNameNodeExtractor(prop);
HadoopJobHistoryNodeExtractor hnne = new HadoopJobHistoryNodeExtractor(prop);
AzDbCommunicator adc = new AzDbCommunicator(prop);
String wherehowsUrl = prop.getProperty(Constant.WH_DB_URL_KEY);

View File

@@ -121,59 +121,6 @@ public class AzLineageMetadataEtl extends EtlJob {
conn.createStatement().execute(insertIntoFinalTable);
logger.info("Azkaban lineage metadata ETL completed");
if (prop.getProperty(Constant.APP_ID_KEY).equals("32") || prop.getProperty(Constant.APP_ID_KEY).equals("31") ) {
logger.info("TEMPORARY load war & nertz's data into cmdb database");
loadIntoOthers();
}
}
/**
* temporary solutions, will deprecate later : also insert into cmdb job_attempt_data_lineage table
*/
@Deprecated
public void loadIntoOthers()
throws SQLException {
String cmdbHost = this.prop.getProperty("cmdb.db.host");
String cmdbUserName = this.prop.getProperty("cmdb.db.username");
String cmdbPassWord = this.prop.getProperty("cmdb.db.password");
Connection cmdbConn =
DriverManager.getConnection(cmdbHost + "?" + "user=" + cmdbUserName + "&password=" + cmdbPassWord);
String queryCmd =
"SELECT app_id, flow_exec_id, job_exec_id, job_exec_uuid, job_name, job_start_unixtime, job_finished_unixtime,\n"
+ "db_id, abstracted_object_name, full_object_name, partition_start, partition_end, partition_type,\n"
+ "layout_id, storage_type, source_target_type, srl_no, source_srl_no, operation,\n"
+ "record_count, insert_count, delete_count, update_count, created_date, wh_etl_exec_id \n"
+ "FROM stg_job_execution_data_lineage";
ResultSet resultSet = conn.createStatement().executeQuery(queryCmd);
Statement cmdbStatement = cmdbConn.createStatement();
while (resultSet.next()) {
String insertCmd =
"INSERT IGNORE INTO job_attempt_data_lineage (" + "application_id," + " job_id, " + "srl_no," + "database_id,"
+ "source_target_type," + "object_type," + "abstracted_object_name," + "full_object_name,"
+ "partition_start," + "partition_end," + "partition_type," + "storage_type," + "operation," + "record_count,"
+ "insert_count," + "delete_count," + "update_count," + "created_date," + "parent_srl_no," + "flow_exec_id,"
+ "job_start_unixtime," + "job_finished_unixtime" + ") VALUES (" + resultSet.getString("app_id") + ","
+ resultSet.getString("job_exec_id") + "," + resultSet.getString("srl_no") + "," + resultSet
.getString("db_id") + ",'" + resultSet.getString("source_target_type") + "','" + "hdfs" + "','"
+ resultSet.getString("abstracted_object_name") + "','" + resultSet.getString("full_object_name") + "'," + (
resultSet.getString("partition_start") != null ? "'" + resultSet.getString("partition_start") + "'" : "null")
+ "," + (resultSet.getString("partition_end") != null ? "'" + resultSet.getString("partition_end") + "'"
: "null") + "," + (resultSet.getString("partition_type") != null ? "'" + resultSet.getString("partition_type")
+ "'" : "null") + ",'" + resultSet.getString("storage_type") + "','" + resultSet.getString("operation") + "',"
+ resultSet.getString("record_count") + "," + resultSet.getString("insert_count") + "," + resultSet
.getString("delete_count") + "," + resultSet.getString("update_count") + "," + "CURRENT_TIME() ,"
// TODO convert it to UTC?
+ resultSet.getString("source_srl_no") + "," + resultSet.getString("flow_exec_id") + "," + resultSet
.getString("job_start_unixtime") + "," + resultSet.getString("job_finished_unixtime") + ")";
logger.debug(insertCmd);
cmdbStatement.execute(insertCmd);
}
cmdbConn.close();
}
@Override

View File

@@ -41,11 +41,11 @@ import wherehows.common.Constant;
/**
* Created by zsun on 9/3/15.
*/
public class HadoopNameNodeExtractor {
public class HadoopJobHistoryNodeExtractor {
private String serverURL = "";
private CloseableHttpClient httpClient;
private static final Logger logger = LoggerFactory.getLogger(HadoopNameNodeExtractor.class);
private static final Logger logger = LoggerFactory.getLogger(HadoopJobHistoryNodeExtractor.class);
/**
* Use HTTPClient to connect to Hadoop job history server.
@@ -53,7 +53,7 @@ public class HadoopNameNodeExtractor {
* @param prop
* @throws Exception
*/
public HadoopNameNodeExtractor(Properties prop)
public HadoopJobHistoryNodeExtractor(Properties prop)
throws Exception {
this.serverURL = prop.getProperty(Constant.AZ_HADOOP_JOBHISTORY_KEY);

View File

@@ -67,7 +67,7 @@ public class AzLineageExtractorTest {
statement.execute("TRUNCATE TABLE stg_job_execution_data_lineage");
AzExecMessage message = new AzExecMessage(aje, prop);
message.databaseWriter = new DatabaseWriter(connUrl, "stg_job_execution_data_lineage");
message.hnne = new HadoopNameNodeExtractor(prop);
message.hnne = new HadoopJobHistoryNodeExtractor(prop);
message.adc = new AzDbCommunicator(prop);
message.connection = conn;
AzLineageExtractor.extract(message);

View File

@@ -14,41 +14,20 @@
package metadata.etl.lineage;
import junit.framework.Assert;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.params.AuthPolicy;
import org.apache.http.impl.auth.SPNegoSchemeFactory;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.codehaus.jettison.json.JSONException;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.security.Principal;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.UserGroupInformation;
/**
* Created by zsun on 9/3/15.
*/
public class HadoopNameNodeExtractorTest {
HadoopNameNodeExtractor he;
public class HadoopJobHistoryNodeExtractorTest {
HadoopJobHistoryNodeExtractor he;
@BeforeTest
public void setUp() {
try {
he = new HadoopNameNodeExtractor(new LineageTest().properties);
he = new HadoopJobHistoryNodeExtractor(new LineageTest().properties);
} catch (Exception e) {
e.printStackTrace();
}