datahub/metadata-ingestion/tests/unit/test_pulsar_source.py

284 lines
12 KiB
Python
Raw Permalink Normal View History

import unittest
from typing import Any, Dict
from unittest.mock import patch
import pytest
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.pulsar import (
PulsarSchema,
PulsarSource,
PulsarSourceConfig,
PulsarTopic,
)
mock_schema_response: Dict[str, Any] = {
"version": 1,
"type": "AVRO",
"timestamp": 0,
"data": '{"type":"record","name":"FooSchema","namespace":"foo.bar","doc":"Description of FooSchema","fields":[{"name":"field1","type":{"type":"string","avro.java.string":"String"},"doc":"Description of field1"},{"name":"field2","type":"long","doc":"Some description","default":0}]}',
"properties": {"__jsr310ConversionEnabled": "false", "__alwaysAllowNull": "true"},
}
class TestPulsarSourceConfig:
# TODO: While these tests work, we really shouldn't be calling pydantic
# validator methods directly.
def test_pulsar_source_config_valid_web_service_url(self):
assert (
PulsarSourceConfig().web_service_url_scheme_host_port(
"http://localhost:8080/"
)
== "http://localhost:8080"
)
def test_pulsar_source_config_invalid_web_service_url_scheme(self):
with pytest.raises(
ValueError, match=r"Scheme should be http or https, found ftp"
):
PulsarSourceConfig().web_service_url_scheme_host_port(
"ftp://localhost:8080/"
)
def test_pulsar_source_config_invalid_web_service_url_host(self):
with pytest.raises(
ValueError,
match=r"Not a valid hostname, hostname contains invalid characters, found localhost&",
):
PulsarSourceConfig().web_service_url_scheme_host_port(
"http://localhost&:8080/"
)
class TestPulsarTopic:
def test_pulsar_source_parse_topic_string(self) -> None:
topic = "persistent://tenant/namespace/topic"
pulsar_topic = PulsarTopic(topic)
assert pulsar_topic.type == "persistent"
assert pulsar_topic.tenant == "tenant"
assert pulsar_topic.namespace == "namespace"
assert pulsar_topic.topic == "topic"
assert pulsar_topic.fullname == "persistent://tenant/namespace/topic"
class TestPulsarSchema:
def test_pulsar_source_parse_pulsar_schema(self) -> None:
pulsar_schema = PulsarSchema(mock_schema_response)
assert pulsar_schema.schema_type == "AVRO"
assert (
pulsar_schema.schema_str
== '{"type":"record","name":"FooSchema","namespace":"foo.bar","doc":"Description of FooSchema","fields":[{"name":"field1","type":{"type":"string","avro.java.string":"String"},"doc":"Description of field1"},{"name":"field2","type":"long","doc":"Some description","default":0}]}'
)
assert pulsar_schema.schema_name == "foo.bar.FooSchema"
assert pulsar_schema.schema_version == 1
assert pulsar_schema.schema_description == "Description of FooSchema"
assert pulsar_schema.properties == {
"__jsr310ConversionEnabled": "false",
"__alwaysAllowNull": "true",
}
class TestPulsarSource(unittest.TestCase):
def test_pulsar_source_get_token_jwt(self):
ctx = PipelineContext(run_id="test")
pulsar_source = PulsarSource.create(
{"web_service_url": "http://localhost:8080", "token": "jwt_token"},
ctx,
)
# source = PulsarSource(
# ctx=PipelineContext(run_id="pulsar-source-test"),
# config=self.token_config)
assert pulsar_source.get_access_token() == "jwt_token"
@patch("datahub.ingestion.source.pulsar.requests.get", autospec=True)
@patch("datahub.ingestion.source.pulsar.requests.post", autospec=True)
def test_pulsar_source_get_token_oauth(self, mock_post, mock_get):
ctx = PipelineContext(run_id="test")
mock_get.return_value.json.return_value = {
"token_endpoint": "http://127.0.0.1:8083/realms/pulsar/protocol/openid-connect/token"
}
pulsar_source = PulsarSource.create(
{
"web_service_url": "http://localhost:8080",
"issuer_url": "http://localhost:8083/realms/pulsar",
"client_id": "client_id",
"client_secret": "client_secret",
},
ctx,
)
mock_post.return_value.json.return_value = {"access_token": "oauth_token"}
assert pulsar_source.get_access_token() == "oauth_token"
@patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
def test_pulsar_source_get_workunits_all_tenant(self, mock_session):
ctx = PipelineContext(run_id="test")
pulsar_source = PulsarSource.create(
{
"web_service_url": "http://localhost:8080",
},
ctx,
)
# Mock fetching Pulsar metadata
with patch(
"datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
) as mock:
mock.side_effect = [
["t_1"], # tenant list
["t_1/ns_1"], # namespaces list
["persistent://t_1/ns_1/topic_1"], # persistent topic list
[], # persistent partitioned topic list
[], # none-persistent topic list
[], # none-persistent partitioned topic list
mock_schema_response,
] # schema for persistent://t_1/ns_1/topic
work_units = list(pulsar_source.get_workunits())
first_mcp = work_units[0].metadata
assert isinstance(first_mcp, MetadataChangeProposalWrapper)
# Expected calls 7
# http://localhost:8080/admin/v2/tenants
# http://localhost:8080/admin/v2/namespaces/t_1
# http://localhost:8080/admin/v2/persistent/t_1/ns_1
# http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned
# http://localhost:8080/admin/v2/non-persistent/t_1/ns_1
# http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned
# http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
assert mock.call_count == 7
# expecting 6 mcp for one topic with default config
assert len(work_units) == 6
aspect_names = set(
wu.metadata.aspectName
for wu in work_units
if isinstance(wu.metadata, MetadataChangeProposalWrapper)
)
assert len(aspect_names) == 6
assert aspect_names == {
"status",
"schemaMetadata",
"datasetProperties",
"browsePaths",
"subTypes",
"browsePathsV2",
}
@patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
def test_pulsar_source_get_workunits_custom_tenant(self, mock_session):
ctx = PipelineContext(run_id="test")
pulsar_source = PulsarSource.create(
{
"web_service_url": "http://localhost:8080",
"tenants": ["t_1", "t_2"],
},
ctx,
)
# Mock fetching Pulsar metadata
with patch(
"datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
) as mock:
mock.side_effect = [
["t_1/ns_1"], # namespaces list
["persistent://t_1/ns_1/topic_1"], # topic list
[], # empty persistent partitioned topic list
[], # empty none-persistent topic list
[], # empty none-persistent partitioned topic list
mock_schema_response, # schema for persistent://t_1/ns_1/topic
[], # no namespaces for tenant t_2
]
work_units = list(pulsar_source.get_workunits())
first_mcp = work_units[0].metadata
assert isinstance(first_mcp, MetadataChangeProposalWrapper)
# Expected calls 7
# http://localhost:8080/admin/v2/namespaces/t_1
# http://localhost:8080/admin/v2/persistent/t_1/ns_1
# http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned
# http://localhost:8080/admin/v2/non-persistent/t_1/ns_1
# http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned
# http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
# http://localhost:8080/admin/v2/namespaces/t_2
assert mock.call_count == 7
# expecting 6 mcp for one topic with default config
assert len(work_units) == 6
aspect_names = set(
wu.metadata.aspectName
for wu in work_units
if isinstance(wu.metadata, MetadataChangeProposalWrapper)
)
assert len(aspect_names) == 6
assert aspect_names == {
"status",
"schemaMetadata",
"datasetProperties",
"browsePaths",
"subTypes",
"browsePathsV2",
}
@patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
def test_pulsar_source_get_workunits_patterns(self, mock_session):
ctx = PipelineContext(run_id="test")
pulsar_source = PulsarSource.create(
{
"web_service_url": "http://localhost:8080",
"tenants": ["t_1", "t_2", "bad_t_3"],
"tenant_patterns": {"deny": ["bad_t_3"]},
"namespace_patterns": {"allow": [r"t_1/ns_1"]},
"topic_patterns": {"allow": [r"persistent://t_1/ns_1/topic_1"]},
},
ctx,
)
# Mock fetching Pulsar metadata
with patch(
"datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
) as mock:
mock.side_effect = [
["t_1/ns_1", "t_2/ns_1"], # namespaces list
[
"persistent://t_1/ns_1/topic_1", # persistent topic list
"non-persistent://t_1/ns_1/bad_topic",
], # topic will be filtered out
[], # persistent partitioned topic list
[], # none-persistent topic list
[], # none-persistent partitioned topic list
mock_schema_response, # schema for persistent://t_1/ns_1/topic
[], # no namespaces for tenant t_2
]
work_units = list(pulsar_source.get_workunits())
first_mcp = work_units[0].metadata
assert isinstance(first_mcp, MetadataChangeProposalWrapper)
# Expected calls 7
# http://localhost:8080/admin/v2/namespaces/t_1
# http://localhost:8080/admin/v2/persistent/t_1/ns_1
# http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned
# http://localhost:8080/admin/v2/non-persistent/t_1/ns_1
# http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned
# http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
# http://localhost:8080/admin/v2/namespaces/t_2
assert mock.call_count == 7
# expecting 6 mcp for one topic with default config
assert len(work_units) == 6
aspect_names = set(
wu.metadata.aspectName
for wu in work_units
if isinstance(wu.metadata, MetadataChangeProposalWrapper)
)
assert len(aspect_names) == 6
assert aspect_names == {
"status",
"schemaMetadata",
"datasetProperties",
"browsePaths",
"subTypes",
"browsePathsV2",
}