David Potter 8610bd3ab9
feat: Kafka source and destination connector (#3176)
Thanks to @tullytim we have a new Kafka source and destination
connector. It also works with hosted Kafka via Confluent.

Documentation will be added to the Docs repo.
2024-06-22 23:26:23 +00:00

75 lines
2.5 KiB
Python

#!/usr/bin/env python
import socket
from concurrent.futures import ThreadPoolExecutor
import click
from confluent_kafka import Consumer, TopicPartition
@click.group(name="kafka-ingest")
def cli():
pass
def get_partition_size(consumer: Consumer, topic_name: str, partition_key: int):
topic_partition = TopicPartition(topic_name, partition_key)
low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)
partition_size = high_offset - low_offset
return partition_size
def get_topic_size(consumer: Consumer, topic_name: str):
print(f"Getting the number of messages in the topic {topic_name}")
topic = consumer.list_topics(topic=topic_name)
print(f"topic {topic}")
partitions = topic.topics[topic_name].partitions
workers, max_workers = [], len(partitions) or 1
with ThreadPoolExecutor(max_workers=max_workers) as e:
for partition_key in list(topic.topics[topic_name].partitions.keys()):
job = e.submit(get_partition_size, consumer, topic_name, partition_key)
workers.append(job)
topic_size = sum([w.result() for w in workers])
return topic_size
@cli.command()
@click.option("--bootstrap-server", type=str, required=True)
@click.option("--topic", type=str, required=True)
@click.option("--api-key", type=str, required=False)
@click.option("--secret", type=str, required=False)
@click.option("--confluent", type=bool, required=True, default=True)
@click.option("--port", type=int, required=False, default=9092)
def check(bootstrap_server: str, topic: str, api_key: str, secret: str, confluent: bool, port: int):
conf = {
"bootstrap.servers": f"{bootstrap_server}:{port}",
"client.id": socket.gethostname(),
"group.id": "your_group_id",
"enable.auto.commit": "true",
"auto.offset.reset": "earliest",
}
if confluent:
conf["security.protocol"] = "SASL_SSL"
conf["sasl.mechanism"] = "PLAIN"
conf["sasl.username"] = api_key
conf["sasl.password"] = secret
consumer = Consumer(conf)
print("Checking the number of messages in the topic")
topic_size = get_topic_size(consumer, topic)
expected = 16
print(
f"Checking that the number of messages found ({topic_size}) "
f"matches what's expected: {expected}"
)
assert (
topic_size == expected
), f"number of messages found ({topic_size}) doesn't match what's expected: {expected}"
print("successfully checked the number of messages!")
if __name__ == "__main__":
cli()