2022-03-30 16:20:23 -04:00
|
|
|
import unittest
|
|
|
|
from unittest.mock import patch
|
|
|
|
|
|
|
|
from confluent_kafka.schema_registry.schema_registry_client import (
|
|
|
|
RegisteredSchema,
|
|
|
|
Schema,
|
2023-08-16 10:33:09 +05:30
|
|
|
SchemaReference,
|
2022-03-30 16:20:23 -04:00
|
|
|
)
|
|
|
|
|
|
|
|
from datahub.ingestion.source.confluent_schema_registry import ConfluentSchemaRegistry
|
2024-11-22 13:08:23 +05:30
|
|
|
from datahub.ingestion.source.kafka.kafka import KafkaSourceConfig, KafkaSourceReport
|
2022-03-30 16:20:23 -04:00
|
|
|
|
|
|
|
|
|
|
|
class ConfluentSchemaRegistryTest(unittest.TestCase):
    """Unit tests for Avro schema-reference handling in ConfluentSchemaRegistry."""

    def test_get_schema_str_replace_confluent_ref_avro(self):
        """Check that an external Avro reference is replaced by the referenced schema.

        The schema under test declares a field whose type is the named type
        ``TestTopic1``. The registry client's ``get_latest_version`` is patched
        to return the ``TestTopic1`` record schema, and the result of
        ``get_schema_str_replace_confluent_ref_avro`` is compared against the
        compacted form of the hand-built, fully inlined schema. Two reference
        layouts are exercised: one where the reference ``name`` equals the type
        name, and one where the reference ``subject`` does.
        """
        # Schema whose single field points at the externally defined type.
        schema_str_orig = """
{
  "fields": [
    {
      "name": "my_field1",
      "type": "TestTopic1"
    }
  ],
  "name": "TestTopic1Val",
  "namespace": "io.acryl",
  "type": "record"
}
"""
        # Definition of the referenced type, as served by the patched client.
        schema_str_ref = """
{
  "doc": "Sample schema to help you get started.",
  "fields": [
    {
      "doc": "The int type is a 32-bit signed integer.",
      "name": "my_field1",
      "type": "int"
    }
  ],
  "name": "TestTopic1",
  "namespace": "io.acryl",
  "type": "record"
}
"""

        # Expected result: the original schema with the referenced definition
        # spliced in where the type name used to be.
        schema_str_final = (
            """
{
  "fields": [
    {
      "name": "my_field1",
      "type": """
            + schema_str_ref
            + """
    }
  ],
  "name": "TestTopic1Val",
  "namespace": "io.acryl",
  "type": "record"
}
"""
        )

        kafka_source_config = KafkaSourceConfig.parse_obj(
            {
                "connection": {
                    "bootstrap": "localhost:9092",
                    "schema_registry_url": "http://localhost:8081",
                },
            }
        )
        confluent_schema_registry = ConfluentSchemaRegistry.create(
            kafka_source_config, KafkaSourceReport()
        )

        def new_get_latest_version(subject_name: str) -> RegisteredSchema:
            # Stand-in for the registry lookup: every subject resolves to the
            # referenced schema, so no running schema registry is needed.
            return RegisteredSchema(
                schema_id="schema_id_1",
                schema=Schema(schema_str=schema_str_ref, schema_type="AVRO"),
                subject="test",
                version=1,
            )

        # The two scenarios differ only in how the reference is declared.
        reference_cases = {
            # The external reference would match by name.
            "match_by_name": SchemaReference(
                name="TestTopic1", subject="schema_subject_1", version=1
            ),
            # The external reference would match by subject.
            "match_by_subject": SchemaReference(
                name="schema_subject_1", subject="TestTopic1", version=1
            ),
        }

        with patch.object(
            confluent_schema_registry.schema_registry_client,
            "get_latest_version",
            new_get_latest_version,
        ):
            for case_name, reference in reference_cases.items():
                # subTest labels each scenario so a failure names the case.
                with self.subTest(case=case_name):
                    schema_str = (
                        confluent_schema_registry.get_schema_str_replace_confluent_ref_avro(
                            schema=Schema(
                                schema_str=schema_str_orig,
                                schema_type="AVRO",
                                references=[reference],
                            )
                        )
                    )
                    # assertEqual (not bare assert) survives `python -O` and
                    # prints a diff under the plain unittest runner.
                    self.assertEqual(
                        schema_str,
                        ConfluentSchemaRegistry._compact_schema(schema_str_final),
                    )
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the tests with the stdlib runner when this file is executed directly.
    unittest.main()
|