Change flowPath, jobName format.

Fix sql nullable value comparison bug.
Formating.
This commit is contained in:
SunZhaonan 2015-12-15 17:15:43 -08:00
parent 07c46304b5
commit 7bf3ecdb4a
7 changed files with 56 additions and 135 deletions

View File

@ -37,7 +37,6 @@ import java.util.Properties;
public abstract class EtlJob {
private final static String CONFIG_FILE = "application.properties";
public PythonInterpreter interpreter;
public PySystemState sys;
public Properties prop;
public ClassLoader classLoader = getClass().getClassLoader();
protected final Logger logger = LoggerFactory.getLogger(getClass());
@ -74,7 +73,7 @@ public abstract class EtlJob {
* @param properties
*/
public EtlJob(Integer appId, Integer dbId, Long whExecId, Properties properties) {
configFromProperties(appId, dbId, whExecId, properties);
PySystemState sys = configFromProperties(appId, dbId, whExecId, properties);
addJythonToPath(sys);
interpreter = new PythonInterpreter(null, sys);
}
@ -127,8 +126,9 @@ public abstract class EtlJob {
* @param appId
* @param whExecId
* @param properties
* @return PySystemState A PySystemState that contain all the arguments.
*/
private void configFromProperties(Integer appId, Integer dbId, Long whExecId, Properties properties) {
private PySystemState configFromProperties(Integer appId, Integer dbId, Long whExecId, Properties properties) {
this.prop = properties;
if (appId != null)
prop.setProperty(Constant.APP_ID_KEY, String.valueOf(appId));
@ -140,8 +140,9 @@ public abstract class EtlJob {
String value = prop.getProperty(key);
config.put(new PyString(key), new PyString(value));
}
sys = new PySystemState();
PySystemState sys = new PySystemState();
sys.argv.append(config);
return sys;
}
/**
* Extract data from source

View File

@ -124,24 +124,30 @@ public class AzJobChecker {
JsonNode wholeFlow = mapper.readTree(flowJson);
JsonNode allJobs = wholeFlow.get("nodes");
String flowPath = wholeFlow.get("projectName").asText() + ":" + wholeFlow.get("flowId").asText();
List<AzkabanJobExecRecord> results = parseJsonHelper(allJobs, flowExecId, "", flowPath);
List<AzkabanJobExecRecord> results = parseJsonHelper(allJobs, flowExecId, flowPath);
AzkabanJobExecUtil.sortAndSet(results);
return results;
}
private List<AzkabanJobExecRecord> parseJsonHelper(JsonNode allJobs, long flowExecId, String jobPrefix, String flowPath) {
/**
* Recursively process the execution info to get {@AzkabanJobExecRecord}
* @param allJobs JsonNode in "nodes" field
* @param flowExecId
* @param flowPath Format : project_name:first_level_flow/sub_flow/sub_flow
* @return
*/
private List<AzkabanJobExecRecord> parseJsonHelper(JsonNode allJobs, long flowExecId, String flowPath) {
List<AzkabanJobExecRecord> results = new ArrayList<>();
for (JsonNode oneJob : allJobs) {
if (oneJob.has("nodes")) { // is a subflow
String subFlowName = oneJob.get("id").asText();
String newJobPrefix = jobPrefix.length() > 0 ? jobPrefix + subFlowName + ":" : subFlowName + ":";
results.addAll(parseJsonHelper(oneJob.get("nodes"), flowExecId, newJobPrefix, flowPath));
flowPath += "/" + subFlowName;
results.addAll(parseJsonHelper(oneJob.get("nodes"), flowExecId, flowPath));
} else {
String jobName = oneJob.get("id").asText();
long startTime = oneJob.get("startTime").asLong();
long endTime = oneJob.get("endTime").asLong();
String status = oneJob.get("status").asText();
jobName = jobPrefix.length() > 0 ? jobPrefix + jobName : jobName;
AzkabanJobExecRecord azkabanJobExecRecord =
new AzkabanJobExecRecord(appId, jobName, flowExecId, (int) (startTime / 1000), (int) (endTime / 1000),
status, flowPath);

View File

@ -43,9 +43,16 @@ public class AzLineageExtractor {
throws Exception {
List<LineageRecord> oneAzkabanJobLineage = new ArrayList<>();
// azkaban job name should have subflow name append in front
String flowSequence[] = message.azkabanJobExecution.getFlowPath().split(":")[1].split("/");
String jobPrefix = "";
for (int i = 1; i < flowSequence.length; i ++) {
jobPrefix += flowSequence[i] + ":";
}
//String log = asc.getExecLog(azJobExec.execId, azJobExec.jobName);
String log =
message.adc.getExecLog(message.azkabanJobExecution.getFlowExecId(), message.azkabanJobExecution.getJobName());
message.adc.getExecLog(message.azkabanJobExecution.getFlowExecId(), jobPrefix + message.azkabanJobExecution.getJobName());
Set<String> hadoopJobIds = AzLogParser.getHadoopJobIdFromLog(log);
for (String hadoopJobId : hadoopJobIds) {

View File

@ -191,7 +191,7 @@ class HdfsLoad:
delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);
-- update the old record if some thing changed
update dict_dict_field_detail t join
update dict_field_detail t join
(
select x.field_id, s.*
from stg_dict_field_detail s join dict_dataset d
@ -204,26 +204,27 @@ class HdfsLoad:
and (x.sort_id <> s.sort_id
or x.parent_sort_id <> s.parent_sort_id
or x.data_type <> s.data_type
or x.data_size <> s.data_size
or x.data_precision <> s.data_precision
or x.is_nullable <> s.is_nullable
or x.is_partitioned <> s.is_partitioned
or x.is_distributed <> s.is_distributed
or x.default_value <> s.default_value
or x.namespace <> s.namespace
or x.data_size <> s.data_size or (x.data_size is null) ^ (s.data_size is null)
or x.data_precision <> s.data_precision or (x.data_precision is null) ^ (s.data_precision is null)
or x.is_nullable <> s.is_nullable or (x.is_nullable is null) ^ (s.is_nullable is null)
or x.is_partitioned <> s.is_partitioned or (x.is_partitioned is null) ^ (s.is_partitioned is null)
or x.is_distributed <> s.is_distributed or (x.is_distributed is null) ^ (s.is_distributed is null)
or x.default_value <> s.default_value or (x.default_value is null) ^ (s.default_value is null)
or x.namespace <> s.namespace or (x.namespace is null) ^ (s.namespace is null)
)
) p
on t.field_id = p.field_id
set t.sort_id = p.sort_id,
t.parent_sort_id = p.parent_sort_id,
t.data_type = p.data_type,
t.data_size = p.data_size,
t.data_precision = p.data_precision,
t.is_nullable = p.is_nullable,
t.is_partitioned = p.is_partitioned,
t.is_distributed = p.is_distributed
t.default_value = p.default_value
t.namespace = p.namespace
t.last_modified = now()
t.is_distributed = p.is_distributed,
t.default_value = p.default_value,
t.namespace = p.namespace,
t.modified = now()
;
insert into dict_field_detail (

View File

@ -152,7 +152,7 @@ class TeradataLoad:
delete from dict_field_detail where field_id in (select field_id from t_deleted_fields);
-- update the old record if some thing changed
update dict_dict_field_detail t join
update dict_field_detail t join
(
select x.field_id, s.*
from stg_dict_field_detail s join dict_dataset d
@ -165,26 +165,27 @@ class TeradataLoad:
and (x.sort_id <> s.sort_id
or x.parent_sort_id <> s.parent_sort_id
or x.data_type <> s.data_type
or x.data_size <> s.data_size
or x.data_precision <> s.data_precision
or x.is_nullable <> s.is_nullable
or x.is_partitioned <> s.is_partitioned
or x.is_distributed <> s.is_distributed
or x.default_value <> s.default_value
or x.namespace <> s.namespace
or x.data_size <> s.data_size or (x.data_size is null) ^ (s.data_size is null)
or x.data_precision <> s.data_precision or (x.data_precision is null) ^ (s.data_precision is null)
or x.is_nullable <> s.is_nullable or (x.is_nullable is null) ^ (s.is_nullable is null)
or x.is_partitioned <> s.is_partitioned or (x.is_partitioned is null) ^ (s.is_partitioned is null)
or x.is_distributed <> s.is_distributed or (x.is_distributed is null) ^ (s.is_distributed is null)
or x.default_value <> s.default_value or (x.default_value is null) ^ (s.default_value is null)
or x.namespace <> s.namespace or (x.namespace is null) ^ (s.namespace is null)
)
) p
on t.field_id = p.field_id
set t.sort_id = p.sort_id,
t.parent_sort_id = p.parent_sort_id,
t.data_type = p.data_type,
t.data_size = p.data_size,
t.data_precision = p.data_precision,
t.is_nullable = p.is_nullable,
t.is_partitioned = p.is_partitioned,
t.is_distributed = p.is_distributed
t.default_value = p.default_value
t.namespace = p.namespace
t.last_modified = now()
t.is_distributed = p.is_distributed,
t.default_value = p.default_value,
t.namespace = p.namespace,
t.modified = now()
;
show warnings limit 10;

View File

@ -33,87 +33,6 @@ import wherehows.common.schemas.AzkabanJobExecRecord;
* Created by zsun on 9/9/15.
*/
public class AzJobCheckerTest {
final String jsonInput = "{\"attempt\": 0,\n" + " \"endTime\": 1442233069062,\n" + " \"executionId\": 832765,\n"
+ " \"executionOptions\": {\"concurrentOption\": \"skip\",\n" + " \"disabled\": [],\n"
+ " \"failureAction\": \"FINISH_CURRENTLY_RUNNING\",\n"
+ " \"failureEmails\": [\"zsun@linkedin.com\"],\n"
+ " \"failureEmailsOverride\": true,\n" + " \"flowParameters\": {},\n"
+ " \"mailCreator\": \"default\",\n" + " \"memoryCheck\": true,\n"
+ " \"notifyOnFirstFailure\": false,\n"
+ " \"notifyOnLastFailure\": false,\n" + " \"pipelineExecId\": null,\n"
+ " \"pipelineLevel\": null,\n" + " \"queueLevel\": 0,\n"
+ " \"successEmails\": [],\n" + " \"successEmailsOverride\": false},\n"
+ " \"executionPath\": \"executions/832765\",\n" + " \"flowId\": \"hadoop-datasets-stats\",\n" + " \"id\": null,\n"
+ " \"lastModfiedTime\": 1440783373310,\n" + " \"lastModifiedUser\": \"zsun\",\n"
+ " \"nodes\": [{\"attempt\": 0,\n" + " \"endTime\": 1442233069053,\n"
+ " \"id\": \"hadoop-datasets-stats\",\n"
+ " \"inNodes\": [\"hadoop-datasets-stats_enrich-abstract-dataset\"],\n"
+ " \"jobSource\": \"hadoop-datasets-stats.job\",\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442233069045,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"noop\",\n"
+ " \"updateTime\": 1442233069057},\n" + " {\"attempt\": 0,\n"
+ " \"endTime\": 1442232636665,\n"
+ " \"id\": \"hadoop-datasets-stats_load-size-avro-into-mysql\",\n"
+ " \"inNodes\": [\"hadoop-datasets-stats_sizePartition\"],\n"
+ " \"jobSource\": \"hadoop-datasets-stats_load-size-avro-into-mysql.job\",\n"
+ " \"outNodes\": [\"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql\"],\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442229210581,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
+ " \"updateTime\": 1442232636670},\n" + " {\"attempt\": 0,\n"
+ " \"endTime\": 1442229861221,\n"
+ " \"id\": \"hadoop-datasets-stats_extract-dataset-layout\",\n"
+ " \"inNodes\": [\"hadoop-datasets-stats_sizePartition\"],\n"
+ " \"jobSource\": \"hadoop-datasets-stats_extract-dataset-layout.job\",\n"
+ " \"outNodes\": [\"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql\"],\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442229210582,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
+ " \"updateTime\": 1442229861231},\n" + " {\"attempt\": 0,\n"
+ " \"endTime\": 1442229210579,\n" + " \"id\": \"hadoop-datasets-stats_sizePartition\",\n"
+ " \"inNodes\": [\"hadoop-datasets-stats_sizeAggr\"],\n"
+ " \"jobSource\": \"hadoop-datasets-stats_sizePartition.job\",\n"
+ " \"outNodes\": [\"hadoop-datasets-stats_load-size-avro-into-mysql\",\n"
+ " \"hadoop-datasets-stats_extract-dataset-layout\"],\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442228463681,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
+ " \"updateTime\": 1442229210587},\n" + " {\"attempt\": 0,\n"
+ " \"endTime\": 1442228463629,\n" + " \"id\": \"hadoop-datasets-stats_sizeAggr\",\n"
+ " \"jobSource\": \"hadoop-datasets-stats_sizeAggr.job\",\n"
+ " \"outNodes\": [\"hadoop-datasets-stats_sizePartition\"],\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442224810817,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
+ " \"updateTime\": 1442228463679},\n" + " {\"attempt\": 0,\n"
+ " \"endTime\": 1442229882257,\n"
+ " \"id\": \"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql\",\n"
+ " \"inNodes\": [\"hadoop-datasets-stats_extract-dataset-layout\"],\n"
+ " \"jobSource\": \"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql.job\",\n"
+ " \"outNodes\": [\"hadoop-datasets-stats_enrich-abstract-dataset\"],\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442229861224,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
+ " \"updateTime\": 1442229882261},\n" + " {\"attempt\": 0,\n"
+ " \"endTime\": 1442233066192,\n"
+ " \"id\": \"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql\",\n"
+ " \"inNodes\": [\"hadoop-datasets-stats_load-size-avro-into-mysql\"],\n"
+ " \"jobSource\": \"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql.job\",\n"
+ " \"outNodes\": [\"hadoop-datasets-stats_enrich-abstract-dataset\"],\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442232636668,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
+ " \"updateTime\": 1442233066196},\n" + " {\"attempt\": 0,\n"
+ " \"endTime\": 1442233069043,\n"
+ " \"id\": \"hadoop-datasets-stats_enrich-abstract-dataset\",\n"
+ " \"inNodes\": [\"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql\",\n"
+ " \"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql\"],\n"
+ " \"jobSource\": \"hadoop-datasets-stats_enrich-abstract-dataset.job\",\n"
+ " \"outNodes\": [\"hadoop-datasets-stats\"],\n"
+ " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442233066194,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n"
+ " \"updateTime\": 1442233069046}],\n" + " \"projectId\": 533,\n"
+ " \"projectName\": \"WherehowsETL\",\n" + " \"properties\": [{\"source\": \"common.properties\"}],\n"
+ " \"proxyUsers\": [\"azkaban@GRID.LINKEDIN.COM\",\n" + " \"dev_svc\",\n"
+ " \"azkaban/eat1-nertzaz01.grid.linkedin.com@GRID.LINKEDIN.COM\",\n"
+ " \"zsun\",\n" + " \"data_svc\"],\n" + " \"startTime\": 1442224810815,\n"
+ " \"status\": \"SUCCEEDED\",\n" + " \"submitTime\": 1442224810778,\n" + " \"submitUser\": \"zsun\",\n"
+ " \"type\": null,\n" + " \"updateTime\": 1442233069065,\n" + " \"version\": 301}";
AzJobChecker ajc;
Properties prop;
@ -149,27 +68,11 @@ public class AzJobCheckerTest {
Assert.assertNotNull(results);
}
@Test(groups = {"needConfig"})
public void parseJsonTest()
throws IOException {
List<AzkabanJobExecRecord> result = ajc.parseJson(jsonInput, 11111);
for (int i = 0; i < result.size(); i++) {
AzkabanJobExecRecord aje = result.get(i);
System.out.println(aje.getJobExecId());
System.out.println(aje.getJobName());
System.out.println(aje.getStartTime());
System.out.println(aje.getEndTime());
System.out.println(aje.getFlowPath());
System.out.println();
Assert.assertEquals((long) aje.getJobExecId(), 11111 * 1000 + i);
}
}
@Test(groups = {"needConfig"})
public void parseNestedJsonTest()
throws IOException, URISyntaxException {
URL url = Thread.currentThread().getContextClassLoader().getResource("nestedJson");
URL url = Thread.currentThread().getContextClassLoader().getResource("nestedFlowContent.json");
byte[] encoded = Files.readAllBytes(Paths.get(url.getPath()));
String nestedJson = new String(encoded, "UTF-8");
List<AzkabanJobExecRecord> result = ajc.parseJson(nestedJson, 11111);

View File

@ -35,6 +35,8 @@ import wherehows.common.schemas.LineageRecord;
*/
public class AzLogParserTest {
private final int TEST_APP_ID = -1;
private final int TEST_DATABASE_ID = -1;
@BeforeTest
public void setUp()
throws SQLException {
@ -79,7 +81,7 @@ public class AzLogParserTest {
@Test(groups = {"needConfig"})
public void getLineageFromLogTest() {
String logSample = "asfdasdfsadf Moving from staged path[asdf] to final resting place[/tm/b/c] sdaf dsfasdfasdf";
AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(-1, "someJobName", (long) 0, 0, 0, "S", "path");
AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(TEST_APP_ID, "someJobName", (long) 0, 0, 0, "S", "path");
sampleExecution.setJobExecId((long) 11111);
List<LineageRecord> result = AzLogParser.getLineageFromLog(logSample, sampleExecution, -1);
System.out.println(result.get(0).toDatabaseValue());
@ -104,8 +106,8 @@ public class AzLogParserTest {
+ "17-11-2015 01:32:27 PST endorsements_push-lva-endorsements-member-restrictions INFO - INFO tcp://lva1-voldemort-read-only-2-vip.prod.linkedin.com:10103 : Initiating swap of endorsements-member-restrictions with dataDir: /jobs/endorse/endorsements/master/tmp/endorsements-member-restrictions.store/lva1-voldemort-read-only-2-vip.prod.linkedin.com\n"
+ "17-11-2015 01:32:27 PST endorsements_push-lva-endorsements-member-restrictions INFO - INFO tcp://lva1-voldemort-read-only-2-vip.prod.linkedin.com:10103 : Invoking fetch for Node lva1-app0610.prod.linkedin.com [id 0] for webhdfs://lva1-warnn01.grid.linkedin.com:50070/jobs/endorse/endorsements/master/tmp/endorsements-member-restrictions.store/lva1-voldemort-read-only-2-vip.prod.linkedin.com/node-0\n"
+ "17-11-2015 01:32:27 PST endorsements_push-lva-endorsements-member-restrictions INFO - INFO tcp://lva1-voldemort-rea";
AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(-1, "someJobName", (long) 0, 0, 0, "S", "path");
List<LineageRecord> result = AzLogParser.getLineageFromLog(logSample, sampleExecution, -1);
AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(TEST_APP_ID, "someJobName", (long) 0, 0, 0, "S", "path");
List<LineageRecord> result = AzLogParser.getLineageFromLog(logSample, sampleExecution, TEST_DATABASE_ID);
System.out.println(result.get(0).toDatabaseValue());
Assert.assertEquals(result.get(0).getFullObjectName(),
"tcp://lva1-voldemort-read-only-2-vip.prod.linkedin.com:10103/endorsements-member-restrictions");