diff --git a/web/app/dao/LineageDAO.java b/web/app/dao/LineageDAO.java index c359bd7ed1..e74a558567 100644 --- a/web/app/dao/LineageDAO.java +++ b/web/app/dao/LineageDAO.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.ImmutablePair; import org.springframework.dao.EmptyResultDataAccessException; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import play.Logger; import play.Play; @@ -39,26 +40,68 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO "flow WHERE app_id = ? and flow_id = ?"; private final static String GET_JOB = "SELECT ca.app_id, ca.app_code as cluster, " + - "je.flow_id, je.job_id, jedl.job_name, " + - "fj.job_path, fj.job_type, jedl.flow_path, jedl.storage_type, jedl.source_target_type, " + + "jedl.job_name, fj.job_path, fj.job_type, jedl.flow_path, jedl.storage_type, jedl.source_target_type, " + "jedl.operation, jedl.source_srl_no, jedl.srl_no, " + - "max(jedl.job_exec_id) as job_exec_id, FROM_UNIXTIME(jedl.job_start_unixtime) as start_time, " + - "FROM_UNIXTIME(jedl.job_finished_unixtime) as end_time FROM job_execution_data_lineage jedl " + + "max(jedl.job_exec_id) as job_exec_id FROM job_execution_data_lineage jedl " + "JOIN cfg_application ca on ca.app_id = jedl.app_id " + - "JOIN job_execution je on jedl.app_id = je.app_id " + + "LEFT JOIN job_execution je on jedl.app_id = je.app_id " + "and jedl.flow_exec_id = je.flow_exec_id and jedl.job_exec_id = je.job_exec_id " + - "JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and je.job_id = fj.job_id " + - "WHERE abstracted_object_name = ? and " + + "LEFT JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and je.job_id = fj.job_id " + + "WHERE abstracted_object_name in ( :names ) and " + "jedl.flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + - "COALESCE(jedl.source_srl_no, jedl.srl_no) = jedl.srl_no and " + - "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL ? DAY " + - "GROUP BY ca.app_id, je.job_id, je.flow_id"; + "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL (:days) DAY " + + "GROUP BY ca.app_id, cluster, jedl.job_name, jedl.flow_path, jedl.source_target_type, " + + "jedl.storage_type, jedl.operation " + + "ORDER BY jedl.source_target_type DESC, jedl.job_finished_unixtime"; + + private final static String GET_UP_LEVEL_JOB = "SELECT ca.app_id, ca.app_code as cluster, " + + "jedl.job_name, fj.job_path, fj.job_type, jedl.flow_path, jedl.storage_type, jedl.source_target_type, " + + "jedl.operation, jedl.source_srl_no, jedl.srl_no, " + + "max(jedl.job_exec_id) as job_exec_id FROM job_execution_data_lineage jedl " + + "JOIN cfg_application ca on ca.app_id = jedl.app_id " + + "LEFT JOIN job_execution je on jedl.app_id = je.app_id " + + "and jedl.flow_exec_id = je.flow_exec_id and jedl.job_exec_id = je.job_exec_id " + + "LEFT JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and je.job_id = fj.job_id " + + "WHERE abstracted_object_name in ( :names ) and jedl.source_target_type = 'target' and " + + "jedl.flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + + "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL (:days) DAY " + + "GROUP BY ca.app_id, cluster, jedl.job_name, jedl.flow_path, jedl.source_target_type, " + + "jedl.storage_type, jedl.operation " + + "ORDER BY jedl.source_target_type DESC, jedl.job_finished_unixtime"; + + private final static String GET_JOB_WITH_SOURCE = "SELECT ca.app_id, ca.app_code as cluster, " + + "jedl.job_name, fj.job_path, fj.job_type, jedl.flow_path, jedl.storage_type, jedl.source_target_type, " + + "jedl.operation, jedl.source_srl_no, jedl.srl_no, " + + "max(jedl.job_exec_id) as job_exec_id FROM job_execution_data_lineage jedl " + + "JOIN cfg_application ca on ca.app_id = jedl.app_id " + + "LEFT JOIN job_execution je on jedl.app_id = je.app_id " + + "and jedl.flow_exec_id = je.flow_exec_id and jedl.job_exec_id = je.job_exec_id " + + "LEFT JOIN flow_job fj on je.app_id = fj.app_id and je.flow_id = fj.flow_id and je.job_id = fj.job_id " + + "WHERE abstracted_object_name in ( :names ) and jedl.source_target_type != (:type) and " + + "jedl.flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + + "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL (:days) DAY " + + "GROUP BY ca.app_id, cluster, jedl.job_name, jedl.flow_path, jedl.source_target_type, " + + "jedl.storage_type, jedl.operation " + + "ORDER BY jedl.source_target_type DESC, jedl.job_finished_unixtime"; private final static String GET_DATA = "SELECT storage_type, operation, " + - "abstracted_object_name, source_target_type " + + "abstracted_object_name, source_target_type, job_start_unixtime, job_finished_unixtime, " + + "FROM_UNIXTIME(job_start_unixtime) as start_time, " + + "FROM_UNIXTIME(job_finished_unixtime) as end_time " + "FROM job_execution_data_lineage WHERE app_id = ? and job_exec_id = ? and " + "flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + - "COALESCE(source_srl_no, srl_no) = srl_no"; + "COALESCE(source_srl_no, srl_no) = srl_no ORDER BY source_target_type DESC"; + + private final static String GET_DATA_FILTER_OUT_LASSEN = "SELECT j1.storage_type, j1.operation, " + + "j1.abstracted_object_name, j1.source_target_type, j1.job_start_unixtime, j1.job_finished_unixtime, " + + "FROM_UNIXTIME(j1.job_start_unixtime) as start_time, " + + "FROM_UNIXTIME(j1.job_finished_unixtime) as end_time " + + "FROM job_execution_data_lineage j1 " + + "JOIN job_execution_data_lineage j2 on j1.app_id = j2.app_id and j1.job_exec_id = j2.job_exec_id " + + "and j2.abstracted_object_name in (:names) and j2.source_target_type = 'source' " + + "WHERE j1.app_id = (:appid) and j1.job_exec_id = (:execid) and " + + "j1.flow_path not REGEXP '^(rent-metrics:|tracking-investigation:)' and " + + "COALESCE(j1.source_srl_no, j1.srl_no) = j2.srl_no ORDER BY j1.source_target_type DESC"; private final static String GET_APP_ID = "SELECT app_id FROM cfg_application WHERE LOWER(app_code) = ?"; @@ -89,296 +132,624 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO "JOIN cfg_application ca on ca.app_id = jedl.app_id " + "WHERE jedl.app_id = ? and jedl.flow_exec_id = ? ORDER BY jedl.partition_end DESC"; - private final static String GET_ONE_LEVEL_IMPACT_DATABASES = "SELECT DISTINCT j.storage_type, " + - "j.abstracted_object_name, d.id FROM job_execution_data_lineage j " + - "LEFT JOIN dict_dataset d ON d.urn = concat(j.storage_type, '://', j.abstracted_object_name) " + - "WHERE (app_id, job_exec_id) in ( " + - "SELECT app_id, job_exec_id FROM job_execution_data_lineage " + - "WHERE abstracted_object_name in (:pathlist) and source_target_type = 'source' and " + - "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL 60 DAY ) and " + - "abstracted_object_name not like '/tmp/%' and abstracted_object_name not like '%tmp' " + - "and source_target_type = 'target' and " + + private final static String GET_ONE_LEVEL_IMPACT_DATABASES = "SELECT DISTINCT j.storage_type, " + + "j.abstracted_object_name, d.id FROM job_execution_data_lineage j " + + "LEFT JOIN dict_dataset d ON d.urn = concat(j.storage_type, '://', j.abstracted_object_name) " + + "WHERE (app_id, job_exec_id) in ( " + + "SELECT app_id, job_exec_id FROM job_execution_data_lineage " + + "WHERE abstracted_object_name in (:pathlist) and source_target_type = 'source' and " + + "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL 60 DAY ) and " + + "abstracted_object_name not like '/tmp/%' and abstracted_object_name not like '%tmp' " + + "and source_target_type = 'target' and " + "FROM_UNIXTIME(job_finished_unixtime) > CURRENT_DATE - INTERVAL 60 DAY"; + private final static String GET_MAPPED_OBJECT_NAME = "SELECT mapped_object_name " + + "FROM cfg_object_name_map WHERE object_name = ?"; + + private final static String GET_OBJECT_NAME_BY_MAPPED_NAME = "SELECT object_name " + + "FROM cfg_object_name_map WHERE mapped_object_name = ?"; + public static JsonNode getObjectAdjacnet(String urn, int upLevel, int downLevel, int lookBackTime) { - ObjectNode resultNode = Json.newObject(); + int level = 0; LineagePathInfo pathInfo = utils.Lineage.convertFromURN(urn); List nodes = new ArrayList(); List edges = new ArrayList(); - Map addedJobNodes = new HashMap(); - Map addedDataNodes = new HashMap(); - int index = 0; - int edgeIndex = 0; - LineageNode node = new LineageNode(); - node.id = index; - node._sort_list = new ArrayList(); - node.node_type = "data"; - node.abstracted_path = pathInfo.filePath; - node.storage_type = pathInfo.storageType; - node._sort_list.add("abstracted_path"); - node._sort_list.add("storage_type"); - node._sort_list.add("urn"); - node.urn = urn; - nodes.add(node); - addedDataNodes.put(node.urn, node.id); - String message = null; + Map> addedSourceNodes= new HashMap>(); + Map> addedTargetNodes= new HashMap>(); + Map addedJobNodes = new HashMap(); + List allSourceNodes = new ArrayList(); + List allTargetNodes = new ArrayList(); getObjectAdjacentNode( + pathInfo, + level, + upLevel, + downLevel, + null, + allSourceNodes, + allTargetNodes, + addedSourceNodes, + addedTargetNodes, + addedJobNodes, + lookBackTime); + + return renderGraph( pathInfo, urn, upLevel, downLevel, - nodes.size()-1, - edges.size(), - nodes, - edges, - addedDataNodes, + allSourceNodes, + allTargetNodes, + addedSourceNodes, + addedTargetNodes, addedJobNodes, - lookBackTime); - if (nodes.size() > 0) + nodes, + edges); + } + + public static void addSourceNode( + LineageNode jobNode, + Map> addedSourceNodes, + Map> addedTargetDataNodes, + Map addedSourceDataNodes, + Map> toBeConvertedSourceDataNodes, + List nodes, + List edges, + boolean isUpLevel) + { + List sourceNodes = addedSourceNodes.get(jobNode.exec_id); + if (sourceNodes != null) { - message = "Found lineage on azkaban"; + for(LineageNode node : sourceNodes) + { + List existTargetNodes = addedTargetDataNodes.get(node.abstracted_path); + LineageNode existNode = null; + if (existTargetNodes != null) + { + Collections.sort(existTargetNodes, new Comparator(){ + public int compare(LineageNode node1, LineageNode node2){ + if(node1.job_end_unix_time == node2.job_end_unix_time) + return 0; + return node1.job_end_unix_time > node2.job_end_unix_time ? -1 : 1; + } + }); + for(LineageNode target : existTargetNodes) + { + if (node.job_start_unix_time >= target.job_end_unix_time) + { + existNode = target; + break; + } + } + } + if (existNode == null) + { + existNode = addedSourceDataNodes.get(node.abstracted_path); + } + if (existNode != null) + { node.id = existNode.id; + } + else + { + node.id = nodes.size(); + nodes.add(node); + addedSourceDataNodes.put(node.abstracted_path, node); + if (isUpLevel) { + List originalSourceNodes = toBeConvertedSourceDataNodes.get(node.abstracted_path); + if (originalSourceNodes == null) { + originalSourceNodes = new ArrayList(); + } + originalSourceNodes.add(node); + toBeConvertedSourceDataNodes.put(node.abstracted_path, originalSourceNodes); + } + + } + LineageEdge edge = new LineageEdge(); + edge.id = edges.size(); + edge.source = node.id; + edge.target = jobNode.id; + edge.label = node.operation; + edge.chain = ""; + edges.add(edge); + } + } + } + + public static void addTargetNode( + LineageNode jobNode, + Map> addedTargetNodes, + Map> addedTargetDataNodes, + Map toBeClearedSourceDataNodes, + List nodes, + List edges, + boolean isUpLevel) + { + List targetNodes = addedTargetNodes.get(jobNode.exec_id); + if (targetNodes != null) + { + for(LineageNode node : targetNodes) + { + List existTargetNodes = addedTargetDataNodes.get(node.abstracted_path); + LineageNode existNode = null; + if (existTargetNodes != null) + { + for(LineageNode target : existTargetNodes) + { + if (target.partition_end != null) + { + if (target.partition_end == node.partition_end) + { + existNode = target; + break; + } + } + else if(target.job_end_unix_time == node.job_end_unix_time) + { + existNode = target; + break; + } + } + } + if (isUpLevel) { + if (existNode == null) + { + existNode = toBeClearedSourceDataNodes.get(node.abstracted_path); + if (existNode != null && existNode.job_start_unix_time < node.job_end_unix_time) + { + existNode = null; + } + } + } + + if (existNode != null) + { + node.id = existNode.id; + } + else + { + node.id = nodes.size(); + nodes.add(node); + if (existTargetNodes == null) + { + existTargetNodes = new ArrayList(); + } + existTargetNodes.add(node); + addedTargetDataNodes.put(node.abstracted_path, existTargetNodes); + } + LineageEdge edge = new LineageEdge(); + edge.id = edges.size(); + edge.source = jobNode.id; + edge.target = node.id; + edge.label = node.operation; + edge.chain = ""; + edges.add(edge); + } + } + } + + public static JsonNode renderGraph(LineagePathInfo pathInfo, + String urn, + int upLevel, + int downLevel, + List allSourceNodes, + List allTargetNodes, + Map> addedSourceNodes, + Map> addedTargetNodes, + Map addedJobNodes, + List nodes, + List edges) + { + ObjectNode resultNode = Json.newObject(); + String message = null; + Map addedSourceDataNodes = new HashMap(); + Map toBeClearedSourceDataNodes = new HashMap(); + Map> addedTargetDataNodes = new HashMap>(); + Map> toBeConvertedSourceDataNodes = new HashMap>(); + message = "No lineage information found for this dataset"; + if (allSourceNodes.size() == 0 && allTargetNodes.size() == 0) + { + LineageNode node = new LineageNode(); + node.id = nodes.size(); + node._sort_list = new ArrayList(); + node.node_type = "data"; + node.abstracted_path = pathInfo.filePath; + node.storage_type = pathInfo.storageType; + node._sort_list.add("abstracted_path"); + node._sort_list.add("storage_type"); + node._sort_list.add("urn"); + node.urn = urn; + nodes.add(node); } else { - message = "No lineage information found for this dataset"; + message = "Found lineage information"; + for(int i = 0; i < Math.max(upLevel, downLevel); i++) + { + if (i < upLevel) + { + if (toBeConvertedSourceDataNodes.size() > 0) + { + for(Map.Entry > mapEntry : toBeConvertedSourceDataNodes.entrySet()) + { + List hashedNodes = addedTargetDataNodes.get(mapEntry.getKey()); + List list = mapEntry.getValue(); + if (list != null && list.size() > 0) + { + if (hashedNodes == null) + { + hashedNodes = new ArrayList(); + } + hashedNodes.addAll(list); + addedTargetDataNodes.put(mapEntry.getKey(), hashedNodes); + } + + } + toBeConvertedSourceDataNodes.clear(); + } + if (addedSourceDataNodes.size() > 0) + { + toBeClearedSourceDataNodes.putAll(addedSourceDataNodes); + addedSourceDataNodes.clear(); + } + + for(LineageNode job : addedJobNodes.values()) + { + if (job.level != i) + { + continue; + } + job.id = nodes.size(); + nodes.add(job); + addTargetNode(job, + addedTargetNodes, + addedTargetDataNodes, + toBeClearedSourceDataNodes, + nodes, + edges, + true); + + addSourceNode(job, + addedSourceNodes, + addedTargetDataNodes, + addedSourceDataNodes, + toBeConvertedSourceDataNodes, + nodes, + edges, + true); + } + } + if ((i > 0) && (i < downLevel)) + { + for(LineageNode job : addedJobNodes.values()) + { + if (job.level != -i) + { + continue; + } + job.id = nodes.size(); + nodes.add(job); + addTargetNode(job, + addedTargetNodes, + addedTargetDataNodes, + toBeClearedSourceDataNodes, + nodes, + edges, + false); + + addSourceNode(job, + addedSourceNodes, + addedTargetDataNodes, + addedSourceDataNodes, + toBeConvertedSourceDataNodes, + nodes, + edges, + false); + } + } + } } resultNode.set("nodes", Json.toJson(nodes)); resultNode.set("links", Json.toJson(edges)); resultNode.put("urn", urn); resultNode.put("message", message); return resultNode; + } - public static void searchInAzkaban( + public static List getLiasDatasetNames(String abstractedObjectName) + { + if (StringUtils.isBlank(abstractedObjectName)) + return null; + List totalNames = new ArrayList(); + totalNames.add(abstractedObjectName); + List mappedNames = getJdbcTemplate().queryForList(GET_MAPPED_OBJECT_NAME, String.class, abstractedObjectName); + if (mappedNames != null && mappedNames.size() > 0) + { + totalNames.addAll(mappedNames); + for (String name : mappedNames) + { + List objNames = getJdbcTemplate().queryForList( + GET_OBJECT_NAME_BY_MAPPED_NAME, String.class, name); + { + if (objNames != null) + { + totalNames.addAll(objNames); + } + } + + } + } + else + { + List objNames = getJdbcTemplate().queryForList( + GET_OBJECT_NAME_BY_MAPPED_NAME, String.class, abstractedObjectName); + { + if (objNames != null) + { + totalNames.addAll(objNames); + } + } + + } + List results = new ArrayList(); + Set sets = new HashSet(); + sets.addAll(totalNames); + results.addAll(sets); + return results; + } + + public static void getNodes( LineagePathInfo pathInfo, + int level, int upLevel, int downLevel, - int index, - int nodeIndex, - int edgeIndex, - List nodes, - List edges, - Map addedDataNodes, - Map addedJobNodes, - int lookBackTime) + LineageNode currentNode, + List allSourceNodes, + List allTargetNodes, + Map>addedSourceNodes, + Map>addedTargetNodes, + Map addedJobNodes, + int lookBackTime) { if (upLevel < 1 && downLevel < 1) { return; } - - List> rows = null; - rows = getJdbcTemplate().queryForList( - GET_JOB, - pathInfo.filePath, - lookBackTime); - if (rows != null) + if (currentNode != null) { - for (Map row : rows) + if ( StringUtils.isBlank(currentNode.source_target_type)) { + Logger.error("Source target type is not available"); + Logger.error(currentNode.abstracted_path); + return; + } + else if (currentNode.source_target_type.equalsIgnoreCase("target") && downLevel <= 0) + { + Logger.warn("Dataset " + currentNode.abstracted_path + " downLevel = " + Integer.toString(downLevel)); + return; + } + else if (currentNode.source_target_type.equalsIgnoreCase("source") && upLevel <= 0) + { + Logger.warn("Dataset " + currentNode.abstracted_path + " upLevel = " + Integer.toString(upLevel)); + return; + } + } + List nameList = getLiasDatasetNames(pathInfo.filePath); + List> rows = null; + MapSqlParameterSource parameters = new MapSqlParameterSource(); + parameters.addValue("names", nameList); + NamedParameterJdbcTemplate namedParameterJdbcTemplate = new + NamedParameterJdbcTemplate(getJdbcTemplate().getDataSource()); + parameters.addValue("days", lookBackTime); + + if (currentNode != null) + { + if (currentNode.source_target_type.equalsIgnoreCase("source")) + { + rows = namedParameterJdbcTemplate.queryForList( + GET_UP_LEVEL_JOB, + parameters); + } + else + { + parameters.addValue("type", currentNode.source_target_type); + rows = namedParameterJdbcTemplate.queryForList( + GET_JOB_WITH_SOURCE, + parameters); + } + + } + else + { + rows = namedParameterJdbcTemplate.queryForList( + GET_JOB, + parameters); + } + + if (rows != null) { + for (Map row : rows) { LineageNode node = new LineageNode(); Object jobExecIdObject = row.get("job_exec_id"); - if (jobExecIdObject == null) - { + if (jobExecIdObject == null) { continue; } - Long jobExecId = ((BigInteger)jobExecIdObject).longValue(); - Integer jobNodeId = addedJobNodes.get(jobExecId); - if (jobNodeId != null && jobNodeId > 0) - { + Long jobExecId = ((BigInteger) jobExecIdObject).longValue(); + if (addedJobNodes.get(jobExecId) != null) { continue; } node._sort_list = new ArrayList(); node.node_type = "script"; - node.job_type = (String)row.get("job_type"); - node.cluster = (String)row.get("cluster"); - node.job_path = (String)row.get("job_path"); - node.job_name = (String)row.get("job_name"); - node.job_start_time = row.get("start_time").toString(); - node.job_end_time = row.get("end_time").toString(); + node.job_type = (String) row.get("job_type"); + node.cluster = (String) row.get("cluster"); + node.job_name = (String) row.get("job_name"); + node.job_path = (String) row.get("flow_path") + "/" + node.job_name; + node.exec_id = jobExecId; + node.operation = (String) row.get("operation"); + node.source_target_type = (String) row.get("source_target_type"); + node.level = level; node._sort_list.add("cluster"); node._sort_list.add("job_path"); node._sort_list.add("job_name"); node._sort_list.add("job_type"); node._sort_list.add("job_start_time"); node._sort_list.add("job_end_time"); - String sourceType = (String)row.get("source_target_type"); - if (sourceType.equalsIgnoreCase("target") && upLevel > 0) - { - node.id = nodeIndex; - nodes.add(node); - addedJobNodes.put(jobExecId, node.id); - LineageEdge edge = new LineageEdge(); - edge.source = nodeIndex; - edge.target = index; - edge.id = edgeIndex++; - edge.label = (String) row.get("operation"); - edge.chain = (String) row.get("flow_path"); - edges.add(edge); - } - else if (sourceType.equalsIgnoreCase("source") && downLevel > 0) - { - node.id = nodeIndex; - nodes.add(node); - addedJobNodes.put(jobExecId, node.id); - LineageEdge edge = new LineageEdge(); - edge.source = index; - edge.target = nodeIndex; - edge.id = edgeIndex++; - edge.label = (String) row.get("operation"); - edge.chain = (String) row.get("flow_path"); - edges.add(edge); - } - else - { - continue; - } - int jobIndex = nodeIndex; - nodeIndex++; + node._sort_list.add("exec_id"); + addedJobNodes.put(jobExecId, node); + List sourceNodeList = new ArrayList(); + List targetNodeList = new ArrayList(); int applicationID = (int)row.get("app_id"); Long jobId = ((BigInteger)row.get("job_exec_id")).longValue(); List> relatedDataRows = null; - relatedDataRows = getJdbcTemplate().queryForList( - GET_DATA, - applicationID, - jobId); - if (relatedDataRows != null) + + if (node.source_target_type.equalsIgnoreCase("source") && node.operation.equalsIgnoreCase("JDBC Read")) { + MapSqlParameterSource lassenParams = new MapSqlParameterSource(); + lassenParams.addValue("names", nameList); + lassenParams.addValue("appid", applicationID); + lassenParams.addValue("execid", jobId); + relatedDataRows = namedParameterJdbcTemplate.queryForList( + GET_DATA_FILTER_OUT_LASSEN, + lassenParams); + } + else + { + relatedDataRows = getJdbcTemplate().queryForList( + GET_DATA, + applicationID, + jobId); + } + + if (relatedDataRows != null) { for (Map relatedDataRow : relatedDataRows) { String abstractedObjectName = (String)relatedDataRow.get("abstracted_object_name"); - Integer dataNodeId = addedDataNodes.get(abstractedObjectName); if (abstractedObjectName.startsWith("/tmp/")) { continue; } String relatedSourceType = (String)relatedDataRow.get("source_target_type"); - if (sourceType.equalsIgnoreCase("target") && relatedSourceType.equalsIgnoreCase("source")) - { - LineageNode relatedNode = new LineageNode(); - relatedNode._sort_list = new ArrayList(); - relatedNode.node_type = "data"; - relatedNode.abstracted_path = (String)relatedDataRow.get("abstracted_object_name"); - relatedNode.storage_type = ((String)relatedDataRow.get("storage_type")).toLowerCase(); - LineagePathInfo info = new LineagePathInfo(); - info.filePath = relatedNode.abstracted_path; - info.storageType = relatedNode.storage_type; - relatedNode.urn = utils.Lineage.convertToURN(info); - relatedNode._sort_list.add("abstracted_path"); - relatedNode._sort_list.add("storage_type"); - if (dataNodeId != null && dataNodeId > 0) - { - relatedNode.id = dataNodeId; - } - else - { - relatedNode.id = nodeIndex; - nodes.add(relatedNode); - addedDataNodes.put(abstractedObjectName, relatedNode.id); - } - LineageEdge relatedEdge = new LineageEdge(); - relatedEdge.source = relatedNode.id; - relatedEdge.target = jobIndex; - relatedEdge.id = edgeIndex++; - relatedEdge.label = (String)relatedDataRow.get("operation"); - relatedEdge.chain = ""; - edges.add(relatedEdge); - nodeIndex++; - if (upLevel > 1) - { - LineagePathInfo subPath = new LineagePathInfo(); - subPath.storageType = relatedNode.storage_type; - subPath.filePath = relatedNode.abstracted_path; - String subUrn = utils.Lineage.convertToURN(subPath); - getObjectAdjacentNode( - subPath, - subUrn, - upLevel-1, - 0, - nodeIndex-1, - edgeIndex, - nodes, - edges, - addedDataNodes, - addedJobNodes, - lookBackTime); - } - nodeIndex = nodes.size(); - } - else if (sourceType.equalsIgnoreCase("source") && relatedSourceType.equalsIgnoreCase("target")) - { - LineageNode relatedNode = new LineageNode(); - relatedNode._sort_list = new ArrayList(); - relatedNode.node_type = "data"; - relatedNode.abstracted_path = (String)relatedDataRow.get("abstracted_object_name"); - relatedNode.storage_type = ((String)relatedDataRow.get("storage_type")).toLowerCase(); - LineagePathInfo info = new LineagePathInfo(); - info.filePath = relatedNode.abstracted_path; - info.storageType = relatedNode.storage_type; - relatedNode.urn = utils.Lineage.convertToURN(info); - relatedNode._sort_list.add("abstracted_path"); - relatedNode._sort_list.add("storage_type"); - if (dataNodeId != null && dataNodeId > 0) - { - relatedNode.id = dataNodeId; - } - else - { - relatedNode.id = nodeIndex; - nodes.add(relatedNode); - addedDataNodes.put(abstractedObjectName, relatedNode.id); - } + LineageNode relatedNode = new LineageNode(); + relatedNode._sort_list = new ArrayList(); + relatedNode.node_type = "data"; + relatedNode.level = level; + relatedNode.source_target_type = relatedSourceType; + relatedNode.abstracted_path = (String)relatedDataRow.get("abstracted_object_name"); + relatedNode.storage_type = ((String)relatedDataRow.get("storage_type")).toLowerCase(); + relatedNode.job_start_unix_time = (Long)relatedDataRow.get("job_start_unixtime"); - LineageEdge relatedEdge = new LineageEdge(); - relatedEdge.source = jobIndex; - relatedEdge.target = relatedNode.id; - relatedEdge.id = edgeIndex++; - relatedEdge.label = (String)relatedDataRow.get("operation"); - relatedEdge.chain = ""; - edges.add(relatedEdge); - nodeIndex++; - if (downLevel > 1) + relatedNode.job_start_time = relatedDataRow.get("start_time").toString(); + relatedNode.job_end_time = relatedDataRow.get("end_time").toString(); + relatedNode.job_end_unix_time = (Long)relatedDataRow.get("job_finished_unixtime"); + node.job_start_unix_time = relatedNode.job_start_unix_time; + node.job_end_unix_time = relatedNode.job_end_unix_time; + node.job_start_time = relatedNode.job_start_time; + node.job_end_time = relatedNode.job_end_time; + relatedNode.operation = (String)relatedDataRow.get("operation"); + LineagePathInfo info = new LineagePathInfo(); + info.filePath = relatedNode.abstracted_path; + info.storageType = relatedNode.storage_type; + relatedNode.urn = utils.Lineage.convertToURN(info); + relatedNode._sort_list.add("abstracted_path"); + relatedNode._sort_list.add("storage_type"); + relatedNode._sort_list.add("urn"); + if (relatedSourceType.equalsIgnoreCase("source")) + { + if (node.source_target_type.equalsIgnoreCase("target") || + utils.Lineage.isInList(nameList, relatedNode.abstracted_path)) { - LineagePathInfo subPath = new LineagePathInfo(); - subPath.storageType = relatedNode.storage_type; - subPath.filePath = relatedNode.abstracted_path; - String subUrn = utils.Lineage.convertToURN(subPath); - getObjectAdjacentNode( - subPath, - subUrn, - 0, - downLevel-1, - nodeIndex-1, - edgeIndex, - nodes, - edges, - addedDataNodes, - addedJobNodes, - lookBackTime); + sourceNodeList.add(relatedNode); + allSourceNodes.add(relatedNode); + } + } + else if (relatedSourceType.equalsIgnoreCase("target")) + { + if (node.source_target_type.equalsIgnoreCase("source") || + utils.Lineage.isInList(nameList, relatedNode.abstracted_path)) + { + targetNodeList.add(relatedNode); + allTargetNodes.add(relatedNode); } - nodeIndex = nodes.size(); } } - + if (sourceNodeList.size() > 0) + { + addedSourceNodes.put(jobExecId, sourceNodeList); + } + if (targetNodeList.size() > 0) + { + addedTargetNodes.put(jobExecId, targetNodeList); + } } } } + if ((allSourceNodes != null ) && (allSourceNodes.size() > 0) && (upLevel > 1)) + { + List currentSourceNodes = new ArrayList(); + currentSourceNodes.addAll(allSourceNodes); + for(LineageNode sourceNode : currentSourceNodes) + { + LineagePathInfo subPath = new LineagePathInfo(); + subPath.storageType = sourceNode.storage_type; + subPath.filePath = sourceNode.abstracted_path; + if (sourceNode.level == level) + { + getObjectAdjacentNode( + subPath, + level+1, + upLevel - 1, + 0, + sourceNode, + allSourceNodes, + allTargetNodes, + addedSourceNodes, + addedTargetNodes, + addedJobNodes, + lookBackTime); + } + } + } + if ((allTargetNodes != null ) && (allTargetNodes.size() > 0) && (downLevel > 1)) + { + List currentTargetNodes = new ArrayList(); + currentTargetNodes.addAll(allTargetNodes); + for(LineageNode targetNode : currentTargetNodes) + { + LineagePathInfo subPath = new LineagePathInfo(); + subPath.storageType = targetNode.storage_type; + subPath.filePath = targetNode.abstracted_path; + if (targetNode.level == level) + { + getObjectAdjacentNode( + subPath, + level-1, + 0, + downLevel - 1, + targetNode, + allSourceNodes, + allTargetNodes, + addedSourceNodes, + addedTargetNodes, + addedJobNodes, + lookBackTime); + } + } + } + } public static void getObjectAdjacentNode( LineagePathInfo pathInfo, - String urn, + int level, int upLevel, int downLevel, - int index, - int edgeIndex, - List nodes, - List edges, - Map addedDataNodes, - Map addedJobNodes, - int lookBackTime) + LineageNode currentNode, + List allSourceNodes, + List allTargetNodes, + Map> addedSourceNodes, + Map> addedTargetNodes, + Map addedJobNodes, + int lookBackTime) { if (upLevel < 1 && downLevel < 1) { @@ -389,31 +760,18 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO { return; } - - int nodeIndex = index + 1; - - List appIDList = getJdbcTemplate().queryForList( - GET_APPLICATION_ID, new Object[] {pathInfo.filePath}, Integer.class); - if (appIDList != null) - { - int nodeId = 0; - for(Integer id : appIDList) - { - searchInAzkaban( - pathInfo, - upLevel, - downLevel, - index, - nodeIndex, - edgeIndex, - nodes, - edges, - addedDataNodes, - addedJobNodes, - lookBackTime); - break; - } - } + getNodes( + pathInfo, + level, + upLevel, + downLevel, + currentNode, + allSourceNodes, + allTargetNodes, + addedSourceNodes, + addedTargetNodes, + addedJobNodes, + lookBackTime); } public static ObjectNode getFlowLineage(String application, String project, Long flowId) @@ -818,75 +1176,75 @@ public class LineageDAO extends AbstractMySQLOpenSourceDAO return resultNode; } - public static void getImpactDatasets( - List searchUrnList, - int level, - List impactDatasets) - { - if (searchUrnList != null && searchUrnList.size() > 0) { + public static void getImpactDatasets( + List searchUrnList, + int level, + List impactDatasets) + { + if (searchUrnList != null && searchUrnList.size() > 0) { - if (impactDatasets == null) - { - impactDatasets = new ArrayList(); - } + if (impactDatasets == null) + { + impactDatasets = new ArrayList(); + } - List pathList = new ArrayList(); - List nextSearchList = new ArrayList(); + List pathList = new ArrayList(); + List nextSearchList = new ArrayList(); - for (String urn : searchUrnList) - { - LineagePathInfo pathInfo = Lineage.convertFromURN(urn); - if (pathInfo != null && StringUtils.isNotBlank(pathInfo.filePath)) - { - if (!pathList.contains(pathInfo.filePath)) - { - pathList.add(pathInfo.filePath); - } - } - } + for (String urn : searchUrnList) + { + LineagePathInfo pathInfo = Lineage.convertFromURN(urn); + if (pathInfo != null && StringUtils.isNotBlank(pathInfo.filePath)) + { + if (!pathList.contains(pathInfo.filePath)) + { + pathList.add(pathInfo.filePath); + } + } + } - if (pathList != null && pathList.size() > 0) - { - Map param = Collections.singletonMap("pathlist", pathList); - NamedParameterJdbcTemplate namedParameterJdbcTemplate = new - NamedParameterJdbcTemplate(getJdbcTemplate().getDataSource()); - List impactDatasetList = namedParameterJdbcTemplate.query( - GET_ONE_LEVEL_IMPACT_DATABASES, - param, - new ImpactDatasetRowMapper()); + if (pathList != null && pathList.size() > 0) + { + Map param = Collections.singletonMap("pathlist", pathList); + NamedParameterJdbcTemplate namedParameterJdbcTemplate = new + NamedParameterJdbcTemplate(getJdbcTemplate().getDataSource()); + List impactDatasetList = namedParameterJdbcTemplate.query( + GET_ONE_LEVEL_IMPACT_DATABASES, + param, + new ImpactDatasetRowMapper()); - if (impactDatasetList != null) { - for (ImpactDataset dataset : impactDatasetList) { - dataset.level = level; - if (impactDatasets.stream().filter(o -> o.urn.equals(dataset.urn)).findFirst().isPresent()) - { - continue; - } - impactDatasets.add(dataset); - nextSearchList.add(dataset.urn); - } - } - } + if (impactDatasetList != null) { + for (ImpactDataset dataset : impactDatasetList) { + dataset.level = level; + if (impactDatasets.stream().filter(o -> o.urn.equals(dataset.urn)).findFirst().isPresent()) + { + continue; + } + impactDatasets.add(dataset); + nextSearchList.add(dataset.urn); + } + } + } - if (nextSearchList.size() > 0) - { - getImpactDatasets(nextSearchList, level + 1, impactDatasets); - } - } - } + if (nextSearchList.size() > 0) + { + getImpactDatasets(nextSearchList, level + 1, impactDatasets); + } + } + } - public static List getImpactDatasetsByUrn(String urn) - { - List impactDatasetList = new ArrayList(); + public static List getImpactDatasetsByUrn(String urn) + { + List impactDatasetList = new ArrayList(); - if (StringUtils.isNotBlank(urn)) - { - List searchUrnList = new ArrayList(); - searchUrnList.add(urn); - getImpactDatasets(searchUrnList, 1, impactDatasetList); - } + if (StringUtils.isNotBlank(urn)) + { + List searchUrnList = new ArrayList(); + searchUrnList.add(urn); + getImpactDatasets(searchUrnList, 1, impactDatasetList); + } - return impactDatasetList; - } + return impactDatasetList; + } } diff --git a/web/app/models/LineageNode.java b/web/app/models/LineageNode.java index 79991f528e..c95bf652ad 100644 --- a/web/app/models/LineageNode.java +++ b/web/app/models/LineageNode.java @@ -31,6 +31,9 @@ public class LineageNode { public String script_path; public String job_start_time; public String job_end_time; + public Long job_start_unix_time; + public Long job_end_unix_time; + public int level; public String git_location; public List _sort_list; public String source_target_type; diff --git a/web/app/utils/Lineage.java b/web/app/utils/Lineage.java index 623adcbdc0..7bd5df3545 100644 --- a/web/app/utils/Lineage.java +++ b/web/app/utils/Lineage.java @@ -24,6 +24,7 @@ import java.io.BufferedReader; import java.io.FileReader; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; public class Lineage @@ -45,38 +46,9 @@ public class Lineage pathInfo.storageType = storageType; if (StringUtils.isNotBlank(storageType)) { - if (storageType.equalsIgnoreCase("hdfs")) + if (pathArray.length > 1 && StringUtils.isNotBlank(pathArray[1])) { - if (pathArray.length > 1 && StringUtils.isNotBlank(pathArray[1])) - { - pathInfo.filePath = "/" + pathArray[1]; - } - } - else if (storageType.equalsIgnoreCase("teradata")) - { - if (pathArray.length > 1 && StringUtils.isNotBlank(pathArray[1])) - { - int index = pathArray[1].indexOf("/"); - if (index != -1) - { - pathInfo.schemaName = pathArray[1].substring(0, index); - pathInfo.filePath = pathArray[1].substring(index+1); - } - } - } - else if (storageType.equalsIgnoreCase("nas")) - { - if (pathArray.length > 1 && StringUtils.isNotBlank(pathArray[1])) - { - pathInfo.filePath = "/" + pathArray[1]; - } - } - else if (storageType.equalsIgnoreCase("hive")) - { - if (pathArray.length > 1 && StringUtils.isNotBlank(pathArray[1])) - { - pathInfo.filePath = "/" + pathArray[1]; - } + pathInfo.filePath = "/" + pathArray[1]; } else { @@ -113,13 +85,22 @@ public class Lineage filePath = pathInfo.filePath; } } - if (StringUtils.isNotBlank(pathInfo.storageType) && pathInfo.storageType.equalsIgnoreCase("teradata")) + return pathInfo.storageType.toLowerCase() + ":///" + filePath; + } + + public static boolean isInList(List list, String source) + { + if (list == null || list.size() == 0 || StringUtils.isBlank(source)) { - return "teradata:///" + pathInfo.schemaName + "/" + filePath; + return false; } - else + for(String s : list) { - return pathInfo.storageType.toLowerCase() + ":///" + filePath; + if (source.equalsIgnoreCase(s)) + { + return true; + } } + return false; } }