Create Messaging Topology (#6201)

Milan Bariya 2022-07-25 10:47:15 +05:30 committed by GitHub
parent 52b416ba65
commit 5e5e728ea9
3 changed files with 271 additions and 136 deletions


@@ -44,6 +44,11 @@
         "$ref": "../../entity/data/topic.json#/definitions/cleanupPolicy"
       }
     },
+    "sampleData": {
+      "description": "Sample data for a topic.",
+      "$ref": "../../entity/data/topic.json#/definitions/topicSampleData",
+      "default": null
+    },
     "replicationFactor": {
      "description": "Replication Factor in integer (more than 1).",
      "type": "integer"


@@ -12,20 +12,15 @@
 import concurrent.futures
 import sys
 import traceback
-import uuid
-from dataclasses import dataclass, field
-from typing import Iterable, List, Optional
+from typing import Any, Iterable, Optional

 import confluent_kafka
 from confluent_kafka.admin import AdminClient, ConfigResource
 from confluent_kafka.schema_registry.schema_registry_client import Schema
+from pydantic import BaseModel

 from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
-from metadata.generated.schema.entity.data.topic import (
-    SchemaType,
-    Topic,
-    TopicSampleData,
-)
+from metadata.generated.schema.entity.data.topic import SchemaType, TopicSampleData

 # This import verifies that the dependencies are available.
 from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
@@ -34,57 +29,32 @@ from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection
 from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
     OpenMetadataConnection,
 )
-from metadata.generated.schema.entity.services.messagingService import (
-    MessagingService,
-    MessagingServiceType,
-)
 from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.common import logger
-from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
-from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.utils.connection_clients import KafkaClient
 from metadata.utils.connections import get_connection, test_connection
-from metadata.utils.filters import filter_by_topic
+from metadata.ingestion.api.source import InvalidSourceException
+from metadata.ingestion.source.messaging.messaging_service import MessagingServiceSource


-@dataclass
-class KafkaSourceStatus(SourceStatus):
-    topics_scanned: List[str] = field(default_factory=list)
-    filtered: List[str] = field(default_factory=list)
-
-    def topic_scanned(self, topic: str) -> None:
-        self.topics_scanned.append(topic)
-
-    def dropped(self, topic: str) -> None:
-        self.filtered.append(topic)
+class KafkaTopicDetails(BaseModel):
+    """
+    Wrapper Class to combine the topic_name with topic_metadata
+    """
+
+    topic_name: str
+    topic_metadata: Any


-@dataclass
-class KafkaSource(Source[CreateTopicRequest]):
-    config: WorkflowSource
-    admin_client: AdminClient
-    report: KafkaSourceStatus
-
+class KafkaSource(MessagingServiceSource):
     def __init__(
         self,
         config: WorkflowSource,
         metadata_config: OpenMetadataConnection,
     ):
-        super().__init__()
-        self.config = config
-        self.source_config = self.config.sourceConfig.config
-        self.service_connection = self.config.serviceConnection.__root__.config
-        self.metadata_config = metadata_config
-        self.metadata = OpenMetadata(self.metadata_config)
-        self.status = KafkaSourceStatus()
-        self.service = self.metadata.get_service_or_create(
-            entity=MessagingService, config=config
-        )
+        super().__init__(config, metadata_config)
         self.generate_sample_data = self.config.sourceConfig.config.generateSampleData
         self.connection: KafkaClient = get_connection(self.service_connection)
         self.admin_client = self.connection.admin_client
         self.schema_registry_client = self.connection.schema_registry_client
         if self.generate_sample_data:
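
Note on the new wrapper: `AdminClient.list_topics()` returns a dict of topic name to `TopicMetadata`, and the topology stages pass a single object per topic between steps, so `KafkaTopicDetails` simply bundles the pair. A standalone sketch of the same pairing against a bare confluent-kafka client; the broker address is a placeholder:

from typing import Any

from confluent_kafka.admin import AdminClient
from pydantic import BaseModel

class KafkaTopicDetails(BaseModel):
    topic_name: str
    topic_metadata: Any  # confluent_kafka TopicMetadata, opaque to pydantic

admin_client = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder
for name, metadata in admin_client.list_topics().topics.items():
    details = KafkaTopicDetails(topic_name=name, topic_metadata=metadata)
    print(details.topic_name, len(details.topic_metadata.partitions))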
@@ -100,90 +70,81 @@ class KafkaSource(Source[CreateTopicRequest]):
             )
         return cls(config, metadata_config)

-    def prepare(self):
-        pass
-
-    def next_record(self) -> Iterable[Topic]:
+    def get_topic_list(self) -> Iterable[KafkaTopicDetails]:
         topics_dict = self.admin_client.list_topics().topics
         for topic_name, topic_metadata in topics_dict.items():
-            try:
-                if not filter_by_topic(
-                    self.source_config.topicFilterPattern, topic_name
-                ):
-                    logger.info("Fetching topic schema {}".format(topic_name))
-                    topic_schema = self._parse_topic_metadata(topic_name)
-                    logger.info("Fetching topic config {}".format(topic_name))
-                    topic = Topic(
-                        id=uuid.uuid4(),
-                        name=topic_name,
-                        service=EntityReference(
-                            id=self.service.id, type="messagingService"
-                        ),
-                        partitions=len(topic_metadata.partitions),
-                        replicationFactor=len(
-                            topic_metadata.partitions.get(0).replicas
-                        ),
-                    )
-                    topic_configResource = self.admin_client.describe_configs(
-                        [
-                            ConfigResource(
-                                confluent_kafka.admin.RESOURCE_TOPIC, topic_name
-                            )
-                        ]
-                    )
-                    for j in concurrent.futures.as_completed(
-                        iter(topic_configResource.values())
-                    ):
-                        config_response = j.result(timeout=10)
-                        if "max.message.bytes" in config_response:
-                            topic.maximumMessageSize = config_response.get(
-                                "max.message.bytes", {}
-                            ).value
-                        if "min.insync.replicas" in config_response:
-                            topic.minimumInSyncReplicas = config_response.get(
-                                "min.insync.replicas"
-                            ).value
-                        if "retention.ms" in config_response:
-                            topic.retentionTime = config_response.get(
-                                "retention.ms"
-                            ).value
-                        if "cleanup.policy" in config_response:
-                            cleanup_policies = config_response.get(
-                                "cleanup.policy"
-                            ).value
-                            topic.cleanupPolicies = cleanup_policies.split(",")
-                        topic_config = {}
-                        for key, conf_response in config_response.items():
-                            topic_config[key] = conf_response.value
-                        topic.topicConfig = topic_config
-                    if topic_schema is not None:
-                        topic.schemaText = topic_schema.schema_str
-                        if topic_schema.schema_type == "AVRO":
-                            topic.schemaType = SchemaType.Avro.name
-                            if self.generate_sample_data:
-                                topic.sampleData = self._get_sample_data(topic.name)
-                        elif topic_schema.schema_type == "PROTOBUF":
-                            topic.schemaType = SchemaType.Protobuf.name
-                        elif topic_schema.schema_type == "JSON":
-                            topic.schemaType = SchemaType.JSON.name
-                        else:
-                            topic.schemaType = SchemaType.Other.name
-                    self.status.topic_scanned(topic.name.__root__)
-                    yield topic
-                else:
-                    self.status.dropped(topic_name)
-            except Exception as err:
-                logger.error(repr(err))
-                logger.debug(traceback.format_exc())
-                logger.debug(sys.exc_info()[2])
-                self.status.failure(topic_name, repr(err))
+            yield KafkaTopicDetails(
+                topic_name=topic_name, topic_metadata=topic_metadata
+            )
+
+    def get_topic_name(self, topic_details: KafkaTopicDetails) -> str:
+        """
+        Get Topic Name
+        """
+        return topic_details.topic_name
+
+    def yield_topic(
+        self, topic_details: KafkaTopicDetails
+    ) -> Iterable[CreateTopicRequest]:
+        logger.info("Fetching topic schema {}".format(topic_details.topic_name))
+        topic_schema = self._parse_topic_metadata(topic_details.topic_name)
+        logger.info("Fetching topic config {}".format(topic_details.topic_name))
+        topic = CreateTopicRequest(
+            name=topic_details.topic_name,
+            service=EntityReference(
+                id=self.context.messaging_service.id.__root__, type="messagingService"
+            ),
+            partitions=len(topic_details.topic_metadata.partitions),
+            replicationFactor=len(
+                topic_details.topic_metadata.partitions.get(0).replicas
+            ),
+        )
+        topic_configResource = self.admin_client.describe_configs(
+            [
+                ConfigResource(
+                    confluent_kafka.admin.RESOURCE_TOPIC, topic_details.topic_name
+                )
+            ]
+        )
+        for j in concurrent.futures.as_completed(iter(topic_configResource.values())):
+            config_response = j.result(timeout=10)
+            if "max.message.bytes" in config_response:
+                topic.maximumMessageSize = config_response.get(
+                    "max.message.bytes", {}
+                ).value
+            if "min.insync.replicas" in config_response:
+                topic.minimumInSyncReplicas = config_response.get(
+                    "min.insync.replicas"
+                ).value
+            if "retention.ms" in config_response:
+                topic.retentionTime = config_response.get("retention.ms").value
+            if "cleanup.policy" in config_response:
+                cleanup_policies = config_response.get("cleanup.policy").value
+                topic.cleanupPolicies = cleanup_policies.split(",")
+            topic_config = {}
+            for key, conf_response in config_response.items():
+                topic_config[key] = conf_response.value
+            topic.topicConfig = topic_config
+        if topic_schema is not None:
+            topic.schemaText = topic_schema.schema_str
+            if topic_schema.schema_type.lower() == SchemaType.Avro.value.lower():
+                topic.schemaType = SchemaType.Avro.name
+                if self.generate_sample_data:
+                    topic.sampleData = self._get_sample_data(topic.name)
+            elif topic_schema.schema_type.lower() == SchemaType.Protobuf.name.lower():
+                topic.schemaType = SchemaType.Protobuf.name
+            elif topic_schema.schema_type.lower() == SchemaType.JSON.name.lower():
+                topic.schemaType = SchemaType.JSON.name
+            else:
+                topic.schemaType = SchemaType.Other.name
+        self.status.topic_scanned(topic.name.__root__)
+        yield topic

     def _parse_topic_metadata(self, topic: str) -> Optional[Schema]:
         schema: Optional[Schema] = None
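
Worth calling out in `yield_topic`: `describe_configs` is asynchronous and returns a dict mapping each `ConfigResource` to a `concurrent.futures.Future`, which is why the loop drains the futures with `as_completed` and a result timeout. The same pattern in isolation; the broker address and topic name are placeholders:

import concurrent.futures

import confluent_kafka
from confluent_kafka.admin import AdminClient, ConfigResource

admin_client = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder
futures = admin_client.describe_configs(
    [ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, "customer-events")]
)

# Each future resolves to a dict of config name -> ConfigEntry.
for future in concurrent.futures.as_completed(futures.values()):
    config_response = future.result(timeout=10)
    retention = config_response.get("retention.ms")
    if retention is not None:
        print("retention.ms =", retention.value)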
@@ -224,23 +185,6 @@ class KafkaSource(Source[CreateTopicRequest]):
         self.consumer_client.unsubscribe()
         return TopicSampleData(messages=sample_data)

-    def on_assign(self, a_consumer, partitions):
-        # get offset tuple from the first partition
-        new_partitions = []
-        for partition in partitions:
-            last_offset = a_consumer.get_watermark_offsets(partition)
-            offset = last_offset[1]
-            if offset > 0:
-                partition.offset = offset - 10 if offset > 10 else offset
-            new_partitions.append(partition)
-        self.consumer_client.assign(new_partitions)
-
-    def get_status(self):
-        return self.status
-
-    def close(self):
-        if self.generate_sample_data and self.consumer_client:
-            self.consumer_client.close()
-
-    def test_connection(self) -> None:
-        test_connection(self.connection)
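
The removed `on_assign` callback rewound every assigned partition to roughly ten messages before the high watermark so `_get_sample_data` could poll recent records. The same rewind as a standalone consumer sketch, keeping the original offset arithmetic; broker, group id, and topic are placeholders:

from confluent_kafka import Consumer

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",  # placeholder
        "group.id": "sample-data-reader",
        "auto.offset.reset": "earliest",
    }
)

def on_assign(a_consumer, partitions):
    # Rewind each partition to ~10 messages before the high watermark,
    # mirroring the logic this commit removes from kafka.py.
    for partition in partitions:
        _low, high = a_consumer.get_watermark_offsets(partition)
        if high > 0:
            partition.offset = high - 10 if high > 10 else high
    a_consumer.assign(partitions)

consumer.subscribe(["customer-events"], on_assign=on_assign)
messages = consumer.consume(num_messages=10, timeout=5.0)
consumer.close()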


@@ -0,0 +1,186 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base class for ingesting messaging services
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Iterable, List, Optional
from confluent_kafka.admin import AdminClient, ConfigResource
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.entity.data.topic import (
SchemaType,
Topic,
TopicSampleData,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.messagingService import (
MessagingConnection,
MessagingService,
)
from metadata.generated.schema.metadataIngestion.messagingServiceMetadataPipeline import (
MessagingServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyNode,
create_source_context,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_topic
class MessagingServiceTopology(ServiceTopology):
    """
    Defines the hierarchy in Messaging Services.
    service -> messaging -> topics.

    We could have a topology validator. We can only consume
    data that has been produced by any parent node.
    """

    root = TopologyNode(
        producer="get_services",
        stages=[
            NodeStage(
                type_=MessagingService,
                context="messaging_service",
                processor="yield_messaging_service",
            )
        ],
        children=["topic"],
    )
    topic = TopologyNode(
        producer="get_topic",
        stages=[
            NodeStage(
                type_=Topic,
                context="Topic",
                processor="yield_topic",
                consumer=["messaging_service"],
            )
        ],
    )
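
This node wiring is what `TopologyRunnerMixin` walks at runtime: each node names a producer method on the source, each stage names a processor, and a child stage may consume entities its parent published into the shared context. A toy, self-contained runner that illustrates the contract; this is a sketch of the pattern, not OpenMetadata's actual runner:

class ToyNode:
    def __init__(self, producer, processor, context, children=()):
        self.producer = producer    # name of a source method yielding raw items
        self.processor = processor  # name of a source method yielding entities
        self.context = context      # key under which results are published
        self.children = children    # child nodes that may read parent context

def run_node(source, node, context):
    for item in getattr(source, node.producer)():
        for entity in getattr(source, node.processor)(item):
            context[node.context] = entity  # publish for child consumers
            for child in node.children:
                run_node(source, child, context)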
@dataclass
class MessagingSourceStatus(SourceStatus):
    """
    Reports the source status after ingestion
    """

    topics_scanned: List[str] = field(default_factory=list)
    filtered: List[str] = field(default_factory=list)

    def topic_scanned(self, topic: str) -> None:
        self.topics_scanned.append(topic)

    def dropped(self, topic: str) -> None:
        self.filtered.append(topic)
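
The status object only accumulates names, which the workflow reports once ingestion finishes. For instance:

status = MessagingSourceStatus()
status.topic_scanned("orders_created")
status.dropped("internal_metrics")
print(status.topics_scanned)  # ['orders_created']
print(status.filtered)        # ['internal_metrics']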
class MessagingServiceSource(TopologyRunnerMixin, Source, ABC):
    """
    Base class for Messaging Services.
    It implements the topology and context.
    """

    @abstractmethod
    def yield_topic(self, messaging_details: Any) -> Iterable[CreateTopicRequest]:
        """
        Method to Get Messaging Entity
        """

    @abstractmethod
    def get_topic_list(self) -> Optional[List[Any]]:
        """
        Get List of all topics
        """

    @abstractmethod
    def get_topic_name(self, topic_details: Any) -> str:
        """
        Get Topic Name
        """

    status: MessagingSourceStatus
    source_config: MessagingServiceMetadataPipeline
    config: WorkflowSource
    metadata: OpenMetadata
    # Big union of types we want to fetch dynamically
    service_connection: MessagingConnection.__fields__["config"].type_

    topology = MessagingServiceTopology()
    context = create_source_context(topology)
    @abstractmethod
    def __init__(
        self,
        config: WorkflowSource,
        metadata_config: OpenMetadataConnection,
    ):
        super().__init__()
        self.config = config
        self.metadata_config = metadata_config
        self.metadata = OpenMetadata(metadata_config)
        self.service_connection = self.config.serviceConnection.__root__.config
        self.source_config: MessagingServiceMetadataPipeline = (
            self.config.sourceConfig.config
        )
        self.connection = get_connection(self.service_connection)
        self.test_connection()
        self.status = MessagingSourceStatus()
    def get_topic(self) -> Any:
        for topic_details in self.get_topic_list():
            if filter_by_topic(
                self.source_config.topicFilterPattern,
                self.get_topic_name(topic_details),
            ):
                self.status.filter(
                    self.get_topic_name(topic_details),
                    "Topic Pattern not Allowed",
                )
                continue
            yield topic_details
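
Note the polarity of `filter_by_topic`: it returns a truthy value when the topic should be dropped, so a hit records the filtered status and skips the yield. A small usage sketch; the includes/excludes regex fields and the filterPattern module path follow the FilterPattern model as I understand it, so treat both as assumptions:

from metadata.generated.schema.type.filterPattern import FilterPattern
from metadata.utils.filters import filter_by_topic

pattern = FilterPattern(includes=["orders_.*"])  # keep only orders_* topics
assert not filter_by_topic(pattern, "orders_created")  # kept
assert filter_by_topic(pattern, "payments_raw")        # dropped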
    def yield_messaging_service(self, config: WorkflowSource):
        yield self.metadata.get_create_service_from_source(
            entity=MessagingService, config=config
        )

    def get_services(self) -> Iterable[WorkflowSource]:
        yield self.config

    def prepare(self):
        pass

    def get_status(self):
        return self.status

    def test_connection(self) -> None:
        test_connection(self.connection)

    def close(self):
        pass
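
With this base in place, a new messaging connector only needs the three abstract methods plus its own `__init__` and `create`. A hypothetical skeleton against an imaginary in-memory system; the class name, topic values, and partition count are placeholders:

from typing import Any, Iterable, List, Optional

from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.type.entityReference import EntityReference

class InMemorySource(MessagingServiceSource):  # hypothetical connector
    def get_topic_list(self) -> Optional[List[Any]]:
        # Raw topic descriptors from the backing system; hardcoded here.
        return [{"name": "orders_created", "partitions": 1}]

    def get_topic_name(self, topic_details: Any) -> str:
        return topic_details["name"]

    def yield_topic(self, topic_details: Any) -> Iterable[CreateTopicRequest]:
        yield CreateTopicRequest(
            name=topic_details["name"],
            service=EntityReference(
                id=self.context.messaging_service.id.__root__,
                type="messagingService",
            ),
            partitions=topic_details["partitions"],
        )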