Fix #1733: Ingestion: ElasticSearch index update should use changeLog timestamp (#1736)

* Fix #1733: Ingestion: ElasticSearch index update should use changeLog timestamp
This commit is contained in:
Sriharsha Chintalapani 2021-12-14 09:49:25 -08:00 committed by GitHub
parent e3d1a95d2c
commit c6ef94cac7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 198 additions and 121 deletions

View File

@ -137,7 +137,17 @@ public class ElasticSearchEventHandler implements EventHandler {
List<FieldChange> fieldsAdded = changeDescription.getFieldsAdded(); List<FieldChange> fieldsAdded = changeDescription.getFieldsAdded();
StringBuilder scriptTxt = new StringBuilder(); StringBuilder scriptTxt = new StringBuilder();
Map<String, Object> fieldAddParams = new HashMap<>(); Map<String, Object> fieldAddParams = new HashMap<>();
ESChangeDescription esChangeDescription = ESChangeDescription.builder()
.updatedAt(event.getDateTime().getTime())
.updatedBy(event.getUserName())
.fieldsAdded(changeDescription.getFieldsAdded())
.fieldsUpdated(changeDescription.getFieldsUpdated())
.fieldsDeleted(changeDescription.getFieldsDeleted()).build();
Map<String, Object> esChangeDescriptionDoc = JsonUtils.getMap(esChangeDescription);
fieldAddParams.put("change_description", esChangeDescriptionDoc);
fieldAddParams.put("last_updated_timestamp", event.getDateTime().getTime());
scriptTxt.append("ctx._source.change_descriptions.add(params.change_description); ");
scriptTxt.append("ctx._source.last_updated_timestamp=params.last_updated_timestamp;");
for (FieldChange fieldChange: fieldsAdded) { for (FieldChange fieldChange: fieldsAdded) {
if (fieldChange.getName().equalsIgnoreCase("followers")) { if (fieldChange.getName().equalsIgnoreCase("followers")) {
List<EntityReference> entityReferences = (List<EntityReference>) fieldChange.getNewValue(); List<EntityReference> entityReferences = (List<EntityReference>) fieldChange.getNewValue();
@ -159,15 +169,7 @@ public class ElasticSearchEventHandler implements EventHandler {
scriptTxt.append("ctx._source.followers.removeAll(Collections.singleton(params.followers));"); scriptTxt.append("ctx._source.followers.removeAll(Collections.singleton(params.followers));");
} }
} }
ESChangeDescription esChangeDescription = ESChangeDescription.builder()
.updatedAt(event.getDateTime().getTime())
.updatedBy(event.getUserName()).build();
esChangeDescription.setFieldsAdded(changeDescription.getFieldsAdded());
esChangeDescription.setFieldsDeleted(changeDescription.getFieldsDeleted());
esChangeDescription.setFieldsUpdated(changeDescription.getFieldsUpdated());
Map<String, Object> esChangeDescriptionDoc = JsonUtils.getMap(esChangeDescription);
fieldAddParams.put("change_description", esChangeDescriptionDoc);
scriptTxt.append("ctx._source.change_descriptions.add(params.change_description);");
if (!scriptTxt.toString().isEmpty()) { if (!scriptTxt.toString().isEmpty()) {
Script script = new Script(ScriptType.INLINE, "painless", Script script = new Script(ScriptType.INLINE, "painless",
scriptTxt.toString(), scriptTxt.toString(),

View File

@ -66,8 +66,7 @@ public class ElasticSearchIndexDefinition {
TABLE_SEARCH_INDEX("table_search_index", "/elasticsearch/table_index_mapping.json"), TABLE_SEARCH_INDEX("table_search_index", "/elasticsearch/table_index_mapping.json"),
TOPIC_SEARCH_INDEX("topic_search_index", "/elasticsearch/topic_index_mapping.json"), TOPIC_SEARCH_INDEX("topic_search_index", "/elasticsearch/topic_index_mapping.json"),
DASHBOARD_SEARCH_INDEX("dashboard_search_index", "/elasticsearch/dashboard_index_mapping.json"), DASHBOARD_SEARCH_INDEX("dashboard_search_index", "/elasticsearch/dashboard_index_mapping.json"),
PIPELINE_SEARCH_INDEX("pipeline_search_index", "/elasticsearch/pipeline_index_mapping.json"), PIPELINE_SEARCH_INDEX("pipeline_search_index", "/elasticsearch/pipeline_index_mapping.json");
DBT_MODEL_SEARCH_INDEX("dbt_model_search_index", "/elasticsearch/dbt_index_mapping.json");
public final String indexName; public final String indexName;
public final String indexMappingFile; public final String indexMappingFile;
@ -212,9 +211,9 @@ class FlattenColumn {
class ESChangeDescription { class ESChangeDescription {
String updatedBy; String updatedBy;
Long updatedAt; Long updatedAt;
List<FieldChange> fieldsAdded; List<FieldChange> fieldsAdded = new ArrayList<>();
List<FieldChange> fieldsUpdated; List<FieldChange> fieldsUpdated = new ArrayList<>();
List<FieldChange> fieldsDeleted; List<FieldChange> fieldsDeleted = new ArrayList<>();
} }
class ParseTags { class ParseTags {
@ -351,7 +350,11 @@ class TableESIndex extends ElasticSearchIndex {
esChangeDescription.setFieldsUpdated(table.getChangeDescription().getFieldsUpdated()); esChangeDescription.setFieldsUpdated(table.getChangeDescription().getFieldsUpdated());
} else if (responseCode == Response.Status.CREATED.getStatusCode()) { } else if (responseCode == Response.Status.CREATED.getStatusCode()) {
esChangeDescription = ESChangeDescription.builder().updatedAt(table.getUpdatedAt().getTime()) esChangeDescription = ESChangeDescription.builder().updatedAt(table.getUpdatedAt().getTime())
.updatedBy(table.getUpdatedBy()).build(); .updatedBy(table.getUpdatedBy())
.fieldsAdded(new ArrayList<>())
.fieldsUpdated(new ArrayList<>())
.fieldsDeleted(new ArrayList<>())
.build();
} }
tableESIndexBuilder.changeDescriptions(esChangeDescription != null ? List.of(esChangeDescription) : null); tableESIndexBuilder.changeDescriptions(esChangeDescription != null ? List.of(esChangeDescription) : null);
return tableESIndexBuilder; return tableESIndexBuilder;

View File

@ -68,7 +68,27 @@
"type": "long" "type": "long"
}, },
"change_descriptions": { "change_descriptions": {
"type": "nested" "type": "nested",
"properties": {
"updatedAt": {
"type": "long"
},
"updatedBy": {
"type": "text"
},
"fieldsAdded": {
"type": "object",
"enabled": false
},
"fieldsDeleted": {
"type": "object",
"enabled": false
},
"fieldsUpdated": {
"type": "object",
"enabled": false
}
}
} }
} }
} }

View File

@ -53,24 +53,7 @@
"type": "completion" "type": "completion"
}, },
"change_descriptions": { "change_descriptions": {
"type": "nested", "type": "nested"
"properties": {
"updatedAt": {
"type": "long"
},
"updatedBy": {
"type": "text"
},
"fieldsAdded": {
"type": "text"
},
"fieldsDeleted": {
"type": "text"
},
"fieldsUpdated": {
"type": "text"
}
}
} }
} }
} }

View File

@ -50,7 +50,27 @@
"type": "completion" "type": "completion"
}, },
"change_descriptions": { "change_descriptions": {
"type": "nested" "type": "nested",
"properties": {
"updatedAt": {
"type": "long"
},
"updatedBy": {
"type": "text"
},
"fieldsAdded": {
"type": "object",
"enabled": false
},
"fieldsDeleted": {
"type": "object",
"enabled": false
},
"fieldsUpdated": {
"type": "object",
"enabled": false
}
}
} }
} }
} }

View File

@ -71,7 +71,27 @@
"type": "long" "type": "long"
}, },
"change_descriptions": { "change_descriptions": {
"type": "nested" "type": "nested",
"properties": {
"updatedAt": {
"type": "long"
},
"updatedBy": {
"type": "text"
},
"fieldsAdded": {
"type": "object",
"enabled": false
},
"fieldsDeleted": {
"type": "object",
"enabled": false
},
"fieldsUpdated": {
"type": "object",
"enabled": false
}
}
} }
} }
} }

View File

@ -44,7 +44,27 @@
"type": "completion" "type": "completion"
}, },
"change_descriptions": { "change_descriptions": {
"type": "nested" "type": "nested",
"properties": {
"updatedAt": {
"type": "long"
},
"updatedBy": {
"type": "text"
},
"fieldsAdded": {
"type": "object",
"enabled": false
},
"fieldsDeleted": {
"type": "object",
"enabled": false
},
"fieldsUpdated": {
"type": "object",
"enabled": false
}
}
} }
} }
} }

View File

@ -36,7 +36,6 @@ from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.models.table_metadata import ( from metadata.ingestion.models.table_metadata import (
ChangeDescription, ChangeDescription,
DashboardESDocument, DashboardESDocument,
DbtModelESDocument,
PipelineESDocument, PipelineESDocument,
TableESDocument, TableESDocument,
TopicESDocument, TopicESDocument,
@ -45,7 +44,6 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.sink.elasticsearch_constants import ( from metadata.ingestion.sink.elasticsearch_constants import (
DASHBOARD_ELASTICSEARCH_INDEX_MAPPING, DASHBOARD_ELASTICSEARCH_INDEX_MAPPING,
DBT_ELASTICSEARCH_INDEX_MAPPING,
PIPELINE_ELASTICSEARCH_INDEX_MAPPING, PIPELINE_ELASTICSEARCH_INDEX_MAPPING,
TABLE_ELASTICSEARCH_INDEX_MAPPING, TABLE_ELASTICSEARCH_INDEX_MAPPING,
TOPIC_ELASTICSEARCH_INDEX_MAPPING, TOPIC_ELASTICSEARCH_INDEX_MAPPING,
@ -140,10 +138,6 @@ class ElasticsearchSink(Sink[Entity]):
self._check_or_create_index( self._check_or_create_index(
self.config.pipeline_index_name, PIPELINE_ELASTICSEARCH_INDEX_MAPPING self.config.pipeline_index_name, PIPELINE_ELASTICSEARCH_INDEX_MAPPING
) )
if self.config.index_dbt_models:
self._check_or_create_index(
self.config.dbt_index_name, DBT_ELASTICSEARCH_INDEX_MAPPING
)
def _check_or_create_index(self, index_name: str, es_mapping: str): def _check_or_create_index(self, index_name: str, es_mapping: str):
""" """
@ -226,7 +220,7 @@ class ElasticsearchSink(Sink[Entity]):
column_descriptions = [] column_descriptions = []
tags = set() tags = set()
timestamp = time.time() timestamp = int(table.updatedAt.__root__.timestamp())
tier = None tier = None
for table_tag in table.tags: for table_tag in table.tags:
if "Tier" in table_tag.tagFQN: if "Tier" in table_tag.tagFQN:
@ -289,7 +283,7 @@ class ElasticsearchSink(Sink[Entity]):
{"input": [topic_name], "weight": 10}, {"input": [topic_name], "weight": 10},
] ]
tags = set() tags = set()
timestamp = time.time() timestamp = topic.updatedAt.__root__.timestamp()
service_entity = self.metadata.get_by_id( service_entity = self.metadata.get_by_id(
entity=MessagingService, entity_id=str(topic.service.id.__root__) entity=MessagingService, entity_id=str(topic.service.id.__root__)
) )
@ -321,7 +315,6 @@ class ElasticsearchSink(Sink[Entity]):
followers=topic_followers, followers=topic_followers,
change_descriptions=change_descriptions, change_descriptions=change_descriptions,
) )
print(topic_doc.json())
return topic_doc return topic_doc
def _create_dashboard_es_doc(self, dashboard: Dashboard): def _create_dashboard_es_doc(self, dashboard: Dashboard):
@ -329,7 +322,7 @@ class ElasticsearchSink(Sink[Entity]):
dashboard_name = dashboard.name dashboard_name = dashboard.name
suggest = [{"input": [dashboard.displayName], "weight": 10}] suggest = [{"input": [dashboard.displayName], "weight": 10}]
tags = set() tags = set()
timestamp = time.time() timestamp = dashboard.updatedAt.__root__.timestamp()
service_entity = self.metadata.get_by_id( service_entity = self.metadata.get_by_id(
entity=DashboardService, entity_id=str(dashboard.service.id.__root__) entity=DashboardService, entity_id=str(dashboard.service.id.__root__)
) )
@ -390,7 +383,7 @@ class ElasticsearchSink(Sink[Entity]):
fqdn = pipeline.fullyQualifiedName fqdn = pipeline.fullyQualifiedName
suggest = [{"input": [pipeline.displayName], "weight": 10}] suggest = [{"input": [pipeline.displayName], "weight": 10}]
tags = set() tags = set()
timestamp = time.time() timestamp = pipeline.updatedAt.__root__.timestamp()
service_entity = self.metadata.get_by_id( service_entity = self.metadata.get_by_id(
entity=PipelineService, entity_id=str(pipeline.service.id.__root__) entity=PipelineService, entity_id=str(pipeline.service.id.__root__)
) )

View File

@ -87,9 +87,30 @@ TABLE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"type": "long" "type": "long"
}, },
"change_descriptions": { "change_descriptions": {
"type": "nested" "type": "nested",
"properties": {
"updatedAt": {
"type": "long"
},
"updatedBy": {
"type": "text"
},
"fieldsAdded": {
"type": "object",
"enabled": false
},
"fieldsDeleted": {
"type": "object",
"enabled": false
},
"fieldsUpdated": {
"type": "object",
"enabled": false
}
} }
} }
}
}
} }
} }
""" """
@ -144,9 +165,30 @@ TOPIC_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"type": "completion" "type": "completion"
}, },
"change_descriptions": { "change_descriptions": {
"type": "nested" "type": "nested",
"properties": {
"updatedAt": {
"type": "long"
},
"updatedBy": {
"type": "text"
},
"fieldsAdded": {
"type": "object",
"enabled": false
},
"fieldsDeleted": {
"type": "object",
"enabled": false
},
"fieldsUpdated": {
"type": "object",
"enabled": false
}
} }
} }
}
}
} }
} }
""" """
@ -225,9 +267,30 @@ DASHBOARD_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"type": "long" "type": "long"
}, },
"change_descriptions": { "change_descriptions": {
"type": "nested" "type": "nested",
"properties": {
"updatedAt": {
"type": "long"
},
"updatedBy": {
"type": "text"
},
"fieldsAdded": {
"type": "object",
"enabled": false
},
"fieldsDeleted": {
"type": "object",
"enabled": false
},
"fieldsUpdated": {
"type": "object",
"enabled": false
}
} }
} }
}
}
} }
} }
""" """
@ -288,77 +351,30 @@ PIPELINE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"type": "completion" "type": "completion"
}, },
"change_descriptions": { "change_descriptions": {
"type": "nested" "type": "nested",
"properties": {
"updatedAt": {
"type": "long"
},
"updatedBy": {
"type": "text"
},
"fieldsAdded": {
"type": "object",
"enabled": false
},
"fieldsDeleted": {
"type": "object",
"enabled": false
},
"fieldsUpdated": {
"type": "object",
"enabled": false
}
} }
} }
} }
} }
"""
)
DBT_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
"""
{
"mappings":{
"properties": {
"name": {
"type":"text"
},
"display_name": {
"type": "text"
},
"owner": {
"type": "text"
},
"followers": {
"type": "keyword"
},
"fqdn": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"description": {
"type": "text"
},
"tier": {
"type": "keyword"
},
"column_names": {
"type":"text"
},
"column_descriptions": {
"type": "text"
},
"tags": {
"type": "keyword"
},
"service": {
"type": "keyword"
},
"service_type": {
"type": "keyword"
},
"service_category": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
"database": {
"type": "text"
},
"suggest": {
"type": "completion"
},
"change_descriptions": {
"type": "nested"
}
}
}
} }
""" """
) )