Kinesis bug fixed (#10578)

* Kinesis bug fixed

* Added sample data fix

* Fixed typo

* Added constants

* Added try catch for sample data

* Added fixes on review comments

* Modified models

* Added sample data py models

* Fix pylint

* fixed code smell

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
Onkar Ravgan 2023-03-23 13:38:56 +05:30 committed by GitHub
parent acb0e71a3e
commit c9feba826f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 260 additions and 57 deletions

View File

@ -41,7 +41,7 @@ precommit_install: ## Install the project's precommit hooks from .pre-commit-co
.PHONY: lint
lint: ## Run pylint on the Python sources to analyze the codebase
PYTHONPATH="${PYTHONPATH}:./ingestion/plugins" find $(PY_SOURCE) -path $(PY_SOURCE)/metadata/generated -prune -false -o -type f -name "*.py" | xargs pylint --ignore-paths=$(PY_SOURCE)/metadata_server/
PYTHONPATH="${PYTHONPATH}:./ingestion/plugins" find $(PY_SOURCE) -path $(PY_SOURCE)/metadata/generated -prune -false -o -type f -name "*.py" | xargs pylint
.PHONY: py_format
py_format: ## Run black and isort to format the Python codebase

View File

@ -9,11 +9,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Kafka source ingestion
Kinesis source ingestion
"""
import binascii
import traceback
from base64 import b64decode
from typing import Any, Dict, Iterable, List
from typing import Iterable, List, Optional
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.entity.data.topic import TopicSampleData
@ -27,10 +28,24 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.ometa_topic_data import OMetaTopicSampleData
from metadata.ingestion.source.messaging.kinesis.models import (
KinesisArgs,
KinesisData,
KinesisEnum,
KinesisPartitions,
KinesisRecords,
KinesisShardIterator,
KinesisStreamArgs,
KinesisStreamModel,
KinesisSummaryModel,
KinesisTopicMetadataModel,
)
from metadata.ingestion.source.messaging.messaging_service import (
BrokerTopicDetails,
MessagingServiceSource,
)
from metadata.utils.constants import UTF_8
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -45,7 +60,7 @@ class KinesisSource(MessagingServiceSource):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.generate_sample_data = self.config.sourceConfig.config.generateSampleData
self.kinesis = self.connection.client
self.kinesis = self.connection
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@ -57,48 +72,60 @@ class KinesisSource(MessagingServiceSource):
)
return cls(config, metadata_config)
def get_topic_list(self) -> Iterable[BrokerTopicDetails]:
all_topics, has_more_topics, args = [], True, {"Limit": 100}
try:
while has_more_topics:
topics = self.kinesis.list_streams(**args)
all_topics.extend(topics["StreamNames"])
has_more_topics = topics["HasMoreStreams"]
args["ExclusiveStartStreamName"] = all_topics[-1]
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(f"Failed to fetch models list - {err}")
def get_stream_names_list(self) -> List[str]:
    """
    Fetch the names of all Kinesis streams, following pagination.

    Returns the (possibly partial) list collected so far if an API
    call fails; the failure is logged and iteration stops.
    """
    all_topics, has_more_topics, args = [], True, KinesisArgs(Limit=100)
    while has_more_topics:
        try:
            topics = self.kinesis.list_streams(**args.dict())
            kinesis_topic_model = KinesisStreamModel(**topics)
            all_topics.extend(kinesis_topic_model.StreamNames)
            has_more_topics = kinesis_topic_model.HasMoreStreams
            if all_topics:
                # Kinesis pagination: resume listing after the last stream seen
                args.ExclusiveStartStreamName = all_topics[-1]
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.error(f"Failed to fetch kinesis stream - {err}")
            # Bug fix: without breaking here, a persistent API failure would
            # spin forever — nothing in the loop state changes on exception.
            break
    return all_topics
def get_topic_list(self) -> Iterable[BrokerTopicDetails]:
"""
Method to yeild topic details
"""
all_topics = self.get_stream_names_list()
for topic_name in all_topics:
yield BrokerTopicDetails(
topic_name=topic_name,
topic_metadata={
"summary": self._get_topic_details(topic_name),
"partitions": self._get_topic_partitions(topic_name),
},
)
try:
yield BrokerTopicDetails(
topic_name=topic_name,
topic_metadata=KinesisTopicMetadataModel(
summary=self._get_topic_details(topic_name),
partitions=self._get_topic_partitions(topic_name),
),
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(f"Failed to yield kinesis topic - {err}")
def yield_topic(
self, topic_details: BrokerTopicDetails
) -> Iterable[CreateTopicRequest]:
"""
Method to yield the create topic request
"""
try:
logger.info(f"Fetching topic details {topic_details.topic_name}")
topic = CreateTopicRequest(
name=topic_details.topic_name,
service=self.context.messaging_service.fullyQualifiedName.__root__,
partitions=len(topic_details.topic_metadata["partitions"]),
retentionTime=float(
topic_details.topic_metadata["summary"].get(
"RetentionPeriodHours", 0
)
* 3600000
partitions=len(topic_details.topic_metadata.partitions),
retentionTime=self._compute_retention_time(
topic_details.topic_metadata.summary
),
maximumMessageSize=self._get_max_message_size(),
)
if self.generate_sample_data:
topic.sampleData = self._get_sample_data(
topic_details.topic_name, topic_details.topic_metadata["partitions"]
)
self.status.scanned(topic.name.__root__)
yield topic
@ -114,31 +141,47 @@ class KinesisSource(MessagingServiceSource):
def get_topic_name(self, topic_details: BrokerTopicDetails) -> str:
    """Return the Kinesis stream name used as this broker entry's topic name."""
    return topic_details.topic_name
def _compute_retention_time(self, summary: Optional[KinesisSummaryModel]) -> float:
retention_time = 0
if summary:
retention_time = (
summary.StreamDescriptionSummary.RetentionPeriodHours * 3600000
)
return float(retention_time)
def _get_max_message_size(self) -> int:
# max message size supported by Kinesis is 1MB and is not configurable
return 1000000
def _get_topic_details(self, topic_name: str) -> Dict[str, Any]:
def _get_topic_details(self, topic_name: str) -> Optional[KinesisSummaryModel]:
try:
topic = self.kinesis.describe_stream_summary(StreamName=topic_name)
return topic["StreamDescriptionSummary"]
topic_summary = self.kinesis.describe_stream_summary(StreamName=topic_name)
return KinesisSummaryModel(**topic_summary)
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Error while fetching topic partitions for topic: {topic_name} - {err}"
)
return {}
return None
def _get_topic_partitions(self, topic_name: str) -> List[str]:
all_partitions, has_more_partitions, args = [], True, {"StreamName": topic_name}
all_partitions, has_more_partitions, args = (
[],
True,
KinesisStreamArgs(StreamName=topic_name),
)
try:
while has_more_partitions:
partitions = self.kinesis.list_shards(**args)
partitions = self.kinesis.list_shards(**args.dict())
kinesis_partitions_model = KinesisPartitions(**partitions)
all_partitions.extend(
[part["ShardId"] for part in partitions["Shards"]]
[
partition.ShardId
for partition in kinesis_partitions_model.Shards or []
]
)
has_more_partitions = partitions.get("NextToken")
args["NextToken"] = partitions.get("NextToken")
has_more_partitions = kinesis_partitions_model.NextToken
args.NextToken = has_more_partitions
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
@ -146,6 +189,27 @@ class KinesisSource(MessagingServiceSource):
)
return all_partitions
def yield_topic_sample_data(
    self, topic_details: BrokerTopicDetails
) -> Iterable[OMetaTopicSampleData]:
    """
    Yield the sample data record for the current topic.

    Emits nothing unless sample-data generation is enabled and a topic
    is present in the context; any failure is logged as a warning and
    swallowed (best-effort).
    """
    try:
        topic_entity = self.context.topic
        if topic_entity and self.generate_sample_data:
            sample = self._get_sample_data(
                topic_details.topic_name,
                topic_details.topic_metadata.partitions,
            )
            yield OMetaTopicSampleData(topic=topic_entity, sample_data=sample)
    except Exception as err:
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Error while yielding topic sample data for topic: {topic_details.topic_name} - {err}"
        )
def _get_sample_data(self, topic_name, partitions) -> TopicSampleData:
data = []
try:
@ -153,16 +217,20 @@ class KinesisSource(MessagingServiceSource):
shard_iterator = self.kinesis.get_shard_iterator(
StreamName=topic_name,
ShardId=shard,
ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]
records = self.kinesis.get_records(ShardIterator=shard_iterator)[
"Records"
]
data.extend(
[b64decode(record["Data"]).decode("utf-8") for record in records]
ShardIteratorType=KinesisEnum.TRIM_HORIZON.value,
)
shard_iterator_model = KinesisShardIterator(**shard_iterator)
if shard_iterator_model.ShardIterator:
records = self.kinesis.get_records(
ShardIterator=shard_iterator_model.ShardIterator
)
records_model = KinesisRecords(**records)
if records_model.Records:
data.extend(
self._get_sample_records(records=records_model.Records)
)
if data:
break
except Exception as err:
@ -171,3 +239,18 @@ class KinesisSource(MessagingServiceSource):
f"Error while fetching sample data for topic: {topic_name} - {err}"
)
return TopicSampleData(messages=data)
def _get_sample_records(self, records: List["KinesisData"]) -> List:
    """
    Decode raw Kinesis record payloads into strings for sample data.

    Payloads are expected to be base64-encoded UTF-8; when base64
    decoding (or the subsequent UTF-8 decode) fails, the raw bytes are
    decoded directly instead. Records with no payload are skipped.
    """
    sample_data = []
    try:
        for record in records:
            payload = record.Data
            if not payload:
                continue
            try:
                decoded = b64decode(payload).decode(UTF_8)
            except (binascii.Error, UnicodeDecodeError):
                # Not valid base64 (or not UTF-8 once decoded): fall back
                # to treating the payload as plain UTF-8 bytes.
                decoded = payload.decode(UTF_8)
            sample_data.append(decoded)
    except Exception as err:
        logger.debug(traceback.format_exc())
        logger.warning(f"Error while fetching sample records for topics - {err}")
    return sample_data

View File

@ -0,0 +1,125 @@
# Copyright 2023 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Kinesis Models
"""
# Disable pylint to conform to Kinesis API returns
# We want to convert to the pydantic models in 1 go
# pylint: disable=invalid-name
from enum import Enum
from typing import List, Optional
from pydantic import BaseModel, Extra
class KinesisEnum(Enum):
    """
    Enum for Kinesis string constants used in API calls.
    """
    # Shard iterator type: start reading from the oldest available record.
    TRIM_HORIZON = "TRIM_HORIZON"
class KinesisStreamModel(BaseModel):
    """
    Model for one page of the Kinesis ListStreams response.
    """
    # Stream names returned in this page.
    StreamNames: List[str]
    # True when further pages remain to be fetched.
    HasMoreStreams: bool
class KinesisSummaryAttributes(BaseModel):
    """
    Model for Kinesis Summary Attributes
    """
    # Stream retention period in hours; optional because the API
    # response may omit it (defaults to 0).
    RetentionPeriodHours: Optional[float] = 0
class KinesisSummaryModel(BaseModel):
    """
    Model for the Kinesis DescribeStreamSummary response.
    """
    # Wrapper object holding the stream's summary attributes.
    StreamDescriptionSummary: KinesisSummaryAttributes
class KinesisTopicMetadataModel(BaseModel):
    """
    Model for Kinesis Topic Metadata
    """
    # Stream summary (retention etc.); None when the describe call failed.
    summary: Optional[KinesisSummaryModel]
    # Shard IDs of the stream, used as the topic's partitions.
    partitions: Optional[List[str]]
class KinesisArgs(BaseModel):
    """
    Model for Kinesis ListStreams API arguments.
    """
    class Config:
        # Allow extra attributes (e.g. ExclusiveStartStreamName) to be set
        # for pagination and passed through to the client call.
        extra = Extra.allow
    # Page size for ListStreams.
    Limit: int = 100
class KinesisStreamArgs(BaseModel):
    """
    Model for Kinesis ListShards API arguments.
    """
    class Config:
        # Allow extra attributes (e.g. NextToken) to be set for pagination
        # and passed through to the client call.
        extra = Extra.allow
    # Name of the stream whose shards are listed.
    StreamName: str
class KinesisShards(BaseModel):
    """
    Model for a single Kinesis shard entry.
    """
    # Unique identifier of the shard within the stream.
    ShardId: str
class KinesisPartitions(BaseModel):
    """
    Model for one page of the Kinesis ListShards response.
    """
    # Shards returned in this page; may be absent.
    Shards: Optional[List[KinesisShards]]
    # Pagination token; None when no further pages remain.
    NextToken: Optional[str]
class KinesisShardIterator(BaseModel):
    """
    Model for the Kinesis GetShardIterator response.
    """
    # Opaque iterator token passed to GetRecords; may be absent.
    ShardIterator: Optional[str]
class KinesisData(BaseModel):
    """
    Model for a single Kinesis record payload.
    """
    # Raw record payload bytes; typically base64-encoded UTF-8 text.
    Data: Optional[bytes]
class KinesisRecords(BaseModel):
    """
    Model for the Kinesis GetRecords response.
    """
    # Records fetched from the shard; may be absent.
    Records: Optional[List[KinesisData]]

View File

@ -39,11 +39,6 @@
"$ref": "../../entity/data/topic.json#/definitions/cleanupPolicy"
}
},
"sampleData": {
"description": "Sample data for a topic.",
"$ref": "../../entity/data/topic.json#/definitions/topicSampleData",
"default": null
},
"replicationFactor": {
"description": "Replication Factor in integer (more than 1).",
"type": "integer"

View File

@ -353,8 +353,8 @@ export const addTag = (tag) => {
cy.get('.ant-select-item-option-content')
.contains(tag)
.should('be.visible')
.click();
.click();
cy.get('[data-testid="tag-selector"] > .ant-select-selector').contains(tag);
cy.get('[data-testid="saveAssociatedTag"]').should('be.visible').click();