From f4e6d08812e3e28f45a62c9684da5996b77a2984 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Thu, 21 Apr 2022 21:54:30 +0530 Subject: [PATCH] Refactor kafka connection (#4326) --- ingestion/examples/workflows/kafka.json | 17 ++++-- .../src/metadata/ingestion/source/kafka.py | 55 +++++++++++-------- ingestion/src/metadata/utils/filters.py | 13 +++++ 3 files changed, 55 insertions(+), 30 deletions(-) diff --git a/ingestion/examples/workflows/kafka.json b/ingestion/examples/workflows/kafka.json index 07d38d3da2e..7c8e26f6f9b 100644 --- a/ingestion/examples/workflows/kafka.json +++ b/ingestion/examples/workflows/kafka.json @@ -1,12 +1,17 @@ { "source": { "type": "kafka", - "config": { - "service_name": "local_kafka", - "bootstrap_servers": "localhost:9092", - "schema_registry_url": "http://192.168.1.43:8081", - "filter_pattern": { - "excludes": ["_confluent.*"] + "serviceName": "local_kafka", + "serviceConnection": { + "config": { + "type": "Kafka", + "bootstrapServers": "localhost:9092", + "schemaRegistryURL": "http://192.168.1.43:8081" + } + }, + "sourceConfig": { + "config": { + "topicFilterPattern": {} } } }, diff --git a/ingestion/src/metadata/ingestion/source/kafka.py b/ingestion/src/metadata/ingestion/source/kafka.py index 3c62daa9197..eebe3d69bb2 100644 --- a/ingestion/src/metadata/ingestion/source/kafka.py +++ b/ingestion/src/metadata/ingestion/source/kafka.py @@ -9,8 +9,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# This import verifies that the dependencies are available. - import concurrent.futures from dataclasses import dataclass, field from typing import Iterable, List, Optional @@ -22,18 +20,26 @@ from confluent_kafka.schema_registry.schema_registry_client import ( SchemaRegistryClient, ) -from metadata.config.common import ConfigModel from metadata.generated.schema.api.data.createTopic import CreateTopicRequest from metadata.generated.schema.entity.data.topic import SchemaType + +# This import verifies that the dependencies are available. +from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import ( + KafkaConnection, +) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) from metadata.generated.schema.entity.services.messagingService import ( MessagingServiceType, ) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.common import IncludeFilterPattern, logger -from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.api.common import logger +from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus +from metadata.utils.filters import filter_by_topic from metadata.utils.helpers import get_messaging_service_or_create @@ -49,50 +55,51 @@ class KafkaSourceStatus(SourceStatus): self.filtered.append(topic) -class KafkaSourceConfig(ConfigModel): - bootstrap_servers: str = "localhost:9092" - schema_registry_url: str = "http://localhost:8081" - consumer_config: dict = {} - service_name: str - service_type: str = MessagingServiceType.Kafka.value - filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all() - - @dataclass class KafkaSource(Source[CreateTopicRequest]): - config: KafkaSourceConfig + config: WorkflowSource admin_client: AdminClient report: KafkaSourceStatus def __init__( self, - config: KafkaSourceConfig, + config: WorkflowSource, metadata_config: OpenMetadataConnection, ): super().__init__() self.config = config + self.source_config = self.config.sourceConfig.config + self.service_connection = self.config.serviceConnection.__root__.config self.metadata_config = metadata_config self.status = KafkaSourceStatus() + self.kafka_service_config = { + "bootstrapServers": self.service_connection.bootstrapServers, + "schemaRegistryURL": self.service_connection.schemaRegistryURL, + } self.service = get_messaging_service_or_create( - service_name=config.service_name, + service_name=config.serviceName, message_service_type=MessagingServiceType.Kafka.name, - schema_registry_url=config.schema_registry_url, - brokers=config.bootstrap_servers.split(","), + config=self.kafka_service_config, metadata_config=metadata_config, ) self.schema_registry_client = SchemaRegistryClient( - {"url": self.config.schema_registry_url} + {"url": self.service_connection.schemaRegistryURL} ) self.admin_client = AdminClient( { - "bootstrap.servers": self.config.bootstrap_servers, + "bootstrap.servers": self.service_connection.bootstrapServers, "session.timeout.ms": 6000, } ) @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): - config = KafkaSourceConfig.parse_obj(config_dict) + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: KafkaConnection = config.serviceConnection.__root__.config + if not isinstance(connection, KafkaConnection): + raise InvalidSourceException( + f"Expected KafkaConnection, but got {connection}" + ) return cls(config, metadata_config) def prepare(self): @@ -102,7 +109,7 @@ class KafkaSource(Source[CreateTopicRequest]): topics_dict = self.admin_client.list_topics().topics for topic_name, topic_metadata in topics_dict.items(): try: - if self.config.filter_pattern.included(topic_name): + if filter_by_topic(self.source_config.topicFilterPattern, topic_name): logger.info("Fetching topic schema {}".format(topic_name)) topic_schema = self._parse_topic_metadata(topic_name) logger.info("Fetching topic config {}".format(topic_name)) @@ -161,7 +168,7 @@ class KafkaSource(Source[CreateTopicRequest]): self.status.dropped(topic_name) except Exception as err: logger.error(repr(err)) - self.status.failure(topic_name) + self.status.failure(topic_name, repr(err)) def _parse_topic_metadata(self, topic: str) -> Optional[Schema]: logger.debug(f"topic = {topic}") diff --git a/ingestion/src/metadata/utils/filters.py b/ingestion/src/metadata/utils/filters.py index f547e35d769..d825648ee54 100644 --- a/ingestion/src/metadata/utils/filters.py +++ b/ingestion/src/metadata/utils/filters.py @@ -121,6 +121,19 @@ def filter_by_chart( return _filter(chart_filter_pattern, chart_name) +def filter_by_topic(topic_filter_pattern: Optional[FilterPattern], topic: str) -> bool: + """ + Return True if the topic needs to be filtered, False otherwise + + Include takes precedence over exclude + + :param topic_filter_pattern: Model defining chart filtering logic + :param topic_name: topic name + :return: True for filtering, False otherwise + """ + return _filter(topic_filter_pattern, topic) + + def filter_by_dashboard( dashboard_filter_pattern: Optional[FilterPattern], dashboard_name: str ) -> bool: