#!/bin/bash
## Exit early if PRECREATION is not needed
if [[ $DATAHUB_PRECREATE_TOPICS == "false" ]]; then
  echo "DATAHUB_PRECREATE_TOPICS=${DATAHUB_PRECREATE_TOPICS}"
  echo "Pre-creation of topics has been turned off, exiting"
  exit 0
fi
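
## NOTE: kafka-config.sh is assumed to define the connection and topic settings used below,
## e.g. KAFKA_BOOTSTRAP_SERVER, CONNECTION_PROPERTIES_PATH, PARTITIONS, MAX_MESSAGE_BYTES,
## WORKERS, DELIMITER and the various *_TOPIC_NAME / *_EVENT_NAME variables.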
. kafka-config.sh
echo "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVER" > $CONNECTION_PROPERTIES_PATH
python env_to_properties.py KAFKA_PROPERTIES_ $CONNECTION_PROPERTIES_PATH
# cub kafka-ready -c $CONNECTION_PROPERTIES_PATH -b $KAFKA_BOOTSTRAP_SERVER 1 180
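## kafka-ready.sh is assumed to perform the same readiness check as the commented-out
## Confluent cub command above, polling the brokers before topic creation starts.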
. kafka-ready.sh
############################################################
# Start Topic Creation Logic
############################################################
# make the files
START=$(mktemp -t start-XXXX)
FIFO=$(mktemp -t fifo-XXXX)
FIFO_LOCK=$(mktemp -t lock-XXXX)
START_LOCK=$(mktemp -t lock-XXXX)
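## Roles of these files (as used below): START records which workers have started, the FIFO
## carries topic-creation jobs to the workers, and the lock files guard access to them via flock.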
## mktemp makes a regular file. Delete that and make a fifo.
rm $FIFO
mkfifo $FIFO
echo $FIFO
## create a trap to clean up on exit if we fail in the middle.
cleanup() {
  rm $FIFO
  rm $START
  rm $FIFO_LOCK
  rm $START_LOCK
}
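## Signal 0 is the EXIT pseudo-signal, so cleanup runs whenever the script exits early.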
trap cleanup 0
# Start worker script
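# kafka-topic-workers.sh is assumed to start $WORKERS background workers that read job lines
# from the FIFO and create/alter each topic (e.g. via kafka-topics.sh / kafka-configs.sh),
# appending a line to $START once they are ready.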
. kafka-topic-workers.sh $START $FIFO $FIFO_LOCK $START_LOCK
## Open the fifo for writing.
exec 3>$FIFO
## Open the start lock for reading
exec 4<$START_LOCK
## Wait for the workers to start
while true; do
  flock 4
  started=$(wc -l $START | cut -d ' ' -f 1)
  flock -u 4
  if [[ $started -eq $WORKERS ]]; then
    break
  else
    echo "waiting, started $started of $WORKERS"
  fi
done
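## All workers are up; the start lock is no longer needed, so close fd 4.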
exec 4<&-
## utility function to send the jobs to the workers
send() {
  work_id=$1
  topic_args=$2
  topic_config=$3

  echo -e "sending $work_id\n worker_args: ${topic_args}${DELIMITER}${topic_config}"
  echo "$work_id" "${topic_args}${DELIMITER}${topic_config}" 1>&3 ## the fifo is fd 3
}
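
## Each job line written to the FIFO looks roughly like (assuming DELIMITER comes from kafka-config.sh):
##   <topic name> --partitions <N> --topic <topic name><DELIMITER>--entity-type topics --entity-name <topic name> --alter --add-config max.message.bytes=<bytes>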
## Produce the jobs to run.
send "$METADATA_AUDIT_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_AUDIT_EVENT_NAME" \
  "--entity-type topics --entity-name $METADATA_AUDIT_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_EVENT_NAME" \
  "--entity-type topics --entity-name $METADATA_CHANGE_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$FAILED_METADATA_CHANGE_EVENT_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_EVENT_NAME" \
  "--entity-type topics --entity-name $FAILED_METADATA_CHANGE_EVENT_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME" \
  "--entity-type topics --entity-name $METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
# Set retention to 90 days (90 * 24 * 60 * 60 * 1000 = 7776000000 ms)
send "$METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" "--partitions $PARTITIONS --config retention.ms=7776000000 --topic $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME" \
  "--entity-type topics --entity-name $METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $METADATA_CHANGE_PROPOSAL_TOPIC_NAME" \
  "--entity-type topics --entity-name $METADATA_CHANGE_PROPOSAL_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" "--partitions $PARTITIONS --topic $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME" \
  "--entity-type topics --entity-name $FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
send "$PLATFORM_EVENT_TOPIC_NAME" "--partitions $PARTITIONS --topic $PLATFORM_EVENT_TOPIC_NAME" \
  "--entity-type topics --entity-name $PLATFORM_EVENT_TOPIC_NAME --alter --add-config max.message.bytes=$MAX_MESSAGE_BYTES"
# Infinite retention upgrade topic
# Make sure the retention.ms config for $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME is set to infinite (-1).
# Please see the bug report below for details:
# https://github.com/datahub-project/datahub/issues/7882
send "$DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" "--partitions 1 --config retention.ms=-1 --topic $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME" \
  "--entity-type topics --entity-name $DATAHUB_UPGRADE_HISTORY_TOPIC_NAME --alter --add-config retention.ms=-1"
# Create topic for datahub usage event
if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then
  send "$DATAHUB_USAGE_EVENT_NAME" "--partitions $PARTITIONS --topic $DATAHUB_USAGE_EVENT_NAME"
fi
## close the fifo
exec 3<&-
## disable the cleanup trap
trap '' 0
## It is safe to delete the files because the workers
## already opened them. Thus, only the names are going away;
## the actual files will stay there until the workers
## all finish.
cleanup
## now wait for all the workers.
wait
echo "Topic Creation Complete."
############################################################
# End Topic Creation Logic
############################################################
## If using confluent schema registry as a standalone component, then configure compact cleanup policy.
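## The registry keeps its schemas in the _schemas topic; compact cleanup prevents schema
## records from being aged out by retention.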
if [[ $USE_CONFLUENT_SCHEMA_REGISTRY == "TRUE" ]]; then
  kafka-configs.sh --command-config $CONNECTION_PROPERTIES_PATH --bootstrap-server $KAFKA_BOOTSTRAP_SERVER \
    --entity-type topics \
    --entity-name _schemas \
    --alter --add-config cleanup.policy=compact
fi