diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index 7587b6816d3..cdd9d927b19 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -225,7 +225,7 @@ class ElasticsearchSink(Sink[Entity]): column_descriptions = [] tags = set() - timestamp = epoch_ms(table.updatedAt.__root__) + timestamp = table.updatedAt.__root__ tier = None for table_tag in table.tags: if "Tier" in table_tag.tagFQN.__root__: @@ -289,7 +289,7 @@ class ElasticsearchSink(Sink[Entity]): {"input": [topic_name], "weight": 10}, ] tags = set() - timestamp = epoch_ms(topic.updatedAt.__root__) + timestamp = topic.updatedAt.__root__ service_entity = self.metadata.get_by_id( entity=MessagingService, entity_id=str(topic.service.id.__root__) ) @@ -329,7 +329,7 @@ class ElasticsearchSink(Sink[Entity]): dashboard_name = dashboard.name suggest = [{"input": [dashboard.displayName], "weight": 10}] tags = set() - timestamp = epoch_ms(dashboard.updatedAt.__root__) + timestamp = dashboard.updatedAt.__root__ service_entity = self.metadata.get_by_id( entity=DashboardService, entity_id=str(dashboard.service.id.__root__) ) @@ -391,7 +391,7 @@ class ElasticsearchSink(Sink[Entity]): fqdn = pipeline.fullyQualifiedName suggest = [{"input": [pipeline.displayName], "weight": 10}] tags = set() - timestamp = epoch_ms(pipeline.updatedAt.__root__) + timestamp = pipeline.updatedAt.__root__ service_entity = self.metadata.get_by_id( entity=PipelineService, entity_id=str(pipeline.service.id.__root__) ) @@ -488,9 +488,9 @@ class ElasticsearchSink(Sink[Entity]): change_descriptions = [] for version in entity_versions.versions: version_json = json.loads(version) - updatedAt = parser.parse(version_json["updatedAt"]) + updatedAt = version_json["updatedAt"] change_description = ChangeDescription( - updatedBy=version_json["updatedBy"], updatedAt=epoch_ms(updatedAt) + updatedBy=version_json["updatedBy"], updatedAt=updatedAt ) if "changeDescription" in version_json: change_description.fieldsAdded = version_json["changeDescription"][