Fix issues from Oracle MetadataChangeEvent integration (#336)

* Fix issues from Oracle MetadataChangeEvent integration
This commit is contained in:
Yi (Alan) Wang 2017-03-14 17:19:30 -07:00 committed by GitHub
parent 4f873a919a
commit 66a8eea21b
3 changed files with 32 additions and 5 deletions

View File

@ -22,6 +22,8 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import utils.Urn;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.EmptyResultDataAccessException;
@ -174,7 +176,7 @@ public class DatasetInfoDao {
public static final String GET_USER_BY_USER_ID = "SELECT * FROM " + EXTERNAL_USER_TABLE + " WHERE user_id = :user_id";
public static final String GET_APP_ID_BY_GROUP_ID =
"SELECT app_id FROM " + EXTERNAL_GROUP_TABLE + " WHERE group_id = :group_id GROUP BY group_id";
"SELECT app_id FROM " + EXTERNAL_GROUP_TABLE + " WHERE group_id = :group_id GROUP BY app_id";
public static final String GET_DATASET_CONSTRAINT_BY_DATASET_ID =
"SELECT * FROM " + DATASET_CONSTRAINT_TABLE + " WHERE dataset_id = :dataset_id";
@ -223,6 +225,20 @@ public class DatasetInfoDao {
PreparedStatementUtil.prepareInsertTemplateWithColumn("REPLACE", DATASET_INVENTORY_TABLE,
DatasetInventoryItemRecord.getInventoryItemColumns());
private static final String UrnRegex = "urn:li:dataset:\\(urn:li:dataPlatform:(\\w*),\\s?([^\\s]*),\\s?(\\w*)\\)";
private static Pattern UrnPattern = Pattern.compile(UrnRegex);
private static String urnToUri(String urn) {
// from urn:li:dataset:(urn:li:dataPlatform:p, nativeName, dataOrigin) to p:///nativeName
Matcher m = UrnPattern.matcher(urn);
if(m.matches()) {
String platform = m.group(1) + "://";
String datasetName = m.group(2).replace(".", "/");
return platform + (datasetName.startsWith("/") ? "" : "/") + datasetName;
}
return null;
}
private static Object[] findIdAndUrn(Integer datasetId)
throws SQLException {
@ -268,7 +284,7 @@ public class DatasetInfoDao {
final JsonNode urnNode = root.path("urn");
if (!urnNode.isMissingNode() && !urnNode.isNull()) {
try {
final Object[] idUrn = findIdAndUrn(urnNode.asText());
final Object[] idUrn = findIdAndUrn(urnToUri(urnNode.asText()));
if (idUrn[0] != null && idUrn[1] != null) {
return idUrn;
}
@ -867,7 +883,7 @@ public class DatasetInfoDao {
record.setIsGroup(ownerTypeString != null && ownerTypeString.equalsIgnoreCase("group") ? "Y" : "N");
if (datasetId == 0 || appId == 0) {
String sql = PreparedStatementUtil.prepareInsertTemplateWithColumn(DATASET_OWNER_UNMATCHED_TABLE,
String sql = PreparedStatementUtil.prepareInsertTemplateWithColumn("REPLACE", DATASET_OWNER_UNMATCHED_TABLE,
record.getDbColumnForUnmatchedOwner());
OWNER_UNMATCHED_WRITER.execute(sql, record.getValuesForUnmatchedOwner());
} else {
@ -1197,8 +1213,13 @@ public class DatasetInfoDao {
if (idUrn[0] != null && idUrn[1] != null) {
datasetId = (Integer) idUrn[0];
urn = (String) idUrn[1];
} else {
} else if (root.get("datasetProperties") != null && root.get("datasetProperties").get("uri") != null) {
urn = root.path("datasetProperties").path("uri").asText();
} else if (root.get("urn") != null) {
urn = urnToUri(root.get("urn").asText());
} else {
Logger.info("Can't identify dataset URN, abort process.");
return;
}
ObjectMapper om = new ObjectMapper();

View File

@ -53,6 +53,13 @@ public class MetadataChangeProcessor {
&& datasetIdentifier == null) {
Logger.info("Can't identify dataset from uri/urn/datasetIdentifier, abort process. " + record.toString());
return null;
} else if (urn != null) {
Logger.debug("URN: " + urn);
} else if (datasetProperties != null && datasetProperties.get("uri") != null) {
Logger.debug("URI: " + datasetProperties.get("uri"));
} else {
Logger.debug(
"Dataset Identifier: " + datasetIdentifier.get("dataPlatformUrn") + datasetIdentifier.get("nativeName"));
}
final JsonNode rootNode = new ObjectMapper().readTree(record.toString());

View File

@ -175,7 +175,6 @@ CREATE TABLE dataset_schema_info (
`dataset_id` INT UNSIGNED NOT NULL,
`dataset_urn` VARCHAR(200) NOT NULL,
`is_backward_compatible` BOOLEAN DEFAULT NULL,
`is_latest_revision` BOOLEAN NOT NULL,
`create_time` BIGINT NOT NULL,
`revision` INT UNSIGNED DEFAULT NULL,
`version` VARCHAR(20) DEFAULT NULL,