diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TopicRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TopicRepository.java index 36c0b863f50..9a96def09d6 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TopicRepository.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TopicRepository.java @@ -303,6 +303,7 @@ public class TopicRepository extends EntityRepository { recordChange("retentionSize", origTopic.getRetentionSize(), updatedTopic.getRetentionSize()); recordChange("schemaText", origTopic.getSchemaText(), updatedTopic.getSchemaText()); recordChange("schemaType", origTopic.getSchemaType(), updatedTopic.getSchemaType()); + recordChange("topicConfig", origTopic.getTopicConfig(), updatedTopic.getTopicConfig()); updateCleanupPolicies(origTopic, updatedTopic); } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java index c5f64cd0e10..c51aad02e5a 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java @@ -434,6 +434,7 @@ public class TopicResource { .withRetentionSize(create.getRetentionSize()) .withRetentionTime(create.getRetentionTime()) .withReplicationFactor(create.getReplicationFactor()) + .withTopicConfig(create.getTopicConfig()) .withTags(create.getTags()) .withOwner(create.getOwner()) .withUpdatedBy(securityContext.getUserPrincipal().getName()) diff --git a/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json b/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json index e764c31b30f..ee685a82960 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json @@ -17,11 +17,6 @@ "description": "Link to the messaging service where this topic is hosted in", "$ref": "../../type/entityReference.json" }, - "partitions": { - "description": "Number of partitions into which the topic is divided.", - "type": "integer", - "minimum": 1 - }, "schemaText": { "description": "Schema used for message serialization. Optional as some topics may not have associated schemas.", "type": "string" @@ -30,6 +25,11 @@ "description": "Schema used for message serialization.", "$ref": "../../entity/data/topic.json#/definitions/schemaType" }, + "partitions": { + "description": "Number of partitions into which the topic is divided.", + "type": "integer", + "minimum": 1 + }, "cleanupPolicies": { "description": "Topic clean up policy. For Kafka - `cleanup.policy` configuration.", "type": "array", @@ -58,6 +58,10 @@ "type": "integer", "default": "-1" }, + "topicConfig": { + "description": "Contains key/value pair of topic configuration.", + "$ref": "../../entity/data/topic.json#/definitions/topicConfig" + }, "owner": { "description": "Owner of this topic", "$ref": "../../type/entityReference.json" diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json index bb689b13793..47f26137cc9 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json @@ -35,6 +35,11 @@ "javaType": "org.openmetadata.catalog.type.topic.CleanupPolicy", "description": "Topic clean up policy. For Kafka - `cleanup.policy` configuration.", "enum": ["delete", "compact"] + }, + "topicConfig": { + "javaType": "org.openmetadata.catalog.type.topic.TopicConfig", + "description": "Contains key/value pair of topic configuration.", + "type": "object" } }, "properties": { @@ -78,11 +83,6 @@ "description": "Service type where this topic is hosted in.", "$ref": "../services/messagingService.json#/definitions/messagingServiceType" }, - "partitions": { - "description": "Number of partitions into which the topic is divided.", - "type": "integer", - "minimum": 1 - }, "schemaText": { "description": "Schema used for message serialization. Optional as some topics may not have associated schemas.", "type": "string" @@ -91,6 +91,11 @@ "description": "Schema used for message serialization.", "$ref": "#/definitions/schemaType" }, + "partitions": { + "description": "Number of partitions into which the topic is divided.", + "type": "integer", + "minimum": 1 + }, "cleanupPolicies": { "description": "Topic clean up policies. For Kafka - `cleanup.policy` configuration.", "type": "array", @@ -119,6 +124,10 @@ "type": "integer", "default": "-1" }, + "topicConfig": { + "description": "Contains key/value pair of topic configuration.", + "$ref": "#/definitions/topicConfig" + }, "owner": { "description": "Owner of this topic.", "$ref": "../../type/entityReference.json" diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/topics/TopicResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/topics/TopicResourceTest.java index 4f7f729aea3..4986659059e 100644 --- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/topics/TopicResourceTest.java +++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/topics/TopicResourceTest.java @@ -111,6 +111,8 @@ public class TopicResourceTest extends EntityResourceTest { @Test void put_topicAttributes_200_ok(TestInfo test) throws IOException { + Map topicConfig = new HashMap<>(); + CreateTopic createTopic = createRequest(test) .withOwner(USER_OWNER1) diff --git a/ingestion-core/src/metadata/_version.py b/ingestion-core/src/metadata/_version.py index c94d8ac4e36..af65a6e6a60 100644 --- a/ingestion-core/src/metadata/_version.py +++ b/ingestion-core/src/metadata/_version.py @@ -7,5 +7,5 @@ Provides metadata version information. from incremental import Version -__version__ = Version("metadata", 0, 9, 0, dev=7) +__version__ = Version("metadata", 0, 9, 0, dev=8) __all__ = ["__version__"] diff --git a/ingestion/examples/workflows/kafka.json b/ingestion/examples/workflows/kafka.json new file mode 100644 index 00000000000..07d38d3da2e --- /dev/null +++ b/ingestion/examples/workflows/kafka.json @@ -0,0 +1,25 @@ +{ + "source": { + "type": "kafka", + "config": { + "service_name": "local_kafka", + "bootstrap_servers": "localhost:9092", + "schema_registry_url": "http://192.168.1.43:8081", + "filter_pattern": { + "excludes": ["_confluent.*"] + } + } + }, + "sink": { + "type": "metadata-rest", + "config": { + } + }, + "metadata_server": { + "type": "metadata-server", + "config": { + "api_endpoint": "http://localhost:8585/api", + "auth_provider_type": "no-auth" + } + } +} \ No newline at end of file diff --git a/ingestion/src/metadata/cmd.py b/ingestion/src/metadata/cmd.py index f7de4480bdf..3e562a78ec3 100644 --- a/ingestion/src/metadata/cmd.py +++ b/ingestion/src/metadata/cmd.py @@ -45,10 +45,19 @@ def check() -> None: @click.option( "--debug/--no-debug", default=lambda: os.environ.get("METADATA_DEBUG", False) ) -def metadata(debug: bool) -> None: +@click.option( + "--log-level", + "-l", + type=click.Choice(["INFO", "DEBUG", "WARNING", "ERROR", "CRITICAL"]), + help="Log level", + required=False, +) +def metadata(debug: bool, log_level: str) -> None: if debug: logging.getLogger().setLevel(logging.INFO) logging.getLogger("metadata").setLevel(logging.DEBUG) + elif log_level: + logging.getLogger().setLevel(log_level) else: logging.getLogger().setLevel(logging.WARNING) logging.getLogger("metadata").setLevel(logging.INFO) diff --git a/ingestion/src/metadata/ingestion/source/kafka.py b/ingestion/src/metadata/ingestion/source/kafka.py index 2058986feae..bb19faeb785 100644 --- a/ingestion/src/metadata/ingestion/source/kafka.py +++ b/ingestion/src/metadata/ingestion/source/kafka.py @@ -99,37 +99,69 @@ class KafkaSource(Source[CreateTopicRequest]): pass def next_record(self) -> Iterable[CreateTopicRequest]: - topics = self.admin_client.list_topics().topics - for t in topics: + topics_dict = self.admin_client.list_topics().topics + for topic_name, topic_metadata in topics_dict.items(): try: - if self.config.filter_pattern.included(t): - logger.info("Fetching topic schema {}".format(t)) - topic_schema = self._parse_topic_metadata(t) - topic = CreateTopicRequest( - name=t.replace(".", "_DOT_"), + if self.config.filter_pattern.included(topic_name): + logger.info("Fetching topic schema {}".format(topic_name)) + topic_schema = self._parse_topic_metadata(topic_name) + logger.info("Fetching topic config {}".format(topic_name)) + topic_request = CreateTopicRequest( + name=topic_name.replace(".", "_DOT_"), service=EntityReference( id=self.service.id, type="messagingService" ), - partitions=1, + partitions=len(topic_metadata.partitions), + replicationFactor=len( + topic_metadata.partitions.get(0).replicas + ), ) - if topic_schema is not None: - topic.schemaText = topic_schema.schema_str - if topic_schema.schema_type == "AVRO": - topic.schemaType = SchemaType.Avro.name - elif topic_schema.schema_type == "PROTOBUF": - topic.schemaType = SchemaType.Protobuf.name - elif topic_schema.schema_type == "JSON": - topic.schemaType = SchemaType.JSON.name - else: - topic.schemaType = SchemaType.Other.name + topic_configResource = self.admin_client.describe_configs( + [ + ConfigResource( + confluent_kafka.admin.RESOURCE_TOPIC, topic_name + ) + ] + ) + for j in concurrent.futures.as_completed( + iter(topic_configResource.values()) + ): + config_response = j.result(timeout=10) + topic_request.maximumMessageSize = config_response.get( + "max.message.bytes" + ).value + topic_request.minimumInSyncReplicas = config_response.get( + "min.insync.replicas" + ).value + topic_request.retentionTime = config_response.get( + "retention.ms" + ).value + topic_request.cleanupPolicies = [ + config_response.get("cleanup.policy").value + ] + topic_config = {} + for key, conf_response in config_response.items(): + topic_config[key] = conf_response.value + topic_request.topicConfig = topic_config - self.status.topic_scanned(topic.name.__root__) - yield topic + if topic_schema is not None: + topic_request.schemaText = topic_schema.schema_str + if topic_schema.schema_type == "AVRO": + topic_request.schemaType = SchemaType.Avro.name + elif topic_schema.schema_type == "PROTOBUF": + topic_request.schemaType = SchemaType.Protobuf.name + elif topic_schema.schema_type == "JSON": + topic_request.schemaType = SchemaType.JSON.name + else: + topic_request.schemaType = SchemaType.Other.name + + self.status.topic_scanned(topic_request.name.__root__) + yield topic_request else: - self.status.dropped(t) + self.status.dropped(topic_name) except Exception as err: logger.error(repr(err)) - self.status.failure(t) + self.status.failure(topic_name) def _parse_topic_metadata(self, topic: str) -> Optional[Schema]: logger.debug(f"topic = {topic}")