Integrated schema parsers (#9305)

* Integrated schema parsers

* Addressed review comments

* fixed pytests
Onkar Ravgan 2022-12-15 16:54:55 +05:30 committed by GitHub
parent c2221fa596
commit b539b299ee
12 changed files with 279 additions and 126 deletions

View File

@@ -67,8 +67,8 @@
"schemaText": "{\"namespace\":\"openmetadata.kafka\",\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"sale_id\",\"type\":\"int\"},{\"name\":\"billing_address_id\",\"type\":\"int\"},{\"name\":\"billing_address_id_2\",\"type\":\"int\"},{\"name\":\"api_client_id\",\"type\":\"int\"},{\"name\":\"customer_id\",\"type\":\"int\"},{\"name\":\"product_id\",\"type\":\"int\"},{\"name\":\"location_id\",\"type\":\"int\"},{\"name\":\"order_id\",\"type\":\"double\"}]}"
},
{
"name": "record",
"description": "All record related events gets captured in this topic",
"name": "avro_record",
"description": "All Avro record related events gets captured in this topic",
"partitions": 128,
"retentionSize": 1931232624,
"replicationFactor":4,
@@ -76,6 +76,28 @@
"cleanupPolicies": ["compact","delete"],
"schemaType": "Avro",
"schemaText": "{\"namespace\":\"openmetadata.kafka\",\"name\":\"level\",\"type\":\"record\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"somefield\",\"type\":\"string\"},{\"name\":\"options\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"lvl2_record\",\"fields\":[{\"name\":\"item1_lvl2\",\"type\":\"string\"},{\"name\":\"item2_lvl2\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"lvl3_record\",\"fields\":[{\"name\":\"item1_lvl3\",\"type\":\"string\"},{\"name\":\"item2_lvl3\",\"type\":\"string\"}]}}}]}}}]}"
},
{
"name": "json_schema_record",
"description": "All Json Schema record related events gets captured in this topic",
"partitions": 128,
"retentionSize": 1931232624,
"replicationFactor":4,
"maximumMessageSize":249,
"cleanupPolicies": ["compact","delete"],
"schemaType": "JSON",
"schemaText": "{\"type\":\"object\",\"required\":[\"name\",\"age\",\"club_name\"],\"properties\":{\"name\":{\"type\":\"object\",\"required\":[\"first_name\",\"last_name\"],\"properties\":{\"first_name\":{\"type\":\"string\"},\"last_name\":{\"type\":\"string\"}}},\"age\":{\"type\":\"integer\"},\"club_name\":{\"type\":\"string\"}}}"
},
{
"name": "address_book",
"description": "All Protobuf record related events gets captured in this topic",
"partitions": 128,
"retentionSize": 1931232624,
"replicationFactor":4,
"maximumMessageSize":249,
"cleanupPolicies": ["compact","delete"],
"schemaType": "Protobuf",
"schemaText": "syntax = \"proto2\";\n\npackage tutorial;\n\nmessage Person {\n optional string name = 1;\n optional int32 id = 2;\n optional string email = 3;\n\n enum PhoneType {\n MOBILE = 0;\n HOME = 1;\n WORK = 2;\n }\n\n message PhoneNumber {\n optional string number = 1;\n optional PhoneType type = 2 [default = HOME];\n }\n\n repeated PhoneNumber phones = 4;\n}\n\nmessage AddressBook {\n repeated Person people = 1;\n}"
}
]
}
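
The sample data above now ships one topic per supported schema type (Avro, JSON Schema, Protobuf), each carrying its schema as an escaped string in "schemaText". A minimal sketch of how the new "json_schema_record" entry can be inspected with the standard library (variable names are illustrative):

import json

# Sketch: the "json_schema_record" topic above stores a JSON Schema document
# as an escaped string; decoding it exposes the structure the parser walks.
schema_text = (
    '{"type":"object","required":["name","age","club_name"],'
    '"properties":{"name":{"type":"object","required":["first_name","last_name"],'
    '"properties":{"first_name":{"type":"string"},"last_name":{"type":"string"}}},'
    '"age":{"type":"integer"},"club_name":{"type":"string"}}}'
)

schema = json.loads(schema_text)
for field_name, spec in schema["properties"].items():
    # Nested "object" properties become child fields once parsed (see json_schema_parser.py)
    print(field_name, spec["type"])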

View File

@@ -45,6 +45,8 @@ base_requirements = {
"boto3~=1.19.12",
"botocore==1.22.12",
"avro-python3==1.10.2",
"grpcio-tools",
"protobuf",
# compatibility requirements for 3.7
"typing-compat~=0.1.0",
"importlib-metadata~=4.12.0", # From airflow constraints

View File

@@ -98,7 +98,10 @@ from metadata.ingestion.models.user import OMetaUserProfile
from metadata.ingestion.ometa.client_utils import get_chart_entities_from_id
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.database_service import TableLocationLink
from metadata.parsers.avro_parser import get_avro_fields, parse_avro_schema
from metadata.parsers.schema_parsers import (
InvalidSchemaTypeException,
schema_parser_config_registry,
)
from metadata.utils import fqn
from metadata.utils.helpers import get_standard_chart_type
from metadata.utils.logger import ingestion_logger
@@ -672,7 +675,15 @@ class SampleDataSource(
topic["service"] = EntityReference(
id=self.kafka_service.id, type="messagingService"
)
parsed_schema = parse_avro_schema(topic["schemaText"])
schema_type = topic["schemaType"].lower()
load_parser_fn = schema_parser_config_registry.registry.get(schema_type)
if not load_parser_fn:
raise InvalidSchemaTypeException(
f"Cannot find {schema_type} in parser providers registry."
)
schema_fields = load_parser_fn(topic["name"], topic["schemaText"])
create_topic = CreateTopicRequest(
name=topic["name"],
description=topic["description"],
@@ -684,7 +695,7 @@ class SampleDataSource(
messageSchema=Topic(
schemaText=topic["schemaText"],
schemaType=topic["schemaType"],
schemaFields=get_avro_fields(parsed_schema),
schemaFields=schema_fields,
),
service=EntityReference(
id=self.kafka_service.id, type="messagingService"
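
The sample-data source no longer hardcodes parse_avro_schema; it resolves a parser from schema_parser_config_registry by the lower-cased schemaType and calls it with the topic name and schema text. The same lookup can be exercised standalone (the topic dict below is illustrative):

from metadata.parsers.schema_parsers import (
    InvalidSchemaTypeException,
    schema_parser_config_registry,
)

# Illustrative topic record; schemaText would normally hold a full schema document.
topic = {"name": "orders", "schemaType": "Avro", "schemaText": "{}"}

schema_type = topic["schemaType"].lower()  # "avro", "protobuf", "json" or "other"
load_parser_fn = schema_parser_config_registry.registry.get(schema_type)
if not load_parser_fn:
    raise InvalidSchemaTypeException(
        f"Cannot find {schema_type} in parser providers registry."
    )

# Returns Optional[List[FieldModel]]; None when the schema text cannot be parsed.
schema_fields = load_parser_fn(topic["name"], topic["schemaText"])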

View File

@@ -32,11 +32,15 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.schema import SchemaType
from metadata.generated.schema.type.schema import SchemaType, Topic
from metadata.ingestion.source.messaging.messaging_service import (
BrokerTopicDetails,
MessagingServiceSource,
)
from metadata.parsers.schema_parsers import (
InvalidSchemaTypeException,
schema_parser_config_registry,
)
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@@ -101,19 +105,26 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
self.add_properties_to_topic_from_resource(topic, topic_config_resource)
if topic_schema is not None:
topic.schemaText = topic_schema.schema_str
schema_type = topic_schema.schema_type.lower()
load_parser_fn = schema_parser_config_registry.registry.get(schema_type)
if not load_parser_fn:
raise InvalidSchemaTypeException(
f"Cannot find {schema_type} in parser providers registry."
)
schema_fields = load_parser_fn(
topic_details.topic_name, topic_schema.schema_str
)
topic.messageSchema = Topic(
    schemaText=topic_schema.schema_str,
    schemaType=SchemaType(topic_schema.schema_type).value,
    schemaFields=schema_fields,
)
if topic_schema.schema_type.lower() == SchemaType.Avro.value.lower():
topic.schemaType = SchemaType.Avro.name
if self.generate_sample_data:
topic.sampleData = self._get_sample_data(topic.name)
elif (
topic_schema.schema_type.lower() == SchemaType.Protobuf.name.lower()
):
topic.schemaType = SchemaType.Protobuf.name
elif topic_schema.schema_type.lower() == SchemaType.JSON.name.lower():
topic.schemaType = SchemaType.JSON.name
else:
topic.schemaType = SchemaType.Other.name
self.status.topic_scanned(topic.name.__root__)
yield topic
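
Broker sources report the schema type as an upper-case string from the schema registry (for example AVRO, PROTOBUF, JSON); it is lower-cased to hit the parser registry and then mapped onto the SchemaType enum, falling back to Other. A condensed sketch of that mapping, assuming the enum values compare case-insensitively with the reported strings:

from metadata.generated.schema.type.schema import SchemaType

def normalize_schema_type(raw_type: str) -> SchemaType:
    # Condensed equivalent of the if/elif chain above: match the reported
    # type against the enum, fall back to Other for anything unrecognised.
    for member in SchemaType:
        if raw_type.lower() in (member.value.lower(), member.name.lower()):
            return member
    return SchemaType.Other

# e.g. a Confluent registry reporting "PROTOBUF" maps to SchemaType.Protobuf
assert normalize_schema_type("PROTOBUF") == SchemaType.Protobuf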

View File

@@ -17,7 +17,7 @@ import traceback
from typing import List, Optional
import avro.schema as avroschema
from avro.schema import ArraySchema, RecordSchema
from avro.schema import ArraySchema
from metadata.generated.schema.type.schema import FieldModel
from metadata.utils.logger import ingestion_logger
@@ -25,13 +25,20 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
def parse_avro_schema(schema: str) -> Optional[RecordSchema]:
def parse_avro_schema(schema: str) -> Optional[List[FieldModel]]:
"""
Method to parse the avro schema
"""
try:
parsed_schema = avroschema.parse(schema)
return parsed_schema
field_models = [
FieldModel(
name=parsed_schema.name,
dataType=str(parsed_schema.type).upper(),
children=get_avro_fields(parsed_schema),
)
]
return field_models
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(f"Unable to parse the avro schema: {exc}")
@@ -40,7 +47,7 @@ def parse_avro_schema(schema: str) -> Optional[RecordSchema]:
def get_avro_fields(parsed_schema) -> Optional[List[FieldModel]]:
"""
Recursively convert the parsed schema into required pydantic models
Recursively convert the parsed schema into required models
"""
field_models = []
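
parse_avro_schema now returns a single-element list whose root FieldModel wraps the record and whose children come from get_avro_fields, instead of the raw avro RecordSchema. A short usage sketch (the schema below is illustrative):

from metadata.parsers.avro_parser import parse_avro_schema

# Illustrative schema; any valid Avro record works.
avro_schema = (
    '{"namespace": "openmetadata.kafka", "type": "record", "name": "Order",'
    ' "fields": [{"name": "sale_id", "type": "int"},'
    ' {"name": "customer_id", "type": "int"}]}'
)

fields = parse_avro_schema(avro_schema)        # Optional[List[FieldModel]]
root = fields[0]                               # one root field wrapping the record
print(root.name.__root__, root.dataType.name)  # Order RECORD
for child in root.children:
    print(" ", child.name.__root__, child.dataType.name)  # sale_id INT, customer_id INT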

View File

@@ -15,35 +15,69 @@ Utils module to parse the jsonschema
import json
import traceback
from typing import Optional
from enum import Enum
from typing import List, Optional
from metadata.generated.schema.type.schema import FieldModel
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
def parse_json_schema(schema_text: str) -> Optional[dict]:
class JsonSchemaDataTypes(Enum):
"""
Enum for Json Schema Datatypes
"""
STRING = "string"
FLOAT = "number"
INT = "integer"
BOOLEAN = "boolean"
NULL = "null"
RECORD = "object"
ARRAY = "array"
def parse_json_schema(schema_text: str) -> Optional[List[FieldModel]]:
"""
Method to parse the jsonschema
"""
try:
json_schema_data = json.loads(schema_text)
properties = json_schema_data.get("properties")
parsed_schema = {}
parsed_schema["name"] = json_schema_data.get("title")
parsed_schema["type"] = json_schema_data.get("type")
parsed_schema["fields"] = []
for key, value in properties.items():
field = {
"name": key,
"type": value.get("type"),
"description": value.get("description"),
}
parsed_schema["fields"].append(field)
return parsed_schema
field_models = [
FieldModel(
name=json_schema_data.get("title", "default"),
dataType=JsonSchemaDataTypes(json_schema_data.get("type")).name,
description=json_schema_data.get("description"),
children=get_json_schema_fields(json_schema_data.get("properties")),
)
]
return field_models
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(f"Unable to parse the jsonschema: {exc}")
return None
def get_json_schema_fields(properties) -> Optional[List[FieldModel]]:
"""
Recursively convert the parsed schema into required models
"""
field_models = []
for key, value in properties.items():
try:
field_models.append(
FieldModel(
name=value.get("title", key),
dataType=JsonSchemaDataTypes(value.get("type")).name,
description=value.get("description"),
children=get_json_schema_fields(value.get("properties"))
if value.get("type") == "object"
else None,
)
)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(f"Unable to parse the json schema into models: {exc}")
return field_models
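
parse_json_schema follows the same shape as the avro parser: the document title and type become the root FieldModel, properties become children, and nested object properties recurse through get_json_schema_fields. Usage sketch with an illustrative schema:

from metadata.parsers.json_schema_parser import parse_json_schema

json_schema = (
    '{"title": "Person", "type": "object", "properties": {'
    '"firstName": {"type": "string", "description": "First name"},'
    '"age": {"type": "integer", "description": "Age in years"}}}'
)

fields = parse_json_schema(json_schema)
root = fields[0]
print(root.name.__root__, root.dataType.name)              # Person RECORD
for child in root.children:
    print(" ", child.name.__root__, child.dataType.name)   # firstName STRING, age INT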

View File

@@ -20,11 +20,12 @@ import sys
import traceback
from enum import Enum
from pathlib import Path
from typing import Optional
from typing import List, Optional
import grpc_tools.protoc
from pydantic import BaseModel
from metadata.generated.schema.type.schema import FieldModel
from metadata.utils.helpers import snake_to_camel
from metadata.utils.logger import ingestion_logger
@@ -36,44 +37,36 @@ class ProtobufDataTypes(Enum):
Enum for Protobuf Datatypes
"""
# Field type unknown.
TYPE_UNKNOWN = 0
# Field type double.
TYPE_DOUBLE = 1
# Field type float.
TYPE_FLOAT = 2
# Field type int64.
TYPE_INT64 = 3
# Field type uint64.
TYPE_UINT64 = 4
# Field type int32.
TYPE_INT32 = 5
# Field type fixed64.
TYPE_FIXED64 = 6
# Field type fixed32.
TYPE_FIXED32 = 7
# Field type bool.
ERROR = 0
DOUBLE = 1
FLOAT = 2
INT = 3, 4, 5, 13, 17, 18
FIXED = 6, 7, 15, 16
TYPE_BOOL = 8
# Field type string.
TYPE_STRING = 9
# Field type group. Proto2 syntax only, and deprecated.
TYPE_GROUP = 10
# Field type message.
TYPE_MESSAGE = 11
# Field type bytes.
TYPE_BYTES = 12
# Field type uint32.
TYPE_UINT32 = 13
# Field type enum.
TYPE_ENUM = 14
# Field type sfixed32.
TYPE_SFIXED32 = 15
# Field type sfixed64.
TYPE_SFIXED64 = 16
# Field type sint32.
TYPE_SINT32 = 17
# Field type sint64.
TYPE_SINT64 = 18
STRING = 9
UNION = 10
RECORD = 11
BYTES = 12
ENUM = 14
def __new__(cls, *values):
obj = object.__new__(cls)
# first value is canonical value
obj._value_ = values[0]
for other_value in values[1:]:
cls._value2member_map_[other_value] = obj
obj._all_values = values
return obj
def __repr__(self):
    value = ", ".join([repr(v) for v in self._all_values])  # pylint: disable=no-member
    return f"<{self.__class__.__name__}.{self._name_}: {value}>"
class ProtobufParserConfig(BaseModel):
@@ -172,7 +165,7 @@ class ProtobufParser:
)
return None
def parse_protobuf_schema(self) -> Optional[dict]:
def parse_protobuf_schema(self) -> Optional[List[FieldModel]]:
"""
Method to parse the protobuf schema
"""
@@ -183,28 +176,47 @@
proto_path=proto_path, file_path=file_path
)
# processing the object and parsing the schema
parsed_schema = {}
parsed_schema["name"] = instance.DESCRIPTOR.name
parsed_schema["full_name"] = instance.DESCRIPTOR.full_name
parsed_schema["fields"] = []
for field in instance.DESCRIPTOR.fields:
field_dict = {
"name": field.name,
"full_name": field.full_name,
"type": ProtobufDataTypes(field.type).name,
}
parsed_schema["fields"].append(field_dict)
field_models = [
FieldModel(
name=instance.DESCRIPTOR.name,
dataType="RECORD",
children=self.get_protobuf_fields(instance.DESCRIPTOR.fields),
)
]
# Clean up the tmp folder
if Path(self.config.base_file_path).exists():
shutil.rmtree(self.config.base_file_path)
return parsed_schema
return field_models
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Unable to parse protobuf schema for {self.config.schema_name}: {exc}"
)
return None
def get_protobuf_fields(self, fields) -> Optional[List[FieldModel]]:
"""
Recursively convert the parsed schema into required models
"""
field_models = []
for field in fields:
try:
field_models.append(
FieldModel(
name=field.name,
dataType=ProtobufDataTypes(field.type).name,
children=self.get_protobuf_fields(field.message_type.fields)
if field.type == 11
else None,
)
)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Unable to parse the protobuf schema into models: {exc}"
)
return field_models
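
ProtobufDataTypes now collapses the numeric protobuf field-type codes into coarser logical types (all integer wire types map to INT, fixed variants to FIXED, message to RECORD) via a multi-valued Enum, and parse_protobuf_schema returns the same root-plus-children FieldModel list as the other parsers. A usage sketch with an illustrative .proto definition (schema_name and message name chosen to match, as the parser resolves the message class from the snake_case name):

from metadata.parsers.protobuf_parser import ProtobufParser, ProtobufParserConfig

# Illustrative proto3 definition; nested message fields (type code 11) recurse.
proto_text = """
syntax = "proto3";
package demo;
message PersonInfo {
  int32 age = 1;
  string name = 2;
}
"""

parser = ProtobufParser(
    config=ProtobufParserConfig(schema_name="person_info", schema_text=proto_text)
)
fields = parser.parse_protobuf_schema()                    # Optional[List[FieldModel]]
print(fields[0].name.__root__, fields[0].dataType.name)    # PersonInfo RECORD
for child in fields[0].children:
    print(" ", child.name.__root__, child.dataType.name)   # age INT, name STRING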

View File

@@ -0,0 +1,60 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Hosts the singledispatch to get the schema parsers
"""
from typing import List, Optional
from metadata.generated.schema.type.schema import FieldModel, SchemaType
from metadata.parsers.avro_parser import parse_avro_schema
from metadata.parsers.json_schema_parser import parse_json_schema
from metadata.parsers.protobuf_parser import ProtobufParser, ProtobufParserConfig
from metadata.utils.dispatch import enum_register
schema_parser_config_registry = enum_register()
class InvalidSchemaTypeException(Exception):
"""
Raised when we cannot find the provided schema type
"""
@schema_parser_config_registry.add(SchemaType.Avro.value.lower())
def load_avro_parser(
topic_name: str, schema_text: str # pylint: disable=unused-argument
) -> Optional[List[FieldModel]]:
return parse_avro_schema(schema_text)
@schema_parser_config_registry.add(SchemaType.Protobuf.value.lower())
def load_protobuf_parser(
topic_name: str, schema_text: str
) -> Optional[List[FieldModel]]:
protobuf_parser = ProtobufParser(
config=ProtobufParserConfig(schema_name=topic_name, schema_text=schema_text)
)
return protobuf_parser.parse_protobuf_schema()
@schema_parser_config_registry.add(SchemaType.JSON.value.lower())
def load_json_schema_parser(
topic_name: str, schema_text: str # pylint: disable=unused-argument
) -> Optional[List[FieldModel]]:
return parse_json_schema(schema_text)
@schema_parser_config_registry.add(SchemaType.Other.value.lower())
def load_other_schema_parser(
topic_name: str, schema_text: str # pylint: disable=unused-argument
) -> Optional[List[FieldModel]]:
return None
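
The registry object itself comes from enum_register in metadata/utils/dispatch.py. A hedged sketch of the shape this file assumes it provides, namely a registry dict plus an add(key) decorator that populates it; the real helper may differ in details:

from typing import Callable, Dict

class SchemaParserRegister:
    # Assumed shape of what enum_register() returns; see metadata/utils/dispatch.py.
    def __init__(self) -> None:
        self.registry: Dict[str, Callable] = {}

    def add(self, key: str) -> Callable:
        def decorator(func: Callable) -> Callable:
            self.registry[key] = func
            return func
        return decorator

# With that shape, resolving a parser is a plain dict lookup:
# parser = schema_parser_config_registry.registry.get("protobuf")
# fields = parser(topic_name, schema_text) if parser else None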

View File

@@ -73,14 +73,15 @@ class AvroParserTests(TestCase):
parsed_schema = parse_avro_schema(sample_avro_schema)
def test_schema_name(self):
self.assertEqual(self.parsed_schema.name, "Order")
self.assertEqual(self.parsed_schema.namespace, "example.avro")
self.assertEqual(self.parsed_schema[0].name.__root__, "Order")
def test_schema_type(self):
self.assertEqual(self.parsed_schema.type, "record")
self.assertEqual(self.parsed_schema[0].dataType.name, "RECORD")
def test_field_names(self):
field_names = {str(field.name) for field in self.parsed_schema.fields}
field_names = {
str(field.name.__root__) for field in self.parsed_schema[0].children
}
self.assertEqual(
field_names,
{
@@ -98,5 +99,7 @@ class AvroParserTests(TestCase):
)
def test_field_types(self):
field_types = {str(field.type) for field in self.parsed_schema.fields}
self.assertEqual(field_types, {'"int"', '"string"', '"double"'})
field_types = {
str(field.dataType.name) for field in self.parsed_schema[0].children
}
self.assertEqual(field_types, {"INT", "STRING", "DOUBLE"})

View File

@@ -47,22 +47,26 @@ class JsonSchemaParserTests(TestCase):
parsed_schema = parse_json_schema(sample_json_schema)
def test_schema_name(self):
self.assertEqual(self.parsed_schema["name"], "Person")
self.assertEqual(self.parsed_schema[0].name.__root__, "Person")
def test_schema_type(self):
self.assertEqual(self.parsed_schema["type"], "object")
self.assertEqual(self.parsed_schema[0].dataType.name, "RECORD")
def test_field_names(self):
field_names = {str(field["name"]) for field in self.parsed_schema["fields"]}
field_names = {
str(field.name.__root__) for field in self.parsed_schema[0].children
}
self.assertEqual(field_names, {"firstName", "lastName", "age"})
def test_field_types(self):
field_types = {str(field["type"]) for field in self.parsed_schema["fields"]}
self.assertEqual(field_types, {"integer", "string"})
field_types = {
str(field.dataType.name) for field in self.parsed_schema[0].children
}
self.assertEqual(field_types, {"INT", "STRING"})
def test_field_descriptions(self):
field_descriptions = {
str(field["description"]) for field in self.parsed_schema["fields"]
str(field.description.__root__) for field in self.parsed_schema[0].children
}
self.assertEqual(
field_descriptions,

View File

@@ -27,15 +27,12 @@ class ProtobufParserTests(TestCase):
sample_protobuf_schema = """
syntax = "proto3";
package persons;
enum Gender {
M = 0; // male
F = 1; // female
O = 2; // other
}
message PersonInfo {
int32 age = 1; // age in years
Gender gender = 2;
@@ -51,26 +48,19 @@ class ProtobufParserTests(TestCase):
parsed_schema = protobuf_parser.parse_protobuf_schema()
def test_schema_name(self):
self.assertEqual(self.parsed_schema["name"], "PersonInfo")
self.assertEqual(self.parsed_schema["full_name"], "persons.PersonInfo")
self.assertEqual(self.parsed_schema[0].name.__root__, "PersonInfo")
def test_schema_type(self):
self.assertEqual(self.parsed_schema[0].dataType.name, "RECORD")
def test_field_names(self):
field_names = {str(field["name"]) for field in self.parsed_schema["fields"]}
field_names = {
str(field.name.__root__) for field in self.parsed_schema[0].children
}
self.assertEqual(field_names, {"height", "gender", "age"})
def test_field_full_names(self):
field_full_names = {
str(field["full_name"]) for field in self.parsed_schema["fields"]
}
self.assertEqual(
field_full_names,
{
"persons.PersonInfo.height",
"persons.PersonInfo.gender",
"persons.PersonInfo.age",
},
)
def test_field_types(self):
field_types = {str(field["type"]) for field in self.parsed_schema["fields"]}
self.assertEqual(field_types, {"TYPE_INT32", "TYPE_ENUM"})
field_types = {
str(field.dataType.name) for field in self.parsed_schema[0].children
}
self.assertEqual(field_types, {"INT", "ENUM"})

View File

@@ -33,9 +33,6 @@ from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.interfaces.datalake.datalake_test_suite_interface import (
DataLakeTestSuiteInterface,
)
from metadata.interfaces.sqalchemy.sqa_test_suite_interface import SQATestSuiteInterface
from metadata.test_suite.validations.core import validation_enum_registry