diff --git a/ingestion/src/metadata/ingestion/models/es_documents.py b/ingestion/src/metadata/ingestion/models/es_documents.py index 3dcd4ae6bb5..f3dd4cd958b 100644 --- a/ingestion/src/metadata/ingestion/models/es_documents.py +++ b/ingestion/src/metadata/ingestion/models/es_documents.py @@ -97,9 +97,8 @@ class TopicESDocument(BaseModel): href: Optional[str] deleted: bool service: EntityReference - messageSchema: Optional[schema.Topic] = None schemaText: Optional[str] = None - schemaType: Optional[str] = None + schemaType: Optional[schema.SchemaType] = None cleanupPolicies: List[str] = None replicationFactor: Optional[int] = None maximumMessageSize: Optional[int] = None diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index 2a1a2781a96..3f4e80c22af 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -556,7 +556,8 @@ class ElasticsearchSink(Sink[Entity]): deleted=topic.deleted, service=topic.service, serviceType=str(topic.serviceType.name), - messageSchema=topic.messageSchema, + schemaText=topic.messageSchema.schemaText if topic.messageSchema else None, + schemaType=topic.messageSchema.schemaType if topic.messageSchema else None, cleanupPolicies=[str(policy.name) for policy in topic.cleanupPolicies], replicationFactor=topic.replicationFactor, maximumMessageSize=topic.maximumMessageSize, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/TopicIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/TopicIndex.java index e08058a3e44..3219b695577 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/TopicIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/elasticsearch/TopicIndex.java @@ -9,7 +9,7 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.util.JsonUtils; public class TopicIndex implements ElasticSearchIndex { - final List excludeTopicFields = List.of("sampleData", "changeDescription"); + final List excludeTopicFields = List.of("sampleData", "changeDescription", "messageSchema"); final Topic topic; public TopicIndex(Topic topic) { @@ -37,6 +37,8 @@ public class TopicIndex implements ElasticSearchIndex { doc.put("service_suggest", serviceSuggest); doc.put("entityType", Entity.TOPIC); doc.put("serviceType", topic.getServiceType()); + doc.put("schemaText", topic.getMessageSchema() != null ? topic.getMessageSchema().getSchemaText() : null); + doc.put("schemaType", topic.getMessageSchema() != null ? topic.getMessageSchema().getSchemaType() : null); return doc; } }