Add deployment when saving schema from MCE (#1366)

This commit is contained in:
Yi (Alan) Wang 2018-09-07 11:42:06 -07:00 committed by GitHub
parent 7d2d7ed3c4
commit 3db6a1f460
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 8 deletions

View File

@ -17,6 +17,7 @@ import com.linkedin.events.metadata.ChangeAuditStamp;
import com.linkedin.events.metadata.DatasetIdentifier;
import com.linkedin.events.metadata.DatasetProperty;
import com.linkedin.events.metadata.DatasetSchema;
import com.linkedin.events.metadata.DeploymentDetail;
import com.linkedin.events.metadata.FieldSchema;
import java.util.ArrayList;
import java.util.Arrays;
@ -51,13 +52,14 @@ public class FieldDetailDao extends BaseDao {
* Insert or update dict field details given information from MetadataChangeEvent
* @param identifier DatasetIdentifier
* @param dataset DictDataset
* @param deployments List DeploymentDetail
* @param auditStamp ChangeAuditStamp
* @param schema DatasetSchema
* @throws Exception
*/
public void insertUpdateDatasetFields(@Nonnull DatasetIdentifier identifier, @Nullable DictDataset dataset,
@Nullable DatasetProperty property, @Nonnull ChangeAuditStamp auditStamp, @Nonnull DatasetSchema schema)
throws Exception {
@Nullable List<DeploymentDetail> deployments, @Nullable DatasetProperty property,
@Nonnull ChangeAuditStamp auditStamp, @Nonnull DatasetSchema schema) throws Exception {
if (dataset == null) {
throw new RuntimeException("Fail to update dataset fields, dataset is NULL.");
@ -82,11 +84,12 @@ public class FieldDetailDao extends BaseDao {
/**
* Insert or update Schemaless from MetadataChangeEvent
* @param identifier DatasetIdentifier
* @param deployments List DeploymentDetail
* @param auditStamp ChangeAuditStamp
* @throws Exception
*/
public void insertUpdateSchemaless(@Nonnull DatasetIdentifier identifier, @Nonnull ChangeAuditStamp auditStamp)
throws Exception {
public void insertUpdateSchemaless(@Nonnull DatasetIdentifier identifier,
@Nullable List<DeploymentDetail> deployments, @Nonnull ChangeAuditStamp auditStamp) throws Exception {
throw new UnsupportedOperationException("Support for Schemaless not yet implemented.");
}

View File

@ -49,7 +49,7 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor {
private final Set<String> _whitelistActors;
private final int MAX_DATASET_NAME_LENGTH = 400;
private final static int MAX_DATASET_NAME_LENGTH = 400;
public MetadataChangeProcessor(Config config, DaoFactory daoFactory, String producerTopic,
KafkaProducer<String, IndexedRecord> producer) {
@ -119,9 +119,10 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor {
// if schema is not null, insert or update schema
if (dsSchema != null) { // if instanceof DatasetSchema
_fieldDetailDao.insertUpdateDatasetFields(identifier, dataset, event.datasetProperty, changeAuditStamp, dsSchema);
_fieldDetailDao.insertUpdateDatasetFields(identifier, dataset, event.deploymentInfo, event.datasetProperty,
changeAuditStamp, dsSchema);
} else if (event.schema instanceof Schemaless) { // if instanceof Schemaless
_fieldDetailDao.insertUpdateSchemaless(identifier, changeAuditStamp);
_fieldDetailDao.insertUpdateSchemaless(identifier, event.deploymentInfo, changeAuditStamp);
}
// if owners are not null, insert or update owner
@ -132,7 +133,8 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor {
// if retention or compliance is not null, insert or update retention / compliance
// if both null, bypass this
if (event.compliancePolicy != null || event.retentionPolicy != null) {
_complianceDao.insertUpdateCompliance(identifier, dataset, changeAuditStamp, event.compliancePolicy, event.retentionPolicy);
_complianceDao.insertUpdateCompliance(identifier, dataset, changeAuditStamp, event.compliancePolicy,
event.retentionPolicy);
}
// if suggested compliance is not null, insert or update suggested compliance