From c9dfb637af44a9e8a352e12edeefc429174c2cc0 Mon Sep 17 00:00:00 2001 From: "Yi (Alan) Wang" Date: Thu, 6 Oct 2016 13:33:45 -0700 Subject: [PATCH] Update MetadataChangeEvent APIs according to schema change (#243) * Update MetadataChangeEvent APIs according to schema change * Update MultiproductLoad to reflect new Owner types * Add comments for Owner_type precedence (priority) and compliance --- .../app/models/daos/DatasetInfoDao.java | 414 ++-- .../app/utils/MetadataChangeProcessor.java | 105 +- .../DDL/ETL_DDL/dataset_info_metadata.sql | 38 +- data-model/DDL/create_all_tables_wrapper.sql | 1 + data-model/avro/MetadataChangeEvent.avsc | 1733 +++++++++++++---- data-model/avro/MetadataInventoryEvent.avsc | 216 ++ .../main/resources/jython/MultiproductLoad.py | 9 +- .../wherehows/common/enums/OwnerType.java | 15 +- .../schemas/DatasetDeploymentRecord.java | 11 +- .../common/schemas/DatasetEntityRecord.java | 47 + .../schemas/DatasetFieldSchemaRecord.java | 16 +- .../schemas/DatasetKeySchemaRecord.java | 56 + .../schemas/DatasetOriginalSchemaRecord.java | 66 + .../common/schemas/DatasetOwnerRecord.java | 29 +- .../schemas/DatasetPartitionKeyRecord.java | 10 +- .../schemas/DatasetPartitionRecord.java | 6 +- .../schemas/DatasetSchemaInfoRecord.java | 53 +- .../common/schemas/DatasetSecurityRecord.java | 20 +- 18 files changed, 2062 insertions(+), 783 deletions(-) create mode 100644 data-model/avro/MetadataInventoryEvent.avsc create mode 100644 wherehows-common/src/main/java/wherehows/common/schemas/DatasetEntityRecord.java create mode 100644 wherehows-common/src/main/java/wherehows/common/schemas/DatasetKeySchemaRecord.java create mode 100644 wherehows-common/src/main/java/wherehows/common/schemas/DatasetOriginalSchemaRecord.java diff --git a/backend-service/app/models/daos/DatasetInfoDao.java b/backend-service/app/models/daos/DatasetInfoDao.java index 89a1e66828..4a7a456a6f 100644 --- a/backend-service/app/models/daos/DatasetInfoDao.java +++ b/backend-service/app/models/daos/DatasetInfoDao.java @@ -57,7 +57,7 @@ public class DatasetInfoDao { private static final String DATASET_CASE_SENSITIVE_TABLE = "dataset_case_sensitivity"; private static final String DATASET_REFERENCE_TABLE = "dataset_reference"; private static final String DATASET_PARTITION_TABLE = "dataset_partition"; - private static final String DATASET_SECURITY_TABLE = "dataset_security"; + private static final String DATASET_SECURITY_TABLE = "dataset_security_info"; private static final String DATASET_OWNER_TABLE = "dataset_owner"; private static final String DATASET_OWNER_UNMATCHED_TABLE = "stg_dataset_owner_unmatched"; private static final String DATASET_CONSTRAINT_TABLE = "dataset_constraint"; @@ -214,20 +214,61 @@ public class DatasetInfoDao { DatasetInventoryItemRecord.getInventoryItemColumns()); - private static Object[] findIdAndUrn(JsonNode idNode, JsonNode urnNode) + private static Object[] findIdAndUrn(Integer datasetId) throws SQLException { - final Integer datasetId; - final String urn; - if (!idNode.isMissingNode()) { - datasetId = idNode.asInt(); - urn = DatasetDao.getDatasetById(datasetId).get("urn").toString(); - } else { - urn = urnNode.asText(); - datasetId = Integer.valueOf(DatasetDao.getDatasetByUrn(urn).get("id").toString()); - } + final String urn = datasetId != null ? DatasetDao.getDatasetById(datasetId).get("urn").toString() : null; return new Object[]{datasetId, urn}; } + private static Object[] findIdAndUrn(String urn) + throws SQLException { + final Integer datasetId = urn != null ? 
Integer.valueOf(DatasetDao.getDatasetByUrn(urn).get("id").toString()) : null; + return new Object[]{datasetId, urn}; + } + + private static Object[] findDataset(JsonNode root) { + // use dataset id to find dataset first + final JsonNode idNode = root.path("datasetId"); + if (!idNode.isMissingNode() && !idNode.isNull()) { + try { + final Object[] idUrn = findIdAndUrn(idNode.asInt()); + if (idUrn[0] != null && idUrn[1] != null) { + return idUrn; + } + } catch (Exception ex) { + } + } + + // use dataset uri to find dataset + final JsonNode properties = root.path("datasetProperties"); + if (!properties.isMissingNode() && !properties.isNull()) { + final JsonNode uri = properties.path("uri"); + if (!uri.isMissingNode() && !uri.isNull()) { + try { + final Object[] idUrn = findIdAndUrn(uri.asText()); + if (idUrn[0] != null && idUrn[1] != null) { + return idUrn; + } + } catch (Exception ex) { + } + } + } + + // use dataset urn to find dataset + final JsonNode urnNode = root.path("urn"); + if (!urnNode.isMissingNode() && !urnNode.isNull()) { + try { + final Object[] idUrn = findIdAndUrn(urnNode.asText()); + if (idUrn[0] != null && idUrn[1] != null) { + return idUrn; + } + } catch (Exception ex) { + } + } + + return new Object[]{null, null}; + } + public static List getDatasetDeploymentByDatasetId(int datasetId) throws DataAccessException { Map params = new HashMap<>(); @@ -262,16 +303,16 @@ public class DatasetInfoDao { public static void updateDatasetDeployment(JsonNode root) throws Exception { - final JsonNode idNode = root.path("datasetId"); - final JsonNode urnNode = root.path("urn"); final JsonNode deployment = root.path("deploymentInfo"); - - if ((idNode.isMissingNode() && urnNode.isMissingNode()) || deployment.isMissingNode() || !deployment.isArray()) { + if (deployment.isMissingNode() || !deployment.isArray()) { throw new IllegalArgumentException( - "Dataset deployment info update fail, " + "Json missing necessary fields: " + root.toString()); + "Dataset deployment info update error, missing necessary fields: " + root.toString()); } - final Object[] idUrn = findIdAndUrn(idNode, urnNode); + final Object[] idUrn = findDataset(root); + if (idUrn[0] == null || idUrn[1] == null) { + throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString()); + } final Integer datasetId = (Integer) idUrn[0]; final String urn = (String) idUrn[1]; @@ -325,16 +366,16 @@ public class DatasetInfoDao { public static void updateDatasetCapacity(JsonNode root) throws Exception { - final JsonNode idNode = root.path("datasetId"); - final JsonNode urnNode = root.path("urn"); final JsonNode capacity = root.path("capacity"); - - if ((idNode.isMissingNode() && urnNode.isMissingNode()) || capacity.isMissingNode() || !capacity.isArray()) { + if (capacity.isMissingNode() || !capacity.isArray()) { throw new IllegalArgumentException( - "Dataset capacity info update fail, " + "Json missing necessary fields: " + root.toString()); + "Dataset capacity info update error, missing necessary fields: " + root.toString()); } - final Object[] idUrn = findIdAndUrn(idNode, urnNode); + final Object[] idUrn = findDataset(root); + if (idUrn[0] == null || idUrn[1] == null) { + throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString()); + } final Integer datasetId = (Integer) idUrn[0]; final String urn = (String) idUrn[1]; @@ -387,16 +428,16 @@ public class DatasetInfoDao { public static void updateDatasetTags(JsonNode root) throws Exception { - final JsonNode idNode = 
root.path("datasetId"); - final JsonNode urnNode = root.path("urn"); final JsonNode tags = root.path("tags"); - - if ((idNode.isMissingNode() && urnNode.isMissingNode()) || tags.isMissingNode() || !tags.isArray()) { + if (tags.isMissingNode() || !tags.isArray()) { throw new IllegalArgumentException( - "Dataset tag info update fail, " + "Json missing necessary fields: " + root.toString()); + "Dataset tag info update error, missing necessary fields: " + root.toString()); } - final Object[] idUrn = findIdAndUrn(idNode, urnNode); + final Object[] idUrn = findDataset(root); + if (idUrn[0] == null || idUrn[1] == null) { + throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString()); + } final Integer datasetId = (Integer) idUrn[0]; final String urn = (String) idUrn[1]; @@ -440,16 +481,21 @@ public class DatasetInfoDao { public static void updateDatasetCaseSensitivity(JsonNode root) throws Exception { - final JsonNode idNode = root.path("datasetId"); - final JsonNode urnNode = root.path("urn"); - final JsonNode caseSensitivity = root.path("caseSensitivity"); - - if ((idNode.isMissingNode() && urnNode.isMissingNode()) || caseSensitivity.isMissingNode()) { + final JsonNode properties = root.path("datasetProperties"); + if (properties.isMissingNode() || properties.isNull()) { throw new IllegalArgumentException( - "Dataset case_sensitivity info update fail, " + "Json missing necessary fields: " + root.toString()); + "Dataset properties update fail, missing necessary fields: " + root.toString()); + } + final JsonNode caseSensitivity = properties.path("caseSensitivity"); + if (caseSensitivity.isMissingNode() || caseSensitivity.isNull()) { + throw new IllegalArgumentException( + "Dataset properties update fail, missing necessary fields: " + root.toString()); } - final Object[] idUrn = findIdAndUrn(idNode, urnNode); + final Object[] idUrn = findDataset(root); + if (idUrn[0] == null || idUrn[1] == null) { + throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString()); + } final Integer datasetId = (Integer) idUrn[0]; final String urn = (String) idUrn[1]; @@ -506,16 +552,16 @@ public class DatasetInfoDao { public static void updateDatasetReference(JsonNode root) throws Exception { - final JsonNode idNode = root.path("datasetId"); - final JsonNode urnNode = root.path("urn"); final JsonNode references = root.path("references"); - - if ((idNode.isMissingNode() && urnNode.isMissingNode()) || references.isMissingNode() || !references.isArray()) { + if (references.isMissingNode() || !references.isArray()) { throw new IllegalArgumentException( - "Dataset reference info update fail, " + "Json missing necessary fields: " + root.toString()); + "Dataset reference info update fail, missing necessary fields: " + root.toString()); } - final Object[] idUrn = findIdAndUrn(idNode, urnNode); + final Object[] idUrn = findDataset(root); + if (idUrn[0] == null || idUrn[1] == null) { + throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString()); + } final Integer datasetId = (Integer) idUrn[0]; final String urn = (String) idUrn[1]; @@ -560,16 +606,16 @@ public class DatasetInfoDao { public static void updateDatasetPartition(JsonNode root) throws Exception { - final JsonNode idNode = root.path("datasetId"); - final JsonNode urnNode = root.path("urn"); final JsonNode partition = root.path("partitionSpec"); - - if ((idNode.isMissingNode() && urnNode.isMissingNode()) || partition.isMissingNode()) { + if 
(partition.isMissingNode() || partition.isNull()) { throw new IllegalArgumentException( - "Dataset reference info update fail, " + "Json missing necessary fields: " + root.toString()); + "Dataset partition info update fail, missing necessary fields: " + root.toString()); }
 - final Object[] idUrn = findIdAndUrn(idNode, urnNode); + final Object[] idUrn = findDataset(root); + if (idUrn[0] == null || idUrn[1] == null) { + throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString()); + } final Integer datasetId = (Integer) idUrn[0]; final String urn = (String) idUrn[1]; @@ -629,16 +675,16 @@ public class DatasetInfoDao { public static void updateDatasetSecurity(JsonNode root) throws Exception { - final JsonNode urnNode = root.path("urn"); - final JsonNode idNode = root.path("datasetId"); final JsonNode security = root.path("securitySpec"); - - if ((idNode.isMissingNode() && urnNode.isMissingNode()) || security.isMissingNode()) { + if (security.isMissingNode() || security.isNull()) { throw new IllegalArgumentException( - "Dataset security info update fail, " + "Json missing necessary fields: " + root.toString()); + "Dataset security info update fail, missing necessary fields: " + root.toString()); }
 - final Object[] idUrn = findIdAndUrn(idNode, urnNode); + final Object[] idUrn = findDataset(root); + if (idUrn[0] == null || idUrn[1] == null) { + throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString()); + } final Integer datasetId = (Integer) idUrn[0]; final String urn = (String) idUrn[1]; @@ -695,29 +741,27 @@ public class DatasetInfoDao { public static void updateDatasetOwner(JsonNode root) throws Exception { - final JsonNode idNode = root.path("datasetId"); - final JsonNode urnNode = root.path("urn"); final JsonNode owners = root.path("owners"); + if (owners.isMissingNode() || !owners.isArray()) { + throw new IllegalArgumentException( + "Dataset owner info update fail, missing necessary fields: " + root.toString()); + } + final JsonNode ownerSourceNode = root.path("source"); String ownerSource = null; - - if (ownerSourceNode != null && (!ownerSourceNode.isMissingNode())) - { + if (!ownerSourceNode.isNull() && !ownerSourceNode.isMissingNode()) { ownerSource = ownerSourceNode.asText(); } - if ((idNode.isMissingNode() && urnNode.isMissingNode()) || owners.isMissingNode() || !owners.isArray()) { - throw new IllegalArgumentException( - "Dataset owner info update fail, " + "Json missing necessary fields: " + root.toString()); - } - - Integer datasetId = 0; - String urn = null; - try { - final Object[] idUrn = findIdAndUrn(idNode, urnNode); + final Integer datasetId; + final String urn; + final Object[] idUrn = findDataset(root); + if (idUrn[0] == null || idUrn[1] == null) { + datasetId = 0; + urn = root.path("datasetProperties").path("uri").asText(); + } else { datasetId = (Integer) idUrn[0]; urn = (String) idUrn[1]; - } catch (Exception ex) { } final JsonNode auditHeader = root.path("auditHeader"); @@ -734,15 +778,15 @@ public class DatasetInfoDao { record.setCreatedTime(eventTime); record.setModifiedTime(System.currentTimeMillis() / 1000); - final String ownerString = record.getOwner(); + final String ownerString = record.getOwnerUrn(); int lastIndex = ownerString.lastIndexOf(':'); if (lastIndex >= 0) { - record.setOwner(ownerString.substring(lastIndex + 1)); + record.setOwnerUrn(ownerString.substring(lastIndex + 1)); record.setNamespace(ownerString.substring(0, lastIndex)); } else { record.setNamespace(""); } - Map<String, Object> ownerInfo = getOwnerByOwnerId(record.getOwner()); + Map<String, Object> ownerInfo = getOwnerByOwnerId(record.getOwnerUrn()); Integer appId = 0; String isActive = "N"; if (ownerInfo.containsKey("app_id")) { @@ -766,78 +810,56 @@ public class DatasetInfoDao { mergeDatasetOwners(ownerList, datasetId, urn, ownerSource); } - public static void updateKafkaDatasetOwner( - String datasetUrn, - String owners, - String ownerSource, - Long sourceUnixTime) - throws Exception - { - if (datasetUrn == null) - { + public static void updateKafkaDatasetOwner(String datasetUrn, String owners, String ownerSource, Long sourceUnixTime) + throws Exception { + if (datasetUrn == null) { return; } Integer datasetId = 0; - - try - { + try { datasetId = Integer.parseInt(DatasetDao.getDatasetByUrn(datasetUrn).get("id").toString()); - } - catch(Exception e) - { + } catch (Exception e) { Logger.error("Exception in updateKafkaDatasetOwner: " + e.getMessage()); } - if (datasetId == 0) - { + if (datasetId == 0) { return; } - List<DatasetOwnerRecord> ownerList = new ArrayList<DatasetOwnerRecord>(); - if (owners != null) - { + List<DatasetOwnerRecord> ownerList = new ArrayList<>(); + if (owners != null) { String[] ownerArray = owners.split(","); - if (ownerArray != null && ownerArray.length > 0) - { - for(int i = 0; i < ownerArray.length; i++) - { - String ownerName = null; - String namespace = null; - String ownerIdType = null; - String isGroup = "N"; - String owner = ownerArray[i]; - if (owner != null) - { - int lastIndex = owner.lastIndexOf(':'); - if (lastIndex != -1) - { - ownerName = owner.substring(lastIndex+1); - namespace = owner.substring(0, lastIndex); - if (namespace != null && namespace.equalsIgnoreCase("urn:li:griduser")) - { - isGroup = "Y"; - ownerIdType = "GROUP"; - } - else - { - ownerIdType = "PERSON"; - } + for (String owner : ownerArray) { + String ownerName = null; + String namespace = null; + String ownerIdType = null; + String isGroup = "N"; + if (owner != null) { + int lastIndex = owner.lastIndexOf(':'); + if (lastIndex != -1) { + ownerName = owner.substring(lastIndex + 1); + namespace = owner.substring(0, lastIndex); + if (namespace != null && namespace.equalsIgnoreCase("urn:li:griduser")) { + isGroup = "Y"; + ownerIdType = "GROUP"; + } else { + ownerIdType = "PERSON"; } - DatasetOwnerRecord record = new DatasetOwnerRecord(); - record.setDatasetId(datasetId); - record.setDatasetUrn(datasetUrn); - record.setOwnerType("Producer"); - record.setOwner(ownerName); - record.setOwnerType(ownerIdType); - record.setIsGroup(isGroup); - record.setIsActive("Y"); - record.setNamespace(namespace); - record.setOwnerSource(ownerSource); - record.setSourceTime(sourceUnixTime); - record.setCreatedTime(sourceUnixTime); - record.setModifiedTime(System.currentTimeMillis() / 1000); - ownerList.add(record); } + DatasetOwnerRecord record = new DatasetOwnerRecord(); + record.setDatasetId(datasetId); + record.setDatasetUrn(datasetUrn); + // "Producer" is the owner category; ownerIdType (PERSON/GROUP) below is the owner type + record.setOwnerCategory("Producer"); + record.setOwnerUrn(ownerName); + record.setOwnerType(ownerIdType); + record.setIsGroup(isGroup); + record.setIsActive("Y"); + record.setNamespace(namespace); + record.setOwnerSource(ownerSource); + record.setSourceTime(sourceUnixTime); + record.setCreatedTime(sourceUnixTime); + record.setModifiedTime(System.currentTimeMillis() / 1000); + ownerList.add(record); } } } @@ -845,54 +867,46 @@ public class DatasetInfoDao { mergeDatasetOwners(ownerList, datasetId, datasetUrn, ownerSource); } - public static void mergeDatasetOwners( - List<DatasetOwnerRecord> newOwnerList, - Integer datasetId, - String datasetUrn, - String source) - throws Exception{ - List<DatasetOwnerRecord> oldOwnerList = getDatasetOwnerByDatasetUrn(datasetUrn); - Integer sortId = 0; - Map<String, DatasetOwnerRecord> uniqueRecords = new HashMap<String, DatasetOwnerRecord>(); - List<DatasetOwnerRecord> combinedList = new ArrayList<DatasetOwnerRecord>(); - if (newOwnerList != null) - { - for(DatasetOwnerRecord owner: newOwnerList) - { - owner.setSortId(sortId++); - uniqueRecords.put(owner.getOwner(), owner); - combinedList.add(owner); - } + private static void mergeDatasetOwners(List<DatasetOwnerRecord> newOwnerList, Integer datasetId, String datasetUrn, + String source) + throws Exception { + List<DatasetOwnerRecord> oldOwnerList = new ArrayList<>(); + try { + oldOwnerList.addAll(getDatasetOwnerByDatasetUrn(datasetUrn)); + } catch (Exception ex) { } - if (oldOwnerList != null) - { - for(DatasetOwnerRecord owner: newOwnerList) - { - DatasetOwnerRecord exist = uniqueRecords.get(owner.getOwner()); - if (exist != null) - { - exist.setDbIds(owner.getDbIds()); - exist.setCreatedTime(StringUtil.toLong(owner.getCreatedTime())); + Integer sortId = 0; + Map<String, DatasetOwnerRecord> uniqueRecords = new HashMap<>(); + List<DatasetOwnerRecord> combinedList = new ArrayList<>(); + if (newOwnerList != null) { + for (DatasetOwnerRecord owner : newOwnerList) { + owner.setSortId(sortId++); + uniqueRecords.put(owner.getOwnerUrn(), owner); + combinedList.add(owner); + } + } - // take the higher priority owner category - exist.setOwnerCategory(OwnerType.chooseOwnerType(exist.getOwnerCategory(), owner.getOwnerCategory())); + for (DatasetOwnerRecord owner : oldOwnerList) { + DatasetOwnerRecord exist = uniqueRecords.get(owner.getOwnerUrn()); + if (exist != null) { + exist.setDbIds(owner.getDbIds()); + exist.setCreatedTime(StringUtil.toLong(owner.getCreatedTime())); - // merge owner source as comma separated list - exist.setOwnerSource(mergeOwnerSource(exist.getOwnerSource(), owner.getOwnerSource())); - exist.setConfirmedBy(owner.getConfirmedBy()); - exist.setConfirmedOn(owner.getConfirmedOn()); - } - else - { - if(!(source != null && source.equalsIgnoreCase(owner.getOwnerSource()))) - { - owner.setSortId(sortId++); - uniqueRecords.put(owner.getOwner(), owner); - combinedList.add(owner); - } - } + // take the higher priority owner category + exist.setOwnerCategory(OwnerType.chooseOwnerType(exist.getOwnerCategory(), owner.getOwnerCategory())); + + // merge owner source as comma separated list + exist.setOwnerSource(mergeOwnerSource(exist.getOwnerSource(), owner.getOwnerSource())); + exist.setConfirmedBy(owner.getConfirmedBy()); + exist.setConfirmedOn(owner.getConfirmedOn()); + } else { + if (!(source != null && source.equalsIgnoreCase(owner.getOwnerSource()))) { + owner.setSortId(sortId++); + uniqueRecords.put(owner.getOwnerUrn(), owner); + combinedList.add(owner); } + } } // remove old info then insert new info @@ -979,17 +993,16 @@ public class DatasetInfoDao { public static void updateDatasetConstraint(JsonNode root) throws Exception { - final JsonNode idNode = root.path("datasetId"); - final JsonNode urnNode = root.path("urn"); final JsonNode constraints = root.path("constraints"); - - if ((idNode.isMissingNode() && urnNode.isMissingNode()) || constraints.isMissingNode() - || !constraints.isArray()) { + if (constraints.isMissingNode() || !constraints.isArray()) { throw new IllegalArgumentException( - "Dataset constraints info update fail, " + "Json missing necessary fields: " + root.toString()); + "Dataset constraints info update fail, missing necessary fields: " + root.toString()); } - final Object[] idUrn = findIdAndUrn(idNode, urnNode); + final Object[] idUrn = findDataset(root); + if (idUrn[0] == null || idUrn[1] == null) { + throw new IllegalArgumentException("Cannot identify dataset from 
id/uri/urn: " + root.toString()); + } final Integer datasetId = (Integer) idUrn[0]; final String urn = (String) idUrn[1]; @@ -1042,16 +1055,16 @@ public class DatasetInfoDao { public static void updateDatasetIndex(JsonNode root) throws Exception { - final JsonNode idNode = root.path("datasetId"); - final JsonNode urnNode = root.path("urn"); final JsonNode indices = root.path("indices"); - - if ((idNode.isMissingNode() && urnNode.isMissingNode()) || indices.isMissingNode() || !indices.isArray()) { + if (indices.isMissingNode() || !indices.isArray()) { throw new IllegalArgumentException( - "Dataset indices info update fail, " + "Json missing necessary fields: " + root.toString()); + "Dataset indices info update fail, missing necessary fields: " + root.toString()); } - final Object[] idUrn = findIdAndUrn(idNode, urnNode); + final Object[] idUrn = findDataset(root); + if (idUrn[0] == null || idUrn[1] == null) { + throw new IllegalArgumentException("Cannot identify dataset from id/uri/urn: " + root.toString()); + } final Integer datasetId = (Integer) idUrn[0]; final String urn = (String) idUrn[1]; @@ -1104,28 +1117,25 @@ public class DatasetInfoDao { public static void updateDatasetSchema(JsonNode root) throws Exception { - final JsonNode idNode = root.path("datasetId"); - final JsonNode urnNode = root.path("urn"); - final JsonNode schemas = root.path("schemas"); - - if ((idNode.isMissingNode() && urnNode.isMissingNode()) || schemas.isMissingNode()) { + final JsonNode schema = root.path("schema"); + if (schema.isMissingNode() || schema.isNull()) { throw new IllegalArgumentException( - "Dataset schemas update fail, " + "Json missing necessary fields: " + root.toString()); + "Dataset schema update fail, missing necessary fields: " + root.toString()); } Integer datasetId = 0; String urn = null; - try { - final Object[] idUrn = findIdAndUrn(idNode, urnNode); + final Object[] idUrn = findDataset(root); + if (idUrn[0] != null && idUrn[1] != null) { datasetId = (Integer) idUrn[0]; urn = (String) idUrn[1]; - } catch (Exception ex) { - urn = urnNode.asText(); + } else { + urn = root.path("datasetProperties").path("uri").asText(); } ObjectMapper om = new ObjectMapper(); - DatasetSchemaInfoRecord rec = om.convertValue(schemas, DatasetSchemaInfoRecord.class); + DatasetSchemaInfoRecord rec = om.convertValue(schema, DatasetSchemaInfoRecord.class); rec.setDatasetId(datasetId); rec.setDatasetUrn(urn); rec.setModifiedTime(System.currentTimeMillis() / 1000); @@ -1135,8 +1145,8 @@ public class DatasetInfoDao { DatasetRecord record = new DatasetRecord(); record.setUrn(urn); record.setSourceCreatedTime("" + rec.getCreateTime() / 1000); - record.setSchema(rec.getOriginalSchema()); - record.setSchemaType(rec.getFormat()); + record.setSchema(rec.getOriginalSchema().getText()); + record.setSchemaType(rec.getOriginalSchema().getFormat()); record.setFields((String) StringUtil.objectToJsonString(rec.getFieldSchema())); record.setSource("API"); @@ -1154,8 +1164,8 @@ public class DatasetInfoDao { // if dataset already exist in dict_dataset, update info else { DICT_DATASET_WRITER.execute(UPDATE_DICT_DATASET_WITH_SCHEMA_CHANGE, - new Object[]{rec.getOriginalSchema(), rec.getFormat(), StringUtil.objectToJsonString(rec.getFieldSchema()), - "API", System.currentTimeMillis() / 1000, datasetId}); + new Object[]{rec.getOriginalSchema().getText(), rec.getOriginalSchema().getFormat(), + StringUtil.objectToJsonString(rec.getFieldSchema()), "API", System.currentTimeMillis() / 1000, datasetId}); } // get old dataset fields info diff 
--git a/backend-service/app/utils/MetadataChangeProcessor.java b/backend-service/app/utils/MetadataChangeProcessor.java index 9cb1bf8126..b8fe600f36 100644 --- a/backend-service/app/utils/MetadataChangeProcessor.java +++ b/backend-service/app/utils/MetadataChangeProcessor.java @@ -26,6 +26,10 @@ import wherehows.common.schemas.Record; public class MetadataChangeProcessor { + private final String[] CHANGE_ITEMS = + {"schema", "owners", "datasetProperties", "references", "partitionSpec", "deploymentInfo", "tags", + "constraints", "indices", "capacity", "securitySpec"}; + /** * Process a MetadataChangeEvent record * @param record GenericData.Record @@ -36,97 +40,34 @@ public class MetadataChangeProcessor { public Record process(GenericData.Record record, String topic) throws Exception { if (record != null) { - // Logger.info("Processing Metadata Change Event record. "); + Logger.debug("Processing Metadata Change Event record. "); final GenericData.Record auditHeader = (GenericData.Record) record.get("auditHeader"); - final GenericData.Array<GenericData.Record> changedItems = - (GenericData.Array<GenericData.Record>) record.get("changedItems"); - if (auditHeader == null || auditHeader.get("time") == null) { Logger.info("MetadataChangeEvent without auditHeader, abort process. " + record.toString()); return null; } - // list change items, schema first - List<String> changes = new ArrayList<>(); - for (GenericData.Record item : changedItems) { - switch (item.get("changeScope").toString().toLowerCase()) { // can't be null - case "*": - Collections.addAll(changes, "schemas", "owners", "references", "partitionSpec", "deploymentInfo", - "caseSensitivity", "tags", "constraints", "indices", "capacity", "securitySpec"); - break; - case "schema": - if (!changes.contains("schemas")) { - changes.add(0, "schemas"); // add to front - } - break; - case "owner": - if (!changes.contains("owners")) { - changes.add("owners"); - } - break; - case "reference": - if (!changes.contains("references")) { - changes.add("references"); - } - break; - case "partition": - if (!changes.contains("partitionSpec")) { - changes.add("partitionSpec"); - } - break; - case "deployment": - if (!changes.contains("deploymentInfo")) { - changes.add("deploymentInfo"); - } - break; - case "casesensitivity": - if (!changes.contains("caseSensitivity")) { - changes.add("caseSensitivity"); - } - break; - case "tag": - if (!changes.contains("tags")) { - changes.add("tags"); - } - break; - case "constraint": - if (!changes.contains("constraints")) { - changes.add("constraints"); - } - break; - case "index": - if (!changes.contains("indices")) { - changes.add("indices"); - } - break; - case "capacity": - if (!changes.contains("capacity")) { - changes.add("capacity"); - } - break; - case "security": - if (!changes.contains("securitySpec")) { - changes.add("securitySpec"); - } - break; - default: - break; - } + final GenericData.Record datasetIdentifier = (GenericData.Record) record.get("datasetIdentifier"); + final GenericData.Record datasetProperties = (GenericData.Record) record.get("datasetProperties"); + // record.get("urn") may be null; String.valueOf(null) would yield the literal "null" and defeat the null check below + final String urn = record.get("urn") == null ? null : record.get("urn").toString(); + + if (urn == null && (datasetProperties == null || datasetProperties.get("uri") == null) + && datasetIdentifier == null) { + Logger.info("Can't identify dataset from uri/urn/datasetIdentifier, abort process. 
" + record.toString()); + return null; } - Logger.debug("Updating dataset " + changedItems.toString()); - ObjectMapper mapper = new ObjectMapper(); - JsonNode rootNode = mapper.readTree(record.toString()); + final JsonNode rootNode = new ObjectMapper().readTree(record.toString()); - for (String itemName : changes) { + for (String itemName : CHANGE_ITEMS) { // check if the corresponding change field has content if (record.get(itemName) == null) { continue; } switch (itemName) { - case "schemas": + case "schema": try { DatasetInfoDao.updateDatasetSchema(rootNode); } catch (Exception ex) { @@ -140,6 +81,13 @@ public class MetadataChangeProcessor { Logger.debug("Metadata change exception: owner ", ex); } break; + case "datasetProperties": + try { + DatasetInfoDao.updateDatasetCaseSensitivity(rootNode); + } catch (Exception ex) { + Logger.debug("Metadata change exception: case sensitivity ", ex); + } + break; case "references": try { DatasetInfoDao.updateDatasetReference(rootNode); @@ -161,13 +109,6 @@ public class MetadataChangeProcessor { Logger.debug("Metadata change exception: deployment ", ex); } break; - case "caseSensitivity": - try { - DatasetInfoDao.updateDatasetCaseSensitivity(rootNode); - } catch (Exception ex) { - Logger.debug("Metadata change exception: case sensitivity ", ex); - } - break; case "tags": try { DatasetInfoDao.updateDatasetTags(rootNode); diff --git a/data-model/DDL/ETL_DDL/dataset_info_metadata.sql b/data-model/DDL/ETL_DDL/dataset_info_metadata.sql index 44029541a3..db17bf07df 100644 --- a/data-model/DDL/ETL_DDL/dataset_info_metadata.sql +++ b/data-model/DDL/ETL_DDL/dataset_info_metadata.sql @@ -17,10 +17,11 @@ CREATE TABLE dataset_deployment ( `dataset_id` INT UNSIGNED NOT NULL, `dataset_urn` VARCHAR(200) NOT NULL, `deployment_tier` VARCHAR(20) NOT NULL, - `datacenter` VARCHAR(20) NOT NULL, + `datacenter` VARCHAR(20) DEFAULT NULL, `region` VARCHAR(50) DEFAULT NULL, `zone` VARCHAR(50) DEFAULT NULL, `cluster` VARCHAR(100) DEFAULT NULL, + `container` VARCHAR(100) DEFAULT NULL, `enabled` BOOLEAN NOT NULL, `additional_info` TEXT CHAR SET utf8 DEFAULT NULL, `modified_time` INT UNSIGNED DEFAULT NULL @@ -104,22 +105,27 @@ CREATE TABLE dataset_partition ( ENGINE = InnoDB DEFAULT CHARSET = latin1; -CREATE TABLE dataset_security ( - `dataset_id` INT UNSIGNED NOT NULL, - `dataset_urn` VARCHAR(200) NOT NULL, - `classification` VARCHAR(200) DEFAULT NULL, - `record_owner_type` VARCHAR(20) DEFAULT NULL, - `record_owner` VARCHAR(200) DEFAULT NULL, - `compliance_type` VARCHAR(30) DEFAULT NULL, - `retention_policy` VARCHAR(200) DEFAULT NULL, - `geographic_affinity` VARCHAR(200) DEFAULT NULL, - `modified_time` INT UNSIGNED DEFAULT NULL +CREATE TABLE `dataset_security_info` ( + `dataset_id` INT(10) UNSIGNED NOT NULL, + `dataset_urn` VARCHAR(200) NOT NULL, + `classification` VARCHAR(500) DEFAULT NULL + COMMENT 'JSON: confidential fields', + `retention_policy` VARCHAR(200) DEFAULT NULL + COMMENT 'JSON: specification of retention', + `geographic_affinity` VARCHAR(200) DEFAULT NULL + COMMENT 'JSON: must be stored in the geo region', + `record_owner_type` VARCHAR(50) DEFAULT NULL + COMMENT 'MEMBER,CUSTOMER,INTERNAL,COMPANY,GROUP', + `compliance_purge_type` VARCHAR(30) DEFAULT NULL + COMMENT 'AUTO_PURGE,CUSTOM_PURGE,LIMITED_RETENTION,PURGE_NOT_APPLICABLE', + `compliance_purge_entities` VARCHAR(200) DEFAULT NULL, + `modified_time` INT(10) UNSIGNED DEFAULT NULL COMMENT 'the modified time in epoch', PRIMARY KEY (`dataset_id`), - UNIQUE KEY (`dataset_urn`) + UNIQUE KEY `dataset_urn` 
(`dataset_urn`) ) ENGINE = InnoDB - DEFAULT CHARSET = latin1; + DEFAULT CHARSET = utf8; CREATE TABLE dataset_constraint ( `dataset_id` INT UNSIGNED NOT NULL, @@ -163,11 +169,7 @@ CREATE TABLE dataset_schema_info ( `version` VARCHAR(20) DEFAULT NULL, `name` VARCHAR(100) DEFAULT NULL, `description` TEXT CHAR SET utf8 DEFAULT NULL, - `format` VARCHAR(20) DEFAULT NULL, - `original_schema` TEXT DEFAULT NULL, - `original_schema_checksum` VARCHAR(100) DEFAULT NULL, - `key_schema_type` VARCHAR(20) DEFAULT NULL, - `key_schema_format` VARCHAR(100) DEFAULT NULL, + `original_schema` MEDIUMTEXT CHAR SET utf8 DEFAULT NULL, `key_schema` MEDIUMTEXT CHAR SET utf8 DEFAULT NULL, `is_field_name_case_sensitive` BOOLEAN DEFAULT NULL, `field_schema` MEDIUMTEXT CHAR SET utf8 DEFAULT NULL, diff --git a/data-model/DDL/create_all_tables_wrapper.sql b/data-model/DDL/create_all_tables_wrapper.sql index 36fc371bd4..df4adb42a7 100644 --- a/data-model/DDL/create_all_tables_wrapper.sql +++ b/data-model/DDL/create_all_tables_wrapper.sql @@ -24,6 +24,7 @@ source ETL_DDL/metric_metadata.sql; source ETL_DDL/owner_metadata.sql; source ETL_DDL/patterns.sql; source ETL_DDL/kafka_tracking.sql; +source ETL_DDL/dataset_info_metadata.sql; source WEB_DDL/track.sql; source WEB_DDL/users.sql; diff --git a/data-model/avro/MetadataChangeEvent.avsc b/data-model/avro/MetadataChangeEvent.avsc index 29155205a4..5c75627066 100644 --- a/data-model/avro/MetadataChangeEvent.avsc +++ b/data-model/avro/MetadataChangeEvent.avsc @@ -1,424 +1,1407 @@ { "type": "record", "name": "MetadataChangeEvent", - "namespace": "com.linkedin.events.wherehows", + "namespace": "com.linkedin.events.metadata", "fields": [ - { "name": "auditHeader", + { + "name": "auditHeader", "type": { "type": "record", "name": "KafkaAuditHeader", "namespace": "com.linkedin.events", - "doc": "This header intended to be used by the Kafka audit application.", "fields": [ { "name": "time", "type": "long", - "logicalType": "timestamp-millis", - "doc": "The time at which the event was emitted into Kafka." + "doc": "The time at which the event was emitted into Kafka.", + "logicalType": "timestamp-millis" }, { "name": "server", "type": "string", - "doc": "The full name of the host of this event." + "doc": "The fully-qualified name of the host of this event." }, { "name": "instance", - "type": ["null", "string"], - "doc": "The instance of the host." + "type": [ + "null", + "string" + ], + "doc": "The name of the application instance on the server. e.g. i002,stg05,group003" }, { "name": "appName", "type": "string", - "doc": "The name of the application of the event." + "doc": "The name of the application/service from which the event is emitted." }, { "name": "messageId", - "type": {"name": "UUID", "type":"fixed", "size":16}, + "type": { + "type": "fixed", + "name": "UUID", + "namespace": "com.linkedin.events", + "size": 16 + }, "doc": "A unique identifier for the message." } ] } }, - { "name": "namespace", "type": "string", "doc": "The platform or type of the metadata object: espresso,kafka,oracle,voldemort,hdfs,hive,teradata,..." }, - { "name": "nativeName", "type": "string", "doc": "The native name: ., /dir/subdir/, or " }, - { "name": "nativeType", "type": "string", "doc": "The native type: TABLE, VIEW, DALI_DATASET, DIRECTORY, LDAP, LUCENE_INDEX, or STREAM" }, - { "name": "uri", "type": ["string", "null"], "doc": "The abstracted such as hdfs:///data/tracking/PageViewEvent, file:///dir/file_name. This can be used as unique identifier in absensce of URN." 
}, - { "name": "urn", "type": ["string", "null"], "doc": "The applicable URN - urn:li:dataset:dali:, urn:salesforce:table:opportunity, arn:aws:dynamodb:::table/
. This is the preferred " }, - - { "name": "changedItems", - "type": { - "type": "array", - "doc": "Array of changed items", - "items": { + { + "name": "datasetIdentifier", + "type": [ + "null", + { "type": "record", - "name": "ChangeItems", - "doc": "Explain the changes", + "name": "DatasetIdentifier", "fields": [ - { "name": "changeScope", "type": "string", "default": "[ANY]", "doc": "[ANY], OWNER, SCHEMA, DEPLOYMENT, PARTITION, CONSTRAINT, INDEX, REFERENCE, CAPACITY"}, - { "name": "changeType", "type": "string", "doc": "CREATE, UPDATE, DELETE"}, - { "name": "changeTime", "type": "long", "logicalType": "timestamp-millis", "doc": "Epoch"}, - { "name": "changeNote", "type": "string", "doc": "Extra detail about the changes"} - ] - } - } - }, - - { "name": "owners", - "type": { - "type": "array", - "items": { - "type": "record", - "name": "OwnerInfo", - "fields": [ - { "name": "ownerCategory", "type": "string", "doc": "Producer, Delegate, Consumer, Auditor" }, - { "name": "ownerSubCategory", "type": ["string", "null"], "doc": "optional team, group, department" }, - { "name": "owner", "type": "string", "doc": "urn:li:corp:ldap, urn:li:group:abc, urn:li:service:mp" }, - { "name": "ownerType", "type": ["string","null"], "doc": "user, group, role, service" }, - { "name": "ownerSource", "type": "string", "doc": "JIRA,RB,DB,FS,AUDIT,NUAGE" } - ]}, - "doc": "Owner info" - } - }, - - { "name": "references", - "type": [ "null", { - "type": "array", - "doc": "Array of reference info", - "items": { - "type": "record", - "name": "Reference", - "doc": "An id, urn or url of references", - "fields": [ - { "name": "referenceType", "type": "string", "doc": "JIRA, RB, LIX, CR"}, - { "name": "referenceFormat", "type": "string", "doc": "ID, URN, URI or URL"}, - { "name": "referenceList", "type": ["null", {"type": "array", "items": "string"}], "doc": "references[]"} - ] - } - }], - "default": null - }, - { "name": "partitionSpec", - "type": [ "null", { - "type": "record", - "name": "PartitionSpecification", - "doc": "Partition strategy: level, type, keys", - "fields": [ - { "name": "totalPartitionLevel", "type": ["int", "null"], "default": 0, "doc": "What is the deepest partition level"}, - { "name": "partitionSpecText", "type": ["null","string"], "default": null, "doc": "Original partition DDL"}, - { "name": "hasTimePartition", "type": ["string","null"], "default": "N", "doc": "Does the dataset contain time partition? Y/N"}, - { "name": "hasHashPartition", "type": "string", "default": "N", "doc": "Does the dataset contain hash/shard partition? Y/N"}, - { "name": "partitionKeys", - "doc": "Array of partition keys", - "type": [ "null", { - "type": "array", - "items": { - "type": "record", - "name": "PartitionKey", - "doc": "Detail of partition key", - "fields": [ - { "name": "partitionLevel", "type": "int", "doc": "1,2,3..."}, - { "name": "partitionType", "type": "string", "doc": "RANGE, HASH, LIST, EQUAL"}, - { "name": "timeFormat", "type": ["null","string"], "doc": "yyyyMMdd or Epoch. 
https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html"}, - { "name": "granularity", "type": ["null","string"], "doc": "The most granular level: Day, Hour or Minute"}, - { "name": "fieldNames", "type": - {"type": "array", "items": "string"}, - "doc": "Partition keys" - }, - { "name": "partitionValues", - "type": {"type": "array", "items": "string"}, - "doc": "Values of the partition keys" - }, - { "name": "numberOfHashBuckets", "type": ["null","int"], "doc": "2,4,8,16..."} - ] - } - }], - "default": null - }, - { "name": "timePartitionExpression", "type": ["null","string"], "doc": "ds=yyyy-MM-dd/ts=HH"} - ] - } - ], "doc": "Partition specification detail" - }, - { "name": "deploymentInfo", - "type": [ "null", { - "type": "array", - "items": { - "type": "record", - "name": "DeploymentDetail", - "doc": "Each deployment environment/tier", - "fields": [ - { "name": "deploymentEnvironment", "type": "string", "default": "*", "doc": "DEV, SI, EI, QA, CORP, STAGING, PROD, GRID... (aka Fabric Group)"}, - { "name": "dataCenter", "type": "string", "default": "*", "doc": "DC1, DC2, LTX3, LVA4, *"}, - { "name": "cluster", "type": "string", "default": "*", "doc": "Cluster or a comma-delimited list of servers"}, - { "name": "enabled", "type": "string", "default": "Y", "doc": "Y/N"}, - { "name": "additionalDeploymentInfo", - "type": {"type": "map", "values": "string"}, - "doc": "Misc deployment info, such as Zookeeper, Connection, InGraph URL, native reference ID or KEY" - } - ] - } - - }], - "doc": "The deployment info" - }, - - { "name": "isCaseSensitive", - "type": ["null", { - "type": "record", - "name": "CaseSensitiveInfo", - "doc": "Is the object_name/data/field case sensitive?", - "fields": [ - { "name": "nativeName", "type": "string", "default": "Y", "doc": "Is native object name CS?"}, - { "name": "fieldName", "type": "string", "default": "Y", "doc": "Is field name CS?"}, - { "name": "dataContent", "type": "string", "default": "Y", "doc": "Is data content CS?"} - ] - }] - }, - - { "name": "tags", "type": { "type": "array", "items": "string"}, "doc": "Tags of the metadata object"}, - - { "name": "schemas", "doc": "The schema of a metadata object", - "type": [ "null", { - "type": "record", - "name": "SchemaInfo", - "fields": [ - { "name": "isLatest", "type": "string", "doc": "Is the latest version Y/N" }, - { "name": "createTime", "type": "long", "doc": "epoch" }, - { "name": "revision", "type": ["null","int"], "doc": "revision if available" }, - { "name": "version", "type": ["null","string"], "doc": "1.0.3, 2.12, 2_3_6"}, - { "name": "name", "type": "string", "doc": "name or class name" }, - { "name": "description", "type": ["null","string"], "doc": "descripiton" }, - { "name": "type", "type": "string", "default": "default", "doc": "default, table, key, document, database" }, - { "name": "format", "type": "string", "default": "TEXT", "doc": "JSON,DDL,Avro,Hive,XML,HOCOON,Thrift,Text..." }, - { "name": "isCaseSensitive", "type": "string", "default": "Y", "doc": "Is the object names in schema case sensitive? Y/N" }, - { "name": "originalSchema", "type": ["null","string"], "doc": "The raw schema content" }, - { "name": "originalSchemaChecksum", "type": {"type": "map", "values": "string"}, "doc": "Map key can be MD5,SHA-1,CRC32. Checksum is hex string in lower case." 
}, - { "name": "fieldSchema", - "type": [ "null", { - "type": "array", - "items": { - "type": "record", - "name": "FieldSchemaRecord", - "doc": "Abstracted field schema in tablular model", - "fields": [ - { "name": "position", "type": "int", "default": 1, "doc": "Field position id, SORT_ID. Starting from 1" }, - { "name": "parentFieldPosition", "type": "int", "default": 0, "doc": "Position id of the parent field for nested structure. 0 means this field is the top-level field" }, - { "name": "parentFieldPath", "type": ["null", "string"], "doc": "For example, .." }, - { "name": "name", "type": "string", "doc": "Field name" }, - { "name": "label", "type": ["null", "string"], "doc": "Field title/label for display" }, - { "name": "aliases", "type": {"type": "array", "items": "string"}, "doc": "Field aliaes." }, - { "name": "type", "type": "string", "doc": "native data type" }, - { "name": "logicalType", "type": ["null", "string"], "doc": "logical data type, DateTime, NUMBER(15,2), GUID, Epoch(3)" }, - { "name": "domainType", "type": ["null", "string"], "doc": "domain data type. can be URN, MEMBER_ID, EMAIL, PHONE" }, - { "name": "abstractType", "type": "string", "doc": "Standard types to generate canonical schema abstraction. Hide the subtle difference." }, - { "name": "description", "type": ["null", "string"], "doc": "field comment/description" }, - { "name": "nullable", "type": "string", "default": "Y", "doc": "Is the field nullable" }, - { "name": "defaultValue", "type": ["null", "string"], "doc": "default value" }, - { "name": "maxByteLength", "type": ["null", "int"], "doc": "max length in bytes" }, - { "name": "maxCharLength", "type": ["null", "int"], "doc": "max length in characters" }, - { "name": "charType", "type": ["null", "string"], "doc": "ascii, utf8, utf16, latin, iso8859" }, - { "name": "precision", "type": ["null", "int"], "doc": "number(p,s)" }, - { "name": "scale", "type": ["null", "int"], "doc": "number(p,s)" }, - { "name": "changeDataCaptureRole", "type": ["null","string"], "doc": "SEQUENCE, TIME" }, - { "name": "auditRole", "type": ["null","string"], "doc": "PARTY, TIME" }, - { "name": "sensitiveFlags", "type": ["null","string"], "doc": "ERPF=Encrypted Restricted Privacy Financial. --P-=Privacy" }, - { "name": "isRecursive", "type": ["null","string"], "doc": "Does this field contains recursive structure?" } - ] - } - }] - } - ] - }], - "default": null - }, - { "name": "constraints", "type": [ "null", { - "type": "array", - "doc": "Array of constraints", - "items": { - "type": "record", - "name": "Constraint", "doc": "constraint info", - "fields": [ - { "name": "constraintType", "type": "string", "doc": "constraint, referetial, index, dataquality" }, - { "name": "constraintName", "type": "string", "doc": "constraint name" }, - { "name": "constraintExpression", "type": "string", "doc": "Expression in constraint" }, - { "name": "enabled", "type": "string", "doc": "Is constraint enabled: Y/N" }, - { "name": "referredFields", - "type": { - "type": "array", - "doc": "Fields involved in this constraint", - "items": { - "type": "record", - "name": "Field", - "fields": [ - { "name": "fieldName", "type": "string", "doc": "Field name with parent path." 
}, - { "name": "position", "type": "int", "doc": "Position in a constraint" } - ] - } - } - }, - { "name": "additionalReferences", - "type": ["null", { - "type": "map", - "values": "string" - }], - "doc": "When map key = Index, the value = PK/AK index name; when key = Constraint, the value refers to another dataset's constraint, such as FK; when key = DataQuality, the value points to data quality rule." - } - ] - } - }], - "default": null - }, - { "name": "indices", - "type": ["null", { - "type": "array", - "items": { - "type": "record", - "name": "Index", - "doc": "Index Info", - "fields": [ - { "name": "indexType", "type": "string", "doc": "BTree, Hash, Fulltext, Normal" }, - { "name": "indexName", "type": "string", "doc": "Index name" }, - { "name": "isUnique", "type": "string", "default": "N", "doc": "Is unique index or not" }, - { "name": "indexedFields", "doc": "Fields in an index", + { + "name": "dataPlatformUrn", + "type": "string", + "doc": "The platform or type of the metadata object: espresso,kafka,oracle,voldemort,hdfs,hive,dalids,teradata,... for example, urn:li:dataPlatform:espresso, urn:li:dataPlatform:dalids" + }, + { + "name": "nativeName", + "type": "string", + "doc": "The native name: .
, /dir/subdir/, or " + }, + { + "name": "dataOrigin", "type": { - "type": "array", - "items": { - "type": "record", - "name": "IndexedFieldName", "doc": "Indexed field name", - "fields": [ - { "name": "position", "type": "int", "doc": "Position id within an index" }, - { "name": "fieldName", "type": "string", "doc": "Field name or expression" }, - { "name": "descend", "type": ["null","string"], "defualt": "ASC", "doc": "ASC or DESC" }, - { "name": "prefixLength", "type": ["null","int"], "doc": "The length of the prefix portion of a string" }, - { "name": "filter", "type": ["null","string"], "doc": "Filter expression for conditional index" } - ] - } - } - }], - "default": null + "type": "enum", + "name": "DeploymentTier", + "symbols": [ + "PROD", + "CORP", + "GRID", + "PREPROD", + "CANARY", + "DMZ", + "STG", + "UAT", + "UAT1", + "UAT2", + "UAT3", + "QA", + "QA1", + "QA2", + "QA3", + "EI", + "EI1", + "EI2", + "EI3", + "QEI", + "QEI1", + "QEI2", + "QEI3", + "TEST", + "LIT", + "SIT", + "INT", + "DEV", + "LOCAL", + "ARCHIVE", + "DROPBOX", + "SANDBOX", + "POC" + ] + }, + "doc": "Origin/Source Tier where the record is generated? This can be different from Deployment. For example, PROD data can be copied to a TEST server, then DataOrigin=PROD while the dataset instance belongs to TEST", + "default": "PROD" + } + ] } + ] + }, + { + "name": "urn", + "type": [ + "string", + "null" + ], + "doc": "The applicable URN - urn:li:dataset:(urn:li:dataPlatform:dali, search_mp.search_event_event, PROD), urn:salesforce:table:opportunity, arn:aws:dynamodb:::table/
. This is the preferred identifier for a dataset." + }, + { + "name": "datasetProperties", + "type": [ + "null", + { + "type": "record", + "name": "DatasetProperties", + "fields": [ + { + "name": "changeAuditStamp", + "type": { + "type": "record", + "name": "ChangeAuditStamp", + "fields": [ + { + "name": "actorUrn", + "type": "string", + "doc": "urn:li:corpuser:jsmith, urn:li:team:xyz, urn:li:service:money" + }, + { + "name": "type", + "type": "string", + "doc": "CREATE, UPDATE, DELETE" + }, + { + "name": "time", + "type": "long", + "doc": "Epoch", + "logicalType": "timestamp-millis" + }, + { + "name": "note", + "type": "string", + "doc": "Extra detail about the changes" + } + ] + } + }, + { + "name": "nativeType", + "type": { + "type": "enum", + "name": "PlatformNativeType", + "symbols": [ + "TABLE", + "VIEW", + "DIRECTORY", + "FILE", + "INDEX", + "STREAM", + "BLOB", + "FUNCTION", + "OTHER" + ] + }, + "doc": "The native type about how the dataset is stored in the platform" + }, + { + "name": "uri", + "type": [ + "string", + "null" + ], + "doc": "The abstracted such as hdfs:///data/tracking/PageViewEvent, file:///dir/file_name. This is often used in codes and scripts." + }, + { + "name": "caseSensitivity", + "type": [ + "null", + { + "type": "record", + "name": "CaseSensitivityInfo", + "fields": [ + { + "name": "datasetName", + "type": "boolean", + "doc": "Is native object name CS?", + "default": true + }, + { + "name": "fieldName", + "type": "boolean", + "doc": "Is field name CS?", + "default": true + }, + { + "name": "dataContent", + "type": "boolean", + "doc": "Is data content CS?", + "default": true + } + ] + } + ] + } + ] + } + ] + }, + { + "name": "owners", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "OwnerInfo", + "fields": [ + { + "name": "ownerCategory", + "type": { + "type": "enum", + "name": "OwnerCategory", + "symbols": [ + "OWNER", + "PRODUCER", + "DELEGATE", + "STAKEHOLDER" + ] + }, + "doc": "Owner, Producer, Delegate, Stakeholder", + "default": "OWNER" + }, + { + "name": "ownerUrn", + "type": "string", + "doc": "urn:li:corp:ldap, urn:li:group:abc, urn:li:service:mp_name, user_id, group_name" + }, + { + "name": "ownerType", + "type": { + "type": "enum", + "name": "OwnerType", + "symbols": [ + "USER", + "GROUP", + "ROLE", + "SERVICE", + "URN" + ] + }, + "doc": "user, group, role, service, or urn", + "default": "USER" + }, + { + "name": "ownerSource", + "type": "string", + "doc": "JIRA,RB,DB,FS,AUDIT,NUAGE where the owner info is extracted" + } + ] + } } ], "default": null }, - { "name": "capacity", "type": [ "null", { - "type": "array", - "items": { - "type": "record", - "name": "Capacity", - "doc": "capacity info", - "fields": [ - { "name": "capacityName", "type": "string", "doc": "Capacity name" }, - { "name": "capacityType", "type": ["null","string"], "doc": "storage, read qps, write qps" }, - { "name": "capacityUnit", "type": ["null","string"], "doc": "KB,MB,GB,TB,QPS" }, - { "name": "capacityLow", "type": "long", "doc": "lower/min capacity" }, - { "name": "capacityHigh", "type": "long", "doc": "higher/max capacity" } - ] - } - }], - "default": null - }, - { "name": "securitySpec", "type": [ "null", { - "type": "record", - "name": "SecuritySpecification", - "doc": "", - "fields": [ - { "name": "classification", "type": { - "type": "map", "values": "int", "doc": "How many fields match this classification" - }, - "doc": "Map key = HighlyConfidential, Confidential, LimitedDistribution, Encrypted, Masked" - }, - { "name": 
"ownerType", "type" : - { "type": "enum", - "name": "OwnerTypes", - "doc": "Record ownership spec that marks the owner of a dataset", - "symbols" : [ "MEMBER", "CUSTOMER", "JOINT", "INTERNAL", "COMPANY" ], - "symbolDoc" : { - "INTERNAL" : "data is generated by value internal to Linkedin", - "CUSTOMER" : "If the data is generated through a customer or enterprise product integration, that data is owned by a Customer.", - "JOINT" : "If the data is generated by an action of a member to a customer object, such as a member applying to a customer’s job, the ownership is joint.", - "COMPANY" : "If data is generated/produced by company", - "MEMBER" : "If data is generated/produced by member. All datasets that marked as member, should comply with legal retention policies." - } - } - }, - { "name" : "retentionPolicy", - "type" : { + { + "name": "references", + "type": [ + "null", + { + "type": "array", + "items": { "type": "record", - "name": "RetentionSpec", - "namespace": "com.linkedin.common", - "doc" : "Retention policy on a dataset", - "fields" : [ - { "name" : "retentionType", - "type" : { - "type" : "enum", - "name" : "RetentionType", - "doc" : "types of different retention policies", - "symbols" : [ "LIMITED", "LEGAL_HOLD", "UNLIMITED" ], - "symbolDocs" : { - "LIMITED" : "Data is stored for limited time only", - "LEGAL_HOLD" : "", - "UNLIMITED" : "Data can be held indefinitely" - } - }, - "doc" : "Retention type on dataset" - }, - { "name" : "retentionWindow", - "type" : [ "null", "long" ], - "doc" : "Time in (unit) how long data is retained for in case of LIMITED retention", - "default" : null - }, - { "name" : "retentionWindowUnit", - "type" : [ "null", { + "name": "Reference", + "fields": [ + { + "name": "referenceType", + "type": { "type": "enum", - "name": "TimePeriodUnit", - "doc": "Unit of time period", - "symbols": [ "YEAR", "MONTH", "WEEK", "DAY", "HOUR", "MINUTE", "SECOND", "MILLISECOND"] - }], - "doc" : "", - "default" : null - } - ] - } - }, - { "name" : "geographicAffinity", - "type" : [ "null", { - "type" : "record", - "name" : "GeographicAffinity", - "fields" : [ { - "name" : "affinity", - "type" : [ "null", { - "type" : "enum", - "name" : "AffinityType", - "symbols" : [ "LIMITED", "EXCLUDED" ] - } ], - "doc" : "Affinity type", - "default" : null - }, - { "name" : "locations", - "type" : { - "type" : "array", - "items" : { - "type" : "record", - "name" : "Locale", - "namespace" : "com.linkedin.common", - "doc" : "Motivated by java.util.Locale", - "fields" : [ - { "name" : "language", "type" : "string", "doc" : "A lowercase two-letter language code as defined by ISO-639." 
}, - { "name" : "country", "type" : [ "null", "string" ], "doc" : "An uppercase two-letter country code as defined by ISO-3166.", "default" : null }, - { "name" : "variant", "type" : [ "null", "string" ], "doc" : "Vendor or browser-specific code.", "default" : null } + "name": "ReferenceType", + "symbols": [ + "JIRA", + "RB", + "LIX", + "CR" ] } }, - "doc" : "List of locations data should be stored at" + { + "name": "referenceFormat", + "type": { + "type": "enum", + "name": "ReferenceFormat", + "symbols": [ + "ID", + "URN", + "URL", + "URI" + ] + } + }, + { + "name": "referenceList", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "doc": "references[]" + } + ] + } + } + ], + "default": null + }, + { + "name": "partitionSpec", + "type": [ + "null", + { + "type": "record", + "name": "PartitionSpecification", + "fields": [ + { + "name": "totalPartitionLevel", + "type": [ + "int", + "null" + ], + "doc": "What is the deepest partition level", + "default": 0 + }, + { + "name": "partitionSpecText", + "type": [ + "null", + "string" + ], + "doc": "Original partition DDL", + "default": null + }, + { + "name": "hasTimePartition", + "type": [ + "boolean", + "null" + ], + "doc": "Does the dataset contain time partition?", + "default": false + }, + { + "name": "hasHashPartition", + "type": [ + "boolean", + "null" + ], + "doc": "Does the dataset contain hash/shard partition?", + "default": false + }, + { + "name": "partitionKeys", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "PartitionKey", + "fields": [ + { + "name": "partitionLevel", + "type": "int", + "doc": "1,2,3..." + }, + { + "name": "partitionType", + "type": { + "type": "enum", + "name": "PartitionType", + "symbols": [ + "RANGE", + "HASH", + "LIST", + "CASE", + "ROUND_ROBIN" + ] + } + }, + { + "name": "timeGranularity", + "type": [ + "null", + "string" + ], + "doc": "The most granular level: Month, Week, Day, Hour, Minute or Second" + }, + { + "name": "timeFormat", + "type": [ + "null", + "string" + ], + "doc": "yyyyMMdd, yyyy-MM-dd or Epoch. https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html" + }, + { + "name": "fieldNames", + "type": { + "type": "array", + "items": "string" + }, + "doc": "Partition keys (use the full field path for nested fields)" + }, + { + "name": "partitionValues", + "type": { + "type": "array", + "items": "string" + }, + "doc": "For RANGE: [min, next_value); for LIST: [values...]; for Hive: [value]; for HASH: N/A" + }, + { + "name": "numberOfHashBuckets", + "type": [ + "null", + "int" + ], + "doc": "2,4,8,16..." 
+ } + ] + } + } + ], + "doc": "Array of partition keys", + "default": null + }, + { + "name": "timePartitionExpression", + "type": [ + "null", + "string" + ], + "doc": "ds=yyyy-MM-dd/ts=HH, datepartition=yyyy-MM-dd-HH, or dt=yyyyMMdd" } ] - }], - "doc" : "Geographic affinity if applicable", - "default" : null + } + ], + "doc": "Partition specification detail" + }, + { + "name": "deploymentInfo", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "DeploymentDetail", + "fields": [ + { + "name": "deploymentTier", + "type": "DeploymentTier", + "doc": "defined in [dataOrigin], such as DEV,TEST,PROD", + "default": "PROD" + }, + { + "name": "dataCenter", + "type": [ + "null", + "string" + ], + "doc": "DC1, DC2, LTX3, LVA4, ...", + "default": null + }, + { + "name": "region", + "type": [ + "null", + "string" + ], + "doc": "Region name if applicable, such as us-central2, eu-west3", + "default": null + }, + { + "name": "zone", + "type": [ + "null", + "string" + ], + "doc": "Zone name or id if applicable, such as asia-east1-b, us-west1-a", + "default": null + }, + { + "name": "cluster", + "type": [ + "null", + "string" + ], + "doc": "Cluster name or a comma-delimited list of Servers", + "default": null + }, + { + "name": "container", + "type": [ + "null", + "string" + ], + "doc": "Container or tenant name", + "default": null + }, + { + "name": "enabled", + "type": "boolean", + "doc": "is the dataset instance enabled under this deployment environment", + "default": true + }, + { + "name": "additionalDeploymentInfo", + "type": { + "type": "map", + "values": "string" + }, + "doc": "Additional deployment info, such as Zookeeper, Connection, Graphite URL, native reference ID or KEY" + } + ] } - ] - }] + } + ], + "doc": "The deployment info", + "default": null + }, + { + "name": "tags", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "doc": "Tags of the dataset object" + }, + { + "name": "schema", + "type": [ + "null", + { + "type": "record", + "name": "DatasetSchema", + "fields": [ + { + "name": "isLatestRevision", + "type": "boolean", + "doc": "Is the latest reversion?", + "default": false + }, + { + "name": "createTime", + "type": "long", + "doc": "epoch milli", + "logicalType": "timestamp-millis" + }, + { + "name": "revision", + "type": [ + "null", + "int" + ], + "doc": "revision if applicable" + }, + { + "name": "version", + "type": [ + "null", + "string" + ], + "doc": "1.0.3, 2.12, 2_3_6" + }, + { + "name": "name", + "type": [ + "null", + "string" + ], + "doc": "name or class name if applicable" + }, + { + "name": "description", + "type": [ + "null", + "string" + ], + "doc": "description" + }, + { + "name": "originalSchema", + "type": { + "type": "record", + "name": "OriginalSchema", + "fields": [ + { + "name": "format", + "type": { + "type": "enum", + "name": "SchemaTextFormat", + "symbols": [ + "JSON", + "XML", + "DDL", + "THRIFT", + "PROTOBUF", + "HOCON", + "TEXT" + ] + }, + "default": "TEXT" + }, + { + "name": "name", + "type": [ + "null", + "string" + ], + "doc": "The schema name or class name if available" + }, + { + "name": "text", + "type": "string", + "doc": "The raw schema content" + }, + { + "name": "checksum", + "type": { + "type": "map", + "values": "string" + }, + "doc": "Map key can be MD5,SHA-1,CRC32. Checksum is hex string in lower case." 
+ } + ] + } + }, + { + "name": "keySchema", + "type": [ + "null", + { + "type": "record", + "name": "DatasetKeySchema", + "fields": [ + { + "name": "keyType", + "type": { + "type": "enum", + "name": "SchemaKeyValueType", + "symbols": [ + "NONE", + "KEY", + "DATABASE", + "TABLE", + "VALUE", + "DOCUMENT" + ] + }, + "doc": "KEY is only applicable for Key-Value(redis, voldemort) and Document(mongo, espresso) datasets.", + "default": "KEY" + }, + { + "name": "format", + "type": "SchemaTextFormat", + "doc": "only applicable if key schema exists", + "default": "JSON" + }, + { + "name": "text", + "type": "string", + "doc": "schema for the Key if applicable" + } + ] + } + ], + "doc": "If the dataset has a dedicated key schema that is separate from the table or value part, it can be stored here" + }, + { + "name": "isFieldNameCaseSensitive", + "type": "boolean", + "doc": "Are the field names in the schema case sensitive?", + "default": true + }, + { + "name": "fieldSchema", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "FieldSchemaRecord", + "fields": [ + { + "name": "position", + "type": "int", + "doc": "Field position id, SORT_ID. Starting from 1", + "default": 1 + }, + { + "name": "parentFieldPosition", + "type": "int", + "doc": "Position id of the parent field for nested structure. 0 means this field is the top-level field", + "default": 0 + }, + { + "name": "fieldJsonPath", + "type": [ + "null", + "string" + ], + "doc": "http://goessner.net/articles/JsonPath For example, $.store.book[0].title" + }, + { + "name": "fieldPath", + "type": "string", + "doc": "For example, ..." + }, + { + "name": "label", + "type": [ + "null", + "string" + ], + "doc": "Field title/label for display" + }, + { + "name": "aliases", + "type": { + "type": "array", + "items": "string" + }, + "doc": "Field aliases." + }, + { + "name": "type", + "type": "string", + "doc": "native data type" + }, + { + "name": "logicalType", + "type": [ + "null", + "string" + ], + "doc": "logical data type, DateTime, NUMBER(15,2), GUID, Epoch(3)" + }, + { + "name": "semanticType", + "type": [ + "null", + "string" + ], + "doc": "semantic data type; can be Urn, MemberId, Email, PhoneNumber, GroupName" + }, + { + "name": "abstractType", + "type": [ + "null", + { + "type": "enum", + "name": "AbstractDataType", + "symbols": [ + "STRING", + "NUMBER", + "BINARY", + "BOOLEAN", + "TIMESTAMP", + "ARRAY", + "MAP", + "RECORD", + "UNION" + ] + } + ], + "doc": "Standard types used to generate a canonical schema abstraction, hiding subtle differences." + }, + { + "name": "description", + "type": [ + "null", + "string" + ], + "doc": "field comment/description" + }, + { + "name": "nullable", + "type": "boolean", + "doc": "Is the field nullable", + "default": true + }, + { + "name": "defaultValue", + "type": [ + "null", + "string" + ], + "doc": "default value" + }, + { + "name": "maxByteLength", + "type": [ + "null", + "int" + ], + "doc": "max length in bytes.
For UTF8, maxByteLength = maxCharLength x 3" + }, + { + "name": "maxCharLength", + "type": [ + "null", + "int" + ], + "doc": "max length in characters" + }, + { + "name": "charType", + "type": [ + "null", + "string" + ], + "doc": "ascii, utf8, utf16, latin, iso8859" + }, + { + "name": "precision", + "type": [ + "null", + "int" + ], + "doc": "number(p,s)" + }, + { + "name": "scale", + "type": [ + "null", + "int" + ], + "doc": "number(p,s)" + }, + { + "name": "isRecursive", + "type": "boolean", + "doc": "Does this field contain a recursive structure?", + "default": false + } + ] + } + } + ], + "doc": "flattened/normalized field-level schema definition" + }, + { + "name": "changeDataCaptureFields", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "ChangeDataCaptureField", + "fields": [ + { + "name": "fieldPath", + "type": "string", + "doc": "field path" + }, + { + "name": "role", + "type": { + "type": "enum", + "name": "ChangeDataCaptureRole", + "symbols": [ + "SEQUENCE", + "TIME" + ] + } + } + ] + } + } + }, + { + "name": "auditFields", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "AuditField", + "fields": [ + { + "name": "fieldPath", + "type": "string", + "doc": "field path" + }, + { + "name": "role", + "type": { + "type": "enum", + "name": "AuditFieldType", + "symbols": [ + "TOKEN", + "DATA_TIME", + "LOG_TIME", + "TIME" + ] + } + } + ] + } + }, + "default": [] + } + ] + } + ], + "doc": "The schema/structure definition of a dataset. For Key-Value and Document stores, a dedicated Key Schema is provided.", + "default": null + }, + { + "name": "constraints", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "Constraint", + "fields": [ + { + "name": "constraintType", + "type": { + "type": "enum", + "name": "ConstraintType", + "symbols": [ + "CONSTRAINT", + "REFERENTIAL", + "INDEX", + "DATAQUALITY", + "SECURITY" + ] + } + }, + { + "name": "constraintSubType", + "type": { + "type": "enum", + "name": "ConstraintSubType", + "symbols": [ + "RANGE", + "LIST", + "REGEXP" + ] + } + }, + { + "name": "constraintName", + "type": [ + "null", + "string" + ], + "doc": "constraint name" + }, + { + "name": "constraintExpression", + "type": "string", + "doc": "Expression in constraint" + }, + { + "name": "enabled", + "type": "boolean", + "doc": "Is constraint enabled", + "default": true + }, + { + "name": "referredFields", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "ReferenceField", + "fields": [ + { + "name": "position", + "type": "int", + "doc": "Position in a constraint, starting from 1" + }, + { + "name": "fieldPath", + "type": "string", + "doc": "Field name with full path" + } + ] + } + } + }, + { + "name": "additionalReferences", + "type": [ + "null", + { + "type": "map", + "values": "string" + } + ], + "doc": "When map key = Index, the value = PK/AK index name; when key = Constraint, the value refers to another dataset's constraint, such as FK; when key = DataQuality, the value points to a data quality rule."
+ } + ] + } + } + ], + "default": null + }, + { + "name": "indices", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "Index", + "fields": [ + { + "name": "indexType", + "type": "string", + "doc": "BTree, Hash, Fulltext, Normal" + }, + { + "name": "indexName", + "type": "string", + "doc": "Index name" + }, + { + "name": "isUnique", + "type": "boolean", + "doc": "Is unique index or not", + "default": false + }, + { + "name": "indexedFields", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "IndexedFieldName", + "fields": [ + { + "name": "position", + "type": "int", + "doc": "Position id within an index" + }, + { + "name": "fieldPath", + "type": "string", + "doc": "Field name or expression" + }, + { + "name": "descend", + "type": [ + "null", + "string" + ], + "doc": "ASC or DESC", + "default": "ASC" + }, + { + "name": "prefixLength", + "type": [ + "null", + "int" + ], + "doc": "The length of the prefix portion of a string" + }, + { + "name": "filter", + "type": [ + "null", + "string" + ], + "doc": "Filter expression for conditional index" + } + ] + } + }, + "doc": "Fields in an index" + } + ] + } + } + ], + "default": null + }, + { + "name": "capacity", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "Capacity", + "fields": [ + { + "name": "capacityName", + "type": "string", + "doc": "Capacity name" + }, + { + "name": "capacityType", + "type": [ + "null", + "string" + ], + "doc": "storage, read qps, write qps" + }, + { + "name": "capacityUnit", + "type": [ + "null", + "string" + ], + "doc": "KB,MB,GB,TB,QPS" + }, + { + "name": "capacityLow", + "type": "long", + "doc": "lower/min capacity" + }, + { + "name": "capacityHigh", + "type": "long", + "doc": "higher/max capacity" + } + ] + } + } + ], + "default": null + }, + { + "name": "securitySpec", + "type": [ + "null", + { + "type": "record", + "name": "SecuritySpecification", + "fields": [ + { + "name": "classification", + "type": [ + "null", + { + "type": "record", + "name": "ConfidentialClassification", + "fields": [ + { + "name": "highlyConfidential", + "type": { + "type": "array", + "items": "string" + } + }, + { + "name": "confidential", + "type": { + "type": "array", + "items": "string" + } + }, + { + "name": "limitedDistribution", + "type": { + "type": "array", + "items": "string" + } + }, + { + "name": "mustBeEncrypted", + "type": { + "type": "array", + "items": "string" + } + }, + { + "name": "mustBeMasked", + "type": { + "type": "array", + "items": "string" + } + } + ] + } + ], + "doc": "HighlyConfidential, Confidential, LimitedDistribution, MustBeEncrypted, MustBeMasked" + }, + { + "name": "recordOwnerType", + "type": { + "type": "enum", + "name": "RecordOwnerType", + "symbols": [ + "MEMBER", + "CUSTOMER", + "JOINT", + "INTERNAL", + "COMPANY" + ] + } + }, + { + "name": "complianceType", + "type": { + "type": "enum", + "name": "PrivacyComplianceType", + "symbols": [ + "AUTO_PURGE", + "CUSTOM_PURGE", + "LIMITED_RETENTION", + "PURGE_NOT_APPLICABLE" + ] + } + }, + { + "name": "compliancePurgeEntities", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "PurgeableEntityField", + "fields": [ + { + "name": "identifierType", + "type": { + "type": "enum", + "name": "PurgeableEntityFieldIdentifierType", + "symbols": [ + "MEMBER_ID", + "SUBJECT_MEMBER_ID", + "URN", + "SUBJECT_URN", + "COMPANY_ID", + "GROUP_ID", + "CUSTOMER_ID" + ] + } + }, + { + "name": "identifierField", + "type": "string", + "doc": "a
pathspec to a field inside the record containing an identifier for a purgeable entity, e.g. header.memberId, header.viewerUrn, value.activity.actorUrn, etc." + } + ] + } + } + ], + "doc": "The fields which identify purgeable entities in records" + }, + { + "name": "retentionPolicy", + "type": { + "type": "record", + "name": "RetentionSpec", + "namespace": "com.linkedin.common", + "fields": [ + { + "name": "retentionType", + "type": { + "type": "enum", + "name": "RetentionType", + "namespace": "com.linkedin.common", + "symbols": [ + "LIMITED", + "LEGAL_HOLD", + "UNLIMITED" + ] + }, + "doc": "Retention type on dataset" + }, + { + "name": "retentionWindow", + "type": [ + "null", + "long" + ], + "doc": "How long data is retained (in retentionWindowUnit units) in case of LIMITED retention", + "default": null + }, + { + "name": "retentionWindowUnit", + "type": [ + "null", + { + "type": "enum", + "name": "TimePeriodUnit", + "namespace": "com.linkedin.common", + "symbols": [ + "YEAR", + "MONTH", + "WEEK", + "DAY", + "HOUR", + "MINUTE", + "SECOND", + "MILLISECOND" + ] + } + ], + "doc": "Time unit for retentionWindow", + "default": null + } + ] + } + }, + { + "name": "geographicAffinity", + "type": [ + "null", + { + "type": "record", + "name": "GeographicAffinity", + "fields": [ + { + "name": "affinity", + "type": [ + "null", + { + "type": "enum", + "name": "AffinityType", + "symbols": [ + "LIMITED", + "EXCLUDED" + ] + } + ], + "doc": "Affinity type", + "default": null + }, + { + "name": "locations", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "Locale", + "namespace": "com.linkedin.common", + "fields": [ + { + "name": "language", + "type": "string", + "doc": "A lowercase two-letter language code as defined by ISO-639." + }, + { + "name": "country", + "type": [ + "null", + "string" + ], + "doc": "An uppercase two-letter country code as defined by ISO-3166.", + "default": null + }, + { + "name": "variant", + "type": [ + "null", + "string" + ], + "doc": "Vendor or browser-specific code.", + "default": null + } + ] + } + }, + "doc": "List of locations data should be stored at" + } + ] + } + ], + "doc": "Geographic affinity if applicable", + "default": null + } + ] + } + ] } ] -} +} \ No newline at end of file diff --git a/data-model/avro/MetadataInventoryEvent.avsc b/data-model/avro/MetadataInventoryEvent.avsc new file mode 100644 index 0000000000..cb9cf90109 --- /dev/null +++ b/data-model/avro/MetadataInventoryEvent.avsc @@ -0,0 +1,216 @@ +{ + "type": "record", + "name": "MetadataInventoryEvent", + "namespace": "com.linkedin.events.wherehows", + "fields": [ + { + "name": "auditHeader", + "type": { + "type": "record", + "name": "KafkaAuditHeader", + "namespace": "com.linkedin.events", + "fields": [ + { + "name": "time", + "type": "long", + "doc": "The time at which the event was emitted into Kafka.", + "logicalType": "timestamp-millis" + }, + { + "name": "server", + "type": "string", + "doc": "The fully-qualified name of the host of this event." + }, + { + "name": "instance", + "type": [ + "null", + "string" + ], + "doc": "The name of the application instance on the server. e.g. i002,stg05,group003" + }, + { + "name": "appName", + "type": "string", + "doc": "The name of the application/service from which the event is emitted." + }, + { + "name": "messageId", + "type": { + "type": "fixed", + "name": "UUID", + "namespace": "com.linkedin.events", + "size": 16 + }, + "doc": "A unique identifier for the message."
+ } + ] + } + }, + { + "name": "dataPlatformUrn", + "type": "string", + "doc": "The platform or type of the metadata object: espresso,kafka,oracle,voldemort,hdfs,hive,dalids,teradata,... for example, urn:li:dataPlatform:espresso, urn:li:dataPlatform:dalids" + }, + { + "name": "datasetList", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "DatasetInventoryItem", + "fields": [ + { + "name": "nativeName", + "type": "string", + "doc": "The native name: <db>.<table>, /dir/subdir/<name>, or <dataset name>" + }, + { + "name": "dataOrigin", + "type": { + "type": "enum", + "name": "DeploymentTier", + "symbols": [ + "PROD", + "CORP", + "GRID", + "PREPROD", + "CANARY", + "DMZ", + "STG", + "UAT", + "UAT1", + "UAT2", + "UAT3", + "QA", + "QA1", + "QA2", + "QA3", + "EI", + "EI1", + "EI2", + "EI3", + "QEI", + "QEI1", + "QEI2", + "QEI3", + "TEST", + "LIT", + "SIT", + "INT", + "DEV", + "LOCAL", + "ARCHIVE", + "DROPBOX", + "SANDBOX", + "POC" + ] + }, + "doc": "Origin/Source tier where the record is generated. This can be different from Deployment. For example, PROD data can be copied to a TEST server; then DataOrigin=PROD while the dataset instance belongs to TEST", + "default": "PROD" + }, + { + "name": "datasetProperties", + "type": [ + "null", + { + "type": "record", + "name": "DatasetProperties", + "fields": [ + { + "name": "changeAuditStamp", + "type": { + "type": "record", + "name": "ChangeAuditStamp", + "fields": [ + { + "name": "actorUrn", + "type": "string", + "doc": "urn:li:corpuser:jsmith, urn:li:team:xyz, urn:li:service:money" + }, + { + "name": "type", + "type": "string", + "doc": "CREATE, UPDATE, DELETE" + }, + { + "name": "time", + "type": "long", + "doc": "Epoch", + "logicalType": "timestamp-millis" + }, + { + "name": "note", + "type": "string", + "doc": "Extra detail about the changes" + } + ] + } + }, + { + "name": "nativeType", + "type": { + "type": "enum", + "name": "PlatformNativeType", + "symbols": [ + "TABLE", + "VIEW", + "DIRECTORY", + "FILE", + "INDEX", + "STREAM", + "BLOB", + "FUNCTION", + "OTHER" + ] + }, + "doc": "The native type describing how the dataset is stored in the platform" + }, + { + "name": "uri", + "type": [ + "string", + "null" + ], + "doc": "The abstracted URI, such as hdfs:///data/tracking/PageViewEvent or file:///dir/file_name. This is often used in code and scripts."
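For orientation, a minimal datasetList item conforming to this inventory schema might look like the following; this is a hypothetical sketch with invented names and values, shown as plain JSON rather than Avro union encoding, with the optional caseSensitivity record (defined next) omitted: { "nativeName": "tracking.PageViewEvent", "dataOrigin": "PROD", "datasetProperties": { "changeAuditStamp": { "actorUrn": "urn:li:service:inventory", "type": "CREATE", "time": 1475784825000, "note": "initial inventory scan" }, "nativeType": "TABLE", "uri": "hdfs:///data/tracking/PageViewEvent" } }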
+ }, + { + "name": "caseSensitivity", + "type": [ + "null", + { + "type": "record", + "name": "CaseSensitivityInfo", + "fields": [ + { + "name": "datasetName", + "type": "boolean", + "doc": "Is native object name CS?", + "default": true + }, + { + "name": "fieldName", + "type": "boolean", + "doc": "Is field name CS?", + "default": true + }, + { + "name": "dataContent", + "type": "boolean", + "doc": "Is data content CS?", + "default": true + } + ] + } + ] + } + ] + } + ] + } + ] + } + } + } + ] +} \ No newline at end of file diff --git a/metadata-etl/src/main/resources/jython/MultiproductLoad.py b/metadata-etl/src/main/resources/jython/MultiproductLoad.py index 4012e0acda..03d4052aee 100644 --- a/metadata-etl/src/main/resources/jython/MultiproductLoad.py +++ b/metadata-etl/src/main/resources/jython/MultiproductLoad.py @@ -129,8 +129,13 @@ class MultiproductLoad: ON DUPLICATE KEY UPDATE dataset_urn = n.urn, sort_id = COALESCE(n.n_sort_id, sort_id), - owner_type = CASE WHEN n.n_owner_type IS NULL OR owner_type >= n.n_owner_type - THEN owner_type ELSE n.n_owner_type END, + -- the Owner_type precedence (from high to low) is: OWNER, PRODUCER, DELEGATE, STAKEHOLDER + owner_type = CASE WHEN ( + case owner_type when 'OWNER' then 20 when 'PRODUCER' then 40 when 'DELEGATE' then 60 when 'STAKEHOLDER' then 80 else 100 end + ) <= ( + case n.n_owner_type when 'OWNER' then 20 when 'PRODUCER' then 40 when 'DELEGATE' then 60 when 'STAKEHOLDER' then 80 else 100 end + ) + THEN owner_type ELSE n.n_owner_type END, owner_sub_type = COALESCE(owner_sub_type, n.n_owner_sub_type), owner_id_type = COALESCE(owner_id_type, n.n_owner_id_type), owner_source = CASE WHEN owner_source is null THEN 'SCM' diff --git a/wherehows-common/src/main/java/wherehows/common/enums/OwnerType.java b/wherehows-common/src/main/java/wherehows/common/enums/OwnerType.java index 54f683cc78..61b70f5c45 100644 --- a/wherehows-common/src/main/java/wherehows/common/enums/OwnerType.java +++ b/wherehows-common/src/main/java/wherehows/common/enums/OwnerType.java @@ -15,12 +15,11 @@ package wherehows.common.enums; public enum OwnerType { - - PRODUCER(10), - DELEGATE(20), - CONSUMER(30), - AUDITOR(40), - STAKEHOLDER(50); + // the precedence (from high to low) is: OWNER, PRODUCER, DELEGATE, STAKEHOLDER + OWNER(20), + PRODUCER(40), + DELEGATE(60), + STAKEHOLDER(80); private int numVal; @@ -45,12 +44,12 @@ public enum OwnerType { } catch (NullPointerException | IllegalArgumentException ex) { } - int type2value = 110; + int type2value = 100; try { type2value = OwnerType.valueOf(type2.toUpperCase()).value(); } catch (NullPointerException | IllegalArgumentException ex) { } - return type1value < type2value ? type1 : type2; + return type1value <= type2value ?
type1 : type2; } } diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetDeploymentRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetDeploymentRecord.java index c896f519c1..cdb0745728 100644 --- a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetDeploymentRecord.java +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetDeploymentRecord.java @@ -26,6 +26,7 @@ public class DatasetDeploymentRecord extends AbstractRecord { String region; String zone; String cluster; + String container; Boolean enabled; Map additionalDeploymentInfo; Long modifiedTime; @@ -33,7 +34,7 @@ public class DatasetDeploymentRecord extends AbstractRecord { @Override public String[] getDbColumnNames() { return new String[]{"dataset_id", "dataset_urn", "deployment_tier", "datacenter", "region", "zone", "cluster", - "enabled", "additional_info", "modified_time"}; + "container", "enabled", "additional_info", "modified_time"}; } @Override @@ -100,6 +101,14 @@ public class DatasetDeploymentRecord extends AbstractRecord { this.cluster = cluster; } + public String getContainer() { + return container; + } + + public void setContainer(String container) { + this.container = container; + } + public Boolean getEnabled() { return enabled; } diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetEntityRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetEntityRecord.java new file mode 100644 index 0000000000..0b208c1fed --- /dev/null +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetEntityRecord.java @@ -0,0 +1,47 @@ +/** + * Copyright 2015 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ */ +package wherehows.common.schemas; + +import java.util.List; + + +public class DatasetEntityRecord extends AbstractRecord { + + String identifierType; + String identifierField; + + @Override + public List fillAllFields() { + return null; + } + + public DatasetEntityRecord() { + } + + public String getIdentifierType() { + return identifierType; + } + + public void setIdentifierType(String identifierType) { + this.identifierType = identifierType; + } + + public String getIdentifierField() { + return identifierField; + } + + public void setIdentifierField(String identifierField) { + this.identifierField = identifierField; + } +} diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetFieldSchemaRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetFieldSchemaRecord.java index 22c5e19934..74bfdb787a 100644 --- a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetFieldSchemaRecord.java +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetFieldSchemaRecord.java @@ -42,7 +42,6 @@ public class DatasetFieldSchemaRecord extends AbstractRecord { String charType; Integer precision; Integer scale; - String confidentialFlags; Boolean isRecursive; Boolean partitioned; Boolean indexed; @@ -55,7 +54,7 @@ public class DatasetFieldSchemaRecord extends AbstractRecord { return new String[]{"dataset_id", "position", "parent_field_position", "field_json_path", "field_path", "parent_path", "field_name", "label", "aliases", "type", "logical_type", "semantic_type", "abstract_type", "description", "nullable", "default_value", "max_byte_length", "max_char_length", "char_type", "precision", "scale", - "confidential_flags", "is_recursive", "partitioned", "indexed", "namespace", "default_comment_id", "comment_ids"}; + "is_recursive", "partitioned", "indexed", "namespace", "default_comment_id", "comment_ids"}; } @Override @@ -67,8 +66,7 @@ public class DatasetFieldSchemaRecord extends AbstractRecord { public String[] getFieldDetailColumns() { return new String[]{"dataset_id", "sort_id", "parent_sort_id", "parent_path", "field_name", "fields_layout_id", "field_label", "data_type", "data_size", "data_precision", "data_fraction", "is_nullable", "is_indexed", - "is_partitioned", "is_recursive", "confidential_flags", "default_value", "namespace", "default_comment_id", - "comment_ids"}; + "is_partitioned", "is_recursive", "default_value", "namespace", "default_comment_id", "comment_ids"}; } @JsonIgnore @@ -76,7 +74,7 @@ public class DatasetFieldSchemaRecord extends AbstractRecord { return new Object[]{datasetId, position, parentFieldPosition, parentPath, fieldName, 0, label, type, maxCharLength, precision, scale, nullable != null && nullable ? "Y" : "N", indexed != null && indexed ? "Y" : "N", partitioned != null && partitioned ? "Y" : "N", isRecursive != null && isRecursive ? 
"Y" : "N", - confidentialFlags, defaultValue, namespace, defaultCommentId, commentIds}; + defaultValue, namespace, defaultCommentId, commentIds}; } public DatasetFieldSchemaRecord() { @@ -250,14 +248,6 @@ public class DatasetFieldSchemaRecord extends AbstractRecord { this.scale = scale; } - public String getConfidentialFlags() { - return confidentialFlags; - } - - public void setConfidentialFlags(String confidentialFlags) { - this.confidentialFlags = confidentialFlags; - } - public Boolean getRecursive() { return isRecursive; } diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetKeySchemaRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetKeySchemaRecord.java new file mode 100644 index 0000000000..15ac1ed710 --- /dev/null +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetKeySchemaRecord.java @@ -0,0 +1,56 @@ +/** + * Copyright 2015 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package wherehows.common.schemas; + +import java.util.List; + + +public class DatasetKeySchemaRecord extends AbstractRecord { + + String keyType; + String format; + String text; + + @Override + public List fillAllFields() { + return null; + } + + public DatasetKeySchemaRecord() { + } + + public String getKeyType() { + return keyType; + } + + public void setKeyType(String keyType) { + this.keyType = keyType; + } + + public String getFormat() { + return format; + } + + public void setFormat(String format) { + this.format = format; + } + + public String getText() { + return text; + } + + public void setText(String text) { + this.text = text; + } +} diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetOriginalSchemaRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetOriginalSchemaRecord.java new file mode 100644 index 0000000000..8377535f9b --- /dev/null +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetOriginalSchemaRecord.java @@ -0,0 +1,66 @@ +/** + * Copyright 2015 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ */ +package wherehows.common.schemas; + +import java.util.List; +import java.util.Map; + + +public class DatasetOriginalSchemaRecord extends AbstractRecord { + + String format; + String name; + String text; + Map checksum; + + @Override + public List fillAllFields() { + return null; + } + + public DatasetOriginalSchemaRecord() { + } + + public String getFormat() { + return format; + } + + public void setFormat(String format) { + this.format = format; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getText() { + return text; + } + + public void setText(String text) { + this.text = text; + } + + public Map getChecksum() { + return checksum; + } + + public void setChecksum(Map checksum) { + this.checksum = checksum; + } +} diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetOwnerRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetOwnerRecord.java index cacca867ef..85d2191b79 100644 --- a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetOwnerRecord.java +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetOwnerRecord.java @@ -24,8 +24,7 @@ public class DatasetOwnerRecord extends AbstractRecord { String datasetUrn; Integer appId; String ownerCategory; - String ownerSubCategory; - String owner; + String ownerUrn; String ownerType; String isGroup; String isActive; @@ -41,7 +40,7 @@ public class DatasetOwnerRecord extends AbstractRecord { @Override public String[] getDbColumnNames() { - return new String[]{"dataset_id", "dataset_urn", "app_id", "owner_type", "owner_sub_type", "owner_id", + return new String[]{"dataset_id", "dataset_urn", "app_id", "owner_type", "owner_id", "owner_id_type", "is_group", "is_active", "sort_id", "namespace", "owner_source", "db_ids", "source_time", "created_time", "modified_time", "confirmed_by", "confirmed_on"}; } @@ -50,7 +49,7 @@ public class DatasetOwnerRecord extends AbstractRecord { public List fillAllFields() { List allFields = new ArrayList<>(); allFields.add(datasetUrn); - allFields.add(owner); + allFields.add(ownerUrn); allFields.add(sortId); allFields.add(namespace); allFields.add(dbIds); @@ -60,13 +59,13 @@ public class DatasetOwnerRecord extends AbstractRecord { @JsonIgnore public String[] getDbColumnForUnmatchedOwner() { - return new String[]{"dataset_urn", "app_id", "owner_type", "owner_sub_type", "owner_id", "owner_id_type", + return new String[]{"dataset_urn", "app_id", "owner_type", "owner_id", "owner_id_type", "is_group", "is_active", "sort_id", "namespace", "owner_source", "db_name", "db_id", "source_time"}; } @JsonIgnore public Object[] getValuesForUnmatchedOwner() { - return new Object[]{datasetUrn, appId, ownerCategory, ownerSubCategory, owner, ownerType, isGroup, + return new Object[]{datasetUrn, appId, ownerCategory, ownerUrn, ownerType, isGroup, isActive, sortId, namespace, ownerSource, "N/A", dbIds, sourceTime}; } @@ -76,7 +75,7 @@ public class DatasetOwnerRecord extends AbstractRecord { public DatasetOwnerRecord(String datasetUrn, String ownerId, Integer sortId, String namespace, String dbName, Long sourceTime) { this.datasetUrn = datasetUrn; - this.owner = ownerId; + this.ownerUrn = ownerId; this.sortId = sortId; this.namespace = namespace; this.dbIds = dbName; @@ -115,20 +114,12 @@ public class DatasetOwnerRecord extends AbstractRecord { this.ownerCategory = ownerCategory; } - public String getOwnerSubCategory() { - return ownerSubCategory; + public String getOwnerUrn() { + 
return ownerUrn; } - public void setOwnerSubCategory(String ownerSubCategory) { - this.ownerSubCategory = ownerSubCategory; - } - - public String getOwner() { - return owner; - } - - public void setOwner(String owner) { - this.owner = owner; + public void setOwnerUrn(String ownerUrn) { + this.ownerUrn = ownerUrn; } public String getOwnerType() { diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetPartitionKeyRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetPartitionKeyRecord.java index 8f59f4039e..db5f5e9c9b 100644 --- a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetPartitionKeyRecord.java +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetPartitionKeyRecord.java @@ -23,7 +23,7 @@ public class DatasetPartitionKeyRecord { String partitionLevel; String partitionType; String timeFormat; - String granularity; + String timeGranularity; List fieldNames; List partitionValues; Integer numberOfHashBuckets; @@ -55,12 +55,12 @@ public class DatasetPartitionKeyRecord { this.timeFormat = timeFormat; } - public String getGranularity() { - return granularity; + public String getTimeGranularity() { + return timeGranularity; } - public void setGranularity(String granularity) { - this.granularity = granularity; + public void setTimeGranularity(String timeGranularity) { + this.timeGranularity = timeGranularity; } public List getFieldNames() { diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetPartitionRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetPartitionRecord.java index a55139f898..63ebea5253 100644 --- a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetPartitionRecord.java +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetPartitionRecord.java @@ -20,7 +20,7 @@ public class DatasetPartitionRecord extends AbstractRecord { Integer datasetId; String datasetUrn; - String totalPartitionLevel; + Integer totalPartitionLevel; String partitionSpecText; Boolean hasTimePartition; Boolean hasHashPartition; @@ -58,11 +58,11 @@ public class DatasetPartitionRecord extends AbstractRecord { this.datasetUrn = datasetUrn; } - public String getTotalPartitionLevel() { + public Integer getTotalPartitionLevel() { return totalPartitionLevel; } - public void setTotalPartitionLevel(String totalPartitionLevel) { + public void setTotalPartitionLevel(Integer totalPartitionLevel) { this.totalPartitionLevel = totalPartitionLevel; } diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetSchemaInfoRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetSchemaInfoRecord.java index 07871da6d6..e3caf87e71 100644 --- a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetSchemaInfoRecord.java +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetSchemaInfoRecord.java @@ -27,12 +27,8 @@ public class DatasetSchemaInfoRecord extends AbstractRecord { String version; String name; String description; - String format; - String originalSchema; - Map originalSchemaChecksum; - String keySchemaType; - String keySchemaFormat; - String keySchema; + DatasetOriginalSchemaRecord originalSchema; + DatasetKeySchemaRecord keySchema; Boolean isFieldNameCaseSensitive; List fieldSchema; List changeDataCaptureFields; @@ -42,9 +38,8 @@ public class DatasetSchemaInfoRecord extends AbstractRecord { @Override public String[] getDbColumnNames() { return new String[]{"dataset_id", "dataset_urn", "is_latest_revision", "create_time", 
"revision", "version", "name", - "description", "format", "original_schema", "original_schema_checksum", "key_schema_type", "key_schema_format", - "key_schema", "is_field_name_case_sensitive", "field_schema", "change_data_capture_fields", "audit_fields", - "modified_time"}; + "description", "original_schema", "key_schema", "is_field_name_case_sensitive", "field_schema", + "change_data_capture_fields", "audit_fields", "modified_time"}; } @Override @@ -119,51 +114,19 @@ public class DatasetSchemaInfoRecord extends AbstractRecord { this.description = description; } - public String getFormat() { - return format; - } - - public void setFormat(String format) { - this.format = format; - } - - public String getOriginalSchema() { + public DatasetOriginalSchemaRecord getOriginalSchema() { return originalSchema; } - public void setOriginalSchema(String originalSchema) { + public void setOriginalSchema(DatasetOriginalSchemaRecord originalSchema) { this.originalSchema = originalSchema; } - public Map getOriginalSchemaChecksum() { - return originalSchemaChecksum; - } - - public void setOriginalSchemaChecksum(Map originalSchemaChecksum) { - this.originalSchemaChecksum = originalSchemaChecksum; - } - - public String getKeySchemaType() { - return keySchemaType; - } - - public void setKeySchemaType(String keySchemaType) { - this.keySchemaType = keySchemaType; - } - - public String getKeySchemaFormat() { - return keySchemaFormat; - } - - public void setKeySchemaFormat(String keySchemaFormat) { - this.keySchemaFormat = keySchemaFormat; - } - - public String getKeySchema() { + public DatasetKeySchemaRecord getKeySchema() { return keySchema; } - public void setKeySchema(String keySchema) { + public void setKeySchema(DatasetKeySchemaRecord keySchema) { this.keySchema = keySchema; } diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetSecurityRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetSecurityRecord.java index b70160f010..d15e26c3e0 100644 --- a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetSecurityRecord.java +++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetSecurityRecord.java @@ -21,18 +21,18 @@ public class DatasetSecurityRecord extends AbstractRecord { Integer datasetId; String datasetUrn; - Map classification; + Map> classification; String recordOwnerType; - List recordOwner; String complianceType; + List compliancePurgeEntities; DatasetRetentionRecord retentionPolicy; DatasetGeographicAffinityRecord geographicAffinity; Long modifiedTime; @Override public String[] getDbColumnNames() { - return new String[]{"dataset_id", "dataset_urn", "classification", "record_owner_type", "record_owner", - "compliance_type", "retention_policy", "geographic_affinity", "modified_time"}; + return new String[]{"dataset_id", "dataset_urn", "classification", "record_owner_type", "compliance_purge_type", + "compliance_purge_entities", "retention_policy", "geographic_affinity", "modified_time"}; } @Override @@ -59,11 +59,11 @@ public class DatasetSecurityRecord extends AbstractRecord { this.datasetUrn = datasetUrn; } - public Map getClassification() { + public Map> getClassification() { return classification; } - public void setClassification(Map classification) { + public void setClassification(Map> classification) { this.classification = classification; } @@ -75,12 +75,12 @@ public class DatasetSecurityRecord extends AbstractRecord { this.recordOwnerType = recordOwnerType; } - public List getRecordOwner() { - return recordOwner; + public 
List getCompliancePurgeEntities() { + return compliancePurgeEntities; } - public void setRecordOwner(List recordOwner) { - this.recordOwner = recordOwner; + public void setCompliancePurgeEntities(List compliancePurgeEntities) { + this.compliancePurgeEntities = compliancePurgeEntities; } public String getComplianceType() {
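A closing note on the owner-type changes above: both the OwnerType enum and the SQL CASE in MultiproductLoad.py encode precedence as "lower number wins". A minimal self-contained sketch of that comparison logic (illustrative names only; the real implementation is OwnerType.getHighestOwnerType):

public class OwnerTypePrecedenceDemo {
  // Mirrors the new OwnerType values: lower number = higher precedence.
  private static int rank(String type) {
    if (type == null) { return 100; } // unknown or absent types lose to any known type
    switch (type.toUpperCase()) {
      case "OWNER": return 20;
      case "PRODUCER": return 40;
      case "DELEGATE": return 60;
      case "STAKEHOLDER": return 80;
      default: return 100;
    }
  }

  // Same tie-breaking as getHighestOwnerType: on equal rank, keep the first argument.
  static String highest(String type1, String type2) {
    return rank(type1) <= rank(type2) ? type1 : type2;
  }

  public static void main(String[] args) {
    System.out.println(highest("DELEGATE", "OWNER"));       // OWNER (20 beats 60)
    System.out.println(highest("STAKEHOLDER", "PRODUCER")); // PRODUCER (40 beats 80)
    System.out.println(highest("PRODUCER", null));          // PRODUCER (null ranks 100)
  }
}

This matches the ON DUPLICATE KEY UPDATE clause, which keeps the existing owner_type only when its rank is less than or equal to the incoming one.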