datahub/docker/kafka-setup/kafka-setup.sh
jaegwon.seo fc7b6853b9
change generate kafka connect properties from env (#10545)
Co-authored-by: david-leifker <114954101+david-leifker@users.noreply.github.com>
2024-07-31 15:05:55 -05:00

134 lines
5.3 KiB
Bash
Executable File

#!/bin/bash
## Exit early if PRECREATION is not needed
if [[ $DATAHUB_PRECREATE_TOPICS == "false" ]]; then
echo "DATAHUB_PRECREATE_TOPICS=${DATAHUB_PRECREATE_TOPICS}"
echo "Pre-creation of topics has been turned off, exiting"
exit 0
fi
. kafka-config.sh
echo "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVER" > $CONNECTION_PROPERTIES_PATH
python env_to_properties.py KAFKA_PROPERTIES_ $CONNECTION_PROPERTIES_PATH
# cub kafka-ready -c $CONNECTION_PROPERTIES_PATH -b $KAFKA_BOOTSTRAP_SERVER 1 180
. kafka-ready.sh
############################################################
# Start Topic Creation Logic
############################################################
# make the files
START=$(mktemp -t start-XXXX)
FIFO=$(mktemp -t fifo-XXXX)
FIFO_LOCK=$(mktemp -t lock-XXXX)
START_LOCK=$(mktemp -t lock-XXXX)
## mktemp makes a regular file. Delete that an make a fifo.
rm $FIFO
mkfifo $FIFO
echo $FIFO
## create a trap to cleanup on exit if we fail in the middle.
cleanup() {
rm $FIFO
rm $START
rm $FIFO_LOCK
rm $START_LOCK
}
trap cleanup 0
# Start worker script
. kafka-topic-workers.sh $START $FIFO $FIFO_LOCK $START_LOCK
## Open the fifo for writing.
exec 3>$FIFO
## Open the start lock for reading
exec 4<$START_LOCK
## Wait for the workers to start
while true; do
flock 4
started=$(wc -l $START | cut -d \ -f 1)
flock -u 4
if [[ $started -eq $WORKERS ]]; then
break
else
echo waiting, started $started of $WORKERS
fi
done
exec 4<&-
## utility function to send the jobs to the workers
send() {
work_id=$1
topic_args=$2
topic_config=$3
echo -e "sending $work_id\n worker_args: ${topic_args}${DELIMITER}${topic_config}"
echo "$work_id" "${topic_args}${DELIMITER}${topic_config}" 1>&3 ## the fifo is fd 3
}
## Produce the jobs to run.
send "$METADATA_AUDIT_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_AUDIT_EVENT_NAME" \
"--entity-type topics --entity-name $METADATA_AUDIT_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_EVENT_NAME" \
"--entity-type topics --entity-name $METADATA_CHANGE_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_EVENT_NAME" \
"--entity-type topics --entity-name $FAILED_METADATA_CHANGE_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" \
"--entity-type topics --entity-name $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
# Set retention to 90 days
send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--partitions $PARTITIONS --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" \
"--entity-type topics --entity-name $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME" \
"--entity-type topics --entity-name $METADATA_CHANGE_PROPOSAL_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" \
"--entity-type topics --entity-name $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$PLATFORM_EVENT_TOPIC_NAME" "--partitions $PARTITIONS --topic $PLATFORM_EVENT_TOPIC_NAME" \
"--entity-type topics --entity-name $PLATFORM_EVENT_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
# Infinite retention upgrade topic
# Make sure the retention.ms config for $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME is configured to infinite
# Please see the bug report below for details
# https://github.com/datahub-project/datahub/issues/7882
send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "--partitions 1 --config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" \
"--entity-type topics --entity-name "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" --alter --add-config retention.ms=-1"
# Create topic for datahub usage event
if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then
send "$DATAHUB_USAGE_EVENT_NAME" "--partitions $PARTITIONS --topic $DATAHUB_USAGE_EVENT_NAME"
fi
## close the filo
exec 3<&-
## disable the cleanup trap
trap '' 0
## It is safe to delete the files because the workers
## already opened them. Thus, only the names are going away
## the actual files will stay there until the workers
## all finish.
cleanup
## now wait for all the workers.
wait
echo "Topic Creation Complete."
############################################################
# End Topic Creation Logic
############################################################
## If using confluent schema registry as a standalone component, then configure compact cleanup policy.
if [[ $USE_CONFLUENT_SCHEMA_REGISTRY == "TRUE" ]]; then
kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \
--entity-type topics \
--entity-name _schemas \
--alter --add-config cleanup.policy=compact
fi