From eee3f9ffec453ce9b33b283fff6d8bc14f985243 Mon Sep 17 00:00:00 2001 From: Onkar Ravgan Date: Fri, 11 Nov 2022 16:35:09 +0530 Subject: [PATCH] Fix:#8553 Parse Avro/Protobuf/Json schemas (#8654) * Added topic parsers * Fixed pylint * Addressed review comments Co-authored-by: Onkar Ravgan --- ingestion/setup.py | 18 +- ingestion/src/metadata/parsers/__init__.py | 0 ingestion/src/metadata/parsers/avro_parser.py | 37 +++ .../metadata/parsers/json_schema_parser.py | 49 ++++ .../src/metadata/parsers/protobuf_parser.py | 210 ++++++++++++++++++ ingestion/tests/unit/test_avro_parser.py | 102 +++++++++ .../tests/unit/test_json_schema_parser.py | 74 ++++++ ingestion/tests/unit/test_protobuf_parser.py | 76 +++++++ 8 files changed, 564 insertions(+), 2 deletions(-) create mode 100644 ingestion/src/metadata/parsers/__init__.py create mode 100644 ingestion/src/metadata/parsers/avro_parser.py create mode 100644 ingestion/src/metadata/parsers/json_schema_parser.py create mode 100644 ingestion/src/metadata/parsers/protobuf_parser.py create mode 100644 ingestion/tests/unit/test_avro_parser.py create mode 100644 ingestion/tests/unit/test_json_schema_parser.py create mode 100644 ingestion/tests/unit/test_protobuf_parser.py diff --git a/ingestion/setup.py b/ingestion/setup.py index bdbdb714697..534ee46a6c9 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -90,9 +90,23 @@ plugins: Dict[str, Set[str]] = { "thrift-sasl==0.4.3", "presto-types-parser==0.0.2", }, - "kafka": {"confluent_kafka==1.8.2", "fastavro>=1.2.0", "avro-python3"}, + "kafka": { + "confluent_kafka==1.8.2", + "fastavro>=1.2.0", + "avro-python3", + "avro", + "grpcio-tools", + "protobuf", + }, "kinesis": {"boto3~=1.19.12"}, - "redpanda": {"confluent_kafka==1.8.2", "fastavro>=1.2.0", "avro-python3"}, + "redpanda": { + "confluent_kafka==1.8.2", + "fastavro>=1.2.0", + "avro-python3", + "avro", + "grpcio-tools", + "protobuf", + }, "ldap-users": {"ldap3==2.9.1"}, "looker": {"looker-sdk>=22.4.0"}, "mssql": 
def parse_avro_schema(schema: str) -> Optional[RecordSchema]:
    """Parse an avro schema definition given as a JSON string.

    Returns the parsed ``RecordSchema``, or ``None`` when the text is not
    a valid avro schema — the error is logged, never raised.
    """
    try:
        return avroschema.parse(schema)
    except Exception as exc:  # pylint: disable=broad-except
        logger.debug(traceback.format_exc())
        logger.warning(f"Unable to parse the avro schema: {exc}")
        return None
def parse_json_schema(schema_text: str) -> Optional[dict]:
    """Parse a jsonschema document given as text.

    :param schema_text: the jsonschema definition as a JSON string
    :return: a dict with ``name`` (from "title"), ``type`` and a
        ``fields`` list holding name/type/description per property,
        or ``None`` when the text cannot be parsed (the error is
        logged, never raised).
    """
    try:
        json_schema_data = json.loads(schema_text)
        # "properties" may be absent or explicitly null in a valid
        # schema: fall back to an empty mapping instead of letting
        # None.items() fail and discarding the whole schema.
        properties = json_schema_data.get("properties") or {}
        return {
            "name": json_schema_data.get("title"),
            "type": json_schema_data.get("type"),
            "fields": [
                {
                    "name": key,
                    "type": value.get("type"),
                    "description": value.get("description"),
                }
                for key, value in properties.items()
            ],
        }
    except Exception as exc:  # pylint: disable=broad-except
        logger.debug(traceback.format_exc())
        logger.warning(f"Unable to parse the jsonschema: {exc}")
        return None
class ProtobufDataTypes(Enum):
    """
    Enum for Protobuf Datatypes.

    The values mirror the ``type`` constants of compiled protobuf field
    descriptors, so ``ProtobufDataTypes(field.type).name`` maps a
    descriptor field to its wire-type name.
    """

    # Field type unknown.
    TYPE_UNKNOWN = 0
    TYPE_DOUBLE = 1
    TYPE_FLOAT = 2
    TYPE_INT64 = 3
    TYPE_UINT64 = 4
    TYPE_INT32 = 5
    TYPE_FIXED64 = 6
    TYPE_FIXED32 = 7
    TYPE_BOOL = 8
    TYPE_STRING = 9
    # Field type group. Proto2 syntax only, and deprecated.
    TYPE_GROUP = 10
    TYPE_MESSAGE = 11
    TYPE_BYTES = 12
    TYPE_UINT32 = 13
    TYPE_ENUM = 14
    TYPE_SFIXED32 = 15
    TYPE_SFIXED64 = 16
    TYPE_SINT32 = 17
    TYPE_SINT64 = 18


class ProtobufParserConfig(BaseModel):
    """
    Protobuf Parser Config class

    :param schema_name: Name of protobuf schema
    :param schema_text: Protobuf schema definition in text format
    :param base_file_path: A temporary directory will be created under this
        path for generating the files required for protobuf parsing and
        compiling. By default the directory will be created under
        "/tmp/protobuf_openmetadata" unless it is specified in the parameter.
    """

    schema_name: str
    schema_text: str
    base_file_path: Optional[str] = "/tmp/protobuf_openmetadata"


class ProtobufParser:
    """
    Protobuf Parser class.

    Compiles the configured schema text with grpcio-tools and parses the
    generated ``*_pb2`` module into a plain dict describing the message
    and its fields.
    """

    config: ProtobufParserConfig

    def __init__(self, config):
        self.config = config
        # interfaces/ receives the .proto source file,
        # generated/ receives the compiled *_pb2.py module
        self.proto_interface_dir = f"{self.config.base_file_path}/interfaces"
        self.generated_src_dir = f"{self.config.base_file_path}/generated/"

    def load_module(self, module):
        """
        Get the python module from path
        """
        module_path = module
        return __import__(module_path, fromlist=[module])

    def create_proto_files(self):
        """
        Create the temporary directory layout and write the .proto file.

        :return: a ``(proto_path, file_path)`` tuple, or ``None`` when the
            directory structure could not be created (the error is logged).
        """
        try:
            # Create a temporary directory for saving all the files if not already present
            generated_src_dir_path = Path(self.generated_src_dir)
            generated_src_dir_path.mkdir(parents=True, exist_ok=True)
            proto_interface_dir_path = Path(self.proto_interface_dir)
            proto_interface_dir_path.mkdir(parents=True, exist_ok=True)

            # Create a .proto file under the interfaces directory with schema text
            file_path = f"{self.proto_interface_dir}/{self.config.schema_name}.proto"
            with open(file_path, "w", encoding="UTF-8") as file:
                file.write(self.config.schema_text)
            # the "generated=" prefix maps the interfaces dir onto the
            # generated/ package so the compiled module lands there
            proto_path = "generated=" + self.proto_interface_dir
            return proto_path, file_path
        except Exception as exc:  # pylint: disable=broad-except
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Unable to create protobuf directory structure for {self.config.schema_name}: {exc}"
            )
            return None

    def get_protobuf_python_object(self, proto_path: str, file_path: str):
        """
        Compile the .proto file and return an instance of the generated
        message class, or ``None`` on failure (the error is logged).
        """
        try:
            # compile the .proto file and create python class
            exit_code = grpc_tools.protoc.main(
                [
                    "protoc",
                    file_path,
                    f"--proto_path={proto_path}",
                    f"--python_out={self.config.base_file_path}",
                ]
            )
            # protoc.main reports compile errors through a non-zero exit
            # code instead of raising; fail early with a clear message
            # rather than an IndexError from the glob below.
            if exit_code != 0:
                raise RuntimeError(f"protoc exited with code {exit_code}")

            # import the generated python file
            sys.path.append(self.generated_src_dir)
            importlib.invalidate_caches()  # sys.path changed at runtime
            generated_src_dir_path = Path(self.generated_src_dir)
            py_file = glob.glob(
                str(
                    generated_src_dir_path.joinpath(f"{self.config.schema_name}_pb2.py")
                )
            )[0]
            module_name = Path(py_file).stem
            message = importlib.import_module(module_name)

            # the generated class name is the CamelCase form of the schema name
            class_ = getattr(message, snake_to_camel(self.config.schema_name))
            instance = class_()
            return instance
        except Exception as exc:  # pylint: disable=broad-except
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Unable to create protobuf python module for {self.config.schema_name}: {exc}"
            )
            return None

    def parse_protobuf_schema(self) -> Optional[dict]:
        """
        Parse the configured protobuf schema.

        :return: a dict with the message ``name``, ``full_name`` and a
            ``fields`` list (name/full_name/type per field), or ``None``
            on any failure (errors are logged, never raised).
        """
        try:
            proto_paths = self.create_proto_files()
            if proto_paths is None:
                # failure already logged by create_proto_files; without
                # this guard the unpack below raised a misleading TypeError
                return None
            proto_path, file_path = proto_paths
            instance = self.get_protobuf_python_object(
                proto_path=proto_path, file_path=file_path
            )
            if instance is None:
                # failure already logged by get_protobuf_python_object
                return None

            # processing the object and parsing the schema
            parsed_schema = {
                "name": instance.DESCRIPTOR.name,
                "full_name": instance.DESCRIPTOR.full_name,
                "fields": [
                    {
                        "name": field.name,
                        "full_name": field.full_name,
                        "type": ProtobufDataTypes(field.type).name,
                    }
                    for field in instance.DESCRIPTOR.fields
                ],
            }
            return parsed_schema
        except Exception as exc:  # pylint: disable=broad-except
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Unable to parse protobuf schema for {self.config.schema_name}: {exc}"
            )
            return None
        finally:
            # Clean up the tmp folder on success AND failure, so repeated
            # runs do not leak .proto sources and compiled artifacts
            if Path(self.config.base_file_path).exists():
                shutil.rmtree(self.config.base_file_path)
class JsonSchemaParserTests(TestCase):
    """
    Validate parse_json_schema against a sample "Person" schema.
    """

    sample_json_schema = """{
        "$id": "https://example.com/person.schema.json",
        "$schema": "https://json-schema.org/draft/2020-12/schema",
        "title": "Person",
        "type": "object",
        "properties": {
          "firstName": {
            "type": "string",
            "description": "The person's first name."
          },
          "lastName": {
            "type": "string",
            "description": "The person's last name."
          },
          "age": {
            "description": "Age in years which must be equal to or greater than zero.",
            "type": "integer",
            "minimum": 0
          }
        }
      }"""

    parsed_schema = parse_json_schema(sample_json_schema)

    def test_schema_name(self):
        # "title" becomes the schema name
        self.assertEqual("Person", self.parsed_schema["name"])

    def test_schema_type(self):
        self.assertEqual("object", self.parsed_schema["type"])

    def test_field_names(self):
        names = {entry["name"] for entry in self.parsed_schema["fields"]}
        self.assertEqual(names, {"firstName", "lastName", "age"})

    def test_field_types(self):
        types_seen = {entry["type"] for entry in self.parsed_schema["fields"]}
        self.assertEqual(types_seen, {"integer", "string"})

    def test_field_descriptions(self):
        descriptions = {
            entry["description"] for entry in self.parsed_schema["fields"]
        }
        expected = {
            "The person's first name.",
            "The person's last name.",
            "Age in years which must be equal to or greater than zero.",
        }
        self.assertEqual(descriptions, expected)
+ +""" +Protobuf parser tests +""" + +from unittest import TestCase + +from metadata.parsers.protobuf_parser import ProtobufParser, ProtobufParserConfig + + +class ProtobufParserTests(TestCase): + """ + Check methods from protobuf_parser.py + """ + + schema_name = "person_info" + + sample_protobuf_schema = """ + syntax = "proto3"; + + package persons; + + enum Gender { + M = 0; // male + F = 1; // female + O = 2; // other + } + + message PersonInfo { + int32 age = 1; // age in years + Gender gender = 2; + int32 height = 3; // height in cm + } + """ + + protobuf_parser = ProtobufParser( + config=ProtobufParserConfig( + schema_name=schema_name, schema_text=sample_protobuf_schema + ) + ) + parsed_schema = protobuf_parser.parse_protobuf_schema() + + def test_schema_name(self): + self.assertEqual(self.parsed_schema["name"], "PersonInfo") + self.assertEqual(self.parsed_schema["full_name"], "persons.PersonInfo") + + def test_field_names(self): + field_names = {str(field["name"]) for field in self.parsed_schema["fields"]} + self.assertEqual(field_names, {"height", "gender", "age"}) + + def test_field_full_names(self): + field_full_names = { + str(field["full_name"]) for field in self.parsed_schema["fields"] + } + self.assertEqual( + field_full_names, + { + "persons.PersonInfo.height", + "persons.PersonInfo.gender", + "persons.PersonInfo.age", + }, + ) + + def test_field_types(self): + field_types = {str(field["type"]) for field in self.parsed_schema["fields"]} + self.assertEqual(field_types, {"TYPE_INT32", "TYPE_ENUM"})