Support MCE save Schemaless (#908)

This commit is contained in:
Yi (Alan) Wang 2017-12-10 11:47:50 -08:00 committed by GitHub
parent c9cd9b3676
commit 67913fd752
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 12 deletions

View File

@@ -73,6 +73,18 @@ public class FieldDetailDao extends BaseDao {
// update field comments?
}
/**
 * Inserts or updates the Schemaless representation of a dataset, derived from a
 * MetadataChangeEvent.
 *
 * <p>Stub: currently unimplemented and unconditionally throws
 * {@link UnsupportedOperationException}.
 *
 * @param identifier DatasetIdentifier of the dataset the event refers to
 * @param datasetId id of the already-persisted dataset record (callers pass {@code DictDataset.getId()})
 * @param auditStamp ChangeAuditStamp describing who/when made the change
 * @throws UnsupportedOperationException always — Schemaless support is not yet implemented
 * @throws Exception declared for forward compatibility with the eventual implementation
 */
public void insertUpdateSchemaless(@Nonnull DatasetIdentifier identifier, int datasetId,
    @Nonnull ChangeAuditStamp auditStamp) throws Exception {
  throw new UnsupportedOperationException("Support for Schemaless not yet implemented.");
}
/**
* Fill in DictFieldDetail information from FieldSchema
* @param fs FieldSchema

View File

@@ -65,7 +65,7 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor {
log.debug("Processing Metadata Change Event record.");
MetadataChangeEvent event = (MetadataChangeEvent) indexedRecord;
final MetadataChangeEvent event = (MetadataChangeEvent) indexedRecord;
try {
processEvent(event);
} catch (Exception exception) {
@@ -91,23 +91,25 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor {
throw new Exception("Dataset name too long: " + identifier);
}
// if DELETE, mark dataset as removed and return
if (changeType == ChangeType.DELETE) {
_dictDatasetDao.setDatasetRemoved(identifier, true, changeAuditStamp);
return;
}
final DatasetSchema dsSchema =
(event.schema == null || event.schema instanceof Schemaless) ? null : (DatasetSchema) event.schema;
// TODO: handle Schemaless separately
final DatasetSchema dsSchema = event.schema instanceof DatasetSchema ? (DatasetSchema) event.schema : null;
// create or update dataset
DictDataset ds = _dictDatasetDao.insertUpdateDataset(identifier, changeAuditStamp, event.datasetProperty, dsSchema,
event.deploymentInfo, toStringList(event.tags), event.capacity, event.partitionSpec);
final DictDataset ds =
_dictDatasetDao.insertUpdateDataset(identifier, changeAuditStamp, event.datasetProperty, dsSchema,
event.deploymentInfo, toStringList(event.tags), event.capacity, event.partitionSpec);
// if schema is not null, insert or update schema
if (dsSchema != null) {
if (dsSchema != null) { // if instanceof DatasetSchema
_fieldDetailDao.insertUpdateDatasetFields(identifier, ds.getId(), event.datasetProperty, changeAuditStamp,
dsSchema);
} else if (event.schema instanceof Schemaless) { // if instanceof Schemaless
_fieldDetailDao.insertUpdateSchemaless(identifier, ds.getId(), changeAuditStamp);
}
// if owners are not null, insert or update owner
@@ -128,10 +130,10 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor {
}
/**
 * Wraps a MetadataChangeEvent and the error that prevented it from being processed
 * into a FailedMetadataChangeEvent, stamped with the current wall-clock time.
 *
 * @param event the event that failed to process
 * @param throwable the cause of the failure; its full stack trace is captured as text
 * @return a populated FailedMetadataChangeEvent ready to be emitted
 */
private FailedMetadataChangeEvent newFailedEvent(MetadataChangeEvent event, Throwable throwable) {
  final FailedMetadataChangeEvent failedEvent = new FailedMetadataChangeEvent();
  failedEvent.time = System.currentTimeMillis();
  failedEvent.error = ExceptionUtils.getStackTrace(throwable);
  failedEvent.metadataChangeEvent = event;
  return failedEvent;
}
}