Fix MCE processor now that Schema may have two types (#885)

This commit is contained in:
Yi (Alan) Wang 2017-11-29 17:07:49 -08:00 committed by GitHub
parent 15de751c38
commit a12b7dfbe1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 72 additions and 21 deletions

View File

@ -29,13 +29,14 @@
} }
} }
}, },
{ "name" : "retentionWindow", {
"type" : [ "name": "retentionWindow",
"type": [
"null", "null",
"long" "long"
], ],
"doc" : "How long data is retained in seconds for the case of MANUAL_LIMITED_RETENTION", "doc": "How long data is retained in seconds for the case of MANUAL_LIMITED_RETENTION",
"default" : null "default": null
}, },
{ {
"name": "compliancePurgeNote", "name": "compliancePurgeNote",
@ -66,6 +67,15 @@
"name": "datasetClassification", "name": "datasetClassification",
"type": "DatasetClassification", "type": "DatasetClassification",
"doc": "dataset level classification: dataset contains such category of data or not, e.g. 'PROFILE_DATA':false" "doc": "dataset level classification: dataset contains such category of data or not, e.g. 'PROFILE_DATA':false"
},
{
"name": "containsPersonalData",
"type": [
"null",
"boolean"
],
"doc": "Use this field to specify whether the dataset contains any personal data for schemaless systems like Couchbase, Ambry etc",
"default": null
} }
] ]
} }

View File

@ -21,13 +21,18 @@
}, },
{ {
"name": "datasetProperty", "name": "datasetProperty",
"type": [ "null", "DatasetProperty" ], "type": [
"null",
"DatasetProperty"
],
"doc": "Basic properties of a dataset, such as Native Type, Case Sensitivity, URI" "doc": "Basic properties of a dataset, such as Native Type, Case Sensitivity, URI"
}, },
{ {
"name": "owners", "name": "owners",
"doc": "A complete list of owners info", "doc": "A complete list of owners info",
"type": [ "null", { "type": [
"null",
{
"type": "array", "type": "array",
"items": "OwnerInfo", "items": "OwnerInfo",
"doc": "Ownership information" "doc": "Ownership information"
@ -36,12 +41,17 @@
}, },
{ {
"name": "partitionSpec", "name": "partitionSpec",
"type": [ "null", "PartitionSpecification" ], "type": [
"null",
"PartitionSpecification"
],
"doc": "Partition specification detail" "doc": "Partition specification detail"
}, },
{ {
"name": "deploymentInfo", "name": "deploymentInfo",
"type": [ "null", { "type": [
"null",
{
"type": "array", "type": "array",
"items": "DeploymentDetail" "items": "DeploymentDetail"
} }
@ -50,7 +60,9 @@
}, },
{ {
"name": "tags", "name": "tags",
"type": [ "null", { "type": [
"null",
{
"type": "array", "type": "array",
"items": "string" "items": "string"
} }
@ -59,12 +71,18 @@
}, },
{ {
"name": "schema", "name": "schema",
"doc": "The schema/structure definition of a dataset. For Key-Value and Document db, a dedicated KeySchema is provided. Schema includes KeySchema, OriginalSchema, FieldSchema, ChangeDataCaptureFields, AuditFields", "doc": "The schema/structure definition of a dataset. For Key-Value and Document db, a dedicated KeySchema is provided. Schema includes KeySchema, OriginalSchema, FieldSchema, ChangeDataCaptureFields, AuditFields. Set this field to Schemaless explicitly for schemaless systems.",
"type": [ "null", "DatasetSchema" ] "type": [
"null",
"DatasetSchema",
"Schemaless"
]
}, },
{ {
"name": "constraints", "name": "constraints",
"type": [ "null", { "type": [
"null",
{
"type": "array", "type": "array",
"doc": "Array of constraints", "doc": "Array of constraints",
"items": "Constraint" "items": "Constraint"
@ -73,7 +91,9 @@
}, },
{ {
"name": "indices", "name": "indices",
"type": [ "null", { "type": [
"null",
{
"type": "array", "type": "array",
"doc": "Array of indices", "doc": "Array of indices",
"items": "Index" "items": "Index"
@ -82,7 +102,9 @@
}, },
{ {
"name": "capacity", "name": "capacity",
"type": [ "null", { "type": [
"null",
{
"type": "array", "type": "array",
"doc": "Array of capacity info", "doc": "Array of capacity info",
"items": "Capacity" "items": "Capacity"
@ -91,12 +113,18 @@
}, },
{ {
"name": "compliancePolicy", "name": "compliancePolicy",
"type": [ "null", "CompliancePolicy" ], "type": [
"null",
"CompliancePolicy"
],
"doc": "Human-entered compliance metadata" "doc": "Human-entered compliance metadata"
}, },
{ {
"name": "suggestedCompliancePolicy", "name": "suggestedCompliancePolicy",
"type": [ "null", "SuggestedCompliancePolicy" ], "type": [
"null",
"SuggestedCompliancePolicy"
],
"default": null, "default": null,
"doc": "Machine-suggested compliance metadata" "doc": "Machine-suggested compliance metadata"
} }

View File

@ -0,0 +1,8 @@
{
"type": "record",
"name": "Schemaless",
"doc": "Denotes that the dataset contains no explicitly defined schema, e.g. Couchbase",
"namespace": "com.linkedin.events.metadata",
"fields": [
]
}

View File

@ -17,8 +17,10 @@ import com.linkedin.events.KafkaAuditHeader;
import com.linkedin.events.metadata.ChangeAuditStamp; import com.linkedin.events.metadata.ChangeAuditStamp;
import com.linkedin.events.metadata.ChangeType; import com.linkedin.events.metadata.ChangeType;
import com.linkedin.events.metadata.DatasetIdentifier; import com.linkedin.events.metadata.DatasetIdentifier;
import com.linkedin.events.metadata.DatasetSchema;
import com.linkedin.events.metadata.FailedMetadataChangeEvent; import com.linkedin.events.metadata.FailedMetadataChangeEvent;
import com.linkedin.events.metadata.MetadataChangeEvent; import com.linkedin.events.metadata.MetadataChangeEvent;
import com.linkedin.events.metadata.Schemaless;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.exception.ExceptionUtils;
@ -94,15 +96,18 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor {
throw new Exception("Dataset name too long: " + identifier); throw new Exception("Dataset name too long: " + identifier);
} }
final DatasetSchema dsSchema =
(event.schema == null || event.schema instanceof Schemaless) ? null : (DatasetSchema) event.schema;
// TODO: handle Schemaless separately
// create or update dataset // create or update dataset
DictDataset ds = DictDataset ds = _dictDatasetDao.insertUpdateDataset(identifier, changeAuditStamp, event.datasetProperty, dsSchema,
_dictDatasetDao.insertUpdateDataset(identifier, changeAuditStamp, event.datasetProperty, event.schema, event.deploymentInfo, toStringList(event.tags), event.capacity, event.partitionSpec);
event.deploymentInfo, toStringList(event.tags), event.capacity, event.partitionSpec);
// if schema is not null, insert or update schema // if schema is not null, insert or update schema
if (event.schema != null) { if (dsSchema != null) {
_fieldDetailDao.insertUpdateDatasetFields(identifier, ds.getId(), event.datasetProperty, changeAuditStamp, _fieldDetailDao.insertUpdateDatasetFields(identifier, ds.getId(), event.datasetProperty, changeAuditStamp,
event.schema); dsSchema);
} }
// if owners are not null, insert or update owner // if owners are not null, insert or update owner