mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-31 04:25:29 +00:00
Merge branch 'master' into master
This commit is contained in:
commit
4ddf28885e
@ -16,6 +16,7 @@ package metadata.etl.lineage;
|
||||
import com.jayway.jsonpath.Configuration;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import org.codehaus.jettison.json.JSONException;
|
||||
import com.jayway.jsonpath.InvalidPathException;
|
||||
import com.jayway.jsonpath.PathNotFoundException;
|
||||
import wherehows.common.DatasetPath;
|
||||
import wherehows.common.schemas.AzkabanJobExecRecord;
|
||||
@ -87,6 +88,7 @@ public class AzJsonAnalyzer {
|
||||
// The string could be a comma separated file path.
|
||||
public List<String> sepCommaString(List<String> originalStrings) {
|
||||
List<String> result = new ArrayList<>();
|
||||
if (null == originalStrings) return result;
|
||||
for (String concatedString : originalStrings) {
|
||||
result.addAll(DatasetPath.separatedDataset(concatedString));
|
||||
}
|
||||
@ -106,16 +108,14 @@ public class AzJsonAnalyzer {
|
||||
}
|
||||
|
||||
private List<String> parseProperties(String[] propertyNames) {
|
||||
StringBuilder query = new StringBuilder();
|
||||
List<String> patterns = new ArrayList<String>();
|
||||
for (String s : propertyNames) {
|
||||
query.append("@.name==");
|
||||
query.append(s);
|
||||
query.append("||");
|
||||
patterns.add(String.format("@.name=='%s'", s));
|
||||
}
|
||||
query.delete(query.length() - 2, query.length());
|
||||
String query = String.join("||", patterns);
|
||||
try {
|
||||
document = Configuration.defaultConfiguration().jsonProvider().parse(jobConfJson);
|
||||
List<String> result = JsonPath.read(document, "$.conf.property[?(" + query.toString() + ")].value");
|
||||
List<String> result = JsonPath.read(document, "$.conf.property[?(" + query + ")].value");
|
||||
return result;
|
||||
} catch (PathNotFoundException e){
|
||||
logger.error(String.format(
|
||||
@ -123,6 +123,12 @@ public class AzJsonAnalyzer {
|
||||
appId, aje.getJobExecId(), jobConfJson.substring(1,100)
|
||||
));
|
||||
return null;
|
||||
} catch (InvalidPathException e) {
|
||||
logger.error(String.format(
|
||||
"Invalid Path Exception of JSON from Hadoop JobHistory: appId=%d jobExecId=%d json=%s",
|
||||
appId, aje.getJobExecId(), query
|
||||
));
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -56,7 +56,7 @@ public class AzLineageExtractor {
|
||||
Set<String> hadoopJobIds = AzLogParser.getHadoopJobIdFromLog(log);
|
||||
|
||||
for (String hadoopJobId : hadoopJobIds) {
|
||||
logger.debug("get hadoop job :{} from azkaban job : {}" + hadoopJobId, message.azkabanJobExecution.toString());
|
||||
logger.debug("Get Hadoop job config: {} from Azkaban job: {}" + hadoopJobId, message.azkabanJobExecution.toString());
|
||||
// TODO persist this mapping?
|
||||
String confJson = message.hnne.getConfFromHadoop(hadoopJobId);
|
||||
AzJsonAnalyzer ja = new AzJsonAnalyzer(confJson, message.azkabanJobExecution,
|
||||
@ -82,12 +82,15 @@ public class AzLineageExtractor {
|
||||
*/
|
||||
public static void extract(AzExecMessage message)
|
||||
throws Exception {
|
||||
List<LineageRecord> result = extractLineage(message);
|
||||
|
||||
for (LineageRecord lr : result) {
|
||||
message.databaseWriter.append(lr);
|
||||
try{
|
||||
List<LineageRecord> result = extractLineage(message);
|
||||
for (LineageRecord lr : result) {
|
||||
message.databaseWriter.append(lr);
|
||||
}
|
||||
logger.info(String.format("%03d lineage records extracted from [%s]", result.size(), message.toString()));
|
||||
message.databaseWriter.flush();
|
||||
} catch (Exception e) {
|
||||
logger.error(String.format("Failed to extract lineage info from [%s].\n%s", message.toString(), e.getMessage()));
|
||||
}
|
||||
logger.debug("Find " + result.size() + " Lineage record in execution " + message.toString());
|
||||
message.databaseWriter.flush();
|
||||
}
|
||||
}
|
||||
|
@ -112,6 +112,8 @@ public class AzLogParser {
|
||||
public static List<LineageRecord> getLineageFromLog(String log, AzkabanJobExecRecord azkabanJobExecRecord, Integer defaultDatabaseId) {
|
||||
|
||||
List<LineageRecord> result = new ArrayList<>();
|
||||
Pattern typePattern = Pattern.compile("^(\\w+):/.*");
|
||||
String datasetType = "";
|
||||
|
||||
for (LogLineagePattern patternObject : logLineagePatterns) {
|
||||
Pattern pattern = Pattern.compile(patternObject.regex);
|
||||
@ -127,7 +129,15 @@ public class AzLogParser {
|
||||
LineageRecord lineageRecord =
|
||||
new LineageRecord(azkabanJobExecRecord.getAppId(), azkabanJobExecRecord.getFlowExecId(),
|
||||
azkabanJobExecRecord.getJobName(), azkabanJobExecRecord.getJobExecId());
|
||||
lineageRecord.setDatasetInfo(defaultDatabaseId, dataset, "HDFS");
|
||||
|
||||
Matcher typeMatcher = typePattern.matcher(dataset);
|
||||
if (typeMatcher.matches()) {
|
||||
datasetType = typeMatcher.group(1);
|
||||
} else {
|
||||
datasetType = "hdfs";
|
||||
}
|
||||
lineageRecord.setDatasetInfo(defaultDatabaseId, dataset, datasetType);
|
||||
|
||||
long recordCount =
|
||||
(patternObject.recordCountIndex < 1) ? 0 : Long.valueOf(matcher.group(patternObject.recordCountIndex));
|
||||
long insertCount =
|
||||
|
@ -27,6 +27,8 @@ import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpUriRequest;
|
||||
import org.apache.http.config.Lookup;
|
||||
import org.apache.http.config.RegistryBuilder;
|
||||
import org.apache.http.impl.auth.BasicSchemeFactory;
|
||||
import org.apache.http.impl.auth.KerberosSchemeFactory;
|
||||
import org.apache.http.impl.auth.SPNegoSchemeFactory;
|
||||
import org.apache.http.impl.client.BasicCredentialsProvider;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
@ -59,36 +61,37 @@ public class HadoopJobHistoryNodeExtractor {
|
||||
|
||||
String CURRENT_DIR = System.getProperty("user.dir");
|
||||
String WH_HOME = System.getenv("WH_HOME");
|
||||
String APP_HOME = System.getenv("APP_HOME");
|
||||
String USER_HOME = System.getenv("HOME") + "/.kerberos";
|
||||
|
||||
String[] allPositions = new String[]{CURRENT_DIR, WH_HOME, USER_HOME};
|
||||
String[] allPositions = new String[]{CURRENT_DIR, WH_HOME, APP_HOME, USER_HOME, "/etc"};
|
||||
|
||||
for (String position : allPositions) {
|
||||
String gssFileName = position + "/gss-jaas.conf";
|
||||
File gssFile = new File(gssFileName);
|
||||
if (gssFile.exists()) {
|
||||
logger.debug("find gss-jaas.conf file in : {}", gssFile.getAbsolutePath());
|
||||
logger.debug("Found gss-jaas.conf file at: {}", gssFile.getAbsolutePath());
|
||||
System.setProperty("java.security.auth.login.config", gssFile.getAbsolutePath());
|
||||
break;
|
||||
} else {
|
||||
logger.debug("can't find here: {}", gssFile.getAbsolutePath());
|
||||
logger.debug("{} doesn't exist.", gssFile.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
for (String position : allPositions) {
|
||||
String krb5FileName = position + "/krb5.conf";
|
||||
File krb5File = new File(krb5FileName);
|
||||
if (krb5File.exists()) {
|
||||
logger.debug("find krb5.conf file in : {}", krb5File.getAbsolutePath());
|
||||
logger.debug("Found krb5.conf file at: {}", krb5File.getAbsolutePath());
|
||||
System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
|
||||
break;
|
||||
} else {
|
||||
logger.debug("can't find here: {}", krb5File.getAbsolutePath());
|
||||
logger.debug("{} does't exist.", krb5File.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
|
||||
if (System.getProperty("java.security.auth.login.config") == null
|
||||
|| System.getProperty("java.security.krb5.conf") == null) {
|
||||
throw new Exception("Can't find java security config files");
|
||||
logger.warn("Can't find Java security config [krb5.conf, gss-jass.conf] for Kerberos! Trying other authentication methods...");
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
@ -109,7 +112,10 @@ public class HadoopJobHistoryNodeExtractor {
|
||||
credsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("DUMMY", null));
|
||||
|
||||
Lookup<AuthSchemeProvider> authRegistry =
|
||||
RegistryBuilder.<AuthSchemeProvider>create().register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory()).build();
|
||||
RegistryBuilder.<AuthSchemeProvider>create()
|
||||
.register(AuthSchemes.BASIC, new BasicSchemeFactory())
|
||||
.register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory())
|
||||
.register(AuthSchemes.KERBEROS, new KerberosSchemeFactory()).build();
|
||||
|
||||
httpClient =
|
||||
HttpClients.custom().setDefaultCredentialsProvider(credsProvider).setDefaultAuthSchemeRegistry(authRegistry)
|
||||
|
@ -230,7 +230,7 @@ public class DatasetsDAO extends AbstractMySQLOpenSourceDAO
|
||||
"(text, user_id, dataset_id, created, modified, comment_type) VALUES(?, ?, ?, NOW(), NOW(), ?)";
|
||||
|
||||
private final static String GET_WATCHED_URN_ID = "SELECT id FROM watch " +
|
||||
"WHERE user_id = ? and item_type = 'urn' and urn = '$URN'";
|
||||
"WHERE user_id = ? and item_type = 'urn' and urn = ?";
|
||||
|
||||
private final static String GET_WATCHED_DATASET_ID = "SELECT id FROM watch " +
|
||||
"WHERE user_id = ? and item_id = ? and item_type = 'dataset'";
|
||||
@ -1092,7 +1092,7 @@ public class DatasetsDAO extends AbstractMySQLOpenSourceDAO
|
||||
if (userId != null && userId !=0)
|
||||
{
|
||||
List<Map<String, Object>> rows = null;
|
||||
rows = getJdbcTemplate().queryForList(GET_WATCHED_URN_ID.replace("$URN", urn), userId);
|
||||
rows = getJdbcTemplate().queryForList(GET_WATCHED_URN_ID, userId, urn);
|
||||
if (rows != null)
|
||||
{
|
||||
for (Map row : rows) {
|
||||
@ -1151,7 +1151,7 @@ public class DatasetsDAO extends AbstractMySQLOpenSourceDAO
|
||||
rows = getJdbcTemplate().queryForList(GET_WATCHED_DATASET_ID, userId, datasetId);
|
||||
if (rows != null && rows.size() > 0)
|
||||
{
|
||||
message = "watch item is already exist";
|
||||
message = "watch item already exist";
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1245,7 +1245,7 @@ public class DatasetsDAO extends AbstractMySQLOpenSourceDAO
|
||||
if (userId != null && userId !=0)
|
||||
{
|
||||
List<Map<String, Object>> rows = null;
|
||||
rows = getJdbcTemplate().queryForList(GET_WATCHED_URN_ID.replace("$URN", urn), userId);
|
||||
rows = getJdbcTemplate().queryForList(GET_WATCHED_URN_ID, userId, urn);
|
||||
if (rows != null && rows.size() > 0)
|
||||
{
|
||||
message = "watch item is already exist";
|
||||
|
@ -45,14 +45,15 @@ public class MetricsDAO extends AbstractMySQLOpenSourceDAO
|
||||
"m.metric_ref_id, m.dashboard_name, m.metric_category, m.metric_group, IFNULL(w.id,0) as watch_id " +
|
||||
"FROM dict_business_metric m " +
|
||||
"LEFT JOIN watch w ON (m.metric_id = w.item_id AND w.item_type = 'metric' AND w.user_id = ?) " +
|
||||
"WHERE dashboard_name $value ORDER BY m.metric_name limit ?, ?";
|
||||
"WHERE (dashboard_name = ? OR (dashboard_name IS NULL AND ? IS NULL)) ORDER BY m.metric_name limit ?, ?";
|
||||
|
||||
private final static String SELECT_PAGED_METRICS_BY_DASHBOARD_AND_GROUP = "SELECT SQL_CALC_FOUND_ROWS " +
|
||||
"m.metric_id, m.metric_name, m.metric_description, m.metric_ref_id_type, m.metric_ref_id, " +
|
||||
"m.dashboard_name, m.metric_category, m.metric_group, IFNULL(w.id,0) as watch_id " +
|
||||
"FROM dict_busines_metric m " +
|
||||
"LEFT JOIN watch w ON (m.metric_id = w.item_id AND w.item_type = 'metric' AND w.user_id = ?) " +
|
||||
"WHERE m.dashboard_name $dashboard and m.metric_group $group " +
|
||||
"WHERE (m.dashboard_name = ? OR (m.dashboard_name IS NULL AND ? IS NULL)) " +
|
||||
"and (m.metric_group = ? OR (m.metric_group IS NULL AND ? IS NULL)) " +
|
||||
"ORDER BY metric_name limit ?, ?";
|
||||
|
||||
private final static String GET_METRIC_BY_ID = "SELECT m.metric_id, m.metric_name, " +
|
||||
@ -74,7 +75,7 @@ public class MetricsDAO extends AbstractMySQLOpenSourceDAO
|
||||
|
||||
private final static String GET_USER_ID = "SELECT id FROM users WHERE username = ?";
|
||||
|
||||
private final static String UPDATE_METRIC = "UPDATE dict_busines_metric SET $SET_CLAUSE WHERE metric_id = ?";
|
||||
private final static String UPDATE_METRIC = "UPDATE dict_business_metric SET $SET_CLAUSE WHERE metric_id = ?";
|
||||
|
||||
public static ObjectNode getPagedMetrics(
|
||||
String dashboardName,
|
||||
@ -97,48 +98,51 @@ public class MetricsDAO extends AbstractMySQLOpenSourceDAO
|
||||
{
|
||||
public ObjectNode doInTransaction(TransactionStatus status)
|
||||
{
|
||||
String query = null;
|
||||
List<Map<String, Object>> rows;
|
||||
if (StringUtils.isBlank(dashboardName))
|
||||
{
|
||||
query = SELECT_PAGED_METRICS;
|
||||
rows = jdbcTemplate.queryForList(SELECT_PAGED_METRICS, id, (page-1)*size, size);
|
||||
}
|
||||
else if (StringUtils.isBlank(group))
|
||||
{
|
||||
query = SELECT_PAGED_METRICS_BY_DASHBOARD_NAME;
|
||||
String dbName;
|
||||
if (dashboardName.equals("[Other]"))
|
||||
{
|
||||
query = query.replace("$value", "is null");
|
||||
dbName = null;
|
||||
}
|
||||
else
|
||||
{
|
||||
query = query.replace("$value", "= '" + dashboardName + "'");
|
||||
dbName = dashboardName;
|
||||
}
|
||||
rows = jdbcTemplate.queryForList(SELECT_PAGED_METRICS_BY_DASHBOARD_NAME, id, dbName, dbName,
|
||||
(page-1)*size, size);
|
||||
}
|
||||
else
|
||||
{
|
||||
query = SELECT_PAGED_METRICS_BY_DASHBOARD_AND_GROUP;
|
||||
String dbName;
|
||||
if (dashboardName.equals("[Other]"))
|
||||
{
|
||||
query = query.replace("$dashboard", "is null");
|
||||
dbName = null;
|
||||
}
|
||||
else
|
||||
{
|
||||
query = query.replace("$dashboard", "= '" + dashboardName + "'");
|
||||
dbName = dashboardName;
|
||||
}
|
||||
String grp;
|
||||
if (group.equals("[Other]"))
|
||||
{
|
||||
query = query.replace("$group", "is null");
|
||||
grp = null;
|
||||
}
|
||||
else
|
||||
{
|
||||
query = query.replace("$group", "= '" + group + "'");
|
||||
grp = group;
|
||||
}
|
||||
rows = jdbcTemplate.queryForList(SELECT_PAGED_METRICS_BY_DASHBOARD_AND_GROUP, id, dbName, dbName, grp, grp,
|
||||
(page-1)*size, size);
|
||||
}
|
||||
List<Map<String, Object>> rows = null;
|
||||
List<Metric> pagedMetrics = new ArrayList<Metric>();
|
||||
rows = jdbcTemplate.queryForList(query, id, (page-1)*size, size);
|
||||
for (Map row : rows) {
|
||||
|
||||
List<Metric> pagedMetrics = new ArrayList<>();
|
||||
for (Map row : rows) {
|
||||
Metric metric = new Metric();
|
||||
metric.id = (int)row.get("metric_id");
|
||||
metric.name = (String)row.get("metric_name");
|
||||
@ -148,7 +152,7 @@ public class MetricsDAO extends AbstractMySQLOpenSourceDAO
|
||||
metric.dashboardName = (String)row.get("dashboard_name");
|
||||
metric.category = (String)row.get("metric_category");
|
||||
metric.group = (String)row.get("metric_group");
|
||||
metric.watchId = (Long)row.get("watch_id");
|
||||
metric.watchId = (Long)row.get("watch_id");
|
||||
pagedMetrics.add(metric);
|
||||
}
|
||||
long count = 0;
|
||||
|
@ -1572,5 +1572,22 @@
|
||||
</div>
|
||||
{{/if}}
|
||||
</script>
|
||||
|
||||
<!-- Piwik -->
|
||||
<script type="text/javascript">
|
||||
var _paq = _paq || [];
|
||||
_paq.push(['trackPageView']);
|
||||
_paq.push(['enableLinkTracking']);
|
||||
(function() {
|
||||
var u="//piwik.corp.linkedin.com/piwik/";
|
||||
_paq.push(['setTrackerUrl', u+'piwik.php']);
|
||||
_paq.push(['setSiteId', 93]);
|
||||
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
|
||||
g.type='text/javascript'; g.async=true; g.defer=true; g.src=u+'piwik.js'; s.parentNode.insertBefore(g,s);
|
||||
})();
|
||||
</script>
|
||||
<noscript><p><img src="//piwik.corp.linkedin.com/piwik/piwik.php?idsite=94" style="border:0;" alt="" /></p></noscript>
|
||||
<!-- End Piwik Code -->
|
||||
|
||||
</body>
|
||||
</html>
|
||||
|
@ -73,6 +73,13 @@ public class PathAnalyzer {
|
||||
DatasetPath datasetPath = new DatasetPath();
|
||||
datasetPath.fullPath = fullPath;
|
||||
|
||||
// remove the "dalids://" or "hive://" header
|
||||
Pattern daliPattern = Pattern.compile("(dalids|hive):///(\\w.*/)?(\\w.*)(\\.|/)(\\w.*)$");
|
||||
Matcher daliMatcher = daliPattern.matcher(fullPath);
|
||||
if (daliMatcher.matches()) {
|
||||
fullPath = String.format( "/%s/%s", daliMatcher.group(3), daliMatcher.group(5) );
|
||||
}
|
||||
|
||||
// remove the "hdfs://.../" header
|
||||
Pattern headerPattern = Pattern.compile("hdfs://.*:\\d{4}(/.*)");
|
||||
Matcher headerMatcher = headerPattern.matcher(fullPath);
|
||||
@ -81,7 +88,7 @@ public class PathAnalyzer {
|
||||
}
|
||||
|
||||
// remove 'tmp' folder
|
||||
if (fullPath.startsWith("/tmp/")) {
|
||||
if (fullPath.startsWith("/tmp/") || fullPath.startsWith("/temp/")) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user