# datahub/metadata-ingestion/tests/unit/test_pulsar_source.py
# (original listing: 284 lines, 12 KiB, Python)

import unittest
from typing import Any, Dict
from unittest.mock import patch
import pytest
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.pulsar import (
PulsarSchema,
PulsarSource,
PulsarSourceConfig,
PulsarTopic,
)
# Canned Pulsar admin "schema" response shared by the tests below: an AVRO
# record ``foo.bar.FooSchema`` with two documented fields. The JSON document is
# split across adjacent string literals purely for readability; Python joins
# them into one compact string with no whitespace between tokens.
mock_schema_response: Dict[str, Any] = {
    "version": 1,
    "type": "AVRO",
    "timestamp": 0,
    "data": (
        '{"type":"record","name":"FooSchema","namespace":"foo.bar",'
        '"doc":"Description of FooSchema","fields":['
        '{"name":"field1","type":{"type":"string","avro.java.string":"String"},'
        '"doc":"Description of field1"},'
        '{"name":"field2","type":"long","doc":"Some description","default":0}'
        "]}"
    ),
    "properties": {"__jsr310ConversionEnabled": "false", "__alwaysAllowNull": "true"},
}
class TestPulsarSourceConfig:
    """Validation of the ``web_service_url`` option of ``PulsarSourceConfig``.

    The URL is validated by constructing the config model rather than by
    calling the ``web_service_url_scheme_host_port`` validator method directly,
    so the validator runs exactly as it does for a real recipe.
    NOTE(review): this relies on that validator being registered for the
    ``web_service_url`` field, as its name suggests.
    """

    def test_pulsar_source_config_valid_web_service_url(self):
        # A valid URL is accepted and its trailing slash is stripped.
        config = PulsarSourceConfig(web_service_url="http://localhost:8080/")
        assert config.web_service_url == "http://localhost:8080"

    def test_pulsar_source_config_invalid_web_service_url_scheme(self):
        # pydantic's ValidationError subclasses ValueError, and ``match`` is a
        # regex *search*, so the validator's own message is still matched even
        # though pydantic wraps it.
        with pytest.raises(
            ValueError, match=r"Scheme should be http or https, found ftp"
        ):
            PulsarSourceConfig(web_service_url="ftp://localhost:8080/")

    def test_pulsar_source_config_invalid_web_service_url_host(self):
        with pytest.raises(
            ValueError,
            match=r"Not a valid hostname, hostname contains invalid characters, found localhost&",
        ):
            PulsarSourceConfig(web_service_url="http://localhost&:8080/")
class TestPulsarTopic:
    def test_pulsar_source_parse_topic_string(self) -> None:
        """A fully-qualified topic string is split into its four components."""
        fullname = "persistent://tenant/namespace/topic"
        parsed = PulsarTopic(fullname)

        components = (parsed.type, parsed.tenant, parsed.namespace, parsed.topic)
        assert components == ("persistent", "tenant", "namespace", "topic")

        # The full name round-trips to the input string.
        assert parsed.fullname == fullname
class TestPulsarSchema:
    def test_pulsar_source_parse_pulsar_schema(self) -> None:
        """Fields of an admin-API schema response are exposed on PulsarSchema."""
        schema = PulsarSchema(mock_schema_response)

        # Values carried over from the top level of the response.
        assert schema.schema_type == "AVRO"
        assert schema.schema_version == 1
        assert schema.schema_str == mock_schema_response["data"]
        assert schema.properties == {
            "__jsr310ConversionEnabled": "false",
            "__alwaysAllowNull": "true",
        }

        # Values derived from the embedded AVRO document:
        # "<namespace>.<name>" and the record-level "doc".
        assert schema.schema_name == "foo.bar.FooSchema"
        assert schema.schema_description == "Description of FooSchema"
class TestPulsarSource(unittest.TestCase):
    """PulsarSource token handling and work-unit generation.

    Pulsar admin responses are mocked by patching
    ``PulsarSource._get_pulsar_metadata`` with a ``side_effect`` list, one
    entry per expected call, in call order.
    """

    # Aspect names emitted for a single topic with the default source config.
    EXPECTED_TOPIC_ASPECTS = {
        "status",
        "schemaMetadata",
        "datasetProperties",
        "browsePaths",
        "subTypes",
        "browsePathsV2",
    }

    def _assert_single_topic_workunits(self, work_units) -> None:
        """Assert that ``work_units`` are the MCPs for exactly one topic.

        One work unit is expected per aspect in ``EXPECTED_TOPIC_ASPECTS``.
        The length is checked first so an empty result fails with a clear
        assertion instead of an IndexError on ``work_units[0]``.
        """
        assert len(work_units) == len(self.EXPECTED_TOPIC_ASPECTS)
        assert isinstance(work_units[0].metadata, MetadataChangeProposalWrapper)
        aspect_names = {
            wu.metadata.aspectName
            for wu in work_units
            if isinstance(wu.metadata, MetadataChangeProposalWrapper)
        }
        assert aspect_names == self.EXPECTED_TOPIC_ASPECTS

    def test_pulsar_source_get_token_jwt(self):
        ctx = PipelineContext(run_id="test")
        pulsar_source = PulsarSource.create(
            {"web_service_url": "http://localhost:8080", "token": "jwt_token"},
            ctx,
        )
        # A statically configured token is returned unchanged.
        assert pulsar_source.get_access_token() == "jwt_token"

    # patch decorators apply bottom-up, so ``mock_post`` is the first argument.
    @patch("datahub.ingestion.source.pulsar.requests.get", autospec=True)
    @patch("datahub.ingestion.source.pulsar.requests.post", autospec=True)
    def test_pulsar_source_get_token_oauth(self, mock_post, mock_get):
        ctx = PipelineContext(run_id="test")
        # GET responses advertise the token endpoint of the issuer ...
        mock_get.return_value.json.return_value = {
            "token_endpoint": "http://127.0.0.1:8083/realms/pulsar/protocol/openid-connect/token"
        }
        pulsar_source = PulsarSource.create(
            {
                "web_service_url": "http://localhost:8080",
                "issuer_url": "http://localhost:8083/realms/pulsar",
                "client_id": "client_id",
                "client_secret": "client_secret",
            },
            ctx,
        )
        # ... and POST responses carry the access token itself.
        mock_post.return_value.json.return_value = {"access_token": "oauth_token"}
        assert pulsar_source.get_access_token() == "oauth_token"

    # Session.get is patched (presumably as a guard against real HTTP calls);
    # the mock itself is not inspected.
    @patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
    def test_pulsar_source_get_workunits_all_tenant(self, _mock_session_get):
        ctx = PipelineContext(run_id="test")
        pulsar_source = PulsarSource.create(
            {
                "web_service_url": "http://localhost:8080",
            },
            ctx,
        )
        with patch(
            "datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
        ) as mock_metadata:
            mock_metadata.side_effect = [
                ["t_1"],  # tenant list
                ["t_1/ns_1"],  # namespaces list
                ["persistent://t_1/ns_1/topic_1"],  # persistent topic list
                [],  # persistent partitioned topic list
                [],  # non-persistent topic list
                [],  # non-persistent partitioned topic list
                mock_schema_response,  # schema for persistent://t_1/ns_1/topic_1
            ]
            work_units = list(pulsar_source.get_workunits())

            # Expected calls 7
            # http://localhost:8080/admin/v2/tenants
            # http://localhost:8080/admin/v2/namespaces/t_1
            # http://localhost:8080/admin/v2/persistent/t_1/ns_1
            # http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned
            # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1
            # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned
            # http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
            assert mock_metadata.call_count == 7
            self._assert_single_topic_workunits(work_units)

    @patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
    def test_pulsar_source_get_workunits_custom_tenant(self, _mock_session_get):
        ctx = PipelineContext(run_id="test")
        pulsar_source = PulsarSource.create(
            {
                "web_service_url": "http://localhost:8080",
                "tenants": ["t_1", "t_2"],
            },
            ctx,
        )
        with patch(
            "datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
        ) as mock_metadata:
            # Tenants come from the config, so no tenant-list call is made.
            mock_metadata.side_effect = [
                ["t_1/ns_1"],  # namespaces list
                ["persistent://t_1/ns_1/topic_1"],  # topic list
                [],  # empty persistent partitioned topic list
                [],  # empty non-persistent topic list
                [],  # empty non-persistent partitioned topic list
                mock_schema_response,  # schema for persistent://t_1/ns_1/topic_1
                [],  # no namespaces for tenant t_2
            ]
            work_units = list(pulsar_source.get_workunits())

            # Expected calls 7
            # http://localhost:8080/admin/v2/namespaces/t_1
            # http://localhost:8080/admin/v2/persistent/t_1/ns_1
            # http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned
            # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1
            # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned
            # http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
            # http://localhost:8080/admin/v2/namespaces/t_2
            assert mock_metadata.call_count == 7
            self._assert_single_topic_workunits(work_units)

    @patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
    def test_pulsar_source_get_workunits_patterns(self, _mock_session_get):
        ctx = PipelineContext(run_id="test")
        pulsar_source = PulsarSource.create(
            {
                "web_service_url": "http://localhost:8080",
                "tenants": ["t_1", "t_2", "bad_t_3"],
                "tenant_patterns": {"deny": ["bad_t_3"]},
                "namespace_patterns": {"allow": [r"t_1/ns_1"]},
                "topic_patterns": {"allow": [r"persistent://t_1/ns_1/topic_1"]},
            },
            ctx,
        )
        with patch(
            "datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
        ) as mock_metadata:
            # bad_t_3 is denied before any call is made for it; t_2/ns_1 is
            # dropped by namespace_patterns and bad_topic by topic_patterns.
            mock_metadata.side_effect = [
                ["t_1/ns_1", "t_2/ns_1"],  # namespaces list
                [
                    "persistent://t_1/ns_1/topic_1",  # persistent topic list
                    "non-persistent://t_1/ns_1/bad_topic",  # filtered out
                ],
                [],  # persistent partitioned topic list
                [],  # non-persistent topic list
                [],  # non-persistent partitioned topic list
                mock_schema_response,  # schema for persistent://t_1/ns_1/topic_1
                [],  # no namespaces for tenant t_2
            ]
            work_units = list(pulsar_source.get_workunits())

            # Expected calls 7
            # http://localhost:8080/admin/v2/namespaces/t_1
            # http://localhost:8080/admin/v2/persistent/t_1/ns_1
            # http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned
            # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1
            # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned
            # http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
            # http://localhost:8080/admin/v2/namespaces/t_2
            assert mock_metadata.call_count == 7
            self._assert_single_topic_workunits(work_units)