Merge pull request #112 from jerrybai2009/master

Update the tract pathinfo method, since the DB content changed
This commit is contained in:
jerrybai2009 2016-04-21 13:53:30 -07:00
commit c779f33ad8
3 changed files with 689 additions and 347 deletions

View File

@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.springframework.dao.EmptyResultDataAccessException; import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import play.Logger; import play.Logger;
import play.Play; import play.Play;
@ -39,26 +40,68 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO
"flow WHERE app_id = ? and flow_id = ?"; "flow WHERE app_id = ? and flow_id = ?";
private final static String GET_JOB = "SELECT ca.app_id, ca.app_code as cluster, " + private final static String GET_JOB = "SELECT ca.app_id, ca.app_code as cluster, " +
"je.flow_id, je.job_id, jedl.job_name, " + "jedl.job_name, fj.job_path, fj.job_type, jedl.flow_path, jedl.storage_type, jedl.source_target_type, " +
"fj.job_path, fj.job_type, jedl.flow_path, jedl.storage_type, jedl.source_target_type, " +
"jedl.operation, jedl.source_srl_no, jedl.srl_no, " + "jedl.operation, jedl.source_srl_no, jedl.srl_no, " +
"max(jedl.job_exec_id) as job_exec_id, FROM_UNIXTIME(jedl.job_start_unixtime) as start_time, " + "max(jedl.job_exec_id) as job_exec_id FROM job_execution_data_lineage jedl " +
"FROM_UNIXTIME(jedl.job_finished_unixtime) as end_time FROM job_execution_data_lineage jedl " +
"JOIN cfg_application ca on ca.app_id = jedl.app_id " + "JOIN cfg_application ca on ca.app_id = jedl.app_id " +
"JOIN job_execution je on jedl.app_id = je.app_id " + "LEFT JOIN job_execution je on jedl.app_id = je.app_id " +
"and jedl.flow_exec_id = je.flow_exec_id and jedl.job_exec_id = je.job_exec_id " + "and jedl.flow_exec_id = je.flow_exec_id and jedl.job_exec_id = je.job_exec_id " +
"JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and je.job_id = fj.job_id " + "LEFT JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and je.job_id = fj.job_id " +
"WHERE abstracted_object_name = ? and " + "WHERE abstracted_object_name in ( :names ) and " +
"jedl.flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + "jedl.flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " +
"COALESCE(jedl.source_srl_no, jedl.srl_no) = jedl.srl_no and " + "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL (:days) DAY " +
"FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL ? DAY " + "GROUP BY ca.app_id, cluster, jedl.job_name, jedl.flow_path, jedl.source_target_type, " +
"GROUP BY ca.app_id, je.job_id, je.flow_id"; "jedl.storage_type, jedl.operation " +
"ORDER BY jedl.source_target_type DESC, jedl.job_finished_unixtime";
// Upstream lookup: the latest job execution per (app, job_name, flow_path,
// role, storage, operation) that WROTE (source_target_type = 'target') any of
// the :names datasets within the last :days days. Bound through
// NamedParameterJdbcTemplate; :names is an IN-list (see getNodes).
// NOTE(review): "INTERVAL (:days) DAY" relies on MySQL accepting a
// parenthesized bound parameter inside INTERVAL — confirm on the target
// MySQL version.
private final static String GET_UP_LEVEL_JOB = "SELECT ca.app_id, ca.app_code as cluster, " +
"jedl.job_name, fj.job_path, fj.job_type, jedl.flow_path, jedl.storage_type, jedl.source_target_type, " +
"jedl.operation, jedl.source_srl_no, jedl.srl_no, " +
"max(jedl.job_exec_id) as job_exec_id FROM job_execution_data_lineage jedl " +
"JOIN cfg_application ca on ca.app_id = jedl.app_id " +
"LEFT JOIN job_execution je on jedl.app_id = je.app_id " +
"and jedl.flow_exec_id = je.flow_exec_id and jedl.job_exec_id = je.job_exec_id " +
"LEFT JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and je.job_id = fj.job_id " +
"WHERE abstracted_object_name in ( :names ) and jedl.source_target_type = 'target' and " +
"jedl.flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " +
"FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL (:days) DAY " +
"GROUP BY ca.app_id, cluster, jedl.job_name, jedl.flow_path, jedl.source_target_type, " +
"jedl.storage_type, jedl.operation " +
"ORDER BY jedl.source_target_type DESC, jedl.job_finished_unixtime";
// Same shape as GET_UP_LEVEL_JOB, but excludes rows whose role equals :type —
// i.e. returns the jobs where the :names datasets appear in the OPPOSITE role
// of the current node (getNodes passes currentNode.source_target_type as :type).
private final static String GET_JOB_WITH_SOURCE = "SELECT ca.app_id, ca.app_code as cluster, " +
"jedl.job_name, fj.job_path, fj.job_type, jedl.flow_path, jedl.storage_type, jedl.source_target_type, " +
"jedl.operation, jedl.source_srl_no, jedl.srl_no, " +
"max(jedl.job_exec_id) as job_exec_id FROM job_execution_data_lineage jedl " +
"JOIN cfg_application ca on ca.app_id = jedl.app_id " +
"LEFT JOIN job_execution je on jedl.app_id = je.app_id " +
"and jedl.flow_exec_id = je.flow_exec_id and jedl.job_exec_id = je.job_exec_id " +
"LEFT JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and je.job_id = fj.job_id " +
"WHERE abstracted_object_name in ( :names ) and jedl.source_target_type != (:type) and " +
"jedl.flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " +
"FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL (:days) DAY " +
"GROUP BY ca.app_id, cluster, jedl.job_name, jedl.flow_path, jedl.source_target_type, " +
"jedl.storage_type, jedl.operation " +
"ORDER BY jedl.source_target_type DESC, jedl.job_finished_unixtime";
private final static String GET_DATA = "SELECT storage_type, operation, " + private final static String GET_DATA = "SELECT storage_type, operation, " +
"abstracted_object_name, source_target_type " + "abstracted_object_name, source_target_type, job_start_unixtime, job_finished_unixtime, " +
"FROM_UNIXTIME(job_start_unixtime) as start_time, " +
"FROM_UNIXTIME(job_finished_unixtime) as end_time " +
"FROM job_execution_data_lineage WHERE app_id = ? and job_exec_id = ? and " + "FROM job_execution_data_lineage WHERE app_id = ? and job_exec_id = ? and " +
"flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + "flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " +
"COALESCE(source_srl_no, srl_no) = srl_no"; "COALESCE(source_srl_no, srl_no) = srl_no ORDER BY source_target_type DESC";
// Variant of GET_DATA used by getNodes for source rows whose operation is
// "JDBC Read": the self-join on the same job execution restricts results to
// rows whose COALESCE(source_srl_no, srl_no) matches the srl_no of the :names
// source datasets, filtering out unrelated lineage rows of that execution.
// Named params: :names (IN-list), :appid, :execid.
// NOTE(review): intent inferred from the join condition and the call site —
// confirm against the job_execution_data_lineage srl_no semantics.
private final static String GET_DATA_FILTER_OUT_LASSEN = "SELECT j1.storage_type, j1.operation, " +
"j1.abstracted_object_name, j1.source_target_type, j1.job_start_unixtime, j1.job_finished_unixtime, " +
"FROM_UNIXTIME(j1.job_start_unixtime) as start_time, " +
"FROM_UNIXTIME(j1.job_finished_unixtime) as end_time " +
"FROM job_execution_data_lineage j1 " +
"JOIN job_execution_data_lineage j2 on j1.app_id = j2.app_id and j1.job_exec_id = j2.job_exec_id " +
"and j2.abstracted_object_name in (:names) and j2.source_target_type = 'source' " +
"WHERE j1.app_id = (:appid) and j1.job_exec_id = (:execid) and " +
"j1.flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " +
"COALESCE(j1.source_srl_no, j1.srl_no) = j2.srl_no ORDER BY j1.source_target_type DESC";
private final static String GET_APP_ID = "SELECT app_id FROM cfg_application WHERE LOWER(app_code) = ?"; private final static String GET_APP_ID = "SELECT app_id FROM cfg_application WHERE LOWER(app_code) = ?";
@ -100,19 +143,214 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO
"and source_target_type = 'target' and " + "and source_target_type = 'target' and " +
"FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL 60 DAY"; "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL 60 DAY";
// Forward lookup in the dataset alias table: object_name -> mapped_object_name.
private final static String GET_MAPPED_OBJECT_NAME = "SELECT mapped_object_name " +
"FROM cfg_object_name_map WHERE object_name = ?";
// Reverse lookup in the dataset alias table: mapped_object_name -> object_name.
private final static String GET_OBJECT_NAME_BY_MAPPED_NAME = "SELECT object_name " +
"FROM cfg_object_name_map WHERE mapped_object_name = ?";
public static JsonNode getObjectAdjacnet(String urn, int upLevel, int downLevel, int lookBackTime) public static JsonNode getObjectAdjacnet(String urn, int upLevel, int downLevel, int lookBackTime)
{ {
ObjectNode resultNode = Json.newObject(); int level = 0;
LineagePathInfo pathInfo = utils.Lineage.convertFromURN(urn); LineagePathInfo pathInfo = utils.Lineage.convertFromURN(urn);
List<LineageNode> nodes = new ArrayList<LineageNode>(); List<LineageNode> nodes = new ArrayList<LineageNode>();
List<LineageEdge> edges = new ArrayList<LineageEdge>(); List<LineageEdge> edges = new ArrayList<LineageEdge>();
Map<Long, Integer> addedJobNodes = new HashMap<Long, Integer>(); Map<Long, List<LineageNode>> addedSourceNodes= new HashMap<Long, List<LineageNode>>();
Map<String, Integer> addedDataNodes = new HashMap<String, Integer>(); Map<Long, List<LineageNode>> addedTargetNodes= new HashMap<Long, List<LineageNode>>();
int index = 0; Map<Long, LineageNode> addedJobNodes = new HashMap<Long, LineageNode>();
int edgeIndex = 0; List<LineageNode> allSourceNodes = new ArrayList<LineageNode>();
List<LineageNode> allTargetNodes = new ArrayList<LineageNode>();
getObjectAdjacentNode(
pathInfo,
level,
upLevel,
downLevel,
null,
allSourceNodes,
allTargetNodes,
addedSourceNodes,
addedTargetNodes,
addedJobNodes,
lookBackTime);
return renderGraph(
pathInfo,
urn,
upLevel,
downLevel,
allSourceNodes,
allTargetNodes,
addedSourceNodes,
addedTargetNodes,
addedJobNodes,
nodes,
edges);
}
/**
 * Connects the cached source datasets of one job node into the graph.
 *
 * For each dataset the job read (looked up by jobNode.exec_id in
 * addedSourceNodes), an existing graph node is reused when possible: first
 * the most recently written target node whose write finished no later than
 * this read started, then a previously added source node for the same path.
 * Otherwise a new data node is appended to nodes. In every case an edge
 * (data -> job) labeled with the row's operation is added.
 *
 * @param jobNode                      job ("script") node already placed in the graph
 * @param addedSourceNodes             job_exec_id -> datasets that job execution read
 * @param addedTargetDataNodes         abstracted_path -> target data nodes already in the graph
 * @param addedSourceDataNodes         abstracted_path -> source data nodes already in the graph (mutated)
 * @param toBeConvertedSourceDataNodes source nodes to promote to targets at the next level (mutated when isUpLevel)
 * @param nodes                        graph node list (mutated)
 * @param edges                        graph edge list (mutated)
 * @param isUpLevel                    true while walking upstream (producer) levels
 */
public static void addSourceNode(
LineageNode jobNode,
Map<Long, List<LineageNode>> addedSourceNodes,
Map<String, List<LineageNode>> addedTargetDataNodes,
Map<String, LineageNode> addedSourceDataNodes,
Map<String, List<LineageNode>> toBeConvertedSourceDataNodes,
List<LineageNode> nodes,
List<LineageEdge> edges,
boolean isUpLevel)
{
List<LineageNode> sourceNodes = addedSourceNodes.get(jobNode.exec_id);
if (sourceNodes == null)
{
return;
}
for (LineageNode node : sourceNodes)
{
List<LineageNode> existTargetNodes = addedTargetDataNodes.get(node.abstracted_path);
LineageNode existNode = null;
if (existTargetNodes != null)
{
// Most recently finished writer first.
// BUG FIX: the original comparator used '==' on boxed Long values
// (reference identity, almost always false for large timestamps);
// Long.compare compares by value.
Collections.sort(existTargetNodes, new Comparator<LineageNode>(){
public int compare(LineageNode node1, LineageNode node2){
return Long.compare(node2.job_end_unix_time, node1.job_end_unix_time);
}
});
// Reuse the newest version that was fully written before this read started.
for (LineageNode target : existTargetNodes)
{
if (node.job_start_unix_time >= target.job_end_unix_time)
{
existNode = target;
break;
}
}
}
if (existNode == null)
{
existNode = addedSourceDataNodes.get(node.abstracted_path);
}
if (existNode != null)
{
node.id = existNode.id;
}
else
{
node.id = nodes.size();
nodes.add(node);
addedSourceDataNodes.put(node.abstracted_path, node);
if (isUpLevel) {
// Remember this source so the next upstream level can treat it as a target.
List<LineageNode> originalSourceNodes = toBeConvertedSourceDataNodes.get(node.abstracted_path);
if (originalSourceNodes == null) {
originalSourceNodes = new ArrayList<LineageNode>();
}
originalSourceNodes.add(node);
toBeConvertedSourceDataNodes.put(node.abstracted_path, originalSourceNodes);
}
}
// Edge: dataset feeds into the job.
LineageEdge edge = new LineageEdge();
edge.id = edges.size();
edge.source = node.id;
edge.target = jobNode.id;
edge.label = node.operation;
edge.chain = "";
edges.add(edge);
}
}
/**
 * Connects the cached target datasets of one job node into the graph.
 *
 * For each dataset the job wrote (looked up by jobNode.exec_id in
 * addedTargetNodes), an existing target node for the same path is reused when
 * its partition_end (preferred) or job end time matches; while walking
 * upstream, a source node from the previous level may be reused instead if it
 * was read after this write finished. Otherwise a new data node is appended.
 * In every case an edge (job -> data) labeled with the operation is added.
 *
 * @param jobNode                   job ("script") node already placed in the graph
 * @param addedTargetNodes          job_exec_id -> datasets that job execution wrote
 * @param addedTargetDataNodes      abstracted_path -> target data nodes already in the graph (mutated)
 * @param toBeClearedSourceDataNodes source nodes from the previous level (matching candidates when isUpLevel)
 * @param nodes                     graph node list (mutated)
 * @param edges                     graph edge list (mutated)
 * @param isUpLevel                 true while walking upstream (producer) levels
 */
public static void addTargetNode(
LineageNode jobNode,
Map<Long, List<LineageNode>> addedTargetNodes,
Map<String, List<LineageNode>> addedTargetDataNodes,
Map<String, LineageNode> toBeClearedSourceDataNodes,
List<LineageNode> nodes,
List<LineageEdge> edges,
boolean isUpLevel)
{
List<LineageNode> targetNodes = addedTargetNodes.get(jobNode.exec_id);
if (targetNodes == null)
{
return;
}
for (LineageNode node : targetNodes)
{
List<LineageNode> existTargetNodes = addedTargetDataNodes.get(node.abstracted_path);
LineageNode existNode = null;
if (existTargetNodes != null)
{
for (LineageNode target : existTargetNodes)
{
if (target.partition_end != null)
{
// BUG FIX: was '==' (reference identity) on partition_end;
// compare by value, null-safe for node.partition_end.
if (java.util.Objects.equals(target.partition_end, node.partition_end))
{
existNode = target;
break;
}
}
// BUG FIX: was '==' on boxed Long job end times; compare by value.
else if (java.util.Objects.equals(target.job_end_unix_time, node.job_end_unix_time))
{
existNode = target;
break;
}
}
}
if (isUpLevel && existNode == null)
{
// A source node collected at the previous level may actually be this
// target — but only if it was read after this write had finished.
existNode = toBeClearedSourceDataNodes.get(node.abstracted_path);
if (existNode != null && existNode.job_start_unix_time < node.job_end_unix_time)
{
existNode = null;
}
}
if (existNode != null)
{
node.id = existNode.id;
}
else
{
node.id = nodes.size();
nodes.add(node);
if (existTargetNodes == null)
{
existTargetNodes = new ArrayList<LineageNode>();
}
existTargetNodes.add(node);
addedTargetDataNodes.put(node.abstracted_path, existTargetNodes);
}
// Edge: the job produces the dataset.
LineageEdge edge = new LineageEdge();
edge.id = edges.size();
edge.source = jobNode.id;
edge.target = node.id;
edge.label = node.operation;
edge.chain = "";
edges.add(edge);
}
}
public static JsonNode renderGraph(LineagePathInfo pathInfo,
String urn,
int upLevel,
int downLevel,
List<LineageNode> allSourceNodes,
List<LineageNode> allTargetNodes,
Map<Long, List<LineageNode>> addedSourceNodes,
Map<Long, List<LineageNode>> addedTargetNodes,
Map<Long, LineageNode> addedJobNodes,
List<LineageNode> nodes,
List<LineageEdge> edges)
{
ObjectNode resultNode = Json.newObject();
String message = null;
Map<String, LineageNode> addedSourceDataNodes = new HashMap<String, LineageNode>();
Map<String, LineageNode> toBeClearedSourceDataNodes = new HashMap<String, LineageNode>();
Map<String, List<LineageNode>> addedTargetDataNodes = new HashMap<String, List<LineageNode>>();
Map<String, List<LineageNode>> toBeConvertedSourceDataNodes = new HashMap<String, List<LineageNode>>();
message = "No lineage information found for this dataset";
if (allSourceNodes.size() == 0 && allTargetNodes.size() == 0)
{
LineageNode node = new LineageNode(); LineageNode node = new LineageNode();
node.id = index; node.id = nodes.size();
node._sort_list = new ArrayList<String>(); node._sort_list = new ArrayList<String>();
node.node_type = "data"; node.node_type = "data";
node.abstracted_path = pathInfo.filePath; node.abstracted_path = pathInfo.filePath;
@ -122,262 +360,395 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO
node._sort_list.add("urn"); node._sort_list.add("urn");
node.urn = urn; node.urn = urn;
nodes.add(node); nodes.add(node);
addedDataNodes.put(node.urn, node.id);
String message = null;
getObjectAdjacentNode(
pathInfo,
urn,
upLevel,
downLevel,
nodes.size()-1,
edges.size(),
nodes,
edges,
addedDataNodes,
addedJobNodes,
lookBackTime);
if (nodes.size() > 0)
{
message = "Found lineage on azkaban";
} }
else else
{ {
message = "No lineage information found for this dataset"; message = "Found lineage information";
for(int i = 0; i < Math.max(upLevel, downLevel); i++)
{
if (i < upLevel)
{
if (toBeConvertedSourceDataNodes.size() > 0)
{
for(Map.Entry<String, List<LineageNode> > mapEntry : toBeConvertedSourceDataNodes.entrySet())
{
List<LineageNode> hashedNodes = addedTargetDataNodes.get(mapEntry.getKey());
List<LineageNode> list = mapEntry.getValue();
if (list != null && list.size() > 0)
{
if (hashedNodes == null)
{
hashedNodes = new ArrayList<LineageNode>();
}
hashedNodes.addAll(list);
addedTargetDataNodes.put(mapEntry.getKey(), hashedNodes);
}
}
toBeConvertedSourceDataNodes.clear();
}
if (addedSourceDataNodes.size() > 0)
{
toBeClearedSourceDataNodes.putAll(addedSourceDataNodes);
addedSourceDataNodes.clear();
}
for(LineageNode job : addedJobNodes.values())
{
if (job.level != i)
{
continue;
}
job.id = nodes.size();
nodes.add(job);
addTargetNode(job,
addedTargetNodes,
addedTargetDataNodes,
toBeClearedSourceDataNodes,
nodes,
edges,
true);
addSourceNode(job,
addedSourceNodes,
addedTargetDataNodes,
addedSourceDataNodes,
toBeConvertedSourceDataNodes,
nodes,
edges,
true);
}
}
if ((i > 0) && (i < downLevel))
{
for(LineageNode job : addedJobNodes.values())
{
if (job.level != -i)
{
continue;
}
job.id = nodes.size();
nodes.add(job);
addTargetNode(job,
addedTargetNodes,
addedTargetDataNodes,
toBeClearedSourceDataNodes,
nodes,
edges,
false);
addSourceNode(job,
addedSourceNodes,
addedTargetDataNodes,
addedSourceDataNodes,
toBeConvertedSourceDataNodes,
nodes,
edges,
false);
}
}
}
} }
resultNode.set("nodes", Json.toJson(nodes)); resultNode.set("nodes", Json.toJson(nodes));
resultNode.set("links", Json.toJson(edges)); resultNode.set("links", Json.toJson(edges));
resultNode.put("urn", urn); resultNode.put("urn", urn);
resultNode.put("message", message); resultNode.put("message", message);
return resultNode; return resultNode;
} }
public static void searchInAzkaban( public static List<String> getLiasDatasetNames(String abstractedObjectName)
{
if (StringUtils.isBlank(abstractedObjectName))
return null;
List<String> totalNames = new ArrayList<String>();
totalNames.add(abstractedObjectName);
List<String> mappedNames = getJdbcTemplate().queryForList(GET_MAPPED_OBJECT_NAME, String.class, abstractedObjectName);
if (mappedNames != null && mappedNames.size() > 0)
{
totalNames.addAll(mappedNames);
for (String name : mappedNames)
{
List<String> objNames = getJdbcTemplate().queryForList(
GET_OBJECT_NAME_BY_MAPPED_NAME, String.class, name);
{
if (objNames != null)
{
totalNames.addAll(objNames);
}
}
}
}
else
{
List<String> objNames = getJdbcTemplate().queryForList(
GET_OBJECT_NAME_BY_MAPPED_NAME, String.class, abstractedObjectName);
{
if (objNames != null)
{
totalNames.addAll(objNames);
}
}
}
List<String> results = new ArrayList<String>();
Set<String> sets = new HashSet<String>();
sets.addAll(totalNames);
results.addAll(sets);
return results;
}
public static void getNodes(
LineagePathInfo pathInfo, LineagePathInfo pathInfo,
int level,
int upLevel, int upLevel,
int downLevel, int downLevel,
int index, LineageNode currentNode,
int nodeIndex, List<LineageNode> allSourceNodes,
int edgeIndex, List<LineageNode> allTargetNodes,
List<LineageNode> nodes, Map<Long, List<LineageNode>>addedSourceNodes,
List<LineageEdge> edges, Map<Long, List<LineageNode>>addedTargetNodes,
Map<String, Integer> addedDataNodes, Map<Long, LineageNode> addedJobNodes,
Map<Long, Integer> addedJobNodes,
int lookBackTime) int lookBackTime)
{ {
if (upLevel < 1 && downLevel < 1) if (upLevel < 1 && downLevel < 1)
{ {
return; return;
} }
if (currentNode != null)
{
if ( StringUtils.isBlank(currentNode.source_target_type))
{
Logger.error("Source target type is not available");
Logger.error(currentNode.abstracted_path);
return;
}
else if (currentNode.source_target_type.equalsIgnoreCase("target") && downLevel <= 0)
{
Logger.warn("Dataset " + currentNode.abstracted_path + " downLevel = " + Integer.toString(downLevel));
return;
}
else if (currentNode.source_target_type.equalsIgnoreCase("source") && upLevel <= 0)
{
Logger.warn("Dataset " + currentNode.abstracted_path + " upLevel = " + Integer.toString(upLevel));
return;
}
}
List<String> nameList = getLiasDatasetNames(pathInfo.filePath);
List<Map<String, Object>> rows = null; List<Map<String, Object>> rows = null;
rows = getJdbcTemplate().queryForList( MapSqlParameterSource parameters = new MapSqlParameterSource();
parameters.addValue("names", nameList);
NamedParameterJdbcTemplate namedParameterJdbcTemplate = new
NamedParameterJdbcTemplate(getJdbcTemplate().getDataSource());
parameters.addValue("days", lookBackTime);
if (currentNode != null)
{
if (currentNode.source_target_type.equalsIgnoreCase("source"))
{
rows = namedParameterJdbcTemplate.queryForList(
GET_UP_LEVEL_JOB,
parameters);
}
else
{
parameters.addValue("type", currentNode.source_target_type);
rows = namedParameterJdbcTemplate.queryForList(
GET_JOB_WITH_SOURCE,
parameters);
}
}
else
{
rows = namedParameterJdbcTemplate.queryForList(
GET_JOB, GET_JOB,
pathInfo.filePath, parameters);
lookBackTime); }
if (rows != null)
{ if (rows != null) {
for (Map row : rows) for (Map row : rows) {
{
LineageNode node = new LineageNode(); LineageNode node = new LineageNode();
Object jobExecIdObject = row.get("job_exec_id"); Object jobExecIdObject = row.get("job_exec_id");
if (jobExecIdObject == null) if (jobExecIdObject == null) {
{
continue; continue;
} }
Long jobExecId = ((BigInteger) jobExecIdObject).longValue(); Long jobExecId = ((BigInteger) jobExecIdObject).longValue();
Integer jobNodeId = addedJobNodes.get(jobExecId); if (addedJobNodes.get(jobExecId) != null) {
if (jobNodeId != null && jobNodeId > 0)
{
continue; continue;
} }
node._sort_list = new ArrayList<String>(); node._sort_list = new ArrayList<String>();
node.node_type = "script"; node.node_type = "script";
node.job_type = (String) row.get("job_type"); node.job_type = (String) row.get("job_type");
node.cluster = (String) row.get("cluster"); node.cluster = (String) row.get("cluster");
node.job_path = (String)row.get("job_path");
node.job_name = (String) row.get("job_name"); node.job_name = (String) row.get("job_name");
node.job_start_time = row.get("start_time").toString(); node.job_path = (String) row.get("flow_path") + "/" + node.job_name;
node.job_end_time = row.get("end_time").toString(); node.exec_id = jobExecId;
node.operation = (String) row.get("operation");
node.source_target_type = (String) row.get("source_target_type");
node.level = level;
node._sort_list.add("cluster"); node._sort_list.add("cluster");
node._sort_list.add("job_path"); node._sort_list.add("job_path");
node._sort_list.add("job_name"); node._sort_list.add("job_name");
node._sort_list.add("job_type"); node._sort_list.add("job_type");
node._sort_list.add("job_start_time"); node._sort_list.add("job_start_time");
node._sort_list.add("job_end_time"); node._sort_list.add("job_end_time");
String sourceType = (String)row.get("source_target_type"); node._sort_list.add("exec_id");
if (sourceType.equalsIgnoreCase("target") && upLevel > 0) addedJobNodes.put(jobExecId, node);
{ List<LineageNode> sourceNodeList = new ArrayList<LineageNode>();
node.id = nodeIndex; List<LineageNode> targetNodeList = new ArrayList<LineageNode>();
nodes.add(node);
addedJobNodes.put(jobExecId, node.id);
LineageEdge edge = new LineageEdge();
edge.source = nodeIndex;
edge.target = index;
edge.id = edgeIndex++;
edge.label = (String) row.get("operation");
edge.chain = (String) row.get("flow_path");
edges.add(edge);
}
else if (sourceType.equalsIgnoreCase("source") && downLevel > 0)
{
node.id = nodeIndex;
nodes.add(node);
addedJobNodes.put(jobExecId, node.id);
LineageEdge edge = new LineageEdge();
edge.source = index;
edge.target = nodeIndex;
edge.id = edgeIndex++;
edge.label = (String) row.get("operation");
edge.chain = (String) row.get("flow_path");
edges.add(edge);
}
else
{
continue;
}
int jobIndex = nodeIndex;
nodeIndex++;
int applicationID = (int)row.get("app_id"); int applicationID = (int)row.get("app_id");
Long jobId = ((BigInteger)row.get("job_exec_id")).longValue(); Long jobId = ((BigInteger)row.get("job_exec_id")).longValue();
List<Map<String, Object>> relatedDataRows = null; List<Map<String, Object>> relatedDataRows = null;
if (node.source_target_type.equalsIgnoreCase("source") && node.operation.equalsIgnoreCase("JDBC Read"))
{
MapSqlParameterSource lassenParams = new MapSqlParameterSource();
lassenParams.addValue("names", nameList);
lassenParams.addValue("appid", applicationID);
lassenParams.addValue("execid", jobId);
relatedDataRows = namedParameterJdbcTemplate.queryForList(
GET_DATA_FILTER_OUT_LASSEN,
lassenParams);
}
else
{
relatedDataRows = getJdbcTemplate().queryForList( relatedDataRows = getJdbcTemplate().queryForList(
GET_DATA, GET_DATA,
applicationID, applicationID,
jobId); jobId);
if (relatedDataRows != null) }
{
if (relatedDataRows != null) {
for (Map relatedDataRow : relatedDataRows) for (Map relatedDataRow : relatedDataRows)
{ {
String abstractedObjectName = (String)relatedDataRow.get("abstracted_object_name"); String abstractedObjectName = (String)relatedDataRow.get("abstracted_object_name");
Integer dataNodeId = addedDataNodes.get(abstractedObjectName);
if (abstractedObjectName.startsWith("/tmp/")) if (abstractedObjectName.startsWith("/tmp/"))
{ {
continue; continue;
} }
String relatedSourceType = (String)relatedDataRow.get("source_target_type"); String relatedSourceType = (String)relatedDataRow.get("source_target_type");
if (sourceType.equalsIgnoreCase("target") && relatedSourceType.equalsIgnoreCase("source"))
{
LineageNode relatedNode = new LineageNode(); LineageNode relatedNode = new LineageNode();
relatedNode._sort_list = new ArrayList<String>(); relatedNode._sort_list = new ArrayList<String>();
relatedNode.node_type = "data"; relatedNode.node_type = "data";
relatedNode.level = level;
relatedNode.source_target_type = relatedSourceType;
relatedNode.abstracted_path = (String)relatedDataRow.get("abstracted_object_name"); relatedNode.abstracted_path = (String)relatedDataRow.get("abstracted_object_name");
relatedNode.storage_type = ((String)relatedDataRow.get("storage_type")).toLowerCase(); relatedNode.storage_type = ((String)relatedDataRow.get("storage_type")).toLowerCase();
relatedNode.job_start_unix_time = (Long)relatedDataRow.get("job_start_unixtime");
relatedNode.job_start_time = relatedDataRow.get("start_time").toString();
relatedNode.job_end_time = relatedDataRow.get("end_time").toString();
relatedNode.job_end_unix_time = (Long)relatedDataRow.get("job_finished_unixtime");
node.job_start_unix_time = relatedNode.job_start_unix_time;
node.job_end_unix_time = relatedNode.job_end_unix_time;
node.job_start_time = relatedNode.job_start_time;
node.job_end_time = relatedNode.job_end_time;
relatedNode.operation = (String)relatedDataRow.get("operation");
LineagePathInfo info = new LineagePathInfo(); LineagePathInfo info = new LineagePathInfo();
info.filePath = relatedNode.abstracted_path; info.filePath = relatedNode.abstracted_path;
info.storageType = relatedNode.storage_type; info.storageType = relatedNode.storage_type;
relatedNode.urn = utils.Lineage.convertToURN(info); relatedNode.urn = utils.Lineage.convertToURN(info);
relatedNode._sort_list.add("abstracted_path"); relatedNode._sort_list.add("abstracted_path");
relatedNode._sort_list.add("storage_type"); relatedNode._sort_list.add("storage_type");
if (dataNodeId != null && dataNodeId > 0) relatedNode._sort_list.add("urn");
if (relatedSourceType.equalsIgnoreCase("source"))
{ {
relatedNode.id = dataNodeId; if (node.source_target_type.equalsIgnoreCase("target") ||
} utils.Lineage.isInList(nameList, relatedNode.abstracted_path))
else
{ {
relatedNode.id = nodeIndex; sourceNodeList.add(relatedNode);
nodes.add(relatedNode); allSourceNodes.add(relatedNode);
addedDataNodes.put(abstractedObjectName, relatedNode.id);
} }
LineageEdge relatedEdge = new LineageEdge(); }
relatedEdge.source = relatedNode.id; else if (relatedSourceType.equalsIgnoreCase("target"))
relatedEdge.target = jobIndex; {
relatedEdge.id = edgeIndex++; if (node.source_target_type.equalsIgnoreCase("source") ||
relatedEdge.label = (String)relatedDataRow.get("operation"); utils.Lineage.isInList(nameList, relatedNode.abstracted_path))
relatedEdge.chain = ""; {
edges.add(relatedEdge); targetNodeList.add(relatedNode);
nodeIndex++; allTargetNodes.add(relatedNode);
if (upLevel > 1) }
}
}
if (sourceNodeList.size() > 0)
{
addedSourceNodes.put(jobExecId, sourceNodeList);
}
if (targetNodeList.size() > 0)
{
addedTargetNodes.put(jobExecId, targetNodeList);
}
}
}
}
if ((allSourceNodes != null ) && (allSourceNodes.size() > 0) && (upLevel > 1))
{
List<LineageNode> currentSourceNodes = new ArrayList<LineageNode>();
currentSourceNodes.addAll(allSourceNodes);
for(LineageNode sourceNode : currentSourceNodes)
{ {
LineagePathInfo subPath = new LineagePathInfo(); LineagePathInfo subPath = new LineagePathInfo();
subPath.storageType = relatedNode.storage_type; subPath.storageType = sourceNode.storage_type;
subPath.filePath = relatedNode.abstracted_path; subPath.filePath = sourceNode.abstracted_path;
String subUrn = utils.Lineage.convertToURN(subPath); if (sourceNode.level == level)
{
getObjectAdjacentNode( getObjectAdjacentNode(
subPath, subPath,
subUrn, level+1,
upLevel - 1, upLevel - 1,
0, 0,
nodeIndex-1, sourceNode,
edgeIndex, allSourceNodes,
nodes, allTargetNodes,
edges, addedSourceNodes,
addedDataNodes, addedTargetNodes,
addedJobNodes, addedJobNodes,
lookBackTime); lookBackTime);
} }
nodeIndex = nodes.size();
} }
else if (sourceType.equalsIgnoreCase("source") && relatedSourceType.equalsIgnoreCase("target"))
{
LineageNode relatedNode = new LineageNode();
relatedNode._sort_list = new ArrayList<String>();
relatedNode.node_type = "data";
relatedNode.abstracted_path = (String)relatedDataRow.get("abstracted_object_name");
relatedNode.storage_type = ((String)relatedDataRow.get("storage_type")).toLowerCase();
LineagePathInfo info = new LineagePathInfo();
info.filePath = relatedNode.abstracted_path;
info.storageType = relatedNode.storage_type;
relatedNode.urn = utils.Lineage.convertToURN(info);
relatedNode._sort_list.add("abstracted_path");
relatedNode._sort_list.add("storage_type");
if (dataNodeId != null && dataNodeId > 0)
{
relatedNode.id = dataNodeId;
} }
else if ((allTargetNodes != null ) && (allTargetNodes.size() > 0) && (downLevel > 1))
{ {
relatedNode.id = nodeIndex; List<LineageNode> currentTargetNodes = new ArrayList<LineageNode>();
nodes.add(relatedNode); currentTargetNodes.addAll(allTargetNodes);
addedDataNodes.put(abstractedObjectName, relatedNode.id); for(LineageNode targetNode : currentTargetNodes)
}
LineageEdge relatedEdge = new LineageEdge();
relatedEdge.source = jobIndex;
relatedEdge.target = relatedNode.id;
relatedEdge.id = edgeIndex++;
relatedEdge.label = (String)relatedDataRow.get("operation");
relatedEdge.chain = "";
edges.add(relatedEdge);
nodeIndex++;
if (downLevel > 1)
{ {
LineagePathInfo subPath = new LineagePathInfo(); LineagePathInfo subPath = new LineagePathInfo();
subPath.storageType = relatedNode.storage_type; subPath.storageType = targetNode.storage_type;
subPath.filePath = relatedNode.abstracted_path; subPath.filePath = targetNode.abstracted_path;
String subUrn = utils.Lineage.convertToURN(subPath); if (targetNode.level == level)
{
getObjectAdjacentNode( getObjectAdjacentNode(
subPath, subPath,
subUrn, level-1,
0, 0,
downLevel - 1, downLevel - 1,
nodeIndex-1, targetNode,
edgeIndex, allSourceNodes,
nodes, allTargetNodes,
edges, addedSourceNodes,
addedDataNodes, addedTargetNodes,
addedJobNodes, addedJobNodes,
lookBackTime); lookBackTime);
} }
nodeIndex = nodes.size();
} }
} }
} }
}
}
}
public static void getObjectAdjacentNode( public static void getObjectAdjacentNode(
LineagePathInfo pathInfo, LineagePathInfo pathInfo,
String urn, int level,
int upLevel, int upLevel,
int downLevel, int downLevel,
int index, LineageNode currentNode,
int edgeIndex, List<LineageNode> allSourceNodes,
List<LineageNode> nodes, List<LineageNode> allTargetNodes,
List<LineageEdge> edges, Map<Long, List<LineageNode>> addedSourceNodes,
Map<String, Integer> addedDataNodes, Map<Long, List<LineageNode>> addedTargetNodes,
Map<Long, Integer> addedJobNodes, Map<Long, LineageNode> addedJobNodes,
int lookBackTime) int lookBackTime)
{ {
if (upLevel < 1 && downLevel < 1) if (upLevel < 1 && downLevel < 1)
@ -389,31 +760,18 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO
{ {
return; return;
} }
getNodes(
int nodeIndex = index + 1;
List<Integer> appIDList = getJdbcTemplate().queryForList(
GET_APPLICATION_ID, new Object[] {pathInfo.filePath}, Integer.class);
if (appIDList != null)
{
int nodeId = 0;
for(Integer id : appIDList)
{
searchInAzkaban(
pathInfo, pathInfo,
level,
upLevel, upLevel,
downLevel, downLevel,
index, currentNode,
nodeIndex, allSourceNodes,
edgeIndex, allTargetNodes,
nodes, addedSourceNodes,
edges, addedTargetNodes,
addedDataNodes,
addedJobNodes, addedJobNodes,
lookBackTime); lookBackTime);
break;
}
}
} }
public static ObjectNode getFlowLineage(String application, String project, Long flowId) public static ObjectNode getFlowLineage(String application, String project, Long flowId)

View File

@ -31,6 +31,9 @@ public class LineageNode {
public String script_path; public String script_path;
public String job_start_time; public String job_start_time;
public String job_end_time; public String job_end_time;
// Epoch seconds when the producing job execution started (from the
// job_start_unixtime column); may be null when no lineage row was joined.
public Long job_start_unix_time;
// Epoch seconds when the producing job execution finished (from the
// job_finished_unixtime column); may be null.
public Long job_end_unix_time;
// Distance from the root dataset in the lineage traversal:
// 0 = root level, positive = upstream levels, negative = downstream levels.
public int level;
public String git_location; public String git_location;
public List<String> _sort_list; public List<String> _sort_list;
public String source_target_type; public String source_target_type;

View File

@ -24,6 +24,7 @@ import java.io.BufferedReader;
import java.io.FileReader; import java.io.FileReader;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
public class Lineage public class Lineage
@ -44,40 +45,11 @@ public class Lineage
String storageType = pathArray[0]; String storageType = pathArray[0];
pathInfo.storageType = storageType; pathInfo.storageType = storageType;
if (StringUtils.isNotBlank(storageType)) if (StringUtils.isNotBlank(storageType))
{
if (storageType.equalsIgnoreCase("hdfs"))
{ {
if (pathArray.length > 1 && StringUtils.isNotBlank(pathArray[1])) if (pathArray.length > 1 && StringUtils.isNotBlank(pathArray[1]))
{ {
pathInfo.filePath = "/" + pathArray[1]; pathInfo.filePath = "/" + pathArray[1];
} }
}
else if (storageType.equalsIgnoreCase("teradata"))
{
if (pathArray.length > 1 && StringUtils.isNotBlank(pathArray[1]))
{
int index = pathArray[1].indexOf("/");
if (index != -1)
{
pathInfo.schemaName = pathArray[1].substring(0, index);
pathInfo.filePath = pathArray[1].substring(index+1);
}
}
}
else if (storageType.equalsIgnoreCase("nas"))
{
if (pathArray.length > 1 && StringUtils.isNotBlank(pathArray[1]))
{
pathInfo.filePath = "/" + pathArray[1];
}
}
else if (storageType.equalsIgnoreCase("hive"))
{
if (pathArray.length > 1 && StringUtils.isNotBlank(pathArray[1]))
{
pathInfo.filePath = "/" + pathArray[1];
}
}
else else
{ {
pathInfo.storageType = null; pathInfo.storageType = null;
@ -113,13 +85,22 @@ public class Lineage
filePath = pathInfo.filePath; filePath = pathInfo.filePath;
} }
} }
if (StringUtils.isNotBlank(pathInfo.storageType) && pathInfo.storageType.equalsIgnoreCase("teradata"))
{
return "teradata:///" + pathInfo.schemaName + "/" + filePath;
}
else
{
return pathInfo.storageType.toLowerCase() + ":///" + filePath; return pathInfo.storageType.toLowerCase() + ":///" + filePath;
} }
public static boolean isInList(List<String> list, String source)
{
if (list == null || list.size() == 0 || StringUtils.isBlank(source))
{
return false;
}
for(String s : list)
{
if (source.equalsIgnoreCase(s))
{
return true;
}
}
return false;
} }
} }