feat: add Kinesis connector (#8452)

Signed-off-by: Tushar Mittal <chiragmittal.mittal@gmail.com>
Tushar Mittal 2022-11-02 16:12:45 +05:30 committed by GitHub
parent 119763afc4
commit 2a65df5f36
11 changed files with 403 additions and 13 deletions

View File

@@ -90,6 +90,7 @@ plugins: Dict[str, Set[str]] = {
"presto-types-parser==0.0.2",
},
"kafka": {"confluent_kafka==1.8.2", "fastavro>=1.2.0", "avro-python3"},
"kinesis": {"boto3~=1.19.12"},
"redpanda": {"confluent_kafka==1.8.2", "fastavro>=1.2.0", "avro-python3"},
"ldap-users": {"ldap3==2.9.1"},
"looker": {"looker-sdk>=22.4.0"},
@@ -155,6 +156,8 @@ test = {
"great-expectations~=0.15.0",
# Airflow tests
"apache-airflow==2.3.3",
# mock boto3 functions
"moto==4.0.8",
}
build_options = {"includes": ["_cffi_backend"]}
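With the new "kinesis" extra registered above, the connector's dependencies can typically be pulled in as a plugin extra, e.g. pip install "openmetadata-ingestion[kinesis]"; moto is only needed when running the test suite.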

View File

@@ -20,6 +20,7 @@ from metadata.clients.connection_clients import (
DynamoClient,
GlueDBClient,
GluePipelineClient,
KinesisClient,
)
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
from metadata.utils.logger import utils_logger
@@ -91,3 +92,6 @@ class AWSClient:
def get_glue_pipeline_client(self) -> GluePipelineClient:
return GluePipelineClient(self.get_client("glue"))
def get_kinesis_client(self) -> KinesisClient:
return KinesisClient(self.get_client("kinesis"))
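A minimal usage sketch of the new factory method, assuming only awsRegion is set (as in the example workflow below); values are illustrative:

from metadata.clients.aws_client import AWSClient
from metadata.generated.schema.security.credentials.awsCredentials import (
    AWSCredentials,
)

creds = AWSCredentials(awsRegion="eu-west-1")
kinesis = AWSClient(creds).get_kinesis_client()
# KinesisClient is a thin wrapper; .client exposes the raw boto3 client
print(kinesis.client.list_streams(Limit=10)["StreamNames"])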

View File

@@ -55,6 +55,12 @@ class KafkaClient:
self.consumer_client = consumer_client
@dataclass
class KinesisClient:
def __init__(self, client) -> None:
self.client = client
@dataclass
class MetabaseClient:
def __init__(self, client) -> None:

View File

@@ -0,0 +1,20 @@
source:
type: kinesis
serviceName: local_kinesis
serviceConnection:
config:
type: Kinesis
awsConfig:
awsRegion: eu-west-1
sourceConfig:
config:
type: MessagingMetadata
generateSampleData: false
sink:
type: metadata-rest
config: {}
workflowConfig:
loggerLevel: "DEBUG"
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: no-auth
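This workflow file is typically executed with the ingestion CLI, e.g. metadata ingest -c <path-to-this-yaml>, once the kinesis plugin extra is installed.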

View File

@@ -16,13 +16,12 @@ Common Broker for fetching metadata
import concurrent.futures
import traceback
from abc import ABC
from typing import Any, Iterable, Optional
from typing import Iterable, Optional
import confluent_kafka
from confluent_kafka import KafkaError, KafkaException
from confluent_kafka.admin import ConfigResource
from confluent_kafka.schema_registry.schema_registry_client import Schema
from pydantic import BaseModel
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.entity.data.topic import SchemaType, TopicSampleData
@@ -33,21 +32,15 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.messaging.messaging_service import MessagingServiceSource
from metadata.ingestion.source.messaging.messaging_service import (
BrokerTopicDetails,
MessagingServiceSource,
)
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class BrokerTopicDetails(BaseModel):
"""
Wrapper Class to combine the topic_name with topic_metadata
"""
topic_name: str
topic_metadata: Any
class CommonBrokerSource(MessagingServiceSource, ABC):
"""
Common Broker Source Class

View File

@@ -0,0 +1,177 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Kinesis source ingestion
"""
import traceback
from base64 import b64decode
from typing import Any, Dict, Iterable, List
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.entity.data.topic import TopicSampleData
from metadata.generated.schema.entity.services.connections.messaging.kinesisConnection import (
KinesisConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.messaging.messaging_service import (
BrokerTopicDetails,
MessagingServiceSource,
)
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class KinesisSource(MessagingServiceSource):
"""
    Implements the necessary methods to extract
    topic metadata from a Kinesis source
"""
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.generate_sample_data = self.config.sourceConfig.config.generateSampleData
self.kinesis = self.connection.client
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: KinesisConnection = config.serviceConnection.__root__.config
if not isinstance(connection, KinesisConnection):
raise InvalidSourceException(
f"Expected KinesisConnection, but got {connection}"
)
return cls(config, metadata_config)
def get_topic_list(self) -> Iterable[BrokerTopicDetails]:
        all_topics, has_more_topics, args = [], True, {"Limit": 100}
        try:
            while has_more_topics:
                topics = self.kinesis.list_streams(**args)
                all_topics.extend(topics["StreamNames"])
                has_more_topics = topics["HasMoreStreams"]
                if has_more_topics:
                    # paginate from the last stream name seen so far
                    args["ExclusiveStartStreamName"] = all_topics[-1]
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.error(f"Failed to fetch streams list - {err}")
        for topic_name in all_topics:
            yield BrokerTopicDetails(
                topic_name=topic_name,
                topic_metadata={
                    "summary": self._get_topic_details(topic_name),
                    "partitions": self._get_topic_partitions(topic_name),
                },
            )
def yield_topic(
self, topic_details: BrokerTopicDetails
) -> Iterable[CreateTopicRequest]:
try:
logger.info(f"Fetching topic details {topic_details.topic_name}")
topic = CreateTopicRequest(
name=topic_details.topic_name,
service=EntityReference(
id=self.context.messaging_service.id.__root__,
type="messagingService",
),
partitions=len(topic_details.topic_metadata["partitions"]),
                retentionTime=float(
                    # RetentionPeriodHours converted to milliseconds
                    topic_details.topic_metadata["summary"].get(
                        "RetentionPeriodHours", 0
                    )
                    * 3600000
                ),
maximumMessageSize=self._get_max_message_size(),
)
if self.generate_sample_data:
topic.sampleData = self._get_sample_data(
topic_details.topic_name, topic_details.topic_metadata["partitions"]
)
self.status.topic_scanned(topic.name.__root__)
yield topic
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Unexpected exception to yield topic [{topic_details.topic_name}]: {exc}"
)
self.status.failures.append(
f"{self.config.serviceName}.{topic_details.topic_name}"
)
def get_topic_name(self, topic_details: BrokerTopicDetails) -> str:
return topic_details.topic_name
def _get_max_message_size(self) -> int:
# max message size supported by Kinesis is 1MB and is not configurable
return 1000000
def _get_topic_details(self, topic_name: str) -> Dict[str, Any]:
try:
topic = self.kinesis.describe_stream_summary(StreamName=topic_name)
return topic["StreamDescriptionSummary"]
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Error while fetching topic partitions for topic: {topic_name} - {err}"
)
return {}
def _get_topic_partitions(self, topic_name: str) -> List[str]:
        all_partitions, has_more_partitions, args = [], True, {"StreamName": topic_name}
        try:
            while has_more_partitions:
                partitions = self.kinesis.list_shards(**args)
                all_partitions.extend(
                    [part["ShardId"] for part in partitions["Shards"]]
                )
                has_more_partitions = partitions.get("NextToken")
                if has_more_partitions:
                    # ListShards does not accept StreamName together with
                    # NextToken, so later pages are requested by token only
                    args = {"NextToken": partitions["NextToken"]}
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Error while fetching topic partitions for topic: {topic_name} - {err}"
            )
        return all_partitions
def _get_sample_data(self, topic_name, partitions) -> TopicSampleData:
data = []
try:
for shard in partitions:
shard_iterator = self.kinesis.get_shard_iterator(
StreamName=topic_name,
ShardId=shard,
ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]
records = self.kinesis.get_records(ShardIterator=shard_iterator)[
"Records"
]
                # payloads are assumed to be base64-encoded by the producer,
                # matching how records are written in the unit tests
                data.extend(
                    [b64decode(record["Data"]).decode("utf-8") for record in records]
                )
if data:
break
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Error while fetching sample data for topic: {topic_name} - {err}"
)
return TopicSampleData(messages=data)
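For reference, a rough sketch of the boto3 Kinesis response fields this source depends on (field subset only; values illustrative, per the AWS API):

# list_streams(...)            -> {"StreamNames": [...], "HasMoreStreams": bool}
# describe_stream_summary(...) -> {"StreamDescriptionSummary": {"RetentionPeriodHours": int, ...}}
# list_shards(...)             -> {"Shards": [{"ShardId": str, ...}], "NextToken": str (optional)}
# get_shard_iterator(...)      -> {"ShardIterator": str}
# get_records(...)             -> {"Records": [{"Data": bytes, ...}], ...}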

View File

@@ -15,6 +15,8 @@ Base class for ingesting messaging services
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Optional
from pydantic import BaseModel
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
@@ -43,6 +45,15 @@ from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_topic
class BrokerTopicDetails(BaseModel):
"""
Wrapper Class to combine the topic_name with topic_metadata
"""
topic_name: str
topic_metadata: Any
class MessagingServiceTopology(ServiceTopology):
"""
Defines the hierarchy in Messaging Services.

View File

@@ -40,6 +40,7 @@ from metadata.clients.connection_clients import (
GlueDBClient,
GluePipelineClient,
KafkaClient,
KinesisClient,
LookerClient,
MetabaseClient,
MlflowClientWrapper,
@@ -111,6 +112,9 @@ from metadata.generated.schema.entity.services.connections.database.snowflakeCon
from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
KafkaConnection,
)
from metadata.generated.schema.entity.services.connections.messaging.kinesisConnection import (
KinesisConnection,
)
from metadata.generated.schema.entity.services.connections.messaging.redpandaConnection import (
RedpandaConnection,
)
@@ -567,6 +571,36 @@ def _(connection: DeltaLakeClient) -> None:
raise SourceConnectionException(msg) from exc
@get_connection.register
def _(
connection: KinesisConnection,
verbose: bool = False, # pylint: disable=unused-argument
) -> KinesisClient:
from metadata.clients.aws_client import AWSClient
kinesis_connection = AWSClient(connection.awsConfig).get_kinesis_client()
return kinesis_connection
@test_connection.register
def _(connection: KinesisClient) -> None:
"""
Test that we can connect to the Kinesis source using the given aws credentials
:param engine: boto service resource to test
:return: None or raise an exception if we cannot connect
"""
from botocore.client import ClientError
try:
connection.client.list_streams()
except ClientError as err:
msg = f"Connection error for {connection}: {err}. Check the connection details."
raise SourceConnectionException(msg) from err
except Exception as exc:
msg = f"Unknown error connecting with {connection}: {exc}."
raise SourceConnectionException(msg) from exc
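get_connection and test_connection are singledispatch registries, so the handlers above are selected purely by argument type; a minimal usage sketch (kinesis_connection stands for a parsed KinesisConnection):

from metadata.utils.connections import get_connection, test_connection

client = get_connection(kinesis_connection)  # dispatches on KinesisConnection
test_connection(client)  # dispatches on KinesisClient; raises SourceConnectionException on failure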
@get_connection.register
def _(
connection: MetabaseConnection, verbose: bool = False

View File

@@ -0,0 +1,102 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Kinesis unit tests
"""
import json
from base64 import b64encode
from unittest import TestCase
from unittest.mock import patch
import boto3
from moto import mock_kinesis
from metadata.ingestion.api.workflow import Workflow
CONFIG = """
{
"source": {
"type": "kinesis",
"serviceName": "local_kinesis",
"serviceConnection": {
"config": {
"type": "Kinesis",
"awsConfig": {
"awsRegion": "eu-west-1"
}
}
},
"sourceConfig": {
"config": {
"type": "MessagingMetadata",
"generateSampleData": true
}
}
},
"sink": {
"type": "file",
"config": {
"filename": "/var/tmp/datasets.json"
}
},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "no-auth"
}
}
}
"""
def execute_workflow():
workflow = Workflow.create(json.loads(CONFIG))
workflow.execute()
workflow.print_status()
workflow.stop()
def get_file_path():
return json.loads(CONFIG)["sink"]["config"]["filename"]
def _setup_mock_kinesis(create_stream: bool = False):
kinesis = boto3.Session().client("kinesis")
if not create_stream:
return
kinesis.create_stream(StreamName="mock-stream", ShardCount=5)
kinesis.put_record(
StreamName="mock-stream",
Data=b64encode("mock-data".encode("utf-8")),
PartitionKey="mock-key",
)
@mock_kinesis
@patch("sqlalchemy.engine.base.Engine.connect")
class KinesisIngestionTest(TestCase):
def test_kinesis_empty_topics(self, mock_connect):
_setup_mock_kinesis()
execute_workflow()
file_path = get_file_path()
with open(file_path, "r") as file:
assert len(json.loads(file.read())) == 0
def test_kinesis_topics(self, mock_connect):
_setup_mock_kinesis(create_stream=True)
execute_workflow()
file_path = get_file_path()
with open(file_path, "r") as file:
data = json.loads(file.read())
assert data[0]["name"] == "mock-stream"
assert data[0]["partitions"] == 5
assert "mock-data" in data[0]["sampleData"]["messages"]

View File

@@ -0,0 +1,34 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/messaging/kinesisConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "KinesisConnection",
"description": "Kinesis Connection Config",
"type": "object",
"javaType": "org.openmetadata.schema.services.connections.messaging.KinesisConnection",
"definitions": {
"kinesisType": {
"description": "Service type.",
"type": "string",
"enum": ["Kinesis"],
"default": "Kinesis"
}
},
"properties": {
"type": {
"title": "Service Type",
"description": "Service Type",
"$ref": "#/definitions/kinesisType",
"default": "Kinesis"
},
"awsConfig": {
"title": "AWS Credentials Configuration",
"$ref": "../../../../security/credentials/awsCredentials.json"
},
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
}
},
"additionalProperties": false,
"required": ["awsConfig"]
}

View File

@@ -14,7 +14,7 @@
"description": "Type of messaging service - Kafka or Pulsar.",
"type": "string",
"javaInterfaces": ["org.openmetadata.schema.EnumInterface"],
"enum": ["Kafka", "Pulsar", "Redpanda", "CustomMessaging"],
"enum": ["Kafka", "Pulsar", "Redpanda", "Kinesis", "CustomMessaging"],
"javaEnums": [
{
"name": "Kafka"
@@ -25,6 +25,9 @@
{
"name": "Redpanda"
},
{
"name": "Kinesis"
},
{
"name": "CustomMessaging"
}
@@ -58,6 +61,9 @@
{
"$ref": "./connections/messaging/redpandaConnection.json"
},
{
"$ref": "./connections/messaging/kinesisConnection.json"
},
{
"$ref": "connections/messaging/customMessagingConnection.json"
}