Fix #393: ingestion support for indexing dashboard/charts entities (#394)

This commit is contained in:
Sriharsha Chintalapani 2021-09-03 02:24:05 -07:00 committed by GitHub
parent 4d3ec274ea
commit 59c74fffcd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 251 additions and 34 deletions

View File

@ -256,7 +256,7 @@ public abstract class ChartRepository {
private EntityReference getService(Chart chart) throws IOException { private EntityReference getService(Chart chart) throws IOException {
return chart == null ? null : getService(Objects.requireNonNull(EntityUtil.getService(relationshipDAO(), return chart == null ? null : getService(Objects.requireNonNull(EntityUtil.getService(relationshipDAO(),
chart.getId()))); chart.getId(), Entity.DASHBOARD_SERVICE)));
} }
private EntityReference getService(EntityReference service) throws IOException { private EntityReference getService(EntityReference service) throws IOException {

View File

@ -53,6 +53,12 @@ public interface EntityRelationshipDAO {
@RegisterMapper(FromEntityReferenceMapper.class) @RegisterMapper(FromEntityReferenceMapper.class)
List<EntityReference> findFrom(@Bind("toId") String toId, @Bind("relation") int relation); List<EntityReference> findFrom(@Bind("toId") String toId, @Bind("relation") int relation);
/**
 * Returns the "from" side entity references that point to the given entity via the
 * given relation, restricted to a single from-side entity type (e.g. a service type
 * such as dashboardService). Results are ordered by fromId for stable pagination.
 */
@SqlQuery("SELECT fromId, fromEntity FROM entity_relationship WHERE toId = :toId AND relation = :relation AND " +
"fromEntity = :fromEntity ORDER BY fromId")
@RegisterMapper(FromEntityReferenceMapper.class)
List<EntityReference> findFromEntity(@Bind("toId") String toId, @Bind("relation") int relation,
@Bind("fromEntity") String fromEntity);
@SqlUpdate("DELETE from entity_relationship WHERE fromId = :fromId AND toId = :toId AND relation = :relation") @SqlUpdate("DELETE from entity_relationship WHERE fromId = :fromId AND toId = :toId AND relation = :relation")
void delete(@Bind("fromId") String fromId, @Bind("toId") String toId, @Bind("relation") int relation); void delete(@Bind("fromId") String fromId, @Bind("toId") String toId, @Bind("relation") int relation);

View File

@ -118,9 +118,12 @@ public class SearchResource {
} }
if (index.equals("topic_search_index")) { if (index.equals("topic_search_index")) {
searchSourceBuilder = buildTopicSearchBuilder(query, from, size); searchSourceBuilder = buildTopicSearchBuilder(query, from, size);
} else if (index.equals("dashboard_search_index")) {
searchSourceBuilder = buildDashboardSearchBuilder(query, from, size);
} else { } else {
searchSourceBuilder = buildTableSearchBuilder(query, from, size); searchSourceBuilder = buildTableSearchBuilder(query, from, size);
} }
if (sortFieldParam != null && !sortFieldParam.isEmpty()) { if (sortFieldParam != null && !sortFieldParam.isEmpty()) {
searchSourceBuilder.sort(sortFieldParam, sortOrder); searchSourceBuilder.sort(sortFieldParam, sortOrder);
} }
@ -186,7 +189,7 @@ public class SearchResource {
.field("column_names") .field("column_names")
.field("column_descriptions") .field("column_descriptions")
.lenient(true)) .lenient(true))
.aggregation(AggregationBuilders.terms("Service Type").field("service_type")) .aggregation(AggregationBuilders.terms("Service").field("service_type"))
.aggregation(AggregationBuilders.terms("Tier").field("tier")) .aggregation(AggregationBuilders.terms("Tier").field("tier"))
.aggregation(AggregationBuilders.terms("Tags").field("tags")) .aggregation(AggregationBuilders.terms("Tags").field("tags"))
.highlighter(hb) .highlighter(hb)
@ -212,7 +215,35 @@ public class SearchResource {
.field("topic_name", 5.0f) .field("topic_name", 5.0f)
.field("description") .field("description")
.lenient(true)) .lenient(true))
.aggregation(AggregationBuilders.terms("Service Type").field("service_type")) .aggregation(AggregationBuilders.terms("Service").field("service_type"))
.aggregation(AggregationBuilders.terms("Tier").field("tier"))
.aggregation(AggregationBuilders.terms("Tags").field("tags"))
.highlighter(hb)
.from(from).size(size);
return searchSourceBuilder;
}
private SearchSourceBuilder buildDashboardSearchBuilder(String query, int from, int size) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
HighlightBuilder.Field highlightTableName =
new HighlightBuilder.Field("dashboard_name");
highlightTableName.highlighterType("unified");
HighlightBuilder.Field highlightDescription =
new HighlightBuilder.Field("description");
highlightDescription.highlighterType("unified");
HighlightBuilder hb = new HighlightBuilder();
hb.field(highlightDescription);
hb.field(highlightTableName);
hb.preTags("<span class=\"text-highlighter\">");
hb.postTags("</span>");
searchSourceBuilder.query(QueryBuilders.queryStringQuery(query)
.field("dashboard_name", 5.0f)
.field("description")
.field("chart_names")
.field("chart_descriptions")
.lenient(true))
.aggregation(AggregationBuilders.terms("Service").field("service_type"))
.aggregation(AggregationBuilders.terms("Tier").field("tier")) .aggregation(AggregationBuilders.terms("Tier").field("tier"))
.aggregation(AggregationBuilders.terms("Tags").field("tags")) .aggregation(AggregationBuilders.terms("Tags").field("tags"))
.highlighter(hb) .highlighter(hb)

View File

@ -107,6 +107,15 @@ public final class EntityUtil {
return refs.isEmpty() ? null : refs.get(0); return refs.isEmpty() ? null : refs.get(0);
} }
/**
 * Looks up the service entity that CONTAINS the given entity, restricted to the
 * requested service type. Returns null when no containing service exists; when
 * relationship rows are inconsistent and more than one service is found, logs a
 * warning and returns the first match.
 */
public static EntityReference getService(EntityRelationshipDAO dao, UUID entityId, String serviceType) {
  List<EntityReference> services = dao.findFromEntity(entityId.toString(),
          Relationship.CONTAINS.ordinal(), serviceType);
  if (services.isEmpty()) {
    return null;
  }
  if (services.size() > 1) {
    // An entity is expected to belong to exactly one service; multiple matches
    // indicate duplicate relationship rows in the database.
    LOG.warn("Possible database issues - multiple services found for entity {}", entityId);
  }
  return services.get(0);
}
/** /**
* Populate EntityRef with href * Populate EntityRef with href
*/ */

View File

@ -3,7 +3,8 @@
"type": "metadata", "type": "metadata",
"config": { "config": {
"include_tables": "true", "include_tables": "true",
"include_topics": "true" "include_topics": "true",
"include_dashboards": "true"
} }
}, },
"sink": { "sink": {
@ -11,6 +12,7 @@
"config": { "config": {
"index_tables": "true", "index_tables": "true",
"index_topics": "true", "index_topics": "true",
"index_dashboards": "true",
"es_host_port": "localhost" "es_host_port": "localhost"
} }
}, },

View File

@ -209,6 +209,24 @@ class TopicESDocument(BaseModel):
followers: List[str] followers: List[str]
class DashboardESDocument(BaseModel):
    """Elastic Search mapping doc for Dashboards.

    Mirrors the fields declared in DASHBOARD_ELASTICSEARCH_INDEX_MAPPING; serialized
    with .json() and indexed into the dashboard search index by the ES sink.
    """
    dashboard_id: str
    service: str
    service_type: str
    dashboard_name: str
    # Completion-suggester inputs: [{'input': [...], 'weight': N}, ...]
    suggest: List[dict]
    description: Optional[str] = None
    # Explicit None default for consistency with the other Optional fields
    # (pydantic treats it as optional either way, but the bare annotation was
    # inconsistent with `description` and `tier` above/below).
    last_updated_timestamp: Optional[int] = None
    chart_names: List[str]
    chart_descriptions: List[str]
    tags: List[str]
    fqdn: str
    tier: Optional[str] = None
    # Owner/follower user ids; empty string / empty list when absent.
    owner: str
    followers: List[str]
class DashboardOwner(BaseModel): class DashboardOwner(BaseModel):
"""Dashboard owner""" """Dashboard owner"""
username: str username: str
@ -219,7 +237,7 @@ class DashboardOwner(BaseModel):
class Chart(BaseModel): class Chart(BaseModel):
"""Chart""" """Chart"""
name: str name: str
displayName:str displayName: str
description: str description: str
chart_type: str chart_type: str
url: str url: str

View File

@ -55,6 +55,7 @@ DatabaseEntities = List[Database]
TableEntities = List[Table] TableEntities = List[Table]
Tags = List[Tag] Tags = List[Tag]
Topics = List[Topic] Topics = List[Topic]
Dashboards = List[Dashboard]
class MetadataServerConfig(ConfigModel): class MetadataServerConfig(ConfigModel):
@ -337,10 +338,27 @@ class OpenMetadataAPIClient(object):
resp = self.client.put('/charts', data=create_chart_request.json()) resp = self.client.put('/charts', data=create_chart_request.json())
return Chart(**resp) return Chart(**resp)
def get_chart_by_id(self, chart_id: str, fields: Optional[List[str]] = None) -> Chart:
    """Get Chart by ID.

    :param chart_id: id of the chart to fetch
    :param fields: extra fields to include; defaults to tags and service.
                   (The original signature used a mutable list default with a
                   malformed `[]` annotation — fixed to Optional[List[str]].)
    """
    if fields is None:
        fields = ['tags', 'service']
    params = {'fields': ",".join(fields)}
    # NOTE(review): fields are passed via data= as in the sibling methods —
    # confirm the REST client forwards these as query parameters on GET.
    resp = self.client.get('/charts/{}'.format(chart_id), data=params)
    return Chart(**resp)
def create_or_update_dashboard(self, create_dashboard_request: CreateDashboardEntityRequest) -> Dashboard: def create_or_update_dashboard(self, create_dashboard_request: CreateDashboardEntityRequest) -> Dashboard:
"""Create or Update a Dashboard """ """Create or Update a Dashboard """
resp = self.client.put('/dashboards', data=create_dashboard_request.json()) resp = self.client.put('/dashboards', data=create_dashboard_request.json())
return Dashboard(**resp) return Dashboard(**resp)
def list_dashboards(self, fields: str = None, offset: int = 0, limit: int = 1000000) -> Dashboards:
    """ List all dashboards"""
    # Without an explicit field selection, hit the bare endpoint; otherwise
    # forward the field list together with the paging window.
    if fields is None:
        resp = self.client.get('/dashboards')
    else:
        url = '/dashboards?fields={}&offset={}&limit={}'.format(fields, offset, limit)
        resp = self.client.get(url)
    if self._use_raw_data:
        return resp
    return [Dashboard(**entry) for entry in resp['data']]
def close(self): def close(self):
self.client.close() self.client.close()

View File

@ -15,22 +15,23 @@
import json import json
import logging import logging
import time import time
from typing import Optional from typing import Optional, List
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
import metadata from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.type import entityReference
from metadata.ingestion.api.sink import Sink, SinkStatus from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig
from metadata.ingestion.sink.elasticsearch_constants import TABLE_ELASTICSEARCH_INDEX_MAPPING, \ from metadata.ingestion.sink.elasticsearch_constants import TABLE_ELASTICSEARCH_INDEX_MAPPING, \
TOPIC_ELASTICSEARCH_INDEX_MAPPING TOPIC_ELASTICSEARCH_INDEX_MAPPING, DASHBOARD_ELASTICSEARCH_INDEX_MAPPING
from metadata.config.common import ConfigModel from metadata.config.common import ConfigModel
from metadata.ingestion.api.common import WorkflowContext, Record from metadata.ingestion.api.common import WorkflowContext, Record
from metadata.ingestion.models.table_metadata import TableESDocument, TopicESDocument from metadata.ingestion.models.table_metadata import TableESDocument, TopicESDocument, DashboardESDocument
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -39,18 +40,15 @@ class ElasticSearchConfig(ConfigModel):
es_host_port: str es_host_port: str
index_tables: Optional[bool] = True index_tables: Optional[bool] = True
index_topics: Optional[bool] = False index_topics: Optional[bool] = False
index_dashboards: Optional[bool] = False
table_index_name: str = "table_search_index" table_index_name: str = "table_search_index"
topic_index_name: str = "topic_search_index" topic_index_name: str = "topic_search_index"
dashboard_index_name: str = "dashboard_search_index"
class ElasticsearchSink(Sink): class ElasticsearchSink(Sink):
""" """
Elasticsearch Publisher uses Bulk API to load data from JSON file.
A new index is created and data is uploaded into it. After the upload
is complete, index alias is swapped to point to new index from old index
and traffic is routed to new index.
Old index is deleted after the alias swap is complete
""" """
DEFAULT_ELASTICSEARCH_INDEX_MAPPING = TABLE_ELASTICSEARCH_INDEX_MAPPING DEFAULT_ELASTICSEARCH_INDEX_MAPPING = TABLE_ELASTICSEARCH_INDEX_MAPPING
@ -76,6 +74,8 @@ class ElasticsearchSink(Sink):
self._check_or_create_index(self.config.table_index_name, TABLE_ELASTICSEARCH_INDEX_MAPPING) self._check_or_create_index(self.config.table_index_name, TABLE_ELASTICSEARCH_INDEX_MAPPING)
if self.config.index_topics: if self.config.index_topics:
self._check_or_create_index(self.config.topic_index_name, TOPIC_ELASTICSEARCH_INDEX_MAPPING) self._check_or_create_index(self.config.topic_index_name, TOPIC_ELASTICSEARCH_INDEX_MAPPING)
if self.config.index_dashboards:
self._check_or_create_index(self.config.dashboard_index_name, DASHBOARD_ELASTICSEARCH_INDEX_MAPPING)
def _check_or_create_index(self, index_name: str, es_mapping: str): def _check_or_create_index(self, index_name: str, es_mapping: str):
""" """
@ -96,15 +96,19 @@ class ElasticsearchSink(Sink):
self.elasticsearch_client.indices.create(index=index_name, body=es_mapping) self.elasticsearch_client.indices.create(index=index_name, body=es_mapping)
def write_record(self, record: Record) -> None: def write_record(self, record: Record) -> None:
if isinstance(record, metadata.generated.schema.entity.data.table.Table): if isinstance(record, Table):
table_doc = self._create_table_es_doc(record) table_doc = self._create_table_es_doc(record)
self.elasticsearch_client.index(index=self.config.table_index_name, id=str(table_doc.table_id), self.elasticsearch_client.index(index=self.config.table_index_name, id=str(table_doc.table_id),
body=table_doc.json()) body=table_doc.json())
if isinstance(record, metadata.generated.schema.entity.data.topic.Topic): if isinstance(record, Topic):
topic_doc = self._create_topic_es_doc(record) topic_doc = self._create_topic_es_doc(record)
self.elasticsearch_client.index(index=self.config.topic_index_name, id=str(topic_doc.topic_id), self.elasticsearch_client.index(index=self.config.topic_index_name, id=str(topic_doc.topic_id),
body=topic_doc.json()) body=topic_doc.json())
self.status.records_written(record.name.__root__) if isinstance(record, Dashboard):
dashboard_doc = self._create_dashboard_es_doc(record)
self.elasticsearch_client.index(index=self.config.dashboard_index_name, id=str(dashboard_doc.dashboard_id),
body=dashboard_doc.json())
self.status.records_written(record.name)
def _create_table_es_doc(self, table: Table): def _create_table_es_doc(self, table: Table):
fqdn = table.fullyQualifiedName fqdn = table.fullyQualifiedName
@ -196,6 +200,59 @@ class ElasticsearchSink(Sink):
return topic_doc return topic_doc
def _create_dashboard_es_doc(self, dashboard: Dashboard):
    """Build a DashboardESDocument from a Dashboard entity.

    Resolves the owning service and the referenced charts through the REST
    client, splits Tier tags out of the regular tag set, and assembles the
    search-index document.
    """
    fqdn = dashboard.fullyQualifiedName
    dashboard_name = dashboard.name
    # Exact-name matches rank above FQDN matches in the completion suggester.
    suggest = [{'input': [fqdn], 'weight': 5}, {'input': [dashboard_name], 'weight': 10}]
    tags = set()
    timestamp = time.time()
    service_entity = self.rest.get_dashboard_service_by_id(str(dashboard.service.id.__root__))
    dashboard_owner = str(dashboard.owner.id.__root__) if dashboard.owner is not None else ""
    dashboard_followers = []
    if dashboard.followers:
        for follower in dashboard.followers.__root__:
            dashboard_followers.append(str(follower.id.__root__))
    tier = None
    # Tier tags live alongside regular tags; keep only the tier value separate.
    # `or []` guards against a missing tag list (the original crashed on None).
    for dashboard_tag in dashboard.tags or []:
        if "Tier" in dashboard_tag.tagFQN:
            tier = dashboard_tag.tagFQN
        else:
            tags.add(dashboard_tag.tagFQN)
    charts: List[Chart] = self._get_charts(dashboard.charts)
    chart_names = []
    chart_descriptions = []
    for chart in charts:
        # NOTE(review): assumes chart.displayName is always set — confirm,
        # since chart_names is typed List[str] in DashboardESDocument.
        chart_names.append(chart.displayName)
        if chart.description is not None:
            chart_descriptions.append(chart.description)
        # `or []` replaces the redundant `len(chart.tags) > 0` guard and also
        # tolerates a None tag list.
        for chart_tag in chart.tags or []:
            tags.add(chart_tag.tagFQN)
    dashboard_doc = DashboardESDocument(dashboard_id=str(dashboard.id.__root__),
                                        service=service_entity.name,
                                        service_type=service_entity.serviceType.name,
                                        dashboard_name=dashboard.displayName,
                                        chart_names=chart_names,
                                        chart_descriptions=chart_descriptions,
                                        suggest=suggest,
                                        description=dashboard.description,
                                        last_updated_timestamp=timestamp,
                                        tier=tier,
                                        tags=list(tags),
                                        fqdn=fqdn,
                                        owner=dashboard_owner,
                                        followers=dashboard_followers)
    return dashboard_doc
def _get_charts(self, chart_refs: Optional[List[entityReference.EntityReference]]):
    """Resolve chart entity references into full Chart entities via the REST client.

    Returns an empty list when no references are given.
    """
    if chart_refs is None:
        return []
    return [self.rest.get_chart_by_id(str(ref.id.__root__)) for ref in chart_refs]
def get_status(self): def get_status(self):
return self.status return self.status

View File

@ -153,4 +153,56 @@ TOPIC_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
} }
} }
""" """
)
# Elasticsearch index mapping for the dashboard search index. Field names must
# stay in sync with DashboardESDocument; `suggest` backs the completion
# suggester used for type-ahead search.
# (Also normalizes the duplicated closing paren `) )` left by the diff render.)
DASHBOARD_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
    """
    {
      "mappings":{
        "properties": {
          "dashboard_name": {
            "type":"text",
            "analyzer": "keyword"
          },
          "display_name": {
            "type": "keyword"
          },
          "owner": {
            "type": "keyword"
          },
          "followers": {
            "type": "keyword"
          },
          "last_updated_timestamp": {
            "type": "date",
            "format": "epoch_second"
          },
          "description": {
            "type": "text"
          },
          "chart_names": {
            "type":"keyword"
          },
          "chart_descriptions": {
            "type": "text"
          },
          "tier": {
            "type": "keyword"
          },
          "tags": {
            "type": "keyword"
          },
          "service": {
            "type": "keyword"
          },
          "service_type": {
            "type": "keyword"
          },
          "suggest": {
            "type": "completion"
          }
        }
      }
    }
    """
)

View File

@ -24,12 +24,17 @@ from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
from typing import Iterable, List from typing import Iterable, List
from dataclasses import dataclass, field from dataclasses import dataclass, field
from ...generated.schema.entity.data.dashboard import Dashboard
from ...generated.schema.entity.data.table import Table
from ...generated.schema.entity.data.topic import Topic
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class MetadataTablesRestSourceConfig(ConfigModel): class MetadataTablesRestSourceConfig(ConfigModel):
include_tables: Optional[bool] = True include_tables: Optional[bool] = True
include_topics: Optional[bool] = False include_topics: Optional[bool] = True
include_dashboards: Optional[bool] = True
limit_records: int = 50000 limit_records: int = 50000
@ -48,9 +53,13 @@ class MetadataSourceStatus(SourceStatus):
self.success.append(topic_name) self.success.append(topic_name)
logger.info('Topic Scanned: {}'.format(topic_name)) logger.info('Topic Scanned: {}'.format(topic_name))
def scanned_dashboard(self, dashboard_name: str) -> None:
    """Record a successfully scanned dashboard and log it."""
    self.success.append(dashboard_name)
    logger.info('Dashboard Scanned: {}'.format(dashboard_name))
def filtered(self, table_name: str, err: str, dataset_name: str = None, col_type: str = None) -> None: def filtered(self, table_name: str, err: str, dataset_name: str = None, col_type: str = None) -> None:
self.warnings.append(table_name) self.warnings.append(table_name)
logger.warning("Dropped Table {} due to {}".format(table_name, err)) logger.warning("Dropped Entity {} due to {}".format(table_name, err))
class MetadataSource(Source): class MetadataSource(Source):
config: MetadataTablesRestSourceConfig config: MetadataTablesRestSourceConfig
@ -68,14 +77,7 @@ class MetadataSource(Source):
self.topics = None self.topics = None
def prepare(self): def prepare(self):
if self.config.include_tables: pass
self.tables = self.client.list_tables(
fields="columns,tableConstraints,usageSummary,owner,database,tags,followers",
offset=0, limit=self.config.limit_records)
if self.config.include_topics:
self.topics = self.client.list_topics(
fields="owner,service,tags,followers", offset=0, limit=self.config.limit_records)
@classmethod @classmethod
def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
@ -84,12 +86,34 @@ class MetadataSource(Source):
return cls(config, metadata_config, ctx) return cls(config, metadata_config, ctx)
def next_record(self) -> Iterable[Record]: def next_record(self) -> Iterable[Record]:
for table in self.tables: yield from self.fetch_table()
self.status.scanned_table(table.name.__root__) yield from self.fetch_topic()
yield table yield from self.fetch_dashboard()
for topic in self.topics:
self.status.scanned_topic(topic.name.__root__) def fetch_table(self) -> Table:
yield topic if self.config.include_tables:
tables = self.client.list_tables(
fields="columns,tableConstraints,usageSummary,owner,database,tags,followers",
offset=0, limit=self.config.limit_records)
for table in tables:
self.status.scanned_table(table.name.__root__)
yield table
def fetch_topic(self) -> Iterable[Topic]:
    """Yield all topics from the metadata server when topic ingestion is enabled.

    This is a generator, so the return annotation is Iterable[Topic]
    (the previous `-> Topic` was inaccurate for a generator function).
    Yields nothing when include_topics is disabled.
    """
    if self.config.include_topics:
        topics = self.client.list_topics(
            fields="owner,service,tags,followers", offset=0, limit=self.config.limit_records)
        for topic in topics:
            self.status.scanned_topic(topic.name.__root__)
            yield topic
def fetch_dashboard(self) -> Iterable[Dashboard]:
    """Yield all dashboards from the metadata server when dashboard ingestion is enabled.

    This is a generator, so the return annotation is Iterable[Dashboard]
    (the previous `-> Dashboard` was inaccurate for a generator function).
    Yields nothing when include_dashboards is disabled.
    """
    if self.config.include_dashboards:
        dashboards = self.client.list_dashboards(
            fields="owner,service,tags,followers,charts", offset=0, limit=self.config.limit_records)
        for dashboard in dashboards:
            # NOTE(review): tables/topics pass entity.name.__root__ to the status
            # tracker; confirm Dashboard.name is a plain str here.
            self.status.scanned_dashboard(dashboard.name)
            yield dashboard
def get_status(self) -> SourceStatus: def get_status(self) -> SourceStatus:
return self.status return self.status