test(kafka-connect): update Debezium connectors to fix broken integration tests (#14213)

This commit is contained in:
Sergio Gómez Villamor 2025-07-24 22:36:28 +02:00 committed by GitHub
parent b1ef86b83e
commit fa946725b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 180 additions and 90 deletions

View File

@ -47,10 +47,10 @@ services:
confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.5 || { echo "Failed to install JDBC connector"; exit 1; }
confluent-hub install --no-prompt confluentinc/connect-transforms:1.4.1 || { echo "Failed to install transforms"; exit 1; }
confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.6.0 || { echo "Failed to install datagen"; exit 1; }
confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0 || { echo "Failed to install Debezium MySQL"; exit 1; }
confluent-hub install --no-prompt debezium/debezium-connector-postgresql:1.7.0 || { echo "Failed to install Debezium PostgreSQL"; exit 1; }
confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:1.7.0 || { echo "Failed to install Debezium SQL Server"; exit 1; }
confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8 || { echo "Failed to install BigQuery"; exit 1; }
confluent-hub install --no-prompt debezium/debezium-connector-mysql:2.4.2 || { echo "Failed to install Debezium MySQL"; exit 1; }
confluent-hub install --no-prompt debezium/debezium-connector-postgresql:2.5.4 || { echo "Failed to install Debezium PostgreSQL"; exit 1; }
confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:2.5.4 || { echo "Failed to install Debezium SQL Server"; exit 1; }
confluent-hub install --no-prompt wepay/kafka-connect-bigquery:2.5.6 || { echo "Failed to install BigQuery"; exit 1; }
confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.10.1 || { echo "Failed to install MongoDB"; exit 1; }
confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.1 || { echo "Failed to install S3"; exit 1; }

View File

@ -48,7 +48,7 @@
"urn:li:dataset:(urn:li:dataPlatform:postgres,postgres.public.member,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:kafka,postgres.debezium.server.public.member,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:kafka,postgres.debezium.public.member,PROD)"
]
}
},

View File

@ -48,7 +48,7 @@
"urn:li:dataset:(urn:li:dataPlatform:mssql,testdb.dbo.test_table,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:kafka,sqlserver.debezium.server.dbo.test_table,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:kafka,debezium.sqlserver.testdb.dbo.test_table,PROD)"
]
}
},

View File

@ -13,7 +13,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -30,7 +31,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -49,7 +51,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -63,13 +66,14 @@
"urn:li:dataset:(urn:li:dataPlatform:mysql,mysql1.librarydb.member,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:kafka,all.debezium.topics.librarydb.member,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:kafka,debezium.mysql.librarydb.member,PROD)"
]
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -88,7 +92,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -102,13 +107,14 @@
"urn:li:dataset:(urn:li:dataPlatform:mysql,mysql1.librarydb.book,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:kafka,all.debezium.topics.librarydb.book,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:kafka,debezium.mysql.librarydb.book,PROD)"
]
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -127,7 +133,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -141,13 +148,14 @@
"urn:li:dataset:(urn:li:dataPlatform:mysql,mysql1.librarydb.mixedcasetable,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:kafka,all.debezium.topics.librarydb.mixedcasetable,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:kafka,debezium.mysql.librarydb.mixedcasetable,PROD)"
]
}
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -173,7 +181,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -192,7 +201,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -212,7 +222,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -241,7 +252,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -260,7 +272,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -280,7 +293,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -308,7 +322,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -327,7 +342,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -347,7 +363,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -366,7 +383,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -386,7 +404,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -405,7 +424,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -425,7 +445,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -450,7 +471,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -469,7 +491,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -489,7 +512,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -508,7 +532,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -528,7 +553,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -547,7 +573,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -567,7 +594,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -596,7 +624,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -615,7 +644,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -633,7 +663,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -652,7 +683,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -670,7 +702,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -689,7 +722,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -707,7 +741,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -733,7 +768,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -752,7 +788,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -767,7 +804,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -785,7 +823,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -800,7 +839,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -815,7 +855,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -830,7 +871,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -845,7 +887,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -860,7 +903,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -875,7 +919,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -890,7 +935,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -905,7 +951,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -920,7 +967,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -935,7 +983,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -950,7 +999,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -965,7 +1015,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -980,7 +1031,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -995,7 +1047,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -1010,7 +1063,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -1025,7 +1079,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -1040,7 +1095,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -1055,7 +1111,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -1070,7 +1127,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -1085,7 +1143,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -1100,7 +1159,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
},
{
@ -1115,7 +1175,8 @@
},
"systemMetadata": {
"lastObserved": 1635166800000,
"runId": "kafka-connect-run"
"runId": "kafka-connect-run",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -173,7 +173,9 @@ def loaded_kafka_connect(kafka_connect_runner):
}
""",
)
assert r.status_code == 201 # Created
assert r.status_code == 201, (
f"Failed with code {r.status_code} and response {r.text}"
)
# Creating MySQL source with regex router transformations, only topic prefix
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
@ -194,7 +196,9 @@ def loaded_kafka_connect(kafka_connect_runner):
}
""",
)
assert r.status_code == 201 # Created
assert r.status_code == 201, (
f"Failed with code {r.status_code} and response {r.text}"
)
# Creating MySQL source with regex router transformations, no topic prefix, table whitelist
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
@ -216,7 +220,9 @@ def loaded_kafka_connect(kafka_connect_runner):
}
""",
)
assert r.status_code == 201 # Created
assert r.status_code == 201, (
f"Failed with code {r.status_code} and response {r.text}"
)
# Creating MySQL source with query, topic prefix
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
@ -235,7 +241,9 @@ def loaded_kafka_connect(kafka_connect_runner):
}
""",
)
assert r.status_code == 201 # Created
assert r.status_code == 201, (
f"Failed with code {r.status_code} and response {r.text}"
)
# Creating MySQL source with ExtractTopic router transformations - source dataset not added
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
@ -257,7 +265,9 @@ def loaded_kafka_connect(kafka_connect_runner):
}
""",
)
assert r.status_code == 201 # Created
assert r.status_code == 201, (
f"Failed with code {r.status_code} and response {r.text}"
)
# Creating MySQL sink connector - not added
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
@ -275,7 +285,9 @@ def loaded_kafka_connect(kafka_connect_runner):
}
""",
)
assert r.status_code == 201 # Created
assert r.status_code == 201, (
f"Failed with code {r.status_code} and response {r.text}"
)
# Creating Debezium MySQL source connector
r = requests.post(
@ -290,16 +302,20 @@ def loaded_kafka_connect(kafka_connect_runner):
"database.port": "3306",
"database.user": "root",
"database.password": "rootpwd",
"database.server.id": "184054",
"topic.prefix": "debezium.mysql",
"database.server.name": "all.debezium.topics",
"database.history.kafka.bootstrap.servers": "test_broker:9092",
"database.history.kafka.topic": "dbhistory.debeziummysql",
"schema.history.internal.kafka.bootstrap.servers": "test_broker:9092",
"schema.history.internal.kafka.topic": "dbhistory.debeziummysql",
"database.allowPublicKeyRetrieval": "true",
"include.schema.changes": "false"
}
}
""",
)
assert r.status_code == 201 # Created
assert r.status_code == 201, (
f"Failed with code {r.status_code} and response {r.text}"
)
# Creating Postgresql source (JDBC connector)
r = requests.post(
@ -318,7 +334,9 @@ def loaded_kafka_connect(kafka_connect_runner):
}
}""",
)
assert r.status_code == 201 # Created
assert r.status_code == 201, (
f"Failed with code {r.status_code} and response {r.text}"
)
# Creating Debezium PostgreSQL source connector
r = requests.post(
@ -334,6 +352,7 @@ def loaded_kafka_connect(kafka_connect_runner):
"database.user": "postgres",
"database.password": "datahub",
"database.dbname": "postgres",
"topic.prefix": "postgres.debezium",
"database.server.name": "postgres.debezium.server",
"table.include.list": "public.member",
"plugin.name": "pgoutput",
@ -341,7 +360,9 @@ def loaded_kafka_connect(kafka_connect_runner):
}
}""",
)
assert r.status_code == 201 # Created
assert r.status_code == 201, (
f"Failed with code {r.status_code} and response {r.text}"
)
# Creating Debezium SQL Server source connector
r = requests.post(
@ -358,16 +379,18 @@ def loaded_kafka_connect(kafka_connect_runner):
"database.password": "Password123!",
"database.names": "TestDB",
"database.server.name": "sqlserver.debezium.server",
"topic.prefix": "debezium.sqlserver",
"table.include.list": "dbo.test_table",
"database.history.kafka.bootstrap.servers": "test_broker:9092",
"database.history.kafka.topic": "dbhistory.sqlserver"
"schema.history.internal.kafka.bootstrap.servers": "test_broker:9092",
"schema.history.internal.kafka.topic": "dbhistory.sqlserver",
"database.encrypt": "false",
"database.trustServerCertificate": "true"
}
}""",
)
if r.status_code != 201:
print(f"SQL Server connector creation failed: {r.status_code}")
print(f"Response: {r.text}")
assert r.status_code == 201 # Created
assert r.status_code == 201, (
f"Failed with code {r.status_code} and response {r.text}"
)
# Creating Generic source
r = requests.post(
@ -389,7 +412,9 @@ def loaded_kafka_connect(kafka_connect_runner):
}""",
)
r.raise_for_status()
assert r.status_code == 201 # Created
assert r.status_code == 201, (
f"Failed with code {r.status_code} and response {r.text}"
)
print("Populating MongoDB with test data...")
# we populate the database before creating the connector
@ -416,7 +441,9 @@ def loaded_kafka_connect(kafka_connect_runner):
}""",
)
r.raise_for_status()
assert r.status_code == 201 # Created
assert r.status_code == 201, (
f"Failed with code {r.status_code} and response {r.text}"
)
# Creating S3 Sink source
r = requests.post(
@ -474,7 +501,9 @@ def loaded_kafka_connect(kafka_connect_runner):
}
""",
)
assert r.status_code == 201 # Created
assert r.status_code == 201, (
f"Failed with code {r.status_code} and response {r.text}"
)
# Connectors should be ready to process data thanks to Docker health checks
print("Waiting for Kafka Connect connectors to initialize and process data...")