diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/indexbuilder/MappingsBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/indexbuilder/MappingsBuilder.java
index 3f96e6a360..80802c96b9 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/indexbuilder/MappingsBuilder.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/indexbuilder/MappingsBuilder.java
@@ -1,7 +1,9 @@
 package com.linkedin.metadata.timeseries.elastic.indexbuilder;
 
 import com.google.common.collect.ImmutableMap;
+import com.linkedin.data.schema.DataSchema;
 import com.linkedin.metadata.models.AspectSpec;
+import com.linkedin.metadata.models.TimeseriesFieldCollectionSpec;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nonnull;
@@ -36,6 +38,55 @@ public class MappingsBuilder {
     mappings.put(SYSTEM_METADATA_FIELD, ImmutableMap.of("type", "object", "enabled", false));
     mappings.put(IS_EXPLODED_FIELD, ImmutableMap.of("type", "boolean"));
 
+    aspectSpec.getTimeseriesFieldSpecs()
+        .forEach(x -> mappings.put(x.getName(), getFieldMapping(x.getPegasusSchema().getType())));
+    aspectSpec.getTimeseriesFieldCollectionSpecs()
+        .forEach(x -> mappings.put(x.getName(), getTimeseriesFieldCollectionSpecMapping(x)));
+
     return ImmutableMap.of("properties", mappings);
   }
+
+  /**
+   * Builds the nested mapping for a timeseries field collection.
+   *
+   * <p>The field named by the collection annotation's key is mapped as a STRING-typed field, and
+   * every member field of the collection is mapped according to its Pegasus schema type.
+   *
+   * @param timeseriesFieldCollectionSpec spec supplying the collection's key name and member fields
+   * @return a single-entry map whose "properties" key holds the per-field mappings
+   */
+  private static Map<String, Object> getTimeseriesFieldCollectionSpecMapping(
+      TimeseriesFieldCollectionSpec timeseriesFieldCollectionSpec) {
+    Map<String, Object> collectionMappings = new HashMap<>();
+    collectionMappings.put(timeseriesFieldCollectionSpec.getTimeseriesFieldCollectionAnnotation().getKey(),
+        getFieldMapping(DataSchema.Type.STRING));
+    timeseriesFieldCollectionSpec.getTimeseriesFieldSpecMap()
+        .values()
+        .forEach(x -> collectionMappings.put(x.getName(), getFieldMapping(x.getPegasusSchema().getType())));
+    return ImmutableMap.of("properties", collectionMappings);
+  }
+
+  /**
+   * Translates a Pegasus schema type into a single-entry field mapping keyed by "type".
+   *
+   * <p>INT, LONG, FLOAT and DOUBLE map to "integer", "long", "float" and "double" respectively;
+   * every other type falls back to "keyword".
+   *
+   * @param dataSchemaType the Pegasus type of the field being mapped
+   * @return the field mapping for that type
+   */
+  private static Map<String, Object> getFieldMapping(DataSchema.Type dataSchemaType) {
+    switch (dataSchemaType) {
+      case INT:
+        return ImmutableMap.of("type", "integer");
+      case LONG:
+        return ImmutableMap.of("type", "long");
+      case FLOAT:
+        return ImmutableMap.of("type", "float");
+      case DOUBLE:
+        return ImmutableMap.of("type", "double");
+      default:
+        return ImmutableMap.of("type", "keyword");
+    }
+  }
 }
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/query/ESAggregatedStatsDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/query/ESAggregatedStatsDAO.java
index 49747e2e47..66cd775044 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/query/ESAggregatedStatsDAO.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/query/ESAggregatedStatsDAO.java
@@ -55,7 +55,6 @@ public class ESAggregatedStatsDAO {
   private static final String ES_TERMS_AGGREGATION_PREFIX = "terms_";
   private static final String ES_MAX_AGGREGATION_PREFIX = "max_";
   private static final String ES_FIELD_TIMESTAMP = "timestampMillis";
-  private static final String ES_KEYWORD_SUFFIX = ".keyword";
   private static final String ES_AGG_TIMESTAMP = ES_AGGREGATION_PREFIX + ES_FIELD_TIMESTAMP;
   private static final String ES_AGG_MAX_TIMESTAMP =
       ES_AGGREGATION_PREFIX + ES_MAX_AGGREGATION_PREFIX + ES_FIELD_TIMESTAMP;
@@ -362,9 +361,6 @@ public class ESAggregatedStatsDAO {
       AggregationSpec aggregationSpec) {
     String fieldPath = aggregationSpec.getFieldPath();
     String esFieldName = fieldPath;
-    if (!isIntegralType(getAggregationSpecMemberType(aspectSpec, aggregationSpec))) {
-      esFieldName += ES_KEYWORD_SUFFIX;
-    }
 
     switch (aggregationSpec.getAggregationType()) {
       case LATEST:
@@ -415,9 +411,6 @@ public class ESAggregatedStatsDAO {
         // Process the string grouping bucket using the 'terms' aggregation.
         String fieldName = curGroupingBucket.getKey();
         DataSchema.Type fieldType = getGroupingBucketKeyType(aspectSpec, curGroupingBucket);
-        if (!isIntegralType(fieldType)) {
-          fieldName += ES_KEYWORD_SUFFIX;
-        }
         curAggregationBuilder = AggregationBuilders.terms(getGroupingBucketAggName(curGroupingBucket))
             .field(fieldName)
             .size(MAX_TERM_BUCKETS)