Fix jsonpath after upgrading com.jayway.jsonpath to 2.2 (#299)

* use schema_url_helper to fetch avro schema from hdfs or http location

* trim space

* add dfs.namenode.kerberos.principal.pattern; include htrace for SchemaUrlHelper

* fix jsonpath for job history log parser; do not throw exception if kerberos config files are missing for job history http connection

* avoid null return value for sepCommaString(); fix a typo
Authored by Eric Sun on 2016-12-13 21:14:52 -08:00; committed by GitHub
parent 3eba3deed5
commit a3504fa57f
5 changed files with 54 additions and 22 deletions

AzJsonAnalyzer.java

@@ -16,6 +16,7 @@ package metadata.etl.lineage;
 import com.jayway.jsonpath.Configuration;
 import com.jayway.jsonpath.JsonPath;
 import org.codehaus.jettison.json.JSONException;
+import com.jayway.jsonpath.InvalidPathException;
 import com.jayway.jsonpath.PathNotFoundException;
 import wherehows.common.DatasetPath;
 import wherehows.common.schemas.AzkabanJobExecRecord;
@@ -87,6 +88,7 @@ public class AzJsonAnalyzer {
   // The string could be a comma separated file path.
   public List<String> sepCommaString(List<String> originalStrings) {
     List<String> result = new ArrayList<>();
+    if (null == originalStrings) return result;
     for (String concatedString : originalStrings) {
       result.addAll(DatasetPath.separatedDataset(concatedString));
     }
@@ -106,16 +108,14 @@
   }

   private List<String> parseProperties(String[] propertyNames) {
-    StringBuilder query = new StringBuilder();
+    List<String> patterns = new ArrayList<String>();
     for (String s : propertyNames) {
-      query.append("@.name==");
-      query.append(s);
-      query.append("||");
+      patterns.add(String.format("@.name=='%s'", s));
     }
-    query.delete(query.length() - 2, query.length());
+    String query = String.join("||", patterns);
     try {
       document = Configuration.defaultConfiguration().jsonProvider().parse(jobConfJson);
-      List<String> result = JsonPath.read(document, "$.conf.property[?(" + query.toString() + ")].value");
+      List<String> result = JsonPath.read(document, "$.conf.property[?(" + query + ")].value");
       return result;
     } catch (PathNotFoundException e){
       logger.error(String.format(
@@ -123,6 +123,12 @@ public class AzJsonAnalyzer {
         appId, aje.getJobExecId(), jobConfJson.substring(1,100)
       ));
       return null;
+    } catch (InvalidPathException e) {
+      logger.error(String.format(
+        "Invalid Path Exception of JSON from Hadoop JobHistory: appId=%d jobExecId=%d json=%s",
+        appId, aje.getJobExecId(), query
+      ));
+      return null;
     }
   }
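For reference, the old query broke under com.jayway.jsonpath 2.2 because the 2.x filter grammar expects string literals in comparisons to be quoted; an unquoted @.name==some.property now fails with InvalidPathException, which is exactly what the rewritten parseProperties() quotes against and catches. Below is a minimal, self-contained sketch of the query shape the new code builds; the property names and the JSON document are made up for illustration, only the quoting and the "||" join mirror the change above.

import com.jayway.jsonpath.JsonPath;

import java.util.ArrayList;
import java.util.List;

public class JsonPathFilterSketch {
  public static void main(String[] args) {
    // Hypothetical Hadoop job configuration, shaped like a JobHistory conf payload.
    String jobConfJson = "{\"conf\":{\"property\":["
        + "{\"name\":\"mapreduce.output.fileoutputformat.outputdir\",\"value\":\"/data/out\"},"
        + "{\"name\":\"mapreduce.job.name\",\"value\":\"demo-job\"}]}}";

    String[] propertyNames = {"mapreduce.output.fileoutputformat.outputdir", "mapreduce.job.name"};

    // Same construction as the new parseProperties(): quote each name, then OR the predicates.
    List<String> patterns = new ArrayList<>();
    for (String s : propertyNames) {
      patterns.add(String.format("@.name=='%s'", s));
    }
    String query = String.join("||", patterns);

    // Yields $.conf.property[?(@.name=='...'||@.name=='...')].value
    List<String> values = JsonPath.read(jobConfJson, "$.conf.property[?(" + query + ")].value");
    System.out.println(values);  // prints the two matching property values
  }
}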

AzLineageExtractor.java

@@ -56,7 +56,7 @@ public class AzLineageExtractor {
     Set<String> hadoopJobIds = AzLogParser.getHadoopJobIdFromLog(log);

     for (String hadoopJobId : hadoopJobIds) {
-      logger.debug("get hadoop job :{} from azkaban job : {}" + hadoopJobId, message.azkabanJobExecution.toString());
+      logger.debug("Get Hadoop job config: {} from Azkaban job: {}" + hadoopJobId, message.azkabanJobExecution.toString());
       // TODO persist this mapping?
       String confJson = message.hnne.getConfFromHadoop(hadoopJobId);
       AzJsonAnalyzer ja = new AzJsonAnalyzer(confJson, message.azkabanJobExecution,
@@ -82,12 +82,15 @@
    */
   public static void extract(AzExecMessage message)
     throws Exception {
-    List<LineageRecord> result = extractLineage(message);
+    try {
+      List<LineageRecord> result = extractLineage(message);
     for (LineageRecord lr : result) {
       message.databaseWriter.append(lr);
+      }
+      logger.info(String.format("%03d lineage records extracted from [%s]", result.size(), message.toString()));
+      message.databaseWriter.flush();
+    } catch (Exception e) {
+      logger.error(String.format("Failed to extract lineage info from [%s].\n%s", message.toString(), e.getMessage()));
     }
-    logger.debug("Find " + result.size() + " Lineage record in execution " + message.toString());
-    message.databaseWriter.flush();
   }
 }
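The intent of the try/catch above is that a single bad execution logs an error instead of aborting the whole lineage run. A tiny illustration of that pattern follows; the message strings, the processMessage() helper, and the record count are hypothetical stand-ins, not WhereHows types.

import java.util.Arrays;
import java.util.List;

public class PerMessageIsolationSketch {
  public static void main(String[] args) {
    List<String> messages = Arrays.asList("exec-1", "exec-2 (bad)", "exec-3");
    for (String message : messages) {
      try {
        processMessage(message);
        // Dummy record count, formatted the same way as the new logger.info() call.
        System.out.println(String.format("%03d lineage records extracted from [%s]", 42, message));
      } catch (Exception e) {
        // Log and move on; one failed execution no longer stops the remaining ones.
        System.err.println(String.format("Failed to extract lineage info from [%s].%n%s",
            message, e.getMessage()));
      }
    }
  }

  private static void processMessage(String message) {
    if (message.contains("bad")) {
      throw new RuntimeException("simulated failure");
    }
  }
}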

AzLogParser.java

@@ -112,6 +112,8 @@ public class AzLogParser {
   public static List<LineageRecord> getLineageFromLog(String log, AzkabanJobExecRecord azkabanJobExecRecord, Integer defaultDatabaseId) {
     List<LineageRecord> result = new ArrayList<>();
+    Pattern typePattern = Pattern.compile("^(\\w+):/.*");
+    String datasetType = "";
     for (LogLineagePattern patternObject : logLineagePatterns) {
       Pattern pattern = Pattern.compile(patternObject.regex);
@@ -127,7 +129,15 @@
           LineageRecord lineageRecord =
             new LineageRecord(azkabanJobExecRecord.getAppId(), azkabanJobExecRecord.getFlowExecId(),
               azkabanJobExecRecord.getJobName(), azkabanJobExecRecord.getJobExecId());
-          lineageRecord.setDatasetInfo(defaultDatabaseId, dataset, "HDFS");
+          Matcher typeMatcher = typePattern.matcher(dataset);
+          if (typeMatcher.matches()) {
+            datasetType = typeMatcher.group(1);
+          } else {
+            datasetType = "hdfs";
+          }
+          lineageRecord.setDatasetInfo(defaultDatabaseId, dataset, datasetType);
           long recordCount =
             (patternObject.recordCountIndex < 1) ? 0 : Long.valueOf(matcher.group(patternObject.recordCountIndex));
           long insertCount =
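To make the effect of the new typePattern concrete, here is a standalone illustration; the dataset paths are invented, and the fallback mirrors the else branch above, so anything without a URI scheme is still recorded as hdfs.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DatasetTypeSketch {
  public static void main(String[] args) {
    // Same pattern as the new AzLogParser code: capture the URI scheme before ":/".
    Pattern typePattern = Pattern.compile("^(\\w+):/.*");

    String[] datasets = {
        "hdfs://namenode:9000/data/tracking/event",  // scheme "hdfs"
        "s3a://some-bucket/path/to/table",           // scheme "s3a"
        "/data/databases/warehouse/orders"           // no scheme, falls back to "hdfs"
    };

    for (String dataset : datasets) {
      Matcher typeMatcher = typePattern.matcher(dataset);
      String datasetType = typeMatcher.matches() ? typeMatcher.group(1) : "hdfs";
      System.out.println(dataset + " -> " + datasetType);
    }
  }
}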

HadoopJobHistoryNodeExtractor.java

@@ -27,6 +27,8 @@ import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.config.Lookup;
 import org.apache.http.config.RegistryBuilder;
+import org.apache.http.impl.auth.BasicSchemeFactory;
+import org.apache.http.impl.auth.KerberosSchemeFactory;
 import org.apache.http.impl.auth.SPNegoSchemeFactory;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -59,36 +61,37 @@ public class HadoopJobHistoryNodeExtractor {
     String CURRENT_DIR = System.getProperty("user.dir");
     String WH_HOME = System.getenv("WH_HOME");
+    String APP_HOME = System.getenv("APP_HOME");
     String USER_HOME = System.getenv("HOME") + "/.kerberos";

-    String[] allPositions = new String[]{CURRENT_DIR, WH_HOME, USER_HOME};
+    String[] allPositions = new String[]{CURRENT_DIR, WH_HOME, APP_HOME, USER_HOME, "/etc"};

     for (String position : allPositions) {
       String gssFileName = position + "/gss-jaas.conf";
       File gssFile = new File(gssFileName);
       if (gssFile.exists()) {
-        logger.debug("find gss-jaas.conf file in : {}", gssFile.getAbsolutePath());
+        logger.debug("Found gss-jaas.conf file at: {}", gssFile.getAbsolutePath());
         System.setProperty("java.security.auth.login.config", gssFile.getAbsolutePath());
         break;
       } else {
-        logger.debug("can't find here: {}", gssFile.getAbsolutePath());
+        logger.debug("{} doesn't exist.", gssFile.getAbsolutePath());
       }
     }

     for (String position : allPositions) {
       String krb5FileName = position + "/krb5.conf";
       File krb5File = new File(krb5FileName);
       if (krb5File.exists()) {
-        logger.debug("find krb5.conf file in : {}", krb5File.getAbsolutePath());
+        logger.debug("Found krb5.conf file at: {}", krb5File.getAbsolutePath());
         System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
         break;
       } else {
-        logger.debug("can't find here: {}", krb5File.getAbsolutePath());
+        logger.debug("{} doesn't exist.", krb5File.getAbsolutePath());
       }
     }

     if (System.getProperty("java.security.auth.login.config") == null
         || System.getProperty("java.security.krb5.conf") == null) {
-      throw new Exception("Can't find java security config files");
+      logger.warn("Can't find Java security config [krb5.conf, gss-jaas.conf] for Kerberos! Trying other authentication methods...");
     }

     if (logger.isTraceEnabled()) {
@@ -109,7 +112,10 @@
     credsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("DUMMY", null));

     Lookup<AuthSchemeProvider> authRegistry =
-        RegistryBuilder.<AuthSchemeProvider>create().register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory()).build();
+        RegistryBuilder.<AuthSchemeProvider>create()
+            .register(AuthSchemes.BASIC, new BasicSchemeFactory())
+            .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory())
+            .register(AuthSchemes.KERBEROS, new KerberosSchemeFactory()).build();
     httpClient =
         HttpClients.custom().setDefaultCredentialsProvider(credsProvider).setDefaultAuthSchemeRegistry(authRegistry)
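To show how the fragments in this file fit together, here is a self-contained sketch of an HttpClient built with the same BASIC/SPNEGO/KERBEROS registry. The job-history URL below is a hypothetical placeholder, not the serverURL the extractor actually reads from its job properties; with all three schemes registered, the client can negotiate whichever one the server offers, which is why a missing krb5.conf no longer has to be fatal.

import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.config.Lookup;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.impl.auth.BasicSchemeFactory;
import org.apache.http.impl.auth.KerberosSchemeFactory;
import org.apache.http.impl.auth.SPNegoSchemeFactory;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class JobHistoryClientSketch {
  public static void main(String[] args) throws Exception {
    BasicCredentialsProvider credsProvider = new BasicCredentialsProvider();
    // SPNEGO ignores the password; a placeholder principal keeps the provider non-empty.
    credsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("DUMMY", null));

    Lookup<AuthSchemeProvider> authRegistry = RegistryBuilder.<AuthSchemeProvider>create()
        .register(AuthSchemes.BASIC, new BasicSchemeFactory())
        .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory())
        .register(AuthSchemes.KERBEROS, new KerberosSchemeFactory())
        .build();

    try (CloseableHttpClient httpClient = HttpClients.custom()
        .setDefaultCredentialsProvider(credsProvider)
        .setDefaultAuthSchemeRegistry(authRegistry)
        .build()) {
      // Hypothetical job history endpoint used only for this sketch.
      HttpGet get = new HttpGet("http://jobhistory.example.com:19888/ws/v1/history/mapreduce/jobs");
      try (CloseableHttpResponse resp = httpClient.execute(get)) {
        System.out.println(EntityUtils.toString(resp.getEntity()));
      }
    }
  }
}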

PathAnalyzer.java

@@ -73,6 +73,13 @@ public class PathAnalyzer {
     DatasetPath datasetPath = new DatasetPath();
     datasetPath.fullPath = fullPath;
+    // remove the "dalids://" or "hive://" header
+    Pattern daliPattern = Pattern.compile("(dalids|hive):///(\\w.*/)?(\\w.*)(\\.|/)(\\w.*)$");
+    Matcher daliMatcher = daliPattern.matcher(fullPath);
+    if (daliMatcher.matches()) {
+      fullPath = String.format("/%s/%s", daliMatcher.group(3), daliMatcher.group(5));
+    }
+
     // remove the "hdfs://.../" header
     Pattern headerPattern = Pattern.compile("hdfs://.*:\\d{4}(/.*)");
     Matcher headerMatcher = headerPattern.matcher(fullPath);
@@ -81,7 +88,7 @@
     }

     // remove 'tmp' folder
-    if (fullPath.startsWith("/tmp/")) {
+    if (fullPath.startsWith("/tmp/") || fullPath.startsWith("/temp/")) {
       return null;
     }
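A worked example of the new dalids/hive normalization, with invented database and table names: both dotted and slash-separated forms collapse to a plain "/db/table" path before the existing hdfs:// header handling runs.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DaliPathSketch {
  public static void main(String[] args) {
    // Same pattern as the new PathAnalyzer code.
    Pattern daliPattern = Pattern.compile("(dalids|hive):///(\\w.*/)?(\\w.*)(\\.|/)(\\w.*)$");

    String[] fullPaths = {
        "dalids:///tracking.page_view_event",  // -> /tracking/page_view_event
        "hive:///warehouse/orders"             // -> /warehouse/orders
    };

    for (String fullPath : fullPaths) {
      Matcher daliMatcher = daliPattern.matcher(fullPath);
      String normalized = daliMatcher.matches()
          ? String.format("/%s/%s", daliMatcher.group(3), daliMatcher.group(5))
          : fullPath;
      System.out.println(fullPath + " -> " + normalized);
    }
  }
}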