Properly extract kafka schema

Harshal Sheth 2021-02-10 14:37:14 -08:00 committed by Shirshanka Das
parent affd5728e6
commit 82c29d3c94
2 changed files with 70 additions and 36 deletions

File: gometa/ingestion/extractor/schema_util.py

@@ -1,38 +1,65 @@
 import logging
+from typing import List, Dict, Any
 import avro.schema
 
+from gometa.metadata.com.linkedin.pegasus2avro.schema import (
+    SchemaMetadata, KafkaSchema, SchemaField, SchemaFieldDataType,
+    BooleanTypeClass, FixedTypeClass, StringTypeClass, BytesTypeClass,
+    NumberTypeClass, EnumTypeClass, NullTypeClass, MapTypeClass,
+    ArrayTypeClass, UnionTypeClass, RecordTypeClass,
+)
+
 """A helper file for Avro schema -> MCE schema transformations"""
 
 logger = logging.getLogger(__name__)
 
-#TODO: Broken (UnionSchemas)
 _field_type_mapping = {
-    "int": "int",
-    "string": "string",
-    "record": "struct",
+    "null": NullTypeClass,
+    "bool": BooleanTypeClass,
+    "int": NumberTypeClass,
+    "long": NumberTypeClass,
+    "float": NumberTypeClass,
+    "double": NumberTypeClass,
+    "bytes": BytesTypeClass,
+    "string": StringTypeClass,
+    "record": RecordTypeClass,
+    "enum": EnumTypeClass,
+    "array": ArrayTypeClass,
+    "union": UnionTypeClass,
+    "fixed": FixedTypeClass,
 }
 
-#TODO: Broken
-def _get_column_type(field_type):
-    return _field_type_mapping.get(str(field_type), str(field_type))
+def _get_column_type(field_type) -> SchemaFieldDataType:
+    tp = field_type
+    if hasattr(tp, 'type'):
+        tp = tp.type
+    tp = str(tp)
+    TypeClass: Any = _field_type_mapping.get(tp)
+    # TODO: we could populate the nestedTypes field for unions and similar fields
+    # for the other types as well. However, since we already populate the nativeDataType
+    # field below, it is mostly ok to leave this as not fully initialized.
+    dt = SchemaFieldDataType(type=TypeClass())
+    return dt
 
-#TODO: Broken
-def avro_schema_to_mce_fields(avro_schema_string):
+def avro_schema_to_mce_fields(avro_schema_string: str) -> List[SchemaField]:
     """Converts an avro schema into a schema compatible with MCE"""
-    schema: avro.schema.RecordSchema = avro.schema.Parse(avro_schema_string)
-    canonical_fields = []
-    fields_skipped = 0
-    for field in schema.fields:
-        # only transform the fields we can, ignore the rest
-        if _field_type_mapping.get(str(field.type), None):
-            canonical_field = {
-                'fieldPath': field.name,
-                'nativeDataType': str(field.type),
-                'type': {"type": _get_column_type(field.type)},
-                'description': field.doc,
-            }
-            canonical_fields.append(canonical_field)
-        else:
-            fields_skipped = fields_skipped + 1
-    logger.warn(f'Schema {schema.name}: Skipped {fields_skipped} fields during Avro schema to canonical schema conversion')
-    return canonical_fields
+
+    # Handle some library compatibility issues (avro.schema.parse vs avro.schema.Parse).
+    if hasattr(avro.schema, 'parse'):
+        schema_parse_fn = avro.schema.parse
+    else:
+        schema_parse_fn = avro.schema.Parse
+
+    parsed_schema: avro.schema.RecordSchema = schema_parse_fn(avro_schema_string)
+
+    fields: List[SchemaField] = []
+    for parsed_field in parsed_schema.fields:
+        field = SchemaField(
+            fieldPath=parsed_field.name,
+            nativeDataType=str(parsed_field.type),
+            type=_get_column_type(parsed_field.type),
+            description=parsed_field.props.get('doc', None),
+        )
+        fields.append(field)
+    return fields
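
For context, here is a minimal usage sketch of the reworked helper, assuming the gometa package and the avro library are installed. The sample User schema and its fields are invented for illustration and are not part of this commit:

    # Hypothetical usage of avro_schema_to_mce_fields; the sample schema is made up.
    import gometa.ingestion.extractor.schema_util as schema_util

    SAMPLE_AVRO_SCHEMA = """
    {
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "id", "type": "long", "doc": "primary key"},
            {"name": "email", "type": "string"}
        ]
    }
    """

    fields = schema_util.avro_schema_to_mce_fields(SAMPLE_AVRO_SCHEMA)
    for field in fields:
        # Each SchemaField carries a fieldPath, the raw Avro type as
        # nativeDataType, a SchemaFieldDataType wrapper, and the Avro
        # "doc" string (if any) as description.
        print(field.fieldPath, field.nativeDataType)

One behavioral note: for Avro types absent from _field_type_mapping (for example "map", whose MapTypeClass is imported but never mapped), _field_type_mapping.get returns None and the TypeClass() call in _get_column_type raises a TypeError, so unmapped types now fail loudly instead of being silently skipped as before.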

File: Kafka ingestion source (defines class KafkaSource)

@@ -16,7 +16,7 @@ import gometa.ingestion.extractor.schema_util as schema_util
 from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
 from gometa.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
-from gometa.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata, KafkaSchema
+from gometa.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata, KafkaSchema, SchemaField
 from gometa.metadata.com.linkedin.pegasus2avro.common import AuditStamp
 
 logger = logging.getLogger(__name__)
@@ -99,11 +99,18 @@ class KafkaSource(Source):
         except Exception as e:
             logger.debug(f"failed to get schema for {topic} with {e}")
 
+        fields: Optional[List[SchemaField]] = None
+        if schema and schema.schema_type == 'AVRO':
+            fields = schema_util.avro_schema_to_mce_fields(schema.schema_str)
+        elif schema:
+            logger.debug(f"unable to parse kafka schema type {schema.schema_type}")
+
+        is_incomplete = True
         if schema:
-            # TODO: add schema parsing capabilities
-            # canonical_schema = []
-            # if schema.schema_type == "AVRO":
-            #     canonical_schema = schema_util.avro_schema_to_mce_fields(schema.schema_str)
+            if not fields:
+                fields = []
+            else:
+                is_incomplete = False
 
             schema_metadata = SchemaMetadata(
                 schemaName=topic,
@@ -111,16 +118,16 @@ class KafkaSource(Source):
                 hash=str(schema._hash),
                 platform=f"urn:li:dataPlatform:{platform}",
                 platformSchema=KafkaSchema(
+                    # TODO: keySchema
                    documentSchema=schema.schema_str
                 ),
-                fields=[],
+                fields=(fields if fields is not None else []),
                 created=AuditStamp(time=sys_time, actor=actor),
                 lastModified=AuditStamp(time=sys_time, actor=actor),
             )
             dataset_snapshot.aspects.append(schema_metadata)
-        else:
+
+        if is_incomplete:
             self.report.report_schema_incomplete(topic)
 
         metadata_record.proposedSnapshot = dataset_snapshot
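
The new reporting rule can be stated on its own. This is a condensed sketch of the is_incomplete flow above; the helper name is_schema_incomplete is hypothetical:

    from typing import List, Optional

    def is_schema_incomplete(schema: Optional[object], fields: Optional[List]) -> bool:
        # Mirrors the is_incomplete flag: a topic is incomplete when no schema
        # was fetched, when the schema type was not AVRO (fields stays None),
        # or when the Avro conversion yielded no fields.
        if schema is None:
            return True
        return not fields

    assert is_schema_incomplete(None, None)            # no schema registered
    assert is_schema_incomplete(object(), None)        # non-AVRO schema type
    assert is_schema_incomplete(object(), [])          # conversion found nothing
    assert not is_schema_incomplete(object(), ["id"])  # fields were extracted

Compared to the old else branch, report_schema_incomplete now also fires for topics whose schema exists but could not be converted to fields, not just for topics with no registered schema at all.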