Add metadata store access functions, update UrnUtil (#422)

Yi (Alan) Wang 2017-04-07 17:07:11 -07:00 committed by Mars Lan
parent ca12a859cc
commit e729526486
7 changed files with 768 additions and 182 deletions


@@ -0,0 +1,130 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metastore.client;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.StringArray;
import com.linkedin.dataset.Dataset;
import com.linkedin.dataset.DatasetKey;
import com.linkedin.dataset.DatasetsGetRequestBuilder;
import com.linkedin.dataset.DatasetsRequestBuilders;
import com.linkedin.dataset.DeploymentInfoArray;
import com.linkedin.dataset.VersionedDataset;
import com.linkedin.dataset.VersionedDatasetsRequestBuilders;
import com.linkedin.restli.client.CreateIdRequest;
import com.linkedin.restli.client.PartialUpdateRequest;
import com.linkedin.restli.client.Request;
import com.linkedin.restli.client.ResponseFuture;
import com.linkedin.restli.client.RestClient;
import com.linkedin.restli.client.util.PatchGenerator;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.IdResponse;
import com.linkedin.restli.common.PatchRequest;
import java.util.List;
import static metastore.util.UrnUtil.*;
public class Datasets {
private final RestClient _client;
private static final DatasetsRequestBuilders _datasetsBuilder = new DatasetsRequestBuilders();
private static final VersionedDatasetsRequestBuilders _versionedDatasetsBuilder =
new VersionedDatasetsRequestBuilders();
public Datasets(RestClient metadataStoreClient) {
_client = metadataStoreClient;
}
public DatasetKey toDatasetKey(String platformName, String datasetName, String origin) throws Exception {
return new DatasetKey().setName(datasetName)
.setPlatform(toDataPlatformUrn(platformName))
.setOrigin(toFabricType(origin));
}
public Dataset getDataset(String platformName, String datasetName, String origin) throws Exception {
DatasetKey key = toDatasetKey(platformName, datasetName, origin);
DatasetsGetRequestBuilder builder = _datasetsBuilder.get();
Request<Dataset> req = builder.id(new ComplexResourceKey<>(key, new EmptyRecord())).build();
// Send the request and wait for a response
final ResponseFuture<Dataset> responseFuture = _client.sendRequest(req);
return responseFuture.getResponse().getEntity();
}
public VersionedDataset getVersionedDataset(String platformName, String datasetName, String origin) throws Exception {
DatasetKey key = toDatasetKey(platformName, datasetName, origin);
Request<Dataset> reqDataset = _datasetsBuilder.get().id(new ComplexResourceKey<>(key, new EmptyRecord())).build();
Long datasetId = _client.sendRequest(reqDataset).getResponse().getEntity().getId();
Request<VersionedDataset> req = _versionedDatasetsBuilder.get()
.id(datasetId)
.datasetKeyKey(new ComplexResourceKey<>(key, new EmptyRecord()))
.build();
ResponseFuture<VersionedDataset> responseFuture = _client.sendRequest(req);
return responseFuture.getResponse().getEntity();
}
public DatasetKey createDataset(String platformName, String datasetName, String origin, String nativeType,
String description, List<String> tags, String uri, Urn actor) throws Exception {
Dataset newDataset = new Dataset().setPlatform(toDataPlatformUrn(platformName))
.setName(datasetName)
.setOrigin(toFabricType(origin))
.setDescription(description)
.setDeploymentInfos(new DeploymentInfoArray()) // TODO: add deployment info when creating dataset
.setTags(new StringArray(tags))
.setCreated(new AuditStamp().setActor(actor).setTime(System.currentTimeMillis()))
.setLastModified(new AuditStamp().setActor(actor))
.setPlatformNativeType(toPlatformNativeType(nativeType))
.setUri(new java.net.URI(uri));
CreateIdRequest<ComplexResourceKey<DatasetKey, EmptyRecord>, Dataset> req =
_datasetsBuilder.create().input(newDataset).build();
IdResponse<ComplexResourceKey<DatasetKey, EmptyRecord>> resp = _client.sendRequest(req).getResponseEntity();
return resp.getId().getKey();
}
public void partialUpdateDataset(String platformName, String datasetName, String origin, String description,
List<String> tags, String uri, Urn actor) throws Exception {
Dataset originDs = getDataset(platformName, datasetName, origin);
partialUpdateDataset(platformName, datasetName, origin, originDs, description, tags, uri, actor);
}
public void partialUpdateDataset(String platformName, String datasetName, String origin, Dataset originDs,
String description, List<String> tags, String uri, Urn actor) throws Exception {
DatasetKey key = toDatasetKey(platformName, datasetName, origin);
Dataset newDs = originDs.copy()
.setDescription(description)
.setTags(new StringArray(tags))
.setUri(new java.net.URI(uri))
.setLastModified(new AuditStamp().setActor(actor));
PatchRequest<Dataset> patch = PatchGenerator.diff(originDs, newDs);
PartialUpdateRequest<Dataset> req =
_datasetsBuilder.partialUpdate().id(new ComplexResourceKey<>(key, new EmptyRecord())).input(patch).build();
_client.sendRequest(req).getResponseEntity(); // block until the partial update completes; the response body is an EmptyRecord
}
}
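
For orientation, a minimal usage sketch of this client; the server URL, dataset coordinates, and output are placeholders, and RestliClient is the wrapper class added later in this commit. Calls are shown inside a method that declares throws Exception.

RestliClient restli = new RestliClient("http://localhost:1338/"); // placeholder URL
Datasets datasets = new Datasets(restli.getClient());
// Look up an existing dataset by platform + name + fabric.
Dataset ds = datasets.getDataset("espresso", "Identity/Profile", "PROD");
System.out.println(ds.getName() + " (" + ds.getOrigin() + ")");
restli.shutdown();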


@@ -0,0 +1,207 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metastore.client;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.CorpuserUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.data.template.StringArray;
import com.linkedin.dataset.AutoPurge;
import com.linkedin.dataset.DatasetPrivacyCompliancePoliciesCreateRequestBuilder;
import com.linkedin.dataset.DatasetPrivacyCompliancePoliciesFindByDatasetRequestBuilder;
import com.linkedin.dataset.DatasetPrivacyCompliancePoliciesGetRequestBuilder;
import com.linkedin.dataset.DatasetPrivacyCompliancePoliciesRequestBuilders;
import com.linkedin.dataset.LimitedRetention;
import com.linkedin.dataset.ManualPurge;
import com.linkedin.dataset.PrivacyCompliancePolicy;
import com.linkedin.dataset.PrivacyCompliancePolicyKey;
import com.linkedin.dataset.PurgeNotApplicable;
import com.linkedin.dataset.SecurityClassification;
import com.linkedin.dataset.SecurityFieldSpec;
import com.linkedin.dataset.SecurityFieldSpecArray;
import com.linkedin.dataset.SecurityMetadata;
import com.linkedin.dataset.SecurityMetadataKey;
import com.linkedin.dataset.SecurityMetadataRequestBuilders;
import com.linkedin.restli.client.CreateIdRequest;
import com.linkedin.restli.client.FindRequest;
import com.linkedin.restli.client.Request;
import com.linkedin.restli.client.ResponseFuture;
import com.linkedin.restli.client.RestClient;
import com.linkedin.restli.common.CollectionResponse;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.IdResponse;
import java.util.List;
import java.util.Map;
import static metastore.util.UrnUtil.*;
public class PrivacySecurity {
private final RestClient _client;
private static final DatasetPrivacyCompliancePoliciesRequestBuilders _privacyComplianceBuilder =
new DatasetPrivacyCompliancePoliciesRequestBuilders();
private static final SecurityMetadataRequestBuilders _securityBuilder = new SecurityMetadataRequestBuilders();
public PrivacySecurity(RestClient metadataStoreClient) {
_client = metadataStoreClient;
}
public PrivacyCompliancePolicy getPrivacyCompliancePolicy(String platformName, String datasetName, String origin,
long version) throws Exception {
DatasetUrn urn = toDatasetUrn(platformName, datasetName, origin);
PrivacyCompliancePolicyKey key = new PrivacyCompliancePolicyKey().setDataset(urn).setVersion(version);
DatasetPrivacyCompliancePoliciesGetRequestBuilder builder = _privacyComplianceBuilder.get();
Request<PrivacyCompliancePolicy> req = builder.id(new ComplexResourceKey<>(key, new EmptyRecord())).build();
ResponseFuture<PrivacyCompliancePolicy> responseFuture = _client.sendRequest(req);
return responseFuture.getResponse().getEntity();
}
public PrivacyCompliancePolicy getPrivacyCompliancePolicyLatest(String platformName, String datasetName,
String origin) throws Exception {
DatasetUrn urn = toDatasetUrn(platformName, datasetName, origin);
DatasetPrivacyCompliancePoliciesFindByDatasetRequestBuilder builder = _privacyComplianceBuilder.findByDataset();
FindRequest<PrivacyCompliancePolicy> req = builder.datasetParam(urn).build();
ResponseFuture<CollectionResponse<PrivacyCompliancePolicy>> responseFuture = _client.sendRequest(req);
long version = 0;
PrivacyCompliancePolicy latestPolicy = null;
for (PrivacyCompliancePolicy record : responseFuture.getResponse().getEntity().getElements()) {
if (record.getVersion() > version) {
latestPolicy = record;
version = record.getVersion();
}
}
return latestPolicy;
}
public PrivacyCompliancePolicyKey createPrivacyCompliancePolicy(String user, String platformName, String datasetName,
String origin, String purgeType, List<String> purgeFields) throws Exception {
CorpuserUrn actor = toCorpuserUrn(user);
PrivacyCompliancePolicy.PurgeMechanism purgeMechanism = new PrivacyCompliancePolicy.PurgeMechanism();
switch (purgeType) {
case "AUTO_PURGE":
purgeMechanism.setAutoPurge(new AutoPurge().setOwnerFields(new StringArray(purgeFields)));
break;
case "LIMITED_RETENTION":
purgeMechanism.setLimitedRetention(new LimitedRetention().setOwnerFields(new StringArray(purgeFields)));
break;
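// Note: the "CUSTOM_PURGE" type is modeled with the ManualPurge record below.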
case "CUSTOM_PURGE":
purgeMechanism.setManualPurge(new ManualPurge().setOwnerFields(new StringArray(purgeFields)));
break;
case "PURGE_NOT_APPLICABLE":
purgeMechanism.setPurgeNotApplicable(new PurgeNotApplicable());
break;
default:
throw new IllegalArgumentException("Invalid privacy compliance purge type: " + purgeType);
}
DatasetUrn datasetUrn = toDatasetUrn(platformName, datasetName, origin);
DatasetPrivacyCompliancePoliciesCreateRequestBuilder builder = _privacyComplianceBuilder.create();
PrivacyCompliancePolicy policy =
new PrivacyCompliancePolicy().setCreated(new AuditStamp().setActor(actor).setTime(System.currentTimeMillis()))
.setLastModified(new AuditStamp().setActor(actor))
.setPurgeMechanism(purgeMechanism)
.setDataset(datasetUrn);
CreateIdRequest<ComplexResourceKey<PrivacyCompliancePolicyKey, EmptyRecord>, PrivacyCompliancePolicy> req =
builder.input(policy).actorParam(actor).build();
IdResponse<ComplexResourceKey<PrivacyCompliancePolicyKey, EmptyRecord>> resp =
_client.sendRequest(req).getResponseEntity();
return resp.getId().getKey();
}
public SecurityMetadata getSecurityMetadata(String platformName, String datasetName, String origin, long version)
throws Exception {
DatasetUrn urn = toDatasetUrn(platformName, datasetName, origin);
SecurityMetadataKey key = new SecurityMetadataKey().setDataset(urn).setVersion(version);
Request<SecurityMetadata> req = _securityBuilder.get().id(new ComplexResourceKey<>(key, new EmptyRecord())).build();
ResponseFuture<SecurityMetadata> responseFuture = _client.sendRequest(req);
return responseFuture.getResponse().getEntity();
}
public SecurityMetadata getSecurityMetadataLatest(String platformName, String datasetName, String origin)
throws Exception {
DatasetUrn urn = toDatasetUrn(platformName, datasetName, origin);
FindRequest<SecurityMetadata> req = _securityBuilder.findByDataset().datasetParam(urn).build();
ResponseFuture<CollectionResponse<SecurityMetadata>> responseFuture = _client.sendRequest(req);
long version = 0;
SecurityMetadata latestSecurity = null;
for (SecurityMetadata record : responseFuture.getResponse().getEntity().getElements()) {
if (record.getVersion() > version) {
latestSecurity = record;
version = record.getVersion();
}
}
return latestSecurity;
}
/**
* Create SecurityMetadata for a dataset.
* @param user String corp user id
* @param platformName String
* @param datasetName String
* @param origin String
* @param fieldSpecs Map of schema field name to security classification String
* @return SecurityMetadataKey
* @throws Exception
*/
public SecurityMetadataKey createSecurityMetadata(String user, String platformName, String datasetName, String origin,
Map<String, String> fieldSpecs) throws Exception {
CorpuserUrn actor = toCorpuserUrn(user);
DatasetUrn datasetUrn = toDatasetUrn(platformName, datasetName, origin);
SecurityFieldSpecArray securityFields = new SecurityFieldSpecArray();
for (Map.Entry<String, String> entry : fieldSpecs.entrySet()) {
securityFields.add(new SecurityFieldSpec().setSchemaField(entry.getKey())
.setClassification(toSecurityClassification(entry.getValue())));
}
SecurityClassification datasetSecurity = getHighestSecurityClassification(fieldSpecs.values());
SecurityMetadata security =
new SecurityMetadata().setCreated(new AuditStamp().setActor(actor).setTime(System.currentTimeMillis()))
.setLastModified(new AuditStamp().setActor(actor))
.setDatasetSecurityClassification(datasetSecurity)
.setSensitiveFields(securityFields)
.setDataset(datasetUrn);
CreateIdRequest<ComplexResourceKey<SecurityMetadataKey, EmptyRecord>, SecurityMetadata> req =
_securityBuilder.create().input(security).build();
IdResponse<ComplexResourceKey<SecurityMetadataKey, EmptyRecord>> resp =
_client.sendRequest(req).getResponseEntity();
return resp.getId().getKey();
}
}
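
A hypothetical call sequence against this client; the user, dataset coordinates, and owner field are placeholders, and java.util.Arrays is assumed to be imported.

PrivacySecurity privacy = new PrivacySecurity(restClient); // a RestClient, e.g. from RestliClient.getClient()
// Record an AUTO_PURGE policy whose purge is driven by the owner field(s).
privacy.createPrivacyCompliancePolicy("jsmith", "espresso", "Identity/Profile", "PROD",
    "AUTO_PURGE", Arrays.asList("memberId"));
// Read back the highest-version policy for the same dataset.
PrivacyCompliancePolicy latest = privacy.getPrivacyCompliancePolicyLatest("espresso", "Identity/Profile", "PROD");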


@@ -0,0 +1,44 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metastore.client;
import com.linkedin.common.callback.FutureCallback;
import com.linkedin.r2.transport.common.Client;
import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
import com.linkedin.restli.client.RestClient;
import java.util.Collections;
public class RestliClient {
private final HttpClientFactory http;
private final Client r2Client;
private final RestClient client;
public RestliClient(String serverUrl) {
http = new HttpClientFactory();
r2Client = new TransportClientAdapter(http.getClient(Collections.<String, String>emptyMap()));
client = new RestClient(r2Client, serverUrl);
}
public RestClient getClient() {
return client;
}
public void shutdown() {
client.shutdown(new FutureCallback<>());
http.shutdown(new FutureCallback<>());
}
}
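
A typical lifecycle for this wrapper (the URL is a placeholder). Note that shutdown passes fresh FutureCallback instances and does not block on them.

RestliClient restli = new RestliClient("http://localhost:1338/");
try {
    RestClient client = restli.getClient();
    // ... build and send Rest.li requests with the shared client ...
} finally {
    restli.shutdown(); // shuts down the RestClient, then the HttpClientFactory
}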


@@ -0,0 +1,180 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metastore.client;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.DatasetUrnArray;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.StringArray;
import com.linkedin.dataset.EspressoSchema;
import com.linkedin.dataset.ForeignKeySpecMap;
import com.linkedin.dataset.KafkaSchema;
import com.linkedin.dataset.OracleDDL;
import com.linkedin.dataset.SchemaField;
import com.linkedin.dataset.SchemaFieldArray;
import com.linkedin.dataset.SchemaFieldDataType;
import com.linkedin.dataset.SchemaMetadata;
import com.linkedin.dataset.SchemaMetadataFindByDatasetRequestBuilder;
import com.linkedin.dataset.SchemaMetadataGetRequestBuilder;
import com.linkedin.dataset.SchemaMetadataKey;
import com.linkedin.dataset.SchemaMetadataRequestBuilders;
import com.linkedin.restli.client.CreateIdRequest;
import com.linkedin.restli.client.FindRequest;
import com.linkedin.restli.client.Request;
import com.linkedin.restli.client.ResponseFuture;
import com.linkedin.restli.client.RestClient;
import com.linkedin.restli.common.CollectionResponse;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.IdResponse;
import java.util.List;
import static metastore.util.UrnUtil.*;
public class Schemas {
private final RestClient _client;
private static final SchemaMetadataRequestBuilders _schemaMetadataBuilder = new SchemaMetadataRequestBuilders();
public Schemas(RestClient metadataStoreClient) {
_client = metadataStoreClient;
}
public SchemaMetadata getSchemaMetadata(String platformName, String schemaName, long version) throws Exception {
SchemaMetadataKey key = new SchemaMetadataKey().setSchemaName(schemaName)
.setPlatform(toDataPlatformUrn(platformName))
.setVersion(version);
SchemaMetadataGetRequestBuilder builder = _schemaMetadataBuilder.get();
Request<SchemaMetadata> req = builder.id(new ComplexResourceKey<>(key, new EmptyRecord())).build();
ResponseFuture<SchemaMetadata> responseFuture = _client.sendRequest(req);
return responseFuture.getResponse().getEntity();
}
public SchemaMetadata getLatestSchemaByDataset(String platformName, String datasetName, String origin)
throws Exception {
DatasetUrn urn = toDatasetUrn(platformName, datasetName, origin);
SchemaMetadataFindByDatasetRequestBuilder builder = _schemaMetadataBuilder.findByDataset();
FindRequest<SchemaMetadata> req = builder.datasetParam(urn).build();
ResponseFuture<CollectionResponse<SchemaMetadata>> responseFuture = _client.sendRequest(req);
long version = 0;
SchemaMetadata latestSchema = null;
for (SchemaMetadata sc : responseFuture.getResponse().getEntity().getElements()) {
if (sc.getVersion() > version) {
latestSchema = sc;
version = sc.getVersion();
}
}
return latestSchema;
}
public SchemaMetadataKey createSchemaMetadataEspresso(String platformName, String schemaName, String datasetName,
String origin, String tableSchema, String docSchema, Urn actor) throws Exception {
EspressoSchema schema = new EspressoSchema().setTableSchema(tableSchema).setDocumentSchema(docSchema);
SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
platformSchema.setEspressoSchema(schema);
return createSchemaMetadata(platformName, schemaName, datasetName, origin, platformSchema, new SchemaFieldArray(),
actor);
}
public SchemaMetadataKey createSchemaMetadataKafka(String platformName, String schemaName, String datasetName,
String origin, String docSchema, Urn actor) throws Exception {
KafkaSchema schema = new KafkaSchema().setDocumentSchema(docSchema);
SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
platformSchema.setKafkaSchema(schema);
return createSchemaMetadata(platformName, schemaName, datasetName, origin, platformSchema, new SchemaFieldArray(),
actor);
}
public SchemaMetadataKey createSchemaMetadataOracle(String platformName, String schemaName, String datasetName,
String origin, List<Object[]> fieldInfo, Urn actor) throws Exception {
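// NOTE: the raw table DDL is not available from fieldInfo, so tableSchema is left empty here.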
OracleDDL ddl = new OracleDDL().setTableSchema("");
SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
platformSchema.setOracleDDL(ddl);
SchemaFieldArray fields = new SchemaFieldArray();
for (Object[] fd : fieldInfo) {
// expected element layout: sort_id, field_name, data_type, is_nullable, is_recursive, comment
String fieldName = fd[1].toString();
SchemaFieldDataType dataType = toFieldDataType(fd[2].toString());
// fd[3] and fd[4] are expected to be 'Y'/'N' Character flags
boolean nullable = fd[3] != null && 'Y' == (char) fd[3];
boolean recursive = fd[4] != null && 'Y' == (char) fd[4];
String comment = String.valueOf(fd[5]);
fields.add(new SchemaField().setFieldPath(fieldName)
.setType(dataType)
.setNullable(nullable)
.setRecursive(recursive)
.setDescription(comment));
}
return createSchemaMetadata(platformName, schemaName, datasetName, origin, platformSchema, fields, actor);
}
public SchemaMetadataKey createSchemaMetadata(String platformName, String schemaName, String datasetName,
String origin, SchemaMetadata.PlatformSchema platformSchema, SchemaFieldArray fields, Urn actor)
throws Exception {
DatasetUrnArray datasetUrns = new DatasetUrnArray();
datasetUrns.add(toDatasetUrn(platformName, datasetName, origin));
SchemaMetadata newSchema = new SchemaMetadata().setSchemaName(schemaName)
.setPlatform(toDataPlatformUrn(platformName))
.setPermissibleDatasets(datasetUrns)
.setPlatformSchema(platformSchema)
.setPlatformSchemaVersion("v-1") // TODO: get version
.setFields(fields)
.setPrimaryKeys(new StringArray())
.setForeignKeysSpecs(new ForeignKeySpecMap())
.setCreated(new AuditStamp().setActor(actor).setTime(System.currentTimeMillis()))
.setLastModified(new AuditStamp().setActor(actor));
CreateIdRequest<ComplexResourceKey<SchemaMetadataKey, EmptyRecord>, SchemaMetadata> req =
_schemaMetadataBuilder.create().input(newSchema).actorParam(actor).build();
IdResponse<ComplexResourceKey<SchemaMetadataKey, EmptyRecord>> resp = _client.sendRequest(req).getResponseEntity();
return resp.getId().getKey();
}
public SchemaMetadataKey updateSchemaFields(String platformName, String datasetName, String origin,
SchemaFieldArray fields, Urn actor) throws Exception {
// TODO: confirm whether querying the latest schema before the merge update is really necessary
SchemaMetadata originSchema = getLatestSchemaByDataset(platformName, datasetName, origin);
SchemaMetadata newSchema = originSchema.copy().setFields(fields).setLastModified(new AuditStamp().setActor(actor));
// call create with the mergeWithLatest parameter set to true
CreateIdRequest<ComplexResourceKey<SchemaMetadataKey, EmptyRecord>, SchemaMetadata> req =
_schemaMetadataBuilder.create().input(newSchema).actorParam(actor).mergeWithLatestParam(true).build();
IdResponse<ComplexResourceKey<SchemaMetadataKey, EmptyRecord>> resp = _client.sendRequest(req).getResponseEntity();
return resp.getId().getKey();
}
}
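
A minimal sketch of registering a Kafka schema and reading back the latest version; the names, Avro JSON, and actor are placeholders, and a static import of metastore.util.UrnUtil is assumed.

Schemas schemas = new Schemas(restClient);
Urn actor = toCorpuserUrn("jsmith"); // placeholder corp user
String docSchema = "{\"type\":\"record\",\"name\":\"PageViewEvent\",\"fields\":[]}"; // placeholder Avro JSON
schemas.createSchemaMetadataKafka("kafka", "PageViewEvent", "PageViewEvent", "PROD", docSchema, actor);
SchemaMetadata latest = schemas.getLatestSchemaByDataset("kafka", "PageViewEvent", "PROD");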


@@ -0,0 +1,198 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metastore.util;
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.CorpuserUrn;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.MemberUrn;
import com.linkedin.common.urn.TupleKey;
import com.linkedin.common.urn.Urn;
import com.linkedin.dataset.BytesType;
import com.linkedin.dataset.NullType;
import com.linkedin.dataset.NumberType;
import com.linkedin.dataset.PlatformNativeType;
import com.linkedin.dataset.SchemaFieldDataType;
import com.linkedin.dataset.SecurityClassification;
import com.linkedin.dataset.StringType;
import java.net.URISyntaxException;
import java.util.Collection;
public class UrnUtil {
/**
* Transform platform String into DataPlatformUrn
* @param platformName String
* @return DataPlatformUrn
* @throws URISyntaxException
*/
public static DataPlatformUrn toDataPlatformUrn(String platformName) throws URISyntaxException {
return DataPlatformUrn.deserialize("urn:li:dataPlatform:" + platformName);
}
/**
* Transform platform + dataset + origin into DatasetUrn
* @param platformName String
* @param datasetName String
* @param origin String
* @return DatasetUrn
* @throws URISyntaxException
*/
public static DatasetUrn toDatasetUrn(String platformName, String datasetName, String origin)
throws URISyntaxException {
TupleKey keys = new TupleKey("urn:li:dataPlatform:" + platformName, datasetName, toFabricType(origin).name());
Urn urn = new Urn("dataset", keys);
return DatasetUrn.createFromUrn(urn);
}
/**
* Transform fabric String to FabricType
* @param fabric String
* @return FabricType
*/
public static FabricType toFabricType(String fabric) {
switch (fabric.toUpperCase()) {
case "PROD":
return FabricType.PROD;
case "CORP":
return FabricType.CORP;
case "EI":
return FabricType.EI;
case "DEV":
return FabricType.DEV;
default:
return FabricType.$UNKNOWN;
}
}
/**
* Transform nativeType String to PlatformNativeType enum
* @param nativeType String
* @return PlatformNativeType
*/
public static PlatformNativeType toPlatformNativeType(String nativeType) {
switch (nativeType.toUpperCase()) {
case "TABLE":
return PlatformNativeType.TABLE;
case "VIEW":
return PlatformNativeType.VIEW;
case "STREAM":
return PlatformNativeType.STREAM;
case "DIRECTORY":
return PlatformNativeType.DIRECTORY;
default:
return PlatformNativeType.$UNKNOWN;
}
}
/**
* Transform security classification string to SecurityClassification
* @param securityClass String
* @return SecurityClassification
*/
public static SecurityClassification toSecurityClassification(String securityClass) {
switch (securityClass.toUpperCase()) {
case "HIGHLY_CONFIDENTIAL":
return SecurityClassification.HIGHLY_CONFIDENTIAL;
case "CONFIDENTIAL":
return SecurityClassification.CONFIDENTIAL;
case "UNCLASSIFIED":
case "NOT CONFIDENTIAL":
return SecurityClassification.UNCLASSIFIED;
default:
return SecurityClassification.$UNKNOWN;
}
}
/**
* Find the highest security classification from classification Strings
* @param classifications Collection<String>
* @return SecurityClassification
*/
public static SecurityClassification getHighestSecurityClassification(Collection<String> classifications) {
if (classifications.stream().anyMatch(s -> s.equalsIgnoreCase("HIGHLY_CONFIDENTIAL"))) {
return SecurityClassification.HIGHLY_CONFIDENTIAL;
} else if (classifications.stream().anyMatch(s -> s.equalsIgnoreCase("CONFIDENTIAL"))) {
return SecurityClassification.CONFIDENTIAL;
} else if (classifications.stream()
.anyMatch(s -> s.equalsIgnoreCase("UNCLASSIFIED") || s.equalsIgnoreCase("NOT CONFIDENTIAL"))) {
return SecurityClassification.UNCLASSIFIED;
} else {
return SecurityClassification.$UNKNOWN;
}
}
/**
* Convert data field native type string to SchemaFieldDataType
* @param type String
* @return SchemaFieldDataType
*/
public static SchemaFieldDataType toFieldDataType(String type) {
if (type == null) {
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new NullType()));
} else if (type.equalsIgnoreCase("NUMBER") || type.equalsIgnoreCase("LONG") || type.equalsIgnoreCase("FLOAT")) {
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new NumberType()));
} else if (type.equalsIgnoreCase("CHAR") || type.equalsIgnoreCase("VARCHAR2") || type.equalsIgnoreCase("NVARCHAR2")
|| type.equalsIgnoreCase("XMLTYPE")) {
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new StringType()));
} else if (type.equalsIgnoreCase("DATE") || type.startsWith("TIMESTAMP")) {
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new NumberType()));
} else if (type.equalsIgnoreCase("BLOB") || type.startsWith("CLOB") || type.equalsIgnoreCase("NCLOB")
|| type.equalsIgnoreCase("RAW")) {
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new BytesType()));
} else {
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new StringType()));
}
}
/**
* Convert long memberId to MemberUrn
* @param memberId long
* @return MemberUrn
* @throws URISyntaxException
*/
public static MemberUrn toMemberUrn(long memberId) throws URISyntaxException {
return MemberUrn.deserialize("urn:li:member:" + memberId);
}
/**
* Convert String corp userId to CorpuserUrn
* @param userId String
* @return CorpuserUrn
* @throws URISyntaxException
*/
public static CorpuserUrn toCorpuserUrn(String userId) throws URISyntaxException {
return CorpuserUrn.deserialize("urn:li:corpuser:" + userId);
}
/**
* Split WhereHows dataset URN into two parts: platform + dataset name
* Also replace '/' with '.' in dataset name for Espresso and Oracle
* @param urn String WhereHows dataset URN
* @return String[] platform + dataset name
*/
public static String[] splitWhUrn(String urn) {
int index = urn.indexOf(":///");
String fabric = urn.substring(0, index);
String dataset = urn.substring(index + 4);
if (fabric.equalsIgnoreCase("espresso") || fabric.equalsIgnoreCase("oracle")) {
dataset = dataset.replace("/", ".");
}
return new String[]{fabric, dataset};
}
}
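
For illustration, the expected behavior of the main helpers (results shown as comments; java.util.Arrays assumed):

DatasetUrn urn = toDatasetUrn("espresso", "Identity.Profile", "PROD");
// -> urn:li:dataset:(urn:li:dataPlatform:espresso,Identity.Profile,PROD)
String[] parts = splitWhUrn("espresso:///Identity/Profile");
// -> ["espresso", "Identity.Profile"]  ('/' becomes '.' for Espresso and Oracle)
SecurityClassification highest = getHighestSecurityClassification(Arrays.asList("CONFIDENTIAL", "UNCLASSIFIED"));
// -> SecurityClassification.CONFIDENTIAL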


@@ -1,85 +0,0 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.restli.util;
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import java.net.URISyntaxException;
public class UrnUtil {
/**
* Transform platform name to DataPlatformUrn
* @param platformName String
* @return DataPlatformUrn
* @throws URISyntaxException
*/
public static DataPlatformUrn toDataPlatformUrn(String platformName) throws URISyntaxException {
return DataPlatformUrn.deserialize("urn:li:dataPlatform:" + platformName);
}
/**
* Transform platform name, dataset name and origin fabric into DatasetUrn
* @param platformName String
* @param datasetName String
* @param origin String
* @return DatasetUrn
* @throws URISyntaxException
*/
public static DatasetUrn toDatasetUrn(String platformName, String datasetName, String origin)
throws URISyntaxException {
return DatasetUrn.createFromUrn(
Urn.createFromTuple("dataset", toDataPlatformUrn(platformName), datasetName, toFabricType(origin)));
}
/**
* Transform fabric string into FabricType enum
* @param fabric String
* @return FabricType
*/
public static FabricType toFabricType(String fabric) {
switch (fabric.toUpperCase()) {
case "PROD":
return FabricType.PROD;
case "CORP":
return FabricType.CORP;
case "EI":
return FabricType.EI;
case "DEV":
return FabricType.DEV;
default:
return FabricType.$UNKNOWN;
}
}
/**
* Split WhereHows dataset URN into two parts: platform + dataset name
* @param urn String WhereHows dataset URN
* @return String[] platform + dataset name
*/
public static String[] splitWhUrn(String urn) {
int index = urn.indexOf(":///");
String fabric = urn.substring(0, index);
String dataset = urn.substring(index + 4);
// for espresso, change '/' back to '.'
if (fabric.equalsIgnoreCase("espresso")) {
dataset = dataset.replace("/", ".");
}
return new String[]{fabric, dataset};
}
}


@@ -13,39 +13,18 @@
*/
package dao;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.dataset.Dataset;
import com.linkedin.dataset.DatasetKey;
import com.linkedin.dataset.DatasetPrivacyCompliancePoliciesGetRequestBuilder;
import com.linkedin.dataset.DatasetPrivacyCompliancePoliciesRequestBuilders;
import com.linkedin.dataset.DatasetsGetRequestBuilder;
import com.linkedin.dataset.DatasetsRequestBuilders;
import com.linkedin.dataset.PrivacyCompliancePolicy;
import com.linkedin.dataset.PrivacyCompliancePolicyKey;
import com.linkedin.dataset.SchemaField;
import com.linkedin.dataset.SchemaFieldArray;
import com.linkedin.dataset.SchemaMetadata;
import com.linkedin.dataset.SchemaMetadataFindByDatasetRequestBuilder;
import com.linkedin.dataset.SchemaMetadataGetRequestBuilder;
import com.linkedin.dataset.SchemaMetadataKey;
import com.linkedin.dataset.SchemaMetadataRequestBuilders;
import com.linkedin.r2.transport.common.Client;
import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
import com.linkedin.restli.client.FindRequest;
import com.linkedin.restli.client.Request;
import com.linkedin.restli.client.ResponseFuture;
import com.linkedin.restli.client.RestClient;
import com.linkedin.restli.common.CollectionResponse;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import metastore.client.RestliClient;
import metastore.client.Schemas;
import models.DatasetColumn;
import play.Play;
import static wherehows.restli.util.UrnUtil.*;
import static metastore.util.UrnUtil.*;
public class MetadataStoreDao {
@@ -53,64 +32,15 @@ public class MetadataStoreDao {
private static final String MetadataStoreURL =
Play.application().configuration().getString("wherehows.restli.server.url");
private static final HttpClientFactory http = new HttpClientFactory();
private static final Client r2Client =
new TransportClientAdapter(http.getClient(Collections.<String, String>emptyMap()));
private static final RestliClient metaStore = new RestliClient(MetadataStoreURL);
private static final RestClient _client = metaStore.getClient();
private static final RestClient _client = new RestClient(r2Client, MetadataStoreURL);
private static final Schemas _schemas = new Schemas(_client);
private static final DatasetsRequestBuilders _datasetsBuilders = new DatasetsRequestBuilders();
private static final SchemaMetadataRequestBuilders _schemaMetadataBuilder = new SchemaMetadataRequestBuilders();
private static final DatasetPrivacyCompliancePoliciesRequestBuilders _privacyComplianceBuilder =
new DatasetPrivacyCompliancePoliciesRequestBuilders();
public static Dataset getDataset(String datasetName, String platformName, String origin) throws Exception {
DatasetKey key = new DatasetKey().setName(datasetName)
.setPlatform(toDataPlatformUrn(platformName))
.setOrigin(toFabricType(origin));
DatasetsGetRequestBuilder builder = _datasetsBuilders.get();
Request<Dataset> req = builder.id(new ComplexResourceKey<>(key, new EmptyRecord())).build();
// Send the request and wait for a response
final ResponseFuture<Dataset> responseFuture = _client.sendRequest(req);
return responseFuture.getResponse().getEntity();
}
public static SchemaMetadata getSchemaMetadata(String schemaName, String platformName, long version)
throws Exception {
SchemaMetadataKey key = new SchemaMetadataKey().setSchemaName(schemaName)
.setPlatform(toDataPlatformUrn(platformName))
.setVersion(version);
SchemaMetadataGetRequestBuilder builder = _schemaMetadataBuilder.get();
Request<SchemaMetadata> req = builder.id(new ComplexResourceKey<>(key, new EmptyRecord())).build();
ResponseFuture<SchemaMetadata> responseFuture = _client.sendRequest(req);
return responseFuture.getResponse().getEntity();
}
public static SchemaMetadata getLatestSchemaByDataset(String platformName, String datasetName, String origin)
throws Exception {
DatasetUrn urn = toDatasetUrn(platformName, datasetName, origin);
SchemaMetadataFindByDatasetRequestBuilder builder = _schemaMetadataBuilder.findByDataset();
FindRequest<SchemaMetadata> req = builder.datasetParam(urn).build();
ResponseFuture<CollectionResponse<SchemaMetadata>> responseFuture = _client.sendRequest(req);
long version = 0;
SchemaMetadata latestSchema = null;
for (SchemaMetadata sc : responseFuture.getResponse().getEntity().getElements()) {
if (sc.getVersion() > version) {
latestSchema = sc;
version = sc.getVersion();
}
}
return latestSchema;
public static SchemaMetadata getLatestSchemaByWhUrn(String urn) throws Exception {
String[] urnParts = splitWhUrn(urn);
return _schemas.getLatestSchemaByDataset(urnParts[0], urnParts[1], "PROD");
}
public static List<DatasetColumn> datasetColumnsMapper(SchemaFieldArray fields) {
@@ -125,22 +55,4 @@
}
return columns;
}
public static SchemaMetadata getLatestSchemaByWhUrn(String urn) throws Exception {
String[] urnParts = splitWhUrn(urn);
return getLatestSchemaByDataset(urnParts[0], urnParts[1], "PROD");
}
public static PrivacyCompliancePolicy getPrivacyCompliancePolicy(String platformName, String datasetName,
String origin, long version) throws Exception {
DatasetUrn urn = toDatasetUrn(platformName, datasetName, origin);
PrivacyCompliancePolicyKey key = new PrivacyCompliancePolicyKey().setDataset(urn).setVersion(version);
DatasetPrivacyCompliancePoliciesGetRequestBuilder builder = _privacyComplianceBuilder.get();
Request<PrivacyCompliancePolicy> req = builder.id(new ComplexResourceKey<>(key, new EmptyRecord())).build();
ResponseFuture<PrivacyCompliancePolicy> responseFuture = _client.sendRequest(req);
return responseFuture.getResponse().getEntity();
}
}
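
A hypothetical call site for the WhereHows-URN lookup after this refactor; the URN value is a placeholder, the fabric defaults to PROD, and the generated getFields() accessor on SchemaMetadata is assumed.

SchemaMetadata schema = MetadataStoreDao.getLatestSchemaByWhUrn("espresso:///Identity/Profile");
List<DatasetColumn> columns = MetadataStoreDao.datasetColumnsMapper(schema.getFields());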