Fix #15454: Added protobuf parser complex schema support (#16071)

* Added protobuf parser complex schema support

* Added options keyword in proto testing
This commit is contained in:
Onkar Ravgan 2024-04-30 17:59:27 +05:30 committed by GitHub
parent 91f930d4c4
commit 87c8254c38
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 175 additions and 3 deletions

View File

@ -47,6 +47,7 @@ from metadata.parsers.schema_parsers import (
)
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
from metadata.utils.messaging_utils import merge_and_clean_protobuf_schema
logger = ingestion_logger()
@ -75,6 +76,7 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
self.generate_sample_data = self.config.sourceConfig.config.generateSampleData
self.admin_client = self.connection.admin_client
self.schema_registry_client = self.connection.schema_registry_client
self.context.processed_schemas = {}
if self.generate_sample_data:
self.consumer_client = self.connection.consumer_client
@ -125,9 +127,14 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
raise InvalidSchemaTypeException(
f"Cannot find {schema_type} in parser providers registry."
)
schema_fields = load_parser_fn(
topic_details.topic_name, topic_schema.schema_str
schema_text = topic_schema.schema_str
# In protobuf schema, we need to merge all the schema text with references
if schema_type == SchemaType.Protobuf.value.lower():
schema_text = merge_and_clean_protobuf_schema(
self._get_schema_text_with_references(schema=topic_schema)
)
schema_fields = load_parser_fn(topic_details.topic_name, schema_text)
topic.messageSchema = Topic(
schemaText=topic_schema.schema_str,
@ -195,6 +202,38 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
f"Exception adding properties to topic [{topic.name}]: {exc}"
)
def _get_schema_text_with_references(self, schema) -> Optional[str]:
"""
Returns the schema text with references resolved using recursive calls
"""
try:
if schema:
schema_text = schema.schema_str
for reference in schema.references or []:
if not self.context.processed_schemas.get(reference.name):
self.context.processed_schemas[reference.name] = True
reference_schema = (
self.schema_registry_client.get_latest_version(
reference.name
)
)
if reference_schema.schema.references:
schema_text = (
schema_text
+ self._get_schema_text_with_references(
reference_schema.schema
)
)
else:
schema_text = (
schema_text + reference_schema.schema.schema_str
)
return schema_text
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to get schema with references: {exc}")
return None
def _parse_topic_metadata(self, topic_name: str) -> Optional[Schema]:
try:
if self.schema_registry_client:

View File

@ -0,0 +1,43 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helpers module for messaging sources
"""
import re
import traceback
from typing import Optional
from metadata.utils.logger import utils_logger
logger = utils_logger()
def merge_and_clean_protobuf_schema(schema_text: Optional[str]) -> Optional[str]:
"""
Remove the import and extra syntax lines for a schema with references
"""
try:
lines = schema_text.splitlines() if schema_text else []
new_lines = []
for i, line in enumerate(lines):
if not re.search(r'import ".*";', line) and not re.search(
r"option .*;", line
):
if re.search(r'\s*syntax\s*=\s*"proto\d+";\s*', line) and i != 0:
continue
new_lines.append(line)
return "\n".join(new_lines)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to merge and clean protobuf schema: {exc}")
return None

View File

@ -0,0 +1,9 @@
syntax = "proto3";
message Address {
string street = 1;
string city = 2;
string state = 3;
string zip = 4;
string country = 5;
}

View File

@ -0,0 +1,10 @@
syntax = "proto3";
import "address.proto";
import "contact.proto";
message Company {
string name = 1;
Address address = 2;
Contact contact = 3;
}

View File

@ -0,0 +1,9 @@
syntax = "proto3";
option java_package = "group.org.prod.schemas.contact";
option java_outer_classname = "ContactProto";
message Contact {
string email = 1;
string phone = 2;
}

View File

@ -0,0 +1,8 @@
syntax = "proto3";
import "company.proto";
message Department {
string name = 1;
Company company = 2;
}

View File

@ -0,0 +1,14 @@
syntax = "proto3";
import "team.proto";
import "contact.proto";
option java_package = "group.org.prod.schemas.customers";
option java_outer_classname = "EmployeeProto";
message Employee {
string first_name = 1;
string last_name = 2;
Team team = 3;
Contact contact = 4;
}

View File

@ -0,0 +1,8 @@
syntax = "proto3";
import "department.proto";
message Team {
string name = 1;
Department department = 2;
}

View File

@ -13,10 +13,12 @@
Protobuf parser tests
"""
import os
from unittest import TestCase
from metadata.generated.schema.entity.data.table import Column
from metadata.parsers.protobuf_parser import ProtobufParser, ProtobufParserConfig
from metadata.utils.messaging_utils import merge_and_clean_protobuf_schema
class ProtobufParserTests(TestCase):
@ -94,3 +96,33 @@ class ProtobufParserTests(TestCase):
parsed_schema = self.protobuf_parser.parse_protobuf_schema(cls=Column)
field_types = {str(field.dataType.name) for field in parsed_schema[0].children}
self.assertEqual(field_types, {"INT", "ENUM", "RECORD", "STRING", "BOOLEAN"})
def test_complex_protobuf_schema_files(self):
"""
We'll read the files under ./ingestion/tests/unit/resources/protobuf_parser and parse them
This will be similar in way to how we get the data from kafka source
"""
resource_path = "./ingestion/tests/unit/resources/protobuf_parser/"
schema_name = "employee"
file_list = os.listdir(resource_path)
schema_text = ""
for file_name in file_list:
file_path = os.path.join(resource_path, file_name)
with open(file_path, "r") as file:
schema_text = schema_text + file.read()
schema_text = merge_and_clean_protobuf_schema(schema_text)
protobuf_parser = ProtobufParser(
config=ProtobufParserConfig(
schema_name=schema_name, schema_text=schema_text
)
)
parsed_schema = protobuf_parser.parse_protobuf_schema()
self.assertEqual(parsed_schema[0].name.__root__, "Employee")
self.assertEqual(len(parsed_schema[0].children), 4)
self.assertEqual(parsed_schema[0].children[3].name.__root__, "contact")
self.assertEqual(
parsed_schema[0].children[3].children[0].name.__root__, "email"
)
self.assertEqual(
parsed_schema[0].children[3].children[1].name.__root__, "phone"
)