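"""Unit tests for ConsumerGroupAnalyzer from the Kafka Connect ingestion source.

Every test runs against a mocked HTTP session; no Kafka or Confluent Cloud
endpoint is contacted. The mocks return Confluent-style Kafka REST payloads
("KafkaConsumerGroupList" and "KafkaConsumerList") so the tests can verify how
the analyzer matches consumer groups to connectors and resolves the topics
those groups consume.
"""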
from unittest import mock

import pytest

from datahub.ingestion.source.kafka_connect.common import (
    KafkaConnectSourceConfig,
    KafkaConnectSourceReport,
)
from datahub.ingestion.source.kafka_connect.consumer_group_analyzer import (
    ConsumerGroupAnalyzer,
)


class TestConsumerGroupAnalyzer:
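    """The fixtures below give each test a fresh analyzer wired to a mocked session."""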
    @pytest.fixture
    def mock_session(self):
        return mock.Mock()

    @pytest.fixture
    def config(self):
        return KafkaConnectSourceConfig(connect_uri="http://localhost:8083")

    @pytest.fixture
    def report(self):
        return KafkaConnectSourceReport()

    @pytest.fixture
    def analyzer(self, mock_session, config, report):
        return ConsumerGroupAnalyzer(mock_session, config, report)
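
    # _is_connector_group matching behaviour covered below: exact names,
    # "connect-"/"connector-" prefixes, suffix and substring matches, shared
    # sink-type tokens (bigquery, snowflake, s3), and case-insensitive comparison.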
    def test_is_connector_group_direct_match(self, analyzer):
        assert analyzer._is_connector_group("my-connector", "my-connector")
        assert analyzer._is_connector_group("snowflake-sink", "snowflake-sink")

    def test_is_connector_group_connect_prefix(self, analyzer):
        assert analyzer._is_connector_group("connect-my-connector", "my-connector")
        assert analyzer._is_connector_group("connect-bigquery-sink", "bigquery-sink")

    def test_is_connector_group_connector_prefix(self, analyzer):
        assert analyzer._is_connector_group("connector-my-sink", "my-sink")

    def test_is_connector_group_suffix_match(self, analyzer):
        assert analyzer._is_connector_group("cluster-my-connector", "my-connector")

    def test_is_connector_group_contains_match(self, analyzer):
        assert analyzer._is_connector_group(
            "prefix-my-connector-suffix", "my-connector"
        )

    def test_is_connector_group_sink_type_match(self, analyzer):
        assert analyzer._is_connector_group(
            "my-bigquery-group", "test-connector-bigquery"
        )
        assert analyzer._is_connector_group("snowflake-consumer", "my-snowflake-sink")
        assert analyzer._is_connector_group("s3-writer", "data-s3-connector")

    def test_is_connector_group_case_insensitive(self, analyzer):
        assert analyzer._is_connector_group("MY-CONNECTOR", "my-connector")
        assert analyzer._is_connector_group("Connect-MyConnector", "myconnector")

    def test_is_connector_group_no_match(self, analyzer):
        assert not analyzer._is_connector_group("other-group", "my-connector")
        assert not analyzer._is_connector_group("unrelated", "my-sink")
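
    # The REST-backed lookups below are driven entirely by mocked responses that
    # mimic the Confluent Kafka REST payload shapes: "KafkaConsumerGroupList" for
    # group listings and "KafkaConsumerList" for per-group assignments.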
    def test_get_connector_consumer_groups(self, analyzer, mock_session):
        mock_response = mock.Mock()
        mock_response.json.return_value = {
            "kind": "KafkaConsumerGroupList",
            "data": [
                {"consumer_group_id": "connect-my-connector"},
                {"consumer_group_id": "other-group"},
                {"consumer_group_id": "my-connector-1"},
            ],
        }
        mock_session.get.return_value = mock_response

        result = analyzer.get_connector_consumer_groups(
            "my-connector", "https://kafka-rest.confluent.cloud", "cluster1"
        )

        assert len(result) == 2
        assert "connect-my-connector" in result
        assert "my-connector-1" in result
        assert "other-group" not in result

    def test_get_connector_consumer_groups_api_failure(self, analyzer, mock_session):
        mock_session.get.side_effect = Exception("API Error")

        result = analyzer.get_connector_consumer_groups(
            "my-connector", "https://kafka-rest.confluent.cloud", "cluster1"
        )

        assert result == []

    def test_get_topics_for_consumer_groups(self, analyzer, mock_session):
        mock_response = mock.Mock()
        mock_response.json.return_value = {
            "kind": "KafkaConsumerList",
            "data": [
                {"assignments": [{"topic_name": "topic1"}, {"topic_name": "topic2"}]}
            ],
        }
        mock_session.get.return_value = mock_response

        result = analyzer.get_topics_for_consumer_groups(
            ["group1"], "https://kafka-rest.confluent.cloud", "cluster1"
        )

        # Topics may be in any order due to set usage
        assert "group1" in result
        assert set(result["group1"]) == {"topic1", "topic2"}

    def test_get_topics_for_consumer_groups_multiple_groups(
        self, analyzer, mock_session
    ):
        def mock_get(url, **kwargs):
            response = mock.Mock()
            if "group1" in url:
                response.json.return_value = {
                    "kind": "KafkaConsumerList",
                    "data": [{"assignments": [{"topic_name": "topic1"}]}],
                }
            elif "group2" in url:
                response.json.return_value = {
                    "kind": "KafkaConsumerList",
                    "data": [{"assignments": [{"topic_name": "topic2"}]}],
                }
            return response

        mock_session.get.side_effect = mock_get

        result = analyzer.get_topics_for_consumer_groups(
            ["group1", "group2"], "https://kafka-rest.confluent.cloud", "cluster1"
        )

        assert result == {"group1": ["topic1"], "group2": ["topic2"]}

    def test_get_topics_for_consumer_groups_handles_failures(
        self, analyzer, mock_session
    ):
        def mock_get(url, **kwargs):
            response = mock.Mock()
            if "group1" in url:
                response.json.return_value = {
                    "kind": "KafkaConsumerList",
                    "data": [{"assignments": [{"topic_name": "topic1"}]}],
                }
            else:
                raise Exception("API Error")
            return response

        mock_session.get.side_effect = mock_get

        result = analyzer.get_topics_for_consumer_groups(
            ["group1", "group2"], "https://kafka-rest.confluent.cloud", "cluster1"
        )

        # Should have group1 but not group2
        assert "group1" in result
        assert "group2" not in result
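
    # analyze_connector_consumption chains both REST calls: the mocked side_effect
    # returns the group list first, then one assignment payload per matched group.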
    def test_analyze_connector_consumption(self, analyzer, mock_session):
        # Mock get_all_consumer_groups
        groups_response = mock.Mock()
        groups_response.json.return_value = {
            "kind": "KafkaConsumerGroupList",
            "data": [{"consumer_group_id": "connect-my-connector"}],
        }

        # Mock get_topics_for_group
        topics_response = mock.Mock()
        topics_response.json.return_value = {
            "kind": "KafkaConsumerList",
            "data": [
                {"assignments": [{"topic_name": "topic1"}, {"topic_name": "topic2"}]}
            ],
        }

        mock_session.get.side_effect = [groups_response, topics_response]

        result = analyzer.analyze_connector_consumption(
            "my-connector", "https://kafka-rest.confluent.cloud", "cluster1"
        )

        assert len(result) == 2
        assert "topic1" in result
        assert "topic2" in result

    def test_analyze_connector_consumption_no_groups(self, analyzer, mock_session):
        mock_response = mock.Mock()
        mock_response.json.return_value = {"kind": "KafkaConsumerGroupList", "data": []}
        mock_session.get.return_value = mock_response

        result = analyzer.analyze_connector_consumption(
            "my-connector", "https://kafka-rest.confluent.cloud", "cluster1"
        )

        assert result == []

    def test_analyze_connector_consumption_deduplicates_topics(
        self, analyzer, mock_session
    ):
        groups_response = mock.Mock()
        groups_response.json.return_value = {
            "kind": "KafkaConsumerGroupList",
            "data": [
                {"consumer_group_id": "connect-my-connector-1"},
                {"consumer_group_id": "connect-my-connector-2"},
            ],
        }

        # Both groups consume same topics
        topics_response = mock.Mock()
        topics_response.json.return_value = {
            "kind": "KafkaConsumerList",
            "data": [{"assignments": [{"topic_name": "shared_topic"}]}],
        }

        mock_session.get.side_effect = [
            groups_response,
            topics_response,
            topics_response,
        ]

        result = analyzer.analyze_connector_consumption(
            "my-connector", "https://kafka-rest.confluent.cloud", "cluster1"
        )

        # Should deduplicate - only one "shared_topic"
        assert result == ["shared_topic"]
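
    # filter_sink_relevant_groups: with no patterns supplied it keeps groups whose
    # names match its default sink-connector heuristics; callers can narrow this
    # via sink_connector_patterns.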
    def test_filter_sink_relevant_groups(self, analyzer):
        all_groups = {
            "connect-bigquery-sink": ["topic1"],
            "snowflake-writer": ["topic2"],
            "random-consumer": ["topic3"],
            "my-s3-connector": ["topic4"],
        }

        result = analyzer.filter_sink_relevant_groups(all_groups)

        assert len(result) == 3
        assert "connect-bigquery-sink" in result
        assert "snowflake-writer" in result
        assert "my-s3-connector" in result
        assert "random-consumer" not in result

    def test_filter_sink_relevant_groups_custom_patterns(self, analyzer):
        all_groups = {"custom-sink-pattern": ["topic1"], "another-group": ["topic2"]}

        result = analyzer.filter_sink_relevant_groups(
            all_groups, sink_connector_patterns=["custom-sink"]
        )

        assert len(result) == 1
        assert "custom-sink-pattern" in result
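
    # get_comprehensive_consumer_analysis returns a mapping of every consumer group
    # to the topics it is assigned, again built from mocked REST responses.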
    def test_get_comprehensive_consumer_analysis(self, analyzer, mock_session):
        groups_response = mock.Mock()
        groups_response.json.return_value = {
            "kind": "KafkaConsumerGroupList",
            "data": [{"consumer_group_id": "group1"}, {"consumer_group_id": "group2"}],
        }

        topics_response1 = mock.Mock()
        topics_response1.json.return_value = {
            "kind": "KafkaConsumerList",
            "data": [{"assignments": [{"topic_name": "topic1"}]}],
        }

        topics_response2 = mock.Mock()
        topics_response2.json.return_value = {
            "kind": "KafkaConsumerList",
            "data": [{"assignments": [{"topic_name": "topic2"}]}],
        }

        mock_session.get.side_effect = [
            groups_response,
            topics_response1,
            topics_response2,
        ]

        result = analyzer.get_comprehensive_consumer_analysis(
            "https://kafka-rest.confluent.cloud", "cluster1"
        )

        assert result == {"group1": ["topic1"], "group2": ["topic2"]}

    def test_get_comprehensive_consumer_analysis_no_groups(
        self, analyzer, mock_session
    ):
        mock_response = mock.Mock()
        mock_response.json.return_value = {"kind": "KafkaConsumerGroupList", "data": []}
        mock_session.get.return_value = mock_response

        result = analyzer.get_comprehensive_consumer_analysis(
            "https://kafka-rest.confluent.cloud", "cluster1"
        )

        assert result == {}
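
    # Defensive parsing and URL handling: empty or malformed assignment entries are
    # skipped, and a trailing slash on the REST base URL must not produce "//" in
    # the request path.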
    def test_get_topics_for_group_handles_empty_assignments(
        self, analyzer, mock_session
    ):
        mock_response = mock.Mock()
        mock_response.json.return_value = {
            "kind": "KafkaConsumerList",
            "data": [
                {"assignments": []}  # Empty assignments
            ],
        }
        mock_session.get.return_value = mock_response

        result = analyzer._get_topics_for_group(
            "group1", "https://kafka-rest.confluent.cloud", "cluster1"
        )

        assert result == []

    def test_get_topics_for_group_handles_missing_topic_name(
        self, analyzer, mock_session
    ):
        mock_response = mock.Mock()
        mock_response.json.return_value = {
            "kind": "KafkaConsumerList",
            "data": [
                {
                    "assignments": [
                        {"topic_name": "topic1"},
                        {},  # Missing topic_name
                        {"other_field": "value"},  # Wrong field
                    ]
                }
            ],
        }
        mock_session.get.return_value = mock_response

        result = analyzer._get_topics_for_group(
            "group1", "https://kafka-rest.confluent.cloud", "cluster1"
        )

        # Should only include valid topic
        assert result == ["topic1"]

    def test_get_all_consumer_groups_handles_trailing_slash(
        self, analyzer, mock_session
    ):
        mock_response = mock.Mock()
        mock_response.json.return_value = {
            "kind": "KafkaConsumerGroupList",
            "data": [{"consumer_group_id": "group1"}],
        }
        mock_session.get.return_value = mock_response

        analyzer._get_all_consumer_groups(
            "https://kafka-rest.confluent.cloud/",  # Trailing slash
            "cluster1",
        )

        # Verify URL was constructed correctly
        called_url = mock_session.get.call_args[0][0]
        assert "//" not in called_url.replace("https://", "")