diff --git a/ingestion/setup.py b/ingestion/setup.py index 7538caa5543..7b02e5f4675 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -54,7 +54,7 @@ COMMONS = { "kafka": { "avro~=1.11", VERSIONS["avro-python3"], - "confluent_kafka==1.8.2", + "confluent_kafka==1.9.2", "fastavro>=1.2.0", # Due to https://github.com/grpc/grpc/issues/30843#issuecomment-1303816925 # use >= v1.47.2 https://github.com/grpc/grpc/blob/v1.47.2/tools/distrib/python/grpcio_tools/grpc_version.py#L17 diff --git a/ingestion/src/metadata/examples/workflows/kafka.yaml b/ingestion/src/metadata/examples/workflows/kafka.yaml index 32a32e2c57b..45a3c7c3920 100644 --- a/ingestion/src/metadata/examples/workflows/kafka.yaml +++ b/ingestion/src/metadata/examples/workflows/kafka.yaml @@ -19,6 +19,9 @@ sink: type: metadata-rest config: {} workflowConfig: + loggerLevel: DEBUG openMetadataServerConfig: hostPort: http://localhost:8585/api - authProvider: no-auth \ No newline at end of file + authProvider: openmetadata + securityConfig: + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" diff --git a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py index 972674e5730..660681f7fc3 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py +++ b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py @@ -21,6 +21,7 @@ from typing import Iterable, Optional import confluent_kafka from confluent_kafka import KafkaError, KafkaException from confluent_kafka.admin import ConfigResource +from confluent_kafka.schema_registry.avro import AvroDeserializer from confluent_kafka.schema_registry.schema_registry_client import Schema from metadata.generated.schema.api.data.createTopic import CreateTopicRequest @@ -46,6 +47,15 @@ from metadata.utils.logger import ingestion_logger logger = ingestion_logger() +def on_partitions_assignment_to_consumer(consumer, partitions): + # get offset tuple from the first partition + for partition in partitions: + last_offset = consumer.get_watermark_offsets(partition) + # get latest 50 messages, if there are no more than 50 messages we try to fetch from beginning of the queue + partition.offset = last_offset[1] - 50 if last_offset[1] > 50 else 0 + consumer.assign(partitions) + + class CommonBrokerSource(MessagingServiceSource, ABC): """ Common Broker Source Class @@ -104,7 +114,6 @@ class CommonBrokerSource(MessagingServiceSource, ABC): ] ) self.add_properties_to_topic_from_resource(topic, topic_config_resource) - if topic_schema is not None: schema_type = topic_schema.schema_type.lower() load_parser_fn = schema_parser_config_registry.registry.get(schema_type) @@ -121,9 +130,12 @@ class CommonBrokerSource(MessagingServiceSource, ABC): schemaType=schema_type_map.get( topic_schema.schema_type.lower(), SchemaType.Other.value ), - schemaFields=schema_fields, + schemaFields=schema_fields if schema_fields is not None else [], + ) + else: + topic.messageSchema = Topic( + schemaText="", schemaType=SchemaType.Other, schemaFields=[] ) - self.status.topic_scanned(topic.name.__root__) yield topic @@ -197,17 +209,13 @@ class CommonBrokerSource(MessagingServiceSource, ABC): """ Method to Get Sample Data of Messaging Entity """ - if ( - self.context.topic - and self.context.topic.messageSchema - and self.context.topic.messageSchema.schemaType.value - == SchemaType.Avro.value - and self.generate_sample_data - ): + if self.context.topic and self.generate_sample_data: topic_name = topic_details.topic_name sample_data = [] try: - self.consumer_client.subscribe([topic_name]) + self.consumer_client.subscribe( + [topic_name], on_assign=on_partitions_assignment_to_consumer + ) logger.info( f"Broker consumer polling for sample messages in topic {topic_name}" ) @@ -223,12 +231,10 @@ class CommonBrokerSource(MessagingServiceSource, ABC): try: value = message.value() sample_data.append( - value.decode() - if isinstance(value, bytes) - else str( - self.consumer_client._serializer.decode_message( # pylint: disable=protected-access - value - ) + self.decode_message( + value, + self.context.topic.messageSchema.schemaText, + self.context.topic.messageSchema.schemaType, ) ) except Exception as exc: @@ -242,6 +248,17 @@ class CommonBrokerSource(MessagingServiceSource, ABC): sample_data=TopicSampleData(messages=sample_data), ) + def decode_message(self, record: bytes, schema: str, schema_type: SchemaType): + if schema_type == SchemaType.Avro: + deserializer = AvroDeserializer( + schema_str=schema, schema_registry_client=self.schema_registry_client + ) + return str(deserializer(record, None)) + if schema_type == SchemaType.Protobuf: + logger.debug("Protobuf deserializing sample data is not supported") + return "" + return str(record.decode("utf-8")) + def close(self): if self.generate_sample_data and self.consumer_client: self.consumer_client.close() diff --git a/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py b/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py index aa41f2ac923..610e83fd0c1 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py +++ b/ingestion/src/metadata/ingestion/source/messaging/kafka/connection.py @@ -79,7 +79,8 @@ def get_connection( if "group.id" not in consumer_config: consumer_config["group.id"] = "openmetadata-consumer" if "auto.offset.reset" not in consumer_config: - consumer_config["auto.offset.reset"] = "earliest" + consumer_config["auto.offset.reset"] = "largest" + consumer_config["enable.auto.commit"] = False logger.debug(f"Using Kafka consumer config: {consumer_config}") consumer_client = AvroConsumer( consumer_config, schema_registry=schema_registry_client diff --git a/ingestion/src/metadata/parsers/avro_parser.py b/ingestion/src/metadata/parsers/avro_parser.py index 4db90946592..db207f90ceb 100644 --- a/ingestion/src/metadata/parsers/avro_parser.py +++ b/ingestion/src/metadata/parsers/avro_parser.py @@ -69,8 +69,7 @@ def parse_single_field( Parse primitive field for avro schema """ obj = cls( - name=field.name, - dataType=str(field.type.type).upper(), + name=field.name, dataType=str(field.type.type).upper(), description=field.doc ) if cls == Column: obj.dataTypeDisplay = str(field.type.type) @@ -85,7 +84,15 @@ def parse_avro_schema( """ try: parsed_schema = avroschema.parse(schema) - return get_avro_fields(parsed_schema, cls) + models = [ + cls( + name=parsed_schema.name, + dataType=str(parsed_schema.type).upper(), + children=get_avro_fields(parsed_schema, cls), + description=parsed_schema.doc, + ) + ] + return models except Exception as exc: # pylint: disable=broad-except logger.debug(traceback.format_exc()) logger.warning(f"Unable to parse the avro schema: {exc}") diff --git a/ingestion/src/metadata/parsers/json_schema_parser.py b/ingestion/src/metadata/parsers/json_schema_parser.py index 7f268f5425d..818fc27fbce 100644 --- a/ingestion/src/metadata/parsers/json_schema_parser.py +++ b/ingestion/src/metadata/parsers/json_schema_parser.py @@ -44,7 +44,14 @@ def parse_json_schema(schema_text: str) -> Optional[List[FieldModel]]: """ try: json_schema_data = json.loads(schema_text) - field_models = get_json_schema_fields(json_schema_data.get("properties")) + field_models = [ + FieldModel( + name=json_schema_data.get("title", "default"), + dataType=JsonSchemaDataTypes(json_schema_data.get("type")).name, + description=json_schema_data.get("description"), + children=get_json_schema_fields(json_schema_data.get("properties")), + ) + ] return field_models except Exception as exc: # pylint: disable=broad-except logger.debug(traceback.format_exc()) diff --git a/ingestion/src/metadata/parsers/protobuf_parser.py b/ingestion/src/metadata/parsers/protobuf_parser.py index a39d6f9705c..3b7f69367f5 100644 --- a/ingestion/src/metadata/parsers/protobuf_parser.py +++ b/ingestion/src/metadata/parsers/protobuf_parser.py @@ -176,7 +176,13 @@ class ProtobufParser: proto_path=proto_path, file_path=file_path ) - field_models = self.get_protobuf_fields(instance.DESCRIPTOR.fields) + field_models = [ + FieldModel( + name=instance.DESCRIPTOR.name, + dataType="RECORD", + children=self.get_protobuf_fields(instance.DESCRIPTOR.fields), + ) + ] # Clean up the tmp folder if Path(self.config.base_file_path).exists(): diff --git a/ingestion/tests/unit/test_avro_parser.py b/ingestion/tests/unit/test_avro_parser.py index 729766c4f50..0ce296a5adc 100644 --- a/ingestion/tests/unit/test_avro_parser.py +++ b/ingestion/tests/unit/test_avro_parser.py @@ -73,13 +73,15 @@ class AvroParserTests(TestCase): parsed_schema = parse_avro_schema(sample_avro_schema) def test_schema_name(self): - self.assertEqual(self.parsed_schema[0].name.__root__, "order_id") + self.assertEqual(self.parsed_schema[0].name.__root__, "Order") def test_schema_type(self): - self.assertEqual(self.parsed_schema[0].dataType.name, "INT") + self.assertEqual(self.parsed_schema[0].dataType.name, "RECORD") def test_field_names(self): - field_names = {str(field.name.__root__) for field in self.parsed_schema} + field_names = { + str(field.name.__root__) for field in self.parsed_schema[0].children + } self.assertEqual( field_names, { @@ -97,5 +99,7 @@ class AvroParserTests(TestCase): ) def test_field_types(self): - field_types = {str(field.dataType.name) for field in self.parsed_schema} + field_types = { + str(field.dataType.name) for field in self.parsed_schema[0].children + } self.assertEqual(field_types, {"INT", "STRING", "DOUBLE"}) diff --git a/ingestion/tests/unit/test_json_schema_parser.py b/ingestion/tests/unit/test_json_schema_parser.py index 7889f59999c..09f5f91d299 100644 --- a/ingestion/tests/unit/test_json_schema_parser.py +++ b/ingestion/tests/unit/test_json_schema_parser.py @@ -47,22 +47,26 @@ class JsonSchemaParserTests(TestCase): parsed_schema = parse_json_schema(sample_json_schema) def test_schema_name(self): - self.assertEqual(self.parsed_schema[0].name.__root__, "firstName") + self.assertEqual(self.parsed_schema[0].name.__root__, "Person") def test_schema_type(self): - self.assertEqual(self.parsed_schema[0].dataType.name, "STRING") + self.assertEqual(self.parsed_schema[0].dataType.name, "RECORD") def test_field_names(self): - field_names = {str(field.name.__root__) for field in self.parsed_schema} + field_names = { + str(field.name.__root__) for field in self.parsed_schema[0].children + } self.assertEqual(field_names, {"firstName", "lastName", "age"}) def test_field_types(self): - field_types = {str(field.dataType.name) for field in self.parsed_schema} + field_types = { + str(field.dataType.name) for field in self.parsed_schema[0].children + } self.assertEqual(field_types, {"INT", "STRING"}) def test_field_descriptions(self): field_descriptions = { - str(field.description.__root__) for field in self.parsed_schema + str(field.description.__root__) for field in self.parsed_schema[0].children } self.assertEqual( field_descriptions, diff --git a/ingestion/tests/unit/test_protobuf_parser.py b/ingestion/tests/unit/test_protobuf_parser.py index b6530c5c368..f7cea387881 100644 --- a/ingestion/tests/unit/test_protobuf_parser.py +++ b/ingestion/tests/unit/test_protobuf_parser.py @@ -48,15 +48,19 @@ class ProtobufParserTests(TestCase): parsed_schema = protobuf_parser.parse_protobuf_schema() def test_schema_name(self): - self.assertEqual(self.parsed_schema[0].name.__root__, "age") + self.assertEqual(self.parsed_schema[0].name.__root__, "PersonInfo") def test_schema_type(self): - self.assertEqual(self.parsed_schema[0].dataType.name, "INT") + self.assertEqual(self.parsed_schema[0].dataType.name, "RECORD") def test_field_names(self): - field_names = {str(field.name.__root__) for field in self.parsed_schema} + field_names = { + str(field.name.__root__) for field in self.parsed_schema[0].children + } self.assertEqual(field_names, {"height", "gender", "age"}) def test_field_types(self): - field_types = {str(field.dataType.name) for field in self.parsed_schema} + field_types = { + str(field.dataType.name) for field in self.parsed_schema[0].children + } self.assertEqual(field_types, {"INT", "ENUM"}) diff --git a/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicDetails.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicDetails.component.tsx index 25c5a4af66a..2921922d335 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicDetails.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicDetails.component.tsx @@ -14,15 +14,8 @@ import { Card } from 'antd'; import { AxiosError } from 'axios'; import { ENTITY_CARD_CLASS } from 'constants/entity.constants'; -import { isEmpty } from 'lodash'; import { EntityTags, ExtraInfo } from 'Models'; -import React, { - Fragment, - RefObject, - useCallback, - useEffect, - useState, -} from 'react'; +import React, { RefObject, useCallback, useEffect, useState } from 'react'; import { useTranslation } from 'react-i18next'; import { useHistory } from 'react-router-dom'; import { restoreTopic } from 'rest/topicsAPI'; @@ -382,26 +375,6 @@ const TopicDetails: React.FC = ({ } }; - const getInfoBadge = (infos: Array>) => { - return ( -
-
- {infos.map((info, index) => ( -
- - {info.key} - - - {info.value} - -
- ))} -
-
-
- ); - }; - const handleFullScreenClick = () => { history.push(getLineageViewPath(EntityType.TOPIC, topicFQN)); }; @@ -558,33 +531,17 @@ const TopicDetails: React.FC = ({ />
- {!isEmpty(topicDetails.messageSchema?.schemaFields) ? ( - - {getInfoBadge([ - { - key: t('label.schema'), - value: topicDetails.messageSchema?.schemaType ?? '', - }, - ])} - - - ) : ( -
- {t('message.no-schema-data-available')} -
- )} + )} {activeTab === 2 && ( diff --git a/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicSchema/TopicSchema.interface.tsx b/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicSchema/TopicSchema.interface.tsx index a1bc2ad6e19..156774f025d 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicSchema/TopicSchema.interface.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicSchema/TopicSchema.interface.tsx @@ -29,3 +29,8 @@ export interface TopicSchemaFieldsProps isReadOnly: boolean; onUpdate: (updatedMessageSchema: Topic['messageSchema']) => Promise; } + +export enum SchemaViewType { + FIELDS = 'fields', + TEXT = 'text', +} diff --git a/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicSchema/TopicSchema.tsx b/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicSchema/TopicSchema.tsx index dd5f20d3914..8e0a251370d 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicSchema/TopicSchema.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/TopicDetails/TopicSchema/TopicSchema.tsx @@ -11,8 +11,23 @@ * limitations under the License. */ -import { Button, Popover, Space, Typography } from 'antd'; +import { + Button, + Col, + Popover, + Radio, + RadioChangeEvent, + Row, + Space, + Tag, + Typography, +} from 'antd'; import Table, { ColumnsType } from 'antd/lib/table'; +import classNames from 'classnames'; +import ErrorPlaceHolder from 'components/common/error-with-placeholder/ErrorPlaceHolder'; +import SchemaEditor from 'components/schema-editor/SchemaEditor'; +import { CSMode } from 'enums/codemirror.enum'; +import { ERROR_PLACEHOLDER_TYPE } from 'enums/common.enum'; import { cloneDeep, isEmpty, isUndefined } from 'lodash'; import { EntityTags, TagOption } from 'Models'; import React, { FC, useMemo, useState } from 'react'; @@ -30,7 +45,11 @@ import RichTextEditorPreviewer from '../../common/rich-text-editor/RichTextEdito import { ModalWithMarkdownEditor } from '../../Modals/ModalWithMarkdownEditor/ModalWithMarkdownEditor'; import TagsContainer from '../../Tag/TagsContainer/tags-container'; import TagsViewer from '../../Tag/TagsViewer/tags-viewer'; -import { CellRendered, TopicSchemaFieldsProps } from './TopicSchema.interface'; +import { + CellRendered, + SchemaViewType, + TopicSchemaFieldsProps, +} from './TopicSchema.interface'; const TopicSchemaFields: FC = ({ messageSchema, @@ -48,6 +67,9 @@ const TopicSchemaFields: FC = ({ const [tagList, setTagList] = useState([]); const [isTagLoading, setIsTagLoading] = useState(false); const [tagFetchFailed, setTagFetchFailed] = useState(false); + const [viewType, setViewType] = useState( + SchemaViewType.FIELDS + ); const fetchTags = async () => { setIsTagLoading(true); @@ -240,22 +262,71 @@ const TopicSchemaFields: FC = ({ ] ); + const handleViewChange = (e: RadioChangeEvent) => { + setViewType(e.target.value); + }; + return ( - <> - (), - rowExpandable: (record) => !isEmpty(record.children), - }} - pagination={false} - rowKey="name" - size="small" - /> + + + + + {t('label.schema')} + + {messageSchema?.schemaType ?? ''} + + + {isEmpty(messageSchema?.schemaFields) && + isEmpty(messageSchema?.schemaText) ? ( + + {t('message.no-schema-data-available')} + + ) : ( + <> + {!isEmpty(messageSchema?.schemaFields) && ( + + + + {t('label.field-plural')} + + + {t('label.text')} + + + + )} + + {viewType === SchemaViewType.TEXT || + isEmpty(messageSchema?.schemaFields) ? ( + messageSchema?.schemaText && ( + + ) + ) : ( +
(), + rowExpandable: (record) => !isEmpty(record.children), + }} + pagination={false} + rowKey="name" + size="small" + /> + )} + + + )} {editFieldDescription && ( = ({ onSave={handleFieldDescriptionChange} /> )} - + ); }; diff --git a/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json b/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json index 072b20790d6..f6e61835d81 100644 --- a/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json +++ b/openmetadata-ui/src/main/resources/ui/src/locale/languages/en-us.json @@ -740,6 +740,7 @@ "test-type": "Test type", "testing-connection": "Testing Connection", "tests-summary": "Tests Summary", + "text": "Text", "thread": "Thread", "three-dash-symbol": "---", "three-dots-symbol": "•••", diff --git a/openmetadata-ui/src/main/resources/ui/src/styles/variables.less b/openmetadata-ui/src/main/resources/ui/src/styles/variables.less index 5225f57c441..89b209f2968 100644 --- a/openmetadata-ui/src/main/resources/ui/src/styles/variables.less +++ b/openmetadata-ui/src/main/resources/ui/src/styles/variables.less @@ -44,5 +44,6 @@ @active-color: #e8f4ff; @background-color: #ffffff; @panels-shadow-color: 1px 1px 8px rgba(0, 0, 0, 0.06); +@radio-button-checked-bg: rgba(113, 71, 232, 0.1); @announcement-background: #fffdf8; @announcement-border: #ffc143;