From 66a8eea21bad91dafa246f101e1c7d4afd31c6ef Mon Sep 17 00:00:00 2001 From: "Yi (Alan) Wang" Date: Tue, 14 Mar 2017 17:19:30 -0700 Subject: [PATCH] Fix issues from Oracle MetadataChangeEvent integration (#336) * Fix issues from Oracle MetadataChangeEvent integration --- .../app/models/daos/DatasetInfoDao.java | 29 ++++++++++++++++--- .../models/kafka/MetadataChangeProcessor.java | 7 +++++ .../DDL/ETL_DDL/dataset_info_metadata.sql | 1 - 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/backend-service/app/models/daos/DatasetInfoDao.java b/backend-service/app/models/daos/DatasetInfoDao.java index 13c8337ca3..152203d16b 100644 --- a/backend-service/app/models/daos/DatasetInfoDao.java +++ b/backend-service/app/models/daos/DatasetInfoDao.java @@ -22,6 +22,8 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import utils.Urn; import org.springframework.dao.DataAccessException; import org.springframework.dao.EmptyResultDataAccessException; @@ -174,7 +176,7 @@ public class DatasetInfoDao { public static final String GET_USER_BY_USER_ID = "SELECT * FROM " + EXTERNAL_USER_TABLE + " WHERE user_id = :user_id"; public static final String GET_APP_ID_BY_GROUP_ID = - "SELECT app_id FROM " + EXTERNAL_GROUP_TABLE + " WHERE group_id = :group_id GROUP BY group_id"; + "SELECT app_id FROM " + EXTERNAL_GROUP_TABLE + " WHERE group_id = :group_id GROUP BY app_id"; public static final String GET_DATASET_CONSTRAINT_BY_DATASET_ID = "SELECT * FROM " + DATASET_CONSTRAINT_TABLE + " WHERE dataset_id = :dataset_id"; @@ -223,6 +225,20 @@ public class DatasetInfoDao { PreparedStatementUtil.prepareInsertTemplateWithColumn("REPLACE", DATASET_INVENTORY_TABLE, DatasetInventoryItemRecord.getInventoryItemColumns()); + private static final String UrnRegex = "urn:li:dataset:\\(urn:li:dataPlatform:(\\w*),\\s?([^\\s]*),\\s?(\\w*)\\)"; + + private static Pattern UrnPattern = Pattern.compile(UrnRegex); + + private static String urnToUri(String urn) { + // from urn:li:dataset:(urn:li:dataPlatform:p, nativeName, dataOrigin) to p:///nativeName + Matcher m = UrnPattern.matcher(urn); + if(m.matches()) { + String platform = m.group(1) + "://"; + String datasetName = m.group(2).replace(".", "/"); + return platform + (datasetName.startsWith("/") ? "" : "/") + datasetName; + } + return null; + } private static Object[] findIdAndUrn(Integer datasetId) throws SQLException { @@ -268,7 +284,7 @@ public class DatasetInfoDao { final JsonNode urnNode = root.path("urn"); if (!urnNode.isMissingNode() && !urnNode.isNull()) { try { - final Object[] idUrn = findIdAndUrn(urnNode.asText()); + final Object[] idUrn = findIdAndUrn(urnToUri(urnNode.asText())); if (idUrn[0] != null && idUrn[1] != null) { return idUrn; } @@ -867,7 +883,7 @@ public class DatasetInfoDao { record.setIsGroup(ownerTypeString != null && ownerTypeString.equalsIgnoreCase("group") ? "Y" : "N"); if (datasetId == 0 || appId == 0) { - String sql = PreparedStatementUtil.prepareInsertTemplateWithColumn(DATASET_OWNER_UNMATCHED_TABLE, + String sql = PreparedStatementUtil.prepareInsertTemplateWithColumn("REPLACE", DATASET_OWNER_UNMATCHED_TABLE, record.getDbColumnForUnmatchedOwner()); OWNER_UNMATCHED_WRITER.execute(sql, record.getValuesForUnmatchedOwner()); } else { @@ -1197,8 +1213,13 @@ public class DatasetInfoDao { if (idUrn[0] != null && idUrn[1] != null) { datasetId = (Integer) idUrn[0]; urn = (String) idUrn[1]; - } else { + } else if (root.get("datasetProperties") != null && root.get("datasetProperties").get("uri") != null) { urn = root.path("datasetProperties").path("uri").asText(); + } else if (root.get("urn") != null) { + urn = urnToUri(root.get("urn").asText()); + } else { + Logger.info("Can't identify dataset URN, abort process."); + return; } ObjectMapper om = new ObjectMapper(); diff --git a/backend-service/app/models/kafka/MetadataChangeProcessor.java b/backend-service/app/models/kafka/MetadataChangeProcessor.java index e781d83192..b4d2ce92c2 100644 --- a/backend-service/app/models/kafka/MetadataChangeProcessor.java +++ b/backend-service/app/models/kafka/MetadataChangeProcessor.java @@ -53,6 +53,13 @@ public class MetadataChangeProcessor { && datasetIdentifier == null) { Logger.info("Can't identify dataset from uri/urn/datasetIdentifier, abort process. " + record.toString()); return null; + } else if (urn != null) { + Logger.debug("URN: " + urn); + } else if (datasetProperties != null && datasetProperties.get("uri") != null) { + Logger.debug("URI: " + datasetProperties.get("uri")); + } else { + Logger.debug( + "Dataset Identifier: " + datasetIdentifier.get("dataPlatformUrn") + datasetIdentifier.get("nativeName")); } final JsonNode rootNode = new ObjectMapper().readTree(record.toString()); diff --git a/data-model/DDL/ETL_DDL/dataset_info_metadata.sql b/data-model/DDL/ETL_DDL/dataset_info_metadata.sql index aabc648f1f..7b5e3c21a0 100644 --- a/data-model/DDL/ETL_DDL/dataset_info_metadata.sql +++ b/data-model/DDL/ETL_DDL/dataset_info_metadata.sql @@ -175,7 +175,6 @@ CREATE TABLE dataset_schema_info ( `dataset_id` INT UNSIGNED NOT NULL, `dataset_urn` VARCHAR(200) NOT NULL, `is_backward_compatible` BOOLEAN DEFAULT NULL, - `is_latest_revision` BOOLEAN NOT NULL, `create_time` BIGINT NOT NULL, `revision` INT UNSIGNED DEFAULT NULL, `version` VARCHAR(20) DEFAULT NULL,