Fix #4728 - Fix Kafka test connection (#4730)

* Fix Kafka connection

* Update defaults and required values

* Update test connection

* Add filter tests

* Fix kafka tests
Pere Miquel Brull 2022-05-06 08:59:05 +02:00 committed by GitHub
parent d2eef5ec0e
commit a22ff4627f
9 changed files with 134 additions and 72 deletions

View File

@@ -23,15 +23,13 @@
"bootstrapServers": {
"title": "Bootstrap Servers",
"description": "Kafka bootstrap servers. add them in comma separated values ex: host1:9092,host2:9092",
"type": "string",
"default": "localhost:9092"
"type": "string"
},
"schemaRegistryURL": {
"title": "Schema Registry URL",
"description": "Confluent Kafka Schema Registry URL.",
"type": "string",
"format": "uri",
"default": "http://localhost:8081"
"format": "uri"
},
"consumerConfig": {
"title": "Consumer Config",
@@ -52,5 +50,6 @@
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
}
},
"additionalProperties": false
"additionalProperties": false,
"required": ["bootstrapServers"]
}
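A quick way to see the effect of dropping the defaults and requiring bootstrapServers is to build the connection model directly. A minimal sketch, assuming the generated pydantic model mirrors this schema (values are placeholders):

from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
    KafkaConnection,
)

# bootstrapServers is now mandatory; schemaRegistryURL stays optional and
# no longer defaults to http://localhost:8081.
connection = KafkaConnection(
    bootstrapServers="localhost:9092",
    schemaRegistryURL="http://localhost:8081",  # omit when no registry is used
)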

View File

@@ -3,7 +3,8 @@
"serviceName": "sample_kafka",
"serviceConnection": {
"config": {
"type": "Kafka"
"type": "Kafka",
"bootstrapServers": "localhost:9092"
}
},
"sourceConfig": {

View File

@@ -42,7 +42,7 @@ from metadata.ingestion.api.common import logger
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.connection_clients import KafkaClient
from metadata.utils.connections import test_connection
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_topic
from metadata.utils.helpers import get_messaging_service_or_create
@@ -80,17 +80,10 @@ class KafkaSource(Source[CreateTopicRequest]):
self.service = self.metadata.get_service_or_create(
entity=MessagingService, config=config
)
self.service_connection.schemaRegistryConfig[
"url"
] = self.service_connection.schemaRegistryURL
self.schema_registry_client = SchemaRegistryClient(
self.service_connection.schemaRegistryConfig
)
admin_client_config = self.service_connection.consumerConfig
admin_client_config[
"bootstrap.servers"
] = self.service_connection.bootstrapServers
self.admin_client = AdminClient(admin_client_config)
self.connection: KafkaClient = get_connection(self.service_connection)
self.admin_client = self.connection.admin_client
self.schema_registry_client = self.connection.schema_registry_client
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@@ -192,6 +185,4 @@ class KafkaSource(Source[CreateTopicRequest]):
pass
def test_connection(self) -> None:
test_connection(KafkaClient(client=self.admin_client))
if self.service_connection.schemaRegistryURL:
test_connection(KafkaClient(client=self.schema_registry_client))
test_connection(self.connection)
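With this refactor the source no longer wires AdminClient and SchemaRegistryClient by hand; both come out of get_connection, and a single test_connection call covers them. A hedged usage sketch, where config_dict and metadata_config stand for the usual workflow inputs:

source = KafkaSource.create(config_dict, metadata_config)

# get_connection built the KafkaClient in __init__; this checks the admin
# client and, when a registry URL is configured, the schema registry client.
source.test_connection()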

View File

@@ -44,8 +44,9 @@ class DeltaLakeClient:
@dataclass
class KafkaClient:
def __init__(self, client) -> None:
self.client = client
def __init__(self, admin_client, schema_registry_client) -> None:
self.admin_client = admin_client
self.schema_registry_client = schema_registry_client # Optional
@dataclass
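The wrapper now names both clients explicitly instead of holding a single opaque client, with the schema registry one allowed to be None. A small sketch of constructing it by hand (the broker address is a placeholder):

from confluent_kafka.admin import AdminClient

from metadata.utils.connection_clients import KafkaClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# schema_registry_client stays None when no schemaRegistryURL is configured.
kafka_client = KafkaClient(admin_client=admin, schema_registry_client=None)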

View File

@@ -61,6 +61,9 @@ from metadata.generated.schema.entity.services.connections.database.salesforceCo
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeConnection,
)
from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
KafkaConnection,
)
from metadata.utils.connection_clients import (
DeltaLakeClient,
DynamoClient,
@@ -85,7 +88,7 @@ class SourceConnectionException(Exception):
"""
def create_generic_connection(connection, verbose: bool = False):
def create_generic_connection(connection, verbose: bool = False) -> Engine:
"""
Generic Engine creation from connection object
:param connection: JSON Schema connection model
@@ -109,7 +112,7 @@ def create_generic_connection(connection, verbose: bool = False):
@singledispatch
def get_connection(
connection, verbose: bool = False
) -> Union[Engine, DynamoClient, GlueClient, SalesforceClient]:
) -> Union[Engine, DynamoClient, GlueClient, SalesforceClient, KafkaClient]:
"""
Given an SQL configuration, build the SQLAlchemy Engine
"""
@@ -126,7 +129,7 @@ def _(connection: DatabricksConnection, verbose: bool = False):
@get_connection.register
def _(connection: SnowflakeConnection, verbose: bool = False):
def _(connection: SnowflakeConnection, verbose: bool = False) -> Engine:
if connection.privateKey:
from cryptography.hazmat.backends import default_backend
@@ -161,7 +164,7 @@ def _(connection: SnowflakeConnection, verbose: bool = False):
@get_connection.register
def _(connection: BigQueryConnection, verbose: bool = False):
def _(connection: BigQueryConnection, verbose: bool = False) -> Engine:
"""
Prepare the engine and the GCS credentials
:param connection: BigQuery connection
@@ -173,7 +176,7 @@ def _(connection: BigQueryConnection, verbose: bool = False):
@get_connection.register
def _(connection: DynamoDBConnection, verbose: bool = False):
def _(connection: DynamoDBConnection, verbose: bool = False) -> DynamoClient:
from metadata.utils.aws_client import AWSClient
dynomo_connection = AWSClient(connection.awsConfig).get_dynomo_client()
@@ -181,7 +184,7 @@ def _(connection: DynamoDBConnection, verbose: bool = False):
@get_connection.register
def _(connection: GlueConnection, verbose: bool = False):
def _(connection: GlueConnection, verbose: bool = False) -> GlueClient:
from metadata.utils.aws_client import AWSClient
glue_connection = AWSClient(connection.awsConfig).get_glue_client()
@@ -189,7 +192,7 @@ def _(connection: GlueConnection, verbose: bool = False):
@get_connection.register
def _(connection: SalesforceConnection, verbose: bool = False):
def _(connection: SalesforceConnection, verbose: bool = False) -> SalesforceClient:
from simple_salesforce import Salesforce
salesforce_connection = SalesforceClient(
@@ -202,6 +205,59 @@ def _(connection: SalesforceConnection, verbose: bool = False):
return salesforce_connection
@get_connection.register
def _(connection: DeltaLakeConnection, verbose: bool = False) -> DeltaLakeClient:
import pyspark
from delta import configure_spark_with_delta_pip
builder = (
pyspark.sql.SparkSession.builder.appName(connection.appName)
.enableHiveSupport()
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog",
)
)
if connection.metastoreHostPort:
builder.config(
"hive.metastore.uris",
f"thrift://{connection.metastoreHostPort}",
)
elif connection.metastoreFilePath:
builder.config("spark.sql.warehouse.dir", f"{connection.metastoreFilePath}")
deltalake_connection = DeltaLakeClient(
configure_spark_with_delta_pip(builder).getOrCreate()
)
return deltalake_connection
@get_connection.register
def _(connection: KafkaConnection, verbose: bool = False) -> KafkaClient:
"""
Prepare Kafka Admin Client and Schema Registry Client
"""
from confluent_kafka.admin import AdminClient, ConfigResource
from confluent_kafka.schema_registry.schema_registry_client import (
Schema,
SchemaRegistryClient,
)
schema_registry_client = None
if connection.schemaRegistryURL:
connection.schemaRegistryConfig["url"] = connection.schemaRegistryURL
schema_registry_client = SchemaRegistryClient(connection.schemaRegistryConfig)
admin_client_config = connection.consumerConfig
admin_client_config["bootstrap.servers"] = connection.bootstrapServers
admin_client = AdminClient(admin_client_config)
return KafkaClient(
admin_client=admin_client, schema_registry_client=schema_registry_client
)
def create_and_bind_session(engine: Engine) -> Session:
"""
Given an engine, create a session bound
@@ -214,10 +270,12 @@ def create_and_bind_session(engine: Engine) -> Session:
@timeout(seconds=120)
@singledispatch
def test_connection(connection: Engine) -> None:
def test_connection(connection) -> None:
"""
Default implementation is the engine to test.
Test that we can connect to the source using the given engine
:param engine: Engine to test
:param connection: Engine to test
:return: None or raise an exception if we cannot connect
"""
try:
@@ -291,43 +349,17 @@ def _(connection: SalesforceClient) -> None:
)
@get_connection.register
def _(connection: DeltaLakeConnection, verbose: bool = False):
import pyspark
from delta import configure_spark_with_delta_pip
builder = (
pyspark.sql.SparkSession.builder.appName(connection.appName)
.enableHiveSupport()
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog",
)
)
if connection.metastoreHostPort:
builder.config(
"hive.metastore.uris",
f"thrift://{connection.metastoreHostPort}",
)
elif connection.metastoreFilePath:
builder.config("spark.sql.warehouse.dir", f"{connection.metastoreFilePath}")
deltalake_connection = DeltaLakeClient(
configure_spark_with_delta_pip(builder).getOrCreate()
)
return deltalake_connection
@test_connection.register
def _(connection: KafkaClient) -> None:
from confluent_kafka.admin import AdminClient
"""
Test AdminClient.
If it exists, test the Schema Registry client as well.
"""
try:
if isinstance(connection.client, AdminClient):
return connection.client.list_topics().topics
else:
return connection.client.get_subjects()
_ = connection.admin_client.list_topics().topics
if connection.schema_registry_client:
_ = connection.schema_registry_client.get_subjects()
except Exception as err:
raise SourceConnectionException(
f"Unknown error connecting with {connection} - {err}."

View File

@@ -110,7 +110,7 @@ def filter_by_chart(
chart_filter_pattern: Optional[FilterPattern], chart_name: str
) -> bool:
"""
Return True if the schema needs to be filtered, False otherwise
Return True if the chart needs to be filtered, False otherwise
Include takes precedence over exclude
@@ -138,7 +138,7 @@ def filter_by_dashboard(
dashboard_filter_pattern: Optional[FilterPattern], dashboard_name: str
) -> bool:
"""
Return True if the schema needs to be filtered, False otherwise
Return True if the dashboard needs to be filtered, False otherwise
Include takes precedence over exclude
@@ -151,7 +151,7 @@ def filter_by_dashboard(
def filter_by_fqn(fqn_filter_pattern: Optional[FilterPattern], fqn: str) -> bool:
"""
Return True if the schema needs to be filtered, False otherwise
Return True if the FQN needs to be filtered, False otherwise
Include takes precedence over exclude
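The docstring corrections above are wording-only; the behaviour stays the same: the helpers return True when the entity should be filtered out, and include patterns win over exclude patterns. A hedged sketch with filter_by_topic, which the Kafka source imports, assuming it mirrors the (pattern, name) signature of filter_by_fqn (pattern and topic names are made up):

from metadata.generated.schema.type.filterPattern import FilterPattern
from metadata.utils.filters import filter_by_topic

topic_filter = FilterPattern(includes=["^orders_.*$"])

# Matches the include pattern, so the topic is kept (not filtered).
assert not filter_by_topic(topic_filter, "orders_events")

# No include pattern matches, so the topic is filtered out.
assert filter_by_topic(topic_filter, "customers")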

View File

@@ -253,9 +253,7 @@ class OMetaServiceTest(TestCase):
"type": "kafka",
"serviceName": "local_kafka",
"serviceConnection": {
"config": {
"type": "Kafka",
}
"config": {"type": "Kafka", "bootstrapServers": "localhost:9092"}
},
"sourceConfig": {"config": {}},
}

View File

@@ -57,7 +57,9 @@ class OMetaTopicTest(TestCase):
service = CreateMessagingServiceRequest(
name="test-service-topic",
serviceType=MessagingServiceType.Kafka,
connection=MessagingConnection(config=KafkaConnection()),
connection=MessagingConnection(
config=KafkaConnection(bootstrapServers="localhost:9092")
),
)
service_type = "messagingService"

View File

@@ -0,0 +1,38 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validate filter patterns
"""
from unittest import TestCase
from metadata.generated.schema.type.filterPattern import FilterPattern
from metadata.utils.filters import filter_by_fqn
class FilterPatternTests(TestCase):
"""
Validate filter patterns
"""
def test_filter_by_fqn(self):
"""
Check FQN filters
"""
fqn_filter_db = FilterPattern(includes=["^.*my_database.*$"])
assert not filter_by_fqn(fqn_filter_db, "service.my_database.schema.table")
assert filter_by_fqn(fqn_filter_db, "service.another_db.schema.table")
fqn_filter_schema = FilterPattern(includes=["^.*my_db.my_schema.*$"])
assert not filter_by_fqn(fqn_filter_schema, "service.my_db.my_schema.table")
assert filter_by_fqn(fqn_filter_schema, "service.another_db.my_schema.table")