diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py
index 277cc5a1f23..7391ea21a1e 100644
--- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py
+++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py
@@ -13,6 +13,7 @@ import json
 import logging
 import ssl
 import time
+import traceback
 from datetime import datetime
 from typing import List, Optional
 
@@ -205,71 +206,85 @@ class ElasticsearchSink(Sink[Entity]):
             )
 
     def write_record(self, record: Entity) -> None:
-        if isinstance(record, Table):
-            table_doc = self._create_table_es_doc(record)
-            self.elasticsearch_client.index(
-                index=self.config.table_index_name,
-                id=str(table_doc.table_id),
-                body=table_doc.json(),
-                request_timeout=self.config.timeout,
-            )
-        if isinstance(record, Topic):
-            topic_doc = self._create_topic_es_doc(record)
-            self.elasticsearch_client.index(
-                index=self.config.topic_index_name,
-                id=str(topic_doc.topic_id),
-                body=topic_doc.json(),
-                request_timeout=self.config.timeout,
-            )
-        if isinstance(record, Dashboard):
-            dashboard_doc = self._create_dashboard_es_doc(record)
-            self.elasticsearch_client.index(
-                index=self.config.dashboard_index_name,
-                id=str(dashboard_doc.dashboard_id),
-                body=dashboard_doc.json(),
-                request_timeout=self.config.timeout,
-            )
-        if isinstance(record, Pipeline):
-            pipeline_doc = self._create_pipeline_es_doc(record)
-            self.elasticsearch_client.index(
-                index=self.config.pipeline_index_name,
-                id=str(pipeline_doc.pipeline_id),
-                body=pipeline_doc.json(),
-                request_timeout=self.config.timeout,
-            )
+        """
+        Index a single entity into the Elasticsearch index configured for its type.
+
+        Any exception raised while building or indexing the document is logged
+        and swallowed, so the record is skipped and not reported as written.
+
+        :param record: entity to index
+        """
+        try:
+            if isinstance(record, Table):
+                table_doc = self._create_table_es_doc(record)
+                self.elasticsearch_client.index(
+                    index=self.config.table_index_name,
+                    id=str(table_doc.table_id),
+                    body=table_doc.json(),
+                    request_timeout=self.config.timeout,
+                )
+            if isinstance(record, Topic):
+                topic_doc = self._create_topic_es_doc(record)
+                self.elasticsearch_client.index(
+                    index=self.config.topic_index_name,
+                    id=str(topic_doc.topic_id),
+                    body=topic_doc.json(),
+                    request_timeout=self.config.timeout,
+                )
+            if isinstance(record, Dashboard):
+                dashboard_doc = self._create_dashboard_es_doc(record)
+                self.elasticsearch_client.index(
+                    index=self.config.dashboard_index_name,
+                    id=str(dashboard_doc.dashboard_id),
+                    body=dashboard_doc.json(),
+                    request_timeout=self.config.timeout,
+                )
+            if isinstance(record, Pipeline):
+                pipeline_doc = self._create_pipeline_es_doc(record)
+                self.elasticsearch_client.index(
+                    index=self.config.pipeline_index_name,
+                    id=str(pipeline_doc.pipeline_id),
+                    body=pipeline_doc.json(),
+                    request_timeout=self.config.timeout,
+                )
 
-        if isinstance(record, User):
-            user_doc = self._create_user_es_doc(record)
-            print(user_doc.json())
-            self.elasticsearch_client.index(
-                index=self.config.user_index_name,
-                id=str(user_doc.user_id),
-                body=user_doc.json(),
-                request_timeout=self.config.timeout,
-            )
+            if isinstance(record, User):
+                user_doc = self._create_user_es_doc(record)
+                print(user_doc.json())
+                self.elasticsearch_client.index(
+                    index=self.config.user_index_name,
+                    id=str(user_doc.user_id),
+                    body=user_doc.json(),
+                    request_timeout=self.config.timeout,
+                )
 
-        if isinstance(record, Team):
-            team_doc = self._create_team_es_doc(record)
-            self.elasticsearch_client.index(
-                index=self.config.team_index_name,
-                id=str(team_doc.team_id),
-                body=team_doc.json(),
-                request_timeout=self.config.timeout,
-            )
+            if isinstance(record, Team):
+                team_doc = self._create_team_es_doc(record)
+                self.elasticsearch_client.index(
+                    index=self.config.team_index_name,
+                    id=str(team_doc.team_id),
+                    body=team_doc.json(),
+                    request_timeout=self.config.timeout,
+                )
 
-        if isinstance(record, GlossaryTerm):
-            glossary_term_doc = self._create_glossary_term_es_doc(record)
-            self.elasticsearch_client.index(
-                index=self.config.glossary_term_index_name,
-                id=str(glossary_term_doc.glossary_term_id),
-                body=glossary_term_doc.json(),
-                request_timeout=self.config.timeout,
-            )
+            if isinstance(record, GlossaryTerm):
+                glossary_term_doc = self._create_glossary_term_es_doc(record)
+                self.elasticsearch_client.index(
+                    index=self.config.glossary_term_index_name,
+                    id=str(glossary_term_doc.glossary_term_id),
+                    body=glossary_term_doc.json(),
+                    request_timeout=self.config.timeout,
+                )
 
-        if hasattr(record.name, "__root__"):
-            self.status.records_written(record.name.__root__)
-        else:
-            self.status.records_written(record.name)
+            if hasattr(record.name, "__root__"):
+                self.status.records_written(record.name.__root__)
+            else:
+                self.status.records_written(record.name)
+        except Exception as e:
+            # Log and skip the failed record instead of raising.
+            logger.error(f"Failed to index entity {record} due to {e}")
+            # format_exc() returns the traceback text (print_exc() returns None).
+            logger.debug(traceback.format_exc())
 
     def _create_table_es_doc(self, table: Table):
         fqdn = table.fullyQualifiedName