| 
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 |  |  | /**
 | 
					
						
							|  |  |  |  * Copyright 2015 LinkedIn Corp. All rights reserved.
 | 
					
						
							|  |  |  |  *
 | 
					
						
							|  |  |  |  * Licensed under the Apache License, Version 2.0 (the "License");
 | 
					
						
							|  |  |  |  * you may not use this file except in compliance with the License.
 | 
					
						
							|  |  |  |  * You may obtain a copy of the License at
 | 
					
						
							|  |  |  |  *
 | 
					
						
							|  |  |  |  * http://www.apache.org/licenses/LICENSE-2.0
 | 
					
						
							|  |  |  |  *
 | 
					
						
							|  |  |  |  * Unless required by applicable law or agreed to in writing, software
 | 
					
						
							|  |  |  |  * distributed under the License is distributed on an "AS IS" BASIS,
 | 
					
						
							|  |  |  |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
					
						
							|  |  |  |  */
 | 
					
						
							|  |  |  | package actors;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import akka.actor.UntypedActor;
 | 
					
						
							|  |  |  | import java.util.Date;
 | 
					
						
							| 
									
										
										
										
											2017-06-05 15:40:05 -07:00
										 |  |  | import java.util.HashMap;
 | 
					
						
							| 
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 |  |  | import java.util.Map;
 | 
					
						
							| 
									
										
										
										
											2017-06-05 15:40:05 -07:00
										 |  |  | import java.util.Properties;
 | 
					
						
							| 
									
										
										
										
											2017-08-16 21:24:38 -07:00
										 |  |  | import wherehows.common.jobs.JobStatus;
 | 
					
						
							| 
									
										
										
										
											2017-06-05 15:40:05 -07:00
										 |  |  | import models.daos.EtlJobDao;
 | 
					
						
							| 
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 |  |  | import msgs.EtlJobMessage;
 | 
					
						
							|  |  |  | import play.Logger;
 | 
					
						
							| 
									
										
										
										
											2017-06-05 15:40:05 -07:00
										 |  |  | import play.Play;
 | 
					
						
							|  |  |  | import shared.Global;
 | 
					
						
							| 
									
										
										
										
											2017-09-06 11:45:52 -07:00
										 |  |  | import wherehows.common.utils.JobsUtil;
 | 
					
						
							| 
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-03 14:52:31 -07:00
										 |  |  | import static wherehows.common.Constant.*;
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  | /**
 | 
					
						
							|  |  |  |  * Created by zechen on 9/3/15.
 | 
					
						
							|  |  |  |  */
 | 
					
						
							|  |  |  | public class SchedulerActor extends UntypedActor {
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-03 14:52:31 -07:00
										 |  |  |   public static final String ETL_JOBS_DIR = Play.application().configuration().getString(WH_ETL_JOBS_DIR);
 | 
					
						
							| 
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  |   /**
 | 
					
						
							|  |  |  |    * Search for etl jobs that are ready to run and update the time for next run
 | 
					
						
							|  |  |  |    * @param message
 | 
					
						
							|  |  |  |    * @throws Exception
 | 
					
						
							|  |  |  |    */
 | 
					
						
							|  |  |  |   @Override
 | 
					
						
							| 
									
										
										
										
											2017-05-10 17:43:56 -07:00
										 |  |  |   public void onReceive(Object message) throws Exception {
 | 
					
						
							| 
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 |  |  |     if (message.equals("checking")) {
 | 
					
						
							| 
									
										
										
										
											2017-06-05 15:40:05 -07:00
										 |  |  |       runDueJobs();
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  |   }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   private Map<String, Long> getScheduledJobs() throws Exception {
 | 
					
						
							|  |  |  |     Map<String, Long> map = new HashMap<>();
 | 
					
						
							|  |  |  |     for (Map<String, Object> job : EtlJobDao.getAllScheduledJobs()) {
 | 
					
						
							|  |  |  |       String jobName = (String) job.get("wh_etl_job_name");
 | 
					
						
							| 
									
										
										
										
											2017-07-26 21:13:56 -07:00
										 |  |  |       Boolean enabled = (Boolean) job.get("enabled");
 | 
					
						
							| 
									
										
										
										
											2017-06-05 15:40:05 -07:00
										 |  |  |       Long nextRun = (Long) job.get("next_run");
 | 
					
						
							| 
									
										
										
										
											2017-07-26 21:13:56 -07:00
										 |  |  |       // filter for only enabled jobs
 | 
					
						
							|  |  |  |       if (enabled != null && enabled) {
 | 
					
						
							|  |  |  |         map.put(jobName, nextRun);
 | 
					
						
							|  |  |  |       }
 | 
					
						
							| 
									
										
										
										
											2017-06-05 15:40:05 -07:00
										 |  |  |     }
 | 
					
						
							|  |  |  |     return map;
 | 
					
						
							|  |  |  |   }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   private void runDueJobs() throws Exception {
 | 
					
						
							|  |  |  |     Map<String, Properties> enabledJobs = JobsUtil.getScheduledJobs(ETL_JOBS_DIR);
 | 
					
						
							|  |  |  |     Logger.info("Enabled jobs: {}", enabledJobs.keySet());
 | 
					
						
							| 
									
										
										
										
											2017-05-10 17:43:56 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-06-05 15:40:05 -07:00
										 |  |  |     Map<String, Long> scheduledJobs = getScheduledJobs();
 | 
					
						
							|  |  |  |     Logger.info("Scheduled jobs: {}", scheduledJobs);
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     long now = System.currentTimeMillis() / 1000;
 | 
					
						
							|  |  |  |     for (Map.Entry<String, Properties> entry : enabledJobs.entrySet()) {
 | 
					
						
							|  |  |  |       String etlJobName = entry.getKey();
 | 
					
						
							|  |  |  |       Properties properties = entry.getValue();
 | 
					
						
							|  |  |  |       EtlJobMessage etlMsg = new EtlJobMessage(etlJobName, properties);
 | 
					
						
							| 
									
										
										
										
											2016-03-06 18:04:38 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-06-05 15:40:05 -07:00
										 |  |  |       // Schedule next run if a cron expr is defined.
 | 
					
						
							|  |  |  |       String cronExpr = etlMsg.getCronExpr();
 | 
					
						
							|  |  |  |       if (cronExpr != null) {
 | 
					
						
							| 
									
										
										
										
											2017-07-26 21:13:56 -07:00
										 |  |  |         EtlJobDao.updateNextRun(etlJobName, cronExpr, new Date());
 | 
					
						
							| 
									
										
										
										
											2017-06-05 15:40:05 -07:00
										 |  |  |       }
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-06-28 11:41:35 -07:00
										 |  |  |       if (scheduledJobs.getOrDefault(etlJobName, Long.MAX_VALUE) > now) {
 | 
					
						
							|  |  |  |         continue;
 | 
					
						
							|  |  |  |       }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       Logger.info("Running job: {}", etlJobName);
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-06-05 15:40:05 -07:00
										 |  |  |       Long whExecId = EtlJobDao.insertNewRun(etlJobName);
 | 
					
						
							|  |  |  |       etlMsg.setWhEtlExecId(whExecId);
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       StringBuilder s = new StringBuilder("Current running jobs : ");
 | 
					
						
							|  |  |  |       for (String j : Global.getCurrentRunningJob()) {
 | 
					
						
							|  |  |  |         s.append(j).append("\t");
 | 
					
						
							|  |  |  |       }
 | 
					
						
							|  |  |  |       Logger.info(s.toString());
 | 
					
						
							| 
									
										
										
										
											2016-03-06 18:04:38 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-06-05 15:40:05 -07:00
										 |  |  |       if (Global.getCurrentRunningJob().contains(etlJobName)) {
 | 
					
						
							|  |  |  |         Logger.error("The previous job is still running! Abort this job : " + etlMsg.toDebugString());
 | 
					
						
							| 
									
										
										
										
											2017-08-16 21:24:38 -07:00
										 |  |  |         EtlJobDao.endRun(etlMsg.getWhEtlExecId(), JobStatus.ERROR, "Previous is still running, Aborted!");
 | 
					
						
							| 
									
										
										
										
											2017-06-05 15:40:05 -07:00
										 |  |  |       } else {
 | 
					
						
							|  |  |  |         Global.getCurrentRunningJob().add(etlJobName);
 | 
					
						
							|  |  |  |         Logger.info("Send message : " + etlMsg.toDebugString());
 | 
					
						
							|  |  |  |         ActorRegistry.etlJobActor.tell(etlMsg, getSelf());
 | 
					
						
							| 
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 |  |  |       }
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  |   }
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 |