MINOR: Improve Kafka Test Connection (#15207)

This commit is contained in:
Mayur Singal 2024-02-19 10:11:52 +05:30 committed by GitHub
parent 95ff5620a5
commit 64e3cedeea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 85 additions and 7 deletions

View File

@ -15,7 +15,7 @@ Source connection handler
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional, Union from typing import Optional, Union
from confluent_kafka.admin import AdminClient from confluent_kafka.admin import AdminClient, KafkaException
from confluent_kafka.avro import AvroConsumer from confluent_kafka.avro import AvroConsumer
from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient
@ -35,6 +35,15 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger() logger = ingestion_logger()
class InvalidKafkaCreds(Exception):
"""
Class to indicate invalid kafka credentials exception
"""
TIMEOUT_SECONDS = 10
@dataclass @dataclass
class KafkaClient: class KafkaClient:
def __init__(self, admin_client, schema_registry_client, consumer_client) -> None: def __init__(self, admin_client, schema_registry_client, consumer_client) -> None:
@ -60,6 +69,14 @@ def get_connection(
if connection.saslMechanism: if connection.saslMechanism:
connection.consumerConfig["sasl.mechanism"] = connection.saslMechanism.value connection.consumerConfig["sasl.mechanism"] = connection.saslMechanism.value
if (
connection.consumerConfig.get("security.protocol") is None
and connection.securityProtocol
):
connection.consumerConfig[
"security.protocol"
] = connection.securityProtocol.value
if connection.basicAuthUserInfo: if connection.basicAuthUserInfo:
connection.schemaRegistryConfig = connection.schemaRegistryConfig or {} connection.schemaRegistryConfig = connection.schemaRegistryConfig or {}
connection.schemaRegistryConfig[ connection.schemaRegistryConfig[
@ -109,9 +126,18 @@ def test_connection(
""" """
def custom_executor(): def custom_executor():
_ = client.admin_client.list_topics().topics try:
client.admin_client.list_topics(timeout=TIMEOUT_SECONDS).topics
except KafkaException as err:
raise InvalidKafkaCreds(
f"Failed to fetch topics due to: {err}. "
"Please validate credentials and check if you are using correct security protocol"
)
test_fn = {"GetTopics": custom_executor} test_fn = {
"GetTopics": custom_executor,
"CheckSchemaRegistry": client.schema_registry_client.get_subjects,
}
test_connection_steps( test_connection_steps(
metadata=metadata, metadata=metadata,

View File

@ -9,6 +9,13 @@
"errorMessage": "Failed to fetch topics, please validate the credentials", "errorMessage": "Failed to fetch topics, please validate the credentials",
"shortCircuit": true, "shortCircuit": true,
"mandatory": true "mandatory": true
},
{
"name": "CheckSchemaRegistry",
"description": "Validate schema registry credentials",
"errorMessage": "Failed to interact with schema registry API, please validate the schema registry credentials",
"shortCircuit": false,
"mandatory": false
} }
] ]
} }

View File

@ -9,6 +9,13 @@
"errorMessage": "Failed to fetch topics, please validate the credentials", "errorMessage": "Failed to fetch topics, please validate the credentials",
"shortCircuit": true, "shortCircuit": true,
"mandatory": true "mandatory": true
},
{
"name": "CheckSchemaRegistry",
"description": "Validate schema registry credentials",
"errorMessage": "Failed to interact with schema registry API, please validate the schema registry credentials",
"shortCircuit": false,
"mandatory": false
} }
] ]
} }

View File

@ -42,6 +42,13 @@
"type": "string", "type": "string",
"format": "password" "format": "password"
}, },
"securityProtocol": {
"title": "Security Protocol",
"description": "security.protocol consumer config property",
"type": "string",
"enum": ["PLAINTEXT","SASL_PLAINTEXT","SASL_SSL","SSL"],
"default": "PLAINTEXT"
},
"saslMechanism": { "saslMechanism": {
"title": "SASL Mechanism", "title": "SASL Mechanism",
"description": "sasl.mechanism Consumer Config property", "description": "sasl.mechanism Consumer Config property",
@ -49,7 +56,7 @@
"default": "PLAIN" "default": "PLAIN"
}, },
"basicAuthUserInfo": { "basicAuthUserInfo": {
"title": "Basic Auth User Info", "title": "Schema Registry Basic Auth User Info",
"description": "basic.auth.user.info schema registry config property, Client HTTP credentials in the form of username:password.", "description": "basic.auth.user.info schema registry config property, Client HTTP credentials in the form of username:password.",
"type": "string", "type": "string",
"format": "password" "format": "password"

View File

@ -42,6 +42,13 @@
"type": "string", "type": "string",
"format": "password" "format": "password"
}, },
"securityProtocol": {
"title": "Security Protocol",
"description": "security.protocol consumer config property",
"type": "string",
"enum": ["PLAINTEXT","SASL_PLAINTEXT","SASL_SSL","SSL"],
"default": "PLAINTEXT"
},
"saslMechanism": { "saslMechanism": {
"title": "SASL Mechanism", "title": "SASL Mechanism",
"description": "sasl.mechanism Consumer Config property", "description": "sasl.mechanism Consumer Config property",
@ -49,7 +56,7 @@
"default": "PLAIN" "default": "PLAIN"
}, },
"basicAuthUserInfo": { "basicAuthUserInfo": {
"title": "Basic Auth User Info", "title": "Schema Registry Basic Auth User Info",
"description": "basic.auth.user.info schema registry config property, Client HTTP credentials in the form of username:password.", "description": "basic.auth.user.info schema registry config property, Client HTTP credentials in the form of username:password.",
"type": "string", "type": "string",
"format": "password" "format": "password"

View File

@ -40,6 +40,18 @@ $$section
SASL password for use with the PLAIN and SASL-SCRAM mechanisms. SASL password for use with the PLAIN and SASL-SCRAM mechanisms.
$$ $$
$$section
### Security Protocol $(id="securityProtocol")
Security Protocol used in bootstrap server.
Supported:
`PLAINTEXT`: Un-authenticated, non-encrypted channel
`SASL_PLAINTEXT`: SASL authenticated, non-encrypted channel
`SASL_SSL`: SASL authenticated, SSL channel
`SSL`: SSL channel
$$
$$section $$section
### SASL Mechanism $(id="saslMechanism") ### SASL Mechanism $(id="saslMechanism")
@ -51,7 +63,7 @@ Supported: `GSSAPI`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, `OAUTHBEARER`.
$$ $$
$$section $$section
### Basic Auth User Info $(id="basicAuthUserInfo") ### Schema Registry Basic Auth User Info $(id="basicAuthUserInfo")
Schema Registry Client HTTP credentials in the form of `username:password`. Schema Registry Client HTTP credentials in the form of `username:password`.

View File

@ -26,6 +26,18 @@ URL of the Schema Registry used to ingest the schemas of the topics.
**NOTE**: For now, the schema will be the last version found for the schema name `{topic-name}-value`. An [issue](https://github.com/open-metadata/OpenMetadata/issues/10399) to improve how it currently works has been opened. **NOTE**: For now, the schema will be the last version found for the schema name `{topic-name}-value`. An [issue](https://github.com/open-metadata/OpenMetadata/issues/10399) to improve how it currently works has been opened.
$$ $$
$$section
### Security Protocol $(id="securityProtocol")
Security Protocol used in bootstrap server.
Supported:
`PLAINTEXT`: Un-authenticated, non-encrypted channel
`SASL_PLAINTEXT`: SASL authenticated, non-encrypted channel
`SASL_SSL`: SASL authenticated, SSL channel
`SSL`: SSL channel
$$
$$section $$section
### SASL Username $(id="saslUsername") ### SASL Username $(id="saslUsername")
@ -49,7 +61,7 @@ Supported: `GSSAPI`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, `OAUTHBEARER`.
$$ $$
$$section $$section
### Basic Auth User Info $(id="basicAuthUserInfo") ### Schema Registry Basic Auth User Info $(id="basicAuthUserInfo")
Schema Registry Client HTTP credentials in the form of `username:password`. Schema Registry Client HTTP credentials in the form of `username:password`.