Fix #2991: Add ES support for glossary (#2993)

Sriharsha Chintalapani 2022-02-25 20:53:53 -08:00 committed by GitHub
parent 990608522a
commit fa62a1e4e9
12 changed files with 310 additions and 9 deletions

View File

@ -80,6 +80,7 @@ public final class Entity {
public static final String GLOSSARY = "glossary";
public static final String GLOSSARY_TERM = "glossaryTerm";
public static final String THREAD = "thread";
public static final String TAG = "tag";
//
// Policies

View File

@ -34,6 +34,7 @@ import org.elasticsearch.xcontent.XContentType;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.elasticsearch.ElasticSearchIndexDefinition.ElasticSearchIndexType;
import org.openmetadata.catalog.entity.data.Dashboard;
import org.openmetadata.catalog.entity.data.GlossaryTerm;
import org.openmetadata.catalog.entity.data.Pipeline;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.data.Topic;
@ -92,6 +93,9 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
case Entity.TEAM:
updateRequest = updateTeam(event);
break;
case Entity.GLOSSARY_TERM:
updateRequest = updateGlossaryTerm(event);
break;
default:
LOG.warn("Ignoring Entity Type {}", entityType);
}
@ -355,6 +359,33 @@ public class ElasticSearchEventPublisher extends AbstractEventPublisher {
return updateRequest;
}
private UpdateRequest updateGlossaryTerm(ChangeEvent event) throws IOException {
UpdateRequest updateRequest =
new UpdateRequest(ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX.indexName, event.getEntityId().toString());
GlossaryTermESIndex glossaryESIndex = null;
if (event.getEntity() != null && event.getEventType() != EventType.ENTITY_SOFT_DELETED) {
GlossaryTerm glossaryTerm = (GlossaryTerm) event.getEntity();
glossaryESIndex = GlossaryTermESIndex.builder(glossaryTerm, event.getEventType()).build();
}
switch (event.getEventType()) {
case ENTITY_CREATED:
String json = JsonUtils.pojoToJson(glossaryESIndex);
updateRequest.doc(json, XContentType.JSON);
updateRequest.docAsUpsert(true);
break;
case ENTITY_UPDATED:
scriptedUpsert(glossaryESIndex, updateRequest);
break;
case ENTITY_SOFT_DELETED:
softDeleteEntity(updateRequest);
break;
case ENTITY_DELETED:
break;
}
return updateRequest;
}
private void scriptedUpsert(Object index, UpdateRequest updateRequest) {
String scriptTxt =
"for (k in params.keySet()) {if (k == 'change_descriptions') "

View File

@ -31,6 +31,7 @@ import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.xcontent.XContentType;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.data.Dashboard;
import org.openmetadata.catalog.entity.data.GlossaryTerm;
import org.openmetadata.catalog.entity.data.Pipeline;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.entity.data.Topic;
@ -67,7 +68,8 @@ public class ElasticSearchIndexDefinition {
DASHBOARD_SEARCH_INDEX("dashboard_search_index", "/elasticsearch/dashboard_index_mapping.json"),
PIPELINE_SEARCH_INDEX("pipeline_search_index", "/elasticsearch/pipeline_index_mapping.json"),
USER_SEARCH_INDEX("user_search_index", "/elasticsearch/user_index_mapping.json"),
TEAM_SEARCH_INDEX("team_search_index", "/elasticsearch/team_index_mapping.json");
TEAM_SEARCH_INDEX("team_search_index", "/elasticsearch/team_index_mapping.json"),
GLOSSARY_SEARCH_INDEX("glossary_search_index", "/elasticsearch/glossary_index_mapping.json");
public final String indexName;
public final String indexMappingFile;
@ -197,6 +199,8 @@ public class ElasticSearchIndexDefinition {
return ElasticSearchIndexType.USER_SEARCH_INDEX;
} else if (type.equalsIgnoreCase(Entity.TEAM)) {
return ElasticSearchIndexType.TEAM_SEARCH_INDEX;
} else if (type.equalsIgnoreCase(Entity.GLOSSARY)) {
return ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX;
}
throw new RuntimeException("Failed to find index doc for type " + type);
}
@ -850,3 +854,59 @@ class TeamESIndex {
return teamESIndexBuilder;
}
}
@EqualsAndHashCode(callSuper = true)
@Getter
@SuperBuilder(builderMethodName = "internalBuilder")
@Value
@JsonInclude(JsonInclude.Include.NON_NULL)
class GlossaryTermESIndex extends ElasticSearchIndex {
@JsonProperty("glossary_term_id")
String glossaryTermId;
@JsonProperty("glossary_id")
String glossaryId;
@JsonProperty("display_name")
String displayName;
@JsonProperty("entity_type")
String entityType;
@JsonProperty("status")
String status;
@JsonProperty("glossary_name")
String glossaryName;
public static GlossaryTermESIndexBuilder builder(GlossaryTerm glossaryTerm, EventType eventType) {
List<String> tags = new ArrayList<>();
List<ElasticSearchSuggest> suggest = new ArrayList<>();
suggest.add(ElasticSearchSuggest.builder().input(glossaryTerm.getName()).weight(5).build());
suggest.add(ElasticSearchSuggest.builder().input(glossaryTerm.getDisplayName()).weight(10).build());
if (glossaryTerm.getTags() != null) {
glossaryTerm.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
}
Long updatedTimestamp = glossaryTerm.getUpdatedAt();
ParseTags parseTags = new ParseTags(tags);
String description = glossaryTerm.getDescription() != null ? glossaryTerm.getDescription() : "";
String displayName = glossaryTerm.getDisplayName() != null ? glossaryTerm.getDisplayName() : "";
GlossaryTermESIndexBuilder builder =
internalBuilder()
.glossaryTermId(glossaryTerm.getId().toString())
.name(glossaryTerm.getName())
.displayName(displayName)
.description(description)
.fqdn(glossaryTerm.getName())
.glossaryId(glossaryTerm.getGlossary().getId().toString())
.glossaryName(glossaryTerm.getGlossary().getName())
.lastUpdatedTimestamp(updatedTimestamp)
.entityType("glossaryTerm")
.suggest(suggest)
.tags(parseTags.tags);
return builder;
}
}

View File

@ -153,8 +153,8 @@ public class SearchResource {
case "team_search_index":
searchSourceBuilder = buildTeamSearchBuilder(query, from, size);
break;
case "glossary_search_index":
searchSourceBuilder = buildGlossarySearchBuilder(query, from, size);
case "glossary_term_search_index":
searchSourceBuilder = buildGlossaryTermSearchBuilder(query, from, size);
break;
default:
searchSourceBuilder = buildAggregateSearchBuilder(query, from, size);
@ -384,9 +384,9 @@ public class SearchResource {
return searchSourceBuilder;
}
private SearchSourceBuilder buildGlossarySearchBuilder(String query, int from, int size) {
private SearchSourceBuilder buildGlossaryTermSearchBuilder(String query, int from, int size) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
HighlightBuilder.Field highlightGlossaryName = new HighlightBuilder.Field("glossary_name");
HighlightBuilder.Field highlightGlossaryName = new HighlightBuilder.Field("name");
highlightGlossaryName.highlighterType("unified");
HighlightBuilder.Field highlightDescription = new HighlightBuilder.Field("description");
highlightDescription.highlighterType("unified");
@ -396,9 +396,8 @@ public class SearchResource {
hb.preTags("<span class=\"text-highlighter\">");
hb.postTags("</span>");
searchSourceBuilder
.query(QueryBuilders.queryStringQuery(query).field("glossary_name", 5.0f).field("description").lenient(true))
.query(QueryBuilders.queryStringQuery(query).field("name", 5.0f).field("description").lenient(true))
.aggregation(AggregationBuilders.terms("EntityType").field("entity_type"))
.aggregation(AggregationBuilders.terms("Tier").field("tier"))
.aggregation(AggregationBuilders.terms("Tags").field("tags"))
.highlighter(hb)
.from(from)
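With the case renamed to glossary_term_search_index, clients reach glossary terms through the existing search endpoint by passing that index name. A hedged sketch of such a call; the endpoint path and the q/index/from/size parameters follow the standard OpenMetadata search API and are not shown in this diff:

```python
# Hedged sketch (not in this commit): querying the new glossary term index
# through the search endpoint. Host, port, and parameter names are assumptions.
import requests

resp = requests.get(
    "http://localhost:8585/api/v1/search/query",
    params={"q": "pii", "index": "glossary_term_search_index", "from": 0, "size": 10},
)
resp.raise_for_status()
# Assumes the response mirrors the Elasticsearch search body
for hit in resp.json()["hits"]["hits"]:
    print(hit["_source"]["name"], hit["_source"].get("description", ""))
```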

View File

@ -0,0 +1,38 @@
{
"properties": {
"name": {
"type": "text"
},
"display_name": {
"type": "text"
},
"owner": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"description": {
"type": "text"
},
"glossary_name": {
"type": "keyword"
},
"glossary_id": {
"type": "keyword"
},
"status": {
"type": "keyword"
},
"tags": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
"suggest": {
"type": "completion"
}
}
}
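The mapping keeps filterable fields (glossary_name, glossary_id, status, tags, entity_type, owner) as keywords, full-text fields (name, display_name, description) as text, the timestamp as an epoch_second date, and suggest as a completion field for type-ahead. For illustration only, a document shaped to this mapping and a completion query against it; all values are made up:

```python
# Illustrative only: a document conforming to the mapping above, plus a
# type-ahead query on the completion field.
sample_doc = {
    "name": "pii",
    "display_name": "PII",
    "description": "Personally identifiable information",
    "glossary_name": "DataGovernance",
    "glossary_id": "5b1d7c6e-0000-0000-0000-000000000000",
    "status": "Approved",
    "tags": ["PII.Sensitive"],
    "entity_type": "glossaryTerm",
    "last_updated_timestamp": 1645772033,  # epoch_second, per the mapping
    "suggest": [{"input": ["pii"], "weight": 5}, {"input": ["PII"], "weight": 10}],
}

suggest_query = {
    "suggest": {
        "term-suggest": {
            "prefix": "pi",
            "completion": {"field": "suggest"},
        }
    }
}
```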

View File

@ -1,4 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8 (ingestion)" project-jdk-type="Python SDK" />
<component name="PythonCompatibilityInspectionAdvertiser">
<option name="version" value="3" />
</component>
</project>

View File

@ -189,6 +189,24 @@ class TeamESDocument(BaseModel):
doc_as_upsert: bool = True
class GlossaryTermESDocument(BaseModel):
"""Elastic Search Mapping doc for Glossary Term"""
glossary_term_id: str
deleted: bool
entity_type: str = "glossaryTerm"
name: str
display_name: str
description: str
glossary_name: str
glossary_id: str
status: str
suggest: List[dict]
last_updated_timestamp: Optional[int]
doc_as_upsert: bool = True
class DashboardOwner(BaseModel):
"""Dashboard owner"""

View File

@ -0,0 +1,24 @@
"""
Mixin class containing Glossary Term specific methods
To be used by OpenMetadata
"""
import logging
from typing import Type, TypeVar
from pydantic import BaseModel
T = TypeVar("T", bound=BaseModel) # pylint: disable=invalid-name
logger = logging.getLogger(__name__)
class GlossaryTermMixin:
def create_glossary_term(self, entity: Type[T], glossary_term_body):
"""Method to create new Glossary Term
Args:
glossary_term_body (Glossary): body of the request
"""
resp = self.client.put(
path=self.get_suffix(entity), data=glossary_term_body.json()
)
logger.info(f"Created a Glossary Term: {resp}")

View File

@ -26,6 +26,7 @@ from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.glossary import Glossary
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
from metadata.generated.schema.entity.data.location import Location
from metadata.generated.schema.entity.data.metrics import Metrics
from metadata.generated.schema.entity.data.mlmodel import MlModel
@ -233,6 +234,9 @@ class OpenMetadata(
if issubclass(entity, Glossary):
return "/glossaries"
if issubclass(entity, GlossaryTerm):
return "/glossaryTerms"
if issubclass(entity, get_args(Union[Role, self.get_create_entity_type(Role)])):
return "/roles"

View File

@ -24,6 +24,7 @@ from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.data.topic import Topic
@ -39,6 +40,7 @@ from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.models.table_metadata import (
ChangeDescription,
DashboardESDocument,
GlossaryTermESDocument,
PipelineESDocument,
TableESDocument,
TeamESDocument,
@ -49,6 +51,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.sink.elasticsearch_constants import (
DASHBOARD_ELASTICSEARCH_INDEX_MAPPING,
GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING,
PIPELINE_ELASTICSEARCH_INDEX_MAPPING,
TABLE_ELASTICSEARCH_INDEX_MAPPING,
TEAM_ELASTICSEARCH_INDEX_MAPPING,
@ -74,12 +77,14 @@ class ElasticSearchConfig(ConfigModel):
index_pipelines: Optional[bool] = True
index_users: Optional[bool] = True
index_teams: Optional[bool] = True
index_glossary_terms: Optional[bool] = True
table_index_name: str = "table_search_index"
topic_index_name: str = "topic_search_index"
dashboard_index_name: str = "dashboard_search_index"
pipeline_index_name: str = "pipeline_search_index"
user_index_name: str = "user_search_index"
team_index_name: str = "team_search_index"
glossary_term_index_name: str = "glossary_term_search_index"
scheme: str = "http"
use_ssl: bool = False
verify_certs: bool = False
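The two new options let the sink index glossary terms (on by default) and name the target index. A minimal sketch of building the config; es_host and es_port come from the existing ElasticSearchConfig fields that this hunk does not show, so treat those names as assumptions:

```python
# Hedged sketch: enabling glossary term indexing in the ES sink config. Only
# index_glossary_terms and glossary_term_index_name are introduced here;
# es_host/es_port are assumed from the existing (unshown) fields.
from metadata.ingestion.sink.elasticsearch import ElasticSearchConfig

es_sink_config = ElasticSearchConfig(
    es_host="localhost",
    es_port=9200,
    index_glossary_terms=True,  # default: True
    glossary_term_index_name="glossary_term_search_index",  # default shown above
)
```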
@ -163,6 +168,12 @@ class ElasticsearchSink(Sink[Entity]):
self.config.team_index_name, TEAM_ELASTICSEARCH_INDEX_MAPPING
)
if self.config.index_glossary_terms:
self._check_or_create_index(
self.config.glossary_term_index_name,
GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING,
)
def _check_or_create_index(self, index_name: str, es_mapping: str):
"""
Retrieve all indices that currently have {elasticsearch_alias} alias
@ -229,6 +240,7 @@ class ElasticsearchSink(Sink[Entity]):
if isinstance(record, User):
user_doc = self._create_user_es_doc(record)
print(user_doc.json())
self.elasticsearch_client.index(
index=self.config.user_index_name,
id=str(user_doc.user_id),
@ -245,6 +257,15 @@ class ElasticsearchSink(Sink[Entity]):
request_timeout=self.config.timeout,
)
if isinstance(record, GlossaryTerm):
glossary_term_doc = self._create_glossary_term_es_doc(record)
self.elasticsearch_client.index(
index=self.config.glossary_term_index_name,
id=str(glossary_term_doc.glossary_term_id),
body=glossary_term_doc.json(),
request_timeout=self.config.timeout,
)
if hasattr(record.name, "__root__"):
self.status.records_written(record.name.__root__)
else:
@ -481,8 +502,9 @@ class ElasticsearchSink(Sink[Entity]):
return pipeline_doc
def _create_user_es_doc(self, user: User):
display_name = user.displayName if user.displayName else user.name.__root__
suggest = [
{"input": [user.displayName], "weight": 5},
{"input": [display_name], "weight": 5},
{"input": [user.name], "weight": 10},
]
timestamp = user.updatedAt.__root__
@ -500,7 +522,7 @@ class ElasticsearchSink(Sink[Entity]):
user_id=str(user.id.__root__),
deleted=user.deleted,
name=user.name.__root__,
display_name=user.displayName,
display_name=display_name,
email=user.email.__root__,
suggest=suggest,
last_updated_timestamp=timestamp,
@ -539,6 +561,28 @@ class ElasticsearchSink(Sink[Entity]):
return team_doc
def _create_glossary_term_es_doc(self, glossary_term: GlossaryTerm):
display_name = (
glossary_term.displayName if glossary_term.displayName else glossary_term.name.__root__
)
suggest = [
{"input": [display_name], "weight": 5},
{"input": [glossary_term.name], "weight": 10},
]
timestamp = glossary_term.updatedAt.__root__
glossary_term_doc = GlossaryTermESDocument(
glossary_term_id=str(glossary_term.id.__root__),
deleted=glossary_term.deleted,
name=glossary_term.name.__root__,
display_name=display_name,
description=glossary_term.description,
glossary_id=str(glossary_term.glossary.id.__root__),
glossary_name=glossary_term.glossary.name,
status=glossary_term.status.name,
suggest=suggest,
last_updated_timestamp=timestamp,
)
return glossary_term_doc
def _get_charts(self, chart_refs: Optional[List[entityReference.EntityReference]]):
charts = []
if chart_refs:

View File

@ -465,3 +465,48 @@ TEAM_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
}
"""
)
GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"""
{
"mappings": {
"properties": {
"name": {
"type": "text"
},
"display_name": {
"type": "text"
},
"owner": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"description": {
"type": "text"
},
"glossary_name": {
"type": "keyword"
},
"glossary_id": {
"type": "keyword"
},
"status": {
"type": "keyword"
},
"tags": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
"suggest": {
"type": "completion"
}
}
}
}
"""
)
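This constant is what the sink hands to _check_or_create_index when the glossary term index is missing. A minimal sketch of that creation step in isolation, assuming an elasticsearch-py client; the real helper also handles the already-exists and alias checks:

```python
# Minimal sketch: creating the glossary term index from the mapping constant above.
from elasticsearch import Elasticsearch

from metadata.ingestion.sink.elasticsearch_constants import (
    GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING,
)

es = Elasticsearch(["http://localhost:9200"])  # assumed local cluster
if not es.indices.exists(index="glossary_term_search_index"):
    es.indices.create(
        index="glossary_term_search_index",
        body=GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING,
    )
```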

View File

@ -16,6 +16,7 @@ from typing import Iterable, List, Optional
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
@ -38,6 +39,7 @@ class MetadataTablesRestSourceConfig(ConfigModel):
include_pipelines: Optional[bool] = True
include_users: Optional[bool] = True
include_teams: Optional[bool] = True
include_glossary_terms: Optional[bool] = True
limit_records: int = 1000
@ -100,6 +102,15 @@ class MetadataSourceStatus(SourceStatus):
self.success.append(user_name)
logger.info("User Scanned: %s", user_name)
def scanned_glossary_term(self, glossary_term: str) -> None:
"""scanned glossary method
Args:
glossary_term (str)
"""
self.success.append(glossary_term)
logger.info("Glossary Term Scanned: %s", glossary_term)
# pylint: disable=unused-argument
def filtered(
self, table_name: str, err: str, dataset_name: str = None, col_type: str = None
@ -169,6 +180,7 @@ class MetadataSource(Source[Entity]):
yield from self.fetch_pipeline()
yield from self.fetch_users()
yield from self.fetch_teams()
yield from self.fetch_glossary_terms()
def fetch_table(self) -> Table:
"""Fetch table method
@ -315,6 +327,28 @@ class MetadataSource(Source[Entity]):
break
after = team_entities.after
def fetch_glossary_terms(self) -> GlossaryTerm:
"""fetch glossary terms method
Returns:
GlossaryTerm:
"""
if self.config.include_glossary_terms:
after = None
while True:
glossary_term_entities = self.metadata.list_entities(
entity=GlossaryTerm,
fields=[],
after=after,
limit=self.config.limit_records,
)
for glossary_term in glossary_term_entities.entities:
self.status.scanned_glossary_term(glossary_term.name)
yield glossary_term
if glossary_term_entities.after is None:
break
after = glossary_term_entities.after
def get_status(self) -> SourceStatus:
return self.status