feat(ingestion): Support pluggable Schema Registry for Kafka Source (#4535)

* Support for pluggable schema registry for the Kafka source.
Co-authored-by: Sunil Patil <spatil@twilio.com>
Co-authored-by: Ravindra Lanka <rlanka@acryl.io>
Sunil Patil 2022-03-30 16:20:23 -04:00 committed by GitHub
parent dad6a53890
commit 36e9552d61
6 changed files with 466 additions and 308 deletions


@@ -107,6 +107,41 @@ source:
"my_topic_2-value": "io.acryl.Schema3"
```
## Custom Schema Registry
The Kafka Source uses the schema registry to determine the schemas associated with both the `key` and the `value` of a topic.
By default, it uses [Confluent's Kafka Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html)
and supports the `AVRO` schema type.
If you are using a custom schema registry, or a schema type other than `AVRO`, you can provide your own
implementation of the `KafkaSchemaRegistryBase` class and implement the `get_schema_metadata(topic, platform_urn)` method, which,
given a topic name, returns a `SchemaMetadata` object containing the schema for that topic. Please refer to
`datahub.ingestion.source.confluent_schema_registry::ConfluentSchemaRegistry` for a sample implementation of this class.
```python
from abc import ABC, abstractmethod
from typing import Optional

from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata


class KafkaSchemaRegistryBase(ABC):
    @abstractmethod
    def get_schema_metadata(
        self, topic: str, platform_urn: str
    ) -> Optional[SchemaMetadata]:
        pass
```
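For illustration, a minimal custom implementation might look like the sketch below. The class name `MyCustomSchemaRegistry` and its in-memory schema map are hypothetical; the parts that matter are the `create(config, report)` classmethod (which the Kafka source uses to instantiate the configured class) and `get_schema_metadata`, which must return a `SchemaMetadata` aspect or `None`.
```python
from typing import Optional

from datahub.ingestion.source.kafka import KafkaSourceConfig, KafkaSourceReport
from datahub.ingestion.source.kafka_schema_registry_base import KafkaSchemaRegistryBase
from datahub.metadata.com.linkedin.pegasus2avro.schema import KafkaSchema, SchemaMetadata


class MyCustomSchemaRegistry(KafkaSchemaRegistryBase):
    """Hypothetical registry that serves raw schema strings from an in-memory map."""

    def __init__(self, source_config: KafkaSourceConfig, report: KafkaSourceReport) -> None:
        self.source_config = source_config
        self.report = report
        # Assumption: schemas are keyed by topic name; a real implementation
        # would fetch them from its own registry service instead.
        self.schemas = {"my_topic_1": '{"type": "record", "name": "example", "fields": []}'}

    @classmethod
    def create(
        cls, config: KafkaSourceConfig, report: KafkaSourceReport
    ) -> "MyCustomSchemaRegistry":
        return cls(config, report)

    def get_schema_metadata(
        self, topic: str, platform_urn: str
    ) -> Optional[SchemaMetadata]:
        schema_str = self.schemas.get(topic)
        if schema_str is None:
            return None
        return SchemaMetadata(
            schemaName=topic,
            version=0,
            hash="",
            platform=platform_urn,
            platformSchema=KafkaSchema(documentSchema=schema_str),
            fields=[],  # a real implementation would parse schema_str into SchemaFields
        )
```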
The custom schema registry class can be configured using the `schema_registry_class` config param of the `kafka` source as shown below.
```YAML
source:
type: "kafka"
config:
# Set the custom schema registry implementation class
schema_registry_class: "datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry"
# Coordinates
connection:
bootstrap: "broker:9092"
schema_registry_url: http://localhost:8081
sink:
# sink configs
```
## Config details
Note that a `.` is used to denote nested fields in the YAML recipe.
@@ -126,7 +161,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `domain.domain_urn.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. There can be multiple domain keys specified. |
| `platform_instance` | | None | The Platform instance to use while constructing URNs. |
| `topic_subject_map` | | `{}` | Provides the mapping for the `key` and the `value` schemas of a topic to the corresponding schema registry subject name. Each entry of this map has the form `<topic_name>-key`:`<schema_registry_subject_name_for_key_schema>` and `<topic_name>-value`:`<schema_registry_subject_name_for_value_schema>` for the key and the value schemas associated with the topic, respectively. This parameter is mandatory when the [RecordNameStrategy](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#how-the-naming-strategies-work) is used as the subject naming strategy in the kafka schema registry. NOTE: When provided, this overrides the default subject name resolution even when the `TopicNameStrategy` or the `TopicRecordNameStrategy` are used. |
| `schema_registry_class` | | `datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry` | The Kafka schema registry class implementation that would be used for obtaining both the key schema and the value schema of the kafka topic. |
The options in the consumer config and schema registry config are passed to the Kafka DeserializingConsumer and SchemaRegistryClient respectively.


@@ -0,0 +1,261 @@
import json
import logging
from hashlib import md5
from typing import List, Optional, Tuple
import confluent_kafka
from confluent_kafka.schema_registry.schema_registry_client import Schema
from datahub.ingestion.extractor import schema_util
from datahub.ingestion.source.kafka import KafkaSourceConfig, KafkaSourceReport
from datahub.ingestion.source.kafka_schema_registry_base import KafkaSchemaRegistryBase
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
KafkaSchema,
SchemaField,
SchemaMetadata,
)
logger = logging.getLogger(__name__)
class ConfluentSchemaRegistry(KafkaSchemaRegistryBase):
"""
    This is the Confluent schema registry specific implementation of the
    KafkaSchemaRegistryBase class. It knows how to get the SchemaMetadata
    of a topic from the Confluent Schema Registry.
"""
def __init__(
self, source_config: KafkaSourceConfig, report: KafkaSourceReport
) -> None:
self.source_config: KafkaSourceConfig = source_config
self.report: KafkaSourceReport = report
# Use the fully qualified name for SchemaRegistryClient to make it mock patchable for testing.
self.schema_registry_client = (
confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient(
{
"url": source_config.connection.schema_registry_url,
**source_config.connection.schema_registry_config,
}
)
)
self.known_schema_registry_subjects: List[str] = []
try:
self.known_schema_registry_subjects.extend(
self.schema_registry_client.get_subjects()
)
except Exception as e:
logger.warning(f"Failed to get subjects from schema registry: {e}")
@classmethod
def create(
cls, source_config: KafkaSourceConfig, report: KafkaSourceReport
) -> "ConfluentSchemaRegistry":
return cls(source_config, report)
def _get_subject_for_topic(self, topic: str, is_key_schema: bool) -> Optional[str]:
subject_key_suffix: str = "-key" if is_key_schema else "-value"
# For details on schema registry subject name strategy,
# see: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#how-the-naming-strategies-work
# User-provided subject for the topic overrides the rest, regardless of the subject name strategy.
# However, it is a must when the RecordNameStrategy is used as the schema registry subject name strategy.
# The subject name format for RecordNameStrategy is: <fully-qualified record name>-<key/value> (cannot be inferred from topic name).
subject_key: str = topic + subject_key_suffix
if subject_key in self.source_config.topic_subject_map:
return self.source_config.topic_subject_map[subject_key]
# Subject name format when the schema registry subject name strategy is
# (a) TopicNameStrategy(default strategy): <topic name>-<key/value>
# (b) TopicRecordNameStrategy: <topic name>-<fully-qualified record name>-<key/value>
for subject in self.known_schema_registry_subjects:
if subject.startswith(topic) and subject.endswith(subject_key_suffix):
return subject
return None
@staticmethod
def _compact_schema(schema_str: str) -> str:
# Eliminate all white-spaces for a compact representation.
return json.dumps(json.loads(schema_str), separators=(",", ":"))
def get_schema_str_replace_confluent_ref_avro(
self, schema: Schema, schema_seen: Optional[set] = None
) -> str:
if not schema.references:
return self._compact_schema(schema.schema_str)
if schema_seen is None:
schema_seen = set()
schema_str = self._compact_schema(schema.schema_str)
for schema_ref in schema.references:
ref_subject = schema_ref["subject"]
if ref_subject in schema_seen:
continue
if ref_subject not in self.known_schema_registry_subjects:
logger.warning(
f"{ref_subject} is not present in the list of registered subjects with schema registry!"
)
reference_schema = self.schema_registry_client.get_latest_version(
subject_name=ref_subject
)
schema_seen.add(ref_subject)
logger.debug(
f"ref for {ref_subject} is {reference_schema.schema.schema_str}"
)
# Replace only external type references with the reference schema recursively.
# NOTE: The type pattern is dependent on _compact_schema.
avro_type_kwd = '"type"'
ref_name = schema_ref["name"]
# Try by name first
pattern_to_replace = f'{avro_type_kwd}:"{ref_name}"'
if pattern_to_replace not in schema_str:
# Try by subject
pattern_to_replace = f'{avro_type_kwd}:"{ref_subject}"'
if pattern_to_replace not in schema_str:
logger.warning(
f"Not match for external schema type: {{name:{ref_name}, subject:{ref_subject}}} in schema:{schema_str}"
)
else:
logger.debug(
f"External schema matches by subject, {pattern_to_replace}"
)
else:
logger.debug(f"External schema matches by name, {pattern_to_replace}")
schema_str = schema_str.replace(
pattern_to_replace,
f"{avro_type_kwd}:{self.get_schema_str_replace_confluent_ref_avro(reference_schema.schema, schema_seen)}",
)
return schema_str
def _get_schema_and_fields(
self, topic: str, is_key_schema: bool
) -> Tuple[Optional[Schema], List[SchemaField]]:
schema: Optional[Schema] = None
schema_type_str: str = "key" if is_key_schema else "value"
topic_subject: Optional[str] = self._get_subject_for_topic(
topic=topic, is_key_schema=is_key_schema
)
if topic_subject is not None:
logger.debug(
f"The {schema_type_str} schema subject:'{topic_subject}' is found for topic:'{topic}'."
)
try:
registered_schema = self.schema_registry_client.get_latest_version(
subject_name=topic_subject
)
schema = registered_schema.schema
except Exception as e:
logger.warning(
f"For topic: {topic}, failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}."
)
self.report.report_warning(
topic,
f"failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}.",
)
else:
logger.debug(
f"For topic: {topic}, the schema registry subject for the {schema_type_str} schema is not found."
)
if not is_key_schema:
# Value schema is always expected. Report a warning.
self.report.report_warning(
topic,
f"The schema registry subject for the {schema_type_str} schema is not found."
f" The topic is either schema-less, or no messages have been written to the topic yet.",
)
# Obtain the schema fields from schema for the topic.
fields: List[SchemaField] = []
if schema is not None:
fields = self._get_schema_fields(
topic=topic, schema=schema, is_key_schema=is_key_schema
)
return (schema, fields)
def _get_schema_fields(
self, topic: str, schema: Schema, is_key_schema: bool
) -> List[SchemaField]:
# Parse the schema and convert it to SchemaFields.
fields: List[SchemaField] = []
if schema.schema_type == "AVRO":
cleaned_str: str = self.get_schema_str_replace_confluent_ref_avro(schema)
# "value.id" or "value.[type=string]id"
fields = schema_util.avro_schema_to_mce_fields(
cleaned_str, is_key_schema=is_key_schema
)
else:
self.report.report_warning(
topic,
f"Parsing kafka schema type {schema.schema_type} is currently not implemented",
)
return fields
def _get_schema_metadata(
self, topic: str, platform_urn: str
) -> Optional[SchemaMetadata]:
# Process the value schema
schema, fields = self._get_schema_and_fields(
topic=topic, is_key_schema=False
) # type: Tuple[Optional[Schema], List[SchemaField]]
# Process the key schema
key_schema, key_fields = self._get_schema_and_fields(
topic=topic, is_key_schema=True
) # type:Tuple[Optional[Schema], List[SchemaField]]
# Create the schemaMetadata aspect.
if schema is not None or key_schema is not None:
# create a merged string for the combined schemas and compute an md5 hash across
schema_as_string = (schema.schema_str if schema is not None else "") + (
key_schema.schema_str if key_schema is not None else ""
)
md5_hash: str = md5(schema_as_string.encode()).hexdigest()
return SchemaMetadata(
schemaName=topic,
version=0,
hash=md5_hash,
platform=platform_urn,
platformSchema=KafkaSchema(
documentSchema=schema.schema_str if schema is not None else "",
keySchema=key_schema.schema_str if key_schema else None,
),
fields=key_fields + fields,
)
return None
def get_schema_metadata(
self, topic: str, platform_urn: str
) -> Optional[SchemaMetadata]:
logger.debug(f"Inside _get_schema_metadata {topic} {platform_urn}")
# Process the value schema
schema, fields = self._get_schema_and_fields(
topic=topic, is_key_schema=False
) # type: Tuple[Optional[Schema], List[SchemaField]]
# Process the key schema
key_schema, key_fields = self._get_schema_and_fields(
topic=topic, is_key_schema=True
) # type:Tuple[Optional[Schema], List[SchemaField]]
# Create the schemaMetadata aspect.
if schema is not None or key_schema is not None:
# create a merged string for the combined schemas and compute an md5 hash across
schema_as_string = (schema.schema_str if schema is not None else "") + (
key_schema.schema_str if key_schema is not None else ""
)
md5_hash = md5(schema_as_string.encode()).hexdigest()
return SchemaMetadata(
schemaName=topic,
version=0,
hash=md5_hash,
platform=platform_urn,
platformSchema=KafkaSchema(
documentSchema=schema.schema_str if schema is not None else "",
keySchema=key_schema.schema_str if key_schema else None,
),
fields=key_fields + fields,
)
return None
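For reference, a minimal sketch of how the new registry class could be exercised on its own, outside of an ingestion pipeline. The broker and registry endpoints are placeholders; if no registry is reachable, the constructor only logs a warning and `get_schema_metadata` simply returns `None`:
```python
from datahub.ingestion.source.confluent_schema_registry import ConfluentSchemaRegistry
from datahub.ingestion.source.kafka import KafkaSourceConfig, KafkaSourceReport

# Placeholder connection details; point these at a real broker/registry to try it out.
config = KafkaSourceConfig.parse_obj(
    {
        "connection": {
            "bootstrap": "localhost:9092",
            "schema_registry_url": "http://localhost:8081",
        },
    }
)
registry = ConfluentSchemaRegistry.create(config, KafkaSourceReport())

# Returns a SchemaMetadata aspect, or None if no subject is registered for the topic.
schema_metadata = registry.get_schema_metadata(
    topic="my_topic_1", platform_urn="urn:li:dataPlatform:kafka"
)
print(schema_metadata)
```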


@@ -1,11 +1,10 @@
import json
import logging
import types
from dataclasses import dataclass, field
from hashlib import md5
from importlib import import_module
from typing import Dict, Iterable, List, Optional, Tuple, cast
from typing import Dict, Iterable, List, Optional, Tuple, Type, cast
import confluent_kafka
from confluent_kafka.schema_registry.schema_registry_client import Schema
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
@@ -21,7 +20,7 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import add_domain_to_entity_wu
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.extractor import schema_util
from datahub.ingestion.source.kafka_schema_registry_base import KafkaSchemaRegistryBase
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.kafka_state import KafkaCheckpointState
from datahub.ingestion.source.state.stateful_ingestion_base import (
@@ -34,11 +33,6 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
from datahub.metadata.com.linkedin.pegasus2avro.common import Status
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
KafkaSchema,
SchemaField,
SchemaMetadata,
)
from datahub.metadata.schema_classes import (
    BrowsePathsClass,
    ChangeTypeClass,
@@ -69,6 +63,9 @@ class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
    topic_subject_map: Dict[str, str] = dict()
    # Custom Stateful Ingestion settings
    stateful_ingestion: Optional[KafkaSourceStatefulIngestionConfig] = None
schema_registry_class: str = (
"datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry"
)
@dataclass
@@ -94,9 +91,23 @@ class KafkaSource(StatefulIngestionSourceBase):
    report: KafkaSourceReport
    platform: str = "kafka"
@classmethod
def create_schema_registry(
cls, config: KafkaSourceConfig, report: KafkaSourceReport
) -> KafkaSchemaRegistryBase:
try:
module_path, class_name = config.schema_registry_class.rsplit(
".", 1
) # type: Tuple[str, str]
module: types.ModuleType = import_module(module_path)
schema_registry_class: Type = getattr(module, class_name)
return schema_registry_class.create(config, report)
except (ImportError, AttributeError):
raise ImportError(config.schema_registry_class)
    def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
        super().__init__(config, ctx)
        self.source_config = config
        self.source_config: KafkaSourceConfig = config
        if (
            self.is_stateful_ingestion_configured()
            and not self.source_config.platform_instance
@@ -112,43 +123,10 @@ class KafkaSource(StatefulIngestionSourceBase):
                **self.source_config.connection.consumer_config,
            }
        )
# Use the fully qualified name for SchemaRegistryClient to make it mock patchable for testing.
self.schema_registry_client = (
confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient(
{
"url": self.source_config.connection.schema_registry_url,
**self.source_config.connection.schema_registry_config,
}
)
)
        self.report = KafkaSourceReport()
        self.known_schema_registry_subjects: List[str] = []
        try:
            self.known_schema_registry_subjects.extend(
                self.schema_registry_client.get_subjects()
            )
        except Exception as e:
            logger.warning(f"Failed to get subjects from schema registry: {e}")
        self.schema_registry_client = KafkaSource.create_schema_registry(
            config, self.report
        )
def _get_subject_for_topic(self, topic: str, is_key_schema: bool) -> Optional[str]:
subject_key_suffix: str = "-key" if is_key_schema else "-value"
# For details on schema registry subject name strategy,
# see: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#how-the-naming-strategies-work
# User-provided subject for the topic overrides the rest, regardless of the subject name strategy.
# However, it is a must when the RecordNameStrategy is used as the schema registry subject name strategy.
# The subject name format for RecordNameStrategy is: <fully-qualified record name>-<key/value> (cannot be inferred from topic name).
subject_key = topic + subject_key_suffix
if subject_key in self.source_config.topic_subject_map:
return self.source_config.topic_subject_map[subject_key]
# Subject name format when the schema registry subject name strategy is
# (a) TopicNameStrategy(default strategy): <topic name>-<key/value>
# (b) TopicRecordNameStrategy: <topic name>-<fully-qualified record name>-<key/value>
for subject in self.known_schema_registry_subjects:
if subject.startswith(topic) and subject.endswith(subject_key_suffix):
return subject
return None
    def is_checkpointing_enabled(self, job_id: JobId) -> bool:
        if (
@@ -188,8 +166,8 @@ class KafkaSource(StatefulIngestionSourceBase):
        return self.source_config.platform_instance
    @classmethod
    def create(cls, config_dict, ctx):
    def create(cls, config_dict: Dict, ctx: PipelineContext) -> "KafkaSource":
        config = KafkaSourceConfig.parse_obj(config_dict)
        config: KafkaSourceConfig = KafkaSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)
    def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]:
@@ -246,62 +224,6 @@ class KafkaSource(StatefulIngestionSourceBase):
        # Clean up stale entities.
        yield from self.gen_removed_entity_workunits()
@staticmethod
def _compact_schema(schema_str: str) -> str:
# Eliminate all white-spaces for a compact representation.
return json.dumps(json.loads(schema_str), separators=(",", ":"))
def get_schema_str_replace_confluent_ref_avro(
self, schema: Schema, schema_seen: Optional[set] = None
) -> str:
if not schema.references:
return self._compact_schema(schema.schema_str)
if schema_seen is None:
schema_seen = set()
schema_str = self._compact_schema(schema.schema_str)
for schema_ref in schema.references:
ref_subject = schema_ref["subject"]
if ref_subject in schema_seen:
continue
if ref_subject not in self.known_schema_registry_subjects:
logger.warning(
f"{ref_subject} is not present in the list of registered subjects with schema registry!"
)
reference_schema = self.schema_registry_client.get_latest_version(
subject_name=ref_subject
)
schema_seen.add(ref_subject)
logger.debug(
f"ref for {ref_subject} is {reference_schema.schema.schema_str}"
)
# Replace only external type references with the reference schema recursively.
# NOTE: The type pattern is dependent on _compact_schema.
avro_type_kwd = '"type"'
ref_name = schema_ref["name"]
# Try by name first
pattern_to_replace = f'{avro_type_kwd}:"{ref_name}"'
if pattern_to_replace not in schema_str:
# Try by subject
pattern_to_replace = f'{avro_type_kwd}:"{ref_subject}"'
if pattern_to_replace not in schema_str:
logger.warning(
f"Not match for external schema type: {{name:{ref_name}, subject:{ref_subject}}} in schema:{schema_str}"
)
else:
logger.debug(
f"External schema matches by subject, {pattern_to_replace}"
)
else:
logger.debug(f"External schema matches by name, {pattern_to_replace}")
schema_str = schema_str.replace(
pattern_to_replace,
f"{avro_type_kwd}:{self.get_schema_str_replace_confluent_ref_avro(reference_schema.schema, schema_seen)}",
)
return schema_str
    def _add_topic_to_checkpoint(self, topic: str) -> None:
        cur_checkpoint = self.get_current_checkpoint(
            self.get_default_ingestion_job_id()
@@ -317,103 +239,6 @@ class KafkaSource(StatefulIngestionSourceBase):
            )
        )
def _get_schema_and_fields(
self, topic: str, is_key_schema: bool
) -> Tuple[Optional[Schema], List[SchemaField]]:
schema: Optional[Schema] = None
schema_type_str: str = "key" if is_key_schema else "value"
topic_subject: Optional[str] = self._get_subject_for_topic(
topic=topic, is_key_schema=is_key_schema
)
if topic_subject is not None:
logger.debug(
f"The {schema_type_str} schema subject:'{topic_subject}' is found for topic:'{topic}'."
)
try:
registered_schema = self.schema_registry_client.get_latest_version(
subject_name=topic_subject
)
schema = registered_schema.schema
except Exception as e:
logger.warning(
f"For topic: {topic}, failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}."
)
self.report.report_warning(
topic,
f"failed to get {schema_type_str} schema from schema registry using subject:'{topic_subject}': {e}.",
)
else:
logger.debug(
f"For topic: {topic}, the schema registry subject for the {schema_type_str} schema is not found."
)
if not is_key_schema:
# Value schema is always expected. Report a warning.
self.report.report_warning(
topic,
f"The schema registry subject for the {schema_type_str} schema is not found."
f" The topic is either schema-less, or no messages have been written to the topic yet.",
)
# Obtain the schema fields from schema for the topic.
fields: List[SchemaField] = []
if schema is not None:
fields = self._get_schema_fields(
topic=topic, schema=schema, is_key_schema=is_key_schema
)
return (schema, fields)
def _get_schema_fields(
self, topic: str, schema: Schema, is_key_schema: bool
) -> List[SchemaField]:
# Parse the schema and convert it to SchemaFields.
fields: List[SchemaField] = []
if schema.schema_type == "AVRO":
cleaned_str = self.get_schema_str_replace_confluent_ref_avro(schema)
# "value.id" or "value.[type=string]id"
fields = schema_util.avro_schema_to_mce_fields(
cleaned_str, is_key_schema=is_key_schema
)
else:
self.report.report_warning(
topic,
f"Parsing kafka schema type {schema.schema_type} is currently not implemented",
)
return fields
def _get_schema_metadata(
self, topic: str, platform_urn: str
) -> Optional[SchemaMetadata]:
# Process the value schema
schema, fields = self._get_schema_and_fields(
topic=topic, is_key_schema=False
) # type: Tuple[Optional[Schema], List[SchemaField]]
# Process the key schema
key_schema, key_fields = self._get_schema_and_fields(
topic=topic, is_key_schema=True
) # type:Tuple[Optional[Schema], List[SchemaField]]
# Create the schemaMetadata aspect.
if schema is not None or key_schema is not None:
# create a merged string for the combined schemas and compute an md5 hash across
schema_as_string = (schema.schema_str if schema is not None else "") + (
key_schema.schema_str if key_schema is not None else ""
)
md5_hash = md5(schema_as_string.encode()).hexdigest()
return SchemaMetadata(
schemaName=topic,
version=0,
hash=md5_hash,
platform=platform_urn,
platformSchema=KafkaSchema(
documentSchema=schema.schema_str if schema is not None else "",
keySchema=key_schema.schema_str if key_schema else None,
),
fields=key_fields + fields,
)
return None
    def _extract_record(self, topic: str) -> Iterable[MetadataWorkUnit]:  # noqa: C901
        logger.debug(f"topic = {topic}")
@@ -431,8 +256,10 @@
            aspects=[Status(removed=False)],  # we append to this list later on
        )
        # 2. Attach schemaMetadata aspect
        schema_metadata = self._get_schema_metadata(topic, platform_urn)
        # 2. Attach schemaMetadata aspect (pass control to SchemaRegistry)
        schema_metadata = self.schema_registry_client.get_schema_metadata(
            topic, platform_urn
        )
        if schema_metadata is not None:
            dataset_snapshot.aspects.append(schema_metadata)
@@ -458,7 +285,7 @@
            )
        )
        # 4. Emit the datasetSnapshot MCE
        # 5. Emit the datasetSnapshot MCE
        mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
        wu = MetadataWorkUnit(id=f"kafka-{topic}", mce=mce)
        self.report.report_workunit(wu)
@@ -494,7 +321,7 @@
        self.report.report_workunit(wu)
        yield wu
    def get_report(self):
    def get_report(self) -> KafkaSourceReport:
        return self.report
    def update_default_job_run_summary(self) -> None:
@@ -509,7 +336,7 @@
            else JobStatusClass.COMPLETED
        )
    def close(self):
    def close(self) -> None:
        self.update_default_job_run_summary()
        self.prepare_for_commit()
        if self.consumer:


@@ -0,0 +1,12 @@
from abc import ABC, abstractmethod
from typing import Optional
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata
class KafkaSchemaRegistryBase(ABC):
@abstractmethod
def get_schema_metadata(
self, topic: str, platform_urn: str
) -> Optional[SchemaMetadata]:
pass


@@ -0,0 +1,122 @@
import unittest
from unittest.mock import patch
from confluent_kafka.schema_registry.schema_registry_client import (
RegisteredSchema,
Schema,
)
from datahub.ingestion.source.confluent_schema_registry import ConfluentSchemaRegistry
from datahub.ingestion.source.kafka import KafkaSourceConfig, KafkaSourceReport
class ConfluentSchemaRegistryTest(unittest.TestCase):
def test_get_schema_str_replace_confluent_ref_avro(self):
schema_str_orig = """
{
"fields": [
{
"name": "my_field1",
"type": "TestTopic1"
}
],
"name": "TestTopic1Val",
"namespace": "io.acryl",
"type": "record"
}
"""
schema_str_ref = """
{
"doc": "Sample schema to help you get started.",
"fields": [
{
"doc": "The int type is a 32-bit signed integer.",
"name": "my_field1",
"type": "int"
}
],
"name": "TestTopic1",
"namespace": "io.acryl",
"type": "record"
}
"""
schema_str_final = (
"""
{
"fields": [
{
"name": "my_field1",
"type": """
+ schema_str_ref
+ """
}
],
"name": "TestTopic1Val",
"namespace": "io.acryl",
"type": "record"
}
"""
)
kafka_source_config = KafkaSourceConfig.parse_obj(
{
"connection": {
"bootstrap": "localhost:9092",
"schema_registry_url": "http://localhost:8081",
},
}
)
confluent_schema_registry = ConfluentSchemaRegistry.create(
kafka_source_config, KafkaSourceReport()
)
def new_get_latest_version(subject_name: str) -> RegisteredSchema:
return RegisteredSchema(
schema_id="schema_id_1",
schema=Schema(schema_str=schema_str_ref, schema_type="AVRO"),
subject="test",
version=1,
)
with patch.object(
confluent_schema_registry.schema_registry_client,
"get_latest_version",
new_get_latest_version,
):
schema_str = confluent_schema_registry.get_schema_str_replace_confluent_ref_avro(
# The external reference would match by name.
schema=Schema(
schema_str=schema_str_orig,
schema_type="AVRO",
references=[
dict(name="TestTopic1", subject="schema_subject_1", version=1)
],
)
)
assert schema_str == ConfluentSchemaRegistry._compact_schema(
schema_str_final
)
with patch.object(
confluent_schema_registry.schema_registry_client,
"get_latest_version",
new_get_latest_version,
):
schema_str = confluent_schema_registry.get_schema_str_replace_confluent_ref_avro(
# The external reference would match by subject.
schema=Schema(
schema_str=schema_str_orig,
schema_type="AVRO",
references=[
dict(name="schema_subject_1", subject="TestTopic1", version=1)
],
)
)
assert schema_str == ConfluentSchemaRegistry._compact_schema(
schema_str_final
)
if __name__ == "__main__":
unittest.main()


@@ -26,106 +26,6 @@ from datahub.metadata.schema_classes import (
class KafkaSourceTest(unittest.TestCase):
def test_get_schema_str_replace_confluent_ref_avro(self):
# References external schema 'TestTopic1' in the definition of 'my_field1' field.
schema_str_orig = """
{
"fields": [
{
"name": "my_field1",
"type": "TestTopic1"
}
],
"name": "TestTopic1Val",
"namespace": "io.acryl",
"type": "record"
}
"""
schema_str_ref = """
{
"doc": "Sample schema to help you get started.",
"fields": [
{
"doc": "The int type is a 32-bit signed integer.",
"name": "my_field1",
"type": "int"
}
],
"name": "TestTopic1",
"namespace": "io.acryl",
"type": "record"
}
"""
schema_str_final = (
"""
{
"fields": [
{
"name": "my_field1",
"type": """
+ schema_str_ref
+ """
}
],
"name": "TestTopic1Val",
"namespace": "io.acryl",
"type": "record"
}
"""
)
ctx = PipelineContext(run_id="test")
kafka_source = KafkaSource.create(
{
"connection": {"bootstrap": "localhost:9092"},
},
ctx,
)
def new_get_latest_version(subject_name: str) -> RegisteredSchema:
return RegisteredSchema(
schema_id="schema_id_1",
schema=Schema(schema_str=schema_str_ref, schema_type="AVRO"),
subject="test",
version=1,
)
with patch.object(
kafka_source.schema_registry_client,
"get_latest_version",
new_get_latest_version,
):
schema_str = kafka_source.get_schema_str_replace_confluent_ref_avro(
# The external reference would match by name.
schema=Schema(
schema_str=schema_str_orig,
schema_type="AVRO",
references=[
dict(name="TestTopic1", subject="schema_subject_1", version=1)
],
)
)
assert schema_str == KafkaSource._compact_schema(schema_str_final)
with patch.object(
kafka_source.schema_registry_client,
"get_latest_version",
new_get_latest_version,
):
schema_str = kafka_source.get_schema_str_replace_confluent_ref_avro(
# The external reference would match by subject.
schema=Schema(
schema_str=schema_str_orig,
schema_type="AVRO",
references=[
dict(name="schema_subject_1", subject="TestTopic1", version=1)
],
)
)
assert schema_str == KafkaSource._compact_schema(schema_str_final)
    @patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
    def test_kafka_source_configuration(self, mock_kafka):
        ctx = PipelineContext(run_id="test")
@@ -211,6 +111,7 @@ class KafkaSourceTest(unittest.TestCase):
        # We should only have 1 topic + sub-type wu.
        assert len(workunits) == 2
        assert isinstance(workunits[0].metadata, MetadataChangeEvent)
        proposed_snap = workunits[0].metadata.proposedSnapshot
        assert proposed_snap.urn == make_dataset_urn_with_platform_instance(
            platform=PLATFORM,