2015-11-19 14:39:21 -08:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								/**
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								 * Copyright 2015 LinkedIn Corp. All rights reserved.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								 *
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								 * Licensed under the Apache License, Version 2.0 (the "License");
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								 * you may not use this file except in compliance with the License.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								 * You may obtain a copy of the License at
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								 *
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								 * http://www.apache.org/licenses/LICENSE-2.0
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								 *
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								 * Unless required by applicable law or agreed to in writing, software
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								 * distributed under the License is distributed on an "AS IS" BASIS,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								 */
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								package actors;
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								import akka.actor.UntypedActor;
							 | 
						
					
						
							
								
									
										
										
										
											2016-09-12 16:09:08 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								import java.net.UnknownHostException;
							 | 
						
					
						
							
								
									
										
										
										
											2016-02-29 16:33:17 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								import metadata.etl.models.EtlJobStatus;
							 | 
						
					
						
							
								
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								import models.daos.EtlJobDao;
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								import models.daos.EtlJobPropertyDao;
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								import msgs.EtlJobMessage;
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								import play.Logger;
							 | 
						
					
						
							
								
									
										
										
										
											2016-03-06 18:04:38 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								import shared.Global;
							 | 
						
					
						
							
								
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2016-05-30 08:59:05 +02:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								import java.io.BufferedReader;
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								import java.io.InputStream;
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								import java.io.InputStreamReader;
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								import java.lang.reflect.Field;
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								import java.util.Properties;
							 | 
						
					
						
							
								
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								/**
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								 * Created by zechen on 9/4/15.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								 */
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								public class EtlJobActor extends UntypedActor {
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2016-02-29 16:33:17 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								  private Process process;
							 | 
						
					
						
							
								
									
										
										
										
											2016-05-30 08:59:05 +02:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								  @Override
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								  public void onReceive(Object message)
							 | 
						
					
						
							
								
									
										
										
										
											2016-05-30 08:59:05 +02:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								          throws Exception {
							 | 
						
					
						
							
								
									
										
										
										
											2016-05-03 15:17:38 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    Properties props = null;
							 | 
						
					
						
							
								
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    if (message instanceof EtlJobMessage) {
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								      EtlJobMessage msg = (EtlJobMessage) message;
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								      try {
							 | 
						
					
						
							
								
									
										
										
										
											2016-05-03 15:17:38 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        props = EtlJobPropertyDao.getJobProperties(msg.getEtlJobName(), msg.getRefId());
							 | 
						
					
						
							
								
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        Properties whProps = EtlJobPropertyDao.getWherehowsProperties();
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        props.putAll(whProps);
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        EtlJobDao.startRun(msg.getWhEtlExecId(), "Job started!");
							 | 
						
					
						
							
								
									
										
										
										
											2016-02-29 16:33:17 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        // start a new process here
							 | 
						
					
						
							
								
									
										
										
										
											2016-09-23 16:54:52 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        String cmd = ConfigUtil.generateCommand(msg.getEtlJobName(), msg.getWhEtlExecId(), msg.getCmdParam(), props);
							 | 
						
					
						
							
								
									
										
										
										
											2016-05-03 15:17:38 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        Logger.debug("run command : " + cmd);
							 | 
						
					
						
							
								
									
										
										
										
											2016-05-30 08:59:05 +02:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        ConfigUtil.generateProperties(msg.getEtlJobName(), msg.getRefId(), msg.getWhEtlExecId(), props);
							 | 
						
					
						
							
								
									
										
										
										
											2016-02-29 16:33:17 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        process = Runtime.getRuntime().exec(cmd);
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2016-09-12 16:09:08 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        // update process id and hostname for started job
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        EtlJobDao.updateJobProcessInfo(msg.getWhEtlExecId(), getPid(process), getHostname());
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2016-03-09 19:15:42 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        InputStream stdout = process.getInputStream();
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        InputStreamReader isr = new InputStreamReader(stdout);
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        BufferedReader br = new BufferedReader(isr);
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        String line = null;
							 | 
						
					
						
							
								
									
										
										
										
											2016-05-30 08:59:05 +02:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        while ((line = br.readLine()) != null) {
							 | 
						
					
						
							
								
									
										
										
										
											2016-03-09 19:15:42 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								          Logger.info(line);
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        }
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2016-02-29 16:33:17 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        // wait until this process finished.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        int execResult = process.waitFor();
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        // if the process failed, log the error and throw exception
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        if (execResult > 0) {
							 | 
						
					
						
							
								
									
										
										
										
											2016-03-09 19:15:42 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								          br = new BufferedReader(new InputStreamReader(process.getErrorStream()));
							 | 
						
					
						
							
								
									
										
										
										
											2016-02-29 16:33:17 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								          String errString = "Error Details:\n";
							 | 
						
					
						
							
								
									
										
										
										
											2016-05-30 08:59:05 +02:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								          while ((line = br.readLine()) != null)
							 | 
						
					
						
							
								
									
										
										
										
											2016-02-29 16:33:17 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            errString = errString.concat(line).concat("\n");
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								          Logger.error("*** Process + " + getPid(process) + " failed, status: " + execResult);
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								          Logger.error(errString);
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								          throw new Exception("Process + " + getPid(process) + " failed");
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        }
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        EtlJobDao.endRun(msg.getWhEtlExecId(), EtlJobStatus.SUCCEEDED, "Job succeed!");
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        Logger.info("ETL job {} finished", msg.toDebugString());
							 | 
						
					
						
							
								
									
										
										
										
											2016-02-29 16:33:17 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        if (msg.getEtlJobName().affectDataset()) {
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								          ActorRegistry.treeBuilderActor.tell("dataset", getSelf());
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        }
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        if (msg.getEtlJobName().affectFlow()) {
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								          ActorRegistry.treeBuilderActor.tell("flow", getSelf());
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        }
							 | 
						
					
						
							
								
									
										
										
										
											2016-02-03 19:22:18 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								      } catch (Throwable e) { // catch all throwable at the highest level.
							 | 
						
					
						
							
								
									
										
										
										
											2016-03-09 19:15:42 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        e.printStackTrace();
							 | 
						
					
						
							
								
									
										
										
										
											2016-02-03 19:22:18 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        Logger.error("ETL job {} got a problem", msg.toDebugString());
							 | 
						
					
						
							
								
									
										
										
										
											2016-02-29 16:33:17 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        if (process.isAlive()) {
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								          process.destroy();
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        }
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        EtlJobDao.endRun(msg.getWhEtlExecId(), EtlJobStatus.ERROR, e.getMessage());
							 | 
						
					
						
							
								
									
										
										
										
											2016-03-06 18:04:38 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								      } finally {
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        Global.removeRunningJob(((EtlJobMessage) message).getWhEtlJobId());
							 | 
						
					
						
							
								
									
										
										
										
											2016-05-03 15:17:38 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        if (!Logger.isDebugEnabled()) // if debug enable, won't delete the config files.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								          ConfigUtil.deletePropertiesFile(props, msg.getWhEtlExecId());
							 | 
						
					
						
							
								
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								      }
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    }
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								  }
							 | 
						
					
						
							
								
									
										
										
										
											2016-02-29 16:33:17 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								  /**
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								   * Reflection to get the pid
							 | 
						
					
						
							
								
									
										
										
										
											2016-05-30 08:59:05 +02:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								   *
							 | 
						
					
						
							
								
									
										
										
										
											2016-02-29 16:33:17 -08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								   * @param process {@code Process}
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								   * @return pid, -1 if not found
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								   */
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								  private static int getPid(Process process) {
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    try {
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								      Class<?> cProcessImpl = process.getClass();
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								      Field fPid = cProcessImpl.getDeclaredField("pid");
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								      if (!fPid.isAccessible()) {
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								        fPid.setAccessible(true);
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								      }
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								      return fPid.getInt(process);
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    } catch (Exception e) {
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								      return -1;
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    }
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								  }
							 | 
						
					
						
							
								
									
										
										
										
											2016-09-12 16:09:08 -07:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								  /**
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								   * Return the hostname of the machine
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								   * @return hostname, null if unknown
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								   */
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								  private static String getHostname() {
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    try {
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								      return java.net.InetAddress.getLocalHost().getHostName();
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    } catch (UnknownHostException ex) {
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								      return null;
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    }
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								  }
							 | 
						
					
						
							
								
									
										
										
										
											2015-11-19 14:39:21 -08:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								}
							 |