diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java index 1d2555f4879..a3755fe6fe3 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/Entity.java @@ -80,6 +80,7 @@ public final class Entity { public static final String GLOSSARY = "glossary"; public static final String GLOSSARY_TERM = "glossaryTerm"; public static final String THREAD = "thread"; + public static final String TAG = "tag"; // // Policies diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java index 7afbc71c5f3..b33746731a2 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchEventPublisher.java @@ -34,6 +34,7 @@ import org.elasticsearch.xcontent.XContentType; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.elasticsearch.ElasticSearchIndexDefinition.ElasticSearchIndexType; import org.openmetadata.catalog.entity.data.Dashboard; +import org.openmetadata.catalog.entity.data.GlossaryTerm; import org.openmetadata.catalog.entity.data.Pipeline; import org.openmetadata.catalog.entity.data.Table; import org.openmetadata.catalog.entity.data.Topic; @@ -92,6 +93,9 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { case Entity.TEAM: updateRequest = updateTeam(event); break; + case Entity.GLOSSARY_TERM: + updateRequest = updateGlossaryTerm(event); + break; default: LOG.warn("Ignoring Entity Type {}", entityType); } @@ -355,6 +359,33 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher { return updateRequest; } + 
private UpdateRequest updateGlossaryTerm(ChangeEvent event) throws IOException { + UpdateRequest updateRequest = + new UpdateRequest(ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX.indexName, event.getEntityId().toString()); + GlossaryTermESIndex glossaryESIndex = null; + if (event.getEntity() != null && event.getEventType() != EventType.ENTITY_SOFT_DELETED) { + GlossaryTerm glossaryTerm = (GlossaryTerm) event.getEntity(); + glossaryESIndex = GlossaryTermESIndex.builder(glossaryTerm, event.getEventType()).build(); + } + switch (event.getEventType()) { + case ENTITY_CREATED: + String json = JsonUtils.pojoToJson(glossaryESIndex); + updateRequest.doc(json, XContentType.JSON); + updateRequest.docAsUpsert(true); + break; + case ENTITY_UPDATED: + scriptedUpsert(glossaryESIndex, updateRequest); + break; + case ENTITY_SOFT_DELETED: + softDeleteEntity(updateRequest); + break; + case ENTITY_DELETED: + break; + } + + return updateRequest; + } + private void scriptedUpsert(Object index, UpdateRequest updateRequest) { String scriptTxt = "for (k in params.keySet()) {if (k == 'change_descriptions') " diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java index f52ed6152db..c21b8125a27 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/elasticsearch/ElasticSearchIndexDefinition.java @@ -31,6 +31,7 @@ import org.elasticsearch.client.indices.PutMappingRequest; import org.elasticsearch.xcontent.XContentType; import org.openmetadata.catalog.Entity; import org.openmetadata.catalog.entity.data.Dashboard; +import org.openmetadata.catalog.entity.data.GlossaryTerm; import org.openmetadata.catalog.entity.data.Pipeline; import org.openmetadata.catalog.entity.data.Table; import 
org.openmetadata.catalog.entity.data.Topic; @@ -67,7 +68,8 @@ public class ElasticSearchIndexDefinition { DASHBOARD_SEARCH_INDEX("dashboard_search_index", "/elasticsearch/dashboard_index_mapping.json"), PIPELINE_SEARCH_INDEX("pipeline_search_index", "/elasticsearch/pipeline_index_mapping.json"), USER_SEARCH_INDEX("user_search_index", "/elasticsearch/user_index_mapping.json"), - TEAM_SEARCH_INDEX("team_search_index", "/elasticsearch/team_index_mapping.json"); + TEAM_SEARCH_INDEX("team_search_index", "/elasticsearch/team_index_mapping.json"), + GLOSSARY_SEARCH_INDEX("glossary_search_index", "/elasticsearch/glossary_index_mapping.json"); public final String indexName; public final String indexMappingFile; @@ -197,6 +199,8 @@ public class ElasticSearchIndexDefinition { return ElasticSearchIndexType.USER_SEARCH_INDEX; } else if (type.equalsIgnoreCase(Entity.TEAM)) { return ElasticSearchIndexType.TEAM_SEARCH_INDEX; + } else if (type.equalsIgnoreCase(Entity.GLOSSARY)) { + return ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX; } throw new RuntimeException("Failed to find index doc for type " + type); } @@ -850,3 +854,59 @@ class TeamESIndex { return teamESIndexBuilder; } } + +@EqualsAndHashCode(callSuper = true) +@Getter +@SuperBuilder(builderMethodName = "internalBuilder") +@Value +@JsonInclude(JsonInclude.Include.NON_NULL) +class GlossaryTermESIndex extends ElasticSearchIndex { + @JsonProperty("glossary_term_id") + String glossaryTermId; + + @JsonProperty("glossary_id") + String glossaryId; + + @JsonProperty("display_name") + String displayName; + + @JsonProperty("entity_type") + String entityType; + + @JsonProperty("status") + String status; + + @JsonProperty("glossary_name") + String glossaryName; + + public static GlossaryTermESIndexBuilder builder(GlossaryTerm glossaryTerm, EventType eventType) { + List tags = new ArrayList<>(); + List suggest = new ArrayList<>(); + suggest.add(ElasticSearchSuggest.builder().input(glossaryTerm.getName()).weight(5).build()); + 
suggest.add(ElasticSearchSuggest.builder().input(glossaryTerm.getDisplayName()).weight(10).build()); + + if (glossaryTerm.getTags() != null) { + glossaryTerm.getTags().forEach(tag -> tags.add(tag.getTagFQN())); + } + + Long updatedTimestamp = glossaryTerm.getUpdatedAt(); + ParseTags parseTags = new ParseTags(tags); + String description = glossaryTerm.getDescription() != null ? glossaryTerm.getDescription() : ""; + String displayName = glossaryTerm.getDisplayName() != null ? glossaryTerm.getDisplayName() : ""; + GlossaryTermESIndexBuilder builder = + internalBuilder() + .glossaryId(glossaryTerm.getGlossary().getId().toString()) + .name(glossaryTerm.getName()) + .displayName(displayName) + .description(description) + .fqdn(glossaryTerm.getName()) + .glossaryId(glossaryTerm.getGlossary().getId().toString()) + .glossaryName(glossaryTerm.getGlossary().getName()) + .lastUpdatedTimestamp(updatedTimestamp) + .entityType("glossaryTerm") + .suggest(suggest) + .tags(parseTags.tags); + + return builder; + } +} diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java index a8bec38416f..714aef7666a 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java @@ -153,8 +153,8 @@ public class SearchResource { case "team_search_index": searchSourceBuilder = buildTeamSearchBuilder(query, from, size); break; - case "glossary_search_index": - searchSourceBuilder = buildGlossarySearchBuilder(query, from, size); + case "glossary_term_search_index": + searchSourceBuilder = buildGlossaryTermSearchBuilder(query, from, size); break; default: searchSourceBuilder = buildAggregateSearchBuilder(query, from, size); @@ -384,9 +384,9 @@ public class SearchResource { return searchSourceBuilder; } - 
private SearchSourceBuilder buildGlossarySearchBuilder(String query, int from, int size) { + private SearchSourceBuilder buildGlossaryTermSearchBuilder(String query, int from, int size) { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - HighlightBuilder.Field highlightGlossaryName = new HighlightBuilder.Field("glossary_name"); + HighlightBuilder.Field highlightGlossaryName = new HighlightBuilder.Field("name"); highlightGlossaryName.highlighterType("unified"); HighlightBuilder.Field highlightDescription = new HighlightBuilder.Field("description"); highlightDescription.highlighterType("unified"); @@ -396,9 +396,8 @@ public class SearchResource { hb.preTags(""); hb.postTags(""); searchSourceBuilder - .query(QueryBuilders.queryStringQuery(query).field("glossary_name", 5.0f).field("description").lenient(true)) + .query(QueryBuilders.queryStringQuery(query).field("name", 5.0f).field("description").lenient(true)) .aggregation(AggregationBuilders.terms("EntityType").field("entity_type")) - .aggregation(AggregationBuilders.terms("Tier").field("tier")) .aggregation(AggregationBuilders.terms("Tags").field("tags")) .highlighter(hb) .from(from) diff --git a/catalog-rest-service/src/main/resources/elasticsearch/glossary_index_mapping.json b/catalog-rest-service/src/main/resources/elasticsearch/glossary_index_mapping.json new file mode 100644 index 00000000000..7f64f7abf65 --- /dev/null +++ b/catalog-rest-service/src/main/resources/elasticsearch/glossary_index_mapping.json @@ -0,0 +1,38 @@ +{ + "properties": { + "name": { + "type": "text" + }, + "display_name": { + "type": "text" + }, + "owner": { + "type": "keyword" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "description": { + "type": "text" + }, + "glossary_name": { + "type": "keyword" + }, + "glossary_id": { + "type": "keyword" + }, + "status": { + "type": "keyword" + }, + "tags": { + "type": "keyword" + }, + "entity_type": { + "type": "keyword" + }, + 
class GlossaryTermMixin:
    """
    Glossary Term specific API methods.

    Relies on the class it is mixed into to provide ``self.client`` and
    ``self.get_suffix``.
    """

    def create_glossary_term(self, entity: Type[T], glossary_term_body):
        """Send a PUT request carrying a Glossary Term body.

        Args:
            entity (Type[T]): entity class used to resolve the request path
                through ``get_suffix``
            glossary_term_body: body of the request; its ``json()`` output is
                sent as the payload
        """
        resp = self.client.put(
            path=self.get_suffix(entity), data=glossary_term_body.json()
        )
        # Lazy %-style arguments: the message is only rendered when INFO is enabled.
        logger.info("Created a Glossary Term: %s", resp)
metadata.ingestion.api.sink import Sink, SinkStatus from metadata.ingestion.models.table_metadata import ( ChangeDescription, DashboardESDocument, + GlossaryTermESDocument, PipelineESDocument, TableESDocument, TeamESDocument, @@ -49,6 +51,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.ingestion.sink.elasticsearch_constants import ( DASHBOARD_ELASTICSEARCH_INDEX_MAPPING, + GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING, PIPELINE_ELASTICSEARCH_INDEX_MAPPING, TABLE_ELASTICSEARCH_INDEX_MAPPING, TEAM_ELASTICSEARCH_INDEX_MAPPING, @@ -74,12 +77,14 @@ class ElasticSearchConfig(ConfigModel): index_pipelines: Optional[bool] = True index_users: Optional[bool] = True index_teams: Optional[bool] = True + index_glossary_terms: Optional[bool] = True table_index_name: str = "table_search_index" topic_index_name: str = "topic_search_index" dashboard_index_name: str = "dashboard_search_index" pipeline_index_name: str = "pipeline_search_index" user_index_name: str = "user_search_index" team_index_name: str = "team_search_index" + glossary_term_index_name: str = "glossary_term_search_index" scheme: str = "http" use_ssl: bool = False verify_certs: bool = False @@ -163,6 +168,12 @@ class ElasticsearchSink(Sink[Entity]): self.config.team_index_name, TEAM_ELASTICSEARCH_INDEX_MAPPING ) + if self.config.index_glossary_terms: + self._check_or_create_index( + self.config.glossary_term_index_name, + GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING, + ) + def _check_or_create_index(self, index_name: str, es_mapping: str): """ Retrieve all indices that currently have {elasticsearch_alias} alias @@ -229,6 +240,7 @@ class ElasticsearchSink(Sink[Entity]): if isinstance(record, User): user_doc = self._create_user_es_doc(record) + print(user_doc.json()) self.elasticsearch_client.index( index=self.config.user_index_name, id=str(user_doc.user_id), @@ -245,6 +257,15 @@ class ElasticsearchSink(Sink[Entity]): 
def _create_glossary_term_es_doc(self, glossary_term: GlossaryTerm):
    """Build the Elasticsearch document for a glossary term.

    Args:
        glossary_term (GlossaryTerm): entity to convert

    Returns:
        GlossaryTermESDocument: document ready to be indexed
    """
    name = glossary_term.name.__root__
    # Same guard as _create_user_es_doc: fall back to the name when no display
    # name is set, since GlossaryTermESDocument.display_name is a required str.
    display_name = glossary_term.displayName if glossary_term.displayName else name
    # GlossaryTermESDocument.description is a required str as well.
    description = glossary_term.description if glossary_term.description else ""

    suggest = [
        {"input": [display_name], "weight": 5},
        # Use the plain string, not the name wrapper object.
        {"input": [name], "weight": 10},
    ]
    timestamp = glossary_term.updatedAt.__root__

    glossary_term_doc = GlossaryTermESDocument(
        glossary_term_id=str(glossary_term.id.__root__),
        deleted=glossary_term.deleted,
        name=name,
        display_name=display_name,
        description=description,
        glossary_id=str(glossary_term.glossary.id.__root__),
        glossary_name=glossary_term.glossary.name,
        status=glossary_term.status.name,
        suggest=suggest,
        last_updated_timestamp=timestamp,
    )

    return glossary_term_doc
diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py b/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py index e696b190690..24b8290c2c8 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py @@ -465,3 +465,48 @@ TEAM_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( } """ ) + +GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( + """ + { + "mappings": { + "properties": { + "name": { + "type": "text" + }, + "display_name": { + "type": "text" + }, + "owner": { + "type": "keyword" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "description": { + "type": "text" + }, + "glossary_name": { + "type": "keyword" + }, + "glossary_id": { + "type": "keyword" + }, + "status": { + "type": "keyword" + }, + "tags": { + "type": "keyword" + }, + "entity_type": { + "type": "keyword" + }, + "suggest": { + "type": "completion" + } + } + } + } + """ +) diff --git a/ingestion/src/metadata/ingestion/source/metadata.py b/ingestion/src/metadata/ingestion/source/metadata.py index aa58b86e321..153e9e79538 100644 --- a/ingestion/src/metadata/ingestion/source/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata.py @@ -16,6 +16,7 @@ from typing import Iterable, List, Optional from metadata.config.common import ConfigModel from metadata.generated.schema.entity.data.dashboard import Dashboard +from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.topic import Topic @@ -38,6 +39,7 @@ class MetadataTablesRestSourceConfig(ConfigModel): include_pipelines: Optional[bool] = True include_users: Optional[bool] = True include_teams: Optional[bool] = True + include_glossary_terms: Optional[bool] = True limit_records: int = 
def fetch_glossary_terms(self) -> Iterable[GlossaryTerm]:
    """Fetch glossary terms page by page.

    Nothing is yielded when ``config.include_glossary_terms`` is falsy.
    Each yielded term is also reported to the source status.

    Yields:
        GlossaryTerm: every glossary term returned by ``list_entities``
    """
    if not self.config.include_glossary_terms:
        return

    after = None
    while True:
        glossary_term_entities = self.metadata.list_entities(
            entity=GlossaryTerm,
            fields=[],
            after=after,
            limit=self.config.limit_records,
        )
        for glossary_term in glossary_term_entities.entities:
            # Report under the glossary-term status, not the team one.
            self.status.scanned_glossary_term(glossary_term.name)
            yield glossary_term
        # A missing "after" cursor marks the last page.
        if glossary_term_entities.after is None:
            break
        after = glossary_term_entities.after