Fix #2819: Kafka connector security settings along with all configs to be retrieved (#2823)

* Fix #2811: Log level option for running metadata

* Fix #2819: Kafka connector security settings along with all configs to be retrieved

Sriharsha Chintalapani 2022-02-17 01:47:43 -08:00 committed by GitHub
parent b3b5f44c23
commit 165f5bb0e9
9 changed files with 117 additions and 34 deletions


@@ -303,6 +303,7 @@ public class TopicRepository extends EntityRepository<Topic> {
     recordChange("retentionSize", origTopic.getRetentionSize(), updatedTopic.getRetentionSize());
     recordChange("schemaText", origTopic.getSchemaText(), updatedTopic.getSchemaText());
     recordChange("schemaType", origTopic.getSchemaType(), updatedTopic.getSchemaType());
+    recordChange("topicConfig", origTopic.getTopicConfig(), updatedTopic.getTopicConfig());
     updateCleanupPolicies(origTopic, updatedTopic);
   }


@@ -434,6 +434,7 @@ public class TopicResource {
         .withRetentionSize(create.getRetentionSize())
         .withRetentionTime(create.getRetentionTime())
         .withReplicationFactor(create.getReplicationFactor())
+        .withTopicConfig(create.getTopicConfig())
         .withTags(create.getTags())
         .withOwner(create.getOwner())
         .withUpdatedBy(securityContext.getUserPrincipal().getName())


@@ -17,11 +17,6 @@
       "description": "Link to the messaging service where this topic is hosted in",
       "$ref": "../../type/entityReference.json"
     },
-    "partitions": {
-      "description": "Number of partitions into which the topic is divided.",
-      "type": "integer",
-      "minimum": 1
-    },
     "schemaText": {
       "description": "Schema used for message serialization. Optional as some topics may not have associated schemas.",
       "type": "string"
@@ -30,6 +25,11 @@
       "description": "Schema used for message serialization.",
       "$ref": "../../entity/data/topic.json#/definitions/schemaType"
     },
+    "partitions": {
+      "description": "Number of partitions into which the topic is divided.",
+      "type": "integer",
+      "minimum": 1
+    },
     "cleanupPolicies": {
       "description": "Topic clean up policy. For Kafka - `cleanup.policy` configuration.",
       "type": "array",
@@ -58,6 +58,10 @@
       "type": "integer",
       "default": "-1"
     },
+    "topicConfig": {
+      "description": "Contains key/value pair of topic configuration.",
+      "$ref": "../../entity/data/topic.json#/definitions/topicConfig"
+    },
     "owner": {
       "description": "Owner of this topic",
       "$ref": "../../type/entityReference.json"


@@ -35,6 +35,11 @@
       "javaType": "org.openmetadata.catalog.type.topic.CleanupPolicy",
       "description": "Topic clean up policy. For Kafka - `cleanup.policy` configuration.",
       "enum": ["delete", "compact"]
+    },
+    "topicConfig": {
+      "javaType": "org.openmetadata.catalog.type.topic.TopicConfig",
+      "description": "Contains key/value pair of topic configuration.",
+      "type": "object"
     }
   },
   "properties": {
@@ -78,11 +83,6 @@
       "description": "Service type where this topic is hosted in.",
       "$ref": "../services/messagingService.json#/definitions/messagingServiceType"
     },
-    "partitions": {
-      "description": "Number of partitions into which the topic is divided.",
-      "type": "integer",
-      "minimum": 1
-    },
     "schemaText": {
       "description": "Schema used for message serialization. Optional as some topics may not have associated schemas.",
       "type": "string"
@@ -91,6 +91,11 @@
       "description": "Schema used for message serialization.",
       "$ref": "#/definitions/schemaType"
     },
+    "partitions": {
+      "description": "Number of partitions into which the topic is divided.",
+      "type": "integer",
+      "minimum": 1
+    },
     "cleanupPolicies": {
       "description": "Topic clean up policies. For Kafka - `cleanup.policy` configuration.",
       "type": "array",
@@ -119,6 +124,10 @@
       "type": "integer",
       "default": "-1"
     },
+    "topicConfig": {
+      "description": "Contains key/value pair of topic configuration.",
+      "$ref": "#/definitions/topicConfig"
+    },
     "owner": {
       "description": "Owner of this topic.",
       "$ref": "../../type/entityReference.json"


@@ -111,6 +111,8 @@ public class TopicResourceTest extends EntityResourceTest<Topic, CreateTopic> {
   @Test
   void put_topicAttributes_200_ok(TestInfo test) throws IOException {
+    Map<String, Object> topicConfig = new HashMap<>();
     CreateTopic createTopic =
         createRequest(test)
             .withOwner(USER_OWNER1)


@@ -7,5 +7,5 @@ Provides metadata version information.
 from incremental import Version
-__version__ = Version("metadata", 0, 9, 0, dev=7)
+__version__ = Version("metadata", 0, 9, 0, dev=8)
 __all__ = ["__version__"]


@@ -0,0 +1,25 @@
+{
+  "source": {
+    "type": "kafka",
+    "config": {
+      "service_name": "local_kafka",
+      "bootstrap_servers": "localhost:9092",
+      "schema_registry_url": "http://192.168.1.43:8081",
+      "filter_pattern": {
+        "excludes": ["_confluent.*"]
+      }
+    }
+  },
+  "sink": {
+    "type": "metadata-rest",
+    "config": {
+    }
+  },
+  "metadata_server": {
+    "type": "metadata-server",
+    "config": {
+      "api_endpoint": "http://localhost:8585/api",
+      "auth_provider_type": "no-auth"
+    }
+  }
+}
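
A sketch of how a pipeline file like the one above might be run programmatically. The `Workflow` import path and method names are assumed for this release line of the `metadata` package, and the file name is a placeholder:

import json

# Assumed import path for the ingestion workflow in this release line.
from metadata.ingestion.api.workflow import Workflow

# Load the Kafka pipeline definition shown above (placeholder file name).
with open("kafka_pipeline.json") as pipeline_file:
    config = json.load(pipeline_file)

workflow = Workflow.create(config)  # wires up the kafka source and metadata-rest sink
workflow.execute()                  # drives next_record() over every topic
workflow.raise_from_status()        # surface any source/sink failures
workflow.stop()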


@@ -45,10 +45,19 @@ def check() -> None:
 @click.option(
     "--debug/--no-debug", default=lambda: os.environ.get("METADATA_DEBUG", False)
 )
-def metadata(debug: bool) -> None:
+@click.option(
+    "--log-level",
+    "-l",
+    type=click.Choice(["INFO", "DEBUG", "WARNING", "ERROR", "CRITICAL"]),
+    help="Log level",
+    required=False,
+)
+def metadata(debug: bool, log_level: str) -> None:
     if debug:
         logging.getLogger().setLevel(logging.INFO)
         logging.getLogger("metadata").setLevel(logging.DEBUG)
+    elif log_level:
+        logging.getLogger().setLevel(log_level)
     else:
         logging.getLogger().setLevel(logging.WARNING)
         logging.getLogger("metadata").setLevel(logging.INFO)
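
One detail that makes the new option work: `Logger.setLevel` accepts level names as strings, so the value validated by `click.Choice` can be passed through unchanged. A minimal sketch:

import logging

# setLevel accepts numeric constants or their string names, so a validated
# choice such as "WARNING" needs no conversion before being applied.
logging.getLogger().setLevel("WARNING")
assert logging.getLogger().level == logging.WARNING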


@@ -99,37 +99,69 @@ class KafkaSource(Source[CreateTopicRequest]):
         pass

     def next_record(self) -> Iterable[CreateTopicRequest]:
-        topics = self.admin_client.list_topics().topics
-        for t in topics:
+        topics_dict = self.admin_client.list_topics().topics
+        for topic_name, topic_metadata in topics_dict.items():
             try:
-                if self.config.filter_pattern.included(t):
-                    logger.info("Fetching topic schema {}".format(t))
-                    topic_schema = self._parse_topic_metadata(t)
-                    topic = CreateTopicRequest(
-                        name=t.replace(".", "_DOT_"),
+                if self.config.filter_pattern.included(topic_name):
+                    logger.info("Fetching topic schema {}".format(topic_name))
+                    topic_schema = self._parse_topic_metadata(topic_name)
+                    logger.info("Fetching topic config {}".format(topic_name))
+                    topic_request = CreateTopicRequest(
+                        name=topic_name.replace(".", "_DOT_"),
                         service=EntityReference(
                             id=self.service.id, type="messagingService"
                         ),
-                        partitions=1,
+                        partitions=len(topic_metadata.partitions),
+                        replicationFactor=len(
+                            topic_metadata.partitions.get(0).replicas
+                        ),
                     )
+                    topic_configResource = self.admin_client.describe_configs(
+                        [
+                            ConfigResource(
+                                confluent_kafka.admin.RESOURCE_TOPIC, topic_name
+                            )
+                        ]
+                    )
+                    for j in concurrent.futures.as_completed(
+                        iter(topic_configResource.values())
+                    ):
+                        config_response = j.result(timeout=10)
+                        topic_request.maximumMessageSize = config_response.get(
+                            "max.message.bytes"
+                        ).value
+                        topic_request.minimumInSyncReplicas = config_response.get(
+                            "min.insync.replicas"
+                        ).value
+                        topic_request.retentionTime = config_response.get(
+                            "retention.ms"
+                        ).value
+                        topic_request.cleanupPolicies = [
+                            config_response.get("cleanup.policy").value
+                        ]
+                        topic_config = {}
+                        for key, conf_response in config_response.items():
+                            topic_config[key] = conf_response.value
+                        topic_request.topicConfig = topic_config
                     if topic_schema is not None:
-                        topic.schemaText = topic_schema.schema_str
+                        topic_request.schemaText = topic_schema.schema_str
                         if topic_schema.schema_type == "AVRO":
-                            topic.schemaType = SchemaType.Avro.name
+                            topic_request.schemaType = SchemaType.Avro.name
                         elif topic_schema.schema_type == "PROTOBUF":
-                            topic.schemaType = SchemaType.Protobuf.name
+                            topic_request.schemaType = SchemaType.Protobuf.name
                         elif topic_schema.schema_type == "JSON":
-                            topic.schemaType = SchemaType.JSON.name
+                            topic_request.schemaType = SchemaType.JSON.name
                         else:
-                            topic.schemaType = SchemaType.Other.name
-                    self.status.topic_scanned(topic.name.__root__)
-                    yield topic
+                            topic_request.schemaType = SchemaType.Other.name
+                    self.status.topic_scanned(topic_request.name.__root__)
+                    yield topic_request
                 else:
-                    self.status.dropped(t)
+                    self.status.dropped(topic_name)
             except Exception as err:
                 logger.error(repr(err))
-                self.status.failure(t)
+                self.status.failure(topic_name)

     def _parse_topic_metadata(self, topic: str) -> Optional[Schema]:
         logger.debug(f"topic = {topic}")
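
For readers unfamiliar with the admin API the new code relies on: in confluent-kafka, `AdminClient.describe_configs` returns a dict mapping each `ConfigResource` to a `concurrent.futures.Future`, and each future resolves to a dict of config name to `ConfigEntry`. A standalone sketch of the same pattern (broker address and topic name are placeholders):

import concurrent.futures

from confluent_kafka.admin import AdminClient, ConfigResource

# Placeholder connection details.
admin_client = AdminClient({"bootstrap.servers": "localhost:9092"})

# describe_configs returns {ConfigResource: Future}; each future resolves
# to a dict of config name -> ConfigEntry for that resource.
futures = admin_client.describe_configs(
    [ConfigResource(ConfigResource.Type.TOPIC, "my-topic")]
)
for future in concurrent.futures.as_completed(futures.values()):
    config_response = future.result(timeout=10)
    # ConfigEntry.value holds the string-encoded setting.
    topic_config = {key: entry.value for key, entry in config_response.items()}
    print(topic_config.get("retention.ms"))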