diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/messaging/redpandaConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/messaging/redpandaConnection.json
new file mode 100644
index 00000000000..755636ae5f3
--- /dev/null
+++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/messaging/redpandaConnection.json
@@ -0,0 +1,55 @@
+{
+  "$id": "https://open-metadata.org/schema/entity/services/connections/messaging/RedpandaConnection.json",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "RedpandaConnection",
+  "description": "Redpanda Connection Config",
+  "type": "object",
+  "javaType": "org.openmetadata.catalog.services.connections.messaging.RedpandaConnection",
+  "definitions": {
+    "RedpandaType": {
+      "description": "Redpanda service type",
+      "type": "string",
+      "enum": ["Redpanda"],
+      "default": "Redpanda"
+    }
+  },
+  "properties": {
+    "type": {
+      "title": "Service Type",
+      "description": "Service Type",
+      "$ref": "#/definitions/RedpandaType",
+      "default": "Redpanda"
+    },
+    "bootstrapServers": {
+      "title": "Bootstrap Servers",
+      "description": "Redpanda bootstrap servers. Add them as comma-separated values, e.g.: host1:9092,host2:9092.",
+      "type": "string"
+    },
+    "schemaRegistryURL": {
+      "title": "Schema Registry URL",
+      "description": "Redpanda Schema Registry URL.",
+      "type": "string",
+      "format": "uri"
+    },
+    "consumerConfig": {
+      "title": "Consumer Config",
+      "description": "Redpanda Consumer Config.",
+      "type": "object",
+      "default": {},
+      "additionalProperties": true
+    },
+    "schemaRegistryConfig": {
+      "title": "Schema Registry Config",
+      "description": "Redpanda Schema Registry Config.",
+      "type": "object",
+      "default": {},
+      "additionalProperties": true
+    },
+    "supportsMetadataExtraction": {
+      "title": "Supports Metadata Extraction",
+      "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
+    }
+  },
+  "additionalProperties": false,
+  "required": ["bootstrapServers"]
+}
diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/messagingService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/messagingService.json
index 8f33a0ed638..ed6506fee6d 100644
--- a/catalog-rest-service/src/main/resources/json/schema/entity/services/messagingService.json
+++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/messagingService.json
@@ -14,13 +14,16 @@
       "description": "Type of messaging service - Kafka or Pulsar.",
       "type": "string",
       "javaInterfaces": ["org.openmetadata.catalog.EnumInterface"],
-      "enum": ["Kafka", "Pulsar"],
+      "enum": ["Kafka", "Pulsar", "Redpanda"],
       "javaEnums": [
         {
           "name": "Kafka"
         },
         {
           "name": "Pulsar"
+        },
+        {
+          "name": "Redpanda"
         }
       ]
     },
@@ -48,6 +51,9 @@
         },
         {
           "$ref": "./connections/messaging/pulsarConnection.json"
+        },
+        {
+          "$ref": "./connections/messaging/redpandaConnection.json"
         }
       ]
     }
diff --git a/ingestion/setup.py b/ingestion/setup.py
index a485bc21bbf..18c55cb7898 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -93,6 +93,7 @@ plugins: Dict[str, Set[str]] = {
         "presto-types-parser==0.0.2",
     },
     "kafka": {"confluent_kafka==1.8.2", "fastavro>=1.2.0", "avro-python3"},
+    "redpanda": {"confluent_kafka==1.8.2", "fastavro>=1.2.0", "avro-python3"},
    "ldap-users": {"ldap3==2.9.1"},
     "looker": {"looker-sdk>=22.4.0"},
     "mssql": {"sqlalchemy-pytds>=0.3"},
diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py
index 3bee9974804..aae82a64c25 100644
--- a/ingestion/src/metadata/ingestion/api/topology_runner.py
+++ b/ingestion/src/metadata/ingestion/api/topology_runner.py
@@ -200,7 +200,7 @@ class TopologyRunnerMixin(Generic[C]):
             # We have ack the sink waiting for a response, but got nothing back
             if stage.must_return and entity is None:
                 # Safe access to Entity Request name
-                raise MissingExpectedEntityAck(
+                raise MissingExpectedEntityAckException(
                     f"Missing ack back from [{stage.type_.__name__}: {getattr(entity_request, 'name')}] - "
                     "Possible causes are changes in the server Fernet key or mismatched JSON Schemas "
                     "for the service connection."
diff --git a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py
new file mode 100644
index 00000000000..d82768d825e
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py
@@ -0,0 +1,211 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import concurrent.futures
+import traceback
+from abc import ABC
+from typing import Any, Iterable, Optional
+
+import confluent_kafka
+from confluent_kafka import KafkaError, KafkaException
+from confluent_kafka.admin import ConfigResource
+from confluent_kafka.schema_registry.schema_registry_client import Schema
+from pydantic import BaseModel
+
+from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
+from metadata.generated.schema.entity.data.topic import SchemaType, TopicSampleData
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
+from metadata.generated.schema.type.entityReference import EntityReference
+from metadata.ingestion.source.messaging.messaging_service import MessagingServiceSource
+from metadata.utils.logger import ingestion_logger
+
+logger = ingestion_logger()
+
+
+class BrokerTopicDetails(BaseModel):
+    """
+    Wrapper Class to combine the topic_name with topic_metadata
+    """
+
+    topic_name: str
+    topic_metadata: Any
+
+
+class CommonBrokerSource(MessagingServiceSource, ABC):
+    def __init__(
+        self,
+        config: WorkflowSource,
+        metadata_config: OpenMetadataConnection,
+    ):
+        super().__init__(config, metadata_config)
+        self.generate_sample_data = self.config.sourceConfig.config.generateSampleData
+        self.admin_client = self.connection.admin_client
+        self.schema_registry_client = self.connection.schema_registry_client
+        if self.generate_sample_data:
+            self.consumer_client = self.connection.consumer_client
+
+    def get_topic_list(self) -> Iterable[BrokerTopicDetails]:
+        topics_dict = self.admin_client.list_topics().topics
+        for topic_name, topic_metadata in topics_dict.items():
+            yield BrokerTopicDetails(
+                topic_name=topic_name, topic_metadata=topic_metadata
+            )
+
+    def get_topic_name(self, topic_details: BrokerTopicDetails) -> str:
+        """
+        Get Topic Name
+        """
+        return topic_details.topic_name
+
+    def yield_topic(
+        self, topic_details: BrokerTopicDetails
+    ) -> Iterable[CreateTopicRequest]:
+        try:
+            logger.info(f"Fetching topic schema {topic_details.topic_name}")
+            topic_schema = self._parse_topic_metadata(topic_details.topic_name)
+            logger.info(f"Fetching topic config {topic_details.topic_name}")
+            topic = CreateTopicRequest(
+                name=topic_details.topic_name,
+                service=EntityReference(
+                    id=self.context.messaging_service.id.__root__,
+                    type="messagingService",
+                ),
+                partitions=len(topic_details.topic_metadata.partitions),
+                replicationFactor=len(
+                    topic_details.topic_metadata.partitions.get(0).replicas
+                ),
+            )
+            topic_config_resource = self.admin_client.describe_configs(
+                [
+                    ConfigResource(
+                        confluent_kafka.admin.RESOURCE_TOPIC, topic_details.topic_name
+                    )
+                ]
+            )
+            self.add_properties_to_topic_from_resource(topic, topic_config_resource)
+
+            if topic_schema is not None:
+                topic.schemaText = topic_schema.schema_str
+                if topic_schema.schema_type.lower() == SchemaType.Avro.value.lower():
+                    topic.schemaType = SchemaType.Avro.name
+                    if self.generate_sample_data:
+                        topic.sampleData = self._get_sample_data(topic.name)
+                elif (
+                    topic_schema.schema_type.lower() == SchemaType.Protobuf.name.lower()
+                ):
+                    topic.schemaType = SchemaType.Protobuf.name
+                elif topic_schema.schema_type.lower() == SchemaType.JSON.name.lower():
+                    topic.schemaType = SchemaType.JSON.name
+                else:
+                    topic.schemaType = SchemaType.Other.name
+
+            self.status.topic_scanned(topic.name.__root__)
+            yield topic
+
+        except Exception as exc:
+            logger.debug(traceback.format_exc())
+            logger.warning(
+                f"Unexpected exception to yield topic [{topic_details.topic_name}]: {exc}"
+            )
+            self.status.failures.append(
+                f"{self.config.serviceName}.{topic_details.topic_name}"
+            )
+
+    @staticmethod
+    def add_properties_to_topic_from_resource(
+        topic: CreateTopicRequest, topic_config_resource: dict
+    ) -> None:
+        """
+        Stateful operation that adds new properties to a given Topic
+        """
+        try:
+            for resource_value in concurrent.futures.as_completed(
+                iter(topic_config_resource.values())
+            ):
+                config_response = resource_value.result(timeout=10)
+                if "max.message.bytes" in config_response:
+                    topic.maximumMessageSize = config_response.get(
+                        "max.message.bytes", {}
+                    ).value
+
+                if "min.insync.replicas" in config_response:
+                    topic.minimumInSyncReplicas = config_response.get(
+                        "min.insync.replicas"
+                    ).value
+
+                if "retention.ms" in config_response:
+                    topic.retentionTime = config_response.get("retention.ms").value
+
+                if "cleanup.policy" in config_response:
+                    cleanup_policies = config_response.get("cleanup.policy").value
+                    topic.cleanupPolicies = cleanup_policies.split(",")
+
+                topic_config = {}
+                for key, conf_response in config_response.items():
+                    topic_config[key] = conf_response.value
+                topic.topicConfig = topic_config
+
+        except (KafkaException, KafkaError) as exc:
+            logger.debug(traceback.format_exc())
+            logger.warning(
+                f"Exception adding properties to topic [{topic.name}]: {exc}"
+            )
+
+    def _parse_topic_metadata(self, topic_name: str) -> Optional[Schema]:
+        try:
+            if self.schema_registry_client:
+                registered_schema = self.schema_registry_client.get_latest_version(
+                    topic_name + "-value"
+                )
+                return registered_schema.schema
+
+            return None
+        except Exception as exc:
+            logger.debug(traceback.format_exc())
+            logger.warning(f"Failed to get schema for topic [{topic_name}]: {exc}")
+            self.status.warning(
+                topic_name, f"failed to get schema: {exc} for topic {topic_name}"
+            )
+
+    def _get_sample_data(self, topic_name):
+        sample_data = []
+        try:
+            self.consumer_client.subscribe([topic_name.__root__])
+            logger.info(
+                f"Broker consumer polling for sample messages in topic {topic_name.__root__}"
+            )
+            messages = self.consumer_client.consume(num_messages=10, timeout=10)
+        except Exception as exc:
+            logger.debug(traceback.format_exc())
+            logger.warning(
+                f"Failed to fetch sample data from topic {topic_name.__root__}: {exc}"
+            )
+        else:
+            if messages:
+                for message in messages:
+                    sample_data.append(
+                        str(
+                            self.consumer_client._serializer.decode_message(
+                                message.value()
+                            )
+                        )
+                    )
+        self.consumer_client.unsubscribe()
+        return TopicSampleData(messages=sample_data)
+
+    def close(self):
+        if self.generate_sample_data and self.consumer_client:
+            self.consumer_client.close()
diff --git a/ingestion/src/metadata/ingestion/source/messaging/kafka.py b/ingestion/src/metadata/ingestion/source/messaging/kafka.py
index ab33a554128..0054047e87a 100644
--- a/ingestion/src/metadata/ingestion/source/messaging/kafka.py
+++ b/ingestion/src/metadata/ingestion/source/messaging/kafka.py
@@ -9,17 +9,6 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-import concurrent.futures
-import traceback
-from typing import Any, Iterable, Optional
-
-import confluent_kafka
-from confluent_kafka.admin import ConfigResource
-from confluent_kafka.schema_registry.schema_registry_client import Schema
-from pydantic import BaseModel
-
-from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
-from metadata.generated.schema.entity.data.topic import SchemaType, TopicSampleData
 from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
     KafkaConnection,
 )
@@ -29,34 +18,11 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
 from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
-from metadata.generated.schema.type.entityReference import EntityReference
-from metadata.ingestion.api.common import logger
 from metadata.ingestion.api.source import InvalidSourceException
-from metadata.ingestion.source.messaging.messaging_service import MessagingServiceSource
+from metadata.ingestion.source.messaging.common_broker_source import CommonBrokerSource
 
 
-class KafkaTopicDetails(BaseModel):
-    """
-    Wrapper Class to combine the topic_name with topic_metadata
-    """
-
-    topic_name: str
-    topic_metadata: Any
-
-
-class KafkaSource(MessagingServiceSource):
-    def __init__(
-        self,
-        config: WorkflowSource,
-        metadata_config: OpenMetadataConnection,
-    ):
-        super().__init__(config, metadata_config)
-        self.generate_sample_data = self.config.sourceConfig.config.generateSampleData
-        self.admin_client = self.connection.admin_client
-        self.schema_registry_client = self.connection.schema_registry_client
-        if self.generate_sample_data:
-            self.consumer_client = self.connection.consumer_client
-
+class KafkaSource(CommonBrokerSource):
     @classmethod
     def create(cls, config_dict, metadata_config: OpenMetadataConnection):
         config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
@@ -66,123 +32,3 @@ class KafkaSource(MessagingServiceSource):
                 f"Expected KafkaConnection, but got {connection}"
             )
         return cls(config, metadata_config)
-
-    def get_topic_list(self) -> Iterable[KafkaTopicDetails]:
-        topics_dict = self.admin_client.list_topics().topics
-        for topic_name, topic_metadata in topics_dict.items():
-            yield KafkaTopicDetails(
-                topic_name=topic_name, topic_metadata=topic_metadata
-            )
-
-    def get_topic_name(self, topic_details: KafkaTopicDetails) -> str:
-        """
-        Get Topic Name
-        """
-        return topic_details.topic_name
-
-    def yield_topic(
-        self, topic_details: KafkaTopicDetails
-    ) -> Iterable[CreateTopicRequest]:
-        logger.info("Fetching topic schema {}".format(topic_details.topic_name))
-        topic_schema = self._parse_topic_metadata(topic_details.topic_name)
-        logger.info("Fetching topic config {}".format(topic_details.topic_name))
-        topic = CreateTopicRequest(
-            name=topic_details.topic_name,
-            service=EntityReference(
-                id=self.context.messaging_service.id.__root__, type="messagingService"
-            ),
-            partitions=len(topic_details.topic_metadata.partitions),
-            replicationFactor=len(
-                topic_details.topic_metadata.partitions.get(0).replicas
-            ),
-        )
-        topic_configResource = self.admin_client.describe_configs(
-            [
-                ConfigResource(
-                    confluent_kafka.admin.RESOURCE_TOPIC, topic_details.topic_name
-                )
-            ]
-        )
-        for j in concurrent.futures.as_completed(iter(topic_configResource.values())):
-            config_response = j.result(timeout=10)
-            if "max.message.bytes" in config_response:
-                topic.maximumMessageSize = config_response.get(
-                    "max.message.bytes", {}
-                ).value
-
-            if "min.insync.replicas" in config_response:
-                topic.minimumInSyncReplicas = config_response.get(
-                    "min.insync.replicas"
-                ).value
-
-            if "retention.ms" in config_response:
-                topic.retentionTime = config_response.get("retention.ms").value
-
-            if "cleanup.policy" in config_response:
-                cleanup_policies = config_response.get("cleanup.policy").value
-                topic.cleanupPolicies = cleanup_policies.split(",")
-
-            topic_config = {}
-            for key, conf_response in config_response.items():
-                topic_config[key] = conf_response.value
-            topic.topicConfig = topic_config
-
-        if topic_schema is not None:
-            topic.schemaText = topic_schema.schema_str
-            if topic_schema.schema_type.lower() == SchemaType.Avro.value.lower():
-                topic.schemaType = SchemaType.Avro.name
-                if self.generate_sample_data:
-                    topic.sampleData = self._get_sample_data(topic.name)
-            elif topic_schema.schema_type.lower() == SchemaType.Protobuf.name.lower():
-                topic.schemaType = SchemaType.Protobuf.name
-            elif topic_schema.schema_type.lower() == SchemaType.JSON.name.lower():
-                topic.schemaType = SchemaType.JSON.name
-            else:
-                topic.schemaType = SchemaType.Other.name
-
-        self.status.topic_scanned(topic.name.__root__)
-        yield topic
-
-    def _parse_topic_metadata(self, topic: str) -> Optional[Schema]:
-        schema: Optional[Schema] = None
-        try:
-            registered_schema = self.schema_registry_client.get_latest_version(
-                topic + "-value"
-            )
-            schema = registered_schema.schema
-        except Exception as exc:
-            logger.debug(traceback.format_exc())
-            logger.warning(f"Failed to get schema for topic [{topic}]: {exc}")
-            self.status.warning(topic, f"failed to get schema: {exc} for topic {topic}")
-
-        return schema
-
-    def _get_sample_data(self, topic_name):
-        sample_data = []
-        try:
-            self.consumer_client.subscribe([topic_name.__root__])
-            logger.info(
-                f"Kafka consumer polling for sample messages in topic {topic_name.__root__}"
-            )
-            messages = self.consumer_client.consume(num_messages=10, timeout=10)
-        except Exception as exc:
-            logger.debug(traceback.format_exc())
-            logger.warning(
-                f"Failed to fetch sample data from topic {topic_name.__root__}: {exc}"
-            )
-        else:
-            if messages:
-                for message in messages:
-                    sample_data.append(
-                        str(
-                            self.consumer_client._serializer.decode_message(
-                                message.value()
-                            )
-                        )
-                    )
-        self.consumer_client.unsubscribe()
-        return TopicSampleData(messages=sample_data)
-
-    def close(self):
-        if self.generate_sample_data and self.consumer_client:
-            self.consumer_client.close()
diff --git a/ingestion/src/metadata/ingestion/source/messaging/redpanda.py b/ingestion/src/metadata/ingestion/source/messaging/redpanda.py
new file mode 100644
index 00000000000..1fe012b830f
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/messaging/redpanda.py
@@ -0,0 +1,34 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from metadata.generated.schema.entity.services.connections.messaging.redpandaConnection import (
+    RedpandaConnection,
+)
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
+from metadata.ingestion.api.source import InvalidSourceException
+from metadata.ingestion.source.messaging.common_broker_source import CommonBrokerSource
+
+
+class RedpandaSource(CommonBrokerSource):
+    @classmethod
+    def create(cls, config_dict, metadata_config: OpenMetadataConnection):
+        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
+        connection: RedpandaConnection = config.serviceConnection.__root__.config
+        if not isinstance(connection, RedpandaConnection):
+            raise InvalidSourceException(
+                f"Expected RedpandaConnection, but got {connection}"
+            )
+        return cls(config, metadata_config)
diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py
index 27699df7870..f82fb98818f 100644
--- a/ingestion/src/metadata/utils/connections.py
+++ b/ingestion/src/metadata/utils/connections.py
@@ -103,6 +103,9 @@ from metadata.generated.schema.entity.services.connections.database.snowflakeCon
 from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
     KafkaConnection,
 )
+from metadata.generated.schema.entity.services.connections.messaging.redpandaConnection import (
+    RedpandaConnection,
+)
 from metadata.generated.schema.entity.services.connections.mlmodel.mlflowConnection import (
     MlflowConnection,
 )
@@ -329,8 +332,9 @@ def _(connection: DeltaLakeConnection, verbose: bool = False) -> DeltaLakeClient
     return deltalake_connection
 
 
-@get_connection.register
-def _(connection: KafkaConnection, verbose: bool = False) -> KafkaClient:
+@get_connection.register(KafkaConnection)
+@get_connection.register(RedpandaConnection)
+def _(connection, verbose: bool = False) -> KafkaClient:
     """
     Prepare Kafka Admin Client and Schema Registry Client
     """
diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/MessagingServiceUtils.ts b/openmetadata-ui/src/main/resources/ui/src/utils/MessagingServiceUtils.ts
index a6de8ae9368..e007ca2d65f 100644
--- a/openmetadata-ui/src/main/resources/ui/src/utils/MessagingServiceUtils.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/utils/MessagingServiceUtils.ts
@@ -18,6 +18,7 @@ import {
   MessagingServiceType,
 } from '../generated/entity/services/messagingService';
 import kafkaConnection from '../jsons/connectionSchemas/connections/messaging/kafkaConnection.json';
+import redpandaConnection from '../jsons/connectionSchemas/connections/messaging/redpandaConnection.json';
 
 export const getBrokers = (config: MessagingConnection['config']) => {
   let retVal: string | undefined;
@@ -33,8 +34,20 @@ export const getBrokers = (config: MessagingConnection['config']) => {
 export const getMessagingConfig = (type: MessagingServiceType) => {
   let schema = {};
   const uiSchema = { ...COMMON_UI_SCHEMA };
-  if (type === MessagingServiceType.Kafka) {
-    schema = kafkaConnection;
+
+  switch (type) {
+    case MessagingServiceType.Kafka:
+      schema = kafkaConnection;
+
+      break;
+
+    case MessagingServiceType.Redpanda:
+      schema = redpandaConnection;
+
+      break;
+
+    default:
+      break;
   }
 
   return cloneDeep({ schema, uiSchema });
diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts b/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts
index 193725d5e70..d9c649b3b44 100644
--- a/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts
@@ -19,6 +19,7 @@ import airflowConnection from '../jsons/connectionSchemas/connections/pipeline/a
 import dagsterConnection from '../jsons/connectionSchemas/connections/pipeline/dagsterConnection.json';
 import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
 import glueConnection from '../jsons/connectionSchemas/connections/pipeline/glueConnection.json';
+import nifiConnection from '../jsons/connectionSchemas/connections/pipeline/nifiConnection.json';
 
 export const getPipelineConfig = (type: PipelineServiceType) => {
   let schema = {};
@@ -50,6 +51,14 @@ export const getPipelineConfig = (type: PipelineServiceType) => {
       break;
     }
 
+    case PipelineServiceType.Nifi: {
+      schema = nifiConnection;
+
+      break;
+    }
+
+    default:
+      break;
   }
 
   return cloneDeep({ schema, uiSchema });
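
Reviewer note: the `add_properties_to_topic_from_resource` helper above relies on `AdminClient.describe_configs` returning a dict of `ConfigResource` to `concurrent.futures.Future`, which is why the results are gathered with `as_completed` rather than indexed directly. A minimal standalone sketch of that pattern, assuming a Kafka-compatible broker at `localhost:9092` and a topic named `my-topic` (both placeholders):

```python
import concurrent.futures

import confluent_kafka
from confluent_kafka.admin import AdminClient, ConfigResource

admin_client = AdminClient({"bootstrap.servers": "localhost:9092"})

# describe_configs() returns {ConfigResource: Future}; iterate with
# as_completed() to pick up each topic's config as soon as it resolves.
futures = admin_client.describe_configs(
    [ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, "my-topic")]
)

for future in concurrent.futures.as_completed(iter(futures.values())):
    # Each result is a dict of config name -> ConfigEntry
    config_response = future.result(timeout=10)
    retention = config_response.get("retention.ms")
    if retention is not None:
        print("retention.ms =", retention.value)
```

Wrapping this loop in a `try/except (KafkaException, KafkaError)` block, as the new helper does, keeps a single unreachable config resource from failing the whole topic ingestion.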
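The `connections.py` change stacks two `singledispatch` registrations on one implementation, so `KafkaConnection` and `RedpandaConnection` share the same Kafka-compatible client factory (Redpanda speaks the Kafka wire protocol, so one client serves both). This works because `register(cls)` returns the function it decorates. A self-contained sketch of the pattern, with stand-in connection classes rather than the generated models:

```python
from functools import singledispatch


class KafkaConnection:
    pass


class RedpandaConnection:
    pass


@singledispatch
def get_connection(connection, verbose: bool = False):
    # Fallback for unregistered connection types
    raise NotImplementedError(f"No client for {type(connection).__name__}")


# register() returns the decorated function unchanged, so the decorators
# stack and a single implementation is dispatched for both types.
@get_connection.register(KafkaConnection)
@get_connection.register(RedpandaConnection)
def _(connection, verbose: bool = False):
    return "kafka-compatible client"


print(get_connection(RedpandaConnection()))  # -> kafka-compatible client
```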
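End to end, the new source should behave like any other messaging connector once the `redpanda` plugin extra from `setup.py` is installed (`pip install "openmetadata-ingestion[redpanda]"`). A minimal sketch of a metadata ingestion run, assuming the standard `Workflow` entrypoint; the service name, host/ports, and auth settings below are placeholders for a local Redpanda with its schema registry:

```python
from metadata.ingestion.api.workflow import Workflow

config = {
    "source": {
        "type": "redpanda",
        "serviceName": "local_redpanda",  # placeholder service name
        "serviceConnection": {
            "config": {
                "type": "Redpanda",
                "bootstrapServers": "localhost:9092",
                "schemaRegistryURL": "http://localhost:8081",
            }
        },
        # generateSampleData drives the consumer_client path in CommonBrokerSource
        "sourceConfig": {"config": {"generateSampleData": True}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "no-auth",
        }
    },
}

workflow = Workflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
```

Because `RedpandaSource.create` only validates the connection type before deferring to `CommonBrokerSource`, this run exercises exactly the same topic, schema, and sample-data code paths as the refactored Kafka source.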