Add JobExecutionLineageEvent and kafka processor

Yi Wang 2016-11-08 19:11:37 -08:00
parent a32a0e4b7f
commit b4f5e438e2
27 changed files with 1698 additions and 40 deletions

View File

@ -66,7 +66,7 @@ Execute the [DDL files][DDL] to create the required repository tables in **where
2. Put a few 3rd-party jar files into the **metadata-etl/extralibs** directory. Some of these jar files may not be available in Maven Central or Artifactory. See [the download instructions][EXJAR] for more detail. ```cd WhereHows/metadata-etl/extralibs```
3. Go back to the **WhereHows** root directory and build all the modules: ```./gradlew build```
4. Go back to the **WhereHows** root directory and start the metadata ETL and API service: ```cd backend-service ; $ACTIVATOR_HOME/bin/activator run```
5. Go back to the **WhereHows** root directory and start the web front-end: ```cd web ; $ACTIVATOR_HOME/bin/activator run``` The WhereHows UI is then available at http://localhost:9000 by default. To serve the UI on a different port, pass it explicitly; for example, ```play run -Dhttp.port=19001``` uses port 19001.
5. Go back to the **WhereHows** root directory and start the web front-end: ```cd web ; $ACTIVATOR_HOME/bin/activator run``` The WhereHows UI is then available at http://localhost:9000 by default. To serve the UI on a different port, pass it explicitly; for example, ```$ACTIVATOR_HOME/bin/activator run -Dhttp.port=19001``` uses port 19001.
## Contribute

View File

@ -34,6 +34,8 @@ import play.Logger;
import play.Play;
import models.kafka.KafkaConfig;
import models.kafka.KafkaConfig.Topic;
import play.db.DB;
import wherehows.common.PathAnalyzer;
import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
import wherehows.common.utils.ClusterUtil;
@ -123,6 +125,13 @@ public class KafkaConsumerMaster extends UntypedActor {
} catch (Exception e) {
Logger.error("Fail to fetch cluster info from DB ", e);
}
try {
// initialize PathAnalyzer
PathAnalyzer.initialize(DB.getConnection("wherehows"));
} catch (Exception e) {
Logger.error("Fail to initialize PathAnalyzer from DB wherehows.", e);
}
}
@Override

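The consumer master now initializes `PathAnalyzer` once with a connection to the `wherehows` database, so later lineage processing can abstract dataset paths without passing a connection around. A minimal sketch of that pattern, assuming a reachable wherehows MySQL instance (the JDBC URL and credentials below are placeholders, not values from this commit):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import wherehows.common.DatasetPath;
import wherehows.common.PathAnalyzer;

public class PathAnalyzerSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder connection details; inside the backend service this comes from DB.getConnection("wherehows").
    Connection conn = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/wherehows", "wherehows", "wherehows");

    // Load the path-layout metadata once; subsequent analyze() calls need no connection.
    PathAnalyzer.initialize(conn);

    DatasetPath path = PathAnalyzer.analyze("hdfs:///data/tracking/PageViewEvent/daily/2016/11/08");
    System.out.println(path != null ? "path matched a known layout" : "no layout matched");
  }
}
```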
View File

@ -29,7 +29,6 @@ import play.mvc.Result;
import wherehows.common.schemas.DatasetCapacityRecord;
import wherehows.common.schemas.DatasetCaseSensitiveRecord;
import wherehows.common.schemas.DatasetConstraintRecord;
import wherehows.common.schemas.DatasetDeploymentRecord;
import wherehows.common.schemas.DatasetIndexRecord;
import wherehows.common.schemas.DatasetOwnerRecord;
import wherehows.common.schemas.DatasetPartitionRecord;
@ -37,6 +36,7 @@ import wherehows.common.schemas.DatasetReferenceRecord;
import wherehows.common.schemas.DatasetSchemaInfoRecord;
import wherehows.common.schemas.DatasetSecurityRecord;
import wherehows.common.schemas.DatasetTagRecord;
import wherehows.common.schemas.DeploymentRecord;
public class DatasetInfoController extends Controller {
@ -49,7 +49,7 @@ public class DatasetInfoController extends Controller {
int datasetId = Integer.parseInt(datasetIdString);
try {
List<DatasetDeploymentRecord> records = DatasetInfoDao.getDatasetDeploymentByDatasetId(datasetId);
List<DeploymentRecord> records = DatasetInfoDao.getDatasetDeploymentByDatasetId(datasetId);
resultJson.put("return_code", 200);
resultJson.set("deploymentInfo", Json.toJson(records));
} catch (EmptyResultDataAccessException e) {
@ -68,7 +68,7 @@ public class DatasetInfoController extends Controller {
return ok(resultJson);
}
try {
List<DatasetDeploymentRecord> records = DatasetInfoDao.getDatasetDeploymentByDatasetUrn(urn);
List<DeploymentRecord> records = DatasetInfoDao.getDatasetDeploymentByDatasetUrn(urn);
resultJson.put("return_code", 200);
resultJson.set("deploymentInfo", Json.toJson(records));
} catch (EmptyResultDataAccessException e) {

View File

@ -31,7 +31,7 @@ import play.mvc.Result;
*/
public class LineageController extends Controller {
public static Result getJobsByDataset(String urn) throws SQLException {
if(!Urn.validateUrn(urn)){
if (!Urn.validateUrn(urn)) {
ObjectNode resultJson = Json.newObject();
resultJson.put("return_code", 400);
resultJson.put("error_message", "Urn format wrong!");
@ -143,4 +143,19 @@ public class LineageController extends Controller {
return ok(resultJson);
}
@BodyParser.Of(BodyParser.Json.class)
public static Result updateJobExecutionLineage() {
JsonNode lineage = request().body().asJson();
ObjectNode resultJson = Json.newObject();
try {
LineageDao.updateJobExecutionLineage(lineage);
resultJson.put("return_code", 200);
resultJson.put("message", "Job Execution Lineage Updated!");
} catch (Exception e) {
resultJson.put("return_code", 404);
resultJson.put("error_message", e.getMessage());
}
return ok(resultJson);
}
}

View File

@ -28,6 +28,7 @@ import java.util.Map;
public class CfgDao {
public static final String FIND_APP_INFO_BY_NAME = "SELECT * FROM cfg_application where short_connection_string = :name";
public static final String FIND_APP_INFO_BY_APP_CODE = "SELECT * FROM cfg_application where app_code = :app_code";
public static final String FIND_DB_INFO_BY_NAME = "SELECT * from cfg_database where short_connection_string = :name";
public static final String FIND_APP_INFO_BY_ID = "SELECT * FROM cfg_application where app_id = :id";
public static final String FIND_DB_INFO_BY_ID = "SELECT * from cfg_database where db_id = :id";
@ -49,6 +50,13 @@ public class CfgDao {
return JdbcUtil.wherehowsNamedJdbcTemplate.queryForMap(FIND_APP_INFO_BY_NAME, params);
}
public static Map<String, Object> getAppByAppCode(String appCode)
throws Exception {
Map<String, Object> params = new HashMap<>();
params.put("app_code", appCode);
return JdbcUtil.wherehowsNamedJdbcTemplate.queryForMap(FIND_APP_INFO_BY_APP_CODE, params);
}
public static Map<String, Object> getDbByName(String name) throws Exception {
Map<String, Object> params = new HashMap<>();
params.put("name", name);

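The new `FIND_APP_INFO_BY_APP_CODE` lookup is what `LineageDao.updateJobExecutionLineage` (added later in this commit) uses to resolve the numeric `app_id` from the application name carried in the event. A small hedged usage sketch; the app code below is a made-up example following the format described in the event schema and must exist in `cfg_application`:

```java
import java.util.Map;
import models.daos.CfgDao;

public class AppCodeLookupSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical app code; throws if no matching cfg_application row is found.
    Map<String, Object> app = CfgDao.getAppByAppCode("AZKABAN-PROD-BAR-02");
    Integer appId = (Integer) app.get("app_id");
    System.out.println("Resolved app_id = " + appId);
  }
}
```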
View File

@ -31,7 +31,7 @@ import wherehows.common.enums.OwnerType;
import wherehows.common.schemas.DatasetCapacityRecord;
import wherehows.common.schemas.DatasetCaseSensitiveRecord;
import wherehows.common.schemas.DatasetConstraintRecord;
import wherehows.common.schemas.DatasetDeploymentRecord;
import wherehows.common.schemas.DeploymentRecord;
import wherehows.common.schemas.DatasetFieldIndexRecord;
import wherehows.common.schemas.DatasetFieldSchemaRecord;
import wherehows.common.schemas.DatasetIndexRecord;
@ -269,32 +269,32 @@ public class DatasetInfoDao {
return new Object[]{null, null};
}
public static List<DatasetDeploymentRecord> getDatasetDeploymentByDatasetId(int datasetId)
public static List<DeploymentRecord> getDatasetDeploymentByDatasetId(int datasetId)
throws DataAccessException {
Map<String, Object> params = new HashMap<>();
params.put("dataset_id", datasetId);
List<Map<String, Object>> results =
JdbcUtil.wherehowsNamedJdbcTemplate.queryForList(GET_DATASET_DEPLOYMENT_BY_DATASET_ID, params);
List<DatasetDeploymentRecord> records = new ArrayList<>();
List<DeploymentRecord> records = new ArrayList<>();
for (Map<String, Object> result : results) {
DatasetDeploymentRecord record = new DatasetDeploymentRecord();
DeploymentRecord record = new DeploymentRecord();
record.convertToRecord(result);
records.add(record);
}
return records;
}
public static List<DatasetDeploymentRecord> getDatasetDeploymentByDatasetUrn(String datasetUrn)
public static List<DeploymentRecord> getDatasetDeploymentByDatasetUrn(String datasetUrn)
throws DataAccessException {
Map<String, Object> params = new HashMap<>();
params.put("dataset_urn", datasetUrn);
List<Map<String, Object>> results =
JdbcUtil.wherehowsNamedJdbcTemplate.queryForList(GET_DATASET_DEPLOYMENT_BY_URN, params);
List<DatasetDeploymentRecord> records = new ArrayList<>();
List<DeploymentRecord> records = new ArrayList<>();
for (Map<String, Object> result : results) {
DatasetDeploymentRecord record = new DatasetDeploymentRecord();
DeploymentRecord record = new DeploymentRecord();
record.convertToRecord(result);
records.add(record);
}
@ -320,7 +320,7 @@ public class DatasetInfoDao {
// om.setPropertyNamingStrategy(PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
for (final JsonNode deploymentInfo : deployment) {
DatasetDeploymentRecord record = om.convertValue(deploymentInfo, DatasetDeploymentRecord.class);
DeploymentRecord record = om.convertValue(deploymentInfo, DeploymentRecord.class);
record.setDatasetId(datasetId);
record.setDatasetUrn(urn);
record.setModifiedTime(System.currentTimeMillis() / 1000);

View File

@ -14,16 +14,26 @@
package models.daos;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.springframework.dao.DataAccessException;
import play.Logger;
import utils.Urn;
import utils.JdbcUtil;
import wherehows.common.DatasetPath;
import wherehows.common.LineageCombiner;
import wherehows.common.schemas.ApplicationRecord;
import wherehows.common.schemas.JobExecutionRecord;
import wherehows.common.schemas.LineageDatasetRecord;
import wherehows.common.schemas.LineageRecord;
import wherehows.common.utils.PartitionPatternMatcher;
import wherehows.common.utils.PreparedStatementUtil;
import wherehows.common.writers.DatabaseWriter;
@ -33,6 +43,16 @@ import wherehows.common.writers.DatabaseWriter;
* Modified by zechen on 10/12/15.
*/
public class LineageDao {
private static final String JOB_EXECUTION_TABLE = "job_execution";
private static final String JOB_EXECUTION_DATA_LINEAGE_TABLE = "job_execution_data_lineage";
private static final DatabaseWriter JOB_EXECUTION_WRITER =
new DatabaseWriter(JdbcUtil.wherehowsJdbcTemplate, JOB_EXECUTION_TABLE);
private static final DatabaseWriter JOB_EXECUTION_DATA_LINEAGE_WRITER =
new DatabaseWriter(JdbcUtil.wherehowsJdbcTemplate, JOB_EXECUTION_DATA_LINEAGE_TABLE);
public static final String FIND_JOBS_BY_DATASET =
" select distinct ca.short_connection_string, jedl.job_name, jedl.flow_path "
+ " from job_execution_data_lineage jedl "
@ -57,7 +77,6 @@ public class LineageDao {
+ " where jedl.source_target_type = :source_target_type "
+ " and ca.short_connection_string like :instance ";
public static final String FIND_DATASETS_BY_FLOW_EXEC =
" select distinct jedl.abstracted_object_name, jedl.db_id, jedl.partition_start, jedl.partition_end, "
+ " jedl.storage_type, jedl.record_count, jedl.insert_count, jedl.update_count, jedl.delete_count "
@ -68,7 +87,6 @@ public class LineageDao {
+ " and jedl.source_target_type = :source_target_type "
+ " and ca.short_connection_string like :instance ";
public static final String FIND_DATASETS_BY_JOB_EXEC =
" select distinct jedl.abstracted_object_name, jedl.db_id, jedl.partition_start, jedl.partition_end, "
+ " jedl.storage_type, jedl.record_count, jedl.insert_count, jedl.update_count, jedl.delete_count "
@ -78,6 +96,13 @@ public class LineageDao {
+ " and jedl.source_target_type = :source_target_type "
+ " and ca.short_connection_string like :instance ";
public static final String INSERT_JOB_EXECUTION_RECORD =
PreparedStatementUtil.prepareInsertTemplateWithColumn("REPLACE", JOB_EXECUTION_TABLE,
JobExecutionRecord.dbColumns());
public static final String INSERT_JOB_EXECUTION_DATA_LINEAGE_RECORD =
PreparedStatementUtil.prepareInsertTemplateWithColumn("REPLACE", JOB_EXECUTION_DATA_LINEAGE_TABLE,
LineageRecord.dbColumns());
public static List<Map<String, Object>> getJobsByDataset(String urn, String period, String cluster, String instance, String sourceTargetType)
throws SQLException {
@ -97,8 +122,7 @@ public class LineageDao {
} else {
params.put("instance", instance);
}
List<Map<String, Object>> jobs = JdbcUtil.wherehowsNamedJdbcTemplate.queryForList(FIND_JOBS_BY_DATASET, params);
return jobs;
return JdbcUtil.wherehowsNamedJdbcTemplate.queryForList(FIND_JOBS_BY_DATASET, params);
}
public static List<Map<String, Object>> getDatasetsByJob(String flowPath, String jobName, String instance, String sourceTargetType) {
@ -111,8 +135,7 @@ public class LineageDao {
} else {
params.put("instance", instance);
}
List<Map<String, Object>> datasets = JdbcUtil.wherehowsNamedJdbcTemplate.queryForList(FIND_DATASETS_BY_JOB, params);
return datasets;
return JdbcUtil.wherehowsNamedJdbcTemplate.queryForList(FIND_DATASETS_BY_JOB, params);
}
public static List<Map<String, Object>> getDatasetsByFlowExec(Long flowExecId, String jobName, String instance, String sourceTargetType) {
@ -125,8 +148,7 @@ public class LineageDao {
} else {
params.put("instance", instance);
}
List<Map<String, Object>> datasets = JdbcUtil.wherehowsNamedJdbcTemplate.queryForList(FIND_DATASETS_BY_FLOW_EXEC, params);
return datasets;
return JdbcUtil.wherehowsNamedJdbcTemplate.queryForList(FIND_DATASETS_BY_FLOW_EXEC, params);
}
public static List<Map<String, Object>> getDatasetsByJobExec(Long jobExecId, String instance, String sourceTargetType) {
@ -138,8 +160,7 @@ public class LineageDao {
} else {
params.put("instance", instance);
}
List<Map<String, Object>> datasets = JdbcUtil.wherehowsNamedJdbcTemplate.queryForList(FIND_DATASETS_BY_JOB_EXEC, params);
return datasets;
return JdbcUtil.wherehowsNamedJdbcTemplate.queryForList(FIND_DATASETS_BY_JOB_EXEC, params);
}
public static void insertLineage(JsonNode lineage) throws Exception {
@ -149,7 +170,7 @@ public class LineageDao {
String appName = lineage.findPath("app_name").asText();
// Set application id if app id is not set or equals to 0
if (appId == 0) {
appId = (Integer) CfgDao.getAppByName(appName).get("id");
appId = (Integer) CfgDao.getAppByName(appName).get("app_id");
}
Long flowExecId = lineage.findPath("flow_exec_id").asLong();
@ -168,7 +189,7 @@ public class LineageDao {
String databaseName = node.findPath("database_name").asText();
// Set application id if app id is not set or equals to 0
if (databaseId == 0) {
databaseId = (Integer) CfgDao.getDbByName(databaseName).get("id");
databaseId = (Integer) CfgDao.getDbByName(databaseName).get("db_id");
}
String abstractedObjectName = node.findPath("abstracted_object_name").asText();
@ -251,6 +272,123 @@ public class LineageDao {
e.printStackTrace();
}
}
}
public static void updateJobExecutionLineage(JsonNode root)
throws Exception {
final ObjectMapper om = new ObjectMapper();
// get Application
final JsonNode application = root.path("application");
final JsonNode jobExecution = root.path("jobExecution");
final JsonNode inputDatasetList = root.path("inputDatasetList");
final JsonNode outputDatasetList = root.path("outputDatasetList");
if (application.isMissingNode() || jobExecution.isMissingNode() || !inputDatasetList.isArray()
|| !outputDatasetList.isArray()) {
throw new IllegalArgumentException(
"Job Execution Lineage info update error, missing necessary fields: " + root.toString());
}
ApplicationRecord appRecord = om.convertValue(application, ApplicationRecord.class);
// match app id from cfg_application
Integer appId;
try {
appId = (Integer) CfgDao.getAppByAppCode(appRecord.getName()).get("app_id");
} catch (Exception ex) {
Logger.error("Can't find application by app_code: " + application.toString(), ex);
throw ex;
}
// process job execution info
JobExecutionRecord jobExecRecord = om.convertValue(jobExecution, JobExecutionRecord.class);
// TODO generate flow_id, job_id if not provided
jobExecRecord.setAppId(appId);
jobExecRecord.setLogTime(System.currentTimeMillis());
JOB_EXECUTION_WRITER.execute(INSERT_JOB_EXECUTION_RECORD, jobExecRecord.dbValues());
// process job data lineage info
final List<LineageRecord> lineageRecords = new ArrayList<>();
for (final JsonNode inputDataset : inputDatasetList) {
LineageDatasetRecord lineageDataset = om.convertValue(inputDataset, LineageDatasetRecord.class);
lineageDataset.setSourceTargetType("source");
LineageRecord record = convertLineageDataset(lineageDataset, jobExecRecord);
if (record != null) {
lineageRecords.add(record);
}
}
for (final JsonNode outputDataset : outputDatasetList) {
LineageDatasetRecord lineageDataset = om.convertValue(outputDataset, LineageDatasetRecord.class);
lineageDataset.setSourceTargetType("target");
LineageRecord record = convertLineageDataset(lineageDataset, jobExecRecord);
if (record != null) {
lineageRecords.add(record);
}
}
// combine partitions
final LineageCombiner lineageCombiner = new LineageCombiner(null);
List<LineageRecord> combinedLineageRecords;
try {
lineageCombiner.addAllWoPartitionUpdate(lineageRecords);
combinedLineageRecords = lineageCombiner.getCombinedLineage();
} catch (Exception ex) {
Logger.error("Lineage records combine error: ", ex);
throw ex;
}
// generate srl_no
int srlNumber = 0;
for (LineageRecord record : combinedLineageRecords) {
record.setSrlNo(srlNumber);
srlNumber++;
}
// store data lineage info
for (LineageRecord record : combinedLineageRecords) {
try {
JOB_EXECUTION_DATA_LINEAGE_WRITER.execute(INSERT_JOB_EXECUTION_DATA_LINEAGE_RECORD, record.dbValues());
} catch (DataAccessException ex) {
Logger.error("Data Lineage input error: ", ex);
}
}
}
// convert LineageDatasetRecord and JobExecutionRecord into LineageRecord
private static LineageRecord convertLineageDataset(LineageDatasetRecord lineageDataset, JobExecutionRecord jobExec)
throws Exception {
final LineageRecord record = new LineageRecord(jobExec.getAppId(), jobExec.getFlowExecutionId(), jobExec.getName(),
jobExec.getExecutionId());
record.setFlowPath(jobExec.getTopLevelFlowName());
record.setJobExecUUID(jobExec.getExecutionGuid());
record.setSourceTargetType(lineageDataset.getSourceTargetType());
record.setOperation(lineageDataset.getOperation());
record.setJobStartTime((int) (jobExec.getStartTime() / 1000));
record.setJobEndTime((int) (jobExec.getEndTime() / 1000));
if (lineageDataset.getPartition() != null) {
record.setPartitionStart(lineageDataset.getPartition().getMinPartitionValue());
record.setPartitionEnd(lineageDataset.getPartition().getMaxPartitionValue());
record.setPartitionType(lineageDataset.getPartition().getPartitionType());
}
if (lineageDataset.getDatasetUrn() != null) {
record.setFullObjectName(lineageDataset.getDatasetUrn());
} else if (lineageDataset.getDatasetProperties() != null
&& lineageDataset.getDatasetProperties().getUri() != null) {
record.setFullObjectName(lineageDataset.getDatasetProperties().getUri());
}
if (record.getFullObjectName() != null) {
List<String> abstractPaths = DatasetPath.separatedDataset(record.getFullObjectName());
if (abstractPaths.size() > 0) {
record.setAbstractObjectName(abstractPaths.get(0));
}
}
return record;
}
}

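Both writers reuse `PreparedStatementUtil.prepareInsertTemplateWithColumn` with the `REPLACE` keyword, so re-sending the same job execution replaces rows with the same key rather than duplicating them (assuming the tables define a key over the replaced columns). A minimal sketch that prints the two generated templates, assuming they take the usual `REPLACE INTO <table> (<columns>) VALUES (?, ...)` shape:

```java
import wherehows.common.schemas.JobExecutionRecord;
import wherehows.common.schemas.LineageRecord;
import wherehows.common.utils.PreparedStatementUtil;

public class LineageSqlSketch {
  public static void main(String[] args) {
    // Expected (assumption): REPLACE INTO job_execution (app_id, job_name, ...) VALUES (?, ?, ...)
    String jobExecutionSql = PreparedStatementUtil.prepareInsertTemplateWithColumn(
        "REPLACE", "job_execution", JobExecutionRecord.dbColumns());

    // Expected (assumption): REPLACE INTO job_execution_data_lineage (app_id, flow_exec_id, ...) VALUES (?, ?, ...)
    String dataLineageSql = PreparedStatementUtil.prepareInsertTemplateWithColumn(
        "REPLACE", "job_execution_data_lineage", LineageRecord.dbColumns());

    System.out.println(jobExecutionSql);
    System.out.println(dataLineageSql);
  }
}
```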
View File

@ -0,0 +1,45 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package models.kafka;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import models.daos.LineageDao;
import org.apache.avro.generic.GenericData;
import play.Logger;
import wherehows.common.schemas.Record;
public class JobExecutionLineageProcessor extends KafkaConsumerProcessor {
/**
* Process a JobExecutionLineageEvent record
* @param record GenericData.Record
* @param topic String
* @throws Exception
* @return null
*/
public Record process(GenericData.Record record, String topic)
throws Exception {
if (record != null) {
// Logger.info("Processing Job Execution Lineage Event record. ");
final GenericData.Record auditHeader = (GenericData.Record) record.get("auditHeader");
final JsonNode rootNode = new ObjectMapper().readTree(record.toString());
LineageDao.updateJobExecutionLineage(rootNode);
}
return null;
}
}

View File

@ -45,6 +45,9 @@ GET /lineage/job/exec/job controllers.LineageController.getDatas
# Insert a new job data lineage
POST /lineage controllers.LineageController.addJobLineage()
# Update job execution lineage
POST /lineage/job/exec controllers.LineageController.updateJobExecutionLineage()
#param: instance
#param: flowPath
#get flow owner

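With the route in place, a client can report a finished job execution by POSTing a JobExecutionLineageEvent-shaped JSON document. A minimal sketch, assuming the backend-service is reachable at http://localhost:19001 (adjust to your deployment) and that an application named FOO-APP exists in cfg_application; the payload values are illustrative only:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class PostJobExecutionLineageSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical minimal payload: application, jobExecution and both dataset lists are required.
    String payload = "{"
        + "\"application\": {\"type\": \"AZKABAN\", \"name\": \"FOO-APP\"},"
        + "\"jobExecution\": {\"name\": \"flow/job\", \"topLevelFlowName\": \"flow\","
        + "  \"executionId\": 1234, \"flowExecutionId\": 5678,"
        + "  \"startTime\": 1478600000000, \"endTime\": 1478600600000, \"state\": \"SUCCEEDED\"},"
        + "\"inputDatasetList\": [],"
        + "\"outputDatasetList\": []"
        + "}";

    URL url = new URL("http://localhost:19001/lineage/job/exec");  // assumed backend-service address
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    try (OutputStream os = conn.getOutputStream()) {
      os.write(payload.getBytes(StandardCharsets.UTF_8));
    }
    System.out.println("HTTP " + conn.getResponseCode());
  }
}
```

Note that the controller always answers HTTP 200 and reports success or failure through the `return_code` field in the JSON body.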
View File

@ -0,0 +1,626 @@
{
"type": "record",
"name": "JobExecutionLineageEvent",
"namespace": "com.linkedin.events.metadata",
"fields": [
{
"name": "auditHeader",
"type": {
"type": "record",
"name": "KafkaAuditHeader",
"namespace": "com.linkedin.events",
"fields": [
{
"name": "time",
"type": "long",
"doc": "The time at which the event was emitted into kafka."
},
{
"name": "server",
"type": "string",
"doc": "The fully qualified name of the host from which the event is being emitted."
},
{
"name": "instance",
"type": [
"null",
"string"
],
"doc": "The instance on the server from which the event is being emitted. e.g. i001"
},
{
"name": "appName",
"type": "string",
"doc": "The name of the application from which the event is being emitted. see go/appname"
},
{
"name": "messageId",
"type": {
"type": "fixed",
"name": "UUID",
"namespace": "com.linkedin.events",
"size": 16
},
"doc": "A unique identifier for the message"
},
{
"name": "auditVersion",
"type": [
"null",
"int"
],
"doc": "The version that is being used for auditing. In version 0, the audit trail buckets events into 10 minute audit windows based on the EventHeader timestamp. In version 1, the audit trail buckets events as follows: if the schema has an outer KafkaAuditHeader, use the outer audit header timestamp for bucketing; else if the EventHeader has an inner KafkaAuditHeader use that inner audit header's timestamp for bucketing",
"default": null
},
{
"name": "fabricUrn",
"type": [
"null",
"string"
],
"doc": "The fabricUrn of the host from which the event is being emitted. Fabric Urn in the format of urn:li:fabric:{fabric_name}. See go/fabric.",
"default": null
}
]
},
"doc": "This header records information about the context of an event as it is emitted into kafka and is intended to be used by the kafka audit application. For more information see go/kafkaauditheader"
},
{
"name": "application",
"type": [
"null",
{
"type": "record",
"name": "Application",
"fields": [
{
"name": "type",
"type": "string",
"doc": "The name of the application, such as Oozie, Azkaban, UC4, etc."
},
{
"name": "name",
"type": "string",
"doc": "The unique name of the application, which may contain appType-deployTier-name-instNum, such as OOZIE-DEV-FOO, AZKABAN-PROD-BAR-02, UC4-PROD-COOL-01, etc."
},
{
"name": "deploymentDetail",
"type": [
"null",
{
"type": "record",
"name": "DeploymentDetail",
"fields": [
{
"name": "deploymentTier",
"type": {
"type": "enum",
"name": "DeploymentTier",
"symbols": [
"PROD",
"CORP",
"GRID",
"PREPROD",
"CANARY",
"DMZ",
"STG",
"UAT",
"UAT1",
"UAT2",
"UAT3",
"QA",
"QA1",
"QA2",
"QA3",
"EI",
"EI1",
"EI2",
"EI3",
"QEI",
"QEI1",
"QEI2",
"QEI3",
"TEST",
"LIT",
"SIT",
"INT",
"DEV",
"LOCAL",
"ARCHIVE",
"DROPBOX",
"SANDBOX",
"POC"
]
},
"doc": "defined in [dataOrigin], such as DEV,TEST,PROD",
"default": "PROD"
},
{
"name": "dataCenter",
"type": [
"null",
"string"
],
"doc": "DC1, DC2, LTX3, LVA4, ...",
"default": null
},
{
"name": "region",
"type": [
"null",
"string"
],
"doc": "Region name if applicable, such as us-central2, eu-west3",
"default": null
},
{
"name": "zone",
"type": [
"null",
"string"
],
"doc": "Zone name or id if applicable, such as asia-east1-b, us-west1-a",
"default": null
},
{
"name": "cluster",
"type": [
"null",
"string"
],
"doc": "Cluster name or a comma-delimited list of Servers",
"default": null
},
{
"name": "container",
"type": [
"null",
"string"
],
"doc": "Container or tenant name",
"default": null
},
{
"name": "enabled",
"type": "boolean",
"doc": "is the dataset instance enabled under this deployment environment",
"default": true
},
{
"name": "additionalDeploymentInfo",
"type": {
"type": "map",
"values": "string"
},
"doc": "Additional deployment info, such as Zookeeper, Connection, Graphite URL, native reference ID or KEY"
}
]
}
],
"doc": "Where the orchestration application is deployed.",
"default": null
},
{
"name": "uri",
"type": [
"null",
"string"
],
"doc": "The application URI/URL if available. This value is stored in cfg_connection table.",
"default": null
}
]
}
],
"doc": "Information about the application in which the job is executed.",
"default": null
},
{
"name": "jobExecution",
"type": {
"type": "record",
"name": "JobExecution",
"fields": [
{
"name": "name",
"type": "string",
"doc": "Job name. If job is in subflow, the parent flow names are included, such as top-level-flow/parent-flow/job-name",
"default": ""
},
{
"name": "node",
"type": [
"null",
"string"
],
"doc": "The virtual node name used to abstract the physical server.",
"default": null
},
{
"name": "server",
"type": "string",
"doc": "The fully-qualified name of the host of this event.",
"default": ""
},
{
"name": "definitionId",
"type": [
"null",
"long"
],
"doc": "Job Definition ID. This is available in some orchestration systems.",
"default": null
},
{
"name": "executionId",
"type": [
"null",
"long"
],
"doc": "Job execution ID as number",
"default": null
},
{
"name": "executionGuid",
"type": [
"null",
"string"
],
"doc": "Job execution GUID.",
"logicalType": "GUID"
},
{
"name": "topLevelFlowName",
"type": "string",
"doc": "Top-level flow name. If job is in subflow, the top-level-flow-name is captured here",
"default": ""
},
{
"name": "flowDefinitionId",
"type": [
"null",
"long"
],
"doc": "Flow Definition ID. This is available in some orchestration systems.",
"default": null
},
{
"name": "flowExecutionId",
"type": [
"null",
"long"
],
"doc": "Flow execution ID as number",
"default": null
},
{
"name": "flowExecutionGuid",
"type": [
"null",
"string"
],
"doc": "Flow execution GUID.",
"logicalType": "GUID"
},
{
"name": "attempt",
"type": [
"null",
"int"
],
"doc": "Retry attempt for a failed job, starting from 0"
},
{
"name": "startTime",
"type": "long",
"doc": "When the job starts.",
"logicalType": "timestamp-millis"
},
{
"name": "suspendTime",
"type": [
"null",
"long"
],
"doc": "When the job gets suspended.",
"default": null,
"logicalType": "timestamp-millis"
},
{
"name": "endTime",
"type": [
"null",
"long"
],
"doc": "When the job finishes.",
"default": null,
"logicalType": "timestamp-millis"
},
{
"name": "state",
"type": [
"string",
"null"
],
"doc": "DISABLED, WAIT_FOR_TIME, WAIT_FOR_DATA, WAIT_FOR_RESOURCE, RUNNING, SUSPENDED, FAILED, SUCCEEDED, NO_DATA, DELETED, SKIPPED",
"default": "SUCCEEDED"
}
]
},
"doc": "Job execution information, such as name, time, state."
},
{
"name": "inputDatasetList",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "LineageDatasetItem",
"fields": [
{
"name": "datasetIdentifier",
"type": {
"type": "record",
"name": "DatasetIdentifier",
"fields": [
{
"name": "dataPlatformUrn",
"type": "string",
"doc": "The platform or type of the metadata object: espresso,kafka,oracle,voldemort,hdfs,hive,dalids,teradata,... for example, urn:li:dataPlatform:espresso, urn:li:dataPlatform:dalids"
},
{
"name": "nativeName",
"type": "string",
"doc": "The native name: <db>.<table>, /dir/subdir/<name>, or <name>"
},
{
"name": "dataOrigin",
"type": "DeploymentTier",
"doc": "Origin/Source Tier where the record is generated? This can be different from Deployment. For example, PROD data can be copied to a TEST server, then DataOrigin=PROD while the dataset instance belongs to TEST",
"default": "PROD"
}
]
},
"doc": "dataset name, platform and origin"
},
{
"name": "datasetUrn",
"type": [
"null",
"string"
],
"doc": "dataset URN used in the job"
},
{
"name": "datasetProperties",
"type": [
"null",
{
"type": "record",
"name": "DatasetProperty",
"fields": [
{
"name": "changeAuditStamp",
"type": {
"type": "record",
"name": "ChangeAuditStamp",
"fields": [
{
"name": "actorUrn",
"type": "string",
"doc": "urn:li:corpuser:jsmith, urn:li:team:xyz, urn:li:service:money"
},
{
"name": "type",
"type": "string",
"doc": "CREATE, UPDATE, DELETE"
},
{
"name": "time",
"type": "long",
"doc": "Epoch",
"logicalType": "timestamp-millis"
},
{
"name": "note",
"type": "string",
"doc": "Extra detail about the changes"
}
]
}
},
{
"name": "nativeType",
"type": {
"type": "enum",
"name": "PlatformNativeType",
"symbols": [
"TABLE",
"VIEW",
"DIRECTORY",
"FILE",
"INDEX",
"STREAM",
"BLOB",
"FUNCTION",
"OTHER"
]
},
"doc": "The native type about how the dataset is stored in the platform"
},
{
"name": "uri",
"type": [
"string",
"null"
],
"doc": "The abstracted such as hdfs:///data/tracking/PageViewEvent, file:///dir/file_name. This is often used in codes and scripts."
},
{
"name": "caseSensitivity",
"type": [
"null",
{
"type": "record",
"name": "CaseSensitivityInfo",
"fields": [
{
"name": "datasetName",
"type": "boolean",
"doc": "Is native object name CS?",
"default": true
},
{
"name": "fieldName",
"type": "boolean",
"doc": "Is field name CS?",
"default": true
},
{
"name": "dataContent",
"type": "boolean",
"doc": "Is data content CS?",
"default": true
}
]
}
]
}
]
}
],
"doc": "Basic properties of a dataset"
},
{
"name": "partition",
"type": [
"null",
{
"type": "record",
"name": "LineagePartitionRange",
"fields": [
{
"name": "partitionType",
"type": "string",
"doc": "snapshot, daily, houlry, hash"
},
{
"name": "minPartitionValue",
"type": "string"
},
{
"name": "maxPartitionValue",
"type": "string"
},
{
"name": "listPartitionValue",
"type": "string"
}
]
}
],
"doc": "Parititon range "
},
{
"name": "qualifiedValues",
"type": {
"type": "array",
"items": "string"
},
"doc": "list of IDs or Values used as filter. e.g. the input is filtered by country code 'US' and 'UK', the output is filtered by ID 23 and 108",
"default": []
},
{
"name": "detailLineageMap",
"type": [
"null",
{
"type": "record",
"name": "LineageDatasetDetailMap",
"fields": [
{
"name": "mapDirectionType",
"type": {
"type": "enum",
"name": "LineageMapDirectionType",
"symbols": [
"OUTPUT_TO_INPUT",
"INPUT_TO_OUTPUT"
]
},
"default": "OUTPUT_TO_INPUT"
},
{
"name": "fieldLineage",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "FieldLineage",
"fields": [
{
"name": "fieldPath",
"type": "string",
"doc": "Field path which the lineage is mapped from. * means all fields",
"default": "*"
},
{
"name": "mappedToFields",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "MappedToFields",
"fields": [
{
"name": "mappedToDataset",
"type": "DatasetIdentifier"
},
{
"name": "fieldPaths",
"type": {
"type": "array",
"items": "string"
},
"default": [
"*"
]
}
]
}
},
"doc": "1 field is mapped to multiple datasets and fields in those datasets"
}
]
}
},
"doc": "Field-level lineage mapping"
}
]
}
],
"doc": "Used to describe the detailed mapping between dataset=>dataset and field=>field",
"default": null
},
{
"name": "operation",
"type": "string",
"doc": "READ, INSERT, APPEND, UPDATE, MERGE, COMPACT, DELETE, TRUNCATE, ..."
}
]
}
},
"doc": "List of input datasets",
"default": []
},
{
"name": "outputDatasetList",
"type": {
"type": "array",
"items": "LineageDatasetItem"
},
"doc": "List of output datasets",
"default": []
}
]
}

View File

@ -45,9 +45,9 @@ public class AzLineageExtractor {
List<LineageRecord> oneAzkabanJobLineage = new ArrayList<>();
// azkaban job name should have subflow name append in front
String flowSequence[] = message.azkabanJobExecution.getFlowPath().split(":")[1].split("/");
String[] flowSequence = message.azkabanJobExecution.getFlowPath().split(":")[1].split("/");
String jobPrefix = "";
for (int i = 1; i < flowSequence.length; i ++) {
for (int i = 1; i < flowSequence.length; i++) {
jobPrefix += flowSequence[i] + ":";
}
//String log = asc.getExecLog(azJobExec.execId, azJobExec.jobName);
@ -56,7 +56,7 @@ public class AzLineageExtractor {
Set<String> hadoopJobIds = AzLogParser.getHadoopJobIdFromLog(log);
for (String hadoopJobId : hadoopJobIds) {
logger.debug("get hadoop job :{} from azkaban job : {}" +hadoopJobId, message.azkabanJobExecution.toString());
logger.debug("get hadoop job :{} from azkaban job : {}" + hadoopJobId, message.azkabanJobExecution.toString());
// TODO persist this mapping?
String confJson = message.hnne.getConfFromHadoop(hadoopJobId);
AzJsonAnalyzer ja = new AzJsonAnalyzer(confJson, message.azkabanJobExecution,

View File

@ -60,13 +60,11 @@ public class HadoopJobHistoryNodeExtractor {
String CURRENT_DIR = System.getProperty("user.dir");
String WH_HOME = System.getenv("WH_HOME");
String USER_HOME = System.getenv("HOME") + "/.kerberos";
String ETC = "/etc";
String TMP = "/var/tmp" + "/.kerberos";
String[] allPositions = new String[]{CURRENT_DIR, WH_HOME, USER_HOME, TMP};
String[] allPositions = new String[]{CURRENT_DIR, WH_HOME, USER_HOME};
for (String possition : allPositions) {
String gssFileName = possition + "/gss-jaas.conf";
for (String position : allPositions) {
String gssFileName = position + "/gss-jaas.conf";
File gssFile = new File(gssFileName);
if (gssFile.exists()) {
logger.debug("find gss-jaas.conf file in : {}", gssFile.getAbsolutePath());
@ -76,8 +74,8 @@ public class HadoopJobHistoryNodeExtractor {
logger.debug("can't find here: {}", gssFile.getAbsolutePath());
}
}
for (String possition : allPositions) {
String krb5FileName = possition + "/krb5.conf";
for (String position : allPositions) {
String krb5FileName = position + "/krb5.conf";
File krb5File = new File(krb5FileName);
if (krb5File.exists()) {
logger.debug("find krb5.conf file in : {}", krb5File.getAbsolutePath());

View File

@ -34,7 +34,6 @@ public class LineageCombiner {
// key is the operation + abstract name, value is the record
private Map<String, LineageRecord> _lineageRecordMap;
public LineageCombiner(Connection connection) {
_lineageRecordMap = new HashMap<>();
}
@ -54,6 +53,30 @@ public class LineageCombiner {
}
}
/**
* Similar to addAll, but does not overwrite partition info that is already set on the records
* @param rawLineageRecords
*/
public void addAllWoPartitionUpdate(List<LineageRecord> rawLineageRecords) {
for (LineageRecord lr : rawLineageRecords) {
DatasetPath datasetPath = PathAnalyzer.analyze(lr.getFullObjectName());
if (datasetPath != null) {
lr.setAbstractObjectName(datasetPath.abstractPath);
lr.setLayoutId(datasetPath.layoutId);
if (lr.getPartitionStart() == null) {
lr.setPartitionStart(datasetPath.partitionStart);
}
if (lr.getPartitionEnd() == null) {
lr.setPartitionEnd(datasetPath.partitionEnd);
}
if (lr.getPartitionType() == null) {
lr.setPartitionType(datasetPath.partitionType);
}
addToMap(lr);
}
}
}
private void addToMap(LineageRecord lr) {
String lineageRecordKey = lr.getLineageRecordKey();
if (_lineageRecordMap.containsKey(lineageRecordKey)) {
@ -69,7 +92,7 @@ public class LineageCombiner {
* @return A list of {@code LineageRecord} after combined.
*/
public List<LineageRecord> getCombinedLineage() {
ArrayList<LineageRecord> allLineage = new ArrayList(_lineageRecordMap.values());
List<LineageRecord> allLineage = new ArrayList<>(_lineageRecordMap.values());
Collections.sort(allLineage);
for (int i = 0; i < allLineage.size(); i++) {

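The new `addAllWoPartitionUpdate` keeps whatever partition information the caller already supplied and only fills in what `PathAnalyzer` derives from the path; records whose path matches no known layout are silently dropped. A small hedged sketch of how the new method is driven, assuming `PathAnalyzer.initialize(...)` has already been called (as the Kafka consumer master now does) and using the `LineageRecord` constructor shown in `LineageDao`:

```java
import java.util.Collections;
import java.util.List;
import wherehows.common.LineageCombiner;
import wherehows.common.schemas.LineageRecord;

public class CombinerSketch {
  public static List<LineageRecord> combine(LineageRecord record) {
    // The connection argument is no longer used by the combiner after this commit.
    LineageCombiner combiner = new LineageCombiner(null);

    // Partition fields already set on `record` are preserved; missing ones are
    // filled from the layout PathAnalyzer matches for record.getFullObjectName().
    combiner.addAllWoPartitionUpdate(Collections.singletonList(record));

    // Combined records come back sorted, ready for srl_no assignment as in LineageDao.
    return combiner.getCombinedLineage();
  }
}
```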
View File

@ -29,6 +29,7 @@ import wherehows.common.utils.StringUtil;
* Created by zechen on 9/16/15.
*/
public abstract class AbstractRecord implements Record {
@JsonIgnore
char SEPR = 0x001A;
@Override

View File

@ -0,0 +1,65 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.List;
public class ApplicationRecord extends AbstractRecord {
String type;
String name;
DeploymentRecord deploymentDetail; // not using the dataset info in DeploymentRecord
String uri;
@Override
public List<Object> fillAllFields() {
return null;
}
public ApplicationRecord() {
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public DeploymentRecord getDeploymentDetail() {
return deploymentDetail;
}
public void setDeploymentDetail(DeploymentRecord deploymentDetail) {
this.deploymentDetail = deploymentDetail;
}
public String getUri() {
return uri;
}
public void setUri(String uri) {
this.uri = uri;
}
}

View File

@ -0,0 +1,57 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
public class ChangeAuditStamp {
String actorUrn;
String type;
Long time;
String note;
public ChangeAuditStamp() {
}
public String getActorUrn() {
return actorUrn;
}
public void setActorUrn(String actorUrn) {
this.actorUrn = actorUrn;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public Long getTime() {
return time;
}
public void setTime(Long time) {
this.time = time;
}
public String getNote() {
return note;
}
public void setNote(String note) {
this.note = note;
}
}

View File

@ -0,0 +1,56 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.List;
public class DatasetIdentifier extends AbstractRecord {
String dataPlatformUrn;
String nativeName;
String dataOrigin;
@Override
public List<Object> fillAllFields() {
return null;
}
public DatasetIdentifier() {
}
public String getDataPlatformUrn() {
return dataPlatformUrn;
}
public void setDataPlatformUrn(String dataPlatformUrn) {
this.dataPlatformUrn = dataPlatformUrn;
}
public String getNativeName() {
return nativeName;
}
public void setNativeName(String nativeName) {
this.nativeName = nativeName;
}
public String getDataOrigin() {
return dataOrigin;
}
public void setDataOrigin(String dataOrigin) {
this.dataOrigin = dataOrigin;
}
}

View File

@ -0,0 +1,65 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.List;
public class DatasetPartitionRangeRecord extends AbstractRecord {
String partitionType;
String minPartitionValue;
String maxPartitionValue;
String listPartitionValue;
@Override
public List<Object> fillAllFields() {
return null;
}
public DatasetPartitionRangeRecord() {
}
public String getPartitionType() {
return partitionType;
}
public void setPartitionType(String partitionType) {
this.partitionType = partitionType;
}
public String getMinPartitionValue() {
return minPartitionValue;
}
public void setMinPartitionValue(String minPartitionValue) {
this.minPartitionValue = minPartitionValue;
}
public String getMaxPartitionValue() {
return maxPartitionValue;
}
public void setMaxPartitionValue(String maxPartitionValue) {
this.maxPartitionValue = maxPartitionValue;
}
public String getListPartitionValue() {
return listPartitionValue;
}
public void setListPartitionValue(String listPartitionValue) {
this.listPartitionValue = listPartitionValue;
}
}

View File

@ -0,0 +1,65 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.List;
public class DatasetPropertyRecord extends AbstractRecord {
ChangeAuditStamp changeAuditStamp;
String nativeType;
String uri;
DatasetCaseSensitiveRecord caseSensitivity;
@Override
public List<Object> fillAllFields() {
return null;
}
public DatasetPropertyRecord() {
}
public ChangeAuditStamp getChangeAuditStamp() {
return changeAuditStamp;
}
public void setChangeAuditStamp(ChangeAuditStamp changeAuditStamp) {
this.changeAuditStamp = changeAuditStamp;
}
public String getNativeType() {
return nativeType;
}
public void setNativeType(String nativeType) {
this.nativeType = nativeType;
}
public String getUri() {
return uri;
}
public void setUri(String uri) {
this.uri = uri;
}
public DatasetCaseSensitiveRecord getCaseSensitivity() {
return caseSensitivity;
}
public void setCaseSensitivity(DatasetCaseSensitiveRecord caseSensitivity) {
this.caseSensitivity = caseSensitivity;
}
}

View File

@ -17,7 +17,7 @@ import java.util.List;
import java.util.Map;
public class DatasetDeploymentRecord extends AbstractRecord {
public class DeploymentRecord extends AbstractRecord {
Integer datasetId;
String datasetUrn;
@ -42,7 +42,7 @@ public class DatasetDeploymentRecord extends AbstractRecord {
return null;
}
public DatasetDeploymentRecord() {
public DeploymentRecord() {
}
public Integer getDatasetId() {

View File

@ -0,0 +1,202 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.List;
public class JobExecutionRecord extends AbstractRecord {
Integer appId;
String name;
String node; //
String server; //
Long definitionId;
Long executionId;
String executionGuid;
String topLevelFlowName; //
Long flowDefinitionId;
Long flowExecutionId;
String flowExecutionGuid;
Integer attempt;
Long startTime;
Long suspendTime;
Long endTime;
String state;
Long logTime;
@Override
public String[] getDbColumnNames() {
return new String[]{"app_id", "job_name", "node", "server", "job_id", "job_exec_id", "job_exec_uuid", "flow_name",
"flow_id", "flow_exec_id", "flow_exec_uuid", "attempt_id", "start_time", "end_time", "job_exec_status",
"created_time"};
}
@JsonIgnore
public static String[] dbColumns() {
return new String[]{"app_id", "job_name", "job_id", "job_exec_id", "job_exec_uuid", "flow_id", "flow_exec_id",
"attempt_id", "start_time", "end_time", "job_exec_status", "created_time"};
}
@JsonIgnore
public Object[] dbValues() {
return new Object[]{appId, name, definitionId, executionId, executionGuid, flowDefinitionId, flowExecutionId,
attempt, (int) (startTime / 1000), (int) (endTime / 1000), state, (int) (logTime / 1000)};
}
@Override
public List<Object> fillAllFields() {
return null;
}
public JobExecutionRecord() {
}
public Integer getAppId() {
return appId;
}
public void setAppId(Integer appId) {
this.appId = appId;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getNode() {
return node;
}
public void setNode(String node) {
this.node = node;
}
public String getServer() {
return server;
}
public void setServer(String server) {
this.server = server;
}
public Long getDefinitionId() {
return definitionId;
}
public void setDefinitionId(Long definitionId) {
this.definitionId = definitionId;
}
public Long getExecutionId() {
return executionId;
}
public void setExecutionId(Long executionId) {
this.executionId = executionId;
}
public String getExecutionGuid() {
return executionGuid;
}
public void setExecutionGuid(String executionGuid) {
this.executionGuid = executionGuid;
}
public String getTopLevelFlowName() {
return topLevelFlowName;
}
public void setTopLevelFlowName(String topLevelFlowName) {
this.topLevelFlowName = topLevelFlowName;
}
public Long getFlowDefinitionId() {
return flowDefinitionId;
}
public void setFlowDefinitionId(Long flowDefinitionId) {
this.flowDefinitionId = flowDefinitionId;
}
public Long getFlowExecutionId() {
return flowExecutionId;
}
public void setFlowExecutionId(Long flowExecutionId) {
this.flowExecutionId = flowExecutionId;
}
public String getFlowExecutionGuid() {
return flowExecutionGuid;
}
public void setFlowExecutionGuid(String flowExecutionGuid) {
this.flowExecutionGuid = flowExecutionGuid;
}
public Integer getAttempt() {
return attempt;
}
public void setAttempt(Integer attempt) {
this.attempt = attempt;
}
public Long getStartTime() {
return startTime;
}
public void setStartTime(Long startTime) {
this.startTime = startTime;
}
public Long getSuspendTime() {
return suspendTime;
}
public void setSuspendTime(Long suspendTime) {
this.suspendTime = suspendTime;
}
public Long getEndTime() {
return endTime;
}
public void setEndTime(Long endTime) {
this.endTime = endTime;
}
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
public Long getLogTime() {
return logTime;
}
public void setLogTime(Long logTime) {
this.logTime = logTime;
}
}

View File

@ -0,0 +1,47 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.List;
public class LineageDatasetMapRecord extends AbstractRecord {
String mapDirectionType;
List<LineageFieldRecord> fieldLineage;
@Override
public List<Object> fillAllFields() {
return null;
}
public LineageDatasetMapRecord() {
}
public String getMapDirectionType() {
return mapDirectionType;
}
public void setMapDirectionType(String mapDirectionType) {
this.mapDirectionType = mapDirectionType;
}
public List<LineageFieldRecord> getFieldLineage() {
return fieldLineage;
}
public void setFieldLineage(List<LineageFieldRecord> fieldLineage) {
this.fieldLineage = fieldLineage;
}
}

View File

@ -0,0 +1,111 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.List;
import java.util.Map;
public class LineageDatasetRecord extends AbstractRecord {
Integer datasetId;
DatasetIdentifier datasetIdentifier;
String datasetUrn;
DatasetPropertyRecord datasetProperties;
DatasetPartitionRangeRecord partition;
String[] qualifiedValues;
LineageDatasetMapRecord detailLineageMap;
String operation;
String sourceTargetType;
@Override
public List<Object> fillAllFields() {
return null;
}
public LineageDatasetRecord() {
}
public Integer getDatasetId() {
return datasetId;
}
public void setDatasetId(Integer datasetId) {
this.datasetId = datasetId;
}
public DatasetIdentifier getDatasetIdentifier() {
return datasetIdentifier;
}
public void setDatasetIdentifier(DatasetIdentifier datasetIdentifier) {
this.datasetIdentifier = datasetIdentifier;
}
public String getDatasetUrn() {
return datasetUrn;
}
public void setDatasetUrn(String datasetUrn) {
this.datasetUrn = datasetUrn;
}
public DatasetPropertyRecord getDatasetProperties() {
return datasetProperties;
}
public void setDatasetProperties(DatasetPropertyRecord datasetProperties) {
this.datasetProperties = datasetProperties;
}
public DatasetPartitionRangeRecord getPartition() {
return partition;
}
public void setPartition(DatasetPartitionRangeRecord partition) {
this.partition = partition;
}
public String[] getQualifiedValues() {
return qualifiedValues;
}
public void setQualifiedValues(String[] qualifiedValues) {
this.qualifiedValues = qualifiedValues;
}
public LineageDatasetMapRecord getDetailLineageMap() {
return detailLineageMap;
}
public void setDetailLineageMap(LineageDatasetMapRecord detailLineageMap) {
this.detailLineageMap = detailLineageMap;
}
public String getOperation() {
return operation;
}
public void setOperation(String operation) {
this.operation = operation;
}
public String getSourceTargetType() {
return sourceTargetType;
}
public void setSourceTargetType(String sourceTargetType) {
this.sourceTargetType = sourceTargetType;
}
}

View File

@ -0,0 +1,47 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.List;
public class LineageFieldMapRecord extends AbstractRecord {
DatasetIdentifier mappedToDataset;
List<String> fieldPaths;
@Override
public List<Object> fillAllFields() {
return null;
}
public LineageFieldMapRecord() {
}
public DatasetIdentifier getMappedToDataset() {
return mappedToDataset;
}
public void setMappedToDataset(DatasetIdentifier mappedToDataset) {
this.mappedToDataset = mappedToDataset;
}
public List<String> getFieldPaths() {
return fieldPaths;
}
public void setFieldPaths(List<String> fieldPaths) {
this.fieldPaths = fieldPaths;
}
}

View File

@ -0,0 +1,47 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.List;
public class LineageFieldRecord extends AbstractRecord {
String fieldPath;
List<LineageFieldMapRecord> mappedToFields;
@Override
public List<Object> fillAllFields() {
return null;
}
public LineageFieldRecord() {
}
public String getFieldPath() {
return fieldPath;
}
public void setFieldPath(String fieldPath) {
this.fieldPath = fieldPath;
}
public List<LineageFieldMapRecord> getMappedToFields() {
return mappedToFields;
}
public void setMappedToFields(List<LineageFieldMapRecord> mappedToFields) {
this.mappedToFields = mappedToFields;
}
}

View File

@ -123,7 +123,7 @@ public class LineageRecord implements Record, Comparable<LineageRecord> {
allFields.add(flowPath);
// add the created_date and wh_etl_exec_id
allFields.add(System.currentTimeMillis()/1000);
allFields.add(System.currentTimeMillis() / 1000);
allFields.add(null);
StringBuilder sb = new StringBuilder();
for (Object o : allFields) {
@ -134,6 +134,19 @@ public class LineageRecord implements Record, Comparable<LineageRecord> {
return sb.toString();
}
public static String[] dbColumns() {
return new String[]{"app_id", "flow_exec_id", "job_exec_id", "job_exec_uuid", "flow_path", "job_name",
"job_start_unixtime", "job_finished_unixtime", "abstracted_object_name", "full_object_name", "partition_start",
"partition_end", "partition_type", "layout_id", "storage_type", "source_target_type",
"srl_no", "source_srl_no", "operation", "created_date"};
}
public Object[] dbValues() {
return new Object[]{appId, flowExecId, jobExecId, jobExecUUID, flowPath, jobName, jobStartTime, jobEndTime,
abstractObjectName, fullObjectName, partitionStart, partitionEnd, partitionType, layoutId, storageType,
sourceTargetType, srlNo, relatedSrlNo, operation, System.currentTimeMillis() / 1000};
}
/**
* After analyze, the path need to update to abstract format
* @param datasetPath
@ -214,14 +227,26 @@ public class LineageRecord implements Record, Comparable<LineageRecord> {
this.fullObjectName = fullObjectName;
}
public String getPartitionStart() {
return partitionStart;
}
public void setPartitionStart(String partitionStart) {
this.partitionStart = partitionStart;
}
public String getPartitionEnd() {
return partitionEnd;
}
public void setPartitionEnd(String partitionEnd) {
this.partitionEnd = partitionEnd;
}
public String getPartitionType() {
return partitionType;
}
public void setPartitionType(String partitionType) {
this.partitionType = partitionType;
}

View File

@ -13,6 +13,9 @@
*/
package wherehows.common.schemas;
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* The {@code Record} define the data interface of metadata ETL.
* <p>
@ -27,10 +30,12 @@ public interface Record {
/**
* Convert to csv string that will write to csv file
*/
@JsonIgnore
public String toCsvString();
/**
* Convert to database value that will append into sql
*/
@JsonIgnore
public String toDatabaseValue();
}