From c9feba826f98a2e4d5e21b5457b521f9f0854068 Mon Sep 17 00:00:00 2001 From: Onkar Ravgan Date: Thu, 23 Mar 2023 13:38:56 +0530 Subject: [PATCH] Kinesis bug fixed (#10578) * Kinesis bug fixed * Added sample data fix * Fixed typo * Added constants * Added try catch for sample data * Added fixes on review comments * Modified models * Added sample data py models * Fix pylint * fixed code smell --------- Co-authored-by: Pere Miquel Brull --- Makefile | 2 +- .../source/messaging/kinesis/metadata.py | 181 +++++++++++++----- .../source/messaging/kinesis/models.py | 125 ++++++++++++ .../json/schema/api/data/createTopic.json | 5 - .../ui/cypress/common/advancedSearch.js | 4 +- 5 files changed, 260 insertions(+), 57 deletions(-) create mode 100644 ingestion/src/metadata/ingestion/source/messaging/kinesis/models.py diff --git a/Makefile b/Makefile index c341c8804bf..15e4ecc7621 100644 --- a/Makefile +++ b/Makefile @@ -41,7 +41,7 @@ precommit_install: ## Install the project's precommit hooks from .pre-commit-co .PHONY: lint lint: ## Run pylint on the Python sources to analyze the codebase - PYTHONPATH="${PYTHONPATH}:./ingestion/plugins" find $(PY_SOURCE) -path $(PY_SOURCE)/metadata/generated -prune -false -o -type f -name "*.py" | xargs pylint --ignore-paths=$(PY_SOURCE)/metadata_server/ + PYTHONPATH="${PYTHONPATH}:./ingestion/plugins" find $(PY_SOURCE) -path $(PY_SOURCE)/metadata/generated -prune -false -o -type f -name "*.py" | xargs pylint .PHONY: py_format py_format: ## Run black and isort to format the Python codebase diff --git a/ingestion/src/metadata/ingestion/source/messaging/kinesis/metadata.py b/ingestion/src/metadata/ingestion/source/messaging/kinesis/metadata.py index 224a970ccad..272f91af1cf 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/kinesis/metadata.py +++ b/ingestion/src/metadata/ingestion/source/messaging/kinesis/metadata.py @@ -9,11 +9,12 @@ # See the License for the specific language governing permissions and # limitations under the 
License. """ -Kafka source ingestion +Kinesis source ingestion """ +import binascii import traceback from base64 import b64decode -from typing import Any, Dict, Iterable, List +from typing import Iterable, List, Optional from metadata.generated.schema.api.data.createTopic import CreateTopicRequest from metadata.generated.schema.entity.data.topic import TopicSampleData @@ -27,10 +28,24 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.models.ometa_topic_data import OMetaTopicSampleData +from metadata.ingestion.source.messaging.kinesis.models import ( + KinesisArgs, + KinesisData, + KinesisEnum, + KinesisPartitions, + KinesisRecords, + KinesisShardIterator, + KinesisStreamArgs, + KinesisStreamModel, + KinesisSummaryModel, + KinesisTopicMetadataModel, +) from metadata.ingestion.source.messaging.messaging_service import ( BrokerTopicDetails, MessagingServiceSource, ) +from metadata.utils.constants import UTF_8 from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -45,7 +60,7 @@ class KinesisSource(MessagingServiceSource): def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): super().__init__(config, metadata_config) self.generate_sample_data = self.config.sourceConfig.config.generateSampleData - self.kinesis = self.connection.client + self.kinesis = self.connection @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): @@ -57,48 +72,60 @@ class KinesisSource(MessagingServiceSource): ) return cls(config, metadata_config) - def get_topic_list(self) -> Iterable[BrokerTopicDetails]: - all_topics, has_more_topics, args = [], True, {"Limit": 100} - try: - while has_more_topics: - topics = self.kinesis.list_streams(**args) - all_topics.extend(topics["StreamNames"]) - has_more_topics = topics["HasMoreStreams"] - args["ExclusiveStartStreamName"] = 
all_topics[-1] - except Exception as err: - logger.debug(traceback.format_exc()) - logger.error(f"Failed to fetch models list - {err}") + def get_stream_names_list(self) -> List[str]: + """ + Get the list of all the streams + """ + all_topics, has_more_topics, args = [], True, KinesisArgs(Limit=100) + while has_more_topics: + try: + topics = self.kinesis.list_streams(**args.dict()) + kinesis_topic_model = KinesisStreamModel(**topics) + all_topics.extend(kinesis_topic_model.StreamNames) + has_more_topics = kinesis_topic_model.HasMoreStreams + if len(all_topics) > 0: + args.ExclusiveStartStreamName = all_topics[-1] + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(f"Failed to fetch kinesis stream - {err}") + return all_topics + def get_topic_list(self) -> Iterable[BrokerTopicDetails]: + """ + Method to yield topic details + """ + all_topics = self.get_stream_names_list() for topic_name in all_topics: - yield BrokerTopicDetails( - topic_name=topic_name, - topic_metadata={ - "summary": self._get_topic_details(topic_name), - "partitions": self._get_topic_partitions(topic_name), - }, - ) + try: + yield BrokerTopicDetails( + topic_name=topic_name, + topic_metadata=KinesisTopicMetadataModel( + summary=self._get_topic_details(topic_name), + partitions=self._get_topic_partitions(topic_name), + ), + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(f"Failed to yield kinesis topic - {err}") def yield_topic( self, topic_details: BrokerTopicDetails ) -> Iterable[CreateTopicRequest]: + """ + Method to yield the create topic request + """ try: logger.info(f"Fetching topic details {topic_details.topic_name}") + topic = CreateTopicRequest( name=topic_details.topic_name, service=self.context.messaging_service.fullyQualifiedName.__root__, - partitions=len(topic_details.topic_metadata["partitions"]), - retentionTime=float( - topic_details.topic_metadata["summary"].get( - "RetentionPeriodHours", 0 - ) - * 3600000 + 
partitions=len(topic_details.topic_metadata.partitions), + retentionTime=self._compute_retention_time( + topic_details.topic_metadata.summary ), maximumMessageSize=self._get_max_message_size(), ) - if self.generate_sample_data: - topic.sampleData = self._get_sample_data( - topic_details.topic_name, topic_details.topic_metadata["partitions"] - ) self.status.scanned(topic.name.__root__) yield topic @@ -114,31 +141,47 @@ class KinesisSource(MessagingServiceSource): def get_topic_name(self, topic_details: BrokerTopicDetails) -> str: return topic_details.topic_name + def _compute_retention_time(self, summary: Optional[KinesisSummaryModel]) -> float: + retention_time = 0 + if summary: + retention_time = ( + summary.StreamDescriptionSummary.RetentionPeriodHours * 3600000 + ) + return float(retention_time) + def _get_max_message_size(self) -> int: # max message size supported by Kinesis is 1MB and is not configurable return 1000000 - def _get_topic_details(self, topic_name: str) -> Dict[str, Any]: + def _get_topic_details(self, topic_name: str) -> Optional[KinesisSummaryModel]: try: - topic = self.kinesis.describe_stream_summary(StreamName=topic_name) - return topic["StreamDescriptionSummary"] + topic_summary = self.kinesis.describe_stream_summary(StreamName=topic_name) + return KinesisSummaryModel(**topic_summary) except Exception as err: logger.debug(traceback.format_exc()) logger.warning( f"Error while fetching topic partitions for topic: {topic_name} - {err}" ) - return {} + return None def _get_topic_partitions(self, topic_name: str) -> List[str]: - all_partitions, has_more_partitions, args = [], True, {"StreamName": topic_name} + all_partitions, has_more_partitions, args = ( + [], + True, + KinesisStreamArgs(StreamName=topic_name), + ) try: while has_more_partitions: - partitions = self.kinesis.list_shards(**args) + partitions = self.kinesis.list_shards(**args.dict()) + kinesis_partitions_model = KinesisPartitions(**partitions) all_partitions.extend( - 
[part["ShardId"] for part in partitions["Shards"]] + [ + partition.ShardId + for partition in kinesis_partitions_model.Shards or [] + ] ) - has_more_partitions = partitions.get("NextToken") - args["NextToken"] = partitions.get("NextToken") + has_more_partitions = kinesis_partitions_model.NextToken + args.NextToken = has_more_partitions except Exception as err: logger.debug(traceback.format_exc()) logger.warning( @@ -146,6 +189,27 @@ class KinesisSource(MessagingServiceSource): ) return all_partitions + def yield_topic_sample_data( + self, topic_details: BrokerTopicDetails + ) -> Iterable[OMetaTopicSampleData]: + """ + Method to Get Sample Data of Messaging Entity + """ + try: + if self.context.topic and self.generate_sample_data: + yield OMetaTopicSampleData( + topic=self.context.topic, + sample_data=self._get_sample_data( + topic_details.topic_name, + topic_details.topic_metadata.partitions, + ), + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Error while yielding topic sample data for topic: {topic_details.topic_name} - {err}" + ) + def _get_sample_data(self, topic_name, partitions) -> TopicSampleData: data = [] try: @@ -153,16 +217,20 @@ class KinesisSource(MessagingServiceSource): shard_iterator = self.kinesis.get_shard_iterator( StreamName=topic_name, ShardId=shard, - ShardIteratorType="TRIM_HORIZON", - )["ShardIterator"] - - records = self.kinesis.get_records(ShardIterator=shard_iterator)[ - "Records" - ] - - data.extend( - [b64decode(record["Data"]).decode("utf-8") for record in records] + ShardIteratorType=KinesisEnum.TRIM_HORIZON.value, ) + shard_iterator_model = KinesisShardIterator(**shard_iterator) + + if shard_iterator_model.ShardIterator: + records = self.kinesis.get_records( + ShardIterator=shard_iterator_model.ShardIterator + ) + records_model = KinesisRecords(**records) + if records_model.Records: + data.extend( + self._get_sample_records(records=records_model.Records) + ) + if data: break except 
Exception as err: @@ -171,3 +239,18 @@ class KinesisSource(MessagingServiceSource): f"Error while fetching sample data for topic: {topic_name} - {err}" ) return TopicSampleData(messages=data) + + def _get_sample_records(self, records: List[KinesisData]) -> List: + sample_data = [] + try: + for record in records: + record_data = record.Data + if record_data: + try: + sample_data.append(b64decode(record_data).decode(UTF_8)) + except (binascii.Error, UnicodeDecodeError): + sample_data.append(record_data.decode(UTF_8)) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.warning(f"Error while fetching sample records for topics - {err}") + return sample_data diff --git a/ingestion/src/metadata/ingestion/source/messaging/kinesis/models.py b/ingestion/src/metadata/ingestion/source/messaging/kinesis/models.py new file mode 100644 index 00000000000..f0b4bac0b01 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/messaging/kinesis/models.py @@ -0,0 +1,125 @@ +# Copyright 2023 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +Kinesis Models +""" +# Disable pylint to conform to Kinesis API returns +# We want to convert to the pydantic models in 1 go +# pylint: disable=invalid-name +from enum import Enum +from typing import List, Optional + +from pydantic import BaseModel, Extra + + +class KinesisEnum(Enum): + """ + Enum for Kinesis + """ + + TRIM_HORIZON = "TRIM_HORIZON" + + +class KinesisStreamModel(BaseModel): + """ + Model for Kinesis streams + """ + + StreamNames: List[str] + HasMoreStreams: bool + + +class KinesisSummaryAttributes(BaseModel): + """ + Model for Kinesis Summary Attributes + """ + + RetentionPeriodHours: Optional[float] = 0 + + +class KinesisSummaryModel(BaseModel): + """ + Model for Kinesis Summary + """ + + StreamDescriptionSummary: KinesisSummaryAttributes + + +class KinesisTopicMetadataModel(BaseModel): + """ + Model for Kinesis Topic Metadata + """ + + summary: Optional[KinesisSummaryModel] + partitions: Optional[List[str]] + + +class KinesisArgs(BaseModel): + """ + Model for Kinesis API Arguments + """ + + class Config: + extra = Extra.allow + + Limit: int = 100 + + +class KinesisStreamArgs(BaseModel): + """ + Model for Kinesis Stream API Arguments + """ + + class Config: + extra = Extra.allow + + StreamName: str + + +class KinesisShards(BaseModel): + """ + Model for Kinesis Shards + """ + + ShardId: str + + +class KinesisPartitions(BaseModel): + """ + Model for Kinesis Partitions + """ + + Shards: Optional[List[KinesisShards]] + NextToken: Optional[str] + + +class KinesisShardIterator(BaseModel): + """ + Model for Kinesis Shard Iterator + """ + + ShardIterator: Optional[str] + + +class KinesisData(BaseModel): + """ + Model for Kinesis Sample Data + """ + + Data: Optional[bytes] + + +class KinesisRecords(BaseModel): + """ + Model for Kinesis Records + """ + + Records: Optional[List[KinesisData]] diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createTopic.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createTopic.json 
index c955feb2c69..65f5ff150ab 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createTopic.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createTopic.json @@ -39,11 +39,6 @@ "$ref": "../../entity/data/topic.json#/definitions/cleanupPolicy" } }, - "sampleData": { - "description": "Sample data for a topic.", - "$ref": "../../entity/data/topic.json#/definitions/topicSampleData", - "default": null - }, "replicationFactor": { "description": "Replication Factor in integer (more than 1).", "type": "integer" diff --git a/openmetadata-ui/src/main/resources/ui/cypress/common/advancedSearch.js b/openmetadata-ui/src/main/resources/ui/cypress/common/advancedSearch.js index 0ad2c4fae37..e46b1942028 100644 --- a/openmetadata-ui/src/main/resources/ui/cypress/common/advancedSearch.js +++ b/openmetadata-ui/src/main/resources/ui/cypress/common/advancedSearch.js @@ -353,8 +353,8 @@ export const addTag = (tag) => { cy.get('.ant-select-item-option-content') .contains(tag) .should('be.visible') - .click(); - + .click(); + cy.get('[data-testid="tag-selector"] > .ant-select-selector').contains(tag); cy.get('[data-testid="saveAssociatedTag"]').should('be.visible').click();