Fix:#8553 Parse Avro/Protobuf/Json schemas (#8654)

* Added topic parsers

* Fixed pylint

* Addressed review comments

Co-authored-by: Onkar Ravgan <onkarravgan@Onkars-MacBook-Pro.local>
This commit is contained in:
Onkar Ravgan 2022-11-11 16:35:09 +05:30 committed by GitHub
parent 03c1c005cf
commit eee3f9ffec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 564 additions and 2 deletions

View File

@ -90,9 +90,23 @@ plugins: Dict[str, Set[str]] = {
"thrift-sasl==0.4.3",
"presto-types-parser==0.0.2",
},
"kafka": {"confluent_kafka==1.8.2", "fastavro>=1.2.0", "avro-python3"},
"kafka": {
"confluent_kafka==1.8.2",
"fastavro>=1.2.0",
"avro-python3",
"avro",
"grpcio-tools",
"protobuf",
},
"kinesis": {"boto3~=1.19.12"},
"redpanda": {"confluent_kafka==1.8.2", "fastavro>=1.2.0", "avro-python3"},
"redpanda": {
"confluent_kafka==1.8.2",
"fastavro>=1.2.0",
"avro-python3",
"avro",
"grpcio-tools",
"protobuf",
},
"ldap-users": {"ldap3==2.9.1"},
"looker": {"looker-sdk>=22.4.0"},
"mssql": {"sqlalchemy-pytds>=0.3"},

View File

@ -0,0 +1,37 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utils module to parse the avro schema
"""
import traceback
from typing import Optional
import avro.schema as avroschema
from avro.schema import RecordSchema
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
def parse_avro_schema(schema: str) -> Optional[RecordSchema]:
    """
    Parse an avro schema definition given as text.

    :param schema: avro schema document as a string
    :return: the parsed RecordSchema, or None if the schema cannot be parsed
    """
    try:
        return avroschema.parse(schema)
    except Exception as exc:  # pylint: disable=broad-except
        # Best-effort parsing: log the failure and let the caller handle None
        logger.debug(traceback.format_exc())
        logger.warning(f"Unable to parse the avro schema: {exc}")
    return None

View File

@ -0,0 +1,49 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utils module to parse the jsonschema
"""
import json
import traceback
from typing import Optional
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
def parse_json_schema(schema_text: str) -> Optional[dict]:
    """
    Parse a jsonschema definition given as text.

    :param schema_text: JSON Schema document as a string
    :return: dict with "name" (from "title"), "type", and a "fields" list of
        {"name", "type", "description"} entries built from "properties";
        None if the text is not valid JSON or processing fails
    """
    try:
        json_schema_data = json.loads(schema_text)
        # A schema may legitimately omit "properties" (e.g. a bare scalar
        # schema); treat that as "no fields" instead of failing on None.items()
        properties = json_schema_data.get("properties") or {}
        parsed_schema = {
            "name": json_schema_data.get("title"),
            "type": json_schema_data.get("type"),
            "fields": [
                {
                    "name": key,
                    "type": value.get("type"),
                    "description": value.get("description"),
                }
                for key, value in properties.items()
            ],
        }
        return parsed_schema
    except Exception as exc:  # pylint: disable=broad-except
        logger.debug(traceback.format_exc())
        logger.warning(f"Unable to parse the jsonschema: {exc}")
        return None

View File

@ -0,0 +1,210 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utils module to parse the protobuf schema
"""
import glob
import importlib
import shutil
import sys
import traceback
from enum import Enum
from pathlib import Path
from typing import Optional
import grpc_tools.protoc
from pydantic import BaseModel
from metadata.utils.helpers import snake_to_camel
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class ProtobufDataTypes(Enum):
    """
    Enum of the protobuf field type codes, mapping each numeric
    type code to its TYPE_* name.
    """

    TYPE_UNKNOWN = 0  # Field type unknown.
    TYPE_DOUBLE = 1  # Field type double.
    TYPE_FLOAT = 2  # Field type float.
    TYPE_INT64 = 3  # Field type int64.
    TYPE_UINT64 = 4  # Field type uint64.
    TYPE_INT32 = 5  # Field type int32.
    TYPE_FIXED64 = 6  # Field type fixed64.
    TYPE_FIXED32 = 7  # Field type fixed32.
    TYPE_BOOL = 8  # Field type bool.
    TYPE_STRING = 9  # Field type string.
    TYPE_GROUP = 10  # Field type group. Proto2 syntax only, and deprecated.
    TYPE_MESSAGE = 11  # Field type message.
    TYPE_BYTES = 12  # Field type bytes.
    TYPE_UINT32 = 13  # Field type uint32.
    TYPE_ENUM = 14  # Field type enum.
    TYPE_SFIXED32 = 15  # Field type sfixed32.
    TYPE_SFIXED64 = 16  # Field type sfixed64.
    TYPE_SINT32 = 17  # Field type sint32.
    TYPE_SINT64 = 18  # Field type sint64.
class ProtobufParserConfig(BaseModel):
    """
    Protobuf Parser Config class
    :param schema_name: Name of protobuf schema
    :param schema_text: Protobuf schema definition in text format
    :param base_file_path: A temporary directory will be created under this path for
    generating the files required for protobuf parsing and compiling. By default
    the directory will be created under "/tmp/protobuf_openmetadata" unless it is
    specified in the parameter.
    """

    # NOTE(review): schema_name is also used to build the .proto file name and,
    # converted to CamelCase, the generated message class name — it presumably
    # must match the message declared in schema_text; confirm with callers.
    schema_name: str
    schema_text: str
    base_file_path: Optional[str] = "/tmp/protobuf_openmetadata"
class ProtobufParser:
    """
    Protobuf Parser class.

    Writes the configured schema text to a .proto file under a temporary
    directory, compiles it with grpc_tools.protoc, imports the generated
    python module, and extracts the message name and fields from its
    DESCRIPTOR.
    """

    config: ProtobufParserConfig

    def __init__(self, config):
        self.config = config
        # .proto sources go under "interfaces"; protoc output goes under
        # "generated" (also registered as a python package path below)
        self.proto_interface_dir = f"{self.config.base_file_path}/interfaces"
        self.generated_src_dir = f"{self.config.base_file_path}/generated/"

    def load_module(self, module):
        """
        Get the python module from path
        """
        module_path = module
        return __import__(module_path, fromlist=[module])

    def create_proto_files(self):
        """
        Method to generate the protobuf directory and file structure.

        :return: (proto_path, file_path) tuple on success, None on failure
        """
        try:
            # Create a temporary directory for saving all the files if not already present
            generated_src_dir_path = Path(self.generated_src_dir)
            generated_src_dir_path.mkdir(parents=True, exist_ok=True)
            proto_interface_dir_path = Path(self.proto_interface_dir)
            proto_interface_dir_path.mkdir(parents=True, exist_ok=True)

            # Create a .proto file under the interfaces directory with schema text
            file_path = f"{self.proto_interface_dir}/{self.config.schema_name}.proto"
            with open(file_path, "w", encoding="UTF-8") as file:
                file.write(self.config.schema_text)

            # "generated=<dir>" maps the interface dir onto the "generated"
            # package so protoc emits the module under that package path
            proto_path = "generated=" + self.proto_interface_dir
            return proto_path, file_path
        except Exception as exc:  # pylint: disable=broad-except
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Unable to create protobuf directory structure for {self.config.schema_name}: {exc}"
            )
            return None

    def get_protobuf_python_object(self, proto_path: str, file_path: str):
        """
        Method to create protobuf python module and get object.

        :return: an instance of the generated message class, or None on failure
        """
        try:
            # compile the .proto file and create python class
            grpc_tools.protoc.main(
                [
                    "protoc",
                    file_path,
                    f"--proto_path={proto_path}",
                    f"--python_out={self.config.base_file_path}",
                ]
            )
            # import the python file
            sys.path.append(self.generated_src_dir)
            generated_src_dir_path = Path(self.generated_src_dir)
            py_file = glob.glob(
                str(
                    generated_src_dir_path.joinpath(f"{self.config.schema_name}_pb2.py")
                )
            )[0]
            module_name = Path(py_file).stem
            message = importlib.import_module(module_name)
            # get the class and create a object instance;
            # protoc names the class from the CamelCased schema name
            class_ = getattr(message, snake_to_camel(self.config.schema_name))
            instance = class_()
            return instance
        except Exception as exc:  # pylint: disable=broad-except
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Unable to create protobuf python module for {self.config.schema_name}: {exc}"
            )
            return None

    def parse_protobuf_schema(self) -> Optional[dict]:
        """
        Method to parse the protobuf schema.

        :return: dict with "name", "full_name" and a "fields" list of
            {"name", "full_name", "type"} entries, or None on failure
        """
        try:
            created = self.create_proto_files()
            if created is None:
                # failure already logged by create_proto_files
                return None
            proto_path, file_path = created
            instance = self.get_protobuf_python_object(
                proto_path=proto_path, file_path=file_path
            )
            if instance is None:
                # failure already logged by get_protobuf_python_object
                return None
            # processing the object and parsing the schema
            parsed_schema = {
                "name": instance.DESCRIPTOR.name,
                "full_name": instance.DESCRIPTOR.full_name,
                "fields": [
                    {
                        "name": field.name,
                        "full_name": field.full_name,
                        "type": ProtobufDataTypes(field.type).name,
                    }
                    for field in instance.DESCRIPTOR.fields
                ],
            }
            return parsed_schema
        except Exception as exc:  # pylint: disable=broad-except
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Unable to parse protobuf schema for {self.config.schema_name}: {exc}"
            )
            return None
        finally:
            # Clean up the tmp folder even when parsing fails, so failed runs
            # do not leave generated files behind
            if Path(self.config.base_file_path).exists():
                shutil.rmtree(self.config.base_file_path)

View File

@ -0,0 +1,102 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Avro parser tests
"""
from unittest import TestCase
from metadata.parsers.avro_parser import parse_avro_schema
class AvroParserTests(TestCase):
    """
    Check methods from avro_parser.py
    """

    sample_avro_schema = """{
    "namespace": "example.avro",
    "type": "record",
    "name": "Order",
    "fields": [
        {
            "name": "order_id",
            "type": "int"
        },
        {
            "name": "api_client_id",
            "type": "int"
        },
        {
            "name": "billing_address_id",
            "type": "int"
        },
        {
            "name": "customer_id",
            "type": "int"
        },
        {
            "name": "location_id",
            "type": "int"
        },
        {
            "name": "shipping_address_id",
            "type": "int"
        },
        {
            "name": "user_id",
            "type": "int"
        },
        {
            "name": "total_price",
            "type": "double"
        },
        {
            "name": "discount_code",
            "type": "string"
        },
        {
            "name": "processed_at",
            "type": "int"
        }
    ]
}"""

    # Parsed once at class creation and shared by all test methods
    parsed_schema = parse_avro_schema(sample_avro_schema)

    def test_schema_name(self):
        schema = self.parsed_schema
        self.assertEqual("Order", schema.name)
        self.assertEqual("example.avro", schema.namespace)

    def test_schema_type(self):
        self.assertEqual("record", self.parsed_schema.type)

    def test_field_names(self):
        expected = {
            "order_id",
            "api_client_id",
            "billing_address_id",
            "customer_id",
            "location_id",
            "shipping_address_id",
            "user_id",
            "total_price",
            "discount_code",
            "processed_at",
        }
        actual = set(str(field.name) for field in self.parsed_schema.fields)
        self.assertEqual(actual, expected)

    def test_field_types(self):
        actual = set(str(field.type) for field in self.parsed_schema.fields)
        self.assertEqual(actual, {'"int"', '"string"', '"double"'})

View File

@ -0,0 +1,74 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Jsonschema parser tests
"""
from unittest import TestCase
from metadata.parsers.json_schema_parser import parse_json_schema
class JsonSchemaParserTests(TestCase):
    """
    Check methods from json_schema_parser.py
    """

    sample_json_schema = """{
    "$id": "https://example.com/person.schema.json",
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "title": "Person",
    "type": "object",
    "properties": {
        "firstName": {
            "type": "string",
            "description": "The person's first name."
        },
        "lastName": {
            "type": "string",
            "description": "The person's last name."
        },
        "age": {
            "description": "Age in years which must be equal to or greater than zero.",
            "type": "integer",
            "minimum": 0
        }
    }
}"""

    # Parsed once at class creation and shared by all test methods
    parsed_schema = parse_json_schema(sample_json_schema)

    def test_schema_name(self):
        self.assertEqual("Person", self.parsed_schema["name"])

    def test_schema_type(self):
        self.assertEqual("object", self.parsed_schema["type"])

    def test_field_names(self):
        actual = set(str(field["name"]) for field in self.parsed_schema["fields"])
        self.assertEqual(actual, {"firstName", "lastName", "age"})

    def test_field_types(self):
        actual = set(str(field["type"]) for field in self.parsed_schema["fields"])
        self.assertEqual(actual, {"integer", "string"})

    def test_field_descriptions(self):
        expected = {
            "The person's first name.",
            "The person's last name.",
            "Age in years which must be equal to or greater than zero.",
        }
        actual = set(
            str(field["description"]) for field in self.parsed_schema["fields"]
        )
        self.assertEqual(actual, expected)

View File

@ -0,0 +1,76 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Protobuf parser tests
"""
from unittest import TestCase
from metadata.parsers.protobuf_parser import ProtobufParser, ProtobufParserConfig
class ProtobufParserTests(TestCase):
    """
    Check methods from protobuf_parser.py
    """

    schema_name = "person_info"

    sample_protobuf_schema = """
syntax = "proto3";
package persons;
enum Gender {
    M = 0; // male
    F = 1; // female
    O = 2; // other
}
message PersonInfo {
    int32 age = 1; // age in years
    Gender gender = 2;
    int32 height = 3; // height in cm
}
"""

    # Parse once at class creation; all test methods read the result
    protobuf_parser = ProtobufParser(
        config=ProtobufParserConfig(
            schema_name=schema_name, schema_text=sample_protobuf_schema
        )
    )
    parsed_schema = protobuf_parser.parse_protobuf_schema()

    def test_schema_name(self):
        self.assertEqual("PersonInfo", self.parsed_schema["name"])
        self.assertEqual("persons.PersonInfo", self.parsed_schema["full_name"])

    def test_field_names(self):
        actual = set(str(field["name"]) for field in self.parsed_schema["fields"])
        self.assertEqual(actual, {"height", "gender", "age"})

    def test_field_full_names(self):
        expected = {
            "persons.PersonInfo.height",
            "persons.PersonInfo.gender",
            "persons.PersonInfo.age",
        }
        actual = set(
            str(field["full_name"]) for field in self.parsed_schema["fields"]
        )
        self.assertEqual(actual, expected)

    def test_field_types(self):
        actual = set(str(field["type"]) for field in self.parsed_schema["fields"])
        self.assertEqual(actual, {"TYPE_INT32", "TYPE_ENUM"})