mirror of
https://github.com/Unstructured-IO/unstructured.git
synced 2025-06-27 02:30:08 +00:00

Thanks to @tullytim we have a new Kafka source and destination connector. It also works with hosted Kafka via Confluent. Documentation will be added to the Docs repo.
75 lines
2.5 KiB
Python
75 lines
2.5 KiB
Python
#!/usr/bin/env python
|
|
import socket
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
import click
|
|
from confluent_kafka import Consumer, TopicPartition
|
|
|
|
|
|
@click.group(name="kafka-ingest")
|
|
def cli():
|
|
pass
|
|
|
|
|
|
def get_partition_size(consumer: Consumer, topic_name: str, partition_key: int):
|
|
topic_partition = TopicPartition(topic_name, partition_key)
|
|
low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)
|
|
partition_size = high_offset - low_offset
|
|
return partition_size
|
|
|
|
|
|
def get_topic_size(consumer: Consumer, topic_name: str):
|
|
print(f"Getting the number of messages in the topic {topic_name}")
|
|
topic = consumer.list_topics(topic=topic_name)
|
|
print(f"topic {topic}")
|
|
partitions = topic.topics[topic_name].partitions
|
|
workers, max_workers = [], len(partitions) or 1
|
|
|
|
with ThreadPoolExecutor(max_workers=max_workers) as e:
|
|
for partition_key in list(topic.topics[topic_name].partitions.keys()):
|
|
job = e.submit(get_partition_size, consumer, topic_name, partition_key)
|
|
workers.append(job)
|
|
|
|
topic_size = sum([w.result() for w in workers])
|
|
return topic_size
|
|
|
|
|
|
@cli.command()
|
|
@click.option("--bootstrap-server", type=str, required=True)
|
|
@click.option("--topic", type=str, required=True)
|
|
@click.option("--api-key", type=str, required=False)
|
|
@click.option("--secret", type=str, required=False)
|
|
@click.option("--confluent", type=bool, required=True, default=True)
|
|
@click.option("--port", type=int, required=False, default=9092)
|
|
def check(bootstrap_server: str, topic: str, api_key: str, secret: str, confluent: bool, port: int):
|
|
conf = {
|
|
"bootstrap.servers": f"{bootstrap_server}:{port}",
|
|
"client.id": socket.gethostname(),
|
|
"group.id": "your_group_id",
|
|
"enable.auto.commit": "true",
|
|
"auto.offset.reset": "earliest",
|
|
}
|
|
|
|
if confluent:
|
|
conf["security.protocol"] = "SASL_SSL"
|
|
conf["sasl.mechanism"] = "PLAIN"
|
|
conf["sasl.username"] = api_key
|
|
conf["sasl.password"] = secret
|
|
|
|
consumer = Consumer(conf)
|
|
print("Checking the number of messages in the topic")
|
|
topic_size = get_topic_size(consumer, topic)
|
|
expected = 16
|
|
print(
|
|
f"Checking that the number of messages found ({topic_size}) "
|
|
f"matches what's expected: {expected}"
|
|
)
|
|
assert (
|
|
topic_size == expected
|
|
), f"number of messages found ({topic_size}) doesn't match what's expected: {expected}"
|
|
print("successfully checked the number of messages!")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
cli()
|