Fixes - Kafkaconnect lineage & descriptions (#23234)

* Fix Kafkaconnect lineage & descriptions

* fix typos

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* address comments

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* address comments

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
(cherry picked from commit e2b903532e6b30b0988deecaddd787b25c9d30d5)
Suman Maharana 2025-09-23 13:38:37 +05:30 committed by OpenMetadata Release Bot
parent 73acc1a1e6
commit c9d421103a
4 changed files with 384 additions and 130 deletions

ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/client.py

@@ -23,7 +23,6 @@ from metadata.generated.schema.entity.services.connections.pipeline.kafkaConnect
 from metadata.ingestion.source.pipeline.kafkaconnect.models import (
     KafkaConnectDatasetDetails,
     KafkaConnectPipelineDetails,
-    KafkaConnectTasks,
     KafkaConnectTopics,
 )
 from metadata.utils.helpers import clean_uri
@@ -57,6 +56,20 @@ class KafkaConnectClient:
             auth = f"{config.KafkaConnectConfig.username}:{config.KafkaConnectConfig.password.get_secret_value()}"
         self.client = KafkaConnect(url=url, auth=auth, ssl_verify=ssl_verify)
 
+    def _enrich_connector_details(
+        self, connector_details: KafkaConnectPipelineDetails, connector_name: str
+    ) -> None:
+        """Helper method to enrich connector details with additional information."""
+        connector_details.topics = self.get_connector_topics(connector=connector_name)
+        connector_details.config = self.get_connector_config(connector=connector_name)
+        if connector_details.config:
+            connector_details.description = connector_details.config.get(
+                "description", None
+            )
+            connector_details.dataset = self.get_connector_dataset_info(
+                connector_details.config
+            )
+
     def get_cluster_info(self) -> Optional[dict]:
         """
         Get the version and other details of the Kafka Connect cluster.
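
A minimal usage sketch of the new helper (the connector name and variables are illustrative; the methods are the ones defined in this file):

    client = KafkaConnectClient(connection)  # connection: KafkaConnectConnection
    details = KafkaConnectPipelineDetails(name="my-jdbc-sink")
    client._enrich_connector_details(details, connector_name="my-jdbc-sink")
    # details.topics      -> from client.get_connector_topics()
    # details.config      -> from client.get_connector_config()
    # details.description -> config.get("description"), only when a config was returned
    # details.dataset     -> get_connector_dataset_info(config): table/database/container
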
@@ -101,14 +114,11 @@ class KafkaConnectClient:
         Get the list of connector plugins.
         """
         try:
-            result = self.client.list_connector_plugins()
-            return result
+            return self.client.list_connector_plugins()
         except Exception as exc:
             logger.debug(traceback.format_exc())
             logger.warning(f"Unable to get connector plugins {exc}")
-
-        return None
 
     def get_connector_config(self, connector: str) -> Optional[dict]:
         """
         Get the details of a single connector.
@ -125,7 +135,9 @@ class KafkaConnectClient:
return None return None
def get_connector_dataset_info(self, connector: str) -> Optional[dict]: def get_connector_dataset_info(
self, connector_config: dict
) -> Optional[KafkaConnectDatasetDetails]:
""" """
Get the details of dataset of connector if there is any. Get the details of dataset of connector if there is any.
Checks in the connector configurations for dataset fields Checks in the connector configurations for dataset fields
@@ -139,16 +151,14 @@
             is not found, has no dataset, or an error occurs.
         """
         try:
-            conn_config = self.get_connector_config(connector=connector)
-            if not conn_config:
+            if not connector_config:
                 return None
 
             result = {}
             for dataset in SUPPORTED_DATASETS or []:
                 for key in SUPPORTED_DATASETS[dataset] or []:
-                    if conn_config.get(key):
-                        result[dataset] = conn_config[key]
+                    if connector_config.get(key):
+                        result[dataset] = connector_config[key]
 
             return KafkaConnectDatasetDetails(**result)
         except Exception as exc:
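
For reference, a sketch of the lookup this method now performs on the config dict it receives. The SUPPORTED_DATASETS shape below is an assumption inferred from the unit tests added at the end of this commit, not the literal constant:

    SUPPORTED_DATASETS = {
        "table": ["table", "collection", "snowflake.schema.name", "table.whitelist", "fields.whitelist"],
        "database": ["database", "db.name", "snowflake.database.name"],
        "container_name": ["s3.bucket.name"],
    }

    connector_config = {"db.name": "testdb", "table.whitelist": "table1,table2"}
    result = {
        field: connector_config[key]
        for field, keys in SUPPORTED_DATASETS.items()
        for key in keys
        if connector_config.get(key)
    }
    # result == {"table": "table1,table2", "database": "testdb"}
    # KafkaConnectDatasetDetails(**result) then drives lineage by dataset_type
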
@@ -157,49 +167,6 @@ class KafkaConnectClient:
 
         return None
 
-    def get_connector_tasks(self, connector: str) -> Optional[List[KafkaConnectTasks]]:
-        """
-        Get the list of tasks for a connector.
-
-        Args:
-            connector (str): The name of the connector.
-
-        Returns:
-            Optional[List[KafkaConnectTasks]]: A list of KafkaConnectTasks objects
-                representing the connector's tasks,
-                or None if the connector is not found
-                or an error occurs.
-        """
-        try:
-            result = self.client.get_connector_status(connector=connector)
-            tasks = [KafkaConnectTasks(**task) for task in result.get("tasks") or []]
-            return tasks
-        except Exception as exc:
-            logger.debug(traceback.format_exc())
-            logger.error(f"Unable to get connector tasks list {exc}")
-
-        return None
-
-    def get_connector_task_status(self, connector: str, task_id: int) -> Optional[dict]:
-        """
-        Get the status of a specific task for a connector.
-
-        Args:
-            connector (str): The name of the connector.
-            task_id (int): The ID of the task.
-
-        Returns:
-            Optional[Dict]: A dictionary containing the task status information,
-                or None if the connector or task is not found
-                or an error occurs.
-        """
-        try:
-            result = self.client.get_connector_task_status(
-                connector=connector, task_id=task_id
-            )
-            return result
-        except Exception as exc:
-            logger.debug(traceback.format_exc())
-            logger.error(f"Unable to get connector tasks status {exc}")
-
-        return None
-
     def get_connector_topics(
         self, connector: str
     ) -> Optional[List[KafkaConnectTopics]]:
@@ -231,45 +198,27 @@
 
         return None
 
-    def get_connector_state(self, connector: str) -> Optional[str]:
-        """
-        Get the status of a single connector.
-
-        Args:
-            connector (str): The name of the connector.
-        """
-        try:
-            result = self.client.get_connector_status(connector=connector)
-            if result.get("connector"):
-                return result["connector"].get("state")
-        except Exception as exc:
-            logger.debug(traceback.format_exc())
-            logger.warning(f"Unable to get connector state due to {exc}")
-
-        return None
-
     def get_connector_list(self) -> Optional[List[KafkaConnectPipelineDetails]]:
         """
-        Get the information of a single connector.
-
-        Args:
-            connector (str): The name of the connector.
+        Get the information of all connectors.
 
         Returns:
-            Optional[KafkaConnectPipelineDetails]: A KafkaConnectPipelineDetails
-                object containing connector information,
-                or None if the connector is not found
-                or an error occurs.
+            Optional[List[KafkaConnectPipelineDetails]]: A list of KafkaConnectPipelineDetails
+                objects containing connector information,
+                or None if an error occurs.
         """
         try:
-            connectors = []
-            for connector in self.get_connectors() or []:
-                result = self.client.get_connector_status(connector=connector)
-                connector_details = KafkaConnectPipelineDetails(**result)
-                connector_details.status = self.get_connector_state(connector=connector)
-                connector_details.tasks = self.get_connector_tasks(connector=connector)
-                connector_details.topics = self.get_connector_topics(
-                    connector=connector
-                )
-                connectors.append(connector_details)
-            return connectors
+            connector_data = self.get_connectors(expand="status") or {}
+
+            for connector_name, connector_info in connector_data.items():
+                if isinstance(connector_info, dict) and "status" in connector_info:
+                    status_info = connector_info["status"]
+                    connector_details = KafkaConnectPipelineDetails(**status_info)
+                    connector_details.status = status_info.get("connector", {}).get(
+                        "state", "UNASSIGNED"
+                    )
+                    self._enrich_connector_details(connector_details, connector_name)
+                    if connector_details:
+                        yield connector_details
         except Exception as exc:
             logger.debug(traceback.format_exc())
             logger.error(f"Unable to get connector information {exc}")

ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/metadata.py

@@ -36,7 +36,12 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import
 from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
-from metadata.generated.schema.type.basic import EntityName, SourceUrl, Timestamp
+from metadata.generated.schema.type.basic import (
+    EntityName,
+    Markdown,
+    SourceUrl,
+    Timestamp,
+)
 from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
 from metadata.generated.schema.type.entityLineage import Source as LineageSource
 from metadata.generated.schema.type.entityReference import EntityReference
@@ -65,7 +70,7 @@ STATUS_MAP = {
 
 class KafkaconnectSource(PipelineServiceSource):
     """
-    Implements the necessary methods ot extract
+    Implements the necessary methods to extract
     Pipeline metadata from Kafka Connect
     """
@@ -100,6 +105,9 @@ class KafkaconnectSource(PipelineServiceSource):
                 for task in pipeline_details.tasks or []
             ],
             service=self.context.get().pipeline_service,
+            description=Markdown(pipeline_details.description)
+            if pipeline_details.description
+            else None,
         )
         yield Either(right=pipeline_request)
         self.register_record(pipeline_request=pipeline_request)
@ -119,17 +127,12 @@ class KafkaconnectSource(PipelineServiceSource):
Get lineage dataset entity Get lineage dataset entity
""" """
try: try:
dataset_details = self.client.get_connector_dataset_info( dataset_details = pipeline_details.dataset
connector=pipeline_details.name
)
if dataset_details: if dataset_details:
if ( if dataset_details.dataset_type == Table:
isinstance(dataset_details.dataset_type, type(Table)) for (
and self.source_config.lineageInformation.dbServiceNames dbservicename
): ) in self.source_config.lineageInformation.dbServiceNames or ["*"]:
for dbservicename in (
self.source_config.lineageInformation.dbServiceNames or []
):
dataset_entity = self.metadata.get_by_name( dataset_entity = self.metadata.get_by_name(
entity=dataset_details.dataset_type, entity=dataset_details.dataset_type,
fqn=fqn.build( fqn=fqn.build(
@@ -145,25 +148,25 @@
                         if dataset_entity:
                             return dataset_entity
 
-            if (
-                isinstance(dataset_details.dataset_type, type(Container))
-                and self.source_config.lineageInformation.storageServiceNames
-            ):
-                for storageservicename in (
-                    self.source_config.lineageInformation.storageServiceNames or []
-                ):
-                    dataset_entity = self.metadata.get_by_name(
+                if dataset_details.dataset_type == Container:
+                    for (
+                        storageservicename
+                    ) in self.source_config.lineageInformation.storageServiceNames or [
+                        "*"
+                    ]:
+                        storage_entity = self.metadata.get_by_name(
                         entity=dataset_details.dataset_type,
                         fqn=fqn.build(
                             metadata=self.metadata,
                             entity_type=dataset_details.dataset_type,
                             container_name=dataset_details.container_name,
                             service_name=storageservicename,
-                            parent_container=None,
                         ),
                     )
-                    if dataset_entity:
-                        return dataset_entity
+                        if storage_entity:
+                            return storage_entity
 
         except Exception as exc:
             logger.debug(traceback.format_exc())
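
The dropped isinstance checks were the core of the lineage bug: dataset_type holds an entity class (Table or Container), and type(Table) is the pydantic metaclass shared by all generated entity classes, so the old guard matched either kind of dataset. A minimal illustration, assuming both generated classes are plain pydantic models:

    from metadata.generated.schema.entity.data.container import Container
    from metadata.generated.schema.entity.data.table import Table

    isinstance(Container, type(Table))  # True: metaclass match, not a type match
    Container == Table                  # False: the new equality check is exact

The or ["*"] fallback is also behavioral: when no dbServiceNames/storageServiceNames are configured, the entity lookup now runs against the wildcard service instead of being skipped entirely.
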
@ -180,7 +183,7 @@ class KafkaconnectSource(PipelineServiceSource):
try: try:
if not self.service_connection.messagingServiceName: if not self.service_connection.messagingServiceName:
logger.debug("Kafka messagingServiceName not found") logger.debug("Kafka messagingServiceName not found")
return None return
pipeline_fqn = fqn.build( pipeline_fqn = fqn.build(
metadata=self.metadata, metadata=self.metadata,
@@ -246,8 +249,6 @@
                 )
             )
 
-        return None
-
     def get_pipelines_list(self) -> Iterable[KafkaConnectPipelineDetails]:
         """
         Get List of all pipelines
@@ -300,6 +301,7 @@
             service_name=self.context.get().pipeline_service,
             pipeline_name=self.context.get().pipeline,
         )
+
         yield Either(
             right=OMetaPipelineStatus(
                 pipeline_fqn=pipeline_fqn,

ingestion/src/metadata/ingestion/source/pipeline/kafkaconnect/models.py

@@ -13,7 +13,7 @@
 KafkaConnect Source Model module
 """
-from typing import List, Optional
+from typing import List, Optional, Type, Union
 
 from pydantic import BaseModel, Field
@@ -35,6 +35,20 @@ class KafkaConnectTopics(BaseModel):
     name: str = Field(..., description="Name of the topic (e.g., random-source-avro)")
 
 
+class KafkaConnectDatasetDetails(BaseModel):
+    table: Optional[str] = None
+    database: Optional[str] = None
+    container_name: Optional[str] = None
+
+    @property
+    def dataset_type(self) -> Optional[Type[Union[Table, Container]]]:
+        if self.table or self.database:
+            return Table
+        if self.container_name:
+            return Container
+        return None
+
+
 class KafkaConnectPipelineDetails(BaseModel):
     name: str = Field(
         ..., description="Name of the status source (e.g., random-source-json)"
@@ -43,20 +57,9 @@ class KafkaConnectPipelineDetails(BaseModel):
         default="UNASSIGNED",
         description="State of the connector (e.g., RUNNING, STOPPED)",
     )
-    tasks: Optional[List[KafkaConnectTasks]] = []
-    topics: Optional[List[KafkaConnectTopics]] = []
+    tasks: Optional[List[KafkaConnectTasks]] = Field(default_factory=list)
+    topics: Optional[List[KafkaConnectTopics]] = Field(default_factory=list)
     conn_type: Optional[str] = Field(default="UNKNOWN", alias="type")
-
-
-class KafkaConnectDatasetDetails(BaseModel):
-    table: Optional[str] = None
-    database: Optional[str] = None
-    container_name: Optional[str] = None
-
-    @property
-    def dataset_type(self):
-        if self.table or self.database:
-            return Table
-        if self.container_name:
-            return Container
-        return None
+    description: Optional[str] = None
+    dataset: Optional[KafkaConnectDatasetDetails] = None
+    config: Optional[dict] = Field(default_factory=dict)
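
A small usage sketch of the relocated model, mirroring the unit tests below. The typed dataset_type property returns the entity class itself, which is what makes the == comparisons in get_lineage_details work:

    from metadata.generated.schema.entity.data.container import Container
    from metadata.generated.schema.entity.data.table import Table

    assert KafkaConnectDatasetDetails(table="users").dataset_type == Table
    assert KafkaConnectDatasetDetails(container_name="my-bucket").dataset_type == Container
    assert KafkaConnectDatasetDetails().dataset_type is None

Field(default_factory=list) and Field(default_factory=dict) replace the mutable [] defaults with the idiomatic pydantic declaration, and the class now sits above KafkaConnectPipelineDetails so the new dataset field can reference it directly.
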

New file: unit tests for the KafkaConnect client and models

@@ -0,0 +1,300 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test KafkaConnect client and models
"""
from unittest import TestCase
from unittest.mock import MagicMock, patch
from metadata.generated.schema.entity.services.connections.pipeline.kafkaConnectConnection import (
KafkaConnectConnection,
)
from metadata.ingestion.source.pipeline.kafkaconnect.client import KafkaConnectClient
from metadata.ingestion.source.pipeline.kafkaconnect.models import (
KafkaConnectDatasetDetails,
KafkaConnectPipelineDetails,
KafkaConnectTasks,
KafkaConnectTopics,
)
class TestKafkaConnectModels(TestCase):
    """Test KafkaConnect data models"""

    def test_kafka_connect_tasks_model(self):
        """Test KafkaConnectTasks model creation and defaults"""
        task = KafkaConnectTasks(id=1, state="RUNNING", worker_id="worker-1")
        self.assertEqual(task.id, 1)
        self.assertEqual(task.state, "RUNNING")
        self.assertEqual(task.worker_id, "worker-1")

    def test_kafka_connect_tasks_defaults(self):
        """Test KafkaConnectTasks default values"""
        task = KafkaConnectTasks(id=1)
        self.assertEqual(task.id, 1)
        self.assertEqual(task.state, "UNASSIGNED")
        self.assertIsNone(task.worker_id)

    def test_kafka_connect_topics_model(self):
        """Test KafkaConnectTopics model creation"""
        topic = KafkaConnectTopics(name="test-topic")
        self.assertEqual(topic.name, "test-topic")

    def test_kafka_connect_dataset_details_table_type(self):
        """Test KafkaConnectDatasetDetails with table type"""
        dataset = KafkaConnectDatasetDetails(table="users", database="mydb")
        self.assertEqual(dataset.table, "users")
        self.assertEqual(dataset.database, "mydb")
        self.assertIsNone(dataset.container_name)

        # Import here to avoid circular dependency issues
        from metadata.generated.schema.entity.data.table import Table

        self.assertEqual(dataset.dataset_type, Table)

    def test_kafka_connect_dataset_details_container_type(self):
        """Test KafkaConnectDatasetDetails with container type"""
        dataset = KafkaConnectDatasetDetails(container_name="my-bucket")
        self.assertEqual(dataset.container_name, "my-bucket")
        self.assertIsNone(dataset.table)
        self.assertIsNone(dataset.database)

        # Import here to avoid circular dependency issues
        from metadata.generated.schema.entity.data.container import Container

        self.assertEqual(dataset.dataset_type, Container)

    def test_kafka_connect_dataset_details_no_type(self):
        """Test KafkaConnectDatasetDetails with no type"""
        dataset = KafkaConnectDatasetDetails()
        self.assertIsNone(dataset.table)
        self.assertIsNone(dataset.database)
        self.assertIsNone(dataset.container_name)
        self.assertIsNone(dataset.dataset_type)

    def test_kafka_connect_pipeline_details_model(self):
        """Test KafkaConnectPipelineDetails model with default factory"""
        pipeline = KafkaConnectPipelineDetails(name="test-connector")
        self.assertEqual(pipeline.name, "test-connector")
        self.assertEqual(pipeline.status, "UNASSIGNED")
        self.assertEqual(pipeline.conn_type, "UNKNOWN")
        self.assertEqual(pipeline.tasks, [])
        self.assertEqual(pipeline.topics, [])
        self.assertEqual(pipeline.config, {})
        self.assertIsNone(pipeline.description)
        self.assertIsNone(pipeline.dataset)

    def test_kafka_connect_pipeline_details_with_data(self):
        """Test KafkaConnectPipelineDetails with full data"""
        tasks = [KafkaConnectTasks(id=1, state="RUNNING")]
        topics = [KafkaConnectTopics(name="test-topic")]
        dataset = KafkaConnectDatasetDetails(table="users")

        pipeline = KafkaConnectPipelineDetails(
            name="test-connector",
            status="RUNNING",
            tasks=tasks,
            topics=topics,
            type="source",  # Using the alias 'type' instead of 'conn_type'
            description="Test connector",
            dataset=dataset,
            config={"key": "value"},
        )
        self.assertEqual(pipeline.name, "test-connector")
        self.assertEqual(pipeline.status, "RUNNING")
        self.assertEqual(pipeline.conn_type, "source")
        self.assertEqual(len(pipeline.tasks), 1)
        self.assertEqual(len(pipeline.topics), 1)
        self.assertEqual(pipeline.description, "Test connector")
        self.assertIsNotNone(pipeline.dataset)
        self.assertEqual(pipeline.config["key"], "value")
class TestKafkaConnectClient(TestCase):
    """Test KafkaConnect client functionality"""

    def setUp(self):
        """Set up test fixtures"""
        self.mock_config = MagicMock(spec=KafkaConnectConnection)
        self.mock_config.hostPort = "http://localhost:8083"
        self.mock_config.verifySSL = True
        self.mock_config.KafkaConnectConfig = None

    def test_client_initialization_no_auth(self):
        """Test client initialization without authentication"""
        with patch(
            "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect"
        ) as mock_kafka_connect:
            client = KafkaConnectClient(self.mock_config)
            mock_kafka_connect.assert_called_once_with(
                url="http://localhost:8083", auth=None, ssl_verify=True
            )

    def test_client_initialization_with_auth(self):
        """Test client initialization with authentication"""
        with patch(
            "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect"
        ) as mock_kafka_connect:
            mock_auth_config = MagicMock()
            mock_auth_config.username = "user"
            mock_auth_config.password.get_secret_value.return_value = "pass"
            self.mock_config.KafkaConnectConfig = mock_auth_config

            client = KafkaConnectClient(self.mock_config)
            mock_kafka_connect.assert_called_once_with(
                url="http://localhost:8083", auth="user:pass", ssl_verify=True
            )

    def test_enrich_connector_details_helper(self):
        """Test _enrich_connector_details helper method"""
        with patch(
            "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect"
        ):
            client = KafkaConnectClient(self.mock_config)
            connector_details = KafkaConnectPipelineDetails(name="test-connector")

            # Mock the methods called by _enrich_connector_details
            client.get_connector_topics = MagicMock(
                return_value=[KafkaConnectTopics(name="topic1")]
            )
            client.get_connector_config = MagicMock(
                return_value={"description": "Test connector"}
            )
            client.get_connector_dataset_info = MagicMock(
                return_value=KafkaConnectDatasetDetails(table="users")
            )

            client._enrich_connector_details(connector_details, "test-connector")

            # Verify method calls
            client.get_connector_topics.assert_called_once_with(
                connector="test-connector"
            )
            client.get_connector_config.assert_called_once_with(
                connector="test-connector"
            )
            client.get_connector_dataset_info.assert_called_once_with(
                {"description": "Test connector"}
            )

            # Verify results
            self.assertEqual(len(connector_details.topics), 1)
            self.assertEqual(connector_details.description, "Test connector")
            self.assertIsNotNone(connector_details.dataset)

    def test_enrich_connector_details_no_config(self):
        """Test _enrich_connector_details with no config"""
        with patch(
            "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect"
        ):
            client = KafkaConnectClient(self.mock_config)
            connector_details = KafkaConnectPipelineDetails(name="test-connector")

            client.get_connector_topics = MagicMock(return_value=[])
            client.get_connector_config = MagicMock(return_value=None)
            client.get_connector_dataset_info = MagicMock()

            client._enrich_connector_details(connector_details, "test-connector")

            # Verify dataset info is not called when config is None
            client.get_connector_dataset_info.assert_not_called()
            self.assertIsNone(connector_details.description)
            self.assertIsNone(connector_details.dataset)

    def test_get_cluster_info(self):
        """Test get_cluster_info method"""
        with patch(
            "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect"
        ) as mock_kafka_connect:
            client = KafkaConnectClient(self.mock_config)
            mock_client = mock_kafka_connect.return_value
            mock_client.get_cluster_info.return_value = {"version": "3.0.0"}

            result = client.get_cluster_info()
            mock_client.get_cluster_info.assert_called_once()
            self.assertEqual(result, {"version": "3.0.0"})

    def test_get_connector_dataset_info_table(self):
        """Test get_connector_dataset_info with table configuration"""
        with patch(
            "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect"
        ):
            client = KafkaConnectClient(self.mock_config)
            config = {"table": "users", "database": "mydb"}

            result = client.get_connector_dataset_info(config)
            self.assertIsInstance(result, KafkaConnectDatasetDetails)
            self.assertEqual(result.table, "users")

    def test_get_connector_dataset_info_container(self):
        """Test get_connector_dataset_info with container configuration"""
        with patch(
            "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect"
        ):
            client = KafkaConnectClient(self.mock_config)
            config = {"s3.bucket.name": "my-bucket"}

            result = client.get_connector_dataset_info(config)
            self.assertIsInstance(result, KafkaConnectDatasetDetails)
            self.assertEqual(result.container_name, "my-bucket")

    def test_get_connector_dataset_info_no_match(self):
        """Test get_connector_dataset_info with no matching configuration"""
        with patch(
            "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect"
        ):
            client = KafkaConnectClient(self.mock_config)
            config = {"some.other.config": "value"}

            result = client.get_connector_dataset_info(config)
            self.assertIsNone(result)

    def test_supported_datasets_configuration(self):
        """Test supported dataset configurations are properly handled"""
        with patch(
            "metadata.ingestion.source.pipeline.kafkaconnect.client.KafkaConnect"
        ):
            client = KafkaConnectClient(self.mock_config)

            # Test various supported dataset configurations
            test_configs = [
                # Table configurations
                ({"table": "users", "database": "mydb"}, "table", "users"),
                ({"collection": "users"}, "table", "users"),
                ({"snowflake.schema.name": "schema1"}, "table", "schema1"),
                ({"table.whitelist": "table1,table2"}, "table", "table1,table2"),
                ({"fields.whitelist": "field1,field2"}, "table", "field1,field2"),
                # Database configurations
                ({"database": "mydb"}, "database", "mydb"),
                ({"db.name": "testdb"}, "database", "testdb"),
                ({"snowflake.database.name": "snowdb"}, "database", "snowdb"),
                # Container configurations
                ({"s3.bucket.name": "my-bucket"}, "container_name", "my-bucket"),
            ]

            for config, expected_field, expected_value in test_configs:
                with self.subTest(config=config):
                    result = client.get_connector_dataset_info(config)
                    self.assertIsNotNone(result, f"Failed for config: {config}")
                    actual_value = getattr(result, expected_field)
                    self.assertEqual(
                        actual_value,
                        expected_value,
                        f"Expected {expected_field}={expected_value}, got {actual_value}",
                    )