From fa946725b234a42e9886f265e925ad68d9659da6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Thu, 24 Jul 2025 22:36:28 +0200 Subject: [PATCH] test(kafka-connect): update Debezium connectors to fix broken integration tests (#14213) --- .../kafka-connect/docker-compose.override.yml | 8 +- ...connect_debezium_postgres_mces_golden.json | 2 +- ...onnect_debezium_sqlserver_mces_golden.json | 2 +- .../kafka_connect_mces_golden.json | 189 ++++++++++++------ .../kafka-connect/test_kafka_connect.py | 69 +++++-- 5 files changed, 180 insertions(+), 90 deletions(-) diff --git a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml index 9f0ce5f636..4b4b60bf4a 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml +++ b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml @@ -47,10 +47,10 @@ services: confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.2.5 || { echo "Failed to install JDBC connector"; exit 1; } confluent-hub install --no-prompt confluentinc/connect-transforms:1.4.1 || { echo "Failed to install transforms"; exit 1; } confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.6.0 || { echo "Failed to install datagen"; exit 1; } - confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0 || { echo "Failed to install Debezium MySQL"; exit 1; } - confluent-hub install --no-prompt debezium/debezium-connector-postgresql:1.7.0 || { echo "Failed to install Debezium PostgreSQL"; exit 1; } - confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:1.7.0 || { echo "Failed to install Debezium SQL Server"; exit 1; } - confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8 || { echo "Failed to install BigQuery"; exit 1; } + confluent-hub install --no-prompt debezium/debezium-connector-mysql:2.4.2 || { echo "Failed to install Debezium MySQL"; exit 1; } + confluent-hub install --no-prompt debezium/debezium-connector-postgresql:2.5.4 || { echo "Failed to install Debezium PostgreSQL"; exit 1; } + confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:2.5.4 || { echo "Failed to install Debezium SQL Server"; exit 1; } + confluent-hub install --no-prompt wepay/kafka-connect-bigquery:2.5.6 || { echo "Failed to install BigQuery"; exit 1; } confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.10.1 || { echo "Failed to install MongoDB"; exit 1; } confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.5.1 || { echo "Failed to install S3"; exit 1; } diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_debezium_postgres_mces_golden.json b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_debezium_postgres_mces_golden.json index 16c0736eb2..eacf27d024 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_debezium_postgres_mces_golden.json +++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_debezium_postgres_mces_golden.json @@ -48,7 +48,7 @@ "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres.public.member,PROD)" ], "outputDatasets": [ - "urn:li:dataset:(urn:li:dataPlatform:kafka,postgres.debezium.server.public.member,PROD)" + "urn:li:dataset:(urn:li:dataPlatform:kafka,postgres.debezium.public.member,PROD)" ] } }, diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_debezium_sqlserver_mces_golden.json b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_debezium_sqlserver_mces_golden.json index d2e9be8953..35cc2ffcb5 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_debezium_sqlserver_mces_golden.json +++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_debezium_sqlserver_mces_golden.json @@ -48,7 +48,7 @@ "urn:li:dataset:(urn:li:dataPlatform:mssql,testdb.dbo.test_table,PROD)" ], "outputDatasets": [ - "urn:li:dataset:(urn:li:dataPlatform:kafka,sqlserver.debezium.server.dbo.test_table,PROD)" + "urn:li:dataset:(urn:li:dataPlatform:kafka,debezium.sqlserver.testdb.dbo.test_table,PROD)" ] } }, diff --git a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_mces_golden.json b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_mces_golden.json index 12528d07d7..27133c52a3 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_mces_golden.json +++ b/metadata-ingestion/tests/integration/kafka-connect/kafka_connect_mces_golden.json @@ -13,7 +13,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -30,7 +31,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -49,7 +51,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -63,13 +66,14 @@ "urn:li:dataset:(urn:li:dataPlatform:mysql,mysql1.librarydb.member,PROD)" ], "outputDatasets": [ - "urn:li:dataset:(urn:li:dataPlatform:kafka,all.debezium.topics.librarydb.member,PROD)" + "urn:li:dataset:(urn:li:dataPlatform:kafka,debezium.mysql.librarydb.member,PROD)" ] } }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -88,7 +92,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -102,13 +107,14 @@ "urn:li:dataset:(urn:li:dataPlatform:mysql,mysql1.librarydb.book,PROD)" ], "outputDatasets": [ - "urn:li:dataset:(urn:li:dataPlatform:kafka,all.debezium.topics.librarydb.book,PROD)" + "urn:li:dataset:(urn:li:dataPlatform:kafka,debezium.mysql.librarydb.book,PROD)" ] } }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -127,7 +133,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -141,13 +148,14 @@ "urn:li:dataset:(urn:li:dataPlatform:mysql,mysql1.librarydb.mixedcasetable,PROD)" ], "outputDatasets": [ - "urn:li:dataset:(urn:li:dataPlatform:kafka,all.debezium.topics.librarydb.mixedcasetable,PROD)" + "urn:li:dataset:(urn:li:dataPlatform:kafka,debezium.mysql.librarydb.mixedcasetable,PROD)" ] } }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -173,7 +181,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -192,7 +201,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -212,7 +222,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -241,7 +252,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -260,7 +272,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -280,7 +293,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -308,7 +322,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -327,7 +342,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -347,7 +363,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -366,7 +383,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -386,7 +404,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -405,7 +424,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -425,7 +445,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -450,7 +471,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -469,7 +491,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -489,7 +512,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -508,7 +532,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -528,7 +553,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -547,7 +573,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -567,7 +594,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -596,7 +624,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -615,7 +644,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -633,7 +663,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -652,7 +683,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -670,7 +702,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -689,7 +722,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -707,7 +741,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -733,7 +768,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -752,7 +788,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -767,7 +804,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -785,7 +823,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -800,7 +839,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -815,7 +855,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -830,7 +871,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -845,7 +887,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -860,7 +903,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -875,7 +919,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -890,7 +935,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -905,7 +951,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -920,7 +967,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -935,7 +983,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -950,7 +999,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -965,7 +1015,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -980,7 +1031,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -995,7 +1047,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -1010,7 +1063,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -1025,7 +1079,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -1040,7 +1095,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -1055,7 +1111,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -1070,7 +1127,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -1085,7 +1143,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -1100,7 +1159,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } }, { @@ -1115,7 +1175,8 @@ }, "systemMetadata": { "lastObserved": 1635166800000, - "runId": "kafka-connect-run" + "runId": "kafka-connect-run", + "lastRunId": "no-run-id-provided" } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py index 3b44c0fb83..4a7470d0f9 100644 --- a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py +++ b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py @@ -173,7 +173,9 @@ def loaded_kafka_connect(kafka_connect_runner): } """, ) - assert r.status_code == 201 # Created + assert r.status_code == 201, ( + f"Failed with code {r.status_code} and response {r.text}" + ) # Creating MySQL source with regex router transformations , only topic prefix r = requests.post( KAFKA_CONNECT_ENDPOINT, @@ -194,7 +196,9 @@ def loaded_kafka_connect(kafka_connect_runner): } """, ) - assert r.status_code == 201 # Created + assert r.status_code == 201, ( + f"Failed with code {r.status_code} and response {r.text}" + ) # Creating MySQL source with regex router transformations , no topic prefix, table whitelist r = requests.post( KAFKA_CONNECT_ENDPOINT, @@ -216,7 +220,9 @@ def loaded_kafka_connect(kafka_connect_runner): } """, ) - assert r.status_code == 201 # Created + assert r.status_code == 201, ( + f"Failed with code {r.status_code} and response {r.text}" + ) # Creating MySQL source with query , topic prefix r = requests.post( KAFKA_CONNECT_ENDPOINT, @@ -235,7 +241,9 @@ def loaded_kafka_connect(kafka_connect_runner): } """, ) - assert r.status_code == 201 # Created + assert r.status_code == 201, ( + f"Failed with code {r.status_code} and response {r.text}" + ) # Creating MySQL source with ExtractTopic router transformations - source dataset not added r = requests.post( KAFKA_CONNECT_ENDPOINT, @@ -257,7 +265,9 @@ def loaded_kafka_connect(kafka_connect_runner): } """, ) - assert r.status_code == 201 # Created + assert r.status_code == 201, ( + f"Failed with code {r.status_code} and response {r.text}" + ) # Creating MySQL sink connector - not added r = requests.post( KAFKA_CONNECT_ENDPOINT, @@ -275,7 +285,9 @@ def loaded_kafka_connect(kafka_connect_runner): } """, ) - assert r.status_code == 201 # Created + assert r.status_code == 201, ( + f"Failed with code {r.status_code} and response {r.text}" + ) # Creating Debezium MySQL source connector r = requests.post( @@ -290,16 +302,20 @@ def loaded_kafka_connect(kafka_connect_runner): "database.port": "3306", "database.user": "root", "database.password": "rootpwd", + "database.server.id": "184054", + "topic.prefix": "debezium.mysql", "database.server.name": "all.debezium.topics", - "database.history.kafka.bootstrap.servers": "test_broker:9092", - "database.history.kafka.topic": "dbhistory.debeziummysql", + "schema.history.internal.kafka.bootstrap.servers": "test_broker:9092", + "schema.history.internal.kafka.topic": "dbhistory.debeziummysql", "database.allowPublicKeyRetrieval": "true", "include.schema.changes": "false" } } """, ) - assert r.status_code == 201 # Created + assert r.status_code == 201, ( + f"Failed with code {r.status_code} and response {r.text}" + ) # Creating Postgresql source (JDBC connector) r = requests.post( @@ -318,7 +334,9 @@ def loaded_kafka_connect(kafka_connect_runner): } }""", ) - assert r.status_code == 201 # Created + assert r.status_code == 201, ( + f"Failed with code {r.status_code} and response {r.text}" + ) # Creating Debezium PostgreSQL source connector r = requests.post( @@ -334,6 +352,7 @@ def loaded_kafka_connect(kafka_connect_runner): "database.user": "postgres", "database.password": "datahub", "database.dbname": "postgres", + "topic.prefix": "postgres.debezium", "database.server.name": "postgres.debezium.server", "table.include.list": "public.member", "plugin.name": "pgoutput", @@ -341,7 +360,9 @@ def loaded_kafka_connect(kafka_connect_runner): } }""", ) - assert r.status_code == 201 # Created + assert r.status_code == 201, ( + f"Failed with code {r.status_code} and response {r.text}" + ) # Creating Debezium SQL Server source connector r = requests.post( @@ -358,16 +379,18 @@ def loaded_kafka_connect(kafka_connect_runner): "database.password": "Password123!", "database.names": "TestDB", "database.server.name": "sqlserver.debezium.server", + "topic.prefix": "debezium.sqlserver", "table.include.list": "dbo.test_table", - "database.history.kafka.bootstrap.servers": "test_broker:9092", - "database.history.kafka.topic": "dbhistory.sqlserver" + "schema.history.internal.kafka.bootstrap.servers": "test_broker:9092", + "schema.history.internal.kafka.topic": "dbhistory.sqlserver", + "database.encrypt": "false", + "database.trustServerCertificate": "true" } }""", ) - if r.status_code != 201: - print(f"SQL Server connector creation failed: {r.status_code}") - print(f"Response: {r.text}") - assert r.status_code == 201 # Created + assert r.status_code == 201, ( + f"Failed with code {r.status_code} and response {r.text}" + ) # Creating Generic source r = requests.post( @@ -389,7 +412,9 @@ def loaded_kafka_connect(kafka_connect_runner): }""", ) r.raise_for_status() - assert r.status_code == 201 # Created + assert r.status_code == 201, ( + f"Failed with code {r.status_code} and response {r.text}" + ) print("Populating MongoDB with test data...") # we populate the database before creating the connector @@ -416,7 +441,9 @@ def loaded_kafka_connect(kafka_connect_runner): }""", ) r.raise_for_status() - assert r.status_code == 201 # Created + assert r.status_code == 201, ( + f"Failed with code {r.status_code} and response {r.text}" + ) # Creating S3 Sink source r = requests.post( @@ -474,7 +501,9 @@ def loaded_kafka_connect(kafka_connect_runner): } """, ) - assert r.status_code == 201 # Created + assert r.status_code == 201, ( + f"Failed with code {r.status_code} and response {r.text}" + ) # Connectors should be ready to process data thanks to Docker health checks print("Waiting for Kafka Connect connectors to initialize and process data...")