Fix topic indexes and migrations (#9439)

* Fix topic indexing

* Fix topic db migrations

* Fix topic db migrations

* Fix topic db migrations

* Fix test; add missing topic indexes on python side; fix ranking for tables using usage count
This commit is contained in:
Sriharsha Chintalapani 2022-12-21 01:33:12 -08:00 committed by GitHub
parent 9e01fe0636
commit b9242c1390
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 171 additions and 46 deletions

View File

@ -56,3 +56,12 @@ WHERE serviceType = 'Dagster' AND json -> '$.connection.config.configSource.hos
UPDATE pipeline_service_entity
SET json = JSON_INSERT(JSON_INSERT(JSON_REMOVE(json, '$.connection.config.configSource'),'$.connection.config.host', JSON_EXTRACT(json,'$.connection.config.configSource.hostPort')), '$.connection.config.token','')
WHERE serviceType = 'Dagster' AND json -> '$.connection.config.configSource.hostPort' IS NOT NULL;
UPDATE topic_entity
SET json = JSON_INSERT(JSON_REMOVE(json, '$.schemaType'), '$.messageSchema', JSON_OBJECT('schemaType', JSON_EXTRACT(json, '$.schemaType')))
WHERE json -> '$.schemaType' IS NOT NULL;
UPDATE topic_entity
SET json = JSON_INSERT(JSON_REMOVE(json, '$.schemaText'), '$.messageSchema.schemaText', JSON_EXTRACT(json, '$.schemaText'))
WHERE json -> '$.schemaText' IS NOT NULL;

View File

@ -50,6 +50,14 @@ SET json = jsonb_set(jsonb_set(json::jsonb #- '{connection,config,configSource}'
WHERE serviceType = 'Dagster' and json #>'{connection,config,configSource,host}' is not null;
update pipeline_service_entity
set json = jsonb_set(json::jsonb #- '{connection,config,configSource}', '{connection,config,host}', json#> '{connection,config,configSource,hostPort}', true)
where servicetype = 'Dagster' and json #>'{connection,config,configSource,hostPort}' is not null;
UPDATE pipeline_service_entity
SET json = jsonb_set(json::jsonb #- '{connection,config,configSource}', '{connection,config,host}', json#> '{connection,config,configSource,hostPort}', true)
WHERE servicetype = 'Dagster' and json #>'{connection,config,configSource,hostPort}' is not null;
UPDATE topic_entity
SET json = jsonb_set(json::jsonb #- '{schemaText}', '{messageSchema}', jsonb_build_object('schemaText', json#>'{schemaText}'), true)
WHERE json #> '{schemaText}' IS NOT NULL;
UPDATE topic_entity
SET json = jsonb_set(json::jsonb #- '{schemaType}', '{messageSchema,schemaType}', json#> '{schemaType}', true)
WHERE json #> '{schemaType}' IS NOT NULL;

View File

@ -98,6 +98,15 @@
"cleanupPolicies": ["compact","delete"],
"schemaType": "Protobuf",
"schemaText": "syntax = \"proto2\";\n\npackage tutorial;\n\nmessage Person {\n optional string name = 1;\n optional int32 id = 2;\n optional string email = 3;\n\n enum PhoneType {\n MOBILE = 0;\n HOME = 1;\n WORK = 2;\n }\n\n message PhoneNumber {\n optional string number = 1;\n optional PhoneType type = 2 [default = HOME];\n }\n\n repeated PhoneNumber phones = 4;\n}\n\nmessage AddressBook {\n repeated Person people = 1;\n}"
},
{
"name": "customer_contacts",
"description": "Kafka topic to capture the customer contacts such as email, phone",
"partitions": 56,
"retentionSize": 455858109,
"replicationFactor":2,
"maximumMessageSize":167,
"cleanupPolicies": ["delete"]
}
]
}

View File

@ -22,6 +22,7 @@ from metadata.generated.schema.entity.data.mlmodel import (
)
from metadata.generated.schema.entity.data.pipeline import Task
from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.type import schema
from metadata.generated.schema.type.entityReference import (
EntityReference,
EntityReferenceList,
@ -96,7 +97,7 @@ class TopicESDocument(BaseModel):
href: Optional[str]
deleted: bool
service: EntityReference
serviceType: str
messageSchema: Optional[schema.Topic] = None
schemaText: Optional[str] = None
schemaType: Optional[str] = None
cleanupPolicies: List[str] = None

View File

@ -33,7 +33,7 @@ from metadata.generated.schema.dataInsight.kpi.basic import KpiResult
from metadata.generated.schema.dataInsight.kpi.kpi import Kpi
class DataInisghtMixin:
class DataInsightMixin:
"""data insight mixin used to write results"""
def add_data_insight_report_data(self, record: ReportData) -> ReportData:

View File

@ -73,7 +73,7 @@ from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.auth_provider import AuthenticationProvider
from metadata.ingestion.ometa.client import REST, APIError, ClientConfig
from metadata.ingestion.ometa.mixins.dashboard_mixin import OMetaDashboardMixin
from metadata.ingestion.ometa.mixins.data_insight_mixin import DataInisghtMixin
from metadata.ingestion.ometa.mixins.data_insight_mixin import DataInsightMixin
from metadata.ingestion.ometa.mixins.es_mixin import ESMixin
from metadata.ingestion.ometa.mixins.glossary_mixin import GlossaryMixin
from metadata.ingestion.ometa.mixins.ingestion_pipeline_mixin import (
@ -140,7 +140,7 @@ class OpenMetadata(
OMetaDashboardMixin,
OMetaPatchMixin,
OMetaTestsMixin,
DataInisghtMixin,
DataInsightMixin,
OMetaIngestionPipelineMixin,
OMetaUserMixin,
Generic[T, C],

View File

@ -553,8 +553,7 @@ class ElasticsearchSink(Sink[Entity]):
deleted=topic.deleted,
service=topic.service,
serviceType=str(topic.serviceType.name),
schemaText=topic.schemaText,
schemaType=str(topic.schemaType.name),
messageSchema=topic.messageSchema,
cleanupPolicies=[str(policy.name) for policy in topic.cleanupPolicies],
replicationFactor=topic.replicationFactor,
maximumMessageSize=topic.maximumMessageSize,

View File

@ -82,11 +82,56 @@ TOPIC_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"href": {
"type": "text"
},
"schemaText": {
"type": "text"
},
"schemaType": {
"type": "text"
"messageSchema": {
"properties": {
"schemaType": {
"type": "keyword",
"normalizer": "lowercase_normalizer"
},
"schemaFields": {
"properties": {
"name": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"dataType": {
"type": "text"
},
"description": {
"type": "text",
"analyzer": "om_analyzer"
},
"fullyQualifiedName": {
"type": "text"
},
"tags": {
"properties": {
"tagFQN": {
"type": "keyword"
},
"labelType": {
"type": "keyword"
},
"description": {
"type": "text"
},
"source": {
"type": "keyword"
},
"state": {
"type": "keyword"
}
}
}
}
}
}
},
"cleanupPolicies": {
"type": "keyword"

View File

@ -675,15 +675,6 @@ class SampleDataSource(
topic["service"] = EntityReference(
id=self.kafka_service.id, type="messagingService"
)
schema_type = topic["schemaType"].lower()
load_parser_fn = schema_parser_config_registry.registry.get(schema_type)
if not load_parser_fn:
raise InvalidSchemaTypeException(
f"Cannot find {schema_type} in parser providers registry."
)
schema_fields = load_parser_fn(topic["name"], topic["schemaText"])
create_topic = CreateTopicRequest(
name=topic["name"],
description=topic["description"],
@ -692,15 +683,26 @@ class SampleDataSource(
replicationFactor=topic["replicationFactor"],
maximumMessageSize=topic["maximumMessageSize"],
cleanupPolicies=topic["cleanupPolicies"],
messageSchema=Topic(
schemaText=topic["schemaText"],
schemaType=topic["schemaType"],
schemaFields=schema_fields,
),
service=EntityReference(
id=self.kafka_service.id, type="messagingService"
),
)
if "schemaType" in topic:
schema_type = topic["schemaType"].lower()
load_parser_fn = schema_parser_config_registry.registry.get(schema_type)
if not load_parser_fn:
raise InvalidSchemaTypeException(
f"Cannot find {schema_type} in parser providers registry."
)
schema_fields = load_parser_fn(topic["name"], topic["schemaText"])
create_topic.messageSchema = Topic(
schemaText=topic["schemaText"],
schemaType=topic["schemaType"],
schemaFields=schema_fields,
)
self.status.scanned("topic", create_topic.name.__root__)
yield create_topic
@ -913,7 +915,6 @@ class SampleDataSource(
fqn=table_profile["fqn"],
)
for days, profile in enumerate(table_profile["profile"]):
yield OMetaTableProfileSampleData(
table=table,
profile=CreateTableProfileRequest(

View File

@ -241,15 +241,15 @@ public class TopicRepository extends EntityRepository<Topic> {
recordChange("retentionSize", original.getRetentionSize(), updated.getRetentionSize());
if (updated.getMessageSchema() != null) {
recordChange(
"schema.schemaText",
"messageSchema.schemaText",
original.getMessageSchema().getSchemaText(),
updated.getMessageSchema().getSchemaText());
recordChange(
"schema.schemaType",
"messageSchema.schemaType",
original.getMessageSchema().getSchemaType(),
updated.getMessageSchema().getSchemaType());
updateSchemaFields(
"schemaFields",
"messageSchema.schemaFields",
original.getMessageSchema().getSchemaFields(),
updated.getMessageSchema().getSchemaFields(),
EntityUtil.schemaFieldMatch);

View File

@ -51,6 +51,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchNoneQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
@ -397,20 +398,21 @@ public class SearchResource {
}
private SearchSourceBuilder buildTableSearchBuilder(String query, int from, int size) {
FieldValueFactorFunctionBuilder boostScoreBuilder =
ScoreFunctionBuilders.fieldValueFactorFunction("usageSummary.weeklyStats.count").missing(1).factor(2);
FunctionScoreQueryBuilder.FilterFunctionBuilder[] functions =
new FunctionScoreQueryBuilder.FilterFunctionBuilder[1];
functions[0] = new FunctionScoreQueryBuilder.FilterFunctionBuilder(boostScoreBuilder);
QueryStringQueryBuilder queryStringBuilder =
QueryBuilders.queryStringQuery(query)
.field(FIELD_DISPLAY_NAME, 10.0f)
.field(FIELD_DISPLAY_NAME, 20.0f)
.field(FIELD_DESCRIPTION, 2.0f)
.field("columns.name", 2.0f)
.field("columns.description", 1.0f)
.field("columns.children.name", 2.0f)
.defaultOperator(Operator.AND)
.fuzziness(Fuzziness.AUTO);
FieldValueFactorFunctionBuilder boostScoreBuilder =
ScoreFunctionBuilders.fieldValueFactorFunction("usageSummary.weeklyStats.count").missing(1).factor(4);
FunctionScoreQueryBuilder.FilterFunctionBuilder[] functions =
new FunctionScoreQueryBuilder.FilterFunctionBuilder[] {
new FunctionScoreQueryBuilder.FilterFunctionBuilder(new MatchNoneQueryBuilder(), boostScoreBuilder)
};
FunctionScoreQueryBuilder queryBuilder = QueryBuilders.functionScoreQuery(queryStringBuilder, functions);
HighlightBuilder.Field highlightTableName = new HighlightBuilder.Field(FIELD_DISPLAY_NAME);
highlightTableName.highlighterType(UNIFIED);
@ -444,6 +446,9 @@ public class SearchResource {
QueryBuilders.queryStringQuery(query)
.field(FIELD_DISPLAY_NAME, 10.0f)
.field(FIELD_DESCRIPTION, 2.0f)
.field("messageSchema.schemaFields.name", 2.0f)
.field("messageSchema.schemaFields.description", 1.0f)
.field("messageSchema.schemaFields.children.name", 2.0f)
.defaultOperator(Operator.AND)
.fuzziness(Fuzziness.AUTO);
HighlightBuilder.Field highlightTopicName = new HighlightBuilder.Field(FIELD_DISPLAY_NAME);
@ -453,7 +458,11 @@ public class SearchResource {
HighlightBuilder hb = new HighlightBuilder();
hb.field(highlightDescription);
hb.field(highlightTopicName);
hb.field(new HighlightBuilder.Field("messageSchema.schemaFields.description").highlighterType(UNIFIED));
hb.field(new HighlightBuilder.Field("messageSchema.schemaFields.children.name").highlighterType(UNIFIED));
SearchSourceBuilder searchSourceBuilder = searchBuilder(queryBuilder, hb, from, size);
searchSourceBuilder.aggregation(
AggregationBuilders.terms("messageSchema.schemaFields.name").field("messageSchema.schemaFields.name"));
return addAggregation(searchSourceBuilder);
}
@ -462,7 +471,7 @@ public class SearchResource {
QueryBuilders.queryStringQuery(query)
.field(FIELD_DISPLAY_NAME, 10.0f)
.field(FIELD_DESCRIPTION, 2.0f)
.field("chars.name", 2.0f)
.field("charts.name", 2.0f)
.field("charts.description")
.defaultOperator(Operator.AND)
.fuzziness(Fuzziness.AUTO);
@ -552,7 +561,6 @@ public class SearchResource {
.aggregation(AggregationBuilders.terms("serviceType").field("serviceType").size(MAX_AGGREGATE_SIZE))
.aggregation(
AggregationBuilders.terms("service.name.keyword").field("service.name.keyword").size(MAX_AGGREGATE_SIZE))
.aggregation(AggregationBuilders.terms("service.type").field("service.type").size(MAX_AGGREGATE_SIZE))
.aggregation(AggregationBuilders.terms("entityType").field("entityType").size(MAX_AGGREGATE_SIZE))
.aggregation(AggregationBuilders.terms("tier.tagFQN").field("tier.tagFQN"))
.aggregation(AggregationBuilders.terms("tags.tagFQN").field("tags.tagFQN").size(MAX_AGGREGATE_SIZE));

View File

@ -66,11 +66,56 @@
"href": {
"type": "text"
},
"schemaText": {
"type": "text"
},
"schemaType": {
"type": "text"
"messageSchema": {
"properties": {
"schemaType": {
"type": "keyword",
"normalizer": "lowercase_normalizer"
},
"schemaFields": {
"properties": {
"name": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"dataType": {
"type": "text"
},
"description": {
"type": "text",
"analyzer": "om_analyzer"
},
"fullyQualifiedName": {
"type": "text"
},
"tags": {
"properties": {
"tagFQN": {
"type": "keyword"
},
"labelType": {
"type": "keyword"
},
"description": {
"type": "text"
},
"source": {
"type": "keyword"
},
"state": {
"type": "keyword"
}
}
}
}
}
}
},
"cleanupPolicies": {
"type": "keyword"

View File

@ -152,7 +152,7 @@ public class TopicResourceTest extends EntityResourceTest<Topic, CreateTopic> {
fieldUpdated(change, "replicationFactor", 1, 2);
fieldUpdated(change, "retentionTime", 1.0, 2.0);
fieldUpdated(change, "retentionSize", 1.0, 2.0);
fieldUpdated(change, "schema.schemaText", "abc", "bcd");
fieldUpdated(change, "messageSchema.schemaText", "abc", "bcd");
fieldDeleted(change, "cleanupPolicies", List.of(CleanupPolicy.COMPACT));
fieldAdded(change, "cleanupPolicies", List.of(CleanupPolicy.DELETE));