diff --git a/metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py b/metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py index 66bf5524c2..5fc9ed8ec5 100644 --- a/metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py +++ b/metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import Callable, Iterable, Optional, Union, cast +from typing import Callable, Iterable, Optional, Union import datahub.emitter.mce_builder as builder from datahub.emitter.kafka_emitter import DatahubKafkaEmitter @@ -81,10 +81,4 @@ class CorpGroup: :param callback: The callback method for KafkaEmitter if it is used """ for mcp in self.generate_mcp(): - if type(emitter).__name__ == "DatahubKafkaEmitter": - assert callback is not None - kafka_emitter = cast("DatahubKafkaEmitter", emitter) - kafka_emitter.emit(mcp, callback) - else: - rest_emitter = cast("DatahubRestEmitter", emitter) - rest_emitter.emit(mcp) + emitter.emit(mcp, callback) diff --git a/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py b/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py index 00fe35ded5..1f4c1d6fdc 100644 --- a/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py +++ b/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import Callable, Iterable, List, Optional, Union, cast +from typing import Callable, Iterable, List, Optional, Union import datahub.emitter.mce_builder as builder from datahub.emitter.kafka_emitter import DatahubKafkaEmitter @@ -100,10 +100,4 @@ class CorpUser: :param callback: The callback method for KafkaEmitter if it is used """ for mcp in self.generate_mcp(): - if type(emitter).__name__ == "DatahubKafkaEmitter": - assert callback is not None - kafka_emitter = cast("DatahubKafkaEmitter", emitter) - kafka_emitter.emit(mcp, callback) - else: - rest_emitter = cast("DatahubRestEmitter", emitter) - rest_emitter.emit(mcp) + emitter.emit(mcp, callback) diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py index 0a1354aa20..8c4e2da92f 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py @@ -135,10 +135,6 @@ class DataFlow: :param emitter: Datahub Emitter to emit the process event :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used """ - from datahub.emitter.kafka_emitter import DatahubKafkaEmitter for mcp in self.generate_mcp(): - if isinstance(emitter, DatahubKafkaEmitter): - emitter.emit(mcp, callback) - else: - emitter.emit(mcp) + emitter.emit(mcp, callback) diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py index 68a876138c..8335b0f662 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py @@ -178,13 +178,9 @@ class DataJob: :param emitter: Datahub Emitter to emit the process event :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used """ - from datahub.emitter.kafka_emitter import DatahubKafkaEmitter for mcp in self.generate_mcp(): - if isinstance(emitter, DatahubKafkaEmitter): - emitter.emit(mcp, callback) - else: - emitter.emit(mcp) + emitter.emit(mcp, callback) def generate_data_input_output_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: mcp = MetadataChangeProposalWrapper( diff --git a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py index 09484d2461..5dde5fd1ff 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py +++ b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py @@ -1,7 +1,7 @@ import time from dataclasses import dataclass, field from enum import Enum -from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Union, cast +from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Union from datahub.api.entities.datajob import DataFlow, DataJob from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -282,13 +282,7 @@ class DataProcessInstance: :param emitter: (Union[DatahubRestEmitter, DatahubKafkaEmitter]) the datahub emitter to emit generated mcps :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used """ - if type(emitter).__name__ == "DatahubKafkaEmitter": - assert callback is not None - kafka_emitter = cast("DatahubKafkaEmitter", emitter) - kafka_emitter.emit(mcp, callback) - else: - rest_emitter = cast("DatahubRestEmitter", emitter) - rest_emitter.emit(mcp) + emitter.emit(mcp, callback) def emit( self, diff --git a/metadata-ingestion/src/datahub/emitter/kafka_emitter.py b/metadata-ingestion/src/datahub/emitter/kafka_emitter.py index 75d07e16f6..dff659b186 100644 --- a/metadata-ingestion/src/datahub/emitter/kafka_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/kafka_emitter.py @@ -116,9 +116,9 @@ class DatahubKafkaEmitter: callback: Optional[Callable[[Exception, str], None]] = None, ) -> None: if isinstance(item, (MetadataChangeProposal, MetadataChangeProposalWrapper)): - return self.emit_mcp_async(item, callback or _noop_callback) + return self.emit_mcp_async(item, callback or _error_reporting_callback) else: - return self.emit_mce_async(item, callback or _noop_callback) + return self.emit_mce_async(item, callback or _error_reporting_callback) def emit_mce_async( self, @@ -155,5 +155,6 @@ class DatahubKafkaEmitter: producer.flush() -def _noop_callback(err: Exception, msg: str) -> None: - pass +def _error_reporting_callback(err: Exception, msg: str) -> None: + if err: + logger.error(f"Failed to emit to kafka: {err} {msg}") diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index 00eee29e21..0ad5eb3d76 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -4,7 +4,7 @@ import json import logging import os from json.decoder import JSONDecodeError -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union import requests from requests.adapters import HTTPAdapter, Retry @@ -175,15 +175,29 @@ class DataHubRestEmitter(Closeable): MetadataChangeProposalWrapper, UsageAggregation, ], + # NOTE: This signature should have the exception be optional rather than + # required. However, this would be a breaking change that may need + # more careful consideration. + callback: Optional[Callable[[Exception, str], None]] = None, ) -> Tuple[datetime.datetime, datetime.datetime]: start_time = datetime.datetime.now() - if isinstance(item, UsageAggregation): - self.emit_usage(item) - elif isinstance(item, (MetadataChangeProposal, MetadataChangeProposalWrapper)): - self.emit_mcp(item) + try: + if isinstance(item, UsageAggregation): + self.emit_usage(item) + elif isinstance( + item, (MetadataChangeProposal, MetadataChangeProposalWrapper) + ): + self.emit_mcp(item) + else: + self.emit_mce(item) + except Exception as e: + if callback: + callback(e, str(e)) + raise else: - self.emit_mce(item) - return start_time, datetime.datetime.now() + if callback: + callback(None, "success") # type: ignore + return start_time, datetime.datetime.now() def emit_mce(self, mce: MetadataChangeEvent) -> None: url = f"{self._gms_server}/entities?action=ingest"