test(ingest/kafka): Configure kafka connect tests for arm64 (#8362)

Andrew Sikowitz 2023-07-10 12:27:29 -04:00 committed by GitHub
parent 12543bca33
commit d92cbe99e0
7 changed files with 128 additions and 164 deletions

View File

@@ -1,9 +1,8 @@
---
version: '3.8'
services:
connect:
image: confluentinc/cp-kafka-connect:6.2.1
image: confluentinc/cp-kafka-connect:7.4.0
env_file: ./../kafka-connect/setup/connect.env
container_name: test_connect
hostname: test_connect
@@ -35,7 +34,7 @@ services:
#
#confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8
#
confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.8.0
confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.10.1
#
curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz" \
| tar -xzf - -C /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \
@@ -50,7 +49,7 @@ services:
#
sleep infinity
mysqldb:
image: mysql:5.7
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: rootpwd
MYSQL_USER: foo
@@ -76,19 +75,14 @@ services:
mongo:
hostname: mongo
image: mongo:4.2.9
image: mongo:6.0.7
container_name: "test_mongo"
ports:
- "27017:27017"
command: --replSet rs0
environment:
- MONGO_INITDB_ROOT_USERNAME=admin
- MONGO_INITDB_ROOT_PASSWORD=admin
# Don't set mongo admin user or password to avoid KeyFile authentication
# which is required when running MongoDB as a replica set
- MONGO_INITDB_DATABASE=test_db
- MONGO_INITDB_USERNAME=kafka-connector
- MONGO_INITDB_PASSWORD=password
volumes:
- ./../kafka-connect/setup/conf/mongo-init.sh:/docker-entrypoint-initdb.d/mongo-init.sh:ro
- ./../kafka-connect/setup/conf/:/scripts/
volumes:
test_zkdata:
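
A quick way to sanity-check the arm64 story behind these image bumps (cp-kafka-connect 6.2.1 to 7.4.0, mysql 5.7 to 8.0, mongo 4.2.9 to 6.0.7) is to ask the registry whether a pinned tag publishes a linux/arm64 variant. A minimal sketch in the style of the test code, assuming the Docker CLI's "manifest inspect" subcommand is available; the image tag is taken from this compose file:

import json
import subprocess

def has_arm64(image: str) -> bool:
    # "docker manifest inspect" prints the manifest list; multi-arch images
    # carry one entry per platform.
    out = subprocess.run(
        ["docker", "manifest", "inspect", image],
        check=True, capture_output=True, text=True,
    ).stdout
    manifests = json.loads(out).get("manifests", [])
    return any(
        m.get("platform", {}).get("architecture") == "arm64" for m in manifests
    )

print(has_arm64("confluentinc/cp-kafka-connect:7.4.0"))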

View File

@@ -0,0 +1,6 @@
rsconf = {
_id: "rs0",
members: [{ _id: 0, host: "test_mongo:27017", priority: 1.0 }],
};
rs.initiate(rsconf);
rs.status();
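
The new mongo-init.js initiates the single-node replica set declared by the compose file's --replSet rs0 flag; MongoDB only exposes the change streams the Kafka source connector reads when it runs as a replica set, and dropping the root user avoids the keyfile authentication a secured replica set would require. A hypothetical helper (not part of this commit), assuming the test_mongo container name from the compose file, that waits for the replica set to elect a primary before connectors are registered:

import subprocess
import time

def wait_for_primary(container: str = "test_mongo", timeout: int = 60) -> None:
    # Poll db.hello() until the node reports itself as a writable primary.
    deadline = time.time() + timeout
    while time.time() < deadline:
        ret = subprocess.run(
            ["docker", "exec", container, "mongosh", "--quiet",
             "--eval", "db.hello().isWritablePrimary"],
            capture_output=True, text=True,
        )
        if ret.stdout.strip() == "true":
            return
        time.sleep(1)
    raise TimeoutError(f"{container} never reported a writable primary")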

View File

@@ -1,22 +0,0 @@
#!/bin/bash
mongo -- "$MONGO_INITDB_DATABASE" <<-EOJS
conn = new Mongo();
db = conn.getDB("test_db");
db.purchases.insertOne({ _id: 3, item: "lamp post", price: 12 });
db.purchases.insertOne({ _id: 4, item: "lamp post", price: 13 });
EOJS
{
sleep 3 &&
mongo -- "$MONGO_INITDB_DATABASE" <<-EOJS
var rootUser = '$MONGO_INITDB_ROOT_USERNAME';
var rootPassword = '$MONGO_INITDB_ROOT_PASSWORD';
var admin = db.getSiblingDB('admin');
admin.auth(rootUser, rootPassword);
EOJS
} &

View File

@@ -0,0 +1,3 @@
db.createCollection("purchases");
db.purchases.insertOne({ _id: 3, item: "lamp post", price: 12 });
db.purchases.insertOne({ _id: 4, item: "lamp post", price: 13 });
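
mongo-populate.js carries over the document inserts from the removed mongo-init.sh; the test runs it with mongosh after the connectors are registered, so the writes reach the MongoDB source connector through its change stream. An assumed verification step (not in the test) that both inserts landed:

import subprocess

# Count the documents inserted by mongo-populate.js inside the running container.
out = subprocess.run(
    ["docker", "exec", "test_mongo", "mongosh", "test_db", "--quiet",
     "--eval", "db.purchases.countDocuments()"],
    capture_output=True, text=True, check=True,
).stdout
assert out.strip() == "2"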

View File

@@ -1,5 +1,6 @@
CONNECT_BOOTSTRAP_SERVERS=test_broker:9092
CONNECT_REST_PORT=28083
CONNECT_LISTENERS=http://:28083
CONNECT_GROUP_ID=kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC=_connect-configs
CONNECT_OFFSET_STORAGE_TOPIC=_connect-offsets
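
CONNECT_REST_PORT is replaced by CONNECT_LISTENERS, which maps to the Connect worker's listeners setting and binds the REST API on every interface at port 28083. A minimal smoke test, assuming the compose file publishes that port to the host; the REST root of a healthy worker returns its version:

import requests

# Hedged sketch: the root endpoint answers with version/commit info once the
# worker has started listening on 28083.
resp = requests.get("http://localhost:28083/", timeout=10)
resp.raise_for_status()
print(resp.json()["version"])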

View File

@@ -1,5 +1,4 @@
import subprocess
import time
from typing import Any, Dict, List, cast
from unittest import mock
@@ -82,13 +81,8 @@ def test_resources_dir(pytestconfig):
@pytest.fixture(scope="module")
def loaded_kafka_connect(kafka_connect_runner):
# Set up the container.
time.sleep(10)
# Setup mongo cluster
command = (
'docker exec test_mongo mongo admin -u admin -p admin --eval "rs.initiate();"'
)
# Setup mongo cluster
command = "docker exec test_mongo mongosh test_db -f /scripts/mongo-init.js"
ret = subprocess.run(
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
@@ -99,17 +93,17 @@ def loaded_kafka_connect(kafka_connect_runner):
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_source1",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "test-mysql-jdbc-",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}"
}
}
""",
"name": "mysql_source1",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "test-mysql-jdbc-",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}"
}
}
""",
)
assert r.status_code == 201 # Created
# Creating MySQL source with regex router transformations, only topic prefix
@@ -117,20 +111,20 @@ def loaded_kafka_connect(kafka_connect_runner):
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_source2",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}",
"transforms": "TotalReplacement",
"transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TotalReplacement.regex": ".*(book)",
"transforms.TotalReplacement.replacement": "my-new-topic-$1"
}
}
""",
"name": "mysql_source2",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}",
"transforms": "TotalReplacement",
"transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TotalReplacement.regex": ".*(book)",
"transforms.TotalReplacement.replacement": "my-new-topic-$1"
}
}
""",
)
assert r.status_code == 201 # Created
# Creating MySQL source with regex router transformations, no topic prefix, table whitelist
@@ -138,21 +132,21 @@ def loaded_kafka_connect(kafka_connect_runner):
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_source3",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "book",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}",
"transforms": "TotalReplacement",
"transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TotalReplacement.regex": ".*",
"transforms.TotalReplacement.replacement": "my-new-topic"
}
}
""",
"name": "mysql_source3",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "book",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}",
"transforms": "TotalReplacement",
"transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TotalReplacement.regex": ".*",
"transforms.TotalReplacement.replacement": "my-new-topic"
}
}
""",
)
assert r.status_code == 201 # Created
# Creating MySQL source with query, topic prefix
@@ -201,17 +195,17 @@ def loaded_kafka_connect(kafka_connect_runner):
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"insert.mode": "insert",
"auto.create": true,
"topics": "my-topic",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}"
}
}
""",
"name": "mysql_sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"insert.mode": "insert",
"auto.create": true,
"topics": "my-topic",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}"
}
}
""",
)
assert r.status_code == 201 # Created
@@ -220,21 +214,22 @@ def loaded_kafka_connect(kafka_connect_runner):
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "debezium-mysql-connector",
"config": {
"name": "debezium-mysql-connector",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "test_mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "rootpwd",
"database.server.name": "debezium.topics",
"database.history.kafka.bootstrap.servers": "test_broker:9092",
"database.history.kafka.topic": "dbhistory.debeziummysql",
"include.schema.changes": "false"
}
}
""",
"name": "debezium-mysql-connector",
"config": {
"name": "debezium-mysql-connector",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "test_mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "rootpwd",
"database.server.name": "debezium.topics",
"database.history.kafka.bootstrap.servers": "test_broker:9092",
"database.history.kafka.topic": "dbhistory.debeziummysql",
"database.allowPublicKeyRetrieval": "true",
"include.schema.changes": "false"
}
}
""",
)
assert r.status_code == 201 # Created
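
The one functional addition in the Debezium block above is database.allowPublicKeyRetrieval: MySQL 8.0 defaults to the caching_sha2_password plugin, and a JDBC client that connects without TLS must be allowed to fetch the server's RSA public key or authentication fails. A hedged check, assuming the MySQL container is named test_mysql (as database.hostname suggests) and using the root password from the compose file:

import subprocess

# List the auth plugin assigned to the test users inside the MySQL container.
query = "SELECT user, host, plugin FROM mysql.user WHERE user IN ('root', 'foo');"
out = subprocess.run(
    ["docker", "exec", "test_mysql", "mysql", "-uroot", "-prootpwd", "-e", query],
    capture_output=True, text=True, check=True,
).stdout
print(out)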
@@ -243,17 +238,17 @@ def loaded_kafka_connect(kafka_connect_runner):
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "postgres_source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "member",
"topic.prefix": "test-postgres-jdbc-",
"tasks.max": "1",
"connection.url": "${env:POSTGRES_CONNECTION_URL}"
}
}""",
"name": "postgres_source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "member",
"topic.prefix": "test-postgres-jdbc-",
"tasks.max": "1",
"connection.url": "${env:POSTGRES_CONNECTION_URL}"
}
}""",
)
assert r.status_code == 201 # Created
@@ -262,19 +257,19 @@ def loaded_kafka_connect(kafka_connect_runner):
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "generic_source",
"config": {
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "my-topic",
"quickstart": "product",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"max.interval": 1000,
"iterations": 10000000,
"tasks.max": "1"
}
}""",
"name": "generic_source",
"config": {
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "my-topic",
"quickstart": "product",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"max.interval": 1000,
"iterations": 10000000,
"tasks.max": "1"
}
}""",
)
r.raise_for_status()
assert r.status_code == 201 # Created
@@ -284,37 +279,24 @@ def loaded_kafka_connect(kafka_connect_runner):
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data=r"""{
"name": "source_mongodb_connector",
"config": {
"tasks.max": "1",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://admin:admin@test_mongo:27017",
"topic.prefix": "mongodb",
"database": "test_db",
"collection": "purchases",
"copy.existing": true,
"copy.existing.namespace.regex": "test_db.purchases",
"change.stream.full.document": "updateLookup",
"topic.creation.enable": "true",
"topic.creation.default.replication.factor": "-1",
"topic.creation.default.partitions": "-1",
"output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false,
"output.format.key": "schema",
"output.format.value": "json",
"output.schema.infer.value": false,
"publish.full.document.only":true
}
}""",
"name": "source_mongodb_connector",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://test_mongo:27017",
"topic.prefix": "mongodb",
"database": "test_db",
"collection": "purchases"
}
}""",
)
r.raise_for_status()
assert r.status_code == 201 # Created
# Give time for connectors to process the table data
time.sleep(60)
command = "docker exec test_mongo mongosh test_db -f /scripts/mongo-populate.js"
ret = subprocess.run(
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
assert ret.returncode == 0
@freeze_time(FROZEN_TIME)
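
The MongoDB source connector config is trimmed down to the connection URI, topic prefix, database, and collection, and the test now loads documents through mongo-populate.js instead of an init script. A hedged alternative to the fixed time.sleep(60) above (not what the test does) would be to poll the Connect REST status endpoint until a connector's tasks report RUNNING; KAFKA_CONNECT_ENDPOINT is assumed to be the /connectors URL used for the POSTs in this module:

import time
import requests

def wait_until_running(name: str, endpoint: str, timeout: int = 120) -> None:
    # GET /connectors/<name>/status returns the connector and task states.
    deadline = time.time() + timeout
    while time.time() < deadline:
        status = requests.get(f"{endpoint}/{name}/status").json()
        tasks = status.get("tasks", [])
        if tasks and all(t["state"] == "RUNNING" for t in tasks):
            return
        time.sleep(2)
    raise TimeoutError(f"connector {name} did not reach RUNNING in {timeout}s")

# e.g. wait_until_running("source_mongodb_connector", KAFKA_CONNECT_ENDPOINT)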

View File

@@ -2,7 +2,7 @@
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.2.0
image: confluentinc/cp-zookeeper:7.2.2
env_file: zookeeper.env
hostname: test_zookeeper
container_name: test_zookeeper
@@ -12,7 +12,7 @@ services:
- test_zkdata:/var/opt/zookeeper
broker:
image: confluentinc/cp-kafka:7.2.0
image: confluentinc/cp-kafka:7.2.2
env_file: broker.env
hostname: test_broker
container_name: test_broker
@@ -22,7 +22,7 @@ services:
- "29092:29092"
schema-registry:
image: confluentinc/cp-schema-registry:7.2.0
image: confluentinc/cp-schema-registry:7.2.2
restart: unless-stopped
env_file: schema-registry.env
container_name: test_schema_registry