mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-04 23:57:03 +00:00
284 lines
12 KiB
Python
284 lines
12 KiB
Python
import unittest
|
|
from typing import Any, Dict
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
|
from datahub.ingestion.api.common import PipelineContext
|
|
from datahub.ingestion.source.pulsar import (
|
|
PulsarSchema,
|
|
PulsarSource,
|
|
PulsarSourceConfig,
|
|
PulsarTopic,
|
|
)
|
|
|
|
# Canned schema payload shared by the tests in this module. The "data" entry is
# an Avro record definition serialized as compact JSON; it is split across
# adjacent string literals purely for readability and concatenates to a single
# string without any added whitespace.
mock_schema_response: Dict[str, Any] = {
    "version": 1,
    "type": "AVRO",
    "timestamp": 0,
    "data": (
        '{"type":"record","name":"FooSchema","namespace":"foo.bar",'
        '"doc":"Description of FooSchema",'
        '"fields":['
        '{"name":"field1","type":{"type":"string","avro.java.string":"String"},"doc":"Description of field1"},'
        '{"name":"field2","type":"long","doc":"Some description","default":0}'
        "]}"
    ),
    "properties": {
        "__jsr310ConversionEnabled": "false",
        "__alwaysAllowNull": "true",
    },
}
|
|
|
|
|
|
class TestPulsarSourceConfig:
    """Exercise ``PulsarSourceConfig.web_service_url_scheme_host_port`` directly."""

    # TODO: While these tests work, we really shouldn't be calling pydantic
    # validator methods directly.

    def test_pulsar_source_config_valid_web_service_url(self):
        """A well-formed http URL is returned without its trailing slash."""
        normalized_url = PulsarSourceConfig().web_service_url_scheme_host_port(
            "http://localhost:8080/"
        )
        assert normalized_url == "http://localhost:8080"

    def test_pulsar_source_config_invalid_web_service_url_scheme(self):
        """A scheme other than http/https is rejected with a ValueError."""
        expected_message = r"Scheme should be http or https, found ftp"
        with pytest.raises(ValueError, match=expected_message):
            PulsarSourceConfig().web_service_url_scheme_host_port(
                "ftp://localhost:8080/"
            )

    def test_pulsar_source_config_invalid_web_service_url_host(self):
        """A hostname containing an invalid character is rejected with a ValueError."""
        expected_message = r"Not a valid hostname, hostname contains invalid characters, found localhost&"
        with pytest.raises(ValueError, match=expected_message):
            PulsarSourceConfig().web_service_url_scheme_host_port(
                "http://localhost&:8080/"
            )
|
|
|
|
|
|
class TestPulsarTopic:
    """Checks how ``PulsarTopic`` splits a fully qualified topic name."""

    def test_pulsar_source_parse_topic_string(self) -> None:
        """Each component of the topic string is exposed as its own attribute."""
        parsed = PulsarTopic("persistent://tenant/namespace/topic")

        # Attribute name -> value expected for the topic string above.
        expected_attributes = {
            "type": "persistent",
            "tenant": "tenant",
            "namespace": "namespace",
            "topic": "topic",
            "fullname": "persistent://tenant/namespace/topic",
        }
        for attribute, expected_value in expected_attributes.items():
            assert getattr(parsed, attribute) == expected_value
|
|
|
|
|
|
class TestPulsarSchema:
    """Checks the attributes ``PulsarSchema`` exposes for ``mock_schema_response``."""

    def test_pulsar_source_parse_pulsar_schema(self) -> None:
        """The schema type, raw string, name, version, doc and properties are all surfaced."""
        expected_schema_str = '{"type":"record","name":"FooSchema","namespace":"foo.bar","doc":"Description of FooSchema","fields":[{"name":"field1","type":{"type":"string","avro.java.string":"String"},"doc":"Description of field1"},{"name":"field2","type":"long","doc":"Some description","default":0}]}'
        expected_properties = {
            "__jsr310ConversionEnabled": "false",
            "__alwaysAllowNull": "true",
        }

        parsed = PulsarSchema(mock_schema_response)

        assert parsed.schema_type == "AVRO"
        assert parsed.schema_str == expected_schema_str
        # The expected name is the Avro namespace and record name joined by a dot.
        assert parsed.schema_name == "foo.bar.FooSchema"
        assert parsed.schema_version == 1
        assert parsed.schema_description == "Description of FooSchema"
        assert parsed.properties == expected_properties
|
|
|
|
|
|
class TestPulsarSource(unittest.TestCase):
    """Tests for ``PulsarSource`` token retrieval and work unit generation.

    All HTTP access is mocked; no Pulsar instance is contacted.
    """

    def _assert_single_topic_ingested(self, pulsar_source, metadata_responses):
        """Run ``get_workunits`` against canned metadata and verify the output.

        ``PulsarSource._get_pulsar_metadata`` is patched so that successive
        calls return the successive entries of ``metadata_responses``. The
        helper then asserts that every canned response was consumed and that
        exactly one topic was emitted with the six default aspects.

        Args:
            pulsar_source: The source instance under test.
            metadata_responses: Ordered return values for the patched
                ``_get_pulsar_metadata`` calls.
        """
        with patch(
            "datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
        ) as mock_metadata:
            mock_metadata.side_effect = metadata_responses

            work_units = list(pulsar_source.get_workunits())
            first_mcp = work_units[0].metadata
            assert isinstance(first_mcp, MetadataChangeProposalWrapper)

            # One metadata call per canned response (7 in every caller).
            assert mock_metadata.call_count == len(metadata_responses)

            # expecting 6 mcp for one topic with default config
            assert len(work_units) == 6
            aspect_names = {
                wu.metadata.aspectName
                for wu in work_units
                if isinstance(wu.metadata, MetadataChangeProposalWrapper)
            }
            assert len(aspect_names) == 6
            assert aspect_names == {
                "status",
                "schemaMetadata",
                "datasetProperties",
                "browsePaths",
                "subTypes",
                "browsePathsV2",
            }

    def test_pulsar_source_get_token_jwt(self):
        """A statically configured ``token`` is returned as the access token."""
        ctx = PipelineContext(run_id="test")
        pulsar_source = PulsarSource.create(
            {"web_service_url": "http://localhost:8080", "token": "jwt_token"},
            ctx,
        )
        assert pulsar_source.get_access_token() == "jwt_token"

    @patch("datahub.ingestion.source.pulsar.requests.get", autospec=True)
    @patch("datahub.ingestion.source.pulsar.requests.post", autospec=True)
    def test_pulsar_source_get_token_oauth(self, mock_post, mock_get):
        """With OAuth settings, the token comes from the mocked POST response."""
        ctx = PipelineContext(run_id="test")
        # Mocked GET response that advertises the token endpoint.
        mock_get.return_value.json.return_value = {
            "token_endpoint": "http://127.0.0.1:8083/realms/pulsar/protocol/openid-connect/token"
        }

        pulsar_source = PulsarSource.create(
            {
                "web_service_url": "http://localhost:8080",
                "issuer_url": "http://localhost:8083/realms/pulsar",
                "client_id": "client_id",
                "client_secret": "client_secret",
            },
            ctx,
        )
        # Mocked POST response that carries the access token.
        mock_post.return_value.json.return_value = {"access_token": "oauth_token"}
        assert pulsar_source.get_access_token() == "oauth_token"

    @patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
    def test_pulsar_source_get_workunits_all_tenant(self, mock_session):
        """Without a tenant list, the tenants are fetched before namespaces."""
        ctx = PipelineContext(run_id="test")
        pulsar_source = PulsarSource.create(
            {
                "web_service_url": "http://localhost:8080",
            },
            ctx,
        )

        # Expected calls 7
        # http://localhost:8080/admin/v2/tenants
        # http://localhost:8080/admin/v2/namespaces/t_1
        # http://localhost:8080/admin/v2/persistent/t_1/ns_1
        # http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned
        # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1
        # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned
        # http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
        self._assert_single_topic_ingested(
            pulsar_source,
            [
                ["t_1"],  # tenant list
                ["t_1/ns_1"],  # namespaces list
                ["persistent://t_1/ns_1/topic_1"],  # persistent topic list
                [],  # persistent partitioned topic list
                [],  # non-persistent topic list
                [],  # non-persistent partitioned topic list
                mock_schema_response,  # schema for persistent://t_1/ns_1/topic_1
            ],
        )

    @patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
    def test_pulsar_source_get_workunits_custom_tenant(self, mock_session):
        """With an explicit tenant list, no tenant lookup call is expected."""
        ctx = PipelineContext(run_id="test")
        pulsar_source = PulsarSource.create(
            {
                "web_service_url": "http://localhost:8080",
                "tenants": ["t_1", "t_2"],
            },
            ctx,
        )

        # Expected calls 7
        # http://localhost:8080/admin/v2/namespaces/t_1
        # http://localhost:8080/admin/v2/persistent/t_1/ns_1
        # http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned
        # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1
        # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned
        # http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
        # http://localhost:8080/admin/v2/namespaces/t_2
        self._assert_single_topic_ingested(
            pulsar_source,
            [
                ["t_1/ns_1"],  # namespaces list
                ["persistent://t_1/ns_1/topic_1"],  # persistent topic list
                [],  # empty persistent partitioned topic list
                [],  # empty non-persistent topic list
                [],  # empty non-persistent partitioned topic list
                mock_schema_response,  # schema for persistent://t_1/ns_1/topic_1
                [],  # no namespaces for tenant t_2
            ],
        )

    @patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
    def test_pulsar_source_get_workunits_patterns(self, mock_session):
        """Tenant, namespace and topic patterns narrow ingestion to one topic."""
        ctx = PipelineContext(run_id="test")
        pulsar_source = PulsarSource.create(
            {
                "web_service_url": "http://localhost:8080",
                "tenants": ["t_1", "t_2", "bad_t_3"],
                "tenant_patterns": {"deny": ["bad_t_3"]},
                "namespace_patterns": {"allow": [r"t_1/ns_1"]},
                "topic_patterns": {"allow": [r"persistent://t_1/ns_1/topic_1"]},
            },
            ctx,
        )

        # Expected calls 7 (no call is listed for the denied tenant bad_t_3)
        # http://localhost:8080/admin/v2/namespaces/t_1
        # http://localhost:8080/admin/v2/persistent/t_1/ns_1
        # http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned
        # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1
        # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned
        # http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
        # http://localhost:8080/admin/v2/namespaces/t_2
        self._assert_single_topic_ingested(
            pulsar_source,
            [
                ["t_1/ns_1", "t_2/ns_1"],  # namespaces list
                [  # persistent topic list
                    "persistent://t_1/ns_1/topic_1",
                    "non-persistent://t_1/ns_1/bad_topic",  # will be filtered out
                ],
                [],  # persistent partitioned topic list
                [],  # non-persistent topic list
                [],  # non-persistent partitioned topic list
                mock_schema_response,  # schema for persistent://t_1/ns_1/topic_1
                [],  # no namespaces for tenant t_2
            ],
        )
|