fix(ingest): unify emit interface (#6592)

This commit is contained in:
Harshal Sheth 2022-12-01 17:02:50 -05:00 committed by GitHub
parent 6fe9ad4fbb
commit d6dd8ccc51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 34 additions and 45 deletions

View File

@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Callable, Iterable, Optional, Union, cast from typing import Callable, Iterable, Optional, Union
import datahub.emitter.mce_builder as builder import datahub.emitter.mce_builder as builder
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
@ -81,10 +81,4 @@ class CorpGroup:
:param callback: The callback method for KafkaEmitter if it is used :param callback: The callback method for KafkaEmitter if it is used
""" """
for mcp in self.generate_mcp(): for mcp in self.generate_mcp():
if type(emitter).__name__ == "DatahubKafkaEmitter": emitter.emit(mcp, callback)
assert callback is not None
kafka_emitter = cast("DatahubKafkaEmitter", emitter)
kafka_emitter.emit(mcp, callback)
else:
rest_emitter = cast("DatahubRestEmitter", emitter)
rest_emitter.emit(mcp)

View File

@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Callable, Iterable, List, Optional, Union, cast from typing import Callable, Iterable, List, Optional, Union
import datahub.emitter.mce_builder as builder import datahub.emitter.mce_builder as builder
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
@ -100,10 +100,4 @@ class CorpUser:
:param callback: The callback method for KafkaEmitter if it is used :param callback: The callback method for KafkaEmitter if it is used
""" """
for mcp in self.generate_mcp(): for mcp in self.generate_mcp():
if type(emitter).__name__ == "DatahubKafkaEmitter": emitter.emit(mcp, callback)
assert callback is not None
kafka_emitter = cast("DatahubKafkaEmitter", emitter)
kafka_emitter.emit(mcp, callback)
else:
rest_emitter = cast("DatahubRestEmitter", emitter)
rest_emitter.emit(mcp)

View File

@ -135,10 +135,6 @@ class DataFlow:
:param emitter: Datahub Emitter to emit the process event :param emitter: Datahub Emitter to emit the process event
:param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
""" """
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
for mcp in self.generate_mcp(): for mcp in self.generate_mcp():
if isinstance(emitter, DatahubKafkaEmitter): emitter.emit(mcp, callback)
emitter.emit(mcp, callback)
else:
emitter.emit(mcp)

View File

@ -178,13 +178,9 @@ class DataJob:
:param emitter: Datahub Emitter to emit the process event :param emitter: Datahub Emitter to emit the process event
:param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
""" """
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
for mcp in self.generate_mcp(): for mcp in self.generate_mcp():
if isinstance(emitter, DatahubKafkaEmitter): emitter.emit(mcp, callback)
emitter.emit(mcp, callback)
else:
emitter.emit(mcp)
def generate_data_input_output_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: def generate_data_input_output_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
mcp = MetadataChangeProposalWrapper( mcp = MetadataChangeProposalWrapper(

View File

@ -1,7 +1,7 @@
import time import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from enum import Enum from enum import Enum
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Union, cast from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Union
from datahub.api.entities.datajob import DataFlow, DataJob from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp import MetadataChangeProposalWrapper
@ -282,13 +282,7 @@ class DataProcessInstance:
:param emitter: (Union[DatahubRestEmitter, DatahubKafkaEmitter]) the datahub emitter to emit generated mcps :param emitter: (Union[DatahubRestEmitter, DatahubKafkaEmitter]) the datahub emitter to emit generated mcps
:param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
""" """
if type(emitter).__name__ == "DatahubKafkaEmitter": emitter.emit(mcp, callback)
assert callback is not None
kafka_emitter = cast("DatahubKafkaEmitter", emitter)
kafka_emitter.emit(mcp, callback)
else:
rest_emitter = cast("DatahubRestEmitter", emitter)
rest_emitter.emit(mcp)
def emit( def emit(
self, self,

View File

@ -116,9 +116,9 @@ class DatahubKafkaEmitter:
callback: Optional[Callable[[Exception, str], None]] = None, callback: Optional[Callable[[Exception, str], None]] = None,
) -> None: ) -> None:
if isinstance(item, (MetadataChangeProposal, MetadataChangeProposalWrapper)): if isinstance(item, (MetadataChangeProposal, MetadataChangeProposalWrapper)):
return self.emit_mcp_async(item, callback or _noop_callback) return self.emit_mcp_async(item, callback or _error_reporting_callback)
else: else:
return self.emit_mce_async(item, callback or _noop_callback) return self.emit_mce_async(item, callback or _error_reporting_callback)
def emit_mce_async( def emit_mce_async(
self, self,
@ -155,5 +155,6 @@ class DatahubKafkaEmitter:
producer.flush() producer.flush()
def _noop_callback(err: Exception, msg: str) -> None: def _error_reporting_callback(err: Exception, msg: str) -> None:
pass if err:
logger.error(f"Failed to emit to kafka: {err} {msg}")

View File

@ -4,7 +4,7 @@ import json
import logging import logging
import os import os
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError
from typing import Any, Dict, List, Optional, Tuple, Union from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import requests import requests
from requests.adapters import HTTPAdapter, Retry from requests.adapters import HTTPAdapter, Retry
@ -175,15 +175,29 @@ class DataHubRestEmitter(Closeable):
MetadataChangeProposalWrapper, MetadataChangeProposalWrapper,
UsageAggregation, UsageAggregation,
], ],
# NOTE: This signature should have the exception be optional rather than
# required. However, this would be a breaking change that may need
# more careful consideration.
callback: Optional[Callable[[Exception, str], None]] = None,
) -> Tuple[datetime.datetime, datetime.datetime]: ) -> Tuple[datetime.datetime, datetime.datetime]:
start_time = datetime.datetime.now() start_time = datetime.datetime.now()
if isinstance(item, UsageAggregation): try:
self.emit_usage(item) if isinstance(item, UsageAggregation):
elif isinstance(item, (MetadataChangeProposal, MetadataChangeProposalWrapper)): self.emit_usage(item)
self.emit_mcp(item) elif isinstance(
item, (MetadataChangeProposal, MetadataChangeProposalWrapper)
):
self.emit_mcp(item)
else:
self.emit_mce(item)
except Exception as e:
if callback:
callback(e, str(e))
raise
else: else:
self.emit_mce(item) if callback:
return start_time, datetime.datetime.now() callback(None, "success") # type: ignore
return start_time, datetime.datetime.now()
def emit_mce(self, mce: MetadataChangeEvent) -> None: def emit_mce(self, mce: MetadataChangeEvent) -> None:
url = f"{self._gms_server}/entities?action=ingest" url = f"{self._gms_server}/entities?action=ingest"