From c56cc6a4209dbc61efab3445a66247d762eb158c Mon Sep 17 00:00:00 2001 From: jbai Date: Mon, 11 Apr 2016 16:42:47 -0700 Subject: [PATCH 1/3] update the tract pathinfo method since db content changed --- web/app/utils/Lineage.java | 33 ++------------------------------- 1 file changed, 2 insertions(+), 31 deletions(-) diff --git a/web/app/utils/Lineage.java b/web/app/utils/Lineage.java index 623adcbdc0..4a102114a1 100644 --- a/web/app/utils/Lineage.java +++ b/web/app/utils/Lineage.java @@ -45,38 +45,9 @@ public class Lineage pathInfo.storageType = storageType; if (StringUtils.isNotBlank(storageType)) { - if (storageType.equalsIgnoreCase("hdfs")) + if (pathArray.length > 1 && StringUtils.isNotBlank(pathArray[1])) { - if (pathArray.length > 1 && StringUtils.isNotBlank(pathArray[1])) - { - pathInfo.filePath = "/" + pathArray[1]; - } - } - else if (storageType.equalsIgnoreCase("teradata")) - { - if (pathArray.length > 1 && StringUtils.isNotBlank(pathArray[1])) - { - int index = pathArray[1].indexOf("/"); - if (index != -1) - { - pathInfo.schemaName = pathArray[1].substring(0, index); - pathInfo.filePath = pathArray[1].substring(index+1); - } - } - } - else if (storageType.equalsIgnoreCase("nas")) - { - if (pathArray.length > 1 && StringUtils.isNotBlank(pathArray[1])) - { - pathInfo.filePath = "/" + pathArray[1]; - } - } - else if (storageType.equalsIgnoreCase("hive")) - { - if (pathArray.length > 1 && StringUtils.isNotBlank(pathArray[1])) - { - pathInfo.filePath = "/" + pathArray[1]; - } + pathInfo.filePath = "/" + pathArray[1]; } else { From 10fe986d712b3f2ce25af15608916bf9f9c4546d Mon Sep 17 00:00:00 2001 From: jbai Date: Mon, 18 Apr 2016 18:08:27 -0700 Subject: [PATCH 2/3] update lineage render method --- web/app/dao/LineageDAO.java | 587 ++++++++++++++++++++------------ web/app/models/LineageNode.java | 2 + web/app/utils/Lineage.java | 9 +- 3 files changed, 373 insertions(+), 225 deletions(-) diff --git a/web/app/dao/LineageDAO.java b/web/app/dao/LineageDAO.java index c359bd7ed1..fcdd3a82d0 100644 --- a/web/app/dao/LineageDAO.java +++ b/web/app/dao/LineageDAO.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.ImmutablePair; import org.springframework.dao.EmptyResultDataAccessException; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import play.Logger; import play.Play; @@ -42,23 +43,49 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO "je.flow_id, je.job_id, jedl.job_name, " + "fj.job_path, fj.job_type, jedl.flow_path, jedl.storage_type, jedl.source_target_type, " + "jedl.operation, jedl.source_srl_no, jedl.srl_no, " + - "max(jedl.job_exec_id) as job_exec_id, FROM_UNIXTIME(jedl.job_start_unixtime) as start_time, " + + "max(jedl.job_exec_id) as job_exec_id, jedl.job_start_unixtime, jedl.job_finished_unixtime," + + "FROM_UNIXTIME(jedl.job_start_unixtime) as start_time, " + "FROM_UNIXTIME(jedl.job_finished_unixtime) as end_time FROM job_execution_data_lineage jedl " + "JOIN cfg_application ca on ca.app_id = jedl.app_id " + "JOIN job_execution je on jedl.app_id = je.app_id " + "and jedl.flow_exec_id = je.flow_exec_id and jedl.job_exec_id = je.job_exec_id " + - "JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and je.job_id = fj.job_id " + - "WHERE abstracted_object_name = ? and " + + "LEFT JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and je.job_id = fj.job_id " + + "WHERE abstracted_object_name in ( :names ) and " + "jedl.flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + "COALESCE(jedl.source_srl_no, jedl.srl_no) = jedl.srl_no and " + - "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL ? DAY " + - "GROUP BY ca.app_id, je.job_id, je.flow_id"; + "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL (:days) DAY " + + "GROUP BY ca.app_id, je.job_id, je.flow_id, jedl.source_target_type, jedl.storage_type " + + "ORDER BY jedl.source_target_type DESC, jedl.job_finished_unixtime"; + + private final static String GET_JOB_WITH_SOURCE = "SELECT ca.app_id, ca.app_code as cluster, " + + "je.flow_id, je.job_id, jedl.job_name, " + + "fj.job_path, fj.job_type, jedl.flow_path, jedl.storage_type, jedl.source_target_type, " + + "jedl.operation, jedl.source_srl_no, jedl.srl_no, " + + "max(jedl.job_exec_id) as job_exec_id, jedl.job_start_unixtime, jedl.job_finished_unixtime," + + "FROM_UNIXTIME(jedl.job_start_unixtime) as start_time, " + + "FROM_UNIXTIME(jedl.job_finished_unixtime) as end_time FROM job_execution_data_lineage jedl " + + "JOIN cfg_application ca on ca.app_id = jedl.app_id " + + "JOIN job_execution je on jedl.app_id = je.app_id " + + "and jedl.flow_exec_id = je.flow_exec_id and jedl.job_exec_id = je.job_exec_id " + + "LEFT JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and je.job_id = fj.job_id " + + "WHERE abstracted_object_name in ( :names ) and jedl.source_target_type != (:type) and " + + "jedl.flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + + "COALESCE(jedl.source_srl_no, jedl.srl_no) = jedl.srl_no and " + + "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL (:days) DAY " + + "GROUP BY ca.app_id, je.job_id, je.flow_id, jedl.source_target_type, jedl.storage_type " + + "ORDER BY jedl.source_target_type DESC, jedl.job_finished_unixtime"; private final static String GET_DATA = "SELECT storage_type, operation, " + - "abstracted_object_name, source_target_type " + + "abstracted_object_name, source_target_type, job_start_unixtime, job_finished_unixtime " + "FROM job_execution_data_lineage WHERE app_id = ? and job_exec_id = ? and " + "flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + - "COALESCE(source_srl_no, srl_no) = srl_no"; + "COALESCE(source_srl_no, srl_no) = srl_no ORDER BY source_target_type DESC"; + + private final static String GET_DATA_WITH_SOURCE = "SELECT storage_type, operation, " + + "abstracted_object_name, source_target_type, job_start_unixtime, job_finished_unixtime " + + "FROM job_execution_data_lineage WHERE app_id = ? and job_exec_id = ? and source_target_type = ? and " + + "flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + + "COALESCE(source_srl_no, srl_no) = srl_no ORDER BY source_target_type DESC"; private final static String GET_APP_ID = "SELECT app_id FROM cfg_application WHERE LOWER(app_code) = ?"; @@ -89,17 +116,23 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO "JOIN cfg_application ca on ca.app_id = jedl.app_id " + "WHERE jedl.app_id = ? and jedl.flow_exec_id = ? ORDER BY jedl.partition_end DESC"; - private final static String GET_ONE_LEVEL_IMPACT_DATABASES = "SELECT DISTINCT j.storage_type, " + - "j.abstracted_object_name, d.id FROM job_execution_data_lineage j " + - "LEFT JOIN dict_dataset d ON d.urn = concat(j.storage_type, '://', j.abstracted_object_name) " + - "WHERE (app_id, job_exec_id) in ( " + - "SELECT app_id, job_exec_id FROM job_execution_data_lineage " + - "WHERE abstracted_object_name in (:pathlist) and source_target_type = 'source' and " + - "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL 60 DAY ) and " + - "abstracted_object_name not like '/tmp/%' and abstracted_object_name not like '%tmp' " + - "and source_target_type = 'target' and " + + private final static String GET_ONE_LEVEL_IMPACT_DATABASES = "SELECT DISTINCT j.storage_type, " + + "j.abstracted_object_name, d.id FROM job_execution_data_lineage j " + + "LEFT JOIN dict_dataset d ON d.urn = concat(j.storage_type, '://', j.abstracted_object_name) " + + "WHERE (app_id, job_exec_id) in ( " + + "SELECT app_id, job_exec_id FROM job_execution_data_lineage " + + "WHERE abstracted_object_name in (:pathlist) and source_target_type = 'source' and " + + "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL 60 DAY ) and " + + "abstracted_object_name not like '/tmp/%' and abstracted_object_name not like '%tmp' " + + "and source_target_type = 'target' and " + "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL 60 DAY"; + private final static String GET_MAPPED_OBJECT_NAME = "SELECT mapped_object_name " + + "FROM cfg_object_name_map WHERE object_name = ?"; + + private final static String GET_OBJECT_NAME_BY_MAPPED_NAME = "SELECT object_name " + + "FROM cfg_object_name_map WHERE mapped_object_name = ?"; + public static JsonNode getObjectAdjacnet(String urn, int upLevel, int downLevel, int lookBackTime) { @@ -108,37 +141,23 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO List nodes = new ArrayList(); List edges = new ArrayList(); Map addedJobNodes = new HashMap(); - Map addedDataNodes = new HashMap(); - int index = 0; - int edgeIndex = 0; - LineageNode node = new LineageNode(); - node.id = index; - node._sort_list = new ArrayList(); - node.node_type = "data"; - node.abstracted_path = pathInfo.filePath; - node.storage_type = pathInfo.storageType; - node._sort_list.add("abstracted_path"); - node._sort_list.add("storage_type"); - node._sort_list.add("urn"); - node.urn = urn; - nodes.add(node); - addedDataNodes.put(node.urn, node.id); + Map addedSourceDataNode = new HashMap(); + Map> addedTargetDataNodes = new HashMap>(); String message = null; getObjectAdjacentNode( pathInfo, - urn, upLevel, downLevel, - nodes.size()-1, - edges.size(), + null, nodes, edges, - addedDataNodes, + addedSourceDataNode, + addedTargetDataNodes, addedJobNodes, - lookBackTime); + lookBackTime); if (nodes.size() > 0) { - message = "Found lineage on azkaban"; + message = "Found lineage information"; } else { @@ -151,167 +170,290 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO return resultNode; } + public static List getLiasDatasetNames(String abstractedObjectName) + { + if (StringUtils.isBlank(abstractedObjectName)) + return null; + List totalNames = new ArrayList(); + totalNames.add(abstractedObjectName); + List mappedNames; + mappedNames = getJdbcTemplate().queryForList(GET_MAPPED_OBJECT_NAME, String.class, abstractedObjectName); + if (mappedNames != null) + { + totalNames.addAll(mappedNames); + for (String name : mappedNames) + { + List objNames = getJdbcTemplate().queryForList( + GET_OBJECT_NAME_BY_MAPPED_NAME, String.class, name); + { + if (objNames != null) + { + totalNames.addAll(objNames); + } + } + + } + } + List results = new ArrayList(); + Set sets = new HashSet(); + sets.addAll(totalNames); + results.addAll(sets); + return results; + } + public static void searchInAzkaban( LineagePathInfo pathInfo, int upLevel, int downLevel, - int index, - int nodeIndex, - int edgeIndex, + LineageNode currentNode, List nodes, List edges, - Map addedDataNodes, + MapaddedSourceNode, + Map>addedTargetNodes, Map addedJobNodes, - int lookBackTime) + int lookBackTime) { if (upLevel < 1 && downLevel < 1) { return; } - - List> rows = null; - rows = getJdbcTemplate().queryForList( - GET_JOB, - pathInfo.filePath, - lookBackTime); - if (rows != null) + if (currentNode != null) { - for (Map row : rows) + if ( StringUtils.isBlank(currentNode.source_target_type)) { + Logger.error("Source target type is not available"); + Logger.error(currentNode.abstracted_path); + return; + } + else if (currentNode.source_target_type.equalsIgnoreCase("target") && downLevel <= 0) + { + Logger.warn("Dataset " + currentNode.abstracted_path + " downLevel = " + Integer.toString(downLevel)); + return; + } + else if (currentNode.source_target_type.equalsIgnoreCase("source") && upLevel <= 0) + { + Logger.warn("Dataset " + currentNode.abstracted_path + " upLevel = " + Integer.toString(upLevel)); + return; + } + } + List nameList = getLiasDatasetNames(pathInfo.filePath); + List> rows = null; + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("names", nameList); + NamedParameterJdbcTemplate namedParameterJdbcTemplate = new + NamedParameterJdbcTemplate(getJdbcTemplate().getDataSource()); + parameters.addValue("days", lookBackTime); + + if (currentNode != null) + { + parameters.addValue("type", currentNode.source_target_type); + rows = namedParameterJdbcTemplate.queryForList( + GET_JOB_WITH_SOURCE, + parameters); + if (rows != null && rows.size() > 0) + { + if (upLevel >= 1) + { + List addedTargets = addedTargetNodes.get(currentNode.abstracted_path); + if (addedTargets == null) + { + addedTargets = new ArrayList(); + } + addedTargets.add(currentNode); + addedTargetNodes.put(currentNode.abstracted_path, addedTargets); + addedSourceNode.remove(currentNode.abstracted_path); + } + } + } else { + rows = namedParameterJdbcTemplate.queryForList( + GET_JOB, + parameters); + } + + if (rows != null) { + for (Map row : rows) { LineageNode node = new LineageNode(); Object jobExecIdObject = row.get("job_exec_id"); - if (jobExecIdObject == null) - { + if (jobExecIdObject == null) { continue; } - Long jobExecId = ((BigInteger)jobExecIdObject).longValue(); + Long jobExecId = ((BigInteger) jobExecIdObject).longValue(); Integer jobNodeId = addedJobNodes.get(jobExecId); - if (jobNodeId != null && jobNodeId > 0) - { + if (jobNodeId != null) { continue; } node._sort_list = new ArrayList(); node.node_type = "script"; - node.job_type = (String)row.get("job_type"); - node.cluster = (String)row.get("cluster"); - node.job_path = (String)row.get("job_path"); - node.job_name = (String)row.get("job_name"); + node.job_type = (String) row.get("job_type"); + node.cluster = (String) row.get("cluster"); + node.job_path = (String) row.get("job_path"); + node.job_name = (String) row.get("job_name"); node.job_start_time = row.get("start_time").toString(); + node.job_start_unix_time = (Long) row.get("job_start_unixtime"); node.job_end_time = row.get("end_time").toString(); + node.job_end_unix_time = (Long) row.get("job_finished_unixtime"); + node.source_target_type = (String) row.get("source_target_type"); node._sort_list.add("cluster"); node._sort_list.add("job_path"); node._sort_list.add("job_name"); node._sort_list.add("job_type"); node._sort_list.add("job_start_time"); node._sort_list.add("job_end_time"); - String sourceType = (String)row.get("source_target_type"); - if (sourceType.equalsIgnoreCase("target") && upLevel > 0) + + if (currentNode != null) { - node.id = nodeIndex; - nodes.add(node); - addedJobNodes.put(jobExecId, node.id); - LineageEdge edge = new LineageEdge(); - edge.source = nodeIndex; - edge.target = index; - edge.id = edgeIndex++; - edge.label = (String) row.get("operation"); - edge.chain = (String) row.get("flow_path"); - edges.add(edge); - } - else if (sourceType.equalsIgnoreCase("source") && downLevel > 0) - { - node.id = nodeIndex; - nodes.add(node); - addedJobNodes.put(jobExecId, node.id); - LineageEdge edge = new LineageEdge(); - edge.source = index; - edge.target = nodeIndex; - edge.id = edgeIndex++; - edge.label = (String) row.get("operation"); - edge.chain = (String) row.get("flow_path"); - edges.add(edge); + if (currentNode.source_target_type.equalsIgnoreCase("target") && downLevel > 0) + { + node.id = nodes.size(); + nodes.add(node); + addedJobNodes.put(jobExecId, node.id); + LineageEdge edge = new LineageEdge(); + edge.source = currentNode.id; + edge.target = node.id; + edge.id = edges.size(); + edge.label = (String) row.get("operation"); + edge.chain = (String) row.get("flow_path"); + edges.add(edge); + } + else if (currentNode.source_target_type.equalsIgnoreCase("source") && upLevel > 0) + { + node.id = nodes.size(); + nodes.add(node); + addedJobNodes.put(jobExecId, node.id); + LineageEdge edge = new LineageEdge(); + edge.source = node.id; + edge.target = currentNode.id; + edge.id = edges.size(); + edge.label = (String) row.get("operation"); + edge.chain = (String) row.get("flow_path"); + edges.add(edge); + } + else + { + continue; + } } else { - continue; + node.id = nodes.size(); + nodes.add(node); + addedJobNodes.put(jobExecId, node.id); } - int jobIndex = nodeIndex; - nodeIndex++; + int jobIndex = node.id; int applicationID = (int)row.get("app_id"); Long jobId = ((BigInteger)row.get("job_exec_id")).longValue(); List> relatedDataRows = null; - relatedDataRows = getJdbcTemplate().queryForList( - GET_DATA, - applicationID, - jobId); - if (relatedDataRows != null) + if (currentNode != null) { + relatedDataRows = getJdbcTemplate().queryForList( + GET_DATA_WITH_SOURCE, + applicationID, + jobId, + currentNode.source_target_type); + } + else + { + relatedDataRows = getJdbcTemplate().queryForList( + GET_DATA, + applicationID, + jobId); + } + + if (relatedDataRows != null) { for (Map relatedDataRow : relatedDataRows) { String abstractedObjectName = (String)relatedDataRow.get("abstracted_object_name"); - Integer dataNodeId = addedDataNodes.get(abstractedObjectName); if (abstractedObjectName.startsWith("/tmp/")) { continue; } String relatedSourceType = (String)relatedDataRow.get("source_target_type"); - if (sourceType.equalsIgnoreCase("target") && relatedSourceType.equalsIgnoreCase("source")) + if (relatedSourceType.equalsIgnoreCase("source")) { LineageNode relatedNode = new LineageNode(); relatedNode._sort_list = new ArrayList(); relatedNode.node_type = "data"; - relatedNode.abstracted_path = (String)relatedDataRow.get("abstracted_object_name"); - relatedNode.storage_type = ((String)relatedDataRow.get("storage_type")).toLowerCase(); - LineagePathInfo info = new LineagePathInfo(); + relatedNode.source_target_type = relatedSourceType; + relatedNode.abstracted_path = (String)relatedDataRow.get("abstracted_object_name"); + relatedNode.storage_type = ((String)relatedDataRow.get("storage_type")).toLowerCase(); + relatedNode.job_start_unix_time = (Long)relatedDataRow.get("job_start_unix_time"); + relatedNode.job_end_unix_time = (Long)relatedDataRow.get("job_end_unix_time"); + LineagePathInfo info = new LineagePathInfo(); info.filePath = relatedNode.abstracted_path; info.storageType = relatedNode.storage_type; relatedNode.urn = utils.Lineage.convertToURN(info); relatedNode._sort_list.add("abstracted_path"); relatedNode._sort_list.add("storage_type"); - if (dataNodeId != null && dataNodeId > 0) + relatedNode._sort_list.add("urn"); + List addedTargets = addedTargetNodes.get(abstractedObjectName); + LineageNode addedSource = addedSourceNode.get(abstractedObjectName); + LineageNode existNode = null; + if (addedTargets != null) { - relatedNode.id = dataNodeId; + Collections.sort(addedTargets, new Comparator(){ + public int compare(LineageNode node1, LineageNode node2){ + if(node1.job_end_unix_time == node2.job_end_unix_time) + return 0; + return node1.job_end_unix_time > node2.job_end_unix_time ? -1 : 1; + } + }); + for(LineageNode target : addedTargets) + { + if (relatedNode.job_start_unix_time != null + && target.job_end_unix_time != null + && relatedNode.job_start_unix_time > target.job_end_unix_time) + { + existNode = target; + break; + } + } + } + if (existNode == null && addedSource != null) + { + existNode = addedSource; + } + if (existNode != null) + { + relatedNode.id = existNode.id; } else { - relatedNode.id = nodeIndex; + relatedNode.id = nodes.size(); nodes.add(relatedNode); - addedDataNodes.put(abstractedObjectName, relatedNode.id); + addedSourceNode.put(abstractedObjectName, relatedNode); } LineageEdge relatedEdge = new LineageEdge(); relatedEdge.source = relatedNode.id; relatedEdge.target = jobIndex; - relatedEdge.id = edgeIndex++; + relatedEdge.id = edges.size(); relatedEdge.label = (String)relatedDataRow.get("operation"); relatedEdge.chain = ""; edges.add(relatedEdge); - nodeIndex++; if (upLevel > 1) { LineagePathInfo subPath = new LineagePathInfo(); subPath.storageType = relatedNode.storage_type; subPath.filePath = relatedNode.abstracted_path; - String subUrn = utils.Lineage.convertToURN(subPath); getObjectAdjacentNode( subPath, - subUrn, - upLevel-1, + upLevel - 1, 0, - nodeIndex-1, - edgeIndex, + relatedNode, nodes, edges, - addedDataNodes, + addedSourceNode, + addedTargetNodes, addedJobNodes, - lookBackTime); + lookBackTime); } - nodeIndex = nodes.size(); } - else if (sourceType.equalsIgnoreCase("source") && relatedSourceType.equalsIgnoreCase("target")) + else if (relatedSourceType.equalsIgnoreCase("target")) { + List addedTargets = addedTargetNodes.get(abstractedObjectName); LineageNode relatedNode = new LineageNode(); relatedNode._sort_list = new ArrayList(); relatedNode.node_type = "data"; + relatedNode.source_target_type = relatedSourceType; relatedNode.abstracted_path = (String)relatedDataRow.get("abstracted_object_name"); relatedNode.storage_type = ((String)relatedDataRow.get("storage_type")).toLowerCase(); LineagePathInfo info = new LineagePathInfo(); @@ -320,45 +462,71 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO relatedNode.urn = utils.Lineage.convertToURN(info); relatedNode._sort_list.add("abstracted_path"); relatedNode._sort_list.add("storage_type"); - if (dataNodeId != null && dataNodeId > 0) + LineageNode existNode = null; + if (addedTargets != null) { - relatedNode.id = dataNodeId; + for(LineageNode target : addedTargets) + { + if (relatedNode.partition_end != null ) + { + if (relatedNode.partition_end == target.partition_end) + { + existNode = target; + break; + } + } + else + { + if (relatedNode.job_end_unix_time == target.job_end_unix_time) + { + existNode = target; + break; + } + + } + + } + } + if (existNode != null) + { + relatedNode.id = existNode.id; } else { - relatedNode.id = nodeIndex; + relatedNode.id = nodes.size(); nodes.add(relatedNode); - addedDataNodes.put(abstractedObjectName, relatedNode.id); + if (addedTargets == null) + { + addedTargets = new ArrayList(); + } + addedTargets.add(relatedNode); + addedTargetNodes.put(abstractedObjectName, addedTargets); } LineageEdge relatedEdge = new LineageEdge(); relatedEdge.source = jobIndex; relatedEdge.target = relatedNode.id; - relatedEdge.id = edgeIndex++; + relatedEdge.id = edges.size(); relatedEdge.label = (String)relatedDataRow.get("operation"); relatedEdge.chain = ""; edges.add(relatedEdge); - nodeIndex++; if (downLevel > 1) { LineagePathInfo subPath = new LineagePathInfo(); subPath.storageType = relatedNode.storage_type; subPath.filePath = relatedNode.abstracted_path; - String subUrn = utils.Lineage.convertToURN(subPath); getObjectAdjacentNode( subPath, - subUrn, 0, - downLevel-1, - nodeIndex-1, - edgeIndex, + downLevel - 1, + relatedNode, nodes, edges, - addedDataNodes, + addedSourceNode, + addedTargetNodes, addedJobNodes, - lookBackTime); + lookBackTime); } - nodeIndex = nodes.size(); } } @@ -369,16 +537,15 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO public static void getObjectAdjacentNode( LineagePathInfo pathInfo, - String urn, int upLevel, int downLevel, - int index, - int edgeIndex, + LineageNode currentNode, List nodes, List edges, - Map addedDataNodes, + Map addedSourceNode, + Map> addedTargetNodes, Map addedJobNodes, - int lookBackTime) + int lookBackTime) { if (upLevel < 1 && downLevel < 1) { @@ -389,31 +556,17 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO { return; } - - int nodeIndex = index + 1; - - List appIDList = getJdbcTemplate().queryForList( - GET_APPLICATION_ID, new Object[] {pathInfo.filePath}, Integer.class); - if (appIDList != null) - { - int nodeId = 0; - for(Integer id : appIDList) - { - searchInAzkaban( - pathInfo, - upLevel, - downLevel, - index, - nodeIndex, - edgeIndex, - nodes, - edges, - addedDataNodes, - addedJobNodes, - lookBackTime); - break; - } - } + searchInAzkaban( + pathInfo, + upLevel, + downLevel, + currentNode, + nodes, + edges, + addedSourceNode, + addedTargetNodes, + addedJobNodes, + lookBackTime); } public static ObjectNode getFlowLineage(String application, String project, Long flowId) @@ -818,75 +971,75 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO return resultNode; } - public static void getImpactDatasets( - List searchUrnList, - int level, - List impactDatasets) - { - if (searchUrnList != null && searchUrnList.size() > 0) { + public static void getImpactDatasets( + List searchUrnList, + int level, + List impactDatasets) + { + if (searchUrnList != null && searchUrnList.size() > 0) { - if (impactDatasets == null) - { - impactDatasets = new ArrayList(); - } + if (impactDatasets == null) + { + impactDatasets = new ArrayList(); + } - List pathList = new ArrayList(); - List nextSearchList = new ArrayList(); + List pathList = new ArrayList(); + List nextSearchList = new ArrayList(); - for (String urn : searchUrnList) - { - LineagePathInfo pathInfo = Lineage.convertFromURN(urn); - if (pathInfo != null && StringUtils.isNotBlank(pathInfo.filePath)) - { - if (!pathList.contains(pathInfo.filePath)) - { - pathList.add(pathInfo.filePath); - } - } - } + for (String urn : searchUrnList) + { + LineagePathInfo pathInfo = Lineage.convertFromURN(urn); + if (pathInfo != null && StringUtils.isNotBlank(pathInfo.filePath)) + { + if (!pathList.contains(pathInfo.filePath)) + { + pathList.add(pathInfo.filePath); + } + } + } - if (pathList != null && pathList.size() > 0) - { - Map param = Collections.singletonMap("pathlist", pathList); - NamedParameterJdbcTemplate namedParameterJdbcTemplate = new - NamedParameterJdbcTemplate(getJdbcTemplate().getDataSource()); - List impactDatasetList = namedParameterJdbcTemplate.query( - GET_ONE_LEVEL_IMPACT_DATABASES, - param, - new ImpactDatasetRowMapper()); + if (pathList != null && pathList.size() > 0) + { + Map param = Collections.singletonMap("pathlist", pathList); + NamedParameterJdbcTemplate namedParameterJdbcTemplate = new + NamedParameterJdbcTemplate(getJdbcTemplate().getDataSource()); + List impactDatasetList = namedParameterJdbcTemplate.query( + GET_ONE_LEVEL_IMPACT_DATABASES, + param, + new ImpactDatasetRowMapper()); - if (impactDatasetList != null) { - for (ImpactDataset dataset : impactDatasetList) { - dataset.level = level; - if (impactDatasets.stream().filter(o -> o.urn.equals(dataset.urn)).findFirst().isPresent()) - { - continue; - } - impactDatasets.add(dataset); - nextSearchList.add(dataset.urn); - } - } - } + if (impactDatasetList != null) { + for (ImpactDataset dataset : impactDatasetList) { + dataset.level = level; + if (impactDatasets.stream().filter(o -> o.urn.equals(dataset.urn)).findFirst().isPresent()) + { + continue; + } + impactDatasets.add(dataset); + nextSearchList.add(dataset.urn); + } + } + } - if (nextSearchList.size() > 0) - { - getImpactDatasets(nextSearchList, level + 1, impactDatasets); - } - } - } + if (nextSearchList.size() > 0) + { + getImpactDatasets(nextSearchList, level + 1, impactDatasets); + } + } + } - public static List getImpactDatasetsByUrn(String urn) - { - List impactDatasetList = new ArrayList(); + public static List getImpactDatasetsByUrn(String urn) + { + List impactDatasetList = new ArrayList(); - if (StringUtils.isNotBlank(urn)) - { - List searchUrnList = new ArrayList(); - searchUrnList.add(urn); - getImpactDatasets(searchUrnList, 1, impactDatasetList); - } + if (StringUtils.isNotBlank(urn)) + { + List searchUrnList = new ArrayList(); + searchUrnList.add(urn); + getImpactDatasets(searchUrnList, 1, impactDatasetList); + } - return impactDatasetList; - } + return impactDatasetList; + } } diff --git a/web/app/models/LineageNode.java b/web/app/models/LineageNode.java index 79991f528e..b37e8bf1c0 100644 --- a/web/app/models/LineageNode.java +++ b/web/app/models/LineageNode.java @@ -31,6 +31,8 @@ public class LineageNode { public String script_path; public String job_start_time; public String job_end_time; + public Long job_start_unix_time; + public Long job_end_unix_time; public String git_location; public List _sort_list; public String source_target_type; diff --git a/web/app/utils/Lineage.java b/web/app/utils/Lineage.java index 4a102114a1..5cc3ea5b5f 100644 --- a/web/app/utils/Lineage.java +++ b/web/app/utils/Lineage.java @@ -84,13 +84,6 @@ public class Lineage filePath = pathInfo.filePath; } } - if (StringUtils.isNotBlank(pathInfo.storageType) && pathInfo.storageType.equalsIgnoreCase("teradata")) - { - return "teradata:///" + pathInfo.schemaName + "/" + filePath; - } - else - { - return pathInfo.storageType.toLowerCase() + ":///" + filePath; - } + return pathInfo.storageType.toLowerCase() + ":///" + filePath; } } From b254856a76678cf09c78298f71f7c530910a2e8d Mon Sep 17 00:00:00 2001 From: jbai Date: Thu, 21 Apr 2016 11:43:37 -0700 Subject: [PATCH 3/3] refactor the lineage render method --- web/app/dao/LineageDAO.java | 753 ++++++++++++++++++++------------ web/app/models/LineageNode.java | 1 + web/app/utils/Lineage.java | 17 + 3 files changed, 497 insertions(+), 274 deletions(-) diff --git a/web/app/dao/LineageDAO.java b/web/app/dao/LineageDAO.java index fcdd3a82d0..e74a558567 100644 --- a/web/app/dao/LineageDAO.java +++ b/web/app/dao/LineageDAO.java @@ -40,52 +40,68 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO "flow WHERE app_id = ? and flow_id = ?"; private final static String GET_JOB = "SELECT ca.app_id, ca.app_code as cluster, " + - "je.flow_id, je.job_id, jedl.job_name, " + - "fj.job_path, fj.job_type, jedl.flow_path, jedl.storage_type, jedl.source_target_type, " + + "jedl.job_name, fj.job_path, fj.job_type, jedl.flow_path, jedl.storage_type, jedl.source_target_type, " + "jedl.operation, jedl.source_srl_no, jedl.srl_no, " + - "max(jedl.job_exec_id) as job_exec_id, jedl.job_start_unixtime, jedl.job_finished_unixtime," + - "FROM_UNIXTIME(jedl.job_start_unixtime) as start_time, " + - "FROM_UNIXTIME(jedl.job_finished_unixtime) as end_time FROM job_execution_data_lineage jedl " + + "max(jedl.job_exec_id) as job_exec_id FROM job_execution_data_lineage jedl " + "JOIN cfg_application ca on ca.app_id = jedl.app_id " + - "JOIN job_execution je on jedl.app_id = je.app_id " + + "LEFT JOIN job_execution je on jedl.app_id = je.app_id " + "and jedl.flow_exec_id = je.flow_exec_id and jedl.job_exec_id = je.job_exec_id " + "LEFT JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and je.job_id = fj.job_id " + "WHERE abstracted_object_name in ( :names ) and " + "jedl.flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + - "COALESCE(jedl.source_srl_no, jedl.srl_no) = jedl.srl_no and " + "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL (:days) DAY " + - "GROUP BY ca.app_id, je.job_id, je.flow_id, jedl.source_target_type, jedl.storage_type " + + "GROUP BY ca.app_id, cluster, jedl.job_name, jedl.flow_path, jedl.source_target_type, " + + "jedl.storage_type, jedl.operation " + + "ORDER BY jedl.source_target_type DESC, jedl.job_finished_unixtime"; + + private final static String GET_UP_LEVEL_JOB = "SELECT ca.app_id, ca.app_code as cluster, " + + "jedl.job_name, fj.job_path, fj.job_type, jedl.flow_path, jedl.storage_type, jedl.source_target_type, " + + "jedl.operation, jedl.source_srl_no, jedl.srl_no, " + + "max(jedl.job_exec_id) as job_exec_id FROM job_execution_data_lineage jedl " + + "JOIN cfg_application ca on ca.app_id = jedl.app_id " + + "LEFT JOIN job_execution je on jedl.app_id = je.app_id " + + "and jedl.flow_exec_id = je.flow_exec_id and jedl.job_exec_id = je.job_exec_id " + + "LEFT JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and je.job_id = fj.job_id " + + "WHERE abstracted_object_name in ( :names ) and jedl.source_target_type = 'target' and " + + "jedl.flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + + "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL (:days) DAY " + + "GROUP BY ca.app_id, cluster, jedl.job_name, jedl.flow_path, jedl.source_target_type, " + + "jedl.storage_type, jedl.operation " + "ORDER BY jedl.source_target_type DESC, jedl.job_finished_unixtime"; private final static String GET_JOB_WITH_SOURCE = "SELECT ca.app_id, ca.app_code as cluster, " + - "je.flow_id, je.job_id, jedl.job_name, " + - "fj.job_path, fj.job_type, jedl.flow_path, jedl.storage_type, jedl.source_target_type, " + + "jedl.job_name, fj.job_path, fj.job_type, jedl.flow_path, jedl.storage_type, jedl.source_target_type, " + "jedl.operation, jedl.source_srl_no, jedl.srl_no, " + - "max(jedl.job_exec_id) as job_exec_id, jedl.job_start_unixtime, jedl.job_finished_unixtime," + - "FROM_UNIXTIME(jedl.job_start_unixtime) as start_time, " + - "FROM_UNIXTIME(jedl.job_finished_unixtime) as end_time FROM job_execution_data_lineage jedl " + + "max(jedl.job_exec_id) as job_exec_id FROM job_execution_data_lineage jedl " + "JOIN cfg_application ca on ca.app_id = jedl.app_id " + - "JOIN job_execution je on jedl.app_id = je.app_id " + + "LEFT JOIN job_execution je on jedl.app_id = je.app_id " + "and jedl.flow_exec_id = je.flow_exec_id and jedl.job_exec_id = je.job_exec_id " + "LEFT JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and je.job_id = fj.job_id " + "WHERE abstracted_object_name in ( :names ) and jedl.source_target_type != (:type) and " + "jedl.flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + - "COALESCE(jedl.source_srl_no, jedl.srl_no) = jedl.srl_no and " + "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL (:days) DAY " + - "GROUP BY ca.app_id, je.job_id, je.flow_id, jedl.source_target_type, jedl.storage_type " + + "GROUP BY ca.app_id, cluster, jedl.job_name, jedl.flow_path, jedl.source_target_type, " + + "jedl.storage_type, jedl.operation " + "ORDER BY jedl.source_target_type DESC, jedl.job_finished_unixtime"; private final static String GET_DATA = "SELECT storage_type, operation, " + - "abstracted_object_name, source_target_type, job_start_unixtime, job_finished_unixtime " + + "abstracted_object_name, source_target_type, job_start_unixtime, job_finished_unixtime, " + + "FROM_UNIXTIME(job_start_unixtime) as start_time, " + + "FROM_UNIXTIME(job_finished_unixtime) as end_time " + "FROM job_execution_data_lineage WHERE app_id = ? and job_exec_id = ? and " + "flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + "COALESCE(source_srl_no, srl_no) = srl_no ORDER BY source_target_type DESC"; - private final static String GET_DATA_WITH_SOURCE = "SELECT storage_type, operation, " + - "abstracted_object_name, source_target_type, job_start_unixtime, job_finished_unixtime " + - "FROM job_execution_data_lineage WHERE app_id = ? and job_exec_id = ? and source_target_type = ? and " + - "flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + - "COALESCE(source_srl_no, srl_no) = srl_no ORDER BY source_target_type DESC"; + private final static String GET_DATA_FILTER_OUT_LASSEN = "SELECT j1.storage_type, j1.operation, " + + "j1.abstracted_object_name, j1.source_target_type, j1.job_start_unixtime, j1.job_finished_unixtime, " + + "FROM_UNIXTIME(j1.job_start_unixtime) as start_time, " + + "FROM_UNIXTIME(j1.job_finished_unixtime) as end_time " + + "FROM job_execution_data_lineage j1 " + + "JOIN job_execution_data_lineage j2 on j1.app_id = j2.app_id and j1.job_exec_id = j2.job_exec_id " + + "and j2.abstracted_object_name in (:names) and j2.source_target_type = 'source' " + + "WHERE j1.app_id = (:appid) and j1.job_exec_id = (:execid) and " + + "j1.flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + + "COALESCE(j1.source_srl_no, j1.srl_no) = j2.srl_no ORDER BY j1.source_target_type DESC"; private final static String GET_APP_ID = "SELECT app_id FROM cfg_application WHERE LOWER(app_code) = ?"; @@ -136,38 +152,309 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO public static JsonNode getObjectAdjacnet(String urn, int upLevel, int downLevel, int lookBackTime) { - ObjectNode resultNode = Json.newObject(); + int level = 0; LineagePathInfo pathInfo = utils.Lineage.convertFromURN(urn); List nodes = new ArrayList(); List edges = new ArrayList(); - Map addedJobNodes = new HashMap(); - Map addedSourceDataNode = new HashMap(); - Map> addedTargetDataNodes = new HashMap>(); - String message = null; + Map> addedSourceNodes= new HashMap>(); + Map> addedTargetNodes= new HashMap>(); + Map addedJobNodes = new HashMap(); + List allSourceNodes = new ArrayList(); + List allTargetNodes = new ArrayList(); getObjectAdjacentNode( pathInfo, + level, upLevel, downLevel, null, - nodes, - edges, - addedSourceDataNode, - addedTargetDataNodes, + allSourceNodes, + allTargetNodes, + addedSourceNodes, + addedTargetNodes, addedJobNodes, lookBackTime); - if (nodes.size() > 0) + + return renderGraph( + pathInfo, + urn, + upLevel, + downLevel, + allSourceNodes, + allTargetNodes, + addedSourceNodes, + addedTargetNodes, + addedJobNodes, + nodes, + edges); + } + + public static void addSourceNode( + LineageNode jobNode, + Map> addedSourceNodes, + Map> addedTargetDataNodes, + Map addedSourceDataNodes, + Map> toBeConvertedSourceDataNodes, + List nodes, + List edges, + boolean isUpLevel) + { + List sourceNodes = addedSourceNodes.get(jobNode.exec_id); + if (sourceNodes != null) { - message = "Found lineage information"; + for(LineageNode node : sourceNodes) + { + List existTargetNodes = addedTargetDataNodes.get(node.abstracted_path); + LineageNode existNode = null; + if (existTargetNodes != null) + { + Collections.sort(existTargetNodes, new Comparator(){ + public int compare(LineageNode node1, LineageNode node2){ + if(node1.job_end_unix_time == node2.job_end_unix_time) + return 0; + return node1.job_end_unix_time > node2.job_end_unix_time ? -1 : 1; + } + }); + for(LineageNode target : existTargetNodes) + { + if (node.job_start_unix_time >= target.job_end_unix_time) + { + existNode = target; + break; + } + } + } + if (existNode == null) + { + existNode = addedSourceDataNodes.get(node.abstracted_path); + } + if (existNode != null) + { node.id = existNode.id; + } + else + { + node.id = nodes.size(); + nodes.add(node); + addedSourceDataNodes.put(node.abstracted_path, node); + if (isUpLevel) { + List originalSourceNodes = toBeConvertedSourceDataNodes.get(node.abstracted_path); + if (originalSourceNodes == null) { + originalSourceNodes = new ArrayList(); + } + originalSourceNodes.add(node); + toBeConvertedSourceDataNodes.put(node.abstracted_path, originalSourceNodes); + } + + } + LineageEdge edge = new LineageEdge(); + edge.id = edges.size(); + edge.source = node.id; + edge.target = jobNode.id; + edge.label = node.operation; + edge.chain = ""; + edges.add(edge); + } + } + } + + public static void addTargetNode( + LineageNode jobNode, + Map> addedTargetNodes, + Map> addedTargetDataNodes, + Map toBeClearedSourceDataNodes, + List nodes, + List edges, + boolean isUpLevel) + { + List targetNodes = addedTargetNodes.get(jobNode.exec_id); + if (targetNodes != null) + { + for(LineageNode node : targetNodes) + { + List existTargetNodes = addedTargetDataNodes.get(node.abstracted_path); + LineageNode existNode = null; + if (existTargetNodes != null) + { + for(LineageNode target : existTargetNodes) + { + if (target.partition_end != null) + { + if (target.partition_end == node.partition_end) + { + existNode = target; + break; + } + } + else if(target.job_end_unix_time == node.job_end_unix_time) + { + existNode = target; + break; + } + } + } + if (isUpLevel) { + if (existNode == null) + { + existNode = toBeClearedSourceDataNodes.get(node.abstracted_path); + if (existNode != null && existNode.job_start_unix_time < node.job_end_unix_time) + { + existNode = null; + } + } + } + + if (existNode != null) + { + node.id = existNode.id; + } + else + { + node.id = nodes.size(); + nodes.add(node); + if (existTargetNodes == null) + { + existTargetNodes = new ArrayList(); + } + existTargetNodes.add(node); + addedTargetDataNodes.put(node.abstracted_path, existTargetNodes); + } + LineageEdge edge = new LineageEdge(); + edge.id = edges.size(); + edge.source = jobNode.id; + edge.target = node.id; + edge.label = node.operation; + edge.chain = ""; + edges.add(edge); + } + } + } + + public static JsonNode renderGraph(LineagePathInfo pathInfo, + String urn, + int upLevel, + int downLevel, + List allSourceNodes, + List allTargetNodes, + Map> addedSourceNodes, + Map> addedTargetNodes, + Map addedJobNodes, + List nodes, + List edges) + { + ObjectNode resultNode = Json.newObject(); + String message = null; + Map addedSourceDataNodes = new HashMap(); + Map toBeClearedSourceDataNodes = new HashMap(); + Map> addedTargetDataNodes = new HashMap>(); + Map> toBeConvertedSourceDataNodes = new HashMap>(); + message = "No lineage information found for this dataset"; + if (allSourceNodes.size() == 0 && allTargetNodes.size() == 0) + { + LineageNode node = new LineageNode(); + node.id = nodes.size(); + node._sort_list = new ArrayList(); + node.node_type = "data"; + node.abstracted_path = pathInfo.filePath; + node.storage_type = pathInfo.storageType; + node._sort_list.add("abstracted_path"); + node._sort_list.add("storage_type"); + node._sort_list.add("urn"); + node.urn = urn; + nodes.add(node); } else { - message = "No lineage information found for this dataset"; + message = "Found lineage information"; + for(int i = 0; i < Math.max(upLevel, downLevel); i++) + { + if (i < upLevel) + { + if (toBeConvertedSourceDataNodes.size() > 0) + { + for(Map.Entry > mapEntry : toBeConvertedSourceDataNodes.entrySet()) + { + List hashedNodes = addedTargetDataNodes.get(mapEntry.getKey()); + List list = mapEntry.getValue(); + if (list != null && list.size() > 0) + { + if (hashedNodes == null) + { + hashedNodes = new ArrayList(); + } + hashedNodes.addAll(list); + addedTargetDataNodes.put(mapEntry.getKey(), hashedNodes); + } + + } + toBeConvertedSourceDataNodes.clear(); + } + if (addedSourceDataNodes.size() > 0) + { + toBeClearedSourceDataNodes.putAll(addedSourceDataNodes); + addedSourceDataNodes.clear(); + } + + for(LineageNode job : addedJobNodes.values()) + { + if (job.level != i) + { + continue; + } + job.id = nodes.size(); + nodes.add(job); + addTargetNode(job, + addedTargetNodes, + addedTargetDataNodes, + toBeClearedSourceDataNodes, + nodes, + edges, + true); + + addSourceNode(job, + addedSourceNodes, + addedTargetDataNodes, + addedSourceDataNodes, + toBeConvertedSourceDataNodes, + nodes, + edges, + true); + } + } + if ((i > 0) && (i < downLevel)) + { + for(LineageNode job : addedJobNodes.values()) + { + if (job.level != -i) + { + continue; + } + job.id = nodes.size(); + nodes.add(job); + addTargetNode(job, + addedTargetNodes, + addedTargetDataNodes, + toBeClearedSourceDataNodes, + nodes, + edges, + false); + + addSourceNode(job, + addedSourceNodes, + addedTargetDataNodes, + addedSourceDataNodes, + toBeConvertedSourceDataNodes, + nodes, + edges, + false); + } + } + } } resultNode.set("nodes", Json.toJson(nodes)); resultNode.set("links", Json.toJson(edges)); resultNode.put("urn", urn); resultNode.put("message", message); return resultNode; + } public static List getLiasDatasetNames(String abstractedObjectName) @@ -176,9 +463,8 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO return null; List totalNames = new ArrayList(); totalNames.add(abstractedObjectName); - List mappedNames; - mappedNames = getJdbcTemplate().queryForList(GET_MAPPED_OBJECT_NAME, String.class, abstractedObjectName); - if (mappedNames != null) + List mappedNames = getJdbcTemplate().queryForList(GET_MAPPED_OBJECT_NAME, String.class, abstractedObjectName); + if (mappedNames != null && mappedNames.size() > 0) { totalNames.addAll(mappedNames); for (String name : mappedNames) @@ -194,6 +480,18 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO } } + else + { + List objNames = getJdbcTemplate().queryForList( + GET_OBJECT_NAME_BY_MAPPED_NAME, String.class, abstractedObjectName); + { + if (objNames != null) + { + totalNames.addAll(objNames); + } + } + + } List results = new ArrayList(); Set sets = new HashSet(); sets.addAll(totalNames); @@ -201,16 +499,17 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO return results; } - public static void searchInAzkaban( + public static void getNodes( LineagePathInfo pathInfo, + int level, int upLevel, int downLevel, LineageNode currentNode, - List nodes, - List edges, - MapaddedSourceNode, - Map>addedTargetNodes, - Map addedJobNodes, + List allSourceNodes, + List allTargetNodes, + Map>addedSourceNodes, + Map>addedTargetNodes, + Map addedJobNodes, int lookBackTime) { if (upLevel < 1 && downLevel < 1) @@ -246,25 +545,23 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO if (currentNode != null) { - parameters.addValue("type", currentNode.source_target_type); - rows = namedParameterJdbcTemplate.queryForList( - GET_JOB_WITH_SOURCE, - parameters); - if (rows != null && rows.size() > 0) + if (currentNode.source_target_type.equalsIgnoreCase("source")) { - if (upLevel >= 1) - { - List addedTargets = addedTargetNodes.get(currentNode.abstracted_path); - if (addedTargets == null) - { - addedTargets = new ArrayList(); - } - addedTargets.add(currentNode); - addedTargetNodes.put(currentNode.abstracted_path, addedTargets); - addedSourceNode.remove(currentNode.abstracted_path); - } + rows = namedParameterJdbcTemplate.queryForList( + GET_UP_LEVEL_JOB, + parameters); } - } else { + else + { + parameters.addValue("type", currentNode.source_target_type); + rows = namedParameterJdbcTemplate.queryForList( + GET_JOB_WITH_SOURCE, + parameters); + } + + } + else + { rows = namedParameterJdbcTemplate.queryForList( GET_JOB, parameters); @@ -278,78 +575,42 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO continue; } Long jobExecId = ((BigInteger) jobExecIdObject).longValue(); - Integer jobNodeId = addedJobNodes.get(jobExecId); - if (jobNodeId != null) { + if (addedJobNodes.get(jobExecId) != null) { continue; } node._sort_list = new ArrayList(); node.node_type = "script"; node.job_type = (String) row.get("job_type"); node.cluster = (String) row.get("cluster"); - node.job_path = (String) row.get("job_path"); node.job_name = (String) row.get("job_name"); - node.job_start_time = row.get("start_time").toString(); - node.job_start_unix_time = (Long) row.get("job_start_unixtime"); - node.job_end_time = row.get("end_time").toString(); - node.job_end_unix_time = (Long) row.get("job_finished_unixtime"); + node.job_path = (String) row.get("flow_path") + "/" + node.job_name; + node.exec_id = jobExecId; + node.operation = (String) row.get("operation"); node.source_target_type = (String) row.get("source_target_type"); + node.level = level; node._sort_list.add("cluster"); node._sort_list.add("job_path"); node._sort_list.add("job_name"); node._sort_list.add("job_type"); node._sort_list.add("job_start_time"); node._sort_list.add("job_end_time"); - - if (currentNode != null) - { - if (currentNode.source_target_type.equalsIgnoreCase("target") && downLevel > 0) - { - node.id = nodes.size(); - nodes.add(node); - addedJobNodes.put(jobExecId, node.id); - LineageEdge edge = new LineageEdge(); - edge.source = currentNode.id; - edge.target = node.id; - edge.id = edges.size(); - edge.label = (String) row.get("operation"); - edge.chain = (String) row.get("flow_path"); - edges.add(edge); - } - else if (currentNode.source_target_type.equalsIgnoreCase("source") && upLevel > 0) - { - node.id = nodes.size(); - nodes.add(node); - addedJobNodes.put(jobExecId, node.id); - LineageEdge edge = new LineageEdge(); - edge.source = node.id; - edge.target = currentNode.id; - edge.id = edges.size(); - edge.label = (String) row.get("operation"); - edge.chain = (String) row.get("flow_path"); - edges.add(edge); - } - else - { - continue; - } - } - else - { - node.id = nodes.size(); - nodes.add(node); - addedJobNodes.put(jobExecId, node.id); - } - int jobIndex = node.id; + node._sort_list.add("exec_id"); + addedJobNodes.put(jobExecId, node); + List sourceNodeList = new ArrayList(); + List targetNodeList = new ArrayList(); int applicationID = (int)row.get("app_id"); Long jobId = ((BigInteger)row.get("job_exec_id")).longValue(); List> relatedDataRows = null; - if (currentNode != null) + + if (node.source_target_type.equalsIgnoreCase("source") && node.operation.equalsIgnoreCase("JDBC Read")) { - relatedDataRows = getJdbcTemplate().queryForList( - GET_DATA_WITH_SOURCE, - applicationID, - jobId, - currentNode.source_target_type); + MapSqlParameterSource lassenParams = new MapSqlParameterSource(); + lassenParams.addValue("names", nameList); + lassenParams.addValue("appid", applicationID); + lassenParams.addValue("execid", jobId); + relatedDataRows = namedParameterJdbcTemplate.queryForList( + GET_DATA_FILTER_OUT_LASSEN, + lassenParams); } else { @@ -368,183 +629,126 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO continue; } String relatedSourceType = (String)relatedDataRow.get("source_target_type"); + LineageNode relatedNode = new LineageNode(); + relatedNode._sort_list = new ArrayList(); + relatedNode.node_type = "data"; + relatedNode.level = level; + relatedNode.source_target_type = relatedSourceType; + relatedNode.abstracted_path = (String)relatedDataRow.get("abstracted_object_name"); + relatedNode.storage_type = ((String)relatedDataRow.get("storage_type")).toLowerCase(); + relatedNode.job_start_unix_time = (Long)relatedDataRow.get("job_start_unixtime"); + + relatedNode.job_start_time = relatedDataRow.get("start_time").toString(); + relatedNode.job_end_time = relatedDataRow.get("end_time").toString(); + relatedNode.job_end_unix_time = (Long)relatedDataRow.get("job_finished_unixtime"); + node.job_start_unix_time = relatedNode.job_start_unix_time; + node.job_end_unix_time = relatedNode.job_end_unix_time; + node.job_start_time = relatedNode.job_start_time; + node.job_end_time = relatedNode.job_end_time; + relatedNode.operation = (String)relatedDataRow.get("operation"); + LineagePathInfo info = new LineagePathInfo(); + info.filePath = relatedNode.abstracted_path; + info.storageType = relatedNode.storage_type; + relatedNode.urn = utils.Lineage.convertToURN(info); + relatedNode._sort_list.add("abstracted_path"); + relatedNode._sort_list.add("storage_type"); + relatedNode._sort_list.add("urn"); if (relatedSourceType.equalsIgnoreCase("source")) { - LineageNode relatedNode = new LineageNode(); - relatedNode._sort_list = new ArrayList(); - relatedNode.node_type = "data"; - relatedNode.source_target_type = relatedSourceType; - relatedNode.abstracted_path = (String)relatedDataRow.get("abstracted_object_name"); - relatedNode.storage_type = ((String)relatedDataRow.get("storage_type")).toLowerCase(); - relatedNode.job_start_unix_time = (Long)relatedDataRow.get("job_start_unix_time"); - relatedNode.job_end_unix_time = (Long)relatedDataRow.get("job_end_unix_time"); - LineagePathInfo info = new LineagePathInfo(); - info.filePath = relatedNode.abstracted_path; - info.storageType = relatedNode.storage_type; - relatedNode.urn = utils.Lineage.convertToURN(info); - relatedNode._sort_list.add("abstracted_path"); - relatedNode._sort_list.add("storage_type"); - relatedNode._sort_list.add("urn"); - List addedTargets = addedTargetNodes.get(abstractedObjectName); - LineageNode addedSource = addedSourceNode.get(abstractedObjectName); - LineageNode existNode = null; - if (addedTargets != null) + if (node.source_target_type.equalsIgnoreCase("target") || + utils.Lineage.isInList(nameList, relatedNode.abstracted_path)) { - Collections.sort(addedTargets, new Comparator(){ - public int compare(LineageNode node1, LineageNode node2){ - if(node1.job_end_unix_time == node2.job_end_unix_time) - return 0; - return node1.job_end_unix_time > node2.job_end_unix_time ? -1 : 1; - } - }); - for(LineageNode target : addedTargets) - { - if (relatedNode.job_start_unix_time != null - && target.job_end_unix_time != null - && relatedNode.job_start_unix_time > target.job_end_unix_time) - { - existNode = target; - break; - } - } - } - if (existNode == null && addedSource != null) - { - existNode = addedSource; - } - if (existNode != null) - { - relatedNode.id = existNode.id; - } - else - { - relatedNode.id = nodes.size(); - nodes.add(relatedNode); - addedSourceNode.put(abstractedObjectName, relatedNode); - } - LineageEdge relatedEdge = new LineageEdge(); - relatedEdge.source = relatedNode.id; - relatedEdge.target = jobIndex; - relatedEdge.id = edges.size(); - relatedEdge.label = (String)relatedDataRow.get("operation"); - relatedEdge.chain = ""; - edges.add(relatedEdge); - if (upLevel > 1) - { - LineagePathInfo subPath = new LineagePathInfo(); - subPath.storageType = relatedNode.storage_type; - subPath.filePath = relatedNode.abstracted_path; - getObjectAdjacentNode( - subPath, - upLevel - 1, - 0, - relatedNode, - nodes, - edges, - addedSourceNode, - addedTargetNodes, - addedJobNodes, - lookBackTime); + sourceNodeList.add(relatedNode); + allSourceNodes.add(relatedNode); } } else if (relatedSourceType.equalsIgnoreCase("target")) { - List addedTargets = addedTargetNodes.get(abstractedObjectName); - LineageNode relatedNode = new LineageNode(); - relatedNode._sort_list = new ArrayList(); - relatedNode.node_type = "data"; - relatedNode.source_target_type = relatedSourceType; - relatedNode.abstracted_path = (String)relatedDataRow.get("abstracted_object_name"); - relatedNode.storage_type = ((String)relatedDataRow.get("storage_type")).toLowerCase(); - LineagePathInfo info = new LineagePathInfo(); - info.filePath = relatedNode.abstracted_path; - info.storageType = relatedNode.storage_type; - relatedNode.urn = utils.Lineage.convertToURN(info); - relatedNode._sort_list.add("abstracted_path"); - relatedNode._sort_list.add("storage_type"); - LineageNode existNode = null; - if (addedTargets != null) + if (node.source_target_type.equalsIgnoreCase("source") || + utils.Lineage.isInList(nameList, relatedNode.abstracted_path)) { - for(LineageNode target : addedTargets) - { - if (relatedNode.partition_end != null ) - { - if (relatedNode.partition_end == target.partition_end) - { - existNode = target; - break; - } - } - else - { - if (relatedNode.job_end_unix_time == target.job_end_unix_time) - { - existNode = target; - break; - } - - } - - } - } - if (existNode != null) - { - relatedNode.id = existNode.id; - } - else - { - relatedNode.id = nodes.size(); - nodes.add(relatedNode); - if (addedTargets == null) - { - addedTargets = new ArrayList(); - } - addedTargets.add(relatedNode); - addedTargetNodes.put(abstractedObjectName, addedTargets); - } - - LineageEdge relatedEdge = new LineageEdge(); - relatedEdge.source = jobIndex; - relatedEdge.target = relatedNode.id; - relatedEdge.id = edges.size(); - relatedEdge.label = (String)relatedDataRow.get("operation"); - relatedEdge.chain = ""; - edges.add(relatedEdge); - if (downLevel > 1) - { - LineagePathInfo subPath = new LineagePathInfo(); - subPath.storageType = relatedNode.storage_type; - subPath.filePath = relatedNode.abstracted_path; - getObjectAdjacentNode( - subPath, - 0, - downLevel - 1, - relatedNode, - nodes, - edges, - addedSourceNode, - addedTargetNodes, - addedJobNodes, - lookBackTime); + targetNodeList.add(relatedNode); + allTargetNodes.add(relatedNode); } } } - + if (sourceNodeList.size() > 0) + { + addedSourceNodes.put(jobExecId, sourceNodeList); + } + if (targetNodeList.size() > 0) + { + addedTargetNodes.put(jobExecId, targetNodeList); + } } } } + if ((allSourceNodes != null ) && (allSourceNodes.size() > 0) && (upLevel > 1)) + { + List currentSourceNodes = new ArrayList(); + currentSourceNodes.addAll(allSourceNodes); + for(LineageNode sourceNode : currentSourceNodes) + { + LineagePathInfo subPath = new LineagePathInfo(); + subPath.storageType = sourceNode.storage_type; + subPath.filePath = sourceNode.abstracted_path; + if (sourceNode.level == level) + { + getObjectAdjacentNode( + subPath, + level+1, + upLevel - 1, + 0, + sourceNode, + allSourceNodes, + allTargetNodes, + addedSourceNodes, + addedTargetNodes, + addedJobNodes, + lookBackTime); + } + } + } + if ((allTargetNodes != null ) && (allTargetNodes.size() > 0) && (downLevel > 1)) + { + List currentTargetNodes = new ArrayList(); + currentTargetNodes.addAll(allTargetNodes); + for(LineageNode targetNode : currentTargetNodes) + { + LineagePathInfo subPath = new LineagePathInfo(); + subPath.storageType = targetNode.storage_type; + subPath.filePath = targetNode.abstracted_path; + if (targetNode.level == level) + { + getObjectAdjacentNode( + subPath, + level-1, + 0, + downLevel - 1, + targetNode, + allSourceNodes, + allTargetNodes, + addedSourceNodes, + addedTargetNodes, + addedJobNodes, + lookBackTime); + } + } + } + } public static void getObjectAdjacentNode( LineagePathInfo pathInfo, + int level, int upLevel, int downLevel, LineageNode currentNode, - List nodes, - List edges, - Map addedSourceNode, - Map> addedTargetNodes, - Map addedJobNodes, + List allSourceNodes, + List allTargetNodes, + Map> addedSourceNodes, + Map> addedTargetNodes, + Map addedJobNodes, int lookBackTime) { if (upLevel < 1 && downLevel < 1) @@ -556,14 +760,15 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO { return; } - searchInAzkaban( + getNodes( pathInfo, + level, upLevel, downLevel, currentNode, - nodes, - edges, - addedSourceNode, + allSourceNodes, + allTargetNodes, + addedSourceNodes, addedTargetNodes, addedJobNodes, lookBackTime); diff --git a/web/app/models/LineageNode.java b/web/app/models/LineageNode.java index b37e8bf1c0..c95bf652ad 100644 --- a/web/app/models/LineageNode.java +++ b/web/app/models/LineageNode.java @@ -33,6 +33,7 @@ public class LineageNode { public String job_end_time; public Long job_start_unix_time; public Long job_end_unix_time; + public int level; public String git_location; public List _sort_list; public String source_target_type; diff --git a/web/app/utils/Lineage.java b/web/app/utils/Lineage.java index 5cc3ea5b5f..7bd5df3545 100644 --- a/web/app/utils/Lineage.java +++ b/web/app/utils/Lineage.java @@ -24,6 +24,7 @@ import java.io.BufferedReader; import java.io.FileReader; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; public class Lineage @@ -86,4 +87,20 @@ public class Lineage } return pathInfo.storageType.toLowerCase() + ":///" + filePath; } + + public static boolean isInList(List list, String source) + { + if (list == null || list.size() == 0 || StringUtils.isBlank(source)) + { + return false; + } + for(String s : list) + { + if (source.equalsIgnoreCase(s)) + { + return true; + } + } + return false; + } }