Fix Ingestion ElasticSearch indexing (#4435)

This commit is contained in:
Sriharsha Chintalapani 2022-04-24 00:18:06 -07:00 committed by GitHub
parent 2a51148eee
commit c3bbeb9e93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 8 additions and 12 deletions

View File

@ -57,7 +57,9 @@ public abstract class AbstractEventPublisher implements EventPublisher {
LOG.error("Failed to publish event {} due to {}, will try again in {} ms", changeEvent, ex, currentBackoffTime); LOG.error("Failed to publish event {} due to {}, will try again in {} ms", changeEvent, ex, currentBackoffTime);
Thread.sleep(currentBackoffTime); Thread.sleep(currentBackoffTime);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to publish event {}", changeEvent); LOG.error(
"Failed to publish event type {} for entity {}", changeEvent.getEventType(), changeEvent.getEntityType());
LOG.error(e.getMessage());
} }
} }

View File

@ -299,7 +299,6 @@ class ElasticsearchSink(Sink[Entity]):
service_entity = self.metadata.get_by_id( service_entity = self.metadata.get_by_id(
entity=DatabaseService, entity_id=str(database_entity.service.id.__root__) entity=DatabaseService, entity_id=str(database_entity.service.id.__root__)
) )
table_owner = str(table.owner.id.__root__) if table.owner is not None else None
table_followers = [] table_followers = []
if table.followers: if table.followers:
for follower in table.followers.__root__: for follower in table.followers.__root__:
@ -331,7 +330,7 @@ class ElasticsearchSink(Sink[Entity]):
tier=tier, tier=tier,
tags=list(tags), tags=list(tags),
fqdn=fqdn, fqdn=fqdn,
owner=table_owner, owner=table.owner,
followers=table_followers, followers=table_followers,
) )
return table_doc return table_doc
@ -348,7 +347,6 @@ class ElasticsearchSink(Sink[Entity]):
service_entity = self.metadata.get_by_id( service_entity = self.metadata.get_by_id(
entity=MessagingService, entity_id=str(topic.service.id.__root__) entity=MessagingService, entity_id=str(topic.service.id.__root__)
) )
topic_owner = str(topic.owner.id.__root__) if topic.owner else None
topic_followers = [] topic_followers = []
if topic.followers: if topic.followers:
for follower in topic.followers.__root__: for follower in topic.followers.__root__:
@ -373,7 +371,7 @@ class ElasticsearchSink(Sink[Entity]):
tier=tier, tier=tier,
tags=list(tags), tags=list(tags),
fqdn=fqdn, fqdn=fqdn,
owner=topic_owner, owner=topic.owner,
followers=topic_followers, followers=topic_followers,
) )
return topic_doc return topic_doc
@ -386,9 +384,6 @@ class ElasticsearchSink(Sink[Entity]):
service_entity = self.metadata.get_by_id( service_entity = self.metadata.get_by_id(
entity=DashboardService, entity_id=str(dashboard.service.id.__root__) entity=DashboardService, entity_id=str(dashboard.service.id.__root__)
) )
dashboard_owner = (
str(dashboard.owner.id.__root__) if dashboard.owner is not None else None
)
dashboard_followers = [] dashboard_followers = []
if dashboard.followers: if dashboard.followers:
for follower in dashboard.followers.__root__: for follower in dashboard.followers.__root__:
@ -425,7 +420,7 @@ class ElasticsearchSink(Sink[Entity]):
tier=tier, tier=tier,
tags=list(tags), tags=list(tags),
fqdn=fqdn, fqdn=fqdn,
owner=dashboard_owner, owner=dashboard.owner,
followers=dashboard_followers, followers=dashboard_followers,
monthly_stats=dashboard.usageSummary.monthlyStats.count, monthly_stats=dashboard.usageSummary.monthlyStats.count,
monthly_percentile_rank=dashboard.usageSummary.monthlyStats.percentileRank, monthly_percentile_rank=dashboard.usageSummary.monthlyStats.percentileRank,
@ -445,7 +440,6 @@ class ElasticsearchSink(Sink[Entity]):
service_entity = self.metadata.get_by_id( service_entity = self.metadata.get_by_id(
entity=PipelineService, entity_id=str(pipeline.service.id.__root__) entity=PipelineService, entity_id=str(pipeline.service.id.__root__)
) )
pipeline_owner = str(pipeline.owner.id.__root__) if pipeline.owner else None
pipeline_followers = [] pipeline_followers = []
if pipeline.followers: if pipeline.followers:
for follower in pipeline.followers.__root__: for follower in pipeline.followers.__root__:
@ -482,7 +476,7 @@ class ElasticsearchSink(Sink[Entity]):
tier=tier, tier=tier,
tags=list(tags), tags=list(tags),
fqdn=fqdn, fqdn=fqdn,
owner=pipeline_owner, owner=pipeline.owner,
followers=pipeline_followers, followers=pipeline_followers,
) )
@ -529,7 +523,7 @@ class ElasticsearchSink(Sink[Entity]):
owns = [] owns = []
if team.users: if team.users:
for user in team.users.__root__: for user in team.users.__root__:
users.append(str(team.id.__root__)) users.append(user)
if team.owns: if team.owns:
for own in team.owns.__root__: for own in team.owns.__root__: