diff --git a/backend-service/app/models/daos/LineageDao.java b/backend-service/app/models/daos/LineageDao.java index c242b56923..d6c58f0975 100644 --- a/backend-service/app/models/daos/LineageDao.java +++ b/backend-service/app/models/daos/LineageDao.java @@ -34,10 +34,8 @@ import wherehows.common.writers.DatabaseWriter; */ public class LineageDao { public static final String FIND_JOBS_BY_DATASET = - " select distinct ca.short_connection_string, f.flow_group, f.flow_name, jedl.job_name " + " select distinct ca.short_connection_string, jedl.job_name, jedl.flow_path " + " from job_execution_data_lineage jedl " - + " join flow_execution fe on jedl.app_id = fe.app_id and jedl.flow_exec_id = fe.flow_exec_id " - + " join flow f on fe.app_id = f.app_id and fe.flow_id = f.flow_id " + " join cfg_application ca on ca.app_id = jedl.app_id " + " join cfg_database cd on cd.db_id = jedl.db_id " + " where source_target_type = :source_target_type " diff --git a/backend-service/deploy b/backend-service/deploy index 5f7ac3ce68..a411bb2237 100755 --- a/backend-service/deploy +++ b/backend-service/deploy @@ -4,16 +4,14 @@ gradle dist; rm -r target/universal/backend-service-1.0-SNAPSHOT; unzip target/universal/backend-service-1.0-SNAPSHOT.zip -d target/universal/; -scp target/universal/backend-service-1.0-SNAPSHOT/lib/metadata-etl-1.0.jar dev_svc@lva1-rpt14:~/backendServer/lib/; -scp target/universal/backend-service-1.0-SNAPSHOT/lib/wherehows-common-1.0.jar dev_svc@lva1-rpt14:~/backendServer/lib/; +scp target/universal/backend-service-1.0-SNAPSHOT/lib/metadata-etl-1.0.jar ${TARGET_SERVER}:~/backendServer/lib/; -scp target/universal/backend-service-1.0-SNAPSHOT/lib/default.backend-service-1.0-SNAPSHOT.jar dev_svc@lva1-rpt14:~/backendServer/lib/; +scp target/universal/backend-service-1.0-SNAPSHOT/lib/default.backend-service-1.0-SNAPSHOT.jar ${TARGET_SERVER}:~/backendServer/lib/; -scp target/universal/backend-service-1.0-SNAPSHOT/lib/schemaFetch.jar dev_svc@lva1-rpt14:~/backendServer/lib/; +scp target/universal/backend-service-1.0-SNAPSHOT/lib/schemaFetch.jar ${TARGET_SERVER}:~/backendServer/lib/; + +scp target/universal/backend-service-1.0-SNAPSHOT/lib/wherehows-common-1.0.jar ${TARGET_SERVER}:~/backendServer/lib/; + +scp target/universal/backend-service-1.0-SNAPSHOT/bin/* ${TARGET_SERVER}:~/backendServer/bin/; - -scp target/universal/backend-service-1.0-SNAPSHOT/bin/* dev_svc@lva1-rpt14:~/backendServer/bin/; - - -scp target/universal/backend-service-1.0-SNAPSHOT/lib/metadata-etl-1.0.jar cloudera@172.21.98.211:~/wherehows/backend-service-1.0-SNAPSHOT/lib/ diff --git a/data-model/DDL/ETL_DDL/dataset_metadata.sql b/data-model/DDL/ETL_DDL/dataset_metadata.sql index 36c314f3ee..4015c1ab8d 100644 --- a/data-model/DDL/ETL_DDL/dataset_metadata.sql +++ b/data-model/DDL/ETL_DDL/dataset_metadata.sql @@ -165,21 +165,30 @@ CREATE TABLE `dict_field_detail` ( `field_label` VARCHAR(100) DEFAULT NULL, `data_type` VARCHAR(50) NOT NULL, `data_size` INT(10) UNSIGNED DEFAULT NULL, - `data_precision` TINYINT(4) DEFAULT NULL, - `data_fraction` TINYINT(4) DEFAULT NULL, + `data_precision` TINYINT(4) DEFAULT NULL + COMMENT 'only in decimal type', + `data_fraction` TINYINT(4) DEFAULT NULL + COMMENT 'only in decimal type', `default_comment_id` INT(11) UNSIGNED DEFAULT NULL COMMENT 'a list of comment_id', `comment_ids` VARCHAR(500) DEFAULT NULL, `is_nullable` CHAR(1) DEFAULT NULL, - `is_indexed` CHAR(1) DEFAULT NULL, - `is_partitioned` CHAR(1) DEFAULT NULL, - `is_distributed` TINYINT(4) DEFAULT NULL, + `is_indexed` CHAR(1) DEFAULT NULL + 
COMMENT 'only in RDBMS', + `is_partitioned` CHAR(1) DEFAULT NULL + COMMENT 'only in RDBMS', + `is_distributed` TINYINT(4) DEFAULT NULL + COMMENT 'only in RDBMS', `default_value` VARCHAR(200) DEFAULT NULL, `namespace` VARCHAR(200) DEFAULT NULL, - `java_data_type` VARCHAR(50) DEFAULT NULL, - `jdbc_data_type` VARCHAR(50) DEFAULT NULL, - `pig_data_type` VARCHAR(50) DEFAULT NULL, - `hcatalog_data_type` VARCHAR(50) DEFAULT NULL, + `java_data_type` VARCHAR(50) DEFAULT NULL + COMMENT 'correspond type in java', + `jdbc_data_type` VARCHAR(50) DEFAULT NULL + COMMENT 'correspond type in jdbc', + `pig_data_type` VARCHAR(50) DEFAULT NULL + COMMENT 'correspond type in pig', + `hcatalog_data_type` VARCHAR(50) DEFAULT NULL + COMMENT 'correspond type in hcatalog', `modified` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`field_id`), KEY `idx_dict_field__datasetid_fieldname` (`dataset_id`, `field_name`) USING BTREE, diff --git a/metadata-etl/src/main/java/metadata/etl/EtlJob.java b/metadata-etl/src/main/java/metadata/etl/EtlJob.java index b5debe4dd5..1e625b2739 100644 --- a/metadata-etl/src/main/java/metadata/etl/EtlJob.java +++ b/metadata-etl/src/main/java/metadata/etl/EtlJob.java @@ -35,9 +35,8 @@ import java.util.Properties; * Created by zsun on 7/29/15. */ public abstract class EtlJob { - public final static String CONFIG_FILE = "application.properties"; + private final static String CONFIG_FILE = "application.properties"; public PythonInterpreter interpreter; - public PySystemState sys; public Properties prop; public ClassLoader classLoader = getClass().getClassLoader(); protected final Logger logger = LoggerFactory.getLogger(getClass()); @@ -61,8 +60,8 @@ public abstract class EtlJob { */ @Deprecated public EtlJob(Integer appId, Integer dbId, long whExecId, String configFile) { - configFromFile(appId, dbId, whExecId, configFile); - addJythonToPath(); + PySystemState sys = configFromFile(appId, dbId, whExecId, configFile); + addJythonToPath(sys); interpreter = new PythonInterpreter(null, sys); } @@ -74,12 +73,12 @@ public abstract class EtlJob { * @param properties */ public EtlJob(Integer appId, Integer dbId, Long whExecId, Properties properties) { - configFromProperties(appId, dbId, whExecId, properties); - addJythonToPath(); + PySystemState sys = configFromProperties(appId, dbId, whExecId, properties); + addJythonToPath(sys); interpreter = new PythonInterpreter(null, sys); } - private void addJythonToPath() { + private void addJythonToPath(PySystemState pySystemState) { URL url = classLoader.getResource("jython"); if (url != null) { File file = new File(url.getFile()); @@ -87,12 +86,12 @@ public abstract class EtlJob { if (path.startsWith("file:")) { path = path.substring(5); } - sys.path.append(new PyString(path.replace("!", ""))); + pySystemState.path.append(new PyString(path.replace("!", ""))); } } @Deprecated - private void configFromFile(Integer appId, Integer dbId, long whExecId, String configFile) { + private PySystemState configFromFile(Integer appId, Integer dbId, long whExecId, String configFile) { prop = new Properties(); if (appId != null) { @@ -117,8 +116,9 @@ public abstract class EtlJob { config.put(new PyString(key), new PyString(value)); } - sys = new PySystemState(); + PySystemState sys = new PySystemState(); sys.argv.append(config); + return sys; } /** @@ -126,8 +126,9 @@ public abstract class EtlJob { * @param appId * @param whExecId * @param properties + * @return PySystemState A PySystemState that contain all the arguments. 
*/ - private void configFromProperties(Integer appId, Integer dbId, Long whExecId, Properties properties) { + private PySystemState configFromProperties(Integer appId, Integer dbId, Long whExecId, Properties properties) { this.prop = properties; if (appId != null) prop.setProperty(Constant.APP_ID_KEY, String.valueOf(appId)); @@ -139,8 +140,9 @@ public abstract class EtlJob { String value = prop.getProperty(key); config.put(new PyString(key), new PyString(value)); } - sys = new PySystemState(); + PySystemState sys = new PySystemState(); sys.argv.append(config); + return sys; } /** * Extract data from source diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzJobChecker.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzJobChecker.java index e8156054d7..e10692155a 100644 --- a/metadata-etl/src/main/java/metadata/etl/lineage/AzJobChecker.java +++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzJobChecker.java @@ -52,7 +52,7 @@ public class AzJobChecker { /** * Default 10 minutes - * @return + * @return A list of recent finished AzkabanJobExecRecord * @throws IOException * @throws SQLException */ @@ -72,23 +72,32 @@ public class AzJobChecker { public List getRecentFinishedJobFromFlow(int timeFrameMinutes) throws IOException, SQLException { long currentTimeStamp = System.currentTimeMillis(); - long oneHourAgo = currentTimeStamp - 1000 * 60 * timeFrameMinutes; - return getRecentFinishedJobFromFlow(oneHourAgo); + long beginTimeStamp = currentTimeStamp - 1000 * 60 * timeFrameMinutes; + return getRecentFinishedJobFromFlow(beginTimeStamp, currentTimeStamp); + } + + public List getRecentFinishedJobFromFlow(int timeFrameMinutes, long endTimeStamp) + throws IOException, SQLException { + long beginTimeStamp = endTimeStamp - 60 * timeFrameMinutes; + return getRecentFinishedJobFromFlow(beginTimeStamp * 1000, endTimeStamp * 1000); // convert to milli seconds } /** * Read the blob from "flow_data", do a topological sort on the nodes. Give them the sort id. - * @param timestamp the beginning timestamp + * @param startTimeStamp the begin timestamp + * @param endTimeStamp the end timestamp * @return */ - public List getRecentFinishedJobFromFlow(long timestamp) + public List getRecentFinishedJobFromFlow(long startTimeStamp, long endTimeStamp) throws SQLException, IOException { - logger.info("Get the jobs from time : {}", timestamp); + logger.info("Get the jobs from time : {} to time : {}", startTimeStamp, endTimeStamp); List results = new ArrayList<>(); Statement stmt = conn.createStatement(); final String cmd = - "select exec_id, flow_id, status, submit_user, flow_data from execution_flows where end_time > " + timestamp; + "select exec_id, flow_id, status, submit_user, flow_data from execution_flows where end_time > " + startTimeStamp + + " and end_time < " + endTimeStamp ; + logger.info("Get recent flow sql : " + cmd); final ResultSet rs = stmt.executeQuery(cmd); // this sql take 3 second to execute while (rs.next()) { @@ -101,27 +110,53 @@ public class AzJobChecker { return results; } + /** + * Parse the json of flow_data field from execution_flows. Use recursion to handle the nested case. 
+ * @param flowJson + * @param flowExecId + * @return + * @throws IOException + */ public List parseJson(String flowJson, long flowExecId) throws IOException { - List results = new ArrayList<>(); + ObjectMapper mapper = new ObjectMapper(); JsonNode wholeFlow = mapper.readTree(flowJson); JsonNode allJobs = wholeFlow.get("nodes"); String flowPath = wholeFlow.get("projectName").asText() + ":" + wholeFlow.get("flowId").asText(); - for (JsonNode oneJob : allJobs) { - String jobName = oneJob.get("id").asText(); - long startTime = oneJob.get("startTime").asLong(); - long endTime = oneJob.get("endTime").asLong(); - String status = oneJob.get("status").asText(); - AzkabanJobExecRecord azkabanJobExecRecord = - new AzkabanJobExecRecord(appId, jobName, flowExecId, (int) (startTime / 1000), (int) (endTime / 1000), status, - flowPath); - results.add(azkabanJobExecRecord); - } + List results = parseJsonHelper(allJobs, flowExecId, flowPath); AzkabanJobExecUtil.sortAndSet(results); return results; } + /** + * Recursively process the execution info to get {@AzkabanJobExecRecord} + * @param allJobs JsonNode in "nodes" field + * @param flowExecId + * @param flowPath Format : project_name:first_level_flow/sub_flow/sub_flow + * @return + */ + private List parseJsonHelper(JsonNode allJobs, long flowExecId, String flowPath) { + List results = new ArrayList<>(); + for (JsonNode oneJob : allJobs) { + if (oneJob.has("nodes")) { // is a subflow + String subFlowName = oneJob.get("id").asText(); + flowPath += "/" + subFlowName; + results.addAll(parseJsonHelper(oneJob.get("nodes"), flowExecId, flowPath)); + } else { + String jobName = oneJob.get("id").asText(); + long startTime = oneJob.get("startTime").asLong(); + long endTime = oneJob.get("endTime").asLong(); + String status = oneJob.get("status").asText(); + AzkabanJobExecRecord azkabanJobExecRecord = + new AzkabanJobExecRecord(appId, jobName, flowExecId, (int) (startTime / 1000), (int) (endTime / 1000), + status, flowPath); + results.add(azkabanJobExecRecord); + } + } + return results; + } + public void close() throws SQLException { conn.close(); diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java index e7c78083fe..85f71b2129 100644 --- a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java +++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java @@ -43,9 +43,16 @@ public class AzLineageExtractor { throws Exception { List oneAzkabanJobLineage = new ArrayList<>(); + + // azkaban job name should have subflow name append in front + String flowSequence[] = message.azkabanJobExecution.getFlowPath().split(":")[1].split("/"); + String jobPrefix = ""; + for (int i = 1; i < flowSequence.length; i ++) { + jobPrefix += flowSequence[i] + ":"; + } //String log = asc.getExecLog(azJobExec.execId, azJobExec.jobName); String log = - message.adc.getExecLog(message.azkabanJobExecution.getFlowExecId(), message.azkabanJobExecution.getJobName()); + message.adc.getExecLog(message.azkabanJobExecution.getFlowExecId(), jobPrefix + message.azkabanJobExecution.getJobName()); Set hadoopJobIds = AzLogParser.getHadoopJobIdFromLog(log); for (String hadoopJobId : hadoopJobIds) { @@ -61,8 +68,8 @@ public class AzLineageExtractor { // normalize and combine the path LineageCombiner lineageCombiner = new LineageCombiner(message.connection); lineageCombiner.addAll(oneAzkabanJobLineage); - - List lineageFromLog = AzLogParser.getLineageFromLog(log, 
message.azkabanJobExecution); + Integer defaultDatabaseId = Integer.valueOf(message.prop.getProperty(Constant.AZ_DEFAULT_HADOOP_DATABASE_ID_KEY)); + List lineageFromLog = AzLogParser.getLineageFromLog(log, message.azkabanJobExecution, defaultDatabaseId); lineageCombiner.addAll(lineageFromLog); return lineageCombiner.getCombinedLineage(); diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractorMaster.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractorMaster.java index 18fd4c663a..6e8303f867 100644 --- a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractorMaster.java +++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractorMaster.java @@ -60,17 +60,22 @@ public class AzLineageExtractorMaster { run(10); } + public void run(int timeFrame) + throws Exception { + run(timeFrame, System.currentTimeMillis()); + } + /** * Entry point. * All recent finished azkaban jobs' lineage. Will write to database stagging table * @param timeFrame in minutes * @throws Exception */ - public void run(int timeFrame) + public void run(int timeFrame, long endTimeStamp) throws Exception { // get recent finished job AzJobChecker azJobChecker = new AzJobChecker(prop); - List jobExecList = azJobChecker.getRecentFinishedJobFromFlow(timeFrame); + List jobExecList = azJobChecker.getRecentFinishedJobFromFlow(timeFrame, endTimeStamp); azJobChecker.close(); logger.info("Total number of azkaban jobs : {}", jobExecList.size()); @@ -90,7 +95,7 @@ public class AzLineageExtractorMaster { Connection conn = DriverManager.getConnection(connUrl); DatabaseWriter databaseWriter = new DatabaseWriter(connUrl, "stg_job_execution_data_lineage"); - AzLogParser.initialize(conn, Integer.valueOf(prop.getProperty(Constant.AZ_DEFAULT_HADOOP_DATABASE_ID_KEY))); + AzLogParser.initialize(conn); PathAnalyzer.initialize(conn); int timeout = 30; // default 30 minutes for one job if (prop.containsKey(Constant.LINEAGE_ACTOR_TIMEOUT_KEY)) diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageMetadataEtl.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageMetadataEtl.java index 17b9026067..ffbe8a8f77 100644 --- a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageMetadataEtl.java +++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageMetadataEtl.java @@ -27,7 +27,8 @@ import java.util.Properties; */ public class AzLineageMetadataEtl extends EtlJob { - public int timeFrame = -1; + public Integer timeFrame = null; + public Long endTimeStamp = null; Connection conn; /** @@ -53,6 +54,8 @@ public class AzLineageMetadataEtl extends EtlJob { public AzLineageMetadataEtl(int appId, long whExecId, Properties properties) { super(appId, null, whExecId, properties); this.timeFrame = Integer.valueOf(this.prop.getProperty(Constant.AZ_LINEAGE_ETL_LOOKBACK_MINS_KEY)); + if (this.prop.contains(Constant.AZ_LINEAGE_ETL_END_TIMESTAMP_KEY)) + this.endTimeStamp = Long.valueOf(this.prop.getProperty(Constant.AZ_LINEAGE_ETL_END_TIMESTAMP_KEY)); try { setUp(); } catch (SQLException e) { @@ -78,7 +81,11 @@ public class AzLineageMetadataEtl extends EtlJob { conn.createStatement().execute(emptyStaggingTable); AzLineageExtractorMaster azLineageExtractorMaster = new AzLineageExtractorMaster(prop); // get lineage - if (timeFrame > 0) { + if (timeFrame != null && endTimeStamp != null && endTimeStamp != 0) { + azLineageExtractorMaster.run(timeFrame, endTimeStamp); + } + + else if (timeFrame != null) { azLineageExtractorMaster.run(timeFrame); } else { 
azLineageExtractorMaster.run(10); @@ -110,8 +117,8 @@ public class AzLineageMetadataEtl extends EtlJob { conn.createStatement().execute(insertIntoFinalTable); logger.info("Azkaban lineage metadata ETL completed"); - if (prop.getProperty(Constant.APP_ID_KEY).equals("32")) { - logger.info("TEMPORARY load war's data into cmdb database"); + if (prop.getProperty(Constant.APP_ID_KEY).equals("32") || prop.getProperty(Constant.APP_ID_KEY).equals("31") ) { + logger.info("TEMPORARY load war & nertz's data into cmdb database"); loadIntoOthers(); } } diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzLogParser.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzLogParser.java index fc6fd6edc0..14a2ba0ff4 100644 --- a/metadata-etl/src/main/java/metadata/etl/lineage/AzLogParser.java +++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzLogParser.java @@ -34,7 +34,6 @@ public class AzLogParser { static List logLineagePatterns; static List logHadoopIdPatterns; - static int defaultDatabaseId; /** * Parse the hadoop job id from the log. @@ -65,18 +64,17 @@ public class AzLogParser { /** * initialize, download the regex info into cache */ - public synchronized static void initialize(Connection conn, int defaultDatabaseId) + public synchronized static void initialize(Connection conn) throws SQLException { if (logHadoopIdPatterns != null && logLineagePatterns != null) { return; } - loadLineagePatterns(conn, defaultDatabaseId); - loadHadoopIdPatterns(conn, defaultDatabaseId); + loadLineagePatterns(conn); + loadHadoopIdPatterns(conn); } - private static void loadLineagePatterns(Connection conn, int defaultDatabaseId) + private static void loadLineagePatterns(Connection conn) throws SQLException { - AzLogParser.defaultDatabaseId = defaultDatabaseId; logLineagePatterns = new ArrayList<>(); String cmd = "SELECT regex, database_type, database_name_index, dataset_index, operation_type, source_target_type, " + "record_count_index, record_byte_index, insert_count_index, insert_byte_index, " @@ -95,7 +93,7 @@ public class AzLogParser { } } - private static void loadHadoopIdPatterns(Connection conn, int defaultDatabaseId) + private static void loadHadoopIdPatterns(Connection conn) throws SQLException { logHadoopIdPatterns = new ArrayList<>(); String cmd = "SELECT regex FROM log_reference_job_id_pattern WHERE is_active = 1"; @@ -111,7 +109,7 @@ public class AzLogParser { * @param azkabanJobExecRecord contain the job execution info to construct the result * @return */ - public static List getLineageFromLog(String log, AzkabanJobExecRecord azkabanJobExecRecord) { + public static List getLineageFromLog(String log, AzkabanJobExecRecord azkabanJobExecRecord, Integer defaultDatabaseId) { List result = new ArrayList<>(); diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/HadoopNameNodeExtractor.java b/metadata-etl/src/main/java/metadata/etl/lineage/HadoopNameNodeExtractor.java index ec7237fab2..a33b7b173a 100644 --- a/metadata-etl/src/main/java/metadata/etl/lineage/HadoopNameNodeExtractor.java +++ b/metadata-etl/src/main/java/metadata/etl/lineage/HadoopNameNodeExtractor.java @@ -61,15 +61,15 @@ public class HadoopNameNodeExtractor { String WH_HOME = System.getenv("WH_HOME"); String USER_HOME = System.getenv("HOME") + "/.kerberos"; String ETC = "/etc"; - String TMP = "/tmp" + "/.kerberos"; + String TMP = "/var/tmp" + "/.kerberos"; - String[] allPositions = new String[]{CURRENT_DIR, WH_HOME, USER_HOME, TMP, ETC}; + String[] allPositions = new String[]{CURRENT_DIR, WH_HOME, USER_HOME, TMP}; for (String 
possition : allPositions) { String gssFileName = possition + "/gss-jaas.conf"; File gssFile = new File(gssFileName); if (gssFile.exists()) { - logger.debug("find gss-jaas.conf file in : {}", gssFile.getAbsolutePath()); + logger.info("find gss-jaas.conf file in : {}", gssFile.getAbsolutePath()); System.setProperty("java.security.auth.login.config", gssFile.getAbsolutePath()); break; } else { @@ -80,7 +80,7 @@ public class HadoopNameNodeExtractor { String krb5FileName = possition + "/krb5.conf"; File krb5File = new File(krb5FileName); if (krb5File.exists()) { - logger.debug("find krb5.conf file in : {}", krb5File.getAbsolutePath()); + logger.info("find krb5.conf file in : {}", krb5File.getAbsolutePath()); System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath()); break; } else { diff --git a/metadata-etl/src/main/resources/jython/HdfsLoad.py b/metadata-etl/src/main/resources/jython/HdfsLoad.py index 0b45d0cdb8..f6599445bb 100644 --- a/metadata-etl/src/main/resources/jython/HdfsLoad.py +++ b/metadata-etl/src/main/resources/jython/HdfsLoad.py @@ -168,15 +168,74 @@ class HdfsLoad: analyze table field_comments; + -- delete old record if it does not exist in this load batch anymore (but have the dataset id) + create temporary table if not exists t_deleted_fields (primary key (field_id)) + select x.field_id + from stg_dict_field_detail s + join dict_dataset i + on s.urn = i.urn + and s.db_id = {db_id} + right join dict_field_detail x + on i.id = x.dataset_id + and s.field_name = x.field_name + and s.parent_path = x.parent_path + where s.field_name is null + and x.dataset_id in ( + select d.id dataset_id + from stg_dict_field_detail k join dict_dataset d + on k.urn = d.urn + and k.db_id = {db_id} + ) + ; -- run time : ~2min + + delete from dict_field_detail where field_id in (select field_id from t_deleted_fields); + + -- update the old record if some thing changed + update dict_field_detail t join + ( + select x.field_id, s.* + from stg_dict_field_detail s join dict_dataset d + on s.urn = d.urn + join dict_field_detail x + on s.field_name = x.field_name + and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*') + and d.id = x.dataset_id + where s.db_id = {db_id} + and (x.sort_id <> s.sort_id + or x.parent_sort_id <> s.parent_sort_id + or x.data_type <> s.data_type + or x.data_size <> s.data_size or (x.data_size is null XOR s.data_size is null) + or x.data_precision <> s.data_precision or (x.data_precision is null XOR s.data_precision is null) + or x.is_nullable <> s.is_nullable or (x.is_nullable is null XOR s.is_nullable is null) + or x.is_partitioned <> s.is_partitioned or (x.is_partitioned is null XOR s.is_partitioned is null) + or x.is_distributed <> s.is_distributed or (x.is_distributed is null XOR s.is_distributed is null) + or x.default_value <> s.default_value or (x.default_value is null XOR s.default_value is null) + or x.namespace <> s.namespace or (x.namespace is null XOR s.namespace is null) + ) + ) p + on t.field_id = p.field_id + set t.sort_id = p.sort_id, + t.parent_sort_id = p.parent_sort_id, + t.data_type = p.data_type, + t.data_size = p.data_size, + t.data_precision = p.data_precision, + t.is_nullable = p.is_nullable, + t.is_partitioned = p.is_partitioned, + t.is_distributed = p.is_distributed, + t.default_value = p.default_value, + t.namespace = p.namespace, + t.modified = now() + ; + insert into dict_field_detail ( dataset_id, fields_layout_id, sort_id, parent_sort_id, parent_path, field_name, namespace, data_type, data_size, is_nullable, default_value, - 
default_comment_id + default_comment_id, modified ) select d.id, 0, sf.sort_id, sf.parent_sort_id, sf.parent_path, sf.field_name, sf.namespace, sf.data_type, sf.data_size, sf.is_nullable, sf.default_value, - coalesce(fc.id, t.default_comment_id) fc_id + coalesce(fc.id, t.default_comment_id) fc_id, now() from stg_dict_field_detail sf join dict_dataset d on sf.urn = d.urn left join field_comments fc diff --git a/metadata-etl/src/main/resources/jython/TeradataLoad.py b/metadata-etl/src/main/resources/jython/TeradataLoad.py index e001e4c9fb..c2ed96fb70 100644 --- a/metadata-etl/src/main/resources/jython/TeradataLoad.py +++ b/metadata-etl/src/main/resources/jython/TeradataLoad.py @@ -129,29 +129,63 @@ class TeradataLoad: and (char_length(trim(description)) = 0 or description in ('null', 'N/A', 'nothing', 'empty', 'none')); - UPDATE dict_field_detail t, ( - select - f.field_id, - s.sort_id, - s.data_type, - s.data_size, - s.data_precision, - s.data_scale, - s.is_nullable - from stg_dict_field_detail s join dict_dataset d - on s.urn = d.urn - join dict_field_detail f - on d.id = f.dataset_id - and s.field_name = f.field_name - where s.db_id = {db_id} - ) x - set t.sort_id = x.sort_id - , t.data_type = x.data_type - , t.data_size = x.data_size - , t.data_precision = x.data_precision - , t.data_fraction = x.data_scale - , t.is_nullable = x.is_nullable - where t.field_id = x.field_id + -- delete old record if it does not exist in this load batch anymore (but have the dataset id) + create temporary table if not exists t_deleted_fields (primary key (field_id)) + select x.field_id + from stg_dict_field_detail s + join dict_dataset i + on s.urn = i.urn + and s.db_id = {db_id} + right join dict_field_detail x + on i.id = x.dataset_id + and s.field_name = x.field_name + and s.parent_path = x.parent_path + where s.field_name is null + and x.dataset_id in ( + select d.id dataset_id + from stg_dict_field_detail k join dict_dataset d + on k.urn = d.urn + and k.db_id = {db_id} + ) + ; -- run time : ~2min + + delete from dict_field_detail where field_id in (select field_id from t_deleted_fields); + + -- update the old record if some thing changed + update dict_field_detail t join + ( + select x.field_id, s.* + from stg_dict_field_detail s join dict_dataset d + on s.urn = d.urn + join dict_field_detail x + on s.field_name = x.field_name + and coalesce(s.parent_path, '*') = coalesce(x.parent_path, '*') + and d.id = x.dataset_id + where s.db_id = {db_id} + and (x.sort_id <> s.sort_id + or x.parent_sort_id <> s.parent_sort_id + or x.data_type <> s.data_type + or x.data_size <> s.data_size or (x.data_size is null XOR s.data_size is null) + or x.data_precision <> s.data_precision or (x.data_precision is null XOR s.data_precision is null) + or x.is_nullable <> s.is_nullable or (x.is_nullable is null XOR s.is_nullable is null) + or x.is_partitioned <> s.is_partitioned or (x.is_partitioned is null XOR s.is_partitioned is null) + or x.is_distributed <> s.is_distributed or (x.is_distributed is null XOR s.is_distributed is null) + or x.default_value <> s.default_value or (x.default_value is null XOR s.default_value is null) + or x.namespace <> s.namespace or (x.namespace is null XOR s.namespace is null) + ) + ) p + on t.field_id = p.field_id + set t.sort_id = p.sort_id, + t.parent_sort_id = p.parent_sort_id, + t.data_type = p.data_type, + t.data_size = p.data_size, + t.data_precision = p.data_precision, + t.is_nullable = p.is_nullable, + t.is_partitioned = p.is_partitioned, + t.is_distributed = p.is_distributed, + 
t.default_value = p.default_value, + t.namespace = p.namespace, + t.modified = now() ; show warnings limit 10; diff --git a/metadata-etl/src/test/java/metadata/etl/lineage/AzJobCheckerTest.java b/metadata-etl/src/test/java/metadata/etl/lineage/AzJobCheckerTest.java index cb1d6f84a5..da3aad1399 100644 --- a/metadata-etl/src/test/java/metadata/etl/lineage/AzJobCheckerTest.java +++ b/metadata-etl/src/test/java/metadata/etl/lineage/AzJobCheckerTest.java @@ -13,7 +13,12 @@ */ package metadata.etl.lineage; +import java.io.File; import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Properties; import org.testng.Assert; import org.testng.annotations.BeforeTest; @@ -28,86 +33,7 @@ import wherehows.common.schemas.AzkabanJobExecRecord; * Created by zsun on 9/9/15. */ public class AzJobCheckerTest { - final String jsonInput = "{\"attempt\": 0,\n" + " \"endTime\": 1442233069062,\n" + " \"executionId\": 832765,\n" - + " \"executionOptions\": {\"concurrentOption\": \"skip\",\n" + " \"disabled\": [],\n" - + " \"failureAction\": \"FINISH_CURRENTLY_RUNNING\",\n" - + " \"failureEmails\": [\"zsun@linkedin.com\"],\n" - + " \"failureEmailsOverride\": true,\n" + " \"flowParameters\": {},\n" - + " \"mailCreator\": \"default\",\n" + " \"memoryCheck\": true,\n" - + " \"notifyOnFirstFailure\": false,\n" - + " \"notifyOnLastFailure\": false,\n" + " \"pipelineExecId\": null,\n" - + " \"pipelineLevel\": null,\n" + " \"queueLevel\": 0,\n" - + " \"successEmails\": [],\n" + " \"successEmailsOverride\": false},\n" - + " \"executionPath\": \"executions/832765\",\n" + " \"flowId\": \"hadoop-datasets-stats\",\n" + " \"id\": null,\n" - + " \"lastModfiedTime\": 1440783373310,\n" + " \"lastModifiedUser\": \"zsun\",\n" - + " \"nodes\": [{\"attempt\": 0,\n" + " \"endTime\": 1442233069053,\n" - + " \"id\": \"hadoop-datasets-stats\",\n" - + " \"inNodes\": [\"hadoop-datasets-stats_enrich-abstract-dataset\"],\n" - + " \"jobSource\": \"hadoop-datasets-stats.job\",\n" - + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442233069045,\n" - + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"noop\",\n" - + " \"updateTime\": 1442233069057},\n" + " {\"attempt\": 0,\n" - + " \"endTime\": 1442232636665,\n" - + " \"id\": \"hadoop-datasets-stats_load-size-avro-into-mysql\",\n" - + " \"inNodes\": [\"hadoop-datasets-stats_sizePartition\"],\n" - + " \"jobSource\": \"hadoop-datasets-stats_load-size-avro-into-mysql.job\",\n" - + " \"outNodes\": [\"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql\"],\n" - + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442229210581,\n" - + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n" - + " \"updateTime\": 1442232636670},\n" + " {\"attempt\": 0,\n" - + " \"endTime\": 1442229861221,\n" - + " \"id\": \"hadoop-datasets-stats_extract-dataset-layout\",\n" - + " \"inNodes\": [\"hadoop-datasets-stats_sizePartition\"],\n" - + " \"jobSource\": \"hadoop-datasets-stats_extract-dataset-layout.job\",\n" - + " \"outNodes\": [\"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql\"],\n" - + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442229210582,\n" - + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n" - + " \"updateTime\": 1442229861231},\n" + " {\"attempt\": 0,\n" - + " \"endTime\": 1442229210579,\n" + " \"id\": \"hadoop-datasets-stats_sizePartition\",\n" - + " \"inNodes\": [\"hadoop-datasets-stats_sizeAggr\"],\n" - + 
" \"jobSource\": \"hadoop-datasets-stats_sizePartition.job\",\n" - + " \"outNodes\": [\"hadoop-datasets-stats_load-size-avro-into-mysql\",\n" - + " \"hadoop-datasets-stats_extract-dataset-layout\"],\n" - + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442228463681,\n" - + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n" - + " \"updateTime\": 1442229210587},\n" + " {\"attempt\": 0,\n" - + " \"endTime\": 1442228463629,\n" + " \"id\": \"hadoop-datasets-stats_sizeAggr\",\n" - + " \"jobSource\": \"hadoop-datasets-stats_sizeAggr.job\",\n" - + " \"outNodes\": [\"hadoop-datasets-stats_sizePartition\"],\n" - + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442224810817,\n" - + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n" - + " \"updateTime\": 1442228463679},\n" + " {\"attempt\": 0,\n" - + " \"endTime\": 1442229882257,\n" - + " \"id\": \"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql\",\n" - + " \"inNodes\": [\"hadoop-datasets-stats_extract-dataset-layout\"],\n" - + " \"jobSource\": \"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql.job\",\n" - + " \"outNodes\": [\"hadoop-datasets-stats_enrich-abstract-dataset\"],\n" - + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442229861224,\n" - + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n" - + " \"updateTime\": 1442229882261},\n" + " {\"attempt\": 0,\n" - + " \"endTime\": 1442233066192,\n" - + " \"id\": \"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql\",\n" - + " \"inNodes\": [\"hadoop-datasets-stats_load-size-avro-into-mysql\"],\n" - + " \"jobSource\": \"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql.job\",\n" - + " \"outNodes\": [\"hadoop-datasets-stats_enrich-abstract-dataset\"],\n" - + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442232636668,\n" - + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n" - + " \"updateTime\": 1442233066196},\n" + " {\"attempt\": 0,\n" - + " \"endTime\": 1442233069043,\n" - + " \"id\": \"hadoop-datasets-stats_enrich-abstract-dataset\",\n" - + " \"inNodes\": [\"hadoop-datasets-stats_load-abstract-dataset-into-wherehows-mysql\",\n" - + " \"hadoop-datasets-stats_extra-load-size-into-wherehows-mysql\"],\n" - + " \"jobSource\": \"hadoop-datasets-stats_enrich-abstract-dataset.job\",\n" - + " \"outNodes\": [\"hadoop-datasets-stats\"],\n" - + " \"propSource\": \"common.properties\",\n" + " \"startTime\": 1442233066194,\n" - + " \"status\": \"SUCCEEDED\",\n" + " \"type\": \"hadoopJava\",\n" - + " \"updateTime\": 1442233069046}],\n" + " \"projectId\": 533,\n" - + " \"projectName\": \"WherehowsETL\",\n" + " \"properties\": [{\"source\": \"common.properties\"}],\n" - + " \"proxyUsers\": [\"azkaban@GRID.LINKEDIN.COM\",\n" + " \"dev_svc\",\n" - + " \"azkaban/eat1-nertzaz01.grid.linkedin.com@GRID.LINKEDIN.COM\",\n" - + " \"zsun\",\n" + " \"data_svc\"],\n" + " \"startTime\": 1442224810815,\n" - + " \"status\": \"SUCCEEDED\",\n" + " \"submitTime\": 1442224810778,\n" + " \"submitUser\": \"zsun\",\n" - + " \"type\": null,\n" + " \"updateTime\": 1442233069065,\n" + " \"version\": 301}"; + AzJobChecker ajc; Properties prop; @@ -131,9 +57,25 @@ public class AzJobCheckerTest { } @Test(groups = {"needConfig"}) - public void parseJsonTest() - throws IOException { - List result = ajc.parseJson(jsonInput, 11111); + public void getRecentFinishedJobFromFlowTest2() + throws SQLException, IOException { + List results = ajc.getRecentFinishedJobFromFlow(2, 
1448916456L); + for (AzkabanJobExecRecord a : results) { + System.out.print(a.getFlowExecId() + "\t"); + System.out.print(a.getJobName() + "\t"); + System.out.println(a.getJobExecId()); + } + Assert.assertNotNull(results); + } + + @Test(groups = {"needConfig"}) + public void parseNestedJsonTest() + throws IOException, URISyntaxException { + + URL url = Thread.currentThread().getContextClassLoader().getResource("nestedFlowContent.json"); + byte[] encoded = Files.readAllBytes(Paths.get(url.getPath())); + String nestedJson = new String(encoded, "UTF-8"); + List result = ajc.parseJson(nestedJson, 11111); for (int i = 0; i < result.size(); i++) { AzkabanJobExecRecord aje = result.get(i); System.out.println(aje.getJobExecId()); @@ -145,4 +87,7 @@ public class AzJobCheckerTest { Assert.assertEquals((long) aje.getJobExecId(), 11111 * 1000 + i); } } + + + } diff --git a/metadata-etl/src/test/java/metadata/etl/lineage/AzLineageExtractorTest.java b/metadata-etl/src/test/java/metadata/etl/lineage/AzLineageExtractorTest.java index b75cfc2658..55506f6478 100644 --- a/metadata-etl/src/test/java/metadata/etl/lineage/AzLineageExtractorTest.java +++ b/metadata-etl/src/test/java/metadata/etl/lineage/AzLineageExtractorTest.java @@ -44,7 +44,7 @@ public class AzLineageExtractorTest { String wherehowsPassWord = prop.getProperty(Constant.WH_DB_PASSWORD_KEY); connUrl = wherehowsUrl + "?" + "user=" + wherehowsUserName + "&password=" + wherehowsPassWord; this.conn = DriverManager.getConnection(connUrl); - AzLogParser.initialize(conn, Integer.valueOf(prop.getProperty(Constant.AZ_DEFAULT_HADOOP_DATABASE_ID_KEY))); + AzLogParser.initialize(conn); PathAnalyzer.initialize(conn); } diff --git a/metadata-etl/src/test/java/metadata/etl/lineage/AzLogParserTest.java b/metadata-etl/src/test/java/metadata/etl/lineage/AzLogParserTest.java index 1542d16fce..c3b8c458b0 100644 --- a/metadata-etl/src/test/java/metadata/etl/lineage/AzLogParserTest.java +++ b/metadata-etl/src/test/java/metadata/etl/lineage/AzLogParserTest.java @@ -35,6 +35,8 @@ import wherehows.common.schemas.LineageRecord; */ public class AzLogParserTest { + private final int TEST_APP_ID = -1; + private final int TEST_DATABASE_ID = -1; @BeforeTest public void setUp() throws SQLException { @@ -45,7 +47,7 @@ public class AzLogParserTest { String wherehowsPassWord = prop.getProperty(Constant.WH_DB_PASSWORD_KEY); Connection conn = DriverManager.getConnection(wherehowsHost + "?" 
+ "user=" + wherehowsUserName + "&password=" + wherehowsPassWord); - AzLogParser.initialize(conn, -1); + AzLogParser.initialize(conn); } @Test(groups = {"needConfig"}) @@ -79,9 +81,9 @@ public class AzLogParserTest { @Test(groups = {"needConfig"}) public void getLineageFromLogTest() { String logSample = "asfdasdfsadf Moving from staged path[asdf] to final resting place[/tm/b/c] sdaf dsfasdfasdf"; - AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(-1, "someJobName", (long) 0, 0, 0, "S", "path"); + AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(TEST_APP_ID, "someJobName", (long) 0, 0, 0, "S", "path"); sampleExecution.setJobExecId((long) 11111); - List result = AzLogParser.getLineageFromLog(logSample, sampleExecution); + List result = AzLogParser.getLineageFromLog(logSample, sampleExecution, -1); System.out.println(result.get(0).toDatabaseValue()); Assert.assertEquals(result.get(0).toDatabaseValue(), @@ -104,8 +106,8 @@ public class AzLogParserTest { + "17-11-2015 01:32:27 PST endorsements_push-lva-endorsements-member-restrictions INFO - INFO tcp://lva1-voldemort-read-only-2-vip.prod.linkedin.com:10103 : Initiating swap of endorsements-member-restrictions with dataDir: /jobs/endorse/endorsements/master/tmp/endorsements-member-restrictions.store/lva1-voldemort-read-only-2-vip.prod.linkedin.com\n" + "17-11-2015 01:32:27 PST endorsements_push-lva-endorsements-member-restrictions INFO - INFO tcp://lva1-voldemort-read-only-2-vip.prod.linkedin.com:10103 : Invoking fetch for Node lva1-app0610.prod.linkedin.com [id 0] for webhdfs://lva1-warnn01.grid.linkedin.com:50070/jobs/endorse/endorsements/master/tmp/endorsements-member-restrictions.store/lva1-voldemort-read-only-2-vip.prod.linkedin.com/node-0\n" + "17-11-2015 01:32:27 PST endorsements_push-lva-endorsements-member-restrictions INFO - INFO tcp://lva1-voldemort-rea"; - AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(-1, "someJobName", (long) 0, 0, 0, "S", "path"); - List result = AzLogParser.getLineageFromLog(logSample, sampleExecution); + AzkabanJobExecRecord sampleExecution = new AzkabanJobExecRecord(TEST_APP_ID, "someJobName", (long) 0, 0, 0, "S", "path"); + List result = AzLogParser.getLineageFromLog(logSample, sampleExecution, TEST_DATABASE_ID); System.out.println(result.get(0).toDatabaseValue()); Assert.assertEquals(result.get(0).getFullObjectName(), "tcp://lva1-voldemort-read-only-2-vip.prod.linkedin.com:10103/endorsements-member-restrictions"); diff --git a/wherehows-common/src/main/java/wherehows/common/Constant.java b/wherehows-common/src/main/java/wherehows/common/Constant.java index 4d80e23ad0..45f5a046b9 100644 --- a/wherehows-common/src/main/java/wherehows/common/Constant.java +++ b/wherehows-common/src/main/java/wherehows/common/Constant.java @@ -38,6 +38,7 @@ public class Constant { public static final String AZ_DEFAULT_HADOOP_DATABASE_ID_KEY = "az.default.hadoop.database.id"; public static final String AZ_LINEAGE_ETL_LOOKBACK_MINS_KEY = "az.lineage_etl.lookback_period.in.minutes"; public static final String LINEAGE_ACTOR_TIMEOUT_KEY = "az.lineage.actor.timeout"; + public static final String AZ_LINEAGE_ETL_END_TIMESTAMP_KEY = "az.lineage_etl.end_timestamp"; public static final String AZ_SERVICE_URL_KEY = "az.server.url"; public static final String AZ_SERVICE_USERNAME_KEY = "az.server.username"; @@ -106,4 +107,15 @@ public class Constant { public static final String GIT_HOST_KEY = "git.host"; public static final String GIT_PROJECT_WHITELIST_KEY = "git.project.whitelist"; + // hive + 
public static final String HIVE_METASTORE_JDBC_DRIVER = "hive.metastore.jdbc.driver"; + public static final String HIVE_METASTORE_JDBC_URL = "hive.metastore.jdbc.url"; + public static final String HIVE_METASTORE_USERNAME = "hive.metstore.username"; + public static final String HIVE_METASTORE_PASSWORD = "hive.metastore.password"; + + public static final String HIVE_SCHEMA_JSON_FILE_KEY = "hive.schema_json_file"; + // public static final String HIVE_SAMPLE_CSV_FILE_KEY = "hive.sample_csv"; + public static final String HIVE_SCHEMA_CSV_FILE_KEY = "hive.schema_csv_file"; + public static final String HIVE_FIELD_METADATA_KEY = "hive.field_metadata"; + } diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetFieldRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetFieldRecord.java index 584bbc3f1b..650f137eeb 100644 --- a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetFieldRecord.java +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetFieldRecord.java @@ -29,8 +29,11 @@ public class DatasetFieldRecord implements Record { Integer parentSortId; String parentPath; String fieldName; + String fieldLabel; String dataType; String isNullable; + String isIndexed; + String isPartitioned; String defaultValue; Integer dataSize; String namespace; @@ -39,18 +42,21 @@ public class DatasetFieldRecord implements Record { List allFields; char SEPR = 0x001A; - public DatasetFieldRecord(String urn, Integer sortId, Integer parentSortId, String parentPath, String fieldName, - String dataType, String isNullable, String defaultValue, Integer dataSize, String namespace, String description) { + public DatasetFieldRecord(String urn, Integer sortId, Integer parentSortId, String parentPath, String fieldName, String fieldLabel, + String dataType, String isNullable, String isIndexed, String isPartitioned, String defaultValue, Integer dataSize, String namespace, String description) { this.urn = urn; this.sortId = sortId; this.parentSortId = parentSortId; this.parentPath = parentPath; this.fieldName = fieldName; + this.fieldLabel = fieldLabel; this.dataType = dataType; - this.isNullable = isNullable; - this.defaultValue = defaultValue; this.dataSize = dataSize; + this.isNullable = isNullable; + this.isIndexed = isIndexed; + this.isPartitioned = isPartitioned; + this.defaultValue = defaultValue; this.namespace = namespace; this.description = description; @@ -61,9 +67,11 @@ public class DatasetFieldRecord implements Record { this.allFields.add(parentPath); this.allFields.add(fieldName); this.allFields.add(dataType); - this.allFields.add(isNullable); - this.allFields.add(defaultValue); this.allFields.add(dataSize); + this.allFields.add(isNullable); + this.allFields.add(isIndexed); + this.allFields.add(isPartitioned); + this.allFields.add(defaultValue); this.allFields.add(namespace); this.allFields.add(description); }
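
Note on the nested-flow parsing added to AzJobChecker: parseNestedJsonTest reads its input from nestedFlowContent.json, which is not included in this diff. Purely as an illustration of the shape that parseJson/parseJsonHelper expect, the self-contained sketch below walks a minimal, hypothetical flow_data payload containing one embedded sub-flow. The field names ("nodes", "projectName", "flowId", and per-node "id", "startTime", "endTime", "status") follow the flat sample that the removed inline jsonInput used, and the traversal mirrors parseJsonHelper: any node that itself carries a "nodes" array is treated as a sub-flow and extends the flow path.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NestedFlowJsonExample {

  // Hypothetical stand-in for nestedFlowContent.json (the real fixture is not in this diff).
  static final String FLOW_JSON =
      "{\"projectName\":\"WherehowsETL\",\"flowId\":\"top-flow\",\"nodes\":["
          + "{\"id\":\"prepare\",\"startTime\":1442224810815,\"endTime\":1442224815000,\"status\":\"SUCCEEDED\"},"
          + "{\"id\":\"sub-flow-a\",\"nodes\":["
          + "{\"id\":\"load-step\",\"startTime\":1442224816000,\"endTime\":1442224820000,\"status\":\"SUCCEEDED\"}"
          + "]}"
          + "]}";

  public static void main(String[] args) throws Exception {
    JsonNode wholeFlow = new ObjectMapper().readTree(FLOW_JSON);
    String flowPath = wholeFlow.get("projectName").asText() + ":" + wholeFlow.get("flowId").asText();
    walk(wholeFlow.get("nodes"), flowPath);
    // prints roughly:
    //   prepare    @ WherehowsETL:top-flow
    //   load-step  @ WherehowsETL:top-flow/sub-flow-a
  }

  // Same traversal shape as AzJobChecker.parseJsonHelper: a node with a "nodes" field is a sub-flow.
  static void walk(JsonNode nodes, String flowPath) {
    for (JsonNode node : nodes) {
      if (node.has("nodes")) {
        walk(node.get("nodes"), flowPath + "/" + node.get("id").asText());
      } else {
        System.out.println(node.get("id").asText() + "\t@ " + flowPath);
      }
    }
  }
}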
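
Note on the log lookup change in AzLineageExtractor: for jobs inside nested flows, the Azkaban execution log is keyed by the sub-flow chain plus the job name, so the extractor now builds a prefix from everything after the first-level flow in the flow path. The isolated helper below is not part of the patch (the class and example values are illustrative); it restates that loop so the mapping from flow path to log key is easy to verify.

public class SubflowJobPrefixExample {

  // Mirrors the prefix loop in AzLineageExtractor. flowPath has the form
  // "project_name:first_level_flow/sub_flow/sub_flow" (see parseJsonHelper's javadoc).
  static String logJobName(String flowPath, String jobName) {
    String[] flowSequence = flowPath.split(":")[1].split("/");
    StringBuilder jobPrefix = new StringBuilder();
    for (int i = 1; i < flowSequence.length; i++) {  // skip the first-level flow itself
      jobPrefix.append(flowSequence[i]).append(":");
    }
    return jobPrefix + jobName;
  }

  public static void main(String[] args) {
    // Hypothetical flow paths and job name, purely for illustration.
    System.out.println(logJobName("WherehowsETL:top-flow/sub-flow-a/sub-flow-b", "load-step"));
    // -> sub-flow-a:sub-flow-b:load-step
    System.out.println(logJobName("WherehowsETL:top-flow", "load-step"));
    // -> load-step  (no prefix for jobs directly under the top-level flow)
  }
}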
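
Note on the bounded extraction window: the new az.lineage_etl.end_timestamp property, together with AzLineageExtractorMaster.run(timeFrame, endTimeStamp) and the two-argument AzJobChecker overload, lets a run target a historical window instead of "now". The sketch below is a minimal, hypothetical wiring; the Azkaban and WhereHows connection properties that the extractor already requires are assumed to be present in the Properties object. Because the AzJobChecker overload multiplies the end timestamp by 1000 before querying execution_flows, the value is given in epoch seconds here, matching the unit used in AzJobCheckerTest.

import java.util.Properties;
import metadata.etl.lineage.AzLineageExtractorMaster;
import wherehows.common.Constant;

public class BoundedLineageBackfillExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical values; real deployments load these from the job's property file,
    // together with the Azkaban DB / WhereHows DB connection settings.
    Properties prop = new Properties();
    prop.setProperty(Constant.APP_ID_KEY, "31");
    prop.setProperty(Constant.AZ_LINEAGE_ETL_LOOKBACK_MINS_KEY, "90");         // look back 90 minutes
    prop.setProperty(Constant.AZ_LINEAGE_ETL_END_TIMESTAMP_KEY, "1448916456"); // epoch seconds

    int timeFrame = Integer.valueOf(prop.getProperty(Constant.AZ_LINEAGE_ETL_LOOKBACK_MINS_KEY));
    long endTimeStamp = Long.valueOf(prop.getProperty(Constant.AZ_LINEAGE_ETL_END_TIMESTAMP_KEY));

    AzLineageExtractorMaster master = new AzLineageExtractorMaster(prop);
    // Extract lineage for jobs that finished in the 90 minutes leading up to endTimeStamp,
    // rather than the 90 minutes leading up to the current time.
    master.run(timeFrame, endTimeStamp);
  }
}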