Fix #7156: Add Sample Data for Kafka and Redpanda (#9472)

Co-authored-by: Ayush Shah <ayush@getcollate.io>
Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
Mayur Singal 2023-01-12 14:13:25 +05:30 committed by GitHub
parent f2e54fe7b9
commit c4a7965ffc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 110 additions and 36 deletions

View File

@ -0,0 +1,22 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Model required to ingest topic sample data
"""
from pydantic import BaseModel
from metadata.generated.schema.entity.data.topic import Topic, TopicSampleData
class OMetaTopicSampleData(BaseModel):
topic: Topic
sample_data: TopicSampleData

View File

@ -46,6 +46,7 @@ from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.ometa_topic_data import OMetaTopicSampleData
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.profile_data import OMetaTableProfileSampleData
from metadata.ingestion.models.table_metadata import DeleteTable
@ -121,6 +122,7 @@ class MetadataRestSink(Sink[Entity]):
self.write_record.register(
OMetaTestCaseResultsSample, self.write_test_case_results_sample
)
self.write_record.register(OMetaTopicSampleData, self.write_topic_sample_data)
@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
@ -564,6 +566,28 @@ class MetadataRestSink(Sink[Entity]):
f"Unexpected error writing test case result sample [{record}]: {exc}"
)
def write_topic_sample_data(self, record: OMetaTopicSampleData):
"""
Use the /testCase endpoint to ingest sample test suite
"""
try:
if record.sample_data.messages:
self.metadata.ingest_topic_sample_data(
record.topic,
record.sample_data,
)
logger.debug(
f"Successfully ingested sample data for {record.topic.name.__root__}"
)
self.status.records_written(
f"topicSampleData: {record.topic.name.__root__}"
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Unexpected error while ingesting sample data for topic [{record.topic.name.__root__}]: {exc}"
)
def get_status(self):
return self.status

View File

@ -33,6 +33,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.schema import SchemaType, Topic
from metadata.ingestion.models.ometa_topic_data import OMetaTopicSampleData
from metadata.ingestion.source.messaging.messaging_service import (
BrokerTopicDetails,
MessagingServiceSource,
@ -126,11 +127,6 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
),
schemaFields=schema_fields,
)
if (
topic_schema.schema_type.lower() == SchemaType.Avro.value.lower()
and self.generate_sample_data
):
topic.sampleData = self._get_sample_data(topic.name)
self.status.topic_scanned(topic.name.__root__)
yield topic
@ -199,37 +195,56 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
)
return None
def _get_sample_data(self, topic_name):
sample_data = []
try:
self.consumer_client.subscribe([topic_name.__root__])
logger.info(
f"Broker consumer polling for sample messages in topic {topic_name.__root__}"
)
messages = self.consumer_client.consume(num_messages=10, timeout=10)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to fetch sample data from topic {topic_name.__root__}: {exc}"
)
else:
if messages:
for message in messages:
try:
sample_data.append(
str(
self.consumer_client._serializer.decode_message( # pylint: disable=protected-access
message.value()
def yield_topic_sample_data(
self, topic_details: BrokerTopicDetails
) -> TopicSampleData:
"""
Method to Get Sample Data of Messaging Entity
"""
if (
self.context.topic
and self.context.topic.messageSchema
and self.context.topic.messageSchema.schemaType.value
== SchemaType.Avro.value
and self.generate_sample_data
):
topic_name = topic_details.topic_name
sample_data = []
try:
self.consumer_client.subscribe([topic_name])
logger.info(
f"Broker consumer polling for sample messages in topic {topic_name}"
)
messages = self.consumer_client.consume(num_messages=10, timeout=10)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to fetch sample data from topic {topic_name}: {exc}"
)
else:
if messages:
for message in messages:
try:
value = message.value()
sample_data.append(
value.decode()
if isinstance(value, bytes)
else str(
self.consumer_client._serializer.decode_message( # pylint: disable=protected-access
value
)
)
)
)
except Exception as exc:
logger.warning(
f"Failed to decode sample data from topic {topic_name.__root__}: {exc}"
)
except Exception as exc:
logger.warning(
f"Failed to decode sample data from topic {topic_name}: {exc}"
)
self.consumer_client.unsubscribe()
return TopicSampleData(messages=sample_data)
self.consumer_client.unsubscribe()
yield OMetaTopicSampleData(
topic=self.context.topic,
sample_data=TopicSampleData(messages=sample_data),
)
def close(self):
if self.generate_sample_data and self.consumer_client:

View File

@ -18,7 +18,7 @@ from typing import Any, Iterable, List, Optional
from pydantic import BaseModel
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.data.topic import Topic, TopicSampleData
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
@ -81,10 +81,18 @@ class MessagingServiceTopology(ServiceTopology):
stages=[
NodeStage(
type_=Topic,
context="Topic",
context="topic",
processor="yield_topic",
consumer=["messaging_service"],
)
),
NodeStage(
type_=TopicSampleData,
context="topic_sample_data",
processor="yield_topic_sample_data",
consumer=["messaging_service"],
nullable=True,
ack_sink=False,
),
],
)
@ -116,6 +124,11 @@ class MessagingServiceSource(TopologyRunnerMixin, Source, ABC):
Method to Get Messaging Entity
"""
def yield_topic_sample_data(self, topic_details: Any) -> Iterable[TopicSampleData]:
"""
Method to Get Sample Data of Messaging Entity
"""
@abstractmethod
def get_topic_list(self) -> Optional[List[Any]]:
"""