Fix #10429: Kafka Sample data improvements and adding support for JSONSchema and Protobuf (#10430)

* Fix #10429: Kafka Sample data improvements and adding support for JSONSchema and Protobuf

* Added top-level parsing and unit tests

* fix(ui): show both schemaText and fields

* fix no-data placeholder for fields & schema text

* addressing comments

* fixed py checkstyle

---------

Co-authored-by: Onkar Ravgan <onkar.10r@gmail.com>
Co-authored-by: Chirag Madlani <12962843+chirag-madlani@users.noreply.github.com>
Sriharsha Chintalapani 2023-03-07 06:40:04 -08:00 committed by GitHub
parent f867ff10e7
commit fe73948b55
15 changed files with 199 additions and 111 deletions


@@ -54,7 +54,7 @@ COMMONS = {
     "kafka": {
         "avro~=1.11",
         VERSIONS["avro-python3"],
-        "confluent_kafka==1.8.2",
+        "confluent_kafka==1.9.2",
         "fastavro>=1.2.0",
         # Due to https://github.com/grpc/grpc/issues/30843#issuecomment-1303816925
         # use >= v1.47.2 https://github.com/grpc/grpc/blob/v1.47.2/tools/distrib/python/grpcio_tools/grpc_version.py#L17


@@ -19,6 +19,9 @@ sink:
   type: metadata-rest
   config: {}
 workflowConfig:
+  loggerLevel: DEBUG
   openMetadataServerConfig:
     hostPort: http://localhost:8585/api
-    authProvider: no-auth
+    authProvider: openmetadata
+    securityConfig:
+      "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"


@@ -21,6 +21,7 @@ from typing import Iterable, Optional
 import confluent_kafka
 from confluent_kafka import KafkaError, KafkaException
 from confluent_kafka.admin import ConfigResource
+from confluent_kafka.schema_registry.avro import AvroDeserializer
 from confluent_kafka.schema_registry.schema_registry_client import Schema
 from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
@@ -46,6 +47,15 @@ from metadata.utils.logger import ingestion_logger
 logger = ingestion_logger()
+def on_partitions_assignment_to_consumer(consumer, partitions):
+    # get the offset tuple for each assigned partition
+    for partition in partitions:
+        last_offset = consumer.get_watermark_offsets(partition)
+        # fetch the latest 50 messages; if the partition holds 50 or fewer, start from the beginning of the queue
+        partition.offset = last_offset[1] - 50 if last_offset[1] > 50 else 0
+    consumer.assign(partitions)
 class CommonBrokerSource(MessagingServiceSource, ABC):
     """
     Common Broker Source Class
@@ -104,7 +114,6 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
                 ]
             )
             self.add_properties_to_topic_from_resource(topic, topic_config_resource)
-
             if topic_schema is not None:
                 schema_type = topic_schema.schema_type.lower()
                 load_parser_fn = schema_parser_config_registry.registry.get(schema_type)
@@ -121,9 +130,12 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
                     schemaType=schema_type_map.get(
                         topic_schema.schema_type.lower(), SchemaType.Other.value
                     ),
-                    schemaFields=schema_fields,
+                    schemaFields=schema_fields if schema_fields is not None else [],
                 )
+            else:
+                topic.messageSchema = Topic(
+                    schemaText="", schemaType=SchemaType.Other, schemaFields=[]
+                )
             self.status.topic_scanned(topic.name.__root__)
             yield topic
@@ -197,17 +209,13 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
         """
        Method to Get Sample Data of Messaging Entity
         """
-        if (
-            self.context.topic
-            and self.context.topic.messageSchema
-            and self.context.topic.messageSchema.schemaType.value
-            == SchemaType.Avro.value
-            and self.generate_sample_data
-        ):
+        if self.context.topic and self.generate_sample_data:
             topic_name = topic_details.topic_name
             sample_data = []
             try:
-                self.consumer_client.subscribe([topic_name])
+                self.consumer_client.subscribe(
+                    [topic_name], on_assign=on_partitions_assignment_to_consumer
+                )
                 logger.info(
                     f"Broker consumer polling for sample messages in topic {topic_name}"
                 )
@@ -223,12 +231,10 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
                    try:
                        value = message.value()
                        sample_data.append(
-                            value.decode()
-                            if isinstance(value, bytes)
-                            else str(
-                                self.consumer_client._serializer.decode_message(  # pylint: disable=protected-access
-                                    value
-                                )
+                            self.decode_message(
+                                value,
+                                self.context.topic.messageSchema.schemaText,
+                                self.context.topic.messageSchema.schemaType,
                             )
                        )
                    except Exception as exc:
@@ -242,6 +248,17 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
                 sample_data=TopicSampleData(messages=sample_data),
             )
+    def decode_message(self, record: bytes, schema: str, schema_type: SchemaType):
+        if schema_type == SchemaType.Avro:
+            deserializer = AvroDeserializer(
+                schema_str=schema, schema_registry_client=self.schema_registry_client
+            )
+            return str(deserializer(record, None))
+        if schema_type == SchemaType.Protobuf:
+            logger.debug("Protobuf deserializing sample data is not supported")
+            return ""
+        return str(record.decode("utf-8"))
     def close(self):
         if self.generate_sample_data and self.consumer_client:
             self.consumer_client.close()
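Taken together, the new sampling flow looks like the sketch below: a minimal, hedged example, assuming a broker at localhost:9092 and a hypothetical "orders" topic. The on_assign callback rewinds each assigned partition to roughly the last 50 messages before polling, and non-Avro payloads fall back to a plain utf-8 decode, mirroring decode_message above.

    from confluent_kafka import Consumer

    def rewind_to_tail(consumer, partitions):
        # Mirror on_partitions_assignment_to_consumer: start ~50 messages
        # before the high watermark, or at offset 0 when there are fewer.
        for partition in partitions:
            low, high = consumer.get_watermark_offsets(partition)
            partition.offset = high - 50 if high > 50 else 0
        consumer.assign(partitions)

    consumer = Consumer(
        {
            "bootstrap.servers": "localhost:9092",  # assumed broker
            "group.id": "openmetadata-consumer",
            "auto.offset.reset": "largest",
            "enable.auto.commit": False,
        }
    )
    consumer.subscribe(["orders"], on_assign=rewind_to_tail)  # "orders" is hypothetical

    samples = []
    while len(samples) < 10:
        message = consumer.poll(timeout=1.0)
        if message is None:
            break
        if message.error():
            continue
        value = message.value()
        # Avro topics would go through AvroDeserializer (see decode_message
        # above); anything else falls back to a plain utf-8 decode.
        samples.append(value.decode("utf-8") if isinstance(value, bytes) else str(value))

    consumer.close()
    print(samples)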


@@ -79,7 +79,8 @@ def get_connection(
     if "group.id" not in consumer_config:
         consumer_config["group.id"] = "openmetadata-consumer"
     if "auto.offset.reset" not in consumer_config:
-        consumer_config["auto.offset.reset"] = "earliest"
+        consumer_config["auto.offset.reset"] = "largest"
+    consumer_config["enable.auto.commit"] = False
     logger.debug(f"Using Kafka consumer config: {consumer_config}")
     consumer_client = AvroConsumer(
         consumer_config, schema_registry=schema_registry_client
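A side effect worth noting: with enable.auto.commit forced to False and no manual commit, sampling never advances the consumer group's committed offsets, so repeated metadata runs do not disturb other consumers sharing the group id. A quick check, assuming a local broker and a hypothetical "orders" topic:

    from confluent_kafka import OFFSET_INVALID, Consumer, TopicPartition

    consumer = Consumer(
        {
            "bootstrap.servers": "localhost:9092",  # assumed broker
            "group.id": "openmetadata-consumer",
            "auto.offset.reset": "largest",
            "enable.auto.commit": False,
        }
    )
    # With auto-commit off and no manual commit(), the group's stored offsets
    # remain OFFSET_INVALID no matter how many messages were polled.
    committed = consumer.committed([TopicPartition("orders", 0)], timeout=10)
    for tp in committed:
        print(tp.topic, tp.partition, tp.offset == OFFSET_INVALID)
    consumer.close()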


@@ -69,8 +69,7 @@ def parse_single_field(
     Parse primitive field for avro schema
     """
     obj = cls(
-        name=field.name,
-        dataType=str(field.type.type).upper(),
+        name=field.name, dataType=str(field.type.type).upper(), description=field.doc
     )
     if cls == Column:
         obj.dataTypeDisplay = str(field.type.type)
@@ -85,7 +84,15 @@ def parse_avro_schema(
     """
     try:
         parsed_schema = avroschema.parse(schema)
-        return get_avro_fields(parsed_schema, cls)
+        models = [
+            cls(
+                name=parsed_schema.name,
+                dataType=str(parsed_schema.type).upper(),
+                children=get_avro_fields(parsed_schema, cls),
+                description=parsed_schema.doc,
+            )
+        ]
+        return models
     except Exception as exc:  # pylint: disable=broad-except
         logger.debug(traceback.format_exc())
         logger.warning(f"Unable to parse the avro schema: {exc}")


@@ -44,7 +44,14 @@ def parse_json_schema(schema_text: str) -> Optional[List[FieldModel]]:
     """
     try:
         json_schema_data = json.loads(schema_text)
-        field_models = get_json_schema_fields(json_schema_data.get("properties"))
+        field_models = [
+            FieldModel(
+                name=json_schema_data.get("title", "default"),
+                dataType=JsonSchemaDataTypes(json_schema_data.get("type")).name,
+                description=json_schema_data.get("description"),
+                children=get_json_schema_fields(json_schema_data.get("properties")),
+            )
+        ]
         return field_models
     except Exception as exc:  # pylint: disable=broad-except
         logger.debug(traceback.format_exc())
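The same top-level wrapping for JSON Schema, sketched with the standard library (the "Person" schema mirrors the updated unit tests; JsonSchemaDataTypes is assumed to map "object" to RECORD):

    import json

    sample_json_schema = """
    {
      "$id": "https://example.com/person.schema.json",
      "title": "Person",
      "type": "object",
      "description": "A person record",
      "properties": {
        "firstName": {"type": "string", "description": "The person's first name."},
        "lastName": {"type": "string", "description": "The person's last name."},
        "age": {"type": "integer", "description": "Age in years."}
      }
    }
    """

    data = json.loads(sample_json_schema)
    print(data.get("title", "default"))  # Person  -> top-level field name
    print(data.get("type"))              # object  -> mapped to RECORD
    print(data.get("description"))       # A person record
    print(list(data["properties"]))      # children: firstName, lastName, age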


@@ -176,7 +176,13 @@ class ProtobufParser:
             proto_path=proto_path, file_path=file_path
         )
-        field_models = self.get_protobuf_fields(instance.DESCRIPTOR.fields)
+        field_models = [
+            FieldModel(
+                name=instance.DESCRIPTOR.name,
+                dataType="RECORD",
+                children=self.get_protobuf_fields(instance.DESCRIPTOR.fields),
+            )
+        ]
 
         # Clean up the tmp folder
         if Path(self.config.base_file_path).exists():
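For protobuf, the top-level name comes from the compiled message descriptor. A small illustration using a message class that ships with the protobuf package (the parser above compiles user-supplied .proto files instead):

    from google.protobuf import struct_pb2

    # Any generated message class exposes DESCRIPTOR; the parser uses its name
    # for the new top-level RECORD and its fields as the children.
    descriptor = struct_pb2.Struct.DESCRIPTOR
    print(descriptor.name)                               # Struct
    print([field.name for field in descriptor.fields])   # ['fields']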


@@ -73,13 +73,15 @@ class AvroParserTests(TestCase):
     parsed_schema = parse_avro_schema(sample_avro_schema)
 
     def test_schema_name(self):
-        self.assertEqual(self.parsed_schema[0].name.__root__, "order_id")
+        self.assertEqual(self.parsed_schema[0].name.__root__, "Order")
 
     def test_schema_type(self):
-        self.assertEqual(self.parsed_schema[0].dataType.name, "INT")
+        self.assertEqual(self.parsed_schema[0].dataType.name, "RECORD")
 
     def test_field_names(self):
-        field_names = {str(field.name.__root__) for field in self.parsed_schema}
+        field_names = {
+            str(field.name.__root__) for field in self.parsed_schema[0].children
+        }
         self.assertEqual(
             field_names,
             {
@@ -97,5 +99,7 @@ class AvroParserTests(TestCase):
     )
 
     def test_field_types(self):
-        field_types = {str(field.dataType.name) for field in self.parsed_schema}
+        field_types = {
+            str(field.dataType.name) for field in self.parsed_schema[0].children
+        }
         self.assertEqual(field_types, {"INT", "STRING", "DOUBLE"})


@@ -47,22 +47,26 @@ class JsonSchemaParserTests(TestCase):
     parsed_schema = parse_json_schema(sample_json_schema)
 
     def test_schema_name(self):
-        self.assertEqual(self.parsed_schema[0].name.__root__, "firstName")
+        self.assertEqual(self.parsed_schema[0].name.__root__, "Person")
 
     def test_schema_type(self):
-        self.assertEqual(self.parsed_schema[0].dataType.name, "STRING")
+        self.assertEqual(self.parsed_schema[0].dataType.name, "RECORD")
 
     def test_field_names(self):
-        field_names = {str(field.name.__root__) for field in self.parsed_schema}
+        field_names = {
+            str(field.name.__root__) for field in self.parsed_schema[0].children
+        }
         self.assertEqual(field_names, {"firstName", "lastName", "age"})
 
     def test_field_types(self):
-        field_types = {str(field.dataType.name) for field in self.parsed_schema}
+        field_types = {
+            str(field.dataType.name) for field in self.parsed_schema[0].children
+        }
         self.assertEqual(field_types, {"INT", "STRING"})
 
     def test_field_descriptions(self):
         field_descriptions = {
-            str(field.description.__root__) for field in self.parsed_schema
+            str(field.description.__root__) for field in self.parsed_schema[0].children
         }
         self.assertEqual(
             field_descriptions,


@@ -48,15 +48,19 @@ class ProtobufParserTests(TestCase):
     parsed_schema = protobuf_parser.parse_protobuf_schema()
 
     def test_schema_name(self):
-        self.assertEqual(self.parsed_schema[0].name.__root__, "age")
+        self.assertEqual(self.parsed_schema[0].name.__root__, "PersonInfo")
 
     def test_schema_type(self):
-        self.assertEqual(self.parsed_schema[0].dataType.name, "INT")
+        self.assertEqual(self.parsed_schema[0].dataType.name, "RECORD")
 
     def test_field_names(self):
-        field_names = {str(field.name.__root__) for field in self.parsed_schema}
+        field_names = {
+            str(field.name.__root__) for field in self.parsed_schema[0].children
+        }
         self.assertEqual(field_names, {"height", "gender", "age"})
 
     def test_field_types(self):
-        field_types = {str(field.dataType.name) for field in self.parsed_schema}
+        field_types = {
+            str(field.dataType.name) for field in self.parsed_schema[0].children
+        }
         self.assertEqual(field_types, {"INT", "ENUM"})


@@ -14,15 +14,8 @@
 import { Card } from 'antd';
 import { AxiosError } from 'axios';
 import { ENTITY_CARD_CLASS } from 'constants/entity.constants';
-import { isEmpty } from 'lodash';
 import { EntityTags, ExtraInfo } from 'Models';
-import React, {
-  Fragment,
-  RefObject,
-  useCallback,
-  useEffect,
-  useState,
-} from 'react';
+import React, { RefObject, useCallback, useEffect, useState } from 'react';
 import { useTranslation } from 'react-i18next';
 import { useHistory } from 'react-router-dom';
 import { restoreTopic } from 'rest/topicsAPI';
@@ -382,26 +375,6 @@ const TopicDetails: React.FC<TopicDetailsProps> = ({
     }
   };
 
-  const getInfoBadge = (infos: Array<Record<string, string | number>>) => {
-    return (
-      <div className="tw-flex tw-justify-between">
-        <div className="tw-flex tw-gap-3">
-          {infos.map((info, index) => (
-            <div className="tw-mt-4" key={index}>
-              <span className="tw-py-1.5 tw-px-2 tw-rounded-l tw-bg-tag ">
-                {info.key}
-              </span>
-              <span className="tw-py-1.5 tw-px-2 tw-bg-primary-lite tw-font-normal tw-rounded-r">
-                {info.value}
-              </span>
-            </div>
-          ))}
-        </div>
-        <div />
-      </div>
-    );
-  };
-
   const handleFullScreenClick = () => {
     history.push(getLineageViewPath(EntityType.TOPIC, topicFQN));
   };
@@ -558,33 +531,17 @@ const TopicDetails: React.FC<TopicDetailsProps> = ({
                   />
                 </div>
               </div>
-              {!isEmpty(topicDetails.messageSchema?.schemaFields) ? (
-                <Fragment>
-                  {getInfoBadge([
-                    {
-                      key: t('label.schema'),
-                      value: topicDetails.messageSchema?.schemaType ?? '',
-                    },
-                  ])}
-                  <TopicSchemaFields
-                    className="mt-4"
-                    hasDescriptionEditAccess={
-                      topicPermissions.EditAll ||
-                      topicPermissions.EditDescription
-                    }
-                    hasTagEditAccess={
-                      topicPermissions.EditAll || topicPermissions.EditTags
-                    }
-                    isReadOnly={Boolean(deleted)}
-                    messageSchema={topicDetails.messageSchema}
-                    onUpdate={handleSchemaFieldsUpdate}
-                  />
-                </Fragment>
-              ) : (
-                <div className="tw-flex tw-justify-center tw-font-medium tw-items-center tw-border tw-border-main tw-rounded-md tw-p-8">
-                  {t('message.no-schema-data-available')}
-                </div>
-              )}
+              <TopicSchemaFields
+                hasDescriptionEditAccess={
+                  topicPermissions.EditAll || topicPermissions.EditDescription
+                }
+                hasTagEditAccess={
+                  topicPermissions.EditAll || topicPermissions.EditTags
+                }
+                isReadOnly={Boolean(deleted)}
+                messageSchema={topicDetails.messageSchema}
+                onUpdate={handleSchemaFieldsUpdate}
+              />
             </Card>
           )}
           {activeTab === 2 && (

@@ -29,3 +29,8 @@ export interface TopicSchemaFieldsProps
   isReadOnly: boolean;
   onUpdate: (updatedMessageSchema: Topic['messageSchema']) => Promise<void>;
 }
+
+export enum SchemaViewType {
+  FIELDS = 'fields',
+  TEXT = 'text',
+}


@@ -11,8 +11,23 @@
  *  limitations under the License.
  */
-import { Button, Popover, Space, Typography } from 'antd';
+import {
+  Button,
+  Col,
+  Popover,
+  Radio,
+  RadioChangeEvent,
+  Row,
+  Space,
+  Tag,
+  Typography,
+} from 'antd';
 import Table, { ColumnsType } from 'antd/lib/table';
 import classNames from 'classnames';
+import ErrorPlaceHolder from 'components/common/error-with-placeholder/ErrorPlaceHolder';
+import SchemaEditor from 'components/schema-editor/SchemaEditor';
+import { CSMode } from 'enums/codemirror.enum';
+import { ERROR_PLACEHOLDER_TYPE } from 'enums/common.enum';
 import { cloneDeep, isEmpty, isUndefined } from 'lodash';
 import { EntityTags, TagOption } from 'Models';
 import React, { FC, useMemo, useState } from 'react';
@@ -30,7 +45,11 @@ import RichTextEditorPreviewer from '../../common/rich-text-editor/RichTextEdito
 import { ModalWithMarkdownEditor } from '../../Modals/ModalWithMarkdownEditor/ModalWithMarkdownEditor';
 import TagsContainer from '../../Tag/TagsContainer/tags-container';
 import TagsViewer from '../../Tag/TagsViewer/tags-viewer';
-import { CellRendered, TopicSchemaFieldsProps } from './TopicSchema.interface';
+import {
+  CellRendered,
+  SchemaViewType,
+  TopicSchemaFieldsProps,
+} from './TopicSchema.interface';
 
 const TopicSchemaFields: FC<TopicSchemaFieldsProps> = ({
   messageSchema,
@@ -48,6 +67,9 @@ const TopicSchemaFields: FC<TopicSchemaFieldsProps> = ({
   const [tagList, setTagList] = useState<TagOption[]>([]);
   const [isTagLoading, setIsTagLoading] = useState<boolean>(false);
   const [tagFetchFailed, setTagFetchFailed] = useState<boolean>(false);
+  const [viewType, setViewType] = useState<SchemaViewType>(
+    SchemaViewType.FIELDS
+  );
 
   const fetchTags = async () => {
     setIsTagLoading(true);
@@ -240,22 +262,71 @@ const TopicSchemaFields: FC<TopicSchemaFieldsProps> = ({
     ]
   );
 
+  const handleViewChange = (e: RadioChangeEvent) => {
+    setViewType(e.target.value);
+  };
+
   return (
-    <>
-      <Table
-        bordered
-        className={className}
-        columns={columns}
-        data-testid="topic-schema-fields-table"
-        dataSource={messageSchema?.schemaFields}
-        expandable={{
-          ...getTableExpandableConfig<Field>(),
-          rowExpandable: (record) => !isEmpty(record.children),
-        }}
-        pagination={false}
-        rowKey="name"
-        size="small"
-      />
+    <Row className="mt-4" gutter={[16, 16]}>
+      <Col>
+        <Space>
+          <Typography.Text type="secondary">
+            {t('label.schema')}
+          </Typography.Text>
+          <Tag>{messageSchema?.schemaType ?? ''}</Tag>
+        </Space>
+      </Col>
+      {isEmpty(messageSchema?.schemaFields) &&
+      isEmpty(messageSchema?.schemaText) ? (
+        <ErrorPlaceHolder type={ERROR_PLACEHOLDER_TYPE.VIEW}>
+          {t('message.no-schema-data-available')}
+        </ErrorPlaceHolder>
+      ) : (
+        <>
+          {!isEmpty(messageSchema?.schemaFields) && (
+            <Col span={24}>
+              <Radio.Group value={viewType} onChange={handleViewChange}>
+                <Radio.Button value={SchemaViewType.FIELDS}>
+                  {t('label.field-plural')}
+                </Radio.Button>
+                <Radio.Button value={SchemaViewType.TEXT}>
+                  {t('label.text')}
+                </Radio.Button>
+              </Radio.Group>
+            </Col>
+          )}
+          <Col span={24}>
+            {viewType === SchemaViewType.TEXT ||
+            isEmpty(messageSchema?.schemaFields) ? (
+              messageSchema?.schemaText && (
+                <SchemaEditor
+                  editorClass={classNames('table-query-editor')}
+                  mode={{ name: CSMode.JAVASCRIPT }}
+                  options={{
+                    styleActiveLine: false,
+                  }}
+                  value={messageSchema?.schemaText ?? ''}
+                />
+              )
+            ) : (
+              <Table
+                bordered
+                className={className}
+                columns={columns}
+                data-testid="topic-schema-fields-table"
+                dataSource={messageSchema?.schemaFields}
+                expandable={{
+                  ...getTableExpandableConfig<Field>(),
+                  rowExpandable: (record) => !isEmpty(record.children),
+                }}
+                pagination={false}
+                rowKey="name"
+                size="small"
+              />
+            )}
+          </Col>
+        </>
+      )}
       {editFieldDescription && (
         <ModalWithMarkdownEditor
           header={`${t('label.edit-entity', {
@@ -270,7 +341,7 @@ const TopicSchemaFields: FC<TopicSchemaFieldsProps> = ({
           onSave={handleFieldDescriptionChange}
         />
       )}
-    </>
+    </Row>
   );
 };


@@ -740,6 +740,7 @@
   "test-type": "Test type",
   "testing-connection": "Testing Connection",
   "tests-summary": "Tests Summary",
+  "text": "Text",
   "thread": "Thread",
   "three-dash-symbol": "---",
   "three-dots-symbol": "•••",


@@ -44,5 +44,6 @@
 @active-color: #e8f4ff;
 @background-color: #ffffff;
 @panels-shadow-color: 1px 1px 8px rgba(0, 0, 0, 0.06);
+@radio-button-checked-bg: rgba(113, 71, 232, 0.1);
 @announcement-background: #fffdf8;
 @announcement-border: #ffc143;