diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzJsonAnalyzer.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzJsonAnalyzer.java
index 95afb9fb78..9bac8adcbb 100644
--- a/metadata-etl/src/main/java/metadata/etl/lineage/AzJsonAnalyzer.java
+++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzJsonAnalyzer.java
@@ -16,6 +16,7 @@ package metadata.etl.lineage;
 import com.jayway.jsonpath.Configuration;
 import com.jayway.jsonpath.JsonPath;
 import org.codehaus.jettison.json.JSONException;
+import com.jayway.jsonpath.InvalidPathException;
 import com.jayway.jsonpath.PathNotFoundException;
 import wherehows.common.DatasetPath;
 import wherehows.common.schemas.AzkabanJobExecRecord;
@@ -87,6 +88,7 @@ public class AzJsonAnalyzer {
   // The string could be a comma separated file path.
   public List sepCommaString(List originalStrings) {
     List result = new ArrayList<>();
+    if (null == originalStrings) return result;
     for (String concatedString : originalStrings) {
       result.addAll(DatasetPath.separatedDataset(concatedString));
     }
@@ -106,16 +108,14 @@ public class AzJsonAnalyzer {
   }
 
   private List parseProperties(String[] propertyNames) {
-    StringBuilder query = new StringBuilder();
+    List patterns = new ArrayList<>();
     for (String s : propertyNames) {
-      query.append("@.name==");
-      query.append(s);
-      query.append("||");
+      patterns.add(String.format("@.name=='%s'", s));
     }
-    query.delete(query.length() - 2, query.length());
+    String query = String.join("||", patterns);
     try {
       document = Configuration.defaultConfiguration().jsonProvider().parse(jobConfJson);
-      List result = JsonPath.read(document, "$.conf.property[?(" + query.toString() + ")].value");
+      List result = JsonPath.read(document, "$.conf.property[?(" + query + ")].value");
       return result;
     } catch (PathNotFoundException e){
       logger.error(String.format(
@@ -123,6 +123,12 @@ public class AzJsonAnalyzer {
         appId, aje.getJobExecId(), jobConfJson.substring(1,100)
       ));
       return null;
+    } catch (InvalidPathException e) {
+      logger.error(String.format(
+        "Invalid path expression for JSON from Hadoop JobHistory: appId=%d jobExecId=%d query=%s",
+        appId, aje.getJobExecId(), query
+      ));
+      return null;
     }
   }
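Note: the quoting added above matters — without the single quotes, a filter such as @.name==some.property.name is not a valid JsonPath expression, which is exactly what the new InvalidPathException handler guards against. A minimal standalone sketch of the quoted filter (not part of the patch; the sample JSON and property names are invented):

    import com.jayway.jsonpath.Configuration;
    import com.jayway.jsonpath.JsonPath;
    import java.util.ArrayList;
    import java.util.List;

    public class JsonPathFilterSketch {
      public static void main(String[] args) {
        // Mimics the "$.conf.property" layout the analyzer expects from the JobHistory API.
        String jobConfJson = "{\"conf\":{\"property\":["
            + "{\"name\":\"mapreduce.input.dir\",\"value\":\"/data/in\"},"
            + "{\"name\":\"mapreduce.output.dir\",\"value\":\"/data/out\"}]}}";

        String[] propertyNames = {"mapreduce.input.dir", "mapreduce.output.dir"};
        List<String> patterns = new ArrayList<>();
        for (String s : propertyNames) {
          patterns.add(String.format("@.name=='%s'", s));  // quoted, as in the patch
        }
        String query = String.join("||", patterns);

        Object document = Configuration.defaultConfiguration().jsonProvider().parse(jobConfJson);
        List<String> values = JsonPath.read(document, "$.conf.property[?(" + query + ")].value");
        System.out.println(values);  // [/data/in, /data/out]
      }
    }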
diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java
index 9497084a44..b779a7f25a 100644
--- a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java
+++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java
@@ -56,7 +56,7 @@ public class AzLineageExtractor {
     Set hadoopJobIds = AzLogParser.getHadoopJobIdFromLog(log);
     for (String hadoopJobId : hadoopJobIds) {
-      logger.debug("get hadoop job :{} from azkaban job : {}" + hadoopJobId, message.azkabanJobExecution.toString());
+      logger.debug("Get Hadoop job config: {} from Azkaban job: {}", hadoopJobId, message.azkabanJobExecution.toString());
       // TODO persist this mapping?
       String confJson = message.hnne.getConfFromHadoop(hadoopJobId);
       AzJsonAnalyzer ja = new AzJsonAnalyzer(confJson, message.azkabanJobExecution,
@@ -82,12 +82,15 @@
    */
  public static void extract(AzExecMessage message)
    throws Exception {
-    List result = extractLineage(message);
-
-    for (LineageRecord lr : result) {
-      message.databaseWriter.append(lr);
+    try {
+      List result = extractLineage(message);
+      for (LineageRecord lr : result) {
+        message.databaseWriter.append(lr);
+      }
+      logger.info(String.format("%03d lineage records extracted from [%s]", result.size(), message.toString()));
+      message.databaseWriter.flush();
+    } catch (Exception e) {
+      logger.error(String.format("Failed to extract lineage info from [%s].\n%s", message.toString(), e.getMessage()));
     }
-    logger.debug("Find " + result.size() + " Lineage record in execution " + message.toString());
-    message.databaseWriter.flush();
   }
 }
diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzLogParser.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzLogParser.java
index 14a2ba0ff4..6f9231953b 100644
--- a/metadata-etl/src/main/java/metadata/etl/lineage/AzLogParser.java
+++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzLogParser.java
@@ -112,6 +112,8 @@ public class AzLogParser {
   public static List getLineageFromLog(String log, AzkabanJobExecRecord azkabanJobExecRecord,
       Integer defaultDatabaseId) {
     List result = new ArrayList<>();
+    Pattern typePattern = Pattern.compile("^(\\w+):/.*");
+    String datasetType = "";
 
     for (LogLineagePattern patternObject : logLineagePatterns) {
       Pattern pattern = Pattern.compile(patternObject.regex);
@@ -127,7 +129,15 @@
         LineageRecord lineageRecord =
             new LineageRecord(azkabanJobExecRecord.getAppId(), azkabanJobExecRecord.getFlowExecId(),
                 azkabanJobExecRecord.getJobName(), azkabanJobExecRecord.getJobExecId());
-        lineageRecord.setDatasetInfo(defaultDatabaseId, dataset, "HDFS");
+
+        Matcher typeMatcher = typePattern.matcher(dataset);
+        if (typeMatcher.matches()) {
+          datasetType = typeMatcher.group(1);
+        } else {
+          datasetType = "hdfs";
+        }
+        lineageRecord.setDatasetInfo(defaultDatabaseId, dataset, datasetType);
+
         long recordCount = (patternObject.recordCountIndex < 1) ?
             0 : Long.valueOf(matcher.group(patternObject.recordCountIndex));
         long insertCount =
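Note: the scheme-sniffing regex added to AzLogParser keeps an explicit URI scheme as the dataset type and falls back to "hdfs" for bare paths. A standalone sketch of how it classifies (not part of the patch; sample paths are invented):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class DatasetTypeSketch {
      public static void main(String[] args) {
        Pattern typePattern = Pattern.compile("^(\\w+):/.*");
        for (String dataset : new String[]{
            "hdfs://cluster:9000/data/foo", "dalids:///db/table", "/bare/path"}) {
          Matcher m = typePattern.matcher(dataset);
          String datasetType = m.matches() ? m.group(1) : "hdfs";
          System.out.println(dataset + " -> " + datasetType);
          // hdfs://cluster:9000/data/foo -> hdfs
          // dalids:///db/table           -> dalids
          // /bare/path                   -> hdfs
        }
      }
    }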
diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/HadoopJobHistoryNodeExtractor.java b/metadata-etl/src/main/java/metadata/etl/lineage/HadoopJobHistoryNodeExtractor.java
index f5749aafb3..f53149d280 100644
--- a/metadata-etl/src/main/java/metadata/etl/lineage/HadoopJobHistoryNodeExtractor.java
+++ b/metadata-etl/src/main/java/metadata/etl/lineage/HadoopJobHistoryNodeExtractor.java
@@ -27,6 +27,8 @@ import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.config.Lookup;
 import org.apache.http.config.RegistryBuilder;
+import org.apache.http.impl.auth.BasicSchemeFactory;
+import org.apache.http.impl.auth.KerberosSchemeFactory;
 import org.apache.http.impl.auth.SPNegoSchemeFactory;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -59,36 +61,37 @@ public class HadoopJobHistoryNodeExtractor {
     String CURRENT_DIR = System.getProperty("user.dir");
     String WH_HOME = System.getenv("WH_HOME");
+    String APP_HOME = System.getenv("APP_HOME");
     String USER_HOME = System.getenv("HOME") + "/.kerberos";
 
-    String[] allPositions = new String[]{CURRENT_DIR, WH_HOME, USER_HOME};
+    String[] allPositions = new String[]{CURRENT_DIR, WH_HOME, APP_HOME, USER_HOME, "/etc"};
 
     for (String position : allPositions) {
       String gssFileName = position + "/gss-jaas.conf";
       File gssFile = new File(gssFileName);
       if (gssFile.exists()) {
-        logger.debug("find gss-jaas.conf file in : {}", gssFile.getAbsolutePath());
+        logger.debug("Found gss-jaas.conf file at: {}", gssFile.getAbsolutePath());
         System.setProperty("java.security.auth.login.config", gssFile.getAbsolutePath());
         break;
       } else {
-        logger.debug("can't find here: {}", gssFile.getAbsolutePath());
+        logger.debug("{} doesn't exist.", gssFile.getAbsolutePath());
       }
     }
 
     for (String position : allPositions) {
       String krb5FileName = position + "/krb5.conf";
       File krb5File = new File(krb5FileName);
       if (krb5File.exists()) {
-        logger.debug("find krb5.conf file in : {}", krb5File.getAbsolutePath());
+        logger.debug("Found krb5.conf file at: {}", krb5File.getAbsolutePath());
         System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
         break;
       } else {
-        logger.debug("can't find here: {}", krb5File.getAbsolutePath());
+        logger.debug("{} doesn't exist.", krb5File.getAbsolutePath());
       }
     }
 
     if (System.getProperty("java.security.auth.login.config") == null
         || System.getProperty("java.security.krb5.conf") == null) {
-      throw new Exception("Can't find java security config files");
+      logger.warn("Can't find Java security config [krb5.conf, gss-jaas.conf] for Kerberos! Trying other authentication methods...");
     }
 
     if (logger.isTraceEnabled()) {
@@ -109,7 +112,10 @@
     credsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("DUMMY", null));
 
     Lookup authRegistry =
-        RegistryBuilder.create().register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory()).build();
+        RegistryBuilder.create()
+            .register(AuthSchemes.BASIC, new BasicSchemeFactory())
+            .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory())
+            .register(AuthSchemes.KERBEROS, new KerberosSchemeFactory()).build();
 
     httpClient =
         HttpClients.custom().setDefaultCredentialsProvider(credsProvider).setDefaultAuthSchemeRegistry(authRegistry)
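Note: registering BASIC and KERBEROS alongside SPNEGO pairs with the softened Kerberos-config check above — if krb5.conf/gss-jaas.conf are absent, the client can still negotiate a non-SPNEGO scheme with the JobHistory server. A standalone sketch of the widened registry (not part of the patch; mirrors the Apache HttpClient 4.x calls used above):

    import org.apache.http.auth.AuthSchemeProvider;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.config.AuthSchemes;
    import org.apache.http.config.Lookup;
    import org.apache.http.config.RegistryBuilder;
    import org.apache.http.impl.auth.BasicSchemeFactory;
    import org.apache.http.impl.auth.KerberosSchemeFactory;
    import org.apache.http.impl.auth.SPNegoSchemeFactory;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;

    public class AuthRegistrySketch {
      static CloseableHttpClient build() {
        // SPNEGO ignores the password; the DUMMY principal mirrors the extractor.
        BasicCredentialsProvider credsProvider = new BasicCredentialsProvider();
        credsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("DUMMY", null));

        Lookup<AuthSchemeProvider> authRegistry = RegistryBuilder.<AuthSchemeProvider>create()
            .register(AuthSchemes.BASIC, new BasicSchemeFactory())
            .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory())
            .register(AuthSchemes.KERBEROS, new KerberosSchemeFactory())
            .build();

        return HttpClients.custom()
            .setDefaultCredentialsProvider(credsProvider)
            .setDefaultAuthSchemeRegistry(authRegistry)
            .build();
      }

      public static void main(String[] args) throws Exception {
        try (CloseableHttpClient client = build()) {
          System.out.println("auth registry ready: " + client);
        }
      }
    }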
diff --git a/wherehows-common/src/main/java/wherehows/common/PathAnalyzer.java b/wherehows-common/src/main/java/wherehows/common/PathAnalyzer.java
index 45135d4061..696a7d4d77 100644
--- a/wherehows-common/src/main/java/wherehows/common/PathAnalyzer.java
+++ b/wherehows-common/src/main/java/wherehows/common/PathAnalyzer.java
@@ -73,6 +73,13 @@ public class PathAnalyzer {
     DatasetPath datasetPath = new DatasetPath();
     datasetPath.fullPath = fullPath;
 
+    // remove the "dalids://" or "hive://" header
+    Pattern daliPattern = Pattern.compile("(dalids|hive):///(\\w.*/)?(\\w.*)(\\.|/)(\\w.*)$");
+    Matcher daliMatcher = daliPattern.matcher(fullPath);
+    if (daliMatcher.matches()) {
+      fullPath = String.format("/%s/%s", daliMatcher.group(3), daliMatcher.group(5));
+    }
+
     // remove the "hdfs://.../" header
     Pattern headerPattern = Pattern.compile("hdfs://.*:\\d{4}(/.*)");
     Matcher headerMatcher = headerPattern.matcher(fullPath);
@@ -81,7 +88,7 @@
     }
 
     // remove 'tmp' folder
-    if (fullPath.startsWith("/tmp/")) {
+    if (fullPath.startsWith("/tmp/") || fullPath.startsWith("/temp/")) {
       return null;
     }
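Note: the new daliPattern normalizes Dali/Hive dataset URIs to a "/db/table"-style path before the usual hdfs:// header stripping runs. A standalone sketch of how the capture groups land (not part of the patch; sample URIs are invented):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class DaliPathSketch {
      public static void main(String[] args) {
        Pattern daliPattern = Pattern.compile("(dalids|hive):///(\\w.*/)?(\\w.*)(\\.|/)(\\w.*)$");
        for (String fullPath : new String[]{"dalids:///db.table", "hive:///warehouse/db/table"}) {
          Matcher m = daliPattern.matcher(fullPath);
          if (m.matches()) {
            // group(3) = database-like segment, group(5) = table-like segment
            System.out.println(fullPath + " -> " + String.format("/%s/%s", m.group(3), m.group(5)));
            // dalids:///db.table         -> /db/table
            // hive:///warehouse/db/table -> /db/table
          }
        }
      }
    }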