diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index ef9f7cfbbc3..b4145896126 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -490,7 +490,7 @@ class ElasticsearchSink(Sink): if hasattr(dbt_model.dbtNodeType, "name"): dbt_node_type = dbt_model.dbtNodeType.name change_descriptions = self._get_change_descriptions( - Dashboard, dbt_model.id.__root__ + DbtModel, dbt_model.id.__root__ ) dbt_model_doc = DbtModelESDocument( dbt_model_id=str(dbt_model.id.__root__), @@ -555,26 +555,29 @@ class ElasticsearchSink(Sink): ) def _get_change_descriptions(self, entity_type, entity_id): - entity_versions = self.metadata.list_versions(entity_id, entity_type) - change_descriptions = [] - for version in entity_versions.versions: - version_json = json.loads(version) - updatedAt = parser.parse(version_json["updatedAt"]) - change_description = ChangeDescription( - updatedBy=version_json["updatedBy"], updatedAt=updatedAt.timestamp() - ) - if "changeDescription" in version_json: - change_description.fieldsAdded = version_json["changeDescription"][ - "fieldsAdded" - ] - change_description.fieldsDeleted = version_json["changeDescription"][ - "fieldsDeleted" - ] - change_description.fieldsUpdated = version_json["changeDescription"][ - "fieldsUpdated" - ] - change_descriptions.append(change_description) - return change_descriptions + try: + entity_versions = self.metadata.list_versions(entity_id, entity_type) + change_descriptions = [] + for version in entity_versions.versions: + version_json = json.loads(version) + updatedAt = parser.parse(version_json["updatedAt"]) + change_description = ChangeDescription( + updatedBy=version_json["updatedBy"], updatedAt=updatedAt.timestamp() + ) + if "changeDescription" in version_json: + change_description.fieldsAdded = version_json["changeDescription"][ + "fieldsAdded" + ] + change_description.fieldsDeleted = version_json[ + "changeDescription" + ]["fieldsDeleted"] + change_description.fieldsUpdated = version_json[ + "changeDescription" + ]["fieldsUpdated"] + change_descriptions.append(change_description) + return change_descriptions + except Exception as err: + logger.error(repr(err)) def get_status(self): return self.status