diff --git a/metadata-ingestion/src/gometa/ingestion/extractor/schema_util.py b/metadata-ingestion/src/gometa/ingestion/extractor/schema_util.py
index 4b33c24ae6..5d06917e4d 100644
--- a/metadata-ingestion/src/gometa/ingestion/extractor/schema_util.py
+++ b/metadata-ingestion/src/gometa/ingestion/extractor/schema_util.py
@@ -1,38 +1,66 @@
+"""A helper file for Avro schema -> MCE schema transformations."""
+
 import logging
+from typing import Any, List
 
 import avro.schema
 
-"""A helper file for Avro schema -> MCE schema transformations"""
+from gometa.metadata.com.linkedin.pegasus2avro.schema import (
+    SchemaField, SchemaFieldDataType,
+    BooleanTypeClass, FixedTypeClass, StringTypeClass, BytesTypeClass, NumberTypeClass, EnumTypeClass, NullTypeClass, MapTypeClass, ArrayTypeClass, UnionTypeClass, RecordTypeClass,
+)
 
 logger = logging.getLogger(__name__)
 
-#TODO: Broken (UnionSchemas)
 _field_type_mapping = {
-    "int" : "int",
-    "string" : "string",
-    "record" : "struct",
+    "null": NullTypeClass,
+    "boolean": BooleanTypeClass,
+    "int": NumberTypeClass,
+    "long": NumberTypeClass,
+    "float": NumberTypeClass,
+    "double": NumberTypeClass,
+    "bytes": BytesTypeClass,
+    "string": StringTypeClass,
+    "record": RecordTypeClass,
+    "enum": EnumTypeClass,
+    "array": ArrayTypeClass,
+    "map": MapTypeClass,
+    "union": UnionTypeClass,
+    "fixed": FixedTypeClass,
 }
-#TODO: Broken
-def _get_column_type(field_type):
-    return _field_type_mapping.get(str(field_type), str(field_type))
-
-#TODO: Broken
-def avro_schema_to_mce_fields(avro_schema_string):
+def _get_column_type(field_type) -> SchemaFieldDataType:
+    tp = field_type
+    if hasattr(tp, 'type'):
+        tp = tp.type
+    tp = str(tp)
+    # Fall back to NullTypeClass so unmapped types do not crash the conversion.
+    TypeClass: Any = _field_type_mapping.get(tp, NullTypeClass)
+    # TODO: we could populate the nestedTypes field for unions and similar fields
+    # for the other types as well. However, since we already populate the nativeDataType
+    # field below, it is mostly ok to leave this as not fully initialized.
+    dt = SchemaFieldDataType(type=TypeClass())
+    return dt
+
+def avro_schema_to_mce_fields(avro_schema_string: str) -> List[SchemaField]:
     """Converts an avro schema into a schema compatible with MCE"""
-    schema: avro.schema.RecordSchema = avro.schema.Parse(avro_schema_string)
-    canonical_fields = []
-    fields_skipped = 0
-    for field in schema.fields:
-        # only transform the fields we can, ignore the rest
-        if _field_type_mapping.get(str(field.type),None):
-            canonical_field = {
-                'fieldPath': field.name,
-                'nativeDataType': str(field.type),
-                'type': { "type": _get_column_type(field.type)},
-                'description': field.doc,
-            }
-            canonical_fields.append(canonical_field)
-        else:
-            fields_skipped = fields_skipped + 1
-    logger.warn(f'Schema {schema.name}: Skipped {fields_skipped} fields during Avro schema to canonical schema conversion')
-    return canonical_fields
+
+    # Handle some library compatibility issues.
+    if hasattr(avro.schema, 'parse'):
+        schema_parse_fn = avro.schema.parse
+    else:
+        schema_parse_fn = avro.schema.Parse
+
+    parsed_schema: avro.schema.RecordSchema = schema_parse_fn(avro_schema_string)
+
+    fields: List[SchemaField] = []
+    for parsed_field in parsed_schema.fields:
+        field = SchemaField(
+            fieldPath=parsed_field.name,
+            nativeDataType=str(parsed_field.type),
+            type=_get_column_type(parsed_field.type),
+            description=parsed_field.props.get('doc', None),
+        )
+
+        fields.append(field)
+
+    return fields
diff --git a/metadata-ingestion/src/gometa/ingestion/source/kafka.py b/metadata-ingestion/src/gometa/ingestion/source/kafka.py
index 6d292a3061..66ff8f03d7 100644
--- a/metadata-ingestion/src/gometa/ingestion/source/kafka.py
+++ b/metadata-ingestion/src/gometa/ingestion/source/kafka.py
@@ -16,7 +16,7 @@
 import gometa.ingestion.extractor.schema_util as schema_util
 from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
 from gometa.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
-from gometa.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata, KafkaSchema
+from gometa.metadata.com.linkedin.pegasus2avro.schema import SchemaMetadata, KafkaSchema, SchemaField
 from gometa.metadata.com.linkedin.pegasus2avro.common import AuditStamp
 
 logger = logging.getLogger(__name__)
@@ -99,11 +99,18 @@ class KafkaSource(Source):
         except Exception as e:
             logger.debug(f"failed to get schema for {topic} with {e}")
 
+        fields: Optional[List[SchemaField]] = None
+        if schema and schema.schema_type == 'AVRO':
+            fields = schema_util.avro_schema_to_mce_fields(schema.schema_str)
+        elif schema:
+            logger.debug(f"unable to parse kafka schema type {schema.schema_type}")
+
+        is_incomplete = True
         if schema:
-            # TODO: add schema parsing capabilities
-            # canonical_schema = []
-            # if schema.schema_type == "AVRO":
-            #     canonical_schema = schema_util.avro_schema_to_mce_fields(schema.schema_str)
+            if not fields:
+                fields = []
+            else:
+                is_incomplete = False
 
             schema_metadata = SchemaMetadata(
                 schemaName=topic,
@@ -111,16 +118,16 @@
                 hash=str(schema._hash),
                 platform=f"urn:li:dataPlatform:{platform}",
                 platformSchema = KafkaSchema(
-                    # TODO: keySchema
                     documentSchema=schema.schema_str
                 ),
-                fields=[],
+                fields=fields,
                 created=AuditStamp(time=sys_time, actor=actor),
                 lastModified=AuditStamp(time=sys_time, actor=actor),
             )
             dataset_snapshot.aspects.append(schema_metadata)
-        else:
+
+        if is_incomplete:
             self.report.report_schema_incomplete(topic)
 
         metadata_record.proposedSnapshot = dataset_snapshot
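---
Usage sketch (illustrative, not part of the diff): how the new
avro_schema_to_mce_fields helper is expected to be called. The sample schema
and its field names are invented for this example; it assumes the gometa
package from this repo is importable and the avro library is installed.

    import json

    from gometa.ingestion.extractor.schema_util import avro_schema_to_mce_fields

    # A tiny record schema with one documented field and one undocumented one.
    sample_schema = json.dumps({
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "id", "type": "long", "doc": "primary key"},
            {"name": "email", "type": "string"},
        ],
    })

    # Each returned SchemaField carries the field path, the raw Avro type as
    # nativeDataType, the mapped SchemaFieldDataType, and the doc string, if any.
    for field in avro_schema_to_mce_fields(sample_schema):
        print(field.fieldPath, field.nativeDataType, field.description)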