diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ChartRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ChartRepository.java index e0ec892a941..892731fe40f 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ChartRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/ChartRepository.java @@ -256,7 +256,7 @@ public abstract class ChartRepository { private EntityReference getService(Chart chart) throws IOException { return chart == null ? null : getService(Objects.requireNonNull(EntityUtil.getService(relationshipDAO(), - chart.getId()))); + chart.getId(), Entity.DASHBOARD_SERVICE))); } private EntityReference getService(EntityReference service) throws IOException { diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRelationshipDAO.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRelationshipDAO.java index 812be6f43a6..8b086ddd9f7 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRelationshipDAO.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/EntityRelationshipDAO.java @@ -53,6 +53,12 @@ public interface EntityRelationshipDAO { @RegisterMapper(FromEntityReferenceMapper.class) List findFrom(@Bind("toId") String toId, @Bind("relation") int relation); + @SqlQuery("SELECT fromId, fromEntity FROM entity_relationship WHERE toId = :toId AND relation = :relation AND " + + "fromEntity = :fromEntity ORDER BY fromId") + @RegisterMapper(FromEntityReferenceMapper.class) + List findFromEntity(@Bind("toId") String toId, @Bind("relation") int relation, + @Bind("fromEntity") String fromEntity); + @SqlUpdate("DELETE from entity_relationship WHERE fromId = :fromId AND toId = :toId AND relation = :relation") void delete(@Bind("fromId") String fromId, @Bind("toId") String toId, @Bind("relation") int relation); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java index df8dbd0c734..bc5255ef061 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/search/SearchResource.java @@ -118,9 +118,12 @@ public class SearchResource { } if (index.equals("topic_search_index")) { searchSourceBuilder = buildTopicSearchBuilder(query, from, size); + } else if (index.equals("dashboard_search_index")) { + searchSourceBuilder = buildDashboardSearchBuilder(query, from, size); } else { searchSourceBuilder = buildTableSearchBuilder(query, from, size); } + if (sortFieldParam != null && !sortFieldParam.isEmpty()) { searchSourceBuilder.sort(sortFieldParam, sortOrder); } @@ -186,7 +189,7 @@ public class SearchResource { .field("column_names") .field("column_descriptions") .lenient(true)) - .aggregation(AggregationBuilders.terms("Service Type").field("service_type")) + .aggregation(AggregationBuilders.terms("Service").field("service_type")) .aggregation(AggregationBuilders.terms("Tier").field("tier")) .aggregation(AggregationBuilders.terms("Tags").field("tags")) .highlighter(hb) @@ -212,7 +215,35 @@ public class SearchResource { .field("topic_name", 5.0f) .field("description") .lenient(true)) - .aggregation(AggregationBuilders.terms("Service Type").field("service_type")) + .aggregation(AggregationBuilders.terms("Service").field("service_type")) + .aggregation(AggregationBuilders.terms("Tier").field("tier")) + .aggregation(AggregationBuilders.terms("Tags").field("tags")) + .highlighter(hb) + .from(from).size(size); + + return searchSourceBuilder; + } + + private SearchSourceBuilder buildDashboardSearchBuilder(String query, int from, int size) { + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + HighlightBuilder.Field highlightTableName = + new HighlightBuilder.Field("dashboard_name"); + highlightTableName.highlighterType("unified"); + HighlightBuilder.Field highlightDescription = + new HighlightBuilder.Field("description"); + highlightDescription.highlighterType("unified"); + HighlightBuilder hb = new HighlightBuilder(); + hb.field(highlightDescription); + hb.field(highlightTableName); + hb.preTags(""); + hb.postTags(""); + searchSourceBuilder.query(QueryBuilders.queryStringQuery(query) + .field("dashboard_name", 5.0f) + .field("description") + .field("chart_names") + .field("chart_descriptions") + .lenient(true)) + .aggregation(AggregationBuilders.terms("Service").field("service_type")) .aggregation(AggregationBuilders.terms("Tier").field("tier")) .aggregation(AggregationBuilders.terms("Tags").field("tags")) .highlighter(hb) diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java index e4df758e8bc..ccd358b36a0 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java @@ -107,6 +107,15 @@ public final class EntityUtil { return refs.isEmpty() ? null : refs.get(0); } + public static EntityReference getService(EntityRelationshipDAO dao, UUID entityId, String serviceType) { + List refs = dao.findFromEntity(entityId.toString(), Relationship.CONTAINS.ordinal(), serviceType); + if (refs.size() > 1) { + LOG.warn("Possible database issues - multiple services found for entity {}", entityId); + return refs.get(0); + } + return refs.isEmpty() ? null : refs.get(0); + } + /** * Populate EntityRef with href */ diff --git a/ingestion/pipelines/metadata_to_es.json b/ingestion/pipelines/metadata_to_es.json index a4d5a32f2e4..06f5bb7a7d4 100644 --- a/ingestion/pipelines/metadata_to_es.json +++ b/ingestion/pipelines/metadata_to_es.json @@ -3,7 +3,8 @@ "type": "metadata", "config": { "include_tables": "true", - "include_topics": "true" + "include_topics": "true", + "include_dashboards": "true" } }, "sink": { @@ -11,6 +12,7 @@ "config": { "index_tables": "true", "index_topics": "true", + "index_dashboards": "true", "es_host_port": "localhost" } }, diff --git a/ingestion/src/metadata/ingestion/models/table_metadata.py b/ingestion/src/metadata/ingestion/models/table_metadata.py index 5a543b972fc..6605471de76 100644 --- a/ingestion/src/metadata/ingestion/models/table_metadata.py +++ b/ingestion/src/metadata/ingestion/models/table_metadata.py @@ -209,6 +209,24 @@ class TopicESDocument(BaseModel): followers: List[str] +class DashboardESDocument(BaseModel): + """ Elastic Search Mapping doc for Dashboards """ + dashboard_id: str + service: str + service_type: str + dashboard_name: str + suggest: List[dict] + description: Optional[str] = None + last_updated_timestamp: Optional[int] + chart_names: List[str] + chart_descriptions: List[str] + tags: List[str] + fqdn: str + tier: Optional[str] = None + owner: str + followers: List[str] + + class DashboardOwner(BaseModel): """Dashboard owner""" username: str @@ -219,7 +237,7 @@ class DashboardOwner(BaseModel): class Chart(BaseModel): """Chart""" name: str - displayName:str + displayName: str description: str chart_type: str url: str diff --git a/ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py b/ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py index dc34e34e878..a73d5e3b05a 100644 --- a/ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py +++ b/ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py @@ -55,6 +55,7 @@ DatabaseEntities = List[Database] TableEntities = List[Table] Tags = List[Tag] Topics = List[Topic] +Dashboards = List[Dashboard] class MetadataServerConfig(ConfigModel): @@ -337,10 +338,27 @@ class OpenMetadataAPIClient(object): resp = self.client.put('/charts', data=create_chart_request.json()) return Chart(**resp) + def get_chart_by_id(self, chart_id: str, fields: [] = ['tags,service']) -> Chart: + """Get Chart By ID""" + params = {'fields': ",".join(fields)} + resp = self.client.get('/charts/{}'.format(chart_id), data=params) + return Chart(**resp) + def create_or_update_dashboard(self, create_dashboard_request: CreateDashboardEntityRequest) -> Dashboard: """Create or Update a Dashboard """ resp = self.client.put('/dashboards', data=create_dashboard_request.json()) return Dashboard(**resp) + def list_dashboards(self, fields: str = None, offset: int = 0, limit: int = 1000000) -> Dashboards: + """ List all dashboards""" + if fields is None: + resp = self.client.get('/dashboards') + else: + resp = self.client.get('/dashboards?fields={}&offset={}&limit={}'.format(fields, offset, limit)) + if self._use_raw_data: + return resp + else: + return [Dashboard(**t) for t in resp['data']] + def close(self): self.client.close() diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index 425a39c9fd8..e0d89419d0e 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -15,22 +15,23 @@ import json import logging import time -from typing import Optional +from typing import Optional, List from elasticsearch import Elasticsearch -from elasticsearch.exceptions import NotFoundError -import metadata +from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.topic import Topic +from metadata.generated.schema.entity.data.chart import Chart +from metadata.generated.schema.type import entityReference from metadata.ingestion.api.sink import Sink, SinkStatus from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig from metadata.ingestion.sink.elasticsearch_constants import TABLE_ELASTICSEARCH_INDEX_MAPPING, \ - TOPIC_ELASTICSEARCH_INDEX_MAPPING + TOPIC_ELASTICSEARCH_INDEX_MAPPING, DASHBOARD_ELASTICSEARCH_INDEX_MAPPING from metadata.config.common import ConfigModel from metadata.ingestion.api.common import WorkflowContext, Record -from metadata.ingestion.models.table_metadata import TableESDocument, TopicESDocument +from metadata.ingestion.models.table_metadata import TableESDocument, TopicESDocument, DashboardESDocument logger = logging.getLogger(__name__) @@ -39,18 +40,15 @@ class ElasticSearchConfig(ConfigModel): es_host_port: str index_tables: Optional[bool] = True index_topics: Optional[bool] = False + index_dashboards: Optional[bool] = False table_index_name: str = "table_search_index" topic_index_name: str = "topic_search_index" + dashboard_index_name: str = "dashboard_search_index" class ElasticsearchSink(Sink): """ - Elasticsearch Publisher uses Bulk API to load data from JSON file. - A new index is created and data is uploaded into it. After the upload - is complete, index alias is swapped to point to new index from old index - and traffic is routed to new index. - Old index is deleted after the alias swap is complete """ DEFAULT_ELASTICSEARCH_INDEX_MAPPING = TABLE_ELASTICSEARCH_INDEX_MAPPING @@ -76,6 +74,8 @@ class ElasticsearchSink(Sink): self._check_or_create_index(self.config.table_index_name, TABLE_ELASTICSEARCH_INDEX_MAPPING) if self.config.index_topics: self._check_or_create_index(self.config.topic_index_name, TOPIC_ELASTICSEARCH_INDEX_MAPPING) + if self.config.index_dashboards: + self._check_or_create_index(self.config.dashboard_index_name, DASHBOARD_ELASTICSEARCH_INDEX_MAPPING) def _check_or_create_index(self, index_name: str, es_mapping: str): """ @@ -96,15 +96,19 @@ class ElasticsearchSink(Sink): self.elasticsearch_client.indices.create(index=index_name, body=es_mapping) def write_record(self, record: Record) -> None: - if isinstance(record, metadata.generated.schema.entity.data.table.Table): + if isinstance(record, Table): table_doc = self._create_table_es_doc(record) self.elasticsearch_client.index(index=self.config.table_index_name, id=str(table_doc.table_id), body=table_doc.json()) - if isinstance(record, metadata.generated.schema.entity.data.topic.Topic): + if isinstance(record, Topic): topic_doc = self._create_topic_es_doc(record) self.elasticsearch_client.index(index=self.config.topic_index_name, id=str(topic_doc.topic_id), body=topic_doc.json()) - self.status.records_written(record.name.__root__) + if isinstance(record, Dashboard): + dashboard_doc = self._create_dashboard_es_doc(record) + self.elasticsearch_client.index(index=self.config.dashboard_index_name, id=str(dashboard_doc.dashboard_id), + body=dashboard_doc.json()) + self.status.records_written(record.name) def _create_table_es_doc(self, table: Table): fqdn = table.fullyQualifiedName @@ -196,6 +200,59 @@ class ElasticsearchSink(Sink): return topic_doc + def _create_dashboard_es_doc(self, dashboard: Dashboard): + fqdn = dashboard.fullyQualifiedName + dashboard_name = dashboard.name + suggest = [{'input': [fqdn], 'weight': 5}, {'input': [dashboard_name], 'weight': 10}] + tags = set() + timestamp = time.time() + service_entity = self.rest.get_dashboard_service_by_id(str(dashboard.service.id.__root__)) + dashboard_owner = str(dashboard.owner.id.__root__) if dashboard.owner is not None else "" + dashboard_followers = [] + if dashboard.followers: + for follower in dashboard.followers.__root__: + dashboard_followers.append(str(follower.id.__root__)) + tier = None + for dashboard_tag in dashboard.tags: + if "Tier" in dashboard_tag.tagFQN: + tier = dashboard_tag.tagFQN + else: + tags.add(dashboard_tag.tagFQN) + charts: List[Chart] = self._get_charts(dashboard.charts) + chart_names = [] + chart_descriptions = [] + for chart in charts: + chart_names.append(chart.displayName) + if chart.description is not None: + chart_descriptions.append(chart.description) + if len(chart.tags) > 0: + for col_tag in chart.tags: + tags.add(col_tag.tagFQN) + dashboard_doc = DashboardESDocument(dashboard_id=str(dashboard.id.__root__), + service=service_entity.name, + service_type=service_entity.serviceType.name, + dashboard_name=dashboard.displayName, + chart_names=chart_names, + chart_descriptions=chart_descriptions, + suggest=suggest, + description=dashboard.description, + last_updated_timestamp=timestamp, + tier=tier, + tags=list(tags), + fqdn=fqdn, + owner=dashboard_owner, + followers=dashboard_followers) + + return dashboard_doc + + def _get_charts(self, chart_refs: Optional[List[entityReference.EntityReference]]): + charts = [] + if chart_refs is not None: + for chart_ref in chart_refs: + chart = self.rest.get_chart_by_id(str(chart_ref.id.__root__)) + charts.append(chart) + return charts + def get_status(self): return self.status diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py b/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py index 31871d4149b..98ca011ce01 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch_constants.py @@ -153,4 +153,56 @@ TOPIC_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( } } """ +) + +DASHBOARD_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent( + """ + { + "mappings":{ + "properties": { + "dashboard_name": { + "type":"text", + "analyzer": "keyword" + }, + "display_name": { + "type": "keyword" + }, + "owner": { + "type": "keyword" + }, + "followers": { + "type": "keyword" + }, + "last_updated_timestamp": { + "type": "date", + "format": "epoch_second" + }, + "description": { + "type": "text" + }, + "chart_names": { + "type":"keyword" + }, + "chart_descriptions": { + "type": "text" + }, + "tier": { + "type": "keyword" + }, + "tags": { + "type": "keyword" + }, + "service": { + "type": "keyword" + }, + "service_type": { + "type": "keyword" + }, + "suggest": { + "type": "completion" + } + } + } + } + """ ) \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/source/metadata.py b/ingestion/src/metadata/ingestion/source/metadata.py index 9c407729988..c6fbbdc038f 100644 --- a/ingestion/src/metadata/ingestion/source/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata.py @@ -24,12 +24,17 @@ from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient from typing import Iterable, List from dataclasses import dataclass, field +from ...generated.schema.entity.data.dashboard import Dashboard +from ...generated.schema.entity.data.table import Table +from ...generated.schema.entity.data.topic import Topic + logger = logging.getLogger(__name__) class MetadataTablesRestSourceConfig(ConfigModel): include_tables: Optional[bool] = True - include_topics: Optional[bool] = False + include_topics: Optional[bool] = True + include_dashboards: Optional[bool] = True limit_records: int = 50000 @@ -48,9 +53,13 @@ class MetadataSourceStatus(SourceStatus): self.success.append(topic_name) logger.info('Topic Scanned: {}'.format(topic_name)) + def scanned_dashboard(self, dashboard_name: str) -> None: + self.success.append(dashboard_name) + logger.info('Dashboard Scanned: {}'.format(dashboard_name)) + def filtered(self, table_name: str, err: str, dataset_name: str = None, col_type: str = None) -> None: self.warnings.append(table_name) - logger.warning("Dropped Table {} due to {}".format(table_name, err)) + logger.warning("Dropped Entity {} due to {}".format(table_name, err)) class MetadataSource(Source): config: MetadataTablesRestSourceConfig @@ -68,14 +77,7 @@ class MetadataSource(Source): self.topics = None def prepare(self): - if self.config.include_tables: - self.tables = self.client.list_tables( - fields="columns,tableConstraints,usageSummary,owner,database,tags,followers", - offset=0, limit=self.config.limit_records) - - if self.config.include_topics: - self.topics = self.client.list_topics( - fields="owner,service,tags,followers", offset=0, limit=self.config.limit_records) + pass @classmethod def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): @@ -84,12 +86,34 @@ class MetadataSource(Source): return cls(config, metadata_config, ctx) def next_record(self) -> Iterable[Record]: - for table in self.tables: - self.status.scanned_table(table.name.__root__) - yield table - for topic in self.topics: - self.status.scanned_topic(topic.name.__root__) - yield topic + yield from self.fetch_table() + yield from self.fetch_topic() + yield from self.fetch_dashboard() + + def fetch_table(self) -> Table: + if self.config.include_tables: + tables = self.client.list_tables( + fields="columns,tableConstraints,usageSummary,owner,database,tags,followers", + offset=0, limit=self.config.limit_records) + for table in tables: + self.status.scanned_table(table.name.__root__) + yield table + + def fetch_topic(self) -> Topic: + if self.config.include_topics: + topics = self.client.list_topics( + fields="owner,service,tags,followers", offset=0, limit=self.config.limit_records) + for topic in topics: + self.status.scanned_topic(topic.name.__root__) + yield topic + + def fetch_dashboard(self) -> Dashboard: + if self.config.include_dashboards: + dashboards = self.client.list_dashboards( + fields="owner,service,tags,followers,charts", offset=0, limit=self.config.limit_records) + for dashboard in dashboards: + self.status.scanned_dashboard(dashboard.name) + yield dashboard def get_status(self) -> SourceStatus: return self.status