Mirror of https://github.com/open-metadata/OpenMetadata.git, synced 2025-12-14 17:17:15 +00:00
parent: 7d2527bbeb
commit: 613a329f40
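This diff adds a Redpanda messaging connector: a new RedpandaConnection JSON schema, a Redpanda entry in the messaging service type enum and in the setup extras, a CommonBrokerSource base class factored out of the Kafka source, a RedpandaSource built on top of it, a shared get_connection handler, and UI wiring for the new connection form.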
@@ -0,0 +1,55 @@
{
  "$id": "https://open-metadata.org/schema/entity/services/connections/messaging/RedpandaConnection.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "RedpandaConnection",
  "description": "Redpanda Connection Config",
  "type": "object",
  "javaType": "org.openmetadata.catalog.services.connections.messaging.RedpandaConnection",
  "definitions": {
    "RedpandaType": {
      "description": "Redpanda service type",
      "type": "string",
      "enum": ["Redpanda"],
      "default": "Redpanda"
    }
  },
  "properties": {
    "type": {
      "title": "Service Type",
      "description": "Service Type",
      "$ref": "#/definitions/RedpandaType",
      "default": "Redpanda"
    },
    "bootstrapServers": {
      "title": "Bootstrap Servers",
      "description": "Redpanda bootstrap servers. Add them as comma-separated values, e.g.: host1:9092,host2:9092",
      "type": "string"
    },
    "schemaRegistryURL": {
      "title": "Schema Registry URL",
      "description": "Confluent Redpanda Schema Registry URL.",
      "type": "string",
      "format": "uri"
    },
    "consumerConfig": {
      "title": "Consumer Config",
      "description": "Confluent Redpanda Consumer Config.",
      "type": "object",
      "default": {},
      "additionalProperties": true
    },
    "schemaRegistryConfig": {
      "title": "Schema Registry Config",
      "description": "Confluent Redpanda Schema Registry Config.",
      "type": "object",
      "default": {},
      "additionalProperties": true
    },
    "supportsMetadataExtraction": {
      "title": "Supports Metadata Extraction",
      "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
    }
  },
  "additionalProperties": false,
  "required": ["bootstrapServers"]
}
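Only bootstrapServers is required by this schema. As a quick sanity check, a payload can be validated with the third-party jsonschema package; this is a sketch, and the local file name and payload values are assumptions, not part of the commit:

import json

from jsonschema import validate  # third-party: pip install jsonschema

# Assumption: the schema above has been saved locally as redpandaConnection.json.
with open("redpandaConnection.json") as schema_file:
    schema = json.load(schema_file)

payload = {
    "type": "Redpanda",
    "bootstrapServers": "host1:9092,host2:9092",  # the only required property
    "schemaRegistryURL": "http://localhost:8081",  # assumed local registry
    "consumerConfig": {"group.id": "openmetadata-sample"},
}

# supportsMetadataExtraction is left out so its relative $ref to
# ../connectionBasicType.json never has to be resolved during validation.
validate(instance=payload, schema=schema)  # raises ValidationError on mismatch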
@@ -14,13 +14,16 @@
       "description": "Type of messaging service - Kafka or Pulsar.",
       "type": "string",
       "javaInterfaces": ["org.openmetadata.catalog.EnumInterface"],
-      "enum": ["Kafka", "Pulsar"],
+      "enum": ["Kafka", "Pulsar", "Redpanda"],
       "javaEnums": [
         {
           "name": "Kafka"
         },
         {
           "name": "Pulsar"
+        },
+        {
+          "name": "Redpanda"
         }
       ]
     },
@@ -48,6 +51,9 @@
     },
     {
       "$ref": "./connections/messaging/pulsarConnection.json"
+    },
+    {
+      "$ref": "./connections/messaging/redpandaConnection.json"
     }
   ]
 }
@@ -93,6 +93,7 @@ plugins: Dict[str, Set[str]] = {
         "presto-types-parser==0.0.2",
     },
     "kafka": {"confluent_kafka==1.8.2", "fastavro>=1.2.0", "avro-python3"},
+    "redpanda": {"confluent_kafka==1.8.2", "fastavro>=1.2.0", "avro-python3"},
     "ldap-users": {"ldap3==2.9.1"},
     "looker": {"looker-sdk>=22.4.0"},
     "mssql": {"sqlalchemy-pytds>=0.3"},
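Because the new extra mirrors the kafka one, the Redpanda dependencies can presumably be installed with pip install "openmetadata-ingestion[redpanda]".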
@@ -200,7 +200,7 @@ class TopologyRunnerMixin(Generic[C]):
         # We have ack the sink waiting for a response, but got nothing back
         if stage.must_return and entity is None:
             # Safe access to Entity Request name
-            raise MissingExpectedEntityAck(
+            raise MissingExpectedEntityAckException(
                 f"Missing ack back from [{stage.type_.__name__}: {getattr(entity_request, 'name')}] - "
                 "Possible causes are changes in the server Fernet key or mismatched JSON Schemas "
                 "for the service connection."
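Only the raise site changes here; the exception definition itself is not part of this diff, but the renamed class is presumably a plain Exception subclass, roughly:

# Hypothetical sketch; the real definition lives elsewhere in the codebase.
class MissingExpectedEntityAckException(Exception):
    """Raised when a stage expects an entity back from the sink but gets none."""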
@@ -0,0 +1,211 @@
#  Copyright 2021 Collate
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#  http://www.apache.org/licenses/LICENSE-2.0
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import concurrent.futures
import traceback
from abc import ABC
from typing import Any, Iterable, Optional

import confluent_kafka
from confluent_kafka import KafkaError, KafkaException
from confluent_kafka.admin import ConfigResource
from confluent_kafka.schema_registry.schema_registry_client import Schema
from pydantic import BaseModel

from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.entity.data.topic import SchemaType, TopicSampleData
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.messaging.messaging_service import MessagingServiceSource
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class BrokerTopicDetails(BaseModel):
    """
    Wrapper Class to combine the topic_name with topic_metadata
    """

    topic_name: str
    topic_metadata: Any


class CommonBrokerSource(MessagingServiceSource, ABC):
    def __init__(
        self,
        config: WorkflowSource,
        metadata_config: OpenMetadataConnection,
    ):
        super().__init__(config, metadata_config)
        self.generate_sample_data = self.config.sourceConfig.config.generateSampleData
        self.admin_client = self.connection.admin_client
        self.schema_registry_client = self.connection.schema_registry_client
        if self.generate_sample_data:
            self.consumer_client = self.connection.consumer_client

    def get_topic_list(self) -> Iterable[BrokerTopicDetails]:
        topics_dict = self.admin_client.list_topics().topics
        for topic_name, topic_metadata in topics_dict.items():
            yield BrokerTopicDetails(
                topic_name=topic_name, topic_metadata=topic_metadata
            )

    def get_topic_name(self, topic_details: BrokerTopicDetails) -> str:
        """
        Get Topic Name
        """
        return topic_details.topic_name

    def yield_topic(
        self, topic_details: BrokerTopicDetails
    ) -> Iterable[CreateTopicRequest]:
        try:
            logger.info("Fetching topic schema {}".format(topic_details.topic_name))
            topic_schema = self._parse_topic_metadata(topic_details.topic_name)
            logger.info("Fetching topic config {}".format(topic_details.topic_name))
            topic = CreateTopicRequest(
                name=topic_details.topic_name,
                service=EntityReference(
                    id=self.context.messaging_service.id.__root__,
                    type="messagingService",
                ),
                partitions=len(topic_details.topic_metadata.partitions),
                replicationFactor=len(
                    topic_details.topic_metadata.partitions.get(0).replicas
                ),
            )
            topic_config_resource = self.admin_client.describe_configs(
                [
                    ConfigResource(
                        confluent_kafka.admin.RESOURCE_TOPIC, topic_details.topic_name
                    )
                ]
            )
            self.add_properties_to_topic_from_resource(topic, topic_config_resource)

            if topic_schema is not None:
                topic.schemaText = topic_schema.schema_str
                if topic_schema.schema_type.lower() == SchemaType.Avro.value.lower():
                    topic.schemaType = SchemaType.Avro.name
                    if self.generate_sample_data:
                        topic.sampleData = self._get_sample_data(topic.name)
                elif (
                    topic_schema.schema_type.lower() == SchemaType.Protobuf.name.lower()
                ):
                    topic.schemaType = SchemaType.Protobuf.name
                elif topic_schema.schema_type.lower() == SchemaType.JSON.name.lower():
                    topic.schemaType = SchemaType.JSON.name
                else:
                    topic.schemaType = SchemaType.Other.name

            self.status.topic_scanned(topic.name.__root__)
            yield topic

        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Unexpected exception to yield topic [{topic_details.topic_name}]: {exc}"
            )
            self.status.failures.append(
                "{}.{}".format(self.config.serviceName, topic_details.topic_name)
            )

    @staticmethod
    def add_properties_to_topic_from_resource(
        topic: CreateTopicRequest, topic_config_resource: dict
    ) -> None:
        """
        Stateful operation that adds new properties to a given Topic
        """
        try:
            for resource_value in concurrent.futures.as_completed(
                iter(topic_config_resource.values())
            ):
                config_response = resource_value.result(timeout=10)
                if "max.message.bytes" in config_response:
                    topic.maximumMessageSize = config_response.get(
                        "max.message.bytes", {}
                    ).value

                if "min.insync.replicas" in config_response:
                    topic.minimumInSyncReplicas = config_response.get(
                        "min.insync.replicas"
                    ).value

                if "retention.ms" in config_response:
                    topic.retentionTime = config_response.get("retention.ms").value

                if "cleanup.policy" in config_response:
                    cleanup_policies = config_response.get("cleanup.policy").value
                    topic.cleanupPolicies = cleanup_policies.split(",")

                topic_config = {}
                for key, conf_response in config_response.items():
                    topic_config[key] = conf_response.value
                topic.topicConfig = topic_config

        except (KafkaException, KafkaError) as exc:
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Exception adding properties to topic [{topic.name}]: {exc}"
            )

    def _parse_topic_metadata(self, topic_name: str) -> Optional[Schema]:
        try:
            if self.schema_registry_client:
                registered_schema = self.schema_registry_client.get_latest_version(
                    topic_name + "-value"
                )
                return registered_schema.schema

            return None
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Failed to get schema for topic [{topic_name}]: {exc}")
            self.status.warning(
                topic_name, f"failed to get schema: {exc} for topic {topic_name}"
            )

    def _get_sample_data(self, topic_name):
        sample_data = []
        try:
            self.consumer_client.subscribe([topic_name.__root__])
            logger.info(
                f"Broker consumer polling for sample messages in topic {topic_name.__root__}"
            )
            messages = self.consumer_client.consume(num_messages=10, timeout=10)
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Failed to fetch sample data from topic {topic_name.__root__}: {exc}"
            )
        else:
            if messages:
                for message in messages:
                    sample_data.append(
                        str(
                            self.consumer_client._serializer.decode_message(
                                message.value()
                            )
                        )
                    )
        self.consumer_client.unsubscribe()
        return TopicSampleData(messages=sample_data)

    def close(self):
        if self.generate_sample_data and self.consumer_client:
            self.consumer_client.close()
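add_properties_to_topic_from_resource relies on confluent_kafka's AdminClient.describe_configs returning a dict that maps each ConfigResource to a concurrent.futures.Future. A standalone sketch of that pattern, with an assumed broker address and topic name:

import concurrent.futures

import confluent_kafka
from confluent_kafka.admin import AdminClient, ConfigResource

admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # assumed address
resources = [ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, "my-topic")]  # assumed topic

# describe_configs returns {ConfigResource: Future}; gather results as they land.
for future in concurrent.futures.as_completed(admin.describe_configs(resources).values()):
    config_response = future.result(timeout=10)  # dict of config name -> ConfigEntry
    for name, entry in config_response.items():
        print(name, entry.value)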
@@ -9,17 +9,6 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-import concurrent.futures
-import traceback
-from typing import Any, Iterable, Optional
-
-import confluent_kafka
-from confluent_kafka.admin import ConfigResource
-from confluent_kafka.schema_registry.schema_registry_client import Schema
-from pydantic import BaseModel
-
-from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
-from metadata.generated.schema.entity.data.topic import SchemaType, TopicSampleData
 from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
     KafkaConnection,
 )
@@ -29,34 +18,11 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
 from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
-from metadata.generated.schema.type.entityReference import EntityReference
-from metadata.ingestion.api.common import logger
 from metadata.ingestion.api.source import InvalidSourceException
-from metadata.ingestion.source.messaging.messaging_service import MessagingServiceSource
+from metadata.ingestion.source.messaging.common_broker_source import CommonBrokerSource
 
 
-class KafkaTopicDetails(BaseModel):
-    """
-    Wrapper Class to combine the topic_name with topic_metadata
-    """
-
-    topic_name: str
-    topic_metadata: Any
-
-
-class KafkaSource(MessagingServiceSource):
-    def __init__(
-        self,
-        config: WorkflowSource,
-        metadata_config: OpenMetadataConnection,
-    ):
-        super().__init__(config, metadata_config)
-        self.generate_sample_data = self.config.sourceConfig.config.generateSampleData
-        self.admin_client = self.connection.admin_client
-        self.schema_registry_client = self.connection.schema_registry_client
-        if self.generate_sample_data:
-            self.consumer_client = self.connection.consumer_client
-
+class KafkaSource(CommonBrokerSource):
     @classmethod
     def create(cls, config_dict, metadata_config: OpenMetadataConnection):
         config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
@@ -66,123 +32,3 @@ class KafkaSource(MessagingServiceSource):
             f"Expected KafkaConnection, but got {connection}"
         )
         return cls(config, metadata_config)
-
-    def get_topic_list(self) -> Iterable[KafkaTopicDetails]:
-        topics_dict = self.admin_client.list_topics().topics
-        for topic_name, topic_metadata in topics_dict.items():
-            yield KafkaTopicDetails(
-                topic_name=topic_name, topic_metadata=topic_metadata
-            )
-
-    def get_topic_name(self, topic_details: KafkaTopicDetails) -> str:
-        """
-        Get Topic Name
-        """
-        return topic_details.topic_name
-
-    def yield_topic(
-        self, topic_details: KafkaTopicDetails
-    ) -> Iterable[CreateTopicRequest]:
-        logger.info("Fetching topic schema {}".format(topic_details.topic_name))
-        topic_schema = self._parse_topic_metadata(topic_details.topic_name)
-        logger.info("Fetching topic config {}".format(topic_details.topic_name))
-        topic = CreateTopicRequest(
-            name=topic_details.topic_name,
-            service=EntityReference(
-                id=self.context.messaging_service.id.__root__, type="messagingService"
-            ),
-            partitions=len(topic_details.topic_metadata.partitions),
-            replicationFactor=len(
-                topic_details.topic_metadata.partitions.get(0).replicas
-            ),
-        )
-        topic_configResource = self.admin_client.describe_configs(
-            [
-                ConfigResource(
-                    confluent_kafka.admin.RESOURCE_TOPIC, topic_details.topic_name
-                )
-            ]
-        )
-        for j in concurrent.futures.as_completed(iter(topic_configResource.values())):
-            config_response = j.result(timeout=10)
-            if "max.message.bytes" in config_response:
-                topic.maximumMessageSize = config_response.get(
-                    "max.message.bytes", {}
-                ).value
-
-            if "min.insync.replicas" in config_response:
-                topic.minimumInSyncReplicas = config_response.get(
-                    "min.insync.replicas"
-                ).value
-
-            if "retention.ms" in config_response:
-                topic.retentionTime = config_response.get("retention.ms").value
-
-            if "cleanup.policy" in config_response:
-                cleanup_policies = config_response.get("cleanup.policy").value
-                topic.cleanupPolicies = cleanup_policies.split(",")
-
-            topic_config = {}
-            for key, conf_response in config_response.items():
-                topic_config[key] = conf_response.value
-            topic.topicConfig = topic_config
-
-        if topic_schema is not None:
-            topic.schemaText = topic_schema.schema_str
-            if topic_schema.schema_type.lower() == SchemaType.Avro.value.lower():
-                topic.schemaType = SchemaType.Avro.name
-                if self.generate_sample_data:
-                    topic.sampleData = self._get_sample_data(topic.name)
-            elif topic_schema.schema_type.lower() == SchemaType.Protobuf.name.lower():
-                topic.schemaType = SchemaType.Protobuf.name
-            elif topic_schema.schema_type.lower() == SchemaType.JSON.name.lower():
-                topic.schemaType = SchemaType.JSON.name
-            else:
-                topic.schemaType = SchemaType.Other.name
-
-        self.status.topic_scanned(topic.name.__root__)
-        yield topic
-
-    def _parse_topic_metadata(self, topic: str) -> Optional[Schema]:
-        schema: Optional[Schema] = None
-        try:
-            registered_schema = self.schema_registry_client.get_latest_version(
-                topic + "-value"
-            )
-            schema = registered_schema.schema
-        except Exception as exc:
-            logger.debug(traceback.format_exc())
-            logger.warning(f"Failed to get schema for topic [{topic}]: {exc}")
-            self.status.warning(topic, f"failed to get schema: {exc} for topic {topic}")
-
-        return schema
-
-    def _get_sample_data(self, topic_name):
-        sample_data = []
-        try:
-            self.consumer_client.subscribe([topic_name.__root__])
-            logger.info(
-                f"Kafka consumer polling for sample messages in topic {topic_name.__root__}"
-            )
-            messages = self.consumer_client.consume(num_messages=10, timeout=10)
-        except Exception as exc:
-            logger.debug(traceback.format_exc())
-            logger.warning(
-                f"Failed to fetch sample data from topic {topic_name.__root__}: {exc}"
-            )
-        else:
-            if messages:
-                for message in messages:
-                    sample_data.append(
-                        str(
-                            self.consumer_client._serializer.decode_message(
-                                message.value()
-                            )
-                        )
-                    )
-        self.consumer_client.unsubscribe()
-        return TopicSampleData(messages=sample_data)
-
-    def close(self):
-        if self.generate_sample_data and self.consumer_client:
-            self.consumer_client.close()
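With the shared crawling logic hoisted into CommonBrokerSource, a Kafka-compatible source shrinks to a create() factory that validates its connection type, which is exactly the shape of the new Redpanda source below.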
@@ -0,0 +1,34 @@
#  Copyright 2021 Collate
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#  http://www.apache.org/licenses/LICENSE-2.0
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

from metadata.generated.schema.entity.services.connections.messaging.redpandaConnection import (
    RedpandaConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.messaging.common_broker_source import CommonBrokerSource


class RedpandaSource(CommonBrokerSource):
    @classmethod
    def create(cls, config_dict, metadata_config: OpenMetadataConnection):
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        connection: RedpandaConnection = config.serviceConnection.__root__.config
        if not isinstance(connection, RedpandaConnection):
            raise InvalidSourceException(
                f"Expected RedpandaConnection, but got {connection}"
            )
        return cls(config, metadata_config)
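A sketch of exercising the new source end to end with the ingestion Workflow runner. The source type string, service name, host, and auth settings are assumptions for a local setup, and the Workflow interface shown is the one this era of the codebase appears to expose:

from metadata.ingestion.api.workflow import Workflow

config = {
    "source": {
        "type": "redpanda",  # assumed registered source type
        "serviceName": "local_redpanda",  # assumed service name
        "serviceConnection": {
            "config": {
                "type": "Redpanda",
                "bootstrapServers": "localhost:9092",  # assumed broker address
            }
        },
        "sourceConfig": {"config": {"generateSampleData": True}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",  # assumed local server
            "authProvider": "no-auth",
        }
    },
}

workflow = Workflow.create(config)
workflow.execute()
workflow.print_status()
workflow.stop()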
@@ -103,6 +103,9 @@ from metadata.generated.schema.entity.services.connections.database.snowflakeCon
 from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
     KafkaConnection,
 )
+from metadata.generated.schema.entity.services.connections.messaging.redpandaConnection import (
+    RedpandaConnection,
+)
 from metadata.generated.schema.entity.services.connections.mlmodel.mlflowConnection import (
     MlflowConnection,
 )
@@ -329,8 +332,9 @@ def _(connection: DeltaLakeConnection, verbose: bool = False) -> DeltaLakeClient
     return deltalake_connection
 
 
-@get_connection.register
-def _(connection: KafkaConnection, verbose: bool = False) -> KafkaClient:
+@get_connection.register(KafkaConnection)
+@get_connection.register(RedpandaConnection)
+def _(connection, verbose: bool = False) -> KafkaClient:
     """
     Prepare Kafka Admin Client and Schema Registry Client
     """
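Stacking two register calls works because functools.singledispatch allows several types to share one implementation. A self-contained sketch of the same pattern, using toy classes rather than the real connection models:

from functools import singledispatch


class KafkaConnection:  # toy stand-in
    pass


class RedpandaConnection:  # toy stand-in
    pass


@singledispatch
def get_connection(connection, verbose: bool = False):
    raise ValueError(f"No handler registered for {type(connection).__name__}")


@get_connection.register(KafkaConnection)
@get_connection.register(RedpandaConnection)
def _(connection, verbose: bool = False):
    # Both brokers speak the Kafka protocol, so one client factory serves both.
    return f"kafka-client for {type(connection).__name__}"


print(get_connection(RedpandaConnection()))  # kafka-client for RedpandaConnection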
@@ -18,6 +18,7 @@ import {
   MessagingServiceType,
 } from '../generated/entity/services/messagingService';
 import kafkaConnection from '../jsons/connectionSchemas/connections/messaging/kafkaConnection.json';
+import redpandaConnection from '../jsons/connectionSchemas/connections/messaging/redpandaConnection.json';
 
 export const getBrokers = (config: MessagingConnection['config']) => {
   let retVal: string | undefined;
@@ -33,8 +34,20 @@ export const getBrokers = (config: MessagingConnection['config']) => {
 export const getMessagingConfig = (type: MessagingServiceType) => {
   let schema = {};
   const uiSchema = { ...COMMON_UI_SCHEMA };
-  if (type === MessagingServiceType.Kafka) {
-    schema = kafkaConnection;
+  switch (type) {
+    case MessagingServiceType.Kafka:
+      schema = kafkaConnection;
+
+      break;
+
+    case MessagingServiceType.Redpanda:
+      schema = redpandaConnection;
+
+      break;
+
+    default:
+      break;
   }
 
   return cloneDeep({ schema, uiSchema });
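With this change, getMessagingConfig(MessagingServiceType.Redpanda) resolves to the Redpanda JSON schema, so the service form should render Redpanda connection fields the same way it already does for Kafka.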
@@ -19,6 +19,7 @@ import airflowConnection from '../jsons/connectionSchemas/connections/pipeline/a
 import dagsterConnection from '../jsons/connectionSchemas/connections/pipeline/dagsterConnection.json';
 import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
 import glueConnection from '../jsons/connectionSchemas/connections/pipeline/glueConnection.json';
+import nifiConnection from '../jsons/connectionSchemas/connections/pipeline/nifiConnection.json';
 
 export const getPipelineConfig = (type: PipelineServiceType) => {
   let schema = {};
@@ -50,6 +51,14 @@ export const getPipelineConfig = (type: PipelineServiceType) => {
 
       break;
     }
+    case PipelineServiceType.Nifi: {
+      schema = nifiConnection;
+
+      break;
+    }
+
+    default:
+      break;
   }
 
   return cloneDeep({ schema, uiSchema });