diff --git a/bootstrap/sql/com.mysql.cj.jdbc.Driver/v007__create_db_connection_info.sql b/bootstrap/sql/com.mysql.cj.jdbc.Driver/v007__create_db_connection_info.sql
index 3f6bf36b68a..caf2b7e7af8 100644
--- a/bootstrap/sql/com.mysql.cj.jdbc.Driver/v007__create_db_connection_info.sql
+++ b/bootstrap/sql/com.mysql.cj.jdbc.Driver/v007__create_db_connection_info.sql
@@ -56,3 +56,12 @@ WHERE serviceType = 'Dagster' AND json -> '$.connection.config.configSource.hos
 UPDATE pipeline_service_entity
 SET json = JSON_INSERT(JSON_INSERT(JSON_REMOVE(json, '$.connection.config.configSource'),'$.connection.config.host', JSON_EXTRACT(json,'$.connection.config.configSource.hostPort')), '$.connection.config.token','')
 WHERE serviceType = 'Dagster' AND json -> '$.connection.config.configSource.hostPort' IS NOT NULL;
+
+UPDATE topic_entity
+SET json = JSON_INSERT(JSON_REMOVE(json, '$.schemaType'), '$.messageSchema', JSON_OBJECT('schemaType', JSON_EXTRACT(json, '$.schemaType')))
+WHERE json -> '$.schemaType' IS NOT NULL;
+
+UPDATE topic_entity
+SET json = JSON_INSERT(JSON_REMOVE(json, '$.schemaText'), '$.messageSchema.schemaText', JSON_EXTRACT(json, '$.schemaText'))
+WHERE json -> '$.schemaText' IS NOT NULL;
+
diff --git a/bootstrap/sql/org.postgresql.Driver/v007__create_db_connection_info.sql b/bootstrap/sql/org.postgresql.Driver/v007__create_db_connection_info.sql
index 31f0172db46..5ddeb75b087 100644
--- a/bootstrap/sql/org.postgresql.Driver/v007__create_db_connection_info.sql
+++ b/bootstrap/sql/org.postgresql.Driver/v007__create_db_connection_info.sql
@@ -50,6 +50,14 @@
 SET json = jsonb_set(jsonb_set(json::jsonb #- '{connection,config,configSource}'
 WHERE serviceType = 'Dagster' and json #>'{connection,config,configSource,host}' is not null;
 
-update pipeline_service_entity
-set json = jsonb_set(json::jsonb #- '{connection,config,configSource}', '{connection,config,host}', json#> '{connection,config,configSource,hostPort}', true)
-where servicetype = 'Dagster' and json #>'{connection,config,configSource,hostPort}' is not null;
+UPDATE pipeline_service_entity
+SET json = jsonb_set(json::jsonb #- '{connection,config,configSource}', '{connection,config,host}', json#> '{connection,config,configSource,hostPort}', true)
+WHERE servicetype = 'Dagster' and json #>'{connection,config,configSource,hostPort}' is not null;
+
+UPDATE topic_entity
+SET json = jsonb_set(json::jsonb #- '{schemaText}', '{messageSchema}', jsonb_build_object('schemaText', json#>'{schemaText}'), true)
+WHERE json #> '{schemaText}' IS NOT NULL;
+
+UPDATE topic_entity
+SET json = jsonb_set(json::jsonb #- '{schemaType}', '{messageSchema,schemaType}', json#> '{schemaType}', true)
+WHERE json #> '{schemaType}' IS NOT NULL;
\ No newline at end of file
diff --git a/ingestion/examples/sample_data/topics/topics.json b/ingestion/examples/sample_data/topics/topics.json
index 982684014a0..982abee04d5 100644
--- a/ingestion/examples/sample_data/topics/topics.json
+++ b/ingestion/examples/sample_data/topics/topics.json
@@ -98,6 +98,15 @@
       "cleanupPolicies": ["compact","delete"],
       "schemaType": "Protobuf",
       "schemaText": "syntax = \"proto2\";\n\npackage tutorial;\n\nmessage Person {\n optional string name = 1;\n optional int32 id = 2;\n optional string email = 3;\n\n enum PhoneType {\n MOBILE = 0;\n HOME = 1;\n WORK = 2;\n }\n\n message PhoneNumber {\n optional string number = 1;\n optional PhoneType type = 2 [default = HOME];\n }\n\n repeated PhoneNumber phones = 4;\n}\n\nmessage AddressBook {\n repeated Person people = 1;\n}"
+    },
+    {
"customer_contacts", + "description": "Kafka topic to capture the customer contacts such as email, phone", + "partitions": 56, + "retentionSize": 455858109, + "replicationFactor":2, + "maximumMessageSize":167, + "cleanupPolicies": ["delete"] } ] } \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/models/es_documents.py b/ingestion/src/metadata/ingestion/models/es_documents.py index b8d8bdc92f3..3dcd4ae6bb5 100644 --- a/ingestion/src/metadata/ingestion/models/es_documents.py +++ b/ingestion/src/metadata/ingestion/models/es_documents.py @@ -22,6 +22,7 @@ from metadata.generated.schema.entity.data.mlmodel import ( ) from metadata.generated.schema.entity.data.pipeline import Task from metadata.generated.schema.entity.data.table import Column +from metadata.generated.schema.type import schema from metadata.generated.schema.type.entityReference import ( EntityReference, EntityReferenceList, @@ -96,7 +97,7 @@ class TopicESDocument(BaseModel): href: Optional[str] deleted: bool service: EntityReference - serviceType: str + messageSchema: Optional[schema.Topic] = None schemaText: Optional[str] = None schemaType: Optional[str] = None cleanupPolicies: List[str] = None diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py index 60931aa7e60..f9cac789dbc 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/data_insight_mixin.py @@ -33,7 +33,7 @@ from metadata.generated.schema.dataInsight.kpi.basic import KpiResult from metadata.generated.schema.dataInsight.kpi.kpi import Kpi -class DataInisghtMixin: +class DataInsightMixin: """data insight mixin used to write results""" def add_data_insight_report_data(self, record: ReportData) -> ReportData: diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py index f59382810da..7d664bb55cb 100644 --- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py +++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py @@ -73,7 +73,7 @@ from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.ometa.auth_provider import AuthenticationProvider from metadata.ingestion.ometa.client import REST, APIError, ClientConfig from metadata.ingestion.ometa.mixins.dashboard_mixin import OMetaDashboardMixin -from metadata.ingestion.ometa.mixins.data_insight_mixin import DataInisghtMixin +from metadata.ingestion.ometa.mixins.data_insight_mixin import DataInsightMixin from metadata.ingestion.ometa.mixins.es_mixin import ESMixin from metadata.ingestion.ometa.mixins.glossary_mixin import GlossaryMixin from metadata.ingestion.ometa.mixins.ingestion_pipeline_mixin import ( @@ -140,7 +140,7 @@ class OpenMetadata( OMetaDashboardMixin, OMetaPatchMixin, OMetaTestsMixin, - DataInisghtMixin, + DataInsightMixin, OMetaIngestionPipelineMixin, OMetaUserMixin, Generic[T, C], diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index 3c573d1a8e9..0fb98262a86 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -553,8 +553,7 @@ class ElasticsearchSink(Sink[Entity]): deleted=topic.deleted, service=topic.service, serviceType=str(topic.serviceType.name), - schemaText=topic.schemaText, - schemaType=str(topic.schemaType.name), + messageSchema=topic.messageSchema, 
             cleanupPolicies=[str(policy.name) for policy in topic.cleanupPolicies],
             replicationFactor=topic.replicationFactor,
             maximumMessageSize=topic.maximumMessageSize,
diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch_mapping/topic_search_index_mapping.py b/ingestion/src/metadata/ingestion/sink/elasticsearch_mapping/topic_search_index_mapping.py
index 9f68894b15c..d08df31dc5b 100644
--- a/ingestion/src/metadata/ingestion/sink/elasticsearch_mapping/topic_search_index_mapping.py
+++ b/ingestion/src/metadata/ingestion/sink/elasticsearch_mapping/topic_search_index_mapping.py
@@ -82,11 +82,56 @@ TOPIC_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
         "href": {
           "type": "text"
         },
-        "schemaText": {
-          "type": "text"
-        },
-        "schemaType": {
-          "type": "text"
+        "messageSchema": {
+          "properties": {
+            "schemaType": {
+              "type": "keyword",
+              "normalizer": "lowercase_normalizer"
+            },
+            "schemaFields": {
+              "properties": {
+                "name": {
+                  "type": "keyword",
+                  "normalizer": "lowercase_normalizer",
+                  "fields": {
+                    "keyword": {
+                      "type": "keyword",
+                      "ignore_above": 256
+                    }
+                  }
+                },
+                "dataType": {
+                  "type": "text"
+                },
+                "description": {
+                  "type": "text",
+                  "analyzer": "om_analyzer"
+                },
+                "fullyQualifiedName": {
+                  "type": "text"
+                },
+                "tags": {
+                  "properties": {
+                    "tagFQN": {
+                      "type": "keyword"
+                    },
+                    "labelType": {
+                      "type": "keyword"
+                    },
+                    "description": {
+                      "type": "text"
+                    },
+                    "source": {
+                      "type": "keyword"
+                    },
+                    "state": {
+                      "type": "keyword"
+                    }
+                  }
+                }
+              }
+            }
+          }
         },
         "cleanupPolicies": {
           "type": "keyword"
diff --git a/ingestion/src/metadata/ingestion/source/database/sample_data.py b/ingestion/src/metadata/ingestion/source/database/sample_data.py
index 75a3e4b0654..1fbd98dd43e 100644
--- a/ingestion/src/metadata/ingestion/source/database/sample_data.py
+++ b/ingestion/src/metadata/ingestion/source/database/sample_data.py
@@ -675,15 +675,6 @@ class SampleDataSource(
             topic["service"] = EntityReference(
                 id=self.kafka_service.id, type="messagingService"
             )
-            schema_type = topic["schemaType"].lower()
-
-            load_parser_fn = schema_parser_config_registry.registry.get(schema_type)
-            if not load_parser_fn:
-                raise InvalidSchemaTypeException(
-                    f"Cannot find {schema_type} in parser providers registry."
-                )
-            schema_fields = load_parser_fn(topic["name"], topic["schemaText"])
-
             create_topic = CreateTopicRequest(
                 name=topic["name"],
                 description=topic["description"],
@@ -692,15 +683,26 @@ class SampleDataSource(
                 replicationFactor=topic["replicationFactor"],
                 maximumMessageSize=topic["maximumMessageSize"],
                 cleanupPolicies=topic["cleanupPolicies"],
-                messageSchema=Topic(
-                    schemaText=topic["schemaText"],
-                    schemaType=topic["schemaType"],
-                    schemaFields=schema_fields,
-                ),
                 service=EntityReference(
                     id=self.kafka_service.id, type="messagingService"
                 ),
             )
+
+            if "schemaType" in topic:
+                schema_type = topic["schemaType"].lower()
+                load_parser_fn = schema_parser_config_registry.registry.get(schema_type)
+                if not load_parser_fn:
+                    raise InvalidSchemaTypeException(
+                        f"Cannot find {schema_type} in parser providers registry."
+                    )
+                schema_fields = load_parser_fn(topic["name"], topic["schemaText"])
+
+                create_topic.messageSchema = Topic(
+                    schemaText=topic["schemaText"],
+                    schemaType=topic["schemaType"],
+                    schemaFields=schema_fields,
+                )
+
             self.status.scanned("topic", create_topic.name.__root__)
             yield create_topic
 
@@ -913,7 +915,6 @@ class SampleDataSource(
                 fqn=table_profile["fqn"],
             )
             for days, profile in enumerate(table_profile["profile"]):
-
                 yield OMetaTableProfileSampleData(
                     table=table,
                     profile=CreateTableProfileRequest(
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java
index f7f704e9504..7e2011a178e 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java
@@ -241,15 +241,15 @@ public class TopicRepository extends EntityRepository<Topic> {
       recordChange("retentionSize", original.getRetentionSize(), updated.getRetentionSize());
       if (updated.getMessageSchema() != null) {
         recordChange(
-            "schema.schemaText",
+            "messageSchema.schemaText",
             original.getMessageSchema().getSchemaText(),
             updated.getMessageSchema().getSchemaText());
         recordChange(
-            "schema.schemaType",
+            "messageSchema.schemaType",
             original.getMessageSchema().getSchemaType(),
             updated.getMessageSchema().getSchemaType());
         updateSchemaFields(
-            "schemaFields",
+            "messageSchema.schemaFields",
             original.getMessageSchema().getSchemaFields(),
             updated.getMessageSchema().getSchemaFields(),
             EntityUtil.schemaFieldMatch);
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java
index abbc4947bcf..7ba77ee1f5e 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java
@@ -51,6 +51,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.MatchNoneQueryBuilder;
 import org.elasticsearch.index.query.Operator;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -397,20 +398,21 @@
   }
 
   private SearchSourceBuilder buildTableSearchBuilder(String query, int from, int size) {
-    FieldValueFactorFunctionBuilder boostScoreBuilder =
-        ScoreFunctionBuilders.fieldValueFactorFunction("usageSummary.weeklyStats.count").missing(1).factor(2);
-    FunctionScoreQueryBuilder.FilterFunctionBuilder[] functions =
-        new FunctionScoreQueryBuilder.FilterFunctionBuilder[1];
-    functions[0] = new FunctionScoreQueryBuilder.FilterFunctionBuilder(boostScoreBuilder);
     QueryStringQueryBuilder queryStringBuilder =
         QueryBuilders.queryStringQuery(query)
-            .field(FIELD_DISPLAY_NAME, 10.0f)
+            .field(FIELD_DISPLAY_NAME, 20.0f)
             .field(FIELD_DESCRIPTION, 2.0f)
             .field("columns.name", 2.0f)
             .field("columns.description", 1.0f)
             .field("columns.children.name", 2.0f)
             .defaultOperator(Operator.AND)
             .fuzziness(Fuzziness.AUTO);
+    FieldValueFactorFunctionBuilder boostScoreBuilder =
+        ScoreFunctionBuilders.fieldValueFactorFunction("usageSummary.weeklyStats.count").missing(1).factor(4);
+    FunctionScoreQueryBuilder.FilterFunctionBuilder[] functions =
+        new FunctionScoreQueryBuilder.FilterFunctionBuilder[] {
+          new FunctionScoreQueryBuilder.FilterFunctionBuilder(boostScoreBuilder)
+        };
     FunctionScoreQueryBuilder queryBuilder = QueryBuilders.functionScoreQuery(queryStringBuilder, functions);
     HighlightBuilder.Field highlightTableName = new HighlightBuilder.Field(FIELD_DISPLAY_NAME);
     highlightTableName.highlighterType(UNIFIED);
@@ -444,6 +446,9 @@
         QueryBuilders.queryStringQuery(query)
             .field(FIELD_DISPLAY_NAME, 10.0f)
             .field(FIELD_DESCRIPTION, 2.0f)
+            .field("messageSchema.schemaFields.name", 2.0f)
+            .field("messageSchema.schemaFields.description", 1.0f)
+            .field("messageSchema.schemaFields.children.name", 2.0f)
             .defaultOperator(Operator.AND)
             .fuzziness(Fuzziness.AUTO);
     HighlightBuilder.Field highlightTopicName = new HighlightBuilder.Field(FIELD_DISPLAY_NAME);
@@ -453,7 +458,11 @@
     HighlightBuilder hb = new HighlightBuilder();
     hb.field(highlightDescription);
     hb.field(highlightTopicName);
+    hb.field(new HighlightBuilder.Field("messageSchema.schemaFields.description").highlighterType(UNIFIED));
+    hb.field(new HighlightBuilder.Field("messageSchema.schemaFields.children.name").highlighterType(UNIFIED));
     SearchSourceBuilder searchSourceBuilder = searchBuilder(queryBuilder, hb, from, size);
+    searchSourceBuilder.aggregation(
+        AggregationBuilders.terms("messageSchema.schemaFields.name").field("messageSchema.schemaFields.name"));
     return addAggregation(searchSourceBuilder);
   }
 
@@ -462,7 +471,7 @@
         QueryBuilders.queryStringQuery(query)
             .field(FIELD_DISPLAY_NAME, 10.0f)
             .field(FIELD_DESCRIPTION, 2.0f)
-            .field("chars.name", 2.0f)
+            .field("charts.name", 2.0f)
             .field("charts.description")
             .defaultOperator(Operator.AND)
             .fuzziness(Fuzziness.AUTO);
@@ -552,7 +561,6 @@
         .aggregation(AggregationBuilders.terms("serviceType").field("serviceType").size(MAX_AGGREGATE_SIZE))
         .aggregation(
             AggregationBuilders.terms("service.name.keyword").field("service.name.keyword").size(MAX_AGGREGATE_SIZE))
-        .aggregation(AggregationBuilders.terms("service.type").field("service.type").size(MAX_AGGREGATE_SIZE))
         .aggregation(AggregationBuilders.terms("entityType").field("entityType").size(MAX_AGGREGATE_SIZE))
         .aggregation(AggregationBuilders.terms("tier.tagFQN").field("tier.tagFQN"))
         .aggregation(AggregationBuilders.terms("tags.tagFQN").field("tags.tagFQN").size(MAX_AGGREGATE_SIZE));
diff --git a/openmetadata-service/src/main/resources/elasticsearch/topic_index_mapping.json b/openmetadata-service/src/main/resources/elasticsearch/topic_index_mapping.json
index 92dd18941b1..a2d6e04c46e 100644
--- a/openmetadata-service/src/main/resources/elasticsearch/topic_index_mapping.json
+++ b/openmetadata-service/src/main/resources/elasticsearch/topic_index_mapping.json
@@ -66,11 +66,56 @@
       "href": {
         "type": "text"
       },
-      "schemaText": {
-        "type": "text"
-      },
-      "schemaType": {
-        "type": "text"
+      "messageSchema": {
+        "properties": {
+          "schemaType": {
+            "type": "keyword",
+            "normalizer": "lowercase_normalizer"
+          },
+          "schemaFields": {
+            "properties": {
+              "name": {
+                "type": "keyword",
+                "normalizer": "lowercase_normalizer",
+                "fields": {
+                  "keyword": {
+                    "type": "keyword",
+                    "ignore_above": 256
+                  }
+                }
+              },
+              "dataType": {
+                "type": "text"
+              },
+              "description": {
+                "type": "text",
"analyzer": "om_analyzer" + }, + "fullyQualifiedName": { + "type": "text" + }, + "tags": { + "properties": { + "tagFQN": { + "type": "keyword" + }, + "labelType": { + "type": "keyword" + }, + "description": { + "type": "text" + }, + "source": { + "type": "keyword" + }, + "state": { + "type": "keyword" + } + } + } + } + } + } }, "cleanupPolicies": { "type": "keyword" diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/topics/TopicResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/topics/TopicResourceTest.java index 5b573d63d3e..715f5f68b24 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/topics/TopicResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/topics/TopicResourceTest.java @@ -152,7 +152,7 @@ public class TopicResourceTest extends EntityResourceTest { fieldUpdated(change, "replicationFactor", 1, 2); fieldUpdated(change, "retentionTime", 1.0, 2.0); fieldUpdated(change, "retentionSize", 1.0, 2.0); - fieldUpdated(change, "schema.schemaText", "abc", "bcd"); + fieldUpdated(change, "messageSchema.schemaText", "abc", "bcd"); fieldDeleted(change, "cleanupPolicies", List.of(CleanupPolicy.COMPACT)); fieldAdded(change, "cleanupPolicies", List.of(CleanupPolicy.DELETE));