Refactor kafka connection (#4326)

This commit is contained in:
Ayush Shah 2022-04-21 21:54:30 +05:30 committed by GitHub
parent 2444d3de3d
commit f4e6d08812
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 30 deletions

View File

@ -1,12 +1,17 @@
{ {
"source": { "source": {
"type": "kafka", "type": "kafka",
"config": { "serviceName": "local_kafka",
"service_name": "local_kafka", "serviceConnection": {
"bootstrap_servers": "localhost:9092", "config": {
"schema_registry_url": "http://192.168.1.43:8081", "type": "Kafka",
"filter_pattern": { "bootstrapServers": "localhost:9092",
"excludes": ["_confluent.*"] "schemaRegistryURL": "http://192.168.1.43:8081"
}
},
"sourceConfig": {
"config": {
"topicFilterPattern": {}
} }
} }
}, },

View File

@ -9,8 +9,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# This import verifies that the dependencies are available.
import concurrent.futures import concurrent.futures
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Iterable, List, Optional from typing import Iterable, List, Optional
@ -22,18 +20,26 @@ from confluent_kafka.schema_registry.schema_registry_client import (
SchemaRegistryClient, SchemaRegistryClient,
) )
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.entity.data.topic import SchemaType from metadata.generated.schema.entity.data.topic import SchemaType
# This import verifies that the dependencies are available.
from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
KafkaConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection, OpenMetadataConnection,
) )
from metadata.generated.schema.entity.services.messagingService import ( from metadata.generated.schema.entity.services.messagingService import (
MessagingServiceType, MessagingServiceType,
) )
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import IncludeFilterPattern, logger from metadata.ingestion.api.common import logger
from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.utils.filters import filter_by_topic
from metadata.utils.helpers import get_messaging_service_or_create from metadata.utils.helpers import get_messaging_service_or_create
@ -49,50 +55,51 @@ class KafkaSourceStatus(SourceStatus):
self.filtered.append(topic) self.filtered.append(topic)
class KafkaSourceConfig(ConfigModel):
bootstrap_servers: str = "localhost:9092"
schema_registry_url: str = "http://localhost:8081"
consumer_config: dict = {}
service_name: str
service_type: str = MessagingServiceType.Kafka.value
filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
@dataclass @dataclass
class KafkaSource(Source[CreateTopicRequest]): class KafkaSource(Source[CreateTopicRequest]):
config: KafkaSourceConfig config: WorkflowSource
admin_client: AdminClient admin_client: AdminClient
report: KafkaSourceStatus report: KafkaSourceStatus
def __init__( def __init__(
self, self,
config: KafkaSourceConfig, config: WorkflowSource,
metadata_config: OpenMetadataConnection, metadata_config: OpenMetadataConnection,
): ):
super().__init__() super().__init__()
self.config = config self.config = config
self.source_config = self.config.sourceConfig.config
self.service_connection = self.config.serviceConnection.__root__.config
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.status = KafkaSourceStatus() self.status = KafkaSourceStatus()
self.kafka_service_config = {
"bootstrapServers": self.service_connection.bootstrapServers,
"schemaRegistryURL": self.service_connection.schemaRegistryURL,
}
self.service = get_messaging_service_or_create( self.service = get_messaging_service_or_create(
service_name=config.service_name, service_name=config.serviceName,
message_service_type=MessagingServiceType.Kafka.name, message_service_type=MessagingServiceType.Kafka.name,
schema_registry_url=config.schema_registry_url, config=self.kafka_service_config,
brokers=config.bootstrap_servers.split(","),
metadata_config=metadata_config, metadata_config=metadata_config,
) )
self.schema_registry_client = SchemaRegistryClient( self.schema_registry_client = SchemaRegistryClient(
{"url": self.config.schema_registry_url} {"url": self.service_connection.schemaRegistryURL}
) )
self.admin_client = AdminClient( self.admin_client = AdminClient(
{ {
"bootstrap.servers": self.config.bootstrap_servers, "bootstrap.servers": self.service_connection.bootstrapServers,
"session.timeout.ms": 6000, "session.timeout.ms": 6000,
} }
) )
@classmethod @classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection): def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config = KafkaSourceConfig.parse_obj(config_dict) config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: KafkaConnection = config.serviceConnection.__root__.config
if not isinstance(connection, KafkaConnection):
raise InvalidSourceException(
f"Expected KafkaConnection, but got {connection}"
)
return cls(config, metadata_config) return cls(config, metadata_config)
def prepare(self): def prepare(self):
@ -102,7 +109,7 @@ class KafkaSource(Source[CreateTopicRequest]):
topics_dict = self.admin_client.list_topics().topics topics_dict = self.admin_client.list_topics().topics
for topic_name, topic_metadata in topics_dict.items(): for topic_name, topic_metadata in topics_dict.items():
try: try:
if self.config.filter_pattern.included(topic_name): if filter_by_topic(self.source_config.topicFilterPattern, topic_name):
logger.info("Fetching topic schema {}".format(topic_name)) logger.info("Fetching topic schema {}".format(topic_name))
topic_schema = self._parse_topic_metadata(topic_name) topic_schema = self._parse_topic_metadata(topic_name)
logger.info("Fetching topic config {}".format(topic_name)) logger.info("Fetching topic config {}".format(topic_name))
@ -161,7 +168,7 @@ class KafkaSource(Source[CreateTopicRequest]):
self.status.dropped(topic_name) self.status.dropped(topic_name)
except Exception as err: except Exception as err:
logger.error(repr(err)) logger.error(repr(err))
self.status.failure(topic_name) self.status.failure(topic_name, repr(err))
def _parse_topic_metadata(self, topic: str) -> Optional[Schema]: def _parse_topic_metadata(self, topic: str) -> Optional[Schema]:
logger.debug(f"topic = {topic}") logger.debug(f"topic = {topic}")

View File

@ -121,6 +121,19 @@ def filter_by_chart(
return _filter(chart_filter_pattern, chart_name) return _filter(chart_filter_pattern, chart_name)
def filter_by_topic(topic_filter_pattern: Optional[FilterPattern], topic: str) -> bool:
"""
Return True if the topic needs to be filtered, False otherwise
Include takes precedence over exclude
:param topic_filter_pattern: Model defining chart filtering logic
:param topic_name: topic name
:return: True for filtering, False otherwise
"""
return _filter(topic_filter_pattern, topic)
def filter_by_dashboard( def filter_by_dashboard(
dashboard_filter_pattern: Optional[FilterPattern], dashboard_name: str dashboard_filter_pattern: Optional[FilterPattern], dashboard_name: str
) -> bool: ) -> bool: