diff --git a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml
index 3fd1af4f1d..dbb4f84c0a 100644
--- a/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml
+++ b/metadata-ingestion/tests/integration/kafka-connect/docker-compose.override.yml
@@ -1,9 +1,8 @@
 ---
 version: '3.8'
 services:
-
   connect:
-    image: confluentinc/cp-kafka-connect:6.2.1
+    image: confluentinc/cp-kafka-connect:7.4.0
     env_file: ./../kafka-connect/setup/connect.env
     container_name: test_connect
     hostname: test_connect
@@ -35,7 +34,7 @@ services:
         #
         #confluent-hub install --no-prompt wepay/kafka-connect-bigquery:1.6.8
         #
-        confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.8.0
+        confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.10.1
         #
         curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz" \
           | tar -xzf - -C /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \
@@ -50,7 +49,7 @@ services:
         #
         sleep infinity
   mysqldb:
-    image: mysql:5.7
+    image: mysql:8.0
     environment:
       MYSQL_ROOT_PASSWORD: rootpwd
       MYSQL_USER: foo
@@ -76,19 +75,14 @@ services:

   mongo:
     hostname: mongo
-    image: mongo:4.2.9
+    image: mongo:6.0.7
     container_name: "test_mongo"
     ports:
       - "27017:27017"
     command: --replSet rs0
     environment:
-      - MONGO_INITDB_ROOT_USERNAME=admin
-      - MONGO_INITDB_ROOT_PASSWORD=admin
+      # Don't set mongo admin user or password to avoid KeyFile authentication
+      # which is required when running MongoDB as a replica set
       - MONGO_INITDB_DATABASE=test_db
-      - MONGO_INITDB_USERNAME=kafka-connector
-      - MONGO_INITDB_PASSWORD=password
     volumes:
-      - ./../kafka-connect/setup/conf/mongo-init.sh:/docker-entrypoint-initdb.d/mongo-init.sh:ro
-
-volumes:
-  test_zkdata:
\ No newline at end of file
+      - ./../kafka-connect/setup/conf/:/scripts/
diff --git a/metadata-ingestion/tests/integration/kafka-connect/setup/conf/mongo-init.js b/metadata-ingestion/tests/integration/kafka-connect/setup/conf/mongo-init.js
new file mode 100644
index 0000000000..6c563f30fb
--- /dev/null
+++ b/metadata-ingestion/tests/integration/kafka-connect/setup/conf/mongo-init.js
@@ -0,0 +1,6 @@
+rsconf = {
+  _id: "rs0",
+  members: [{ _id: 0, host: "test_mongo:27017", priority: 1.0 }],
+};
+rs.initiate(rsconf);
+rs.status();
diff --git a/metadata-ingestion/tests/integration/kafka-connect/setup/conf/mongo-init.sh b/metadata-ingestion/tests/integration/kafka-connect/setup/conf/mongo-init.sh
deleted file mode 100644
index acd8424e5e..0000000000
--- a/metadata-ingestion/tests/integration/kafka-connect/setup/conf/mongo-init.sh
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/bin/bash
-
-mongo -- "$MONGO_INITDB_DATABASE" <<-EOJS
-    conn = new Mongo();
-    db = conn.getDB("test_db");
-    db.purchases.insertOne({ _id: 3, item: "lamp post", price: 12 });
-    db.purchases.insertOne({ _id: 4, item: "lamp post", price: 13 });
-EOJS
-
-
-{
-sleep 3 &&
-mongo -- "$MONGO_INITDB_DATABASE" <<-EOJS
-    var rootUser = '$MONGO_INITDB_ROOT_USERNAME';
-    var rootPassword = '$MONGO_INITDB_ROOT_PASSWORD';
-    var admin = db.getSiblingDB('admin');
-    admin.auth(rootUser, rootPassword);
-EOJS
-} &
-
-
-
diff --git a/metadata-ingestion/tests/integration/kafka-connect/setup/conf/mongo-populate.js b/metadata-ingestion/tests/integration/kafka-connect/setup/conf/mongo-populate.js
new file mode 100644
index 0000000000..31dc59aec4
--- /dev/null
+++ b/metadata-ingestion/tests/integration/kafka-connect/setup/conf/mongo-populate.js
@@ -0,0 +1,3 @@
+db.createCollection("purchases");
+db.purchases.insertOne({ _id: 3, item: "lamp post", price: 12 });
+db.purchases.insertOne({ _id: 4, item: "lamp post", price: 13 });
diff --git a/metadata-ingestion/tests/integration/kafka-connect/setup/connect.env b/metadata-ingestion/tests/integration/kafka-connect/setup/connect.env
index ade5104605..7b25ae37b1 100644
--- a/metadata-ingestion/tests/integration/kafka-connect/setup/connect.env
+++ b/metadata-ingestion/tests/integration/kafka-connect/setup/connect.env
@@ -1,5 +1,6 @@
 CONNECT_BOOTSTRAP_SERVERS=test_broker:9092
 CONNECT_REST_PORT=28083
+CONNECT_LISTENERS=http://:28083
 CONNECT_GROUP_ID=kafka-connect
 CONNECT_CONFIG_STORAGE_TOPIC=_connect-configs
 CONNECT_OFFSET_STORAGE_TOPIC=_connect-offsets
@@ -22,4 +23,4 @@ CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components, /usr/local/share/kafka/
 CONNECT_CONFIG_PROVIDERS=env
 CONNECT_CONFIG_PROVIDERS_ENV_CLASS=io.strimzi.kafka.EnvVarConfigProvider
 MYSQL_CONNECTION_URL=jdbc:mysql://foo:datahub@test_mysql:3306/librarydb
-POSTGRES_CONNECTION_URL=jdbc:postgresql://test_postgres:5432/postgres?user=postgres&password=datahub
\ No newline at end of file
+POSTGRES_CONNECTION_URL=jdbc:postgresql://test_postgres:5432/postgres?user=postgres&password=datahub
diff --git a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
index 8efea3ced6..aaa39cd5f5 100644
--- a/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
+++ b/metadata-ingestion/tests/integration/kafka-connect/test_kafka_connect.py
@@ -1,5 +1,4 @@
 import subprocess
-import time
 from typing import Any, Dict, List, cast
 from unittest import mock

@@ -82,13 +81,8 @@ def test_resources_dir(pytestconfig):

 @pytest.fixture(scope="module")
 def loaded_kafka_connect(kafka_connect_runner):
-    # Set up the container.
-    time.sleep(10)
-
-    # Setup mongo cluster
-    command = (
-        'docker exec test_mongo mongo admin -u admin -p admin --eval "rs.initiate();"'
-    )
+    # Setup mongo cluster
+    command = "docker exec test_mongo mongosh test_db -f /scripts/mongo-init.js"
     ret = subprocess.run(
         command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
     )
@@ -99,17 +93,17 @@ def loaded_kafka_connect(kafka_connect_runner):
         KAFKA_CONNECT_ENDPOINT,
         headers={"Content-Type": "application/json"},
         data="""{
-                "name": "mysql_source1",
-                "config": {
-                    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
-                    "mode": "incrementing",
-                    "incrementing.column.name": "id",
-                    "topic.prefix": "test-mysql-jdbc-",
-                    "tasks.max": "1",
-                    "connection.url": "${env:MYSQL_CONNECTION_URL}"
-                }
-            }
-            """,
+            "name": "mysql_source1",
+            "config": {
+                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
+                "mode": "incrementing",
+                "incrementing.column.name": "id",
+                "topic.prefix": "test-mysql-jdbc-",
+                "tasks.max": "1",
+                "connection.url": "${env:MYSQL_CONNECTION_URL}"
+            }
+        }
+        """,
     )
     assert r.status_code == 201  # Created
     # Creating MySQL source with regex router transformations , only topic prefix
@@ -117,20 +111,20 @@ def loaded_kafka_connect(kafka_connect_runner):
         KAFKA_CONNECT_ENDPOINT,
         headers={"Content-Type": "application/json"},
         data="""{
-                "name": "mysql_source2",
-                "config": {
-                    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
-                    "mode": "incrementing",
-                    "incrementing.column.name": "id",
-                    "tasks.max": "1",
-                    "connection.url": "${env:MYSQL_CONNECTION_URL}",
-                    "transforms": "TotalReplacement",
-                    "transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
-                    "transforms.TotalReplacement.regex": ".*(book)",
-                    "transforms.TotalReplacement.replacement": "my-new-topic-$1"
-                }
-            }
-            """,
+            "name": "mysql_source2",
+            "config": {
+                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
+                "mode": "incrementing",
+                "incrementing.column.name": "id",
+                "tasks.max": "1",
+                "connection.url": "${env:MYSQL_CONNECTION_URL}",
+                "transforms": "TotalReplacement",
+                "transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
+                "transforms.TotalReplacement.regex": ".*(book)",
+                "transforms.TotalReplacement.replacement": "my-new-topic-$1"
+            }
+        }
+        """,
     )
     assert r.status_code == 201  # Created
     # Creating MySQL source with regex router transformations , no topic prefix, table whitelist
@@ -138,21 +132,21 @@ def loaded_kafka_connect(kafka_connect_runner):
         KAFKA_CONNECT_ENDPOINT,
         headers={"Content-Type": "application/json"},
         data="""{
-                "name": "mysql_source3",
-                "config": {
-                    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
-                    "mode": "incrementing",
-                    "incrementing.column.name": "id",
-                    "table.whitelist": "book",
-                    "tasks.max": "1",
-                    "connection.url": "${env:MYSQL_CONNECTION_URL}",
-                    "transforms": "TotalReplacement",
-                    "transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
-                    "transforms.TotalReplacement.regex": ".*",
-                    "transforms.TotalReplacement.replacement": "my-new-topic"
-                }
-            }
-            """,
+            "name": "mysql_source3",
+            "config": {
+                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
+                "mode": "incrementing",
+                "incrementing.column.name": "id",
+                "table.whitelist": "book",
+                "tasks.max": "1",
+                "connection.url": "${env:MYSQL_CONNECTION_URL}",
+                "transforms": "TotalReplacement",
+                "transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TotalReplacement.regex": ".*", + "transforms.TotalReplacement.replacement": "my-new-topic" + } + } + """, ) assert r.status_code == 201 # Created # Creating MySQL source with query , topic prefix @@ -201,17 +195,17 @@ def loaded_kafka_connect(kafka_connect_runner): KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ - "name": "mysql_sink", - "config": { - "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", - "insert.mode": "insert", - "auto.create": true, - "topics": "my-topic", - "tasks.max": "1", - "connection.url": "${env:MYSQL_CONNECTION_URL}" - } - } - """, + "name": "mysql_sink", + "config": { + "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", + "insert.mode": "insert", + "auto.create": true, + "topics": "my-topic", + "tasks.max": "1", + "connection.url": "${env:MYSQL_CONNECTION_URL}" + } + } + """, ) assert r.status_code == 201 # Created @@ -220,21 +214,22 @@ def loaded_kafka_connect(kafka_connect_runner): KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ - "name": "debezium-mysql-connector", - "config": { - "name": "debezium-mysql-connector", - "connector.class": "io.debezium.connector.mysql.MySqlConnector", - "database.hostname": "test_mysql", - "database.port": "3306", - "database.user": "root", - "database.password": "rootpwd", - "database.server.name": "debezium.topics", - "database.history.kafka.bootstrap.servers": "test_broker:9092", - "database.history.kafka.topic": "dbhistory.debeziummysql", - "include.schema.changes": "false" - } - } - """, + "name": "debezium-mysql-connector", + "config": { + "name": "debezium-mysql-connector", + "connector.class": "io.debezium.connector.mysql.MySqlConnector", + "database.hostname": "test_mysql", + "database.port": "3306", + "database.user": "root", + "database.password": "rootpwd", + "database.server.name": "debezium.topics", + "database.history.kafka.bootstrap.servers": "test_broker:9092", + "database.history.kafka.topic": "dbhistory.debeziummysql", + "database.allowPublicKeyRetrieval": "true", + "include.schema.changes": "false" + } + } + """, ) assert r.status_code == 201 # Created @@ -243,17 +238,17 @@ def loaded_kafka_connect(kafka_connect_runner): KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ - "name": "postgres_source", - "config": { - "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", - "mode": "incrementing", - "incrementing.column.name": "id", - "table.whitelist": "member", - "topic.prefix": "test-postgres-jdbc-", - "tasks.max": "1", - "connection.url": "${env:POSTGRES_CONNECTION_URL}" - } - }""", + "name": "postgres_source", + "config": { + "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", + "mode": "incrementing", + "incrementing.column.name": "id", + "table.whitelist": "member", + "topic.prefix": "test-postgres-jdbc-", + "tasks.max": "1", + "connection.url": "${env:POSTGRES_CONNECTION_URL}" + } + }""", ) assert r.status_code == 201 # Created @@ -262,19 +257,19 @@ def loaded_kafka_connect(kafka_connect_runner): KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ - "name": "generic_source", - "config": { - "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", - "kafka.topic": "my-topic", - "quickstart": "product", - "key.converter": "org.apache.kafka.connect.storage.StringConverter", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": "false", - "max.interval": 
-                    "iterations": 10000000,
-                    "tasks.max": "1"
-                }
-            }""",
+            "name": "generic_source",
+            "config": {
+                "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
+                "kafka.topic": "my-topic",
+                "quickstart": "product",
+                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+                "value.converter.schemas.enable": "false",
+                "max.interval": 1000,
+                "iterations": 10000000,
+                "tasks.max": "1"
+            }
+        }""",
     )
     r.raise_for_status()
     assert r.status_code == 201  # Created

@@ -284,37 +279,24 @@ def loaded_kafka_connect(kafka_connect_runner):
         KAFKA_CONNECT_ENDPOINT,
         headers={"Content-Type": "application/json"},
         data=r"""{
-                "name": "source_mongodb_connector",
-                "config": {
-                    "tasks.max": "1",
-                    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
-                    "connection.uri": "mongodb://admin:admin@test_mongo:27017",
-                    "topic.prefix": "mongodb",
-                    "database": "test_db",
-                    "collection": "purchases",
-                    "copy.existing": true,
-                    "copy.existing.namespace.regex": "test_db.purchases",
-                    "change.stream.full.document": "updateLookup",
-                    "topic.creation.enable": "true",
-                    "topic.creation.default.replication.factor": "-1",
-                    "topic.creation.default.partitions": "-1",
-                    "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
-                    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
-                    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
-                    "key.converter.schemas.enable": false,
-                    "value.converter.schemas.enable": false,
-                    "output.format.key": "schema",
-                    "output.format.value": "json",
-                    "output.schema.infer.value": false,
-                    "publish.full.document.only":true
-                }
-            }""",
+            "name": "source_mongodb_connector",
+            "config": {
+                "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
+                "connection.uri": "mongodb://test_mongo:27017",
+                "topic.prefix": "mongodb",
+                "database": "test_db",
+                "collection": "purchases"
+            }
+        }""",
     )
     r.raise_for_status()
     assert r.status_code == 201  # Created

-    # Give time for connectors to process the table data
-    time.sleep(60)
+    command = "docker exec test_mongo mongosh test_db -f /scripts/mongo-populate.js"
+    ret = subprocess.run(
+        command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+    )
+    assert ret.returncode == 0


 @freeze_time(FROZEN_TIME)
diff --git a/metadata-ingestion/tests/integration/kafka/docker-compose.yml b/metadata-ingestion/tests/integration/kafka/docker-compose.yml
index 5983b9ae70..43f30cbe1e 100644
--- a/metadata-ingestion/tests/integration/kafka/docker-compose.yml
+++ b/metadata-ingestion/tests/integration/kafka/docker-compose.yml
@@ -2,7 +2,7 @@
 version: '3.8'
 services:
   zookeeper:
-    image: confluentinc/cp-zookeeper:7.2.0
+    image: confluentinc/cp-zookeeper:7.2.2
     env_file: zookeeper.env
     hostname: test_zookeeper
     container_name: test_zookeeper
@@ -12,7 +12,7 @@ services:
       - test_zkdata:/var/opt/zookeeper

   broker:
-    image: confluentinc/cp-kafka:7.2.0
+    image: confluentinc/cp-kafka:7.2.2
     env_file: broker.env
     hostname: test_broker
     container_name: test_broker
@@ -22,7 +22,7 @@ services:
      - "29092:29092"

   schema-registry:
-    image: confluentinc/cp-schema-registry:7.2.0
+    image: confluentinc/cp-schema-registry:7.2.2
     restart: unless-stopped
     env_file: schema-registry.env
     container_name: test_schema_registry