diff --git a/wherehows-data-model/src/main/avro/CompliancePolicy.avsc b/wherehows-data-model/src/main/avro/CompliancePolicy.avsc index 387fee6526..1c899c9c92 100644 --- a/wherehows-data-model/src/main/avro/CompliancePolicy.avsc +++ b/wherehows-data-model/src/main/avro/CompliancePolicy.avsc @@ -29,13 +29,14 @@ } } }, - { "name" : "retentionWindow", - "type" : [ + { + "name": "retentionWindow", + "type": [ "null", "long" ], - "doc" : "How long data is retained in seconds for the case of MANUAL_LIMITED_RETENTION", - "default" : null + "doc": "How long data is retained in seconds for the case of MANUAL_LIMITED_RETENTION", + "default": null }, { "name": "compliancePurgeNote", @@ -66,6 +67,15 @@ "name": "datasetClassification", "type": "DatasetClassification", "doc": "dataset level classification: dataset contains such category of data or not, e.g. 'PROFILE_DATA':false" + }, + { + "name": "containsPersonalData", + "type": [ + "null", + "boolean" + ], + "doc": "Use this field to specify whether the dataset contains any personal data for schemaless systems like Couchbase, Ambry etc", + "default": null } ] } diff --git a/wherehows-data-model/src/main/avro/MetadataChangeEvent.avsc b/wherehows-data-model/src/main/avro/MetadataChangeEvent.avsc index d23f73104c..d209a64034 100644 --- a/wherehows-data-model/src/main/avro/MetadataChangeEvent.avsc +++ b/wherehows-data-model/src/main/avro/MetadataChangeEvent.avsc @@ -21,13 +21,18 @@ }, { "name": "datasetProperty", - "type": [ "null", "DatasetProperty" ], + "type": [ + "null", + "DatasetProperty" + ], "doc": "Basic properties of a dataset, such as Native Type, Case Sensitivity, URI" }, { "name": "owners", "doc": "A complete list of owners info", - "type": [ "null", { + "type": [ + "null", + { "type": "array", "items": "OwnerInfo", "doc": "Ownership information" @@ -36,12 +41,17 @@ }, { "name": "partitionSpec", - "type": [ "null", "PartitionSpecification" ], + "type": [ + "null", + "PartitionSpecification" + ], "doc": "Partition specification detail" }, { "name": "deploymentInfo", - "type": [ "null", { + "type": [ + "null", + { "type": "array", "items": "DeploymentDetail" } @@ -50,7 +60,9 @@ }, { "name": "tags", - "type": [ "null", { + "type": [ + "null", + { "type": "array", "items": "string" } @@ -59,12 +71,18 @@ }, { "name": "schema", - "doc": "The schema/structure definition of a dataset. For Key-Value and Document db, a dedicated KeySchema is provided. Schema includes KeySchema, OriginalSchema, FieldSchema, ChangeDataCaptureFields, AuditFields", - "type": [ "null", "DatasetSchema" ] + "doc": "The schema/structure definition of a dataset. For Key-Value and Document db, a dedicated KeySchema is provided. Schema includes KeySchema, OriginalSchema, FieldSchema, ChangeDataCaptureFields, AuditFields. Set this field to Schemaless explicitly for schemaless systems.", + "type": [ + "null", + "DatasetSchema", + "Schemaless" + ] }, { "name": "constraints", - "type": [ "null", { + "type": [ + "null", + { "type": "array", "doc": "Array of constraints", "items": "Constraint" @@ -73,7 +91,9 @@ }, { "name": "indices", - "type": [ "null", { + "type": [ + "null", + { "type": "array", "doc": "Array of indices", "items": "Index" @@ -82,7 +102,9 @@ }, { "name": "capacity", - "type": [ "null", { + "type": [ + "null", + { "type": "array", "doc": "Array of capacity info", "items": "Capacity" @@ -91,12 +113,18 @@ }, { "name": "compliancePolicy", - "type": [ "null", "CompliancePolicy" ], + "type": [ + "null", + "CompliancePolicy" + ], "doc": "Human-entered compliance metadata" }, { "name": "suggestedCompliancePolicy", - "type": [ "null", "SuggestedCompliancePolicy" ], + "type": [ + "null", + "SuggestedCompliancePolicy" + ], "default": null, "doc": "Machine-suggested compliance metadata" } diff --git a/wherehows-data-model/src/main/avro/Schemaless.avsc b/wherehows-data-model/src/main/avro/Schemaless.avsc new file mode 100644 index 0000000000..3c91fd4682 --- /dev/null +++ b/wherehows-data-model/src/main/avro/Schemaless.avsc @@ -0,0 +1,8 @@ +{ + "type": "record", + "name": "Schemaless", + "doc": "Denotes that the dataset contains no explicitly defined schema, e.g. Couchbase", + "namespace": "com.linkedin.events.metadata", + "fields": [ + ] +} \ No newline at end of file diff --git a/wherehows-kafka/src/main/java/wherehows/processors/MetadataChangeProcessor.java b/wherehows-kafka/src/main/java/wherehows/processors/MetadataChangeProcessor.java index 36a4aa62b9..0e0f5342c9 100644 --- a/wherehows-kafka/src/main/java/wherehows/processors/MetadataChangeProcessor.java +++ b/wherehows-kafka/src/main/java/wherehows/processors/MetadataChangeProcessor.java @@ -17,8 +17,10 @@ import com.linkedin.events.KafkaAuditHeader; import com.linkedin.events.metadata.ChangeAuditStamp; import com.linkedin.events.metadata.ChangeType; import com.linkedin.events.metadata.DatasetIdentifier; +import com.linkedin.events.metadata.DatasetSchema; import com.linkedin.events.metadata.FailedMetadataChangeEvent; import com.linkedin.events.metadata.MetadataChangeEvent; +import com.linkedin.events.metadata.Schemaless; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.IndexedRecord; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -94,15 +96,18 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor { throw new Exception("Dataset name too long: " + identifier); } + final DatasetSchema dsSchema = + (event.schema == null || event.schema instanceof Schemaless) ? null : (DatasetSchema) event.schema; + // TODO: handle Schemaless separately + // create or update dataset - DictDataset ds = - _dictDatasetDao.insertUpdateDataset(identifier, changeAuditStamp, event.datasetProperty, event.schema, - event.deploymentInfo, toStringList(event.tags), event.capacity, event.partitionSpec); + DictDataset ds = _dictDatasetDao.insertUpdateDataset(identifier, changeAuditStamp, event.datasetProperty, dsSchema, + event.deploymentInfo, toStringList(event.tags), event.capacity, event.partitionSpec); // if schema is not null, insert or update schema - if (event.schema != null) { + if (dsSchema != null) { _fieldDetailDao.insertUpdateDatasetFields(identifier, ds.getId(), event.datasetProperty, changeAuditStamp, - event.schema); + dsSchema); } // if owners are not null, insert or update owner