Merge with master

SunZhaonan 2015-12-16 15:54:50 -08:00
commit cd44daba5d
17 changed files with 312 additions and 193 deletions

View File

@@ -34,10 +34,8 @@ import wherehows.common.writers.DatabaseWriter;
*/
public class LineageDao {
public static final String FIND_JOBS_BY_DATASET =
" select distinct ca.short_connection_string, f.flow_group, f.flow_name, jedl.job_name "
" select distinct ca.short_connection_string, jedl.job_name, jedl.flow_path "
+ " from job_execution_data_lineage jedl "
+ " join flow_execution fe on jedl.app_id = fe.app_id and jedl.flow_exec_id = fe.flow_exec_id "
+ " join flow f on fe.app_id = f.app_id and fe.flow_id = f.flow_id "
+ " join cfg_application ca on ca.app_id = jedl.app_id "
+ " join cfg_database cd on cd.db_id = jedl.db_id "
+ " where source_target_type = :source_target_type "

View File

@@ -4,16 +4,14 @@ gradle dist;
rm -r target/universal/backend-service-1.0-SNAPSHOT;
unzip target/universal/backend-service-1.0-SNAPSHOT.zip -d target/universal/;
scp target/universal/backend-service-1.0-SNAPSHOT/lib/metadata-etl-1.0.jar dev_svc@lva1-rpt14:~/backendServer/lib/;
scp target/universal/backend-service-1.0-SNAPSHOT/lib/wherehows-common-1.0.jar dev_svc@lva1-rpt14:~/backendServer/lib/;
scp target/universal/backend-service-1.0-SNAPSHOT/lib/metadata-etl-1.0.jar ${TARGET_SERVER}:~/backendServer/lib/;
scp target/universal/backend-service-1.0-SNAPSHOT/lib/default.backend-service-1.0-SNAPSHOT.jar dev_svc@lva1-rpt14:~/backendServer/lib/;
scp target/universal/backend-service-1.0-SNAPSHOT/lib/default.backend-service-1.0-SNAPSHOT.jar ${TARGET_SERVER}:~/backendServer/lib/;
scp target/universal/backend-service-1.0-SNAPSHOT/lib/schemaFetch.jar dev_svc@lva1-rpt14:~/backendServer/lib/;
scp target/universal/backend-service-1.0-SNAPSHOT/lib/schemaFetch.jar ${TARGET_SERVER}:~/backendServer/lib/;
scp target/universal/backend-service-1.0-SNAPSHOT/lib/wherehows-common-1.0.jar ${TARGET_SERVER}:~/backendServer/lib/;
scp target/universal/backend-service-1.0-SNAPSHOT/bin/* ${TARGET_SERVER}:~/backendServer/bin/;
scp target/universal/backend-service-1.0-SNAPSHOT/bin/* dev_svc@lva1-rpt14:~/backendServer/bin/;
scp target/universal/backend-service-1.0-SNAPSHOT/lib/metadata-etl-1.0.jar cloudera@172.21.98.211:~/wherehows/backend-service-1.0-SNAPSHOT/lib/

View File

@@ -165,21 +165,30 @@ CREATE TABLE `dict_field_detail` (
`field_label` VARCHAR(100) DEFAULT NULL,
`data_type` VARCHAR(50) NOT NULL,
`data_size` INT(10) UNSIGNED DEFAULT NULL,
`data_precision` TINYINT(4) DEFAULT NULL,
`data_fraction` TINYINT(4) DEFAULT NULL,
`data_precision` TINYINT(4) DEFAULT NULL
COMMENT 'only in decimal type',
`data_fraction` TINYINT(4) DEFAULT NULL
COMMENT 'only in decimal type',
`default_comment_id` INT(11) UNSIGNED DEFAULT NULL
COMMENT 'a list of comment_id',
`comment_ids` VARCHAR(500) DEFAULT NULL,
`is_nullable` CHAR(1) DEFAULT NULL,
`is_indexed` CHAR(1) DEFAULT NULL,
`is_partitioned` CHAR(1) DEFAULT NULL,
`is_distributed` TINYINT(4) DEFAULT NULL,
`is_indexed` CHAR(1) DEFAULT NULL
COMMENT 'only in RDBMS',
`is_partitioned` CHAR(1) DEFAULT NULL
COMMENT 'only in RDBMS',
`is_distributed` TINYINT(4) DEFAULT NULL
COMMENT 'only in RDBMS',
`default_value` VARCHAR(200) DEFAULT NULL,
`namespace` VARCHAR(200) DEFAULT NULL,
`java_data_type` VARCHAR(50) DEFAULT NULL,
`jdbc_data_type` VARCHAR(50) DEFAULT NULL,
`pig_data_type` VARCHAR(50) DEFAULT NULL,
`hcatalog_data_type` VARCHAR(50) DEFAULT NULL,
`java_data_type` VARCHAR(50) DEFAULT NULL
COMMENT 'correspond type in java',
`jdbc_data_type` VARCHAR(50) DEFAULT NULL
COMMENT 'correspond type in jdbc',
`pig_data_type` VARCHAR(50) DEFAULT NULL
COMMENT 'correspond type in pig',
`hcatalog_data_type` VARCHAR(50) DEFAULT NULL
COMMENT 'correspond type in hcatalog',
`modified` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`field_id`),
KEY `idx_dict_field__datasetid_fieldname` (`dataset_id`, `field_name`) USING BTREE,

View File

@@ -35,9 +35,8 @@ import java.util.Properties;
* Created by zsun on 7/29/15.
*/
public abstract class EtlJob {
public final static String CONFIG_FILE = "application.properties";
private final static String CONFIG_FILE = "application.properties";
public PythonInterpreter interpreter;
public PySystemState sys;
public Properties prop;
public ClassLoader classLoader = getClass().getClassLoader();
protected final Logger logger = LoggerFactory.getLogger(getClass());
@@ -61,8 +60,8 @@ public abstract class EtlJob {
*/
@Deprecated
public EtlJob(Integer appId, Integer dbId, long whExecId, String configFile) {
configFromFile(appId, dbId, whExecId, configFile);
addJythonToPath();
PySystemState sys = configFromFile(appId, dbId, whExecId, configFile);
addJythonToPath(sys);
interpreter = new PythonInterpreter(null, sys);
}
@@ -74,12 +73,12 @@ public abstract class EtlJob {
* @param properties
*/
public EtlJob(Integer appId, Integer dbId, Long whExecId, Properties properties) {
configFromProperties(appId, dbId, whExecId, properties);
addJythonToPath();
PySystemState sys = configFromProperties(appId, dbId, whExecId, properties);
addJythonToPath(sys);
interpreter = new PythonInterpreter(null, sys);
}
private void addJythonToPath() {
private void addJythonToPath(PySystemState pySystemState) {
URL url = classLoader.getResource("jython");
if (url != null) {
File file = new File(url.getFile());
@@ -87,12 +86,12 @@ public abstract class EtlJob {
if (path.startsWith("file:")) {
path = path.substring(5);
}
sys.path.append(new PyString(path.replace("!", "")));
pySystemState.path.append(new PyString(path.replace("!", "")));
}
}
@Deprecated
private void configFromFile(Integer appId, Integer dbId, long whExecId, String configFile) {
private PySystemState configFromFile(Integer appId, Integer dbId, long whExecId, String configFile) {
prop = new Properties();
if (appId != null) {
@@ -117,8 +116,9 @@ public abstract class EtlJob {
config.put(new PyString(key), new PyString(value));
}
sys = new PySystemState();
PySystemState sys = new PySystemState();
sys.argv.append(config);
return sys;
}
/**
@@ -126,8 +126,9 @@ public abstract class EtlJob {
* @param appId
* @param whExecId
* @param properties
* @return PySystemState A PySystemState that contain all the arguments.
*/
private void configFromProperties(Integer appId, Integer dbId, Long whExecId, Properties properties) {
private PySystemState configFromProperties(Integer appId, Integer dbId, Long whExecId, Properties properties) {
this.prop = properties;
if (appId != null)
prop.setProperty(Constant.APP_ID_KEY, String.valueOf(appId));
@@ -139,8 +140,9 @@ public abstract class EtlJob {
String value = prop.getProperty(key);
config.put(new PyString(key), new PyString(value));
}
sys = new PySystemState();
PySystemState sys = new PySystemState();
sys.argv.append(config);
return sys;
}
/**
* Extract data from source
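
With this refactor the constructors stop relying on a shared sys field: configFromFile / configFromProperties build and return a PySystemState, which is then passed to addJythonToPath and into the PythonInterpreter. A minimal sketch of that flow, assuming the config dictionary is a Jython PyDictionary (its creation is not shown in the hunk):

    // Sketch, not part of this commit: build a PySystemState from Properties and
    // hand it to a PythonInterpreter, as the refactored constructors do.
    import java.util.Properties;
    import org.python.core.PyDictionary;
    import org.python.core.PyString;
    import org.python.core.PySystemState;
    import org.python.util.PythonInterpreter;

    public class EtlJobInterpreterSketch {
      static PythonInterpreter newInterpreter(Properties prop) {
        PyDictionary config = new PyDictionary();  // PyDictionary is an assumption
        for (String key : prop.stringPropertyNames()) {
          config.put(new PyString(key), new PyString(prop.getProperty(key)));
        }
        PySystemState sys = new PySystemState();
        sys.argv.append(config);                  // the jython side reads the dict from sys.argv
        return new PythonInterpreter(null, sys);  // same call as in the EtlJob constructors
      }
    }

Passing the PySystemState explicitly keeps each interpreter's state self-contained instead of depending on a field mutated as a side effect.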

View File

@@ -52,7 +52,7 @@ public class AzJobChecker {
/**
* Default 10 minutes
* @return
* @return A list of recent finished AzkabanJobExecRecord
* @throws IOException
* @throws SQLException
*/
@@ -72,23 +72,32 @@ public class AzJobChecker {
public List<AzkabanJobExecRecord> getRecentFinishedJobFromFlow(int timeFrameMinutes)
throws IOException, SQLException {
long currentTimeStamp = System.currentTimeMillis();
long oneHourAgo = currentTimeStamp - 1000 * 60 * timeFrameMinutes;
return getRecentFinishedJobFromFlow(oneHourAgo);
long beginTimeStamp = currentTimeStamp - 1000 * 60 * timeFrameMinutes;
return getRecentFinishedJobFromFlow(beginTimeStamp, currentTimeStamp);
}
public List<AzkabanJobExecRecord> getRecentFinishedJobFromFlow(int timeFrameMinutes, long endTimeStamp)
throws IOException, SQLException {
long beginTimeStamp = endTimeStamp - 60 * timeFrameMinutes;
return getRecentFinishedJobFromFlow(beginTimeStamp * 1000, endTimeStamp * 1000); // convert to milli seconds
}
/**
* Read the blob from "flow_data", do a topological sort on the nodes. Give them the sort id.
* @param timestamp the beginning timestamp
* @param startTimeStamp the begin timestamp
* @param endTimeStamp the end timestamp
* @return
*/
public List<AzkabanJobExecRecord> getRecentFinishedJobFromFlow(long timestamp)
public List<AzkabanJobExecRecord> getRecentFinishedJobFromFlow(long startTimeStamp, long endTimeStamp)
throws SQLException, IOException {
logger.info("Get the jobs from time : {}", timestamp);
logger.info("Get the jobs from time : {} to time : {}", startTimeStamp, endTimeStamp);
List<AzkabanJobExecRecord> results = new ArrayList<>();
Statement stmt = conn.createStatement();
final String cmd =
"select exec_id, flow_id, status, submit_user, flow_data from execution_flows where end_time > " + timestamp;
"select exec_id, flow_id, status, submit_user, flow_data from execution_flows where end_time > " + startTimeStamp
+ " and end_time < " + endTimeStamp ;
logger.info("Get recent flow sql : " + cmd);
final ResultSet rs = stmt.executeQuery(cmd); // this sql take 3 second to execute
while (rs.next()) {
@@ -101,27 +110,53 @@ public class AzJobChecker {
return results;
}
/**
* Parse the json of flow_data field from execution_flows. Use recursion to handle the nested case.
* @param flowJson
* @param flowExecId
* @return
* @throws IOException
*/
public List<AzkabanJobExecRecord> parseJson(String flowJson, long flowExecId)
throws IOException {
List<AzkabanJobExecRecord> results = new ArrayList<>();
ObjectMapper mapper = new ObjectMapper();
JsonNode wholeFlow = mapper.readTree(flowJson);
JsonNode allJobs = wholeFlow.get("nodes");
String flowPath = wholeFlow.get("projectName").asText() + ":" + wholeFlow.get("flowId").asText();
for (JsonNode oneJob : allJobs) {
String jobName = oneJob.get("id").asText();
long startTime = oneJob.get("startTime").asLong();
long endTime = oneJob.get("endTime").asLong();
String status = oneJob.get("status").asText();
AzkabanJobExecRecord azkabanJobExecRecord =
new AzkabanJobExecRecord(appId, jobName, flowExecId, (int) (startTime / 1000), (int) (endTime / 1000), status,
flowPath);
results.add(azkabanJobExecRecord);
}
List<AzkabanJobExecRecord> results = parseJsonHelper(allJobs, flowExecId, flowPath);
AzkabanJobExecUtil.sortAndSet(results);
return results;
}
/**
* Recursively process the execution info to get {@AzkabanJobExecRecord}
* @param allJobs JsonNode in "nodes" field
* @param flowExecId
* @param flowPath Format : project_name:first_level_flow/sub_flow/sub_flow
* @return
*/
private List<AzkabanJobExecRecord> parseJsonHelper(JsonNode allJobs, long flowExecId, String flowPath) {
List<AzkabanJobExecRecord> results = new ArrayList<>();
for (JsonNode oneJob : allJobs) {
if (oneJob.has("nodes")) { // is a subflow
String subFlowName = oneJob.get("id").asText();
flowPath += "/" + subFlowName;
results.addAll(parseJsonHelper(oneJob.get("nodes"), flowExecId, flowPath));
} else {
String jobName = oneJob.get("id").asText();
long startTime = oneJob.get("startTime").asLong();
long endTime = oneJob.get("endTime").asLong();
String status = oneJob.get("status").asText();
AzkabanJobExecRecord azkabanJobExecRecord =
new AzkabanJobExecRecord(appId, jobName, flowExecId, (int) (startTime / 1000), (int) (endTime / 1000),
status, flowPath);
results.add(azkabanJobExecRecord);
}
}
return results;
}
public void close()
throws SQLException {
conn.close();
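
The two new overloads mix units: the (int, long) variant takes endTimeStamp in seconds and multiplies both bounds by 1000, while the long-based variant and the SQL filter work in milliseconds. A small worked example of the window arithmetic, using the value from the test added later in this commit:

    // Sketch, not part of this commit: the begin/end window built by
    // getRecentFinishedJobFromFlow(int timeFrameMinutes, long endTimeStamp).
    public class TimeWindowSketch {
      public static void main(String[] args) {
        int timeFrameMinutes = 2;
        long endTimeStamp = 1448916456L;                             // seconds, as in AzJobCheckerTest
        long beginTimeStamp = endTimeStamp - 60 * timeFrameMinutes;  // still seconds
        // The filter on execution_flows.end_time is expressed in milliseconds:
        System.out.println("end_time > " + beginTimeStamp * 1000 + " and end_time < " + endTimeStamp * 1000);
      }
    }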

View File

@@ -43,9 +43,16 @@ public class AzLineageExtractor {
throws Exception {
List<LineageRecord> oneAzkabanJobLineage = new ArrayList<>();
// azkaban job name should have subflow name append in front
String flowSequence[] = message.azkabanJobExecution.getFlowPath().split(":")[1].split("/");
String jobPrefix = "";
for (int i = 1; i < flowSequence.length; i ++) {
jobPrefix += flowSequence[i] + ":";
}
//String log = asc.getExecLog(azJobExec.execId, azJobExec.jobName);
String log =
message.adc.getExecLog(message.azkabanJobExecution.getFlowExecId(), message.azkabanJobExecution.getJobName());
message.adc.getExecLog(message.azkabanJobExecution.getFlowExecId(), jobPrefix + message.azkabanJobExecution.getJobName());
Set<String> hadoopJobIds = AzLogParser.getHadoopJobIdFromLog(log);
for (String hadoopJobId : hadoopJobIds) {
@@ -61,8 +68,8 @@
// normalize and combine the path
LineageCombiner lineageCombiner = new LineageCombiner(message.connection);
lineageCombiner.addAll(oneAzkabanJobLineage);
List<LineageRecord> lineageFromLog = AzLogParser.getLineageFromLog(log, message.azkabanJobExecution);
Integer defaultDatabaseId = Integer.valueOf(message.prop.getProperty(Constant.AZ_DEFAULT_HADOOP_DATABASE_ID_KEY));
List<LineageRecord> lineageFromLog = AzLogParser.getLineageFromLog(log, message.azkabanJobExecution, defaultDatabaseId);
lineageCombiner.addAll(lineageFromLog);
return lineageCombiner.getCombinedLineage();
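
The prefix loop above rebuilds the job name that Azkaban uses in its execution log when a job sits inside a nested flow. A self-contained sketch of the same string handling, with a hypothetical flow path in the project:topFlow/subFlow/... format described in AzJobChecker:

    // Sketch, not part of this commit: derive the log-lookup job name from a flow path.
    public class JobPrefixSketch {
      public static void main(String[] args) {
        String flowPath = "WherehowsETL:top-flow/sub-flow-a/sub-flow-b";  // hypothetical value
        String jobName = "my-job";                                        // hypothetical value
        String[] flowSequence = flowPath.split(":")[1].split("/");
        StringBuilder jobPrefix = new StringBuilder();
        for (int i = 1; i < flowSequence.length; i++) {   // skip the top-level flow name
          jobPrefix.append(flowSequence[i]).append(':');
        }
        System.out.println(jobPrefix + jobName);          // prints sub-flow-a:sub-flow-b:my-job
      }
    }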

View File

@@ -60,17 +60,22 @@ public class AzLineageExtractorMaster {
run(10);
}
public void run(int timeFrame)
throws Exception {
run(timeFrame, System.currentTimeMillis());
}
/**
* Entry point.
* All recent finished azkaban jobs' lineage. Will write to database stagging table
* @param timeFrame in minutes
* @throws Exception
*/
public void run(int timeFrame)
public void run(int timeFrame, long endTimeStamp)
throws Exception {
// get recent finished job
AzJobChecker azJobChecker = new AzJobChecker(prop);
List<AzkabanJobExecRecord> jobExecList = azJobChecker.getRecentFinishedJobFromFlow(timeFrame);
List<AzkabanJobExecRecord> jobExecList = azJobChecker.getRecentFinishedJobFromFlow(timeFrame, endTimeStamp);
azJobChecker.close();
logger.info("Total number of azkaban jobs : {}", jobExecList.size());
@@ -90,7 +95,7 @@ public class AzLineageExtractorMaster {
Connection conn = DriverManager.getConnection(connUrl);
DatabaseWriter databaseWriter = new DatabaseWriter(connUrl, "stg_job_execution_data_lineage");
AzLogParser.initialize(conn, Integer.valueOf(prop.getProperty(Constant.AZ_DEFAULT_HADOOP_DATABASE_ID_KEY)));
AzLogParser.initialize(conn);
PathAnalyzer.initialize(conn);
int timeout = 30; // default 30 minutes for one job
if (prop.containsKey(Constant.LINEAGE_ACTOR_TIMEOUT_KEY))

View File

@@ -27,7 +27,8 @@ import java.util.Properties;
*/
public class AzLineageMetadataEtl extends EtlJob {
public int timeFrame = -1;
public Integer timeFrame = null;
public Long endTimeStamp = null;
Connection conn;
/**
@@ -53,6 +54,8 @@ public class AzLineageMetadataEtl extends EtlJob {
public AzLineageMetadataEtl(int appId, long whExecId, Properties properties) {
super(appId, null, whExecId, properties);
this.timeFrame = Integer.valueOf(this.prop.getProperty(Constant.AZ_LINEAGE_ETL_LOOKBACK_MINS_KEY));
if (this.prop.contains(Constant.AZ_LINEAGE_ETL_END_TIMESTAMP_KEY))
this.endTimeStamp = Long.valueOf(this.prop.getProperty(Constant.AZ_LINEAGE_ETL_END_TIMESTAMP_KEY));
try {
setUp();
} catch (SQLException e) {
@@ -78,7 +81,11 @@ public class AzLineageMetadataEtl extends EtlJob {
conn.createStatement().execute(emptyStaggingTable);
AzLineageExtractorMaster azLineageExtractorMaster = new AzLineageExtractorMaster(prop);
// get lineage
if (timeFrame > 0) {
if (timeFrame != null && endTimeStamp != null && endTimeStamp != 0) {
azLineageExtractorMaster.run(timeFrame, endTimeStamp);
}
else if (timeFrame != null) {
azLineageExtractorMaster.run(timeFrame);
} else {
azLineageExtractorMaster.run(10);
@@ -110,8 +117,8 @@ public class AzLineageMetadataEtl extends EtlJob {
conn.createStatement().execute(insertIntoFinalTable);
logger.info("Azkaban lineage metadata ETL completed");
if (prop.getProperty(Constant.APP_ID_KEY).equals("32")) {
logger.info("TEMPORARY load war's data into cmdb database");
if (prop.getProperty(Constant.APP_ID_KEY).equals("32") || prop.getProperty(Constant.APP_ID_KEY).equals("31") ) {
logger.info("TEMPORARY load war & nertz's data into cmdb database");
loadIntoOthers();
}
}
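
The constructor now also reads an optional end timestamp so a fixed window can be replayed instead of always ending at "now". A sketch of reading both settings with the keys this commit defines in Constant; the wherehows.common package for Constant and the null-when-absent convention are assumptions:

    // Sketch, not part of this commit: optional lookback / end-timestamp configuration.
    // Java note: Properties.containsKey(...) tests keys, Hashtable.contains(...) tests values.
    import java.util.Properties;
    import wherehows.common.Constant;   // package assumed

    public class LineageEtlConfigSketch {
      static Integer timeFrameMinutes(Properties prop) {
        String v = prop.getProperty(Constant.AZ_LINEAGE_ETL_LOOKBACK_MINS_KEY);
        return v == null ? null : Integer.valueOf(v);
      }

      static Long endTimeStamp(Properties prop) {
        String v = prop.getProperty(Constant.AZ_LINEAGE_ETL_END_TIMESTAMP_KEY);
        return v == null ? null : Long.valueOf(v);
      }
    }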

View File

@@ -34,7 +34,6 @@ public class AzLogParser {
static List<LogLineagePattern> logLineagePatterns;
static List<String> logHadoopIdPatterns;
static int defaultDatabaseId;
/**
* Parse the hadoop job id from the log.
@@ -65,18 +64,17 @@
/**
* initialize, download the regex info into cache
*/
public synchronized static void initialize(Connection conn, int defaultDatabaseId)
public synchronized static void initialize(Connection conn)
throws SQLException {
if (logHadoopIdPatterns != null && logLineagePatterns != null) {
return;
}
loadLineagePatterns(conn, defaultDatabaseId);
loadHadoopIdPatterns(conn, defaultDatabaseId);
loadLineagePatterns(conn);
loadHadoopIdPatterns(conn);
}
private static void loadLineagePatterns(Connection conn, int defaultDatabaseId)
private static void loadLineagePatterns(Connection conn)
throws SQLException {
AzLogParser.defaultDatabaseId = defaultDatabaseId;
logLineagePatterns = new ArrayList<>();
String cmd = "SELECT regex, database_type, database_name_index, dataset_index, operation_type, source_target_type, "
+ "record_count_index, record_byte_index, insert_count_index, insert_byte_index, "
@@ -95,7 +93,7 @@
}
}
private static void loadHadoopIdPatterns(Connection conn, int defaultDatabaseId)
private static void loadHadoopIdPatterns(Connection conn)
throws SQLException {
logHadoopIdPatterns = new ArrayList<>();
String cmd = "SELECT regex FROM log_reference_job_id_pattern WHERE is_active = 1";
@@ -111,7 +109,7 @@
* @param azkabanJobExecRecord contain the job execution info to construct the result
* @return
*/
public static List<LineageRecord> getLineageFromLog(String log, AzkabanJobExecRecord azkabanJobExecRecord) {
public static List<LineageRecord> getLineageFromLog(String log, AzkabanJobExecRecord azkabanJobExecRecord, Integer defaultDatabaseId) {
List<LineageRecord> result = new ArrayList<>();
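
Since initialize() no longer captures a default database id, callers pass it with every getLineageFromLog call (see AzLineageExtractor above and the tests below). A minimal usage sketch; the connection, log text and ids are placeholders, and the metadata.etl.lineage package for AzLogParser is taken from the test sources:

    // Sketch, not part of this commit: the updated AzLogParser call pattern.
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.List;
    import metadata.etl.lineage.AzLogParser;              // package assumed from the tests
    import wherehows.common.schemas.AzkabanJobExecRecord;
    import wherehows.common.schemas.LineageRecord;

    public class AzLogParserUsageSketch {
      static List<LineageRecord> lineage(Connection conn, String log, int appId, int defaultHadoopDbId)
          throws SQLException {
        AzLogParser.initialize(conn);   // the pattern caches no longer bind a database id
        AzkabanJobExecRecord exec =
            new AzkabanJobExecRecord(appId, "someJobName", 0L, 0, 0, "S", "path");
        return AzLogParser.getLineageFromLog(log, exec, defaultHadoopDbId);
      }
    }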

View File

@@ -61,15 +61,15 @@ public class HadoopNameNodeExtractor {
String WH_HOME = System.getenv("WH_HOME");
String USER_HOME = System.getenv("HOME") + "/.kerberos";
String ETC = "/etc";
String TMP = "/tmp" + "/.kerberos";
String TMP = "/var/tmp" + "/.kerberos";
String[] allPositions = new String[]{CURRENT_DIR, WH_HOME, USER_HOME, TMP, ETC};
String[] allPositions = new String[]{CURRENT_DIR, WH_HOME, USER_HOME, TMP};
for (String possition : allPositions) {
String gssFileName = possition + "/gss-jaas.conf";
File gssFile = new File(gssFileName);
if (gssFile.exists()) {
logger.debug("find gss-jaas.conf file in : {}", gssFile.getAbsolutePath());
logger.info("find gss-jaas.conf file in : {}", gssFile.getAbsolutePath());
System.setProperty("java.security.auth.login.config", gssFile.getAbsolutePath());
break;
} else {
@@ -80,7 +80,7 @@
String krb5FileName = possition + "/krb5.conf";
File krb5File = new File(krb5FileName);
if (krb5File.exists()) {
logger.debug("find krb5.conf file in : {}", krb5File.getAbsolutePath());
logger.info("find krb5.conf file in : {}", krb5File.getAbsolutePath());
System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
break;
} else {
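
The extractor probes a fixed list of directories for gss-jaas.conf and krb5.conf and points the JVM at the first match; /var/tmp/.kerberos replaces /tmp/.kerberos and /etc is dropped from the list. A sketch of that lookup; using user.dir for CURRENT_DIR is an assumption, since its definition is outside this hunk:

    // Sketch, not part of this commit: locate Kerberos config files and set the
    // corresponding JVM system properties.
    import java.io.File;

    public class KerberosConfigSketch {
      static void configure() {
        String[] candidates = {System.getProperty("user.dir"), System.getenv("WH_HOME"),
            System.getenv("HOME") + "/.kerberos", "/var/tmp/.kerberos"};
        for (String dir : candidates) {
          if (dir == null) continue;
          File gss = new File(dir, "gss-jaas.conf");
          if (gss.exists()) {
            System.setProperty("java.security.auth.login.config", gss.getAbsolutePath());
            break;
          }
        }
        for (String dir : candidates) {
          if (dir == null) continue;
          File krb5 = new File(dir, "krb5.conf");
          if (krb5.exists()) {
            System.setProperty("java.security.krb5.conf", krb5.getAbsolutePath());
            break;
          }
        }
      }
    }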

View File

@@ -168,15 +168,74 @@ class HdfsLoad:
analyze table field_comments;
-- delete old record if it does not exist in this load batch anymore (but have the dataset id)
create temporary table if not exists t_deleted_fields (primary key (field_id))
select x.field_id
from stg_dict_field_detail s
join dict_dataset i
on s.urn = i.urn
and s.db_id = {db_id}
right join dict_field_detail x
on i.id = x.dataset_id
and s.field_name = x.field_name
and s.parent_path = x.parent_path
where s.field_name is null
and x.dataset_id in (
select d.id dataset_id
from stg_dict_field_detail k join dict_dataset d
on k.urn = d.urn
and k.db_id = {db_id}
)
; -- run time : ~2min
delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);
-- update the old record if some thing changed
update dict_field_detail t join
(
select x.field_id, s.*
from stg_dict_field_detail s join dict_dataset d
on s.urn = d.urn
join dict_field_detail x
on s.field_name = x.field_name
and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
and d.id = x.dataset_id
where s.db_id = {db_id}
and (x.sort_id <> s.sort_id
or x.parent_sort_id <> s.parent_sort_id
or x.data_type <> s.data_type
or x.data_size <> s.data_size or (x.data_size is null XOR s.data_size is null)
or x.data_precision <> s.data_precision or (x.data_precision is null XOR s.data_precision is null)
or x.is_nullable <> s.is_nullable or (x.is_nullable is null XOR s.is_nullable is null)
or x.is_partitioned <> s.is_partitioned or (x.is_partitioned is null XOR s.is_partitioned is null)
or x.is_distributed <> s.is_distributed or (x.is_distributed is null XOR s.is_distributed is null)
or x.default_value <> s.default_value or (x.default_value is null XOR s.default_value is null)
or x.namespace <> s.namespace or (x.namespace is null XOR s.namespace is null)
)
) p
on t.field_id = p.field_id
set t.sort_id = p.sort_id,
t.parent_sort_id = p.parent_sort_id,
t.data_type = p.data_type,
t.data_size = p.data_size,
t.data_precision = p.data_precision,
t.is_nullable = p.is_nullable,
t.is_partitioned = p.is_partitioned,
t.is_distributed = p.is_distributed,
t.default_value = p.default_value,
t.namespace = p.namespace,
t.modified = now()
;
insert into dict_field_detail (
dataset_id, fields_layout_id, sort_id, parent_sort_id, parent_path,
field_name, namespace, data_type, data_size, is_nullable, default_value,
default_comment_id
default_comment_id, modified
)
select
d.id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path,
sf.field_name, sf.namespace, sf.data_type, sf.data_size, sf.is_nullable, sf.default_value,
coalesce(fc.id, t.default_comment_id) fc_id
coalesce(fc.id, t.default_comment_id) fc_id, now()
from stg_dict_field_detail sf join dict_dataset d
on sf.urn = d.urn
left join field_comments fc

View File

@@ -129,29 +129,63 @@ class TeradataLoad:
and (char_length(trim(description)) = 0
or description in ('null', 'N/A', 'nothing', 'empty', 'none'));
UPDATE dict_field_detail t, (
select
f.field_id,
s.sort_id,
s.data_type,
s.data_size,
s.data_precision,
s.data_scale,
s.is_nullable
from stg_dict_field_detail s join dict_dataset d
on s.urn = d.urn
join dict_field_detail f
on d.id = f.dataset_id
and s.field_name = f.field_name
where s.db_id = {db_id}
) x
set t.sort_id = x.sort_id
, t.data_type = x.data_type
, t.data_size = x.data_size
, t.data_precision = x.data_precision
, t.data_fraction = x.data_scale
, t.is_nullable = x.is_nullable
where t.field_id = x.field_id
-- delete old record if it does not exist in this load batch anymore (but have the dataset id)
create temporary table if not exists t_deleted_fields (primary key (field_id))
select x.field_id
from stg_dict_field_detail s
join dict_dataset i
on s.urn = i.urn
and s.db_id = {db_id}
right join dict_field_detail x
on i.id = x.dataset_id
and s.field_name = x.field_name
and s.parent_path = x.parent_path
where s.field_name is null
and x.dataset_id in (
select d.id dataset_id
from stg_dict_field_detail k join dict_dataset d
on k.urn = d.urn
and k.db_id = {db_id}
)
; -- run time : ~2min
delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);
-- update the old record if some thing changed
update dict_field_detail t join
(
select x.field_id, s.*
from stg_dict_field_detail s join dict_dataset d
on s.urn = d.urn
join dict_field_detail x
on s.field_name = x.field_name
and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*')
and d.id = x.dataset_id
where s.db_id = {db_id}
and (x.sort_id <> s.sort_id
or x.parent_sort_id <> s.parent_sort_id
or x.data_type <> s.data_type
or x.data_size <> s.data_size or (x.data_size is null XOR s.data_size is null)
or x.data_precision <> s.data_precision or (x.data_precision is null XOR s.data_precision is null)
or x.is_nullable <> s.is_nullable or (x.is_nullable is null XOR s.is_nullable is null)
or x.is_partitioned <> s.is_partitioned or (x.is_partitioned is null XOR s.is_partitioned is null)
or x.is_distributed <> s.is_distributed or (x.is_distributed is null XOR s.is_distributed is null)
or x.default_value <> s.default_value or (x.default_value is null XOR s.default_value is null)
or x.namespace <> s.namespace or (x.namespace is null XOR s.namespace is null)
)
) p
on t.field_id = p.field_id
set t.sort_id = p.sort_id,
t.parent_sort_id = p.parent_sort_id,
t.data_type = p.data_type,
t.data_size = p.data_size,
t.data_precision = p.data_precision,
t.is_nullable = p.is_nullable,
t.is_partitioned = p.is_partitioned,
t.is_distributed = p.is_distributed,
t.default_value = p.default_value,
t.namespace = p.namespace,
t.modified = now()
;
show warnings limit 10;

View File

@@ -13,7 +13,12 @@
*/
package metadata.etl.lineage;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import org.testng.Assert;
import org.testng.annotations.BeforeTest;
@@ -28,86 +33,7 @@ import wherehows.common.schemas.AzkabanJobExecRecord;
* Created by zsun on 9/9/15.
*/
public class AzJobCheckerTest {
final String jsonInput = "{\"attempt\": 0,\n" + " \"endTime\": 1442233069062,\n" + " \"executionId\": 832765,\n"
+ " \"executionOptions\": {\"concurrentOption\": \"skip\",\n" + " \"disabled\": [],\n"
+ " \"failureAction\": \"FINISH_CURRENTLY_RUNNING\",\n"
+ " \"failureEmails\": [\"zsun@linkedin.com\"],\n"
+ " \"failureEmailsOverride\": true,\n" + " \"flowParameters\": {},\n"
+ " \"mailCreator\": \"default\",\n" + " \"memoryCheck\": true,\n"
+ " \"notifyOnFirstFailure\": false,\n"
+ " \"notifyOnLastFailure\": false,\n" + " \"pipelineExecId\": null,\n"
+ " \"pipelineLevel\": null,\n" + " \"queueLevel\": 0,\n"
+ " \"successEmails\": [],\n" + " \"successEmailsOverride\": false},\n"
+ " \"executionPath\": \"executions/832765\",\n" + " \"flowId\": \"hadoop-datasets-stats\",\n" + " \"id\": null,\n"
+ " \"lastModfiedTime\": 1440783373310,\n" + " \"lastModifiedUser\": \"zsun\",\n"
+ " \"nodes\": [{\"attempt\": 0,\n" + " \"endTime\": 1442233069053,\n"
+ " \"id\": \"hadoop-datasets-stats\",\n"
+ " \"inNodes\": [\"hadoop-datasets-stats_enrich-abstract-dataset\"],\n"
+ " \"jobSource\": \"hadoop-datasets-stats.job\",\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442233069045,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"noop\",\n"
+ " \"updateTime\": 1442233069057},\n" + " {\"attempt\": 0,\n"
+ " \"endTime\": 1442232636665,\n"
+ " \"id\": \"hadoop-datasets-stats_load-size-avro-into-mysql\",\n"
+ " \"inNodes\": [\"hadoop-datasets-stats_sizePartition\"],\n"
+ " \"jobSource\": \"hadoop-datasets-stats_load-size-avro-into-mysql.job\",\n"
+ " \"outNodes\": [\"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql\"],\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442229210581,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
+ " \"updateTime\": 1442232636670},\n" + " {\"attempt\": 0,\n"
+ " \"endTime\": 1442229861221,\n"
+ " \"id\": \"hadoop-datasets-stats_extract-dataset-layout\",\n"
+ " \"inNodes\": [\"hadoop-datasets-stats_sizePartition\"],\n"
+ " \"jobSource\": \"hadoop-datasets-stats_extract-dataset-layout.job\",\n"
+ " \"outNodes\": [\"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql\"],\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442229210582,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
+ " \"updateTime\": 1442229861231},\n" + " {\"attempt\": 0,\n"
+ " \"endTime\": 1442229210579,\n" + " \"id\": \"hadoop-datasets-stats_sizePartition\",\n"
+ " \"inNodes\": [\"hadoop-datasets-stats_sizeAggr\"],\n"
+ " \"jobSource\": \"hadoop-datasets-stats_sizePartition.job\",\n"
+ " \"outNodes\": [\"hadoop-datasets-stats_load-size-avro-into-mysql\",\n"
+ " \"hadoop-datasets-stats_extract-dataset-layout\"],\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442228463681,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
+ " \"updateTime\": 1442229210587},\n" + " {\"attempt\": 0,\n"
+ " \"endTime\": 1442228463629,\n" + " \"id\": \"hadoop-datasets-stats_sizeAggr\",\n"
+ " \"jobSource\": \"hadoop-datasets-stats_sizeAggr.job\",\n"
+ " \"outNodes\": [\"hadoop-datasets-stats_sizePartition\"],\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442224810817,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
+ " \"updateTime\": 1442228463679},\n" + " {\"attempt\": 0,\n"
+ " \"endTime\": 1442229882257,\n"
+ " \"id\": \"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql\",\n"
+ " \"inNodes\": [\"hadoop-datasets-stats_extract-dataset-layout\"],\n"
+ " \"jobSource\": \"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql.job\",\n"
+ " \"outNodes\": [\"hadoop-datasets-stats_enrich-abstract-dataset\"],\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442229861224,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
+ " \"updateTime\": 1442229882261},\n" + " {\"attempt\": 0,\n"
+ " \"endTime\": 1442233066192,\n"
+ " \"id\": \"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql\",\n"
+ " \"inNodes\": [\"hadoop-datasets-stats_load-size-avro-into-mysql\"],\n"
+ " \"jobSource\": \"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql.job\",\n"
+ " \"outNodes\": [\"hadoop-datasets-stats_enrich-abstract-dataset\"],\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442232636668,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
+ " \"updateTime\": 1442233066196},\n" + " {\"attempt\": 0,\n"
+ " \"endTime\": 1442233069043,\n"
+ " \"id\": \"hadoop-datasets-stats_enrich-abstract-dataset\",\n"
+ " \"inNodes\": [\"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql\",\n"
+ " \"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql\"],\n"
+ " \"jobSource\": \"hadoop-datasets-stats_enrich-abstract-dataset.job\",\n"
+ " \"outNodes\": [\"hadoop-datasets-stats\"],\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442233066194,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
+ " \"updateTime\": 1442233069046}],\n" + " \"projectId\": 533,\n"
+ " \"projectName\": \"WherehowsETL\",\n" + " \"properties\": [{\"source\": \"common.properties\"}],\n"
+ " \"proxyUsers\": [\"azkaban@GRID.LINKEDIN.COM\",\n" + " \"dev_svc\",\n"
+ " \"azkaban/eat1-nertzaz01.grid.linkedin.com@GRID.LINKEDIN.COM\",\n"
+ " \"zsun\",\n" + " \"data_svc\"],\n" + " \"startTime\": 1442224810815,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"submitTime\": 1442224810778,\n" + " \"submitUser\": \"zsun\",\n"
+ " \"type\": null,\n" + " \"updateTime\": 1442233069065,\n" + " \"version\": 301}";
AzJobChecker ajc;
Properties prop;
@@ -131,9 +57,25 @@ public class AzJobCheckerTest {
}
@Test(groups = {"needConfig"})
public void parseJsonTest()
throws IOException {
List<AzkabanJobExecRecord> result = ajc.parseJson(jsonInput, 11111);
public void getRecentFinishedJobFromFlowTest2()
throws SQLException, IOException {
List<AzkabanJobExecRecord> results = ajc.getRecentFinishedJobFromFlow(2, 1448916456L);
for (AzkabanJobExecRecord a : results) {
System.out.print(a.getFlowExecId() + "\t");
System.out.print(a.getJobName() + "\t");
System.out.println(a.getJobExecId());
}
Assert.assertNotNull(results);
}
@Test(groups = {"needConfig"})
public void parseNestedJsonTest()
throws IOException, URISyntaxException {
URL url = Thread.currentThread().getContextClassLoader().getResource("nestedFlowContent.json");
byte[] encoded = Files.readAllBytes(Paths.get(url.getPath()));
String nestedJson = new String(encoded, "UTF-8");
List<AzkabanJobExecRecord> result = ajc.parseJson(nestedJson, 11111);
for (int i = 0; i < result.size(); i++) {
AzkabanJobExecRecord aje = result.get(i);
System.out.println(aje.getJobExecId());
@@ -145,4 +87,7 @@
Assert.assertEquals((long) aje.getJobExecId(), 11111 * 1000 + i);
}
}
}
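
The new parseNestedJsonTest reads its fixture, nestedFlowContent.json, from the test classpath. A small sketch of that resource-to-String step; StandardCharsets.UTF_8 and url.toURI() stand in for the literal "UTF-8" and url.getPath() used in the test, and also tolerate URL-encoded characters in the path:

    // Sketch, not part of this commit: load a classpath resource into a String.
    import java.io.IOException;
    import java.net.URISyntaxException;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class ResourceLoadSketch {
      static String readResource(String name) throws IOException, URISyntaxException {
        URL url = Thread.currentThread().getContextClassLoader().getResource(name);
        byte[] encoded = Files.readAllBytes(Paths.get(url.toURI()));
        return new String(encoded, StandardCharsets.UTF_8);
      }
    }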

View File

@@ -44,7 +44,7 @@ public class AzLineageExtractorTest {
String wherehowsPassWord = prop.getProperty(Constant.WH_DB_PASSWORD_KEY);
connUrl = wherehowsUrl + "?" + "user=" + wherehowsUserName + "&password=" + wherehowsPassWord;
this.conn = DriverManager.getConnection(connUrl);
AzLogParser.initialize(conn, Integer.valueOf(prop.getProperty(Constant.AZ_DEFAULT_HADOOP_DATABASE_ID_KEY)));
AzLogParser.initialize(conn);
PathAnalyzer.initialize(conn);
}

View File

@@ -35,6 +35,8 @@ import wherehows.common.schemas.LineageRecord;
*/
public class AzLogParserTest {
private final int TEST_APP_ID = -1;
private final int TEST_DATABASE_ID = -1;
@BeforeTest
public void setUp()
throws SQLException {
@@ -45,7 +47,7 @@ public class AzLogParserTest {
String wherehowsPassWord = prop.getProperty(Constant.WH_DB_PASSWORD_KEY);
Connection conn =
DriverManager.getConnection(wherehowsHost + "?" + "user=" + wherehowsUserName + "&password=" + wherehowsPassWord);
AzLogParser.initialize(conn, -1);
AzLogParser.initialize(conn);
}
@Test(groups = {"needConfig"})
@@ -79,9 +81,9 @@
@Test(groups = {"needConfig"})
public void getLineageFromLogTest() {
String logSample = "asfdasdfsadf Moving from staged path[asdf] to final resting place[/tm/b/c] sdaf dsfasdfasdf";
AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(-1, "someJobName", (long) 0, 0, 0, "S", "path");
AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(TEST_APP_ID, "someJobName", (long) 0, 0, 0, "S", "path");
sampleExecution.setJobExecId((long) 11111);
List<LineageRecord> result = AzLogParser.getLineageFromLog(logSample, sampleExecution);
List<LineageRecord> result = AzLogParser.getLineageFromLog(logSample, sampleExecution, -1);
System.out.println(result.get(0).toDatabaseValue());
Assert.assertEquals(result.get(0).toDatabaseValue(),
@@ -104,8 +106,8 @@
+ "17-11-2015 01:32:27 PST endorsements_push-lva-endorsements-member-restrictions INFO - INFO tcp://lva1-voldemort-read-only-2-vip.prod.linkedin.com:10103 : Initiating swap of endorsements-member-restrictions with dataDir: /jobs/endorse/endorsements/master/tmp/endorsements-member-restrictions.store/lva1-voldemort-read-only-2-vip.prod.linkedin.com\n"
+ "17-11-2015 01:32:27 PST endorsements_push-lva-endorsements-member-restrictions INFO - INFO tcp://lva1-voldemort-read-only-2-vip.prod.linkedin.com:10103 : Invoking fetch for Node lva1-app0610.prod.linkedin.com [id 0] for webhdfs://lva1-warnn01.grid.linkedin.com:50070/jobs/endorse/endorsements/master/tmp/endorsements-member-restrictions.store/lva1-voldemort-read-only-2-vip.prod.linkedin.com/node-0\n"
+ "17-11-2015 01:32:27 PST endorsements_push-lva-endorsements-member-restrictions INFO - INFO tcp://lva1-voldemort-rea";
AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(-1, "someJobName", (long) 0, 0, 0, "S", "path");
List<LineageRecord> result = AzLogParser.getLineageFromLog(logSample, sampleExecution);
AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(TEST_APP_ID, "someJobName", (long) 0, 0, 0, "S", "path");
List<LineageRecord> result = AzLogParser.getLineageFromLog(logSample, sampleExecution, TEST_DATABASE_ID);
System.out.println(result.get(0).toDatabaseValue());
Assert.assertEquals(result.get(0).getFullObjectName(),
"tcp://lva1-voldemort-read-only-2-vip.prod.linkedin.com:10103/endorsements-member-restrictions");

View File

@@ -38,6 +38,7 @@ public class Constant {
public static final String AZ_DEFAULT_HADOOP_DATABASE_ID_KEY = "az.default.hadoop.database.id";
public static final String AZ_LINEAGE_ETL_LOOKBACK_MINS_KEY = "az.lineage_etl.lookback_period.in.minutes";
public static final String LINEAGE_ACTOR_TIMEOUT_KEY = "az.lineage.actor.timeout";
public static final String AZ_LINEAGE_ETL_END_TIMESTAMP_KEY = "az.lineage_etl.end_timestamp";
public static final String AZ_SERVICE_URL_KEY = "az.server.url";
public static final String AZ_SERVICE_USERNAME_KEY = "az.server.username";
@@ -106,4 +107,15 @@ public class Constant {
public static final String GIT_HOST_KEY = "git.host";
public static final String GIT_PROJECT_WHITELIST_KEY = "git.project.whitelist";
// hive
public static final String HIVE_METASTORE_JDBC_DRIVER = "hive.metastore.jdbc.driver";
public static final String HIVE_METASTORE_JDBC_URL = "hive.metastore.jdbc.url";
public static final String HIVE_METASTORE_USERNAME = "hive.metstore.username";
public static final String HIVE_METASTORE_PASSWORD = "hive.metastore.password";
public static final String HIVE_SCHEMA_JSON_FILE_KEY = "hive.schema_json_file";
// public static final String HIVE_SAMPLE_CSV_FILE_KEY = "hive.sample_csv";
public static final String HIVE_SCHEMA_CSV_FILE_KEY = "hive.schema_csv_file";
public static final String HIVE_FIELD_METADATA_KEY = "hive.field_metadata";
}

View File

@@ -29,8 +29,11 @@ public class DatasetFieldRecord implements Record {
Integer parentSortId;
String parentPath;
String fieldName;
String fieldLabel;
String dataType;
String isNullable;
String isIndexed;
String isPartitioned;
String defaultValue;
Integer dataSize;
String namespace;
@@ -39,18 +42,21 @@ public class DatasetFieldRecord implements Record {
List<Object> allFields;
char SEPR = 0x001A;
public DatasetFieldRecord(String urn, Integer sortId, Integer parentSortId, String parentPath, String fieldName,
String dataType, String isNullable, String defaultValue, Integer dataSize, String namespace, String description) {
public DatasetFieldRecord(String urn, Integer sortId, Integer parentSortId, String parentPath, String fieldName, String fieldLabel,
String dataType, String isNullable, String isIndexed, String isPartitioned, String defaultValue, Integer dataSize, String namespace, String description) {
this.urn = urn;
this.sortId = sortId;
this.parentSortId = parentSortId;
this.parentPath = parentPath;
this.fieldName = fieldName;
this.fieldLabel = fieldLabel;
this.dataType = dataType;
this.isNullable = isNullable;
this.defaultValue = defaultValue;
this.dataSize = dataSize;
this.isNullable = isNullable;
this.isIndexed = isIndexed;
this.isPartitioned = isPartitioned;
this.defaultValue = defaultValue;
this.namespace = namespace;
this.description = description;
@@ -61,9 +67,11 @@ public class DatasetFieldRecord implements Record {
this.allFields.add(parentPath);
this.allFields.add(fieldName);
this.allFields.add(dataType);
this.allFields.add(isNullable);
this.allFields.add(defaultValue);
this.allFields.add(dataSize);
this.allFields.add(isNullable);
this.allFields.add(isIndexed);
this.allFields.add(isPartitioned);
this.allFields.add(defaultValue);
this.allFields.add(namespace);
this.allFields.add(description);
}
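
DatasetFieldRecord now carries field_label, is_indexed and is_partitioned, matching the dict_field_detail columns touched earlier in this commit. A sketch of the expanded constructor call with placeholder values; the wherehows.common.schemas package is assumed from the sibling record classes:

    // Sketch, not part of this commit: the 14-argument constructor in the order shown above.
    import wherehows.common.schemas.DatasetFieldRecord;   // package assumed

    public class DatasetFieldRecordSketch {
      static DatasetFieldRecord sample() {
        return new DatasetFieldRecord(
            "hdfs:///data/example_dataset",  // urn (placeholder)
            1,                // sortId
            0,                // parentSortId
            null,             // parentPath
            "member_id",      // fieldName
            null,             // fieldLabel
            "long",           // dataType
            "N",              // isNullable
            "N",              // isIndexed
            "N",              // isPartitioned
            null,             // defaultValue
            8,                // dataSize
            null,             // namespace
            "example field"); // description
      }
    }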