Fix #1220: catch and log any errors during the es index (#1220)

This commit is contained in:
Sriharsha Chintalapani 2022-02-28 22:57:50 -08:00 committed by GitHub
parent 06d9329ae3
commit 15d0440599
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -13,6 +13,7 @@ import json
import logging
import ssl
import time
import traceback
from datetime import datetime
from typing import List, Optional
@ -205,71 +206,75 @@ class ElasticsearchSink(Sink[Entity]):
)
def write_record(self, record: Entity) -> None:
if isinstance(record, Table):
table_doc = self._create_table_es_doc(record)
self.elasticsearch_client.index(
index=self.config.table_index_name,
id=str(table_doc.table_id),
body=table_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, Topic):
topic_doc = self._create_topic_es_doc(record)
self.elasticsearch_client.index(
index=self.config.topic_index_name,
id=str(topic_doc.topic_id),
body=topic_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, Dashboard):
dashboard_doc = self._create_dashboard_es_doc(record)
self.elasticsearch_client.index(
index=self.config.dashboard_index_name,
id=str(dashboard_doc.dashboard_id),
body=dashboard_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, Pipeline):
pipeline_doc = self._create_pipeline_es_doc(record)
self.elasticsearch_client.index(
index=self.config.pipeline_index_name,
id=str(pipeline_doc.pipeline_id),
body=pipeline_doc.json(),
request_timeout=self.config.timeout,
)
try:
if isinstance(record, Table):
table_doc = self._create_table_es_doc(record)
self.elasticsearch_client.index(
index=self.config.table_index_name,
id=str(table_doc.table_id),
body=table_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, Topic):
topic_doc = self._create_topic_es_doc(record)
self.elasticsearch_client.index(
index=self.config.topic_index_name,
id=str(topic_doc.topic_id),
body=topic_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, Dashboard):
dashboard_doc = self._create_dashboard_es_doc(record)
self.elasticsearch_client.index(
index=self.config.dashboard_index_name,
id=str(dashboard_doc.dashboard_id),
body=dashboard_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, Pipeline):
pipeline_doc = self._create_pipeline_es_doc(record)
self.elasticsearch_client.index(
index=self.config.pipeline_index_name,
id=str(pipeline_doc.pipeline_id),
body=pipeline_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, User):
user_doc = self._create_user_es_doc(record)
print(user_doc.json())
self.elasticsearch_client.index(
index=self.config.user_index_name,
id=str(user_doc.user_id),
body=user_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, User):
user_doc = self._create_user_es_doc(record)
print(user_doc.json())
self.elasticsearch_client.index(
index=self.config.user_index_name,
id=str(user_doc.user_id),
body=user_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, Team):
team_doc = self._create_team_es_doc(record)
self.elasticsearch_client.index(
index=self.config.team_index_name,
id=str(team_doc.team_id),
body=team_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, Team):
team_doc = self._create_team_es_doc(record)
self.elasticsearch_client.index(
index=self.config.team_index_name,
id=str(team_doc.team_id),
body=team_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, GlossaryTerm):
glossary_term_doc = self._create_glossary_term_es_doc(record)
self.elasticsearch_client.index(
index=self.config.glossary_term_index_name,
id=str(glossary_term_doc.glossary_term_id),
body=glossary_term_doc.json(),
request_timeout=self.config.timeout,
)
if isinstance(record, GlossaryTerm):
glossary_term_doc = self._create_glossary_term_es_doc(record)
self.elasticsearch_client.index(
index=self.config.glossary_term_index_name,
id=str(glossary_term_doc.glossary_term_id),
body=glossary_term_doc.json(),
request_timeout=self.config.timeout,
)
if hasattr(record.name, "__root__"):
self.status.records_written(record.name.__root__)
else:
self.status.records_written(record.name)
if hasattr(record.name, "__root__"):
self.status.records_written(record.name.__root__)
else:
self.status.records_written(record.name)
except Exception as e:
logger.error(f"Failed to index entity {record} due to {e}")
logger.debug(traceback.print_exc())
def _create_table_es_doc(self, table: Table):
fqdn = table.fullyQualifiedName