diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py index baadcb5b474..b760d8e696e 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py @@ -23,7 +23,6 @@ from metadata.generated.schema.entity.services.connections.pipeline.kafkaConnect from metadata.ingestion.source.pipeline.kafkaconnect.models import ( KafkaConnectDatasetDetails, KafkaConnectPipelineDetails, - KafkaConnectTasks, KafkaConnectTopics, ) from metadata.utils.helpers import clean_uri @@ -57,6 +56,20 @@ class KafkaConnectClient: auth = f"{config.KafkaConnectConfig.username}:{config.KafkaConnectConfig.password.get_secret_value()}" self.client = KafkaConnect(url=url, auth=auth, ssl_verify=ssl_verify) + def _enrich_connector_details( + self, connector_details: KafkaConnectPipelineDetails, connector_name: str + ) -> None: + """Helper method to enrich connector details with additional information.""" + connector_details.topics = self.get_connector_topics(connector=connector_name) + connector_details.config = self.get_connector_config(connector=connector_name) + if connector_details.config: + connector_details.description = connector_details.config.get( + "description", None + ) + connector_details.dataset = self.get_connector_dataset_info( + connector_details.config + ) + def get_cluster_info(self) -> Optional[dict]: """ Get the version and other details of the Kafka Connect cluster. @@ -101,14 +114,11 @@ class KafkaConnectClient: Get the list of connector plugins. """ try: - result = self.client.list_connector_plugins() - return result + return self.client.list_connector_plugins() except Exception as exc: logger.debug(traceback.format_exc()) logger.warning(f"Unable to get connector plugins {exc}") - return None - def get_connector_config(self, connector: str) -> Optional[dict]: """ Get the details of a single connector. @@ -125,7 +135,9 @@ class KafkaConnectClient: return None - def get_connector_dataset_info(self, connector: str) -> Optional[dict]: + def get_connector_dataset_info( + self, connector_config: dict + ) -> Optional[KafkaConnectDatasetDetails]: """ Get the details of dataset of connector if there is any. Checks in the connector configurations for dataset fields @@ -139,16 +151,14 @@ class KafkaConnectClient: is not found, has no dataset, or an error occurs. """ try: - conn_config = self.get_connector_config(connector=connector) - - if not conn_config: + if not connector_config: return None result = {} for dataset in SUPPORTED_DATASETS or []: for key in SUPPORTED_DATASETS[dataset] or []: - if conn_config.get(key): - result[dataset] = conn_config[key] + if connector_config.get(key): + result[dataset] = connector_config[key] return KafkaConnectDatasetDetails(**result) except Exception as exc: @@ -157,49 +167,6 @@ class KafkaConnectClient: return None - def get_connector_tasks(self, connector: str) -> Optional[List[KafkaConnectTasks]]: - """ - Get the list of tasks for a connector. - Args: - connector (str): The name of the connector. - Returns: - Optional[List[KafkaConnectTasks]]: A list of KafkaConnectTasks objects - representing the connector's tasks, - or None if the connector is not found - or an error occurs. - """ - try: - result = self.client.get_connector_status(connector=connector) - tasks = [KafkaConnectTasks(**task) for task in result.get("tasks") or []] - return tasks - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error(f"Unable to get connector tasks list {exc}") - - return None - - def get_connector_task_status(self, connector: str, task_id: int) -> Optional[dict]: - """ - Get the status of a specific task for a connector. - Args: - connector (str): The name of the connector. - task_id (int): The ID of the task. - Returns: - Optional[Dict]: A dictionary containing the task status information, - or None if the connector or task is not found - or an error occurs. - """ - try: - result = self.client.get_connector_task_status( - connector=connector, task_id=task_id - ) - return result - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.error(f"Unable to get connector tasks status {exc}") - - return None - def get_connector_topics( self, connector: str ) -> Optional[List[KafkaConnectTopics]]: @@ -231,45 +198,27 @@ class KafkaConnectClient: return None - def get_connector_state(self, connector: str) -> Optional[str]: - """ - Get the status of a single connector. - Args: - connector (str): The name of the connector. - """ - try: - result = self.client.get_connector_status(connector=connector) - if result.get("connector"): - return result["connector"].get("state") - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Unable to get connector state due to {exc}") - - return None - def get_connector_list(self) -> Optional[List[KafkaConnectPipelineDetails]]: """ - Get the information of a single connector. - Args: - connector (str): The name of the connector. + Get the information of all connectors. Returns: - Optional[KafkaConnectPipelineDetails]: A KafkaConnectPipelineDetails - object containing connector information, - or None if the connector is not found - or an error occurs. + Optional[List[KafkaConnectPipelineDetails]]: A list of KafkaConnectPipelineDetails + objects containing connector information, + or None if an error occurs. """ try: - connectors = [] - for connector in self.get_connectors() or []: - result = self.client.get_connector_status(connector=connector) - connector_details = KafkaConnectPipelineDetails(**result) - connector_details.status = self.get_connector_state(connector=connector) - connector_details.tasks = self.get_connector_tasks(connector=connector) - connector_details.topics = self.get_connector_topics( - connector=connector - ) - connectors.append(connector_details) - return connectors + connector_data = self.get_connectors(expand="status") or {} + + for connector_name, connector_info in connector_data.items(): + if isinstance(connector_info, dict) and "status" in connector_info: + status_info = connector_info["status"] + connector_details = KafkaConnectPipelineDetails(**status_info) + connector_details.status = status_info.get("connector", {}).get( + "state", "UNASSIGNED" + ) + self._enrich_connector_details(connector_details, connector_name) + if connector_details: + yield connector_details except Exception as exc: logger.debug(traceback.format_exc()) logger.error(f"Unable to get connector information {exc}") diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py index 697c93bc485..be4cc6751e5 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py @@ -36,7 +36,12 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.generated.schema.type.basic import EntityName, SourceUrl, Timestamp +from metadata.generated.schema.type.basic import ( + EntityName, + Markdown, + SourceUrl, + Timestamp, +) from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference @@ -65,7 +70,7 @@ STATUS_MAP = { class KafkaconnectSource(PipelineServiceSource): """ - Implements the necessary methods ot extract + Implements the necessary methods to extract Pipeline metadata from Kafka Connect """ @@ -100,6 +105,9 @@ class KafkaconnectSource(PipelineServiceSource): for task in pipeline_details.tasks or [] ], service=self.context.get().pipeline_service, + description=Markdown(pipeline_details.description) + if pipeline_details.description + else None, ) yield Either(right=pipeline_request) self.register_record(pipeline_request=pipeline_request) @@ -119,17 +127,12 @@ class KafkaconnectSource(PipelineServiceSource): Get lineage dataset entity """ try: - dataset_details = self.client.get_connector_dataset_info( - connector=pipeline_details.name - ) + dataset_details = pipeline_details.dataset if dataset_details: - if ( - isinstance(dataset_details.dataset_type, type(Table)) - and self.source_config.lineageInformation.dbServiceNames - ): - for dbservicename in ( - self.source_config.lineageInformation.dbServiceNames or [] - ): + if dataset_details.dataset_type == Table: + for ( + dbservicename + ) in self.source_config.lineageInformation.dbServiceNames or ["*"]: dataset_entity = self.metadata.get_by_name( entity=dataset_details.dataset_type, fqn=fqn.build( @@ -145,25 +148,25 @@ class KafkaconnectSource(PipelineServiceSource): if dataset_entity: return dataset_entity - if ( - isinstance(dataset_details.dataset_type, type(Container)) - and self.source_config.lineageInformation.storageServiceNames - ): - for storageservicename in ( - self.source_config.lineageInformation.storageServiceNames or [] - ): - dataset_entity = self.metadata.get_by_name( + if dataset_details.dataset_type == Container: + for ( + storageservicename + ) in self.source_config.lineageInformation.storageServiceNames or [ + "*" + ]: + storage_entity = self.metadata.get_by_name( entity=dataset_details.dataset_type, fqn=fqn.build( metadata=self.metadata, entity_type=dataset_details.dataset_type, container_name=dataset_details.container_name, service_name=storageservicename, + parent_container=None, ), ) - if dataset_entity: - return dataset_entity + if storage_entity: + return storage_entity except Exception as exc: logger.debug(traceback.format_exc()) @@ -180,7 +183,7 @@ class KafkaconnectSource(PipelineServiceSource): try: if not self.service_connection.messagingServiceName: logger.debug("Kafka messagingServiceName not found") - return None + return pipeline_fqn = fqn.build( metadata=self.metadata, @@ -246,8 +249,6 @@ class KafkaconnectSource(PipelineServiceSource): ) ) - return None - def get_pipelines_list(self) -> Iterable[KafkaConnectPipelineDetails]: """ Get List of all pipelines @@ -300,6 +301,7 @@ class KafkaconnectSource(PipelineServiceSource): service_name=self.context.get().pipeline_service, pipeline_name=self.context.get().pipeline, ) + yield Either( right=OMetaPipelineStatus( pipeline_fqn=pipeline_fqn, diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py index 720976db406..6d03d59deae 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py @@ -13,7 +13,7 @@ KafkaConnect Source Model module """ -from typing import List, Optional +from typing import List, Optional, Type, Union from pydantic import BaseModel, Field @@ -35,6 +35,20 @@ class KafkaConnectTopics(BaseModel): name: str = Field(..., description="Name of the topic (e.g., random-source-avro)") +class KafkaConnectDatasetDetails(BaseModel): + table: Optional[str] = None + database: Optional[str] = None + container_name: Optional[str] = None + + @property + def dataset_type(self) -> Optional[Type[Union[Table, Container]]]: + if self.table or self.database: + return Table + if self.container_name: + return Container + return None + + class KafkaConnectPipelineDetails(BaseModel): name: str = Field( ..., description="Name of the status source (e.g., random-source-json)" @@ -43,20 +57,9 @@ class KafkaConnectPipelineDetails(BaseModel): default="UNASSIGNED", description="State of the connector (e.g., RUNNING, STOPPED)", ) - tasks: Optional[List[KafkaConnectTasks]] = [] - topics: Optional[List[KafkaConnectTopics]] = [] + tasks: Optional[List[KafkaConnectTasks]] = Field(default_factory=list) + topics: Optional[List[KafkaConnectTopics]] = Field(default_factory=list) conn_type: Optional[str] = Field(default="UNKNOWN", alias="type") - - -class KafkaConnectDatasetDetails(BaseModel): - table: Optional[str] = None - database: Optional[str] = None - container_name: Optional[str] = None - - @property - def dataset_type(self): - if self.table or self.database: - return Table - if self.container_name: - return Container - return None + description: Optional[str] = None + dataset: Optional[KafkaConnectDatasetDetails] = None + config: Optional[dict] = Field(default_factory=dict) diff --git a/ingestion/tests/unit/topology/pipeline/test_kafkaconnect.py b/ingestion/tests/unit/topology/pipeline/test_kafkaconnect.py new file mode 100644 index 00000000000..233283e0388 --- /dev/null +++ b/ingestion/tests/unit/topology/pipeline/test_kafkaconnect.py @@ -0,0 +1,300 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test KafkaConnect client and models +""" +from unittest import TestCase +from unittest.mock import MagicMock, patch + +from metadata.generated.schema.entity.services.connections.pipeline.kafkaConnectConnection import ( + KafkaConnectConnection, +) +from metadata.ingestion.source.pipeline.kafkaconnect.client import KafkaConnectClient +from metadata.ingestion.source.pipeline.kafkaconnect.models import ( + KafkaConnectDatasetDetails, + KafkaConnectPipelineDetails, + KafkaConnectTasks, + KafkaConnectTopics, +) + + +class TestKafkaConnectModels(TestCase): + """Test KafkaConnect data models""" + + def test_kafka_connect_tasks_model(self): + """Test KafkaConnectTasks model creation and defaults""" + task = KafkaConnectTasks(id=1, state="RUNNING", worker_id="worker-1") + self.assertEqual(task.id, 1) + self.assertEqual(task.state, "RUNNING") + self.assertEqual(task.worker_id, "worker-1") + + def test_kafka_connect_tasks_defaults(self): + """Test KafkaConnectTasks default values""" + task = KafkaConnectTasks(id=1) + self.assertEqual(task.id, 1) + self.assertEqual(task.state, "UNASSIGNED") + self.assertIsNone(task.worker_id) + + def test_kafka_connect_topics_model(self): + """Test KafkaConnectTopics model creation""" + topic = KafkaConnectTopics(name="test-topic") + self.assertEqual(topic.name, "test-topic") + + def test_kafka_connect_dataset_details_table_type(self): + """Test KafkaConnectDatasetDetails with table type""" + dataset = KafkaConnectDatasetDetails(table="users", database="mydb") + self.assertEqual(dataset.table, "users") + self.assertEqual(dataset.database, "mydb") + self.assertIsNone(dataset.container_name) + + # Import here to avoid circular dependency issues + from metadata.generated.schema.entity.data.table import Table + + self.assertEqual(dataset.dataset_type, Table) + + def test_kafka_connect_dataset_details_container_type(self): + """Test KafkaConnectDatasetDetails with container type""" + dataset = KafkaConnectDatasetDetails(container_name="my-bucket") + self.assertEqual(dataset.container_name, "my-bucket") + self.assertIsNone(dataset.table) + self.assertIsNone(dataset.database) + + # Import here to avoid circular dependency issues + from metadata.generated.schema.entity.data.container import Container + + self.assertEqual(dataset.dataset_type, Container) + + def test_kafka_connect_dataset_details_no_type(self): + """Test KafkaConnectDatasetDetails with no type""" + dataset = KafkaConnectDatasetDetails() + self.assertIsNone(dataset.table) + self.assertIsNone(dataset.database) + self.assertIsNone(dataset.container_name) + self.assertIsNone(dataset.dataset_type) + + def test_kafka_connect_pipeline_details_model(self): + """Test KafkaConnectPipelineDetails model with default factory""" + pipeline = KafkaConnectPipelineDetails(name="test-connector") + self.assertEqual(pipeline.name, "test-connector") + self.assertEqual(pipeline.status, "UNASSIGNED") + self.assertEqual(pipeline.conn_type, "UNKNOWN") + self.assertEqual(pipeline.tasks, []) + self.assertEqual(pipeline.topics, []) + self.assertEqual(pipeline.config, {}) + self.assertIsNone(pipeline.description) + self.assertIsNone(pipeline.dataset) + + def test_kafka_connect_pipeline_details_with_data(self): + """Test KafkaConnectPipelineDetails with full data""" + tasks = [KafkaConnectTasks(id=1, state="RUNNING")] + topics = [KafkaConnectTopics(name="test-topic")] + dataset = KafkaConnectDatasetDetails(table="users") + + pipeline = KafkaConnectPipelineDetails( + name="test-connector", + status="RUNNING", + tasks=tasks, + topics=topics, + type="source", # Using the alias 'type' instead of 'conn_type' + description="Test connector", + dataset=dataset, + config={"key": "value"}, + ) + + self.assertEqual(pipeline.name, "test-connector") + self.assertEqual(pipeline.status, "RUNNING") + self.assertEqual(pipeline.conn_type, "source") + self.assertEqual(len(pipeline.tasks), 1) + self.assertEqual(len(pipeline.topics), 1) + self.assertEqual(pipeline.description, "Test connector") + self.assertIsNotNone(pipeline.dataset) + self.assertEqual(pipeline.config["key"], "value") + + +class TestKafkaConnectClient(TestCase): + """Test KafkaConnect client functionality""" + + def setUp(self): + """Set up test fixtures""" + self.mock_config = MagicMock(spec=KafkaConnectConnection) + self.mock_config.hostPort = "http://localhost:8083" + self.mock_config.verifySSL = True + self.mock_config.KafkaConnectConfig = None + + def test_client_initialization_no_auth(self): + """Test client initialization without authentication""" + with patch( + "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect" + ) as mock_kafka_connect: + client = KafkaConnectClient(self.mock_config) + + mock_kafka_connect.assert_called_once_with( + url="http://localhost:8083", auth=None, ssl_verify=True + ) + + def test_client_initialization_with_auth(self): + """Test client initialization with authentication""" + with patch( + "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect" + ) as mock_kafka_connect: + mock_auth_config = MagicMock() + mock_auth_config.username = "user" + mock_auth_config.password.get_secret_value.return_value = "pass" + self.mock_config.KafkaConnectConfig = mock_auth_config + + client = KafkaConnectClient(self.mock_config) + + mock_kafka_connect.assert_called_once_with( + url="http://localhost:8083", auth="user:pass", ssl_verify=True + ) + + def test_enrich_connector_details_helper(self): + """Test _enrich_connector_details helper method""" + with patch( + "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect" + ): + client = KafkaConnectClient(self.mock_config) + connector_details = KafkaConnectPipelineDetails(name="test-connector") + + # Mock the methods called by _enrich_connector_details + client.get_connector_topics = MagicMock( + return_value=[KafkaConnectTopics(name="topic1")] + ) + client.get_connector_config = MagicMock( + return_value={"description": "Test connector"} + ) + client.get_connector_dataset_info = MagicMock( + return_value=KafkaConnectDatasetDetails(table="users") + ) + + client._enrich_connector_details(connector_details, "test-connector") + + # Verify method calls + client.get_connector_topics.assert_called_once_with( + connector="test-connector" + ) + client.get_connector_config.assert_called_once_with( + connector="test-connector" + ) + client.get_connector_dataset_info.assert_called_once_with( + {"description": "Test connector"} + ) + + # Verify results + self.assertEqual(len(connector_details.topics), 1) + self.assertEqual(connector_details.description, "Test connector") + self.assertIsNotNone(connector_details.dataset) + + def test_enrich_connector_details_no_config(self): + """Test _enrich_connector_details with no config""" + with patch( + "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect" + ): + client = KafkaConnectClient(self.mock_config) + connector_details = KafkaConnectPipelineDetails(name="test-connector") + + client.get_connector_topics = MagicMock(return_value=[]) + client.get_connector_config = MagicMock(return_value=None) + client.get_connector_dataset_info = MagicMock() + + client._enrich_connector_details(connector_details, "test-connector") + + # Verify dataset info is not called when config is None + client.get_connector_dataset_info.assert_not_called() + self.assertIsNone(connector_details.description) + self.assertIsNone(connector_details.dataset) + + def test_get_cluster_info(self): + """Test get_cluster_info method""" + with patch( + "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect" + ) as mock_kafka_connect: + client = KafkaConnectClient(self.mock_config) + mock_client = mock_kafka_connect.return_value + mock_client.get_cluster_info.return_value = {"version": "3.0.0"} + + result = client.get_cluster_info() + + mock_client.get_cluster_info.assert_called_once() + self.assertEqual(result, {"version": "3.0.0"}) + + def test_get_connector_dataset_info_table(self): + """Test get_connector_dataset_info with table configuration""" + with patch( + "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect" + ): + client = KafkaConnectClient(self.mock_config) + config = {"table": "users", "database": "mydb"} + + result = client.get_connector_dataset_info(config) + + self.assertIsInstance(result, KafkaConnectDatasetDetails) + self.assertEqual(result.table, "users") + + def test_get_connector_dataset_info_container(self): + """Test get_connector_dataset_info with container configuration""" + with patch( + "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect" + ): + client = KafkaConnectClient(self.mock_config) + config = {"s3.bucket.name": "my-bucket"} + + result = client.get_connector_dataset_info(config) + + self.assertIsInstance(result, KafkaConnectDatasetDetails) + self.assertEqual(result.container_name, "my-bucket") + + def test_get_connector_dataset_info_no_match(self): + """Test get_connector_dataset_info with no matching configuration""" + with patch( + "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect" + ): + client = KafkaConnectClient(self.mock_config) + config = {"some.other.config": "value"} + + result = client.get_connector_dataset_info(config) + + self.assertIsNone(result) + + def test_supported_datasets_configuration(self): + """Test supported dataset configurations are properly handled""" + with patch( + "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect" + ): + client = KafkaConnectClient(self.mock_config) + + # Test various supported dataset configurations + test_configs = [ + # Table configurations + ({"table": "users", "database": "mydb"}, "table", "users"), + ({"collection": "users"}, "table", "users"), + ({"snowflake.schema.name": "schema1"}, "table", "schema1"), + ({"table.whitelist": "table1,table2"}, "table", "table1,table2"), + ({"fields.whitelist": "field1,field2"}, "table", "field1,field2"), + # Database configurations + ({"database": "mydb"}, "database", "mydb"), + ({"db.name": "testdb"}, "database", "testdb"), + ({"snowflake.database.name": "snowdb"}, "database", "snowdb"), + # Container configurations + ({"s3.bucket.name": "my-bucket"}, "container_name", "my-bucket"), + ] + + for config, expected_field, expected_value in test_configs: + with self.subTest(config=config): + result = client.get_connector_dataset_info(config) + self.assertIsNotNone(result, f"Failed for config: {config}") + actual_value = getattr(result, expected_field) + self.assertEqual( + actual_value, + expected_value, + f"Expected {expected_field}={expected_value}, got {actual_value}", + )