diff --git a/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json b/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json
index dd38a1b17c2..17a32a4be25 100644
--- a/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json
+++ b/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json
@@ -44,6 +44,11 @@
         "$ref": "../../entity/data/topic.json#/definitions/cleanupPolicy"
       }
     },
+    "sampleData": {
+      "description": "Sample data for a topic.",
+      "$ref": "../../entity/data/topic.json#/definitions/topicSampleData",
+      "default": null
+    },
     "replicationFactor": {
       "description": "Replication Factor in integer (more than 1).",
       "type": "integer"
diff --git a/ingestion/src/metadata/ingestion/source/messaging/kafka.py b/ingestion/src/metadata/ingestion/source/messaging/kafka.py
index 875e00398ae..d06b2164aa4 100644
--- a/ingestion/src/metadata/ingestion/source/messaging/kafka.py
+++ b/ingestion/src/metadata/ingestion/source/messaging/kafka.py
@@ -12,20 +12,15 @@
 import concurrent.futures
 import sys
 import traceback
-import uuid
-from dataclasses import dataclass, field
-from typing import Iterable, List, Optional
+from typing import Any, Iterable, Optional
 
 import confluent_kafka
 from confluent_kafka.admin import AdminClient, ConfigResource
 from confluent_kafka.schema_registry.schema_registry_client import Schema
+from pydantic import BaseModel
 
 from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
-from metadata.generated.schema.entity.data.topic import (
-    SchemaType,
-    Topic,
-    TopicSampleData,
-)
+from metadata.generated.schema.entity.data.topic import SchemaType, TopicSampleData
 
 # This import verifies that the dependencies are available.
 from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
@@ -34,57 +29,32 @@ from metadata.generated.schema.entity.services.connections.messaging.kafkaConnec
 from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
     OpenMetadataConnection,
 )
-from metadata.generated.schema.entity.services.messagingService import (
-    MessagingService,
-    MessagingServiceType,
-)
 from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.common import logger
-from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
-from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from metadata.utils.connection_clients import KafkaClient
-from metadata.utils.connections import get_connection, test_connection
-from metadata.utils.filters import filter_by_topic
+from metadata.ingestion.api.source import InvalidSourceException
+from metadata.ingestion.source.messaging.messaging_service import (
+    MessagingServiceSource,
+)
 
 
-@dataclass
-class KafkaSourceStatus(SourceStatus):
-    topics_scanned: List[str] = field(default_factory=list)
-    filtered: List[str] = field(default_factory=list)
+class KafkaTopicDetails(BaseModel):
+    """
+    Wrapper class to combine the topic_name with topic_metadata
+    """
 
-    def topic_scanned(self, topic: str) -> None:
-        self.topics_scanned.append(topic)
-
-    def dropped(self, topic: str) -> None:
-        self.filtered.append(topic)
+    topic_name: str
+    topic_metadata: Any
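+
+    # Illustrative construction (the names below are assumptions, shown for
+    # documentation only):
+    #   topics = admin_client.list_topics().topics
+    #   details = KafkaTopicDetails(
+    #       topic_name="orders", topic_metadata=topics["orders"]
+    #   )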
 
 
-@dataclass
-class KafkaSource(Source[CreateTopicRequest]):
-    config: WorkflowSource
-    admin_client: AdminClient
-    report: KafkaSourceStatus
-
+class KafkaSource(MessagingServiceSource):
     def __init__(
         self,
         config: WorkflowSource,
         metadata_config: OpenMetadataConnection,
     ):
-        super().__init__()
-        self.config = config
-        self.source_config = self.config.sourceConfig.config
-        self.service_connection = self.config.serviceConnection.__root__.config
-        self.metadata_config = metadata_config
-        self.metadata = OpenMetadata(self.metadata_config)
-        self.status = KafkaSourceStatus()
-        self.service = self.metadata.get_service_or_create(
-            entity=MessagingService, config=config
-        )
+        super().__init__(config, metadata_config)
         self.generate_sample_data = self.config.sourceConfig.config.generateSampleData
-        self.connection: KafkaClient = get_connection(self.service_connection)
         self.admin_client = self.connection.admin_client
         self.schema_registry_client = self.connection.schema_registry_client
         if self.generate_sample_data:
@@ -100,90 +70,81 @@ class KafkaSource(Source[CreateTopicRequest]):
         )
         return cls(config, metadata_config)
 
-    def prepare(self):
-        pass
-
-    def next_record(self) -> Iterable[Topic]:
+    def get_topic_list(self) -> Iterable[KafkaTopicDetails]:
         topics_dict = self.admin_client.list_topics().topics
         for topic_name, topic_metadata in topics_dict.items():
-            try:
-                if not filter_by_topic(
-                    self.source_config.topicFilterPattern, topic_name
-                ):
-                    logger.info("Fetching topic schema {}".format(topic_name))
-                    topic_schema = self._parse_topic_metadata(topic_name)
-                    logger.info("Fetching topic config {}".format(topic_name))
-                    topic = Topic(
-                        id=uuid.uuid4(),
-                        name=topic_name,
-                        service=EntityReference(
-                            id=self.service.id, type="messagingService"
-                        ),
-                        partitions=len(topic_metadata.partitions),
-                        replicationFactor=len(
-                            topic_metadata.partitions.get(0).replicas
-                        ),
-                    )
-                    topic_configResource = self.admin_client.describe_configs(
-                        [
-                            ConfigResource(
-                                confluent_kafka.admin.RESOURCE_TOPIC, topic_name
-                            )
-                        ]
-                    )
-                    for j in concurrent.futures.as_completed(
-                        iter(topic_configResource.values())
-                    ):
-                        config_response = j.result(timeout=10)
-                        if "max.message.bytes" in config_response:
-                            topic.maximumMessageSize = config_response.get(
-                                "max.message.bytes", {}
-                            ).value
+            yield KafkaTopicDetails(
+                topic_name=topic_name, topic_metadata=topic_metadata
+            )
 
-                        if "min.insync.replicas" in config_response:
-                            topic.minimumInSyncReplicas = config_response.get(
-                                "min.insync.replicas"
-                            ).value
+    def get_topic_name(self, topic_details: KafkaTopicDetails) -> str:
+        """
+        Get Topic Name
+        """
+        return topic_details.topic_name
 
-                        if "retention.ms" in config_response:
-                            topic.retentionTime = config_response.get(
-                                "retention.ms"
-                            ).value
+    def yield_topic(
+        self, topic_details: KafkaTopicDetails
+    ) -> Iterable[CreateTopicRequest]:
+        logger.info("Fetching topic schema {}".format(topic_details.topic_name))
+        topic_schema = self._parse_topic_metadata(topic_details.topic_name)
+        logger.info("Fetching topic config {}".format(topic_details.topic_name))
+        topic = CreateTopicRequest(
+            name=topic_details.topic_name,
+            service=EntityReference(
+                id=self.context.messaging_service.id.__root__, type="messagingService"
+            ),
+            partitions=len(topic_details.topic_metadata.partitions),
+            replicationFactor=len(
+                topic_details.topic_metadata.partitions.get(0).replicas
+            ),
+        )
+        topic_config_resource = self.admin_client.describe_configs(
+            [
+                ConfigResource(
+                    confluent_kafka.admin.RESOURCE_TOPIC, topic_details.topic_name
+                )
+            ]
+        )
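+        # describe_configs returns a dict that maps each ConfigResource to a
+        # future; resolve each future as it completes to read the topic's
+        # config entries.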
+        for future in concurrent.futures.as_completed(
+            iter(topic_config_resource.values())
+        ):
+            config_response = future.result(timeout=10)
+            if "max.message.bytes" in config_response:
+                topic.maximumMessageSize = config_response.get(
+                    "max.message.bytes", {}
+                ).value
 
-                        if "cleanup.policy" in config_response:
-                            cleanup_policies = config_response.get(
-                                "cleanup.policy"
-                            ).value
-                            topic.cleanupPolicies = cleanup_policies.split(",")
+            if "min.insync.replicas" in config_response:
+                topic.minimumInSyncReplicas = config_response.get(
+                    "min.insync.replicas"
+                ).value
 
-                        topic_config = {}
-                        for key, conf_response in config_response.items():
-                            topic_config[key] = conf_response.value
-                        topic.topicConfig = topic_config
+            if "retention.ms" in config_response:
+                topic.retentionTime = config_response.get("retention.ms").value
 
-                    if topic_schema is not None:
-                        topic.schemaText = topic_schema.schema_str
-                        if topic_schema.schema_type == "AVRO":
-                            topic.schemaType = SchemaType.Avro.name
-                            if self.generate_sample_data:
-                                pass
-                                topic.sampleData = self._get_sample_data(topic.name)
-                        elif topic_schema.schema_type == "PROTOBUF":
-                            topic.schemaType = SchemaType.Protobuf.name
-                        elif topic_schema.schema_type == "JSON":
-                            topic.schemaType = SchemaType.JSON.name
-                        else:
-                            topic.schemaType = SchemaType.Other.name
+            if "cleanup.policy" in config_response:
+                cleanup_policies = config_response.get("cleanup.policy").value
+                topic.cleanupPolicies = cleanup_policies.split(",")
 
-                    self.status.topic_scanned(topic.name.__root__)
-                    yield topic
-                else:
-                    self.status.dropped(topic_name)
-            except Exception as err:
-                logger.error(repr(err))
-                logger.debug(traceback.format_exc())
-                logger.debug(sys.exc_info()[2])
-                self.status.failure(topic_name, repr(err))
+            topic_config = {}
+            for key, conf_response in config_response.items():
+                topic_config[key] = conf_response.value
+            topic.topicConfig = topic_config
+
+        if topic_schema is not None:
+            topic.schemaText = topic_schema.schema_str
+            if topic_schema.schema_type.lower() == SchemaType.Avro.name.lower():
+                topic.schemaType = SchemaType.Avro.name
+                if self.generate_sample_data:
+                    topic.sampleData = self._get_sample_data(topic.name)
+            elif topic_schema.schema_type.lower() == SchemaType.Protobuf.name.lower():
+                topic.schemaType = SchemaType.Protobuf.name
+            elif topic_schema.schema_type.lower() == SchemaType.JSON.name.lower():
+                topic.schemaType = SchemaType.JSON.name
+            else:
+                topic.schemaType = SchemaType.Other.name
+
+        self.status.topic_scanned(topic.name.__root__)
+        yield topic
 
     def _parse_topic_metadata(self, topic: str) -> Optional[Schema]:
         schema: Optional[Schema] = None
@@ -224,23 +185,6 @@ class KafkaSource(Source[CreateTopicRequest]):
         self.consumer_client.unsubscribe()
         return TopicSampleData(messages=sample_data)
 
-    def on_assign(self, a_consumer, partitions):
-        # get offset tuple from the first partition
-        new_partitions = []
-        for partition in partitions:
-            last_offset = a_consumer.get_watermark_offsets(partition)
-            offset = last_offset[1]
-            if offset > 0:
-                partition.offset = offset - 10 if offset > 10 else offset
-                new_partitions.append(partition)
-        self.consumer_client.assign(new_partitions)
-
-    def get_status(self):
-        return self.status
-
     def close(self):
         if self.generate_sample_data and self.consumer_client:
             self.consumer_client.close()
-
-    def test_connection(self) -> None:
-        test_connection(self.connection)
diff --git a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py
new file mode 100644
index 00000000000..8230f049ed3
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py
@@ -0,0 +1,186 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+"""
+Base class for ingesting messaging services
+"""
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from typing import Any, Iterable, List, Optional
+
+from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
+from metadata.generated.schema.entity.data.topic import Topic
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.generated.schema.entity.services.messagingService import (
+    MessagingConnection,
+    MessagingService,
+)
+from metadata.generated.schema.metadataIngestion.messagingServiceMetadataPipeline import (
+    MessagingServiceMetadataPipeline,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
+from metadata.ingestion.api.source import Source, SourceStatus
+from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
+from metadata.ingestion.models.topology import (
+    NodeStage,
+    ServiceTopology,
+    TopologyNode,
+    create_source_context,
+)
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.utils.connections import get_connection, test_connection
+from metadata.utils.filters import filter_by_topic
+
+
+class MessagingServiceTopology(ServiceTopology):
+    """
+    Defines the hierarchy in Messaging Services:
+    service -> topics.
+
+    We could have a topology validator. We can only consume
+    data that has been produced by any parent node.
+    """
+
+    root = TopologyNode(
+        producer="get_services",
+        stages=[
+            NodeStage(
+                type_=MessagingService,
+                context="messaging_service",
+                processor="yield_messaging_service",
+            )
+        ],
+        children=["topic"],
+    )
+    topic = TopologyNode(
+        producer="get_topic",
+        stages=[
+            NodeStage(
+                type_=Topic,
+                context="topic",
+                processor="yield_topic",
+                consumer=["messaging_service"],
+            )
+        ],
+    )
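+
+# Traversal sketch (illustrative): the topology runner first calls
+# `get_services` and stores the created service entity in
+# `context.messaging_service`; it then calls `get_topic` and runs
+# `yield_topic` on every topic that passes the filter.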
+
+
+@dataclass
+class MessagingSourceStatus(SourceStatus):
+    """
+    Reports the source status after ingestion
+    """
+
+    topics_scanned: List[str] = field(default_factory=list)
+    filtered: List[str] = field(default_factory=list)
+
+    def topic_scanned(self, topic: str) -> None:
+        self.topics_scanned.append(topic)
+
+    def dropped(self, topic: str) -> None:
+        self.filtered.append(topic)
+
+
+class MessagingServiceSource(TopologyRunnerMixin, Source, ABC):
+    """
+    Base class for Messaging Services.
+    It implements the topology and context.
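+
+    Example (an illustrative sketch; ``my_client`` and the shape of its
+    topic objects are assumptions, not part of this module):
+
+        class MyMessagingSource(MessagingServiceSource):
+            def get_topic_list(self):
+                return my_client.list_topics()
+
+            def get_topic_name(self, topic_details):
+                return topic_details.name
+
+            def yield_topic(self, topic_details):
+                yield CreateTopicRequest(name=topic_details.name, ...)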
+ """ + + @abstractmethod + def yield_topic(self, messaging_details: Any) -> Iterable[CreateTopicRequest]: + """ + Method to Get Messaging Entity + """ + + @abstractmethod + def get_topic_list(self) -> Optional[List[Any]]: + """ + Get List of all topics + """ + + @abstractmethod + def get_topic_name(self, topic_details: Any) -> str: + """ + Get Topic Name + """ + + status: MessagingSourceStatus + source_config: MessagingServiceMetadataPipeline + config: WorkflowSource + metadata: OpenMetadata + # Big union of types we want to fetch dynamically + service_connection: MessagingConnection.__fields__["config"].type_ + + topology = MessagingServiceTopology() + context = create_source_context(topology) + + @abstractmethod + def __init__( + self, + config: WorkflowSource, + metadata_config: OpenMetadataConnection, + ): + super().__init__() + self.config = config + self.metadata_config = metadata_config + self.metadata = OpenMetadata(metadata_config) + self.service_connection = self.config.serviceConnection.__root__.config + self.source_config: MessagingServiceMetadataPipeline = ( + self.config.sourceConfig.config + ) + self.connection = get_connection(self.service_connection) + self.test_connection() + self.status = MessagingSourceStatus() + + def get_topic(self) -> Any: + for topic_details in self.get_topic_list(): + if filter_by_topic( + self.source_config.topicFilterPattern, + self.get_topic_name(topic_details), + ): + self.status.filter( + self.get_topic_name(topic_details), + "Topic Pattern not Allowed", + ) + continue + yield topic_details + + def yield_messaging_service(self, config: WorkflowSource): + yield self.metadata.get_create_service_from_source( + entity=MessagingService, config=config + ) + + def get_services(self) -> Iterable[WorkflowSource]: + yield self.config + + def prepare(self): + pass + + def get_status(self): + return self.status + + def test_connection(self) -> None: + test_connection(self.connection) + + def close(self): + pass