feat(ingest): kafka - support schema references (#3862)

This commit is contained in:
Aseem Bansal 2022-01-18 03:59:54 +05:30 committed by GitHub
parent 0987a6f579
commit 400e0fe838
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 119 additions and 2 deletions

View File

@ -91,6 +91,35 @@ class KafkaSource(Source):
else:
self.report.report_dropped(t)
def get_schema_str_replace_confluent_ref_avro(
    self, schema: Schema, schema_seen: Optional[set] = None
) -> str:
    """Inline Confluent Schema Registry references into an Avro schema string.

    Confluent stores referenced schemas under separate subjects; the parent
    schema refers to them by name only, which a plain Avro parser cannot
    resolve. This method fetches each referenced subject's latest registered
    schema and substitutes its full definition for the quoted reference
    name, recursing into the referenced schema's own references.

    :param schema: registry ``Schema`` whose ``schema_str`` may contain references
    :param schema_seen: subjects already inlined on this call chain; guards
        against repeated fetches and infinite recursion on circular references
    :return: the schema string with all resolvable references inlined
    """
    # Fast path: nothing to resolve.
    if not schema.references:
        return schema.schema_str

    # Fresh set per top-level call; never a mutable default argument.
    if schema_seen is None:
        schema_seen = set()
    schema_str = schema.schema_str
    for schema_ref in schema.references:
        ref_subject = schema_ref["subject"]
        if ref_subject in schema_seen:
            # Already inlined higher up the recursion — skip to avoid
            # refetching and to break reference cycles.
            continue
        reference_schema = self.schema_registry_client.get_latest_version(
            ref_subject
        )
        schema_seen.add(ref_subject)
        # Lazy %-style args: avoid building the (potentially large) schema
        # string unless debug logging is actually enabled.
        logger.debug(
            "ref for %s is %s", ref_subject, reference_schema.schema.schema_str
        )
        ref_name = schema_ref["name"]
        # Replace every quoted occurrence of the reference name with the
        # (recursively resolved) definition of the referenced schema.
        schema_str = schema_str.replace(
            f'"{ref_name}"',
            self.get_schema_str_replace_confluent_ref_avro(
                reference_schema.schema, schema_seen
            ),
        )
    return schema_str
def _extract_record(self, topic: str) -> MetadataChangeEvent:
logger.debug(f"topic = {topic}")
platform = "kafka"
@ -114,8 +143,9 @@ class KafkaSource(Source):
# Parse the schema
fields: List[SchemaField] = []
if schema and schema.schema_type == "AVRO":
cleaned_str = self.get_schema_str_replace_confluent_ref_avro(schema)
# "value.id" or "value.[type=string]id"
fields = schema_util.avro_schema_to_mce_fields(schema.schema_str)
fields = schema_util.avro_schema_to_mce_fields(cleaned_str)
elif schema is not None:
self.report.report_warning(
topic,
@ -136,8 +166,9 @@ class KafkaSource(Source):
# Parse the key schema
key_fields: List[SchemaField] = []
if key_schema and key_schema.schema_type == "AVRO":
cleaned_key_str = self.get_schema_str_replace_confluent_ref_avro(key_schema)
key_fields = schema_util.avro_schema_to_mce_fields(
key_schema.schema_str, is_key_schema=True
cleaned_key_str, is_key_schema=True
)
elif key_schema is not None:
self.report.report_warning(

View File

@ -1,12 +1,98 @@
import unittest
from unittest.mock import MagicMock, patch
from confluent_kafka.schema_registry.schema_registry_client import (
RegisteredSchema,
Schema,
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.kafka import KafkaSource
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
class KafkaSourceTest(unittest.TestCase):
def test_get_schema_str_replace_confluent_ref_avro(self):
    """Registry references must be inlined into the schema string before Avro parsing."""
    # Schema under test: its single field's type is a bare reference to
    # another registry subject rather than an inline definition.
    schema_str_orig = """
{
  "fields": [
    {
      "name": "my_field1",
      "type": "TestTopic1"
    }
  ],
  "name": "TestTopic1Val",
  "namespace": "io.acryl",
  "type": "record"
}
"""
    # Definition registered under the referenced subject.
    schema_str_ref = """
{
  "doc": "Sample schema to help you get started.",
  "fields": [
    {
      "doc": "The int type is a 32-bit signed integer.",
      "name": "my_field1",
      "type": "int"
    }
  ],
  "name": "TestTopic1",
  "namespace": "io.acryl",
  "type": "record"
}
"""
    # Expected output: the quoted reference name replaced verbatim by the
    # referenced schema's full definition.
    schema_str_final = schema_str_orig.replace('"TestTopic1"', schema_str_ref)

    kafka_source = KafkaSource.create(
        {
            "connection": {"bootstrap": "localhost:9092"},
        },
        PipelineContext(run_id="test"),
    )

    # Stub the registry lookup so the test performs no network access.
    def fake_get_latest_version(subject_name):
        return RegisteredSchema(
            schema_id="schema_id_1",
            schema=Schema(schema_str=schema_str_ref, schema_type="AVRO"),
            subject="test",
            version=1,
        )

    with patch.object(
        kafka_source.schema_registry_client,
        "get_latest_version",
        fake_get_latest_version,
    ):
        resolved = kafka_source.get_schema_str_replace_confluent_ref_avro(
            schema=Schema(
                schema_str=schema_str_orig,
                schema_type="AVRO",
                references=[
                    dict(name="TestTopic1", subject="schema_subject_1", version=1)
                ],
            )
        )
        assert resolved == schema_str_final
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_configuration(self, mock_kafka):
ctx = PipelineContext(run_id="test")