mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-11-03 20:27:50 +00:00 
			
		
		
		
	fix(timeseries): create specific elastic field mappings for all timeseries fields (#3350)
This commit is contained in:
		
							parent
							
								
									810e0caa85
								
							
						
					
					
						commit
						ef2182f102
					
				@ -1,7 +1,9 @@
 | 
				
			|||||||
package com.linkedin.metadata.timeseries.elastic.indexbuilder;
 | 
					package com.linkedin.metadata.timeseries.elastic.indexbuilder;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import com.google.common.collect.ImmutableMap;
 | 
					import com.google.common.collect.ImmutableMap;
 | 
				
			||||||
 | 
					import com.linkedin.data.schema.DataSchema;
 | 
				
			||||||
import com.linkedin.metadata.models.AspectSpec;
 | 
					import com.linkedin.metadata.models.AspectSpec;
 | 
				
			||||||
 | 
					import com.linkedin.metadata.models.TimeseriesFieldCollectionSpec;
 | 
				
			||||||
import java.util.HashMap;
 | 
					import java.util.HashMap;
 | 
				
			||||||
import java.util.Map;
 | 
					import java.util.Map;
 | 
				
			||||||
import javax.annotation.Nonnull;
 | 
					import javax.annotation.Nonnull;
 | 
				
			||||||
@ -36,6 +38,37 @@ public class MappingsBuilder {
 | 
				
			|||||||
    mappings.put(SYSTEM_METADATA_FIELD, ImmutableMap.of("type", "object", "enabled", false));
 | 
					    mappings.put(SYSTEM_METADATA_FIELD, ImmutableMap.of("type", "object", "enabled", false));
 | 
				
			||||||
    mappings.put(IS_EXPLODED_FIELD, ImmutableMap.of("type", "boolean"));
 | 
					    mappings.put(IS_EXPLODED_FIELD, ImmutableMap.of("type", "boolean"));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    aspectSpec.getTimeseriesFieldSpecs()
 | 
				
			||||||
 | 
					        .forEach(x -> mappings.put(x.getName(), getFieldMapping(x.getPegasusSchema().getType())));
 | 
				
			||||||
 | 
					    aspectSpec.getTimeseriesFieldCollectionSpecs()
 | 
				
			||||||
 | 
					        .forEach(x -> mappings.put(x.getName(), getTimeseriesFieldCollectionSpecMapping(x)));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return ImmutableMap.of("properties", mappings);
 | 
					    return ImmutableMap.of("properties", mappings);
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  private static Map<String, Object> getTimeseriesFieldCollectionSpecMapping(
 | 
				
			||||||
 | 
					      TimeseriesFieldCollectionSpec timeseriesFieldCollectionSpec) {
 | 
				
			||||||
 | 
					    Map<String, Object> collectionMappings = new HashMap<>();
 | 
				
			||||||
 | 
					    collectionMappings.put(timeseriesFieldCollectionSpec.getTimeseriesFieldCollectionAnnotation().getKey(),
 | 
				
			||||||
 | 
					        getFieldMapping(DataSchema.Type.STRING));
 | 
				
			||||||
 | 
					    timeseriesFieldCollectionSpec.getTimeseriesFieldSpecMap()
 | 
				
			||||||
 | 
					        .values()
 | 
				
			||||||
 | 
					        .forEach(x -> collectionMappings.put(x.getName(), getFieldMapping(x.getPegasusSchema().getType())));
 | 
				
			||||||
 | 
					    return ImmutableMap.of("properties", collectionMappings);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  private static Map<String, Object> getFieldMapping(DataSchema.Type dataSchemaType) {
 | 
				
			||||||
 | 
					    switch (dataSchemaType) {
 | 
				
			||||||
 | 
					      case INT:
 | 
				
			||||||
 | 
					        return ImmutableMap.of("type", "integer");
 | 
				
			||||||
 | 
					      case LONG:
 | 
				
			||||||
 | 
					        return ImmutableMap.of("type", "long");
 | 
				
			||||||
 | 
					      case FLOAT:
 | 
				
			||||||
 | 
					        return ImmutableMap.of("type", "float");
 | 
				
			||||||
 | 
					      case DOUBLE:
 | 
				
			||||||
 | 
					        return ImmutableMap.of("type", "double");
 | 
				
			||||||
 | 
					      default:
 | 
				
			||||||
 | 
					        return ImmutableMap.of("type", "keyword");
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -55,7 +55,6 @@ public class ESAggregatedStatsDAO {
 | 
				
			|||||||
  private static final String ES_TERMS_AGGREGATION_PREFIX = "terms_";
 | 
					  private static final String ES_TERMS_AGGREGATION_PREFIX = "terms_";
 | 
				
			||||||
  private static final String ES_MAX_AGGREGATION_PREFIX = "max_";
 | 
					  private static final String ES_MAX_AGGREGATION_PREFIX = "max_";
 | 
				
			||||||
  private static final String ES_FIELD_TIMESTAMP = "timestampMillis";
 | 
					  private static final String ES_FIELD_TIMESTAMP = "timestampMillis";
 | 
				
			||||||
  private static final String ES_KEYWORD_SUFFIX = ".keyword";
 | 
					 | 
				
			||||||
  private static final String ES_AGG_TIMESTAMP = ES_AGGREGATION_PREFIX + ES_FIELD_TIMESTAMP;
 | 
					  private static final String ES_AGG_TIMESTAMP = ES_AGGREGATION_PREFIX + ES_FIELD_TIMESTAMP;
 | 
				
			||||||
  private static final String ES_AGG_MAX_TIMESTAMP =
 | 
					  private static final String ES_AGG_MAX_TIMESTAMP =
 | 
				
			||||||
      ES_AGGREGATION_PREFIX + ES_MAX_AGGREGATION_PREFIX + ES_FIELD_TIMESTAMP;
 | 
					      ES_AGGREGATION_PREFIX + ES_MAX_AGGREGATION_PREFIX + ES_FIELD_TIMESTAMP;
 | 
				
			||||||
@ -362,9 +361,6 @@ public class ESAggregatedStatsDAO {
 | 
				
			|||||||
      AggregationSpec aggregationSpec) {
 | 
					      AggregationSpec aggregationSpec) {
 | 
				
			||||||
    String fieldPath = aggregationSpec.getFieldPath();
 | 
					    String fieldPath = aggregationSpec.getFieldPath();
 | 
				
			||||||
    String esFieldName = fieldPath;
 | 
					    String esFieldName = fieldPath;
 | 
				
			||||||
    if (!isIntegralType(getAggregationSpecMemberType(aspectSpec, aggregationSpec))) {
 | 
					 | 
				
			||||||
      esFieldName += ES_KEYWORD_SUFFIX;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    switch (aggregationSpec.getAggregationType()) {
 | 
					    switch (aggregationSpec.getAggregationType()) {
 | 
				
			||||||
      case LATEST:
 | 
					      case LATEST:
 | 
				
			||||||
@ -415,9 +411,6 @@ public class ESAggregatedStatsDAO {
 | 
				
			|||||||
        // Process the string grouping bucket using the 'terms' aggregation.
 | 
					        // Process the string grouping bucket using the 'terms' aggregation.
 | 
				
			||||||
        String fieldName = curGroupingBucket.getKey();
 | 
					        String fieldName = curGroupingBucket.getKey();
 | 
				
			||||||
        DataSchema.Type fieldType = getGroupingBucketKeyType(aspectSpec, curGroupingBucket);
 | 
					        DataSchema.Type fieldType = getGroupingBucketKeyType(aspectSpec, curGroupingBucket);
 | 
				
			||||||
        if (!isIntegralType(fieldType)) {
 | 
					 | 
				
			||||||
          fieldName += ES_KEYWORD_SUFFIX;
 | 
					 | 
				
			||||||
        }
 | 
					 | 
				
			||||||
        curAggregationBuilder = AggregationBuilders.terms(getGroupingBucketAggName(curGroupingBucket))
 | 
					        curAggregationBuilder = AggregationBuilders.terms(getGroupingBucketAggName(curGroupingBucket))
 | 
				
			||||||
            .field(fieldName)
 | 
					            .field(fieldName)
 | 
				
			||||||
            .size(MAX_TERM_BUCKETS)
 | 
					            .size(MAX_TERM_BUCKETS)
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user