diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json
index 514edec275e..5bc7342207e 100644
--- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json
+++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/messaging/kafkaConnection.json
@@ -23,15 +23,13 @@
     "bootstrapServers": {
       "title": "Bootstrap Servers",
       "description": "Kafka bootstrap servers. add them in comma separated values ex: host1:9092,host2:9092",
-      "type": "string",
-      "default": "localhost:9092"
+      "type": "string"
     },
     "schemaRegistryURL": {
       "title": "Schema Registry URL",
       "description": "Confluent Kafka Schema Registry URL.",
       "type": "string",
-      "format": "uri",
-      "default": "http://localhost:8081"
+      "format": "uri"
     },
     "consumerConfig": {
       "title": "Consumer Config",
@@ -52,5 +50,6 @@
       "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
     }
   },
-  "additionalProperties": false
+  "additionalProperties": false,
+  "required": ["bootstrapServers"]
 }
diff --git a/ingestion/examples/sample_data/topics/service.json b/ingestion/examples/sample_data/topics/service.json
index 11785adaab2..9a5ff24088b 100644
--- a/ingestion/examples/sample_data/topics/service.json
+++ b/ingestion/examples/sample_data/topics/service.json
@@ -3,7 +3,8 @@
   "serviceName": "sample_kafka",
   "serviceConnection": {
     "config": {
-      "type": "Kafka"
+      "type": "Kafka",
+      "bootstrapServers": "localhost:9092"
     }
   },
   "sourceConfig": {
diff --git a/ingestion/src/metadata/ingestion/source/kafka.py b/ingestion/src/metadata/ingestion/source/kafka.py
index f26173799a7..b1cc8d7e3c9 100644
--- a/ingestion/src/metadata/ingestion/source/kafka.py
+++ b/ingestion/src/metadata/ingestion/source/kafka.py
@@ -42,7 +42,7 @@ from metadata.ingestion.api.common import logger
 from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.utils.connection_clients import KafkaClient
-from metadata.utils.connections import test_connection
+from metadata.utils.connections import get_connection, test_connection
 from metadata.utils.filters import filter_by_topic
 from metadata.utils.helpers import get_messaging_service_or_create
 
@@ -80,17 +80,10 @@ class KafkaSource(Source[CreateTopicRequest]):
         self.service = self.metadata.get_service_or_create(
             entity=MessagingService, config=config
         )
-        self.service_connection.schemaRegistryConfig[
-            "url"
-        ] = self.service_connection.schemaRegistryURL
-        self.schema_registry_client = SchemaRegistryClient(
-            self.service_connection.schemaRegistryConfig
-        )
-        admin_client_config = self.service_connection.consumerConfig
-        admin_client_config[
-            "bootstrap.servers"
-        ] = self.service_connection.bootstrapServers
-        self.admin_client = AdminClient(admin_client_config)
+
+        self.connection: KafkaClient = get_connection(self.service_connection)
+        self.admin_client = self.connection.admin_client
+        self.schema_registry_client = self.connection.schema_registry_client
 
     @classmethod
     def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@@ -192,6 +185,4 @@ class KafkaSource(Source[CreateTopicRequest]):
         pass
 
     def test_connection(self) -> None:
-        test_connection(KafkaClient(client=self.admin_client))
-        if self.service_connection.schemaRegistryURL:
-            test_connection(KafkaClient(client=self.schema_registry_client))
+        test_connection(self.connection)
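For context, a minimal sketch (not part of this diff) of the refactored flow that `KafkaSource` now relies on; the host names are placeholders and a reachable broker/registry is assumed:

```python
# Illustration only: build the connection model, hand it to the singledispatch
# factory, and reuse the wrapped clients.
from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
    KafkaConnection,
)
from metadata.utils.connection_clients import KafkaClient
from metadata.utils.connections import get_connection, test_connection

connection = KafkaConnection(
    bootstrapServers="localhost:9092",
    schemaRegistryURL="http://localhost:8081",
)

client: KafkaClient = get_connection(connection)

# The wrapper exposes both clients; the registry client is None when no URL is set.
topics = client.admin_client.list_topics().topics
subjects = client.schema_registry_client.get_subjects()

# Single entry point: checks the admin client and, if configured, the schema registry.
test_connection(client)
```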
diff --git a/ingestion/src/metadata/utils/connection_clients.py b/ingestion/src/metadata/utils/connection_clients.py
index 00129b9c0e9..89d5fce5eb7 100644
--- a/ingestion/src/metadata/utils/connection_clients.py
+++ b/ingestion/src/metadata/utils/connection_clients.py
@@ -44,8 +44,9 @@ class DeltaLakeClient:
 
 @dataclass
 class KafkaClient:
-    def __init__(self, client) -> None:
-        self.client = client
+    def __init__(self, admin_client, schema_registry_client) -> None:
+        self.admin_client = admin_client
+        self.schema_registry_client = schema_registry_client  # Optional
 
 
 @dataclass
diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py
index 9ed5e51114f..8891062d726 100644
--- a/ingestion/src/metadata/utils/connections.py
+++ b/ingestion/src/metadata/utils/connections.py
@@ -61,6 +61,9 @@ from metadata.generated.schema.entity.services.connections.database.salesforceCo
 from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
     SnowflakeConnection,
 )
+from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
+    KafkaConnection,
+)
 from metadata.utils.connection_clients import (
     DeltaLakeClient,
     DynamoClient,
@@ -85,7 +88,7 @@ class SourceConnectionException(Exception):
     """
 
 
-def create_generic_connection(connection, verbose: bool = False):
+def create_generic_connection(connection, verbose: bool = False) -> Engine:
     """
     Generic Engine creation from connection object
     :param connection: JSON Schema connection model
@@ -109,7 +112,7 @@ def create_generic_connection(connection, verbose: bool = False):
 @singledispatch
 def get_connection(
     connection, verbose: bool = False
-) -> Union[Engine, DynamoClient, GlueClient, SalesforceClient]:
+) -> Union[Engine, DynamoClient, GlueClient, SalesforceClient, KafkaClient]:
     """
     Given an SQL configuration, build the SQLAlchemy Engine
     """
@@ -126,7 +129,7 @@ def _(connection: DatabricksConnection, verbose: bool = False):
 
 
 @get_connection.register
-def _(connection: SnowflakeConnection, verbose: bool = False):
+def _(connection: SnowflakeConnection, verbose: bool = False) -> Engine:
     if connection.privateKey:
 
         from cryptography.hazmat.backends import default_backend
@@ -161,7 +164,7 @@ def _(connection: SnowflakeConnection, verbose: bool = False):
 
 
 @get_connection.register
-def _(connection: BigQueryConnection, verbose: bool = False):
+def _(connection: BigQueryConnection, verbose: bool = False) -> Engine:
     """
     Prepare the engine and the GCS credentials
     :param connection: BigQuery connection
@@ -173,7 +176,7 @@ def _(connection: BigQueryConnection, verbose: bool = False):
 
 
 @get_connection.register
-def _(connection: DynamoDBConnection, verbose: bool = False):
+def _(connection: DynamoDBConnection, verbose: bool = False) -> DynamoClient:
     from metadata.utils.aws_client import AWSClient
 
     dynomo_connection = AWSClient(connection.awsConfig).get_dynomo_client()
@@ -181,7 +184,7 @@ def _(connection: DynamoDBConnection, verbose: bool = False):
 
 
 @get_connection.register
-def _(connection: GlueConnection, verbose: bool = False):
+def _(connection: GlueConnection, verbose: bool = False) -> GlueClient:
     from metadata.utils.aws_client import AWSClient
 
     glue_connection = AWSClient(connection.awsConfig).get_glue_client()
@@ -189,7 +192,7 @@ def _(connection: GlueConnection, verbose: bool = False):
 
 
 @get_connection.register
-def _(connection: SalesforceConnection, verbose: bool = False):
+def _(connection: SalesforceConnection, verbose: bool = False) -> SalesforceClient:
     from simple_salesforce import Salesforce
 
     salesforce_connection = SalesforceClient(
@@ -202,6 +205,59 @@ def _(connection: SalesforceConnection, verbose: bool = False):
     return salesforce_connection
 
 
+@get_connection.register
+def _(connection: DeltaLakeConnection, verbose: bool = False) -> DeltaLakeClient:
+    import pyspark
+    from delta import configure_spark_with_delta_pip
+
+    builder = (
+        pyspark.sql.SparkSession.builder.appName(connection.appName)
+        .enableHiveSupport()
+        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+        .config(
+            "spark.sql.catalog.spark_catalog",
+            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
+        )
+    )
+    if connection.metastoreHostPort:
+        builder.config(
+            "hive.metastore.uris",
+            f"thrift://{connection.metastoreHostPort}",
+        )
+    elif connection.metastoreFilePath:
+        builder.config("spark.sql.warehouse.dir", f"{connection.metastoreFilePath}")
+
+    deltalake_connection = DeltaLakeClient(
+        configure_spark_with_delta_pip(builder).getOrCreate()
+    )
+    return deltalake_connection
+
+
+@get_connection.register
+def _(connection: KafkaConnection, verbose: bool = False) -> KafkaClient:
+    """
+    Prepare Kafka Admin Client and Schema Registry Client
+    """
+    from confluent_kafka.admin import AdminClient, ConfigResource
+    from confluent_kafka.schema_registry.schema_registry_client import (
+        Schema,
+        SchemaRegistryClient,
+    )
+
+    schema_registry_client = None
+    if connection.schemaRegistryURL:
+        connection.schemaRegistryConfig["url"] = connection.schemaRegistryURL
+        schema_registry_client = SchemaRegistryClient(connection.schemaRegistryConfig)
+
+    admin_client_config = connection.consumerConfig
+    admin_client_config["bootstrap.servers"] = connection.bootstrapServers
+    admin_client = AdminClient(admin_client_config)
+
+    return KafkaClient(
+        admin_client=admin_client, schema_registry_client=schema_registry_client
+    )
+
+
 def create_and_bind_session(engine: Engine) -> Session:
     """
     Given an engine, create a session bound
@@ -214,10 +270,12 @@ def create_and_bind_session(engine: Engine) -> Session:
 
 @timeout(seconds=120)
 @singledispatch
-def test_connection(connection: Engine) -> None:
+def test_connection(connection) -> None:
     """
+    Default implementation is the engine to test.
+
     Test that we can connect to the source using the given engine
-    :param engine: Engine to test
+    :param connection: Engine to test
     :return: None or raise an exception if we cannot connect
     """
     try:
@@ -291,43 +349,17 @@ def _(connection: SalesforceClient) -> None:
         )
 
 
-@get_connection.register
-def _(connection: DeltaLakeConnection, verbose: bool = False):
-    import pyspark
-    from delta import configure_spark_with_delta_pip
-
-    builder = (
-        pyspark.sql.SparkSession.builder.appName(connection.appName)
-        .enableHiveSupport()
-        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
-        .config(
-            "spark.sql.catalog.spark_catalog",
-            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
-        )
-    )
-    if connection.metastoreHostPort:
-        builder.config(
-            "hive.metastore.uris",
-            f"thrift://{connection.metastoreHostPort}",
-        )
-    elif connection.metastoreFilePath:
-        builder.config("spark.sql.warehouse.dir", f"{connection.metastoreFilePath}")
-
-    deltalake_connection = DeltaLakeClient(
-        configure_spark_with_delta_pip(builder).getOrCreate()
-    )
-    return deltalake_connection
-
-
 @test_connection.register
 def _(connection: KafkaClient) -> None:
-    from confluent_kafka.admin import AdminClient
+    """
+    Test AdminClient.
+    If exists, test the Schema Registry client as well.
+    """
 
     try:
-        if isinstance(connection.client, AdminClient):
-            return connection.client.list_topics().topics
-        else:
-            return connection.client.get_subjects()
+        _ = connection.admin_client.list_topics().topics
+        if connection.schema_registry_client:
+            _ = connection.schema_registry_client.get_subjects()
     except Exception as err:
         raise SourceConnectionException(
             f"Unknown error connecting with {connection} - {err}."
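A short sketch (not part of the diff) of the registry-less case the new `test_connection` handles: with no Schema Registry configured, the wrapper carries `schema_registry_client=None` and only the admin client is probed. The broker address is a placeholder and must be reachable for the check to pass.

```python
# Illustration only: a Kafka connection without a Schema Registry.
from confluent_kafka.admin import AdminClient

from metadata.utils.connection_clients import KafkaClient
from metadata.utils.connections import test_connection

client = KafkaClient(
    admin_client=AdminClient({"bootstrap.servers": "localhost:9092"}),
    schema_registry_client=None,
)

# Probes only the admin client; get_subjects() is skipped because the registry is None.
test_connection(client)
```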
diff --git a/ingestion/src/metadata/utils/filters.py b/ingestion/src/metadata/utils/filters.py
index 36d92dbd0d1..82edc5a89bb 100644
--- a/ingestion/src/metadata/utils/filters.py
+++ b/ingestion/src/metadata/utils/filters.py
@@ -110,7 +110,7 @@ def filter_by_chart(
     chart_filter_pattern: Optional[FilterPattern], chart_name: str
 ) -> bool:
     """
-    Return True if the schema needs to be filtered, False otherwise
+    Return True if the chart needs to be filtered, False otherwise
 
     Include takes precedence over exclude
 
@@ -138,7 +138,7 @@ def filter_by_dashboard(
     dashboard_filter_pattern: Optional[FilterPattern], dashboard_name: str
 ) -> bool:
     """
-    Return True if the schema needs to be filtered, False otherwise
+    Return True if the dashboard needs to be filtered, False otherwise
 
     Include takes precedence over exclude
 
@@ -151,7 +151,7 @@ def filter_by_dashboard(
 
 def filter_by_fqn(fqn_filter_pattern: Optional[FilterPattern], fqn: str) -> bool:
     """
-    Return True if the schema needs to be filtered, False otherwise
+    Return True if the FQN needs to be filtered, False otherwise
 
     Include takes precedence over exclude
 
diff --git a/ingestion/tests/integration/ometa/test_ometa_service_api.py b/ingestion/tests/integration/ometa/test_ometa_service_api.py
index 999fb9bbe12..4505a021ec5 100644
--- a/ingestion/tests/integration/ometa/test_ometa_service_api.py
+++ b/ingestion/tests/integration/ometa/test_ometa_service_api.py
@@ -253,9 +253,7 @@ class OMetaServiceTest(TestCase):
             "type": "kafka",
             "serviceName": "local_kafka",
             "serviceConnection": {
-                "config": {
-                    "type": "Kafka",
-                }
+                "config": {"type": "Kafka", "bootstrapServers": "localhost:9092"}
             },
             "sourceConfig": {"config": {}},
         }
diff --git a/ingestion/tests/integration/ometa/test_ometa_topic_api.py b/ingestion/tests/integration/ometa/test_ometa_topic_api.py
index 7b17136a4f7..bff010d4aa2 100644
--- a/ingestion/tests/integration/ometa/test_ometa_topic_api.py
+++ b/ingestion/tests/integration/ometa/test_ometa_topic_api.py
@@ -57,7 +57,9 @@ class OMetaTopicTest(TestCase):
     service = CreateMessagingServiceRequest(
         name="test-service-topic",
         serviceType=MessagingServiceType.Kafka,
-        connection=MessagingConnection(config=KafkaConnection()),
+        connection=MessagingConnection(
+            config=KafkaConnection(bootstrapServers="localhost:9092")
+        ),
     )
 
     service_type = "messagingService"
diff --git a/ingestion/tests/unit/filter_pattern.py b/ingestion/tests/unit/filter_pattern.py
new file mode 100644
index 00000000000..335edaa6e89
--- /dev/null
+++ b/ingestion/tests/unit/filter_pattern.py
@@ -0,0 +1,38 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+"""
+Validate filter patterns
+"""
+from unittest import TestCase
+
+from metadata.generated.schema.type.filterPattern import FilterPattern
+from metadata.utils.filters import filter_by_fqn
+
+
+class FilterPatternTests(TestCase):
+    """
+    Validate filter patterns
+    """
+
+    def test_filter_by_fqn(self):
+        """
+        Check FQN filters
+        """
+        fqn_filter_db = FilterPattern(includes=["^.*my_database.*$"])
+
+        assert not filter_by_fqn(fqn_filter_db, "service.my_database.schema.table")
+        assert filter_by_fqn(fqn_filter_db, "service.another_db.schema.table")
+
+        fqn_filter_schema = FilterPattern(includes=["^.*my_db.my_schema.*$"])
+
+        assert not filter_by_fqn(fqn_filter_schema, "service.my_db.my_schema.table")
+        assert filter_by_fqn(fqn_filter_schema, "service.another_db.my_schema.table")
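As a closing illustration (not part of the diff): once the pydantic models are regenerated from the updated `kafkaConnection.json`, `bootstrapServers` should become a mandatory field, which is why the sample data and the tests above now provide it explicitly.

```python
# Illustration only, assuming the generated model picks up the new "required" entry.
from pydantic import ValidationError

from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
    KafkaConnection,
)

try:
    KafkaConnection()  # missing bootstrapServers -> expected to fail validation
except ValidationError as err:
    print(err)

# Valid: bootstrapServers is set; schemaRegistryURL stays optional and has no default.
KafkaConnection(bootstrapServers="localhost:9092")
```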