From 67913fd752d1f404ff7b8d0ef9e63c99bda990bd Mon Sep 17 00:00:00 2001 From: "Yi (Alan) Wang" Date: Sun, 10 Dec 2017 11:47:50 -0800 Subject: [PATCH] Support MCE save Schemaless (#908) --- .../wherehows/dao/table/FieldDetailDao.java | 12 +++++++++ .../processors/MetadataChangeProcessor.java | 26 ++++++++++--------- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/wherehows-dao/src/main/java/wherehows/dao/table/FieldDetailDao.java b/wherehows-dao/src/main/java/wherehows/dao/table/FieldDetailDao.java index 633beab8c2..d6161b57d8 100644 --- a/wherehows-dao/src/main/java/wherehows/dao/table/FieldDetailDao.java +++ b/wherehows-dao/src/main/java/wherehows/dao/table/FieldDetailDao.java @@ -73,6 +73,18 @@ public class FieldDetailDao extends BaseDao { // update field comments? } + /** + * Insert or update Schemaless from MetadataChangeEvent + * @param identifier DatasetIdentifier + * @param datasetId int + * @param auditStamp ChangeAuditStamp + * @throws Exception + */ + public void insertUpdateSchemaless(@Nonnull DatasetIdentifier identifier, int datasetId, + @Nonnull ChangeAuditStamp auditStamp) throws Exception { + throw new UnsupportedOperationException("Support for Schemaless not yet implemented."); + } + /** * Fill in DictFieldDetail information from FieldSchema * @param fs FieldSchema diff --git a/wherehows-kafka/src/main/java/wherehows/processors/MetadataChangeProcessor.java b/wherehows-kafka/src/main/java/wherehows/processors/MetadataChangeProcessor.java index 57b876285c..52df2b4654 100644 --- a/wherehows-kafka/src/main/java/wherehows/processors/MetadataChangeProcessor.java +++ b/wherehows-kafka/src/main/java/wherehows/processors/MetadataChangeProcessor.java @@ -65,7 +65,7 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor { log.debug("Processing Metadata Change Event record."); - MetadataChangeEvent event = (MetadataChangeEvent) indexedRecord; + final MetadataChangeEvent event = (MetadataChangeEvent) indexedRecord; try { processEvent(event); } catch (Exception exception) { @@ -91,23 +91,25 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor { throw new Exception("Dataset name too long: " + identifier); } + // if DELETE, mark dataset as removed and return if (changeType == ChangeType.DELETE) { _dictDatasetDao.setDatasetRemoved(identifier, true, changeAuditStamp); return; } - final DatasetSchema dsSchema = - (event.schema == null || event.schema instanceof Schemaless) ? null : (DatasetSchema) event.schema; - // TODO: handle Schemaless separately + final DatasetSchema dsSchema = event.schema instanceof DatasetSchema ? (DatasetSchema) event.schema : null; // create or update dataset - DictDataset ds = _dictDatasetDao.insertUpdateDataset(identifier, changeAuditStamp, event.datasetProperty, dsSchema, - event.deploymentInfo, toStringList(event.tags), event.capacity, event.partitionSpec); + final DictDataset ds = + _dictDatasetDao.insertUpdateDataset(identifier, changeAuditStamp, event.datasetProperty, dsSchema, + event.deploymentInfo, toStringList(event.tags), event.capacity, event.partitionSpec); // if schema is not null, insert or update schema - if (dsSchema != null) { + if (dsSchema != null) { // if instanceof DatasetSchema _fieldDetailDao.insertUpdateDatasetFields(identifier, ds.getId(), event.datasetProperty, changeAuditStamp, dsSchema); + } else if (event.schema instanceof Schemaless) { // if instanceof Schemaless + _fieldDetailDao.insertUpdateSchemaless(identifier, ds.getId(), changeAuditStamp); } // if owners are not null, insert or update owner @@ -128,10 +130,10 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor { } private FailedMetadataChangeEvent newFailedEvent(MetadataChangeEvent event, Throwable throwable) { - FailedMetadataChangeEvent faileEvent = new FailedMetadataChangeEvent(); - faileEvent.time = System.currentTimeMillis(); - faileEvent.error = ExceptionUtils.getStackTrace(throwable); - faileEvent.metadataChangeEvent = event; - return faileEvent; + FailedMetadataChangeEvent failedEvent = new FailedMetadataChangeEvent(); + failedEvent.time = System.currentTimeMillis(); + failedEvent.error = ExceptionUtils.getStackTrace(throwable); + failedEvent.metadataChangeEvent = event; + return failedEvent; } }