import subprocess import time from typing import Any, Dict, List, cast from unittest import mock import pytest import requests from freezegun import freeze_time from datahub.ingestion.run.pipeline import Pipeline from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState from tests.test_helpers import mce_helpers from tests.test_helpers.click_helpers import run_datahub_cmd from tests.test_helpers.docker_helpers import wait_for_port from tests.test_helpers.state_helpers import ( get_current_checkpoint_from_pipeline, validate_all_providers_have_committed_successfully, ) FROZEN_TIME = "2021-10-25 13:00:00" GMS_PORT = 8080 GMS_SERVER = f"http://localhost:{GMS_PORT}" KAFKA_CONNECT_SERVER = "http://localhost:28083" KAFKA_CONNECT_ENDPOINT = f"{KAFKA_CONNECT_SERVER}/connectors" def is_mysql_up(container_name: str, port: int) -> bool: """A cheap way to figure out if mysql is responsive on a container""" cmd = f"docker logs {container_name} 2>&1 | grep '/var/run/mysqld/mysqld.sock' | grep {port}" ret = subprocess.run( cmd, shell=True, ) return ret.returncode == 0 @pytest.fixture(scope="module") def kafka_connect_runner(docker_compose_runner, pytestconfig, test_resources_dir): test_resources_dir_kafka = pytestconfig.rootpath / "tests/integration/kafka" # Share Compose configurations between files and projects # https://docs.docker.com/compose/extends/ docker_compose_file = [ str(test_resources_dir_kafka / "docker-compose.yml"), str(test_resources_dir / "docker-compose.override.yml"), ] with docker_compose_runner( docker_compose_file, "kafka-connect", cleanup=False ) as docker_services: wait_for_port( docker_services, "test_mysql", 3306, timeout=120, checker=lambda: is_mysql_up("test_mysql", 3306), ) with docker_compose_runner(docker_compose_file, "kafka-connect") as docker_services: # We sometimes run into issues where the broker fails to come up on the first try because # of all the other processes that are running. By running docker compose twice, we can # avoid some test flakes. How does this work? The "key" is the same between both # calls to the docker_compose_runner and the first one sets cleanup=False. wait_for_port(docker_services, "test_broker", 29092, timeout=120) wait_for_port(docker_services, "test_connect", 28083, timeout=120) docker_services.wait_until_responsive( timeout=30, pause=1, check=lambda: requests.get( KAFKA_CONNECT_ENDPOINT, ).status_code == 200, ) yield docker_services @pytest.fixture(scope="module") def test_resources_dir(pytestconfig): return pytestconfig.rootpath / "tests/integration/kafka-connect" @pytest.fixture(scope="module") def loaded_kafka_connect(kafka_connect_runner): # Set up the container. time.sleep(10) # Setup mongo cluster command = ( 'docker exec test_mongo mongo admin -u admin -p admin --eval "rs.initiate();"' ) ret = subprocess.run( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) assert ret.returncode == 0 # Creating MySQL source with no transformations , only topic prefix r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "mysql_source1", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "id", "topic.prefix": "test-mysql-jdbc-", "tasks.max": "1", "connection.url": "${env:MYSQL_CONNECTION_URL}" } } """, ) assert r.status_code == 201 # Created # Creating MySQL source with regex router transformations , only topic prefix r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "mysql_source2", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "id", "tasks.max": "1", "connection.url": "${env:MYSQL_CONNECTION_URL}", "transforms": "TotalReplacement", "transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.TotalReplacement.regex": ".*(book)", "transforms.TotalReplacement.replacement": "my-new-topic-$1" } } """, ) assert r.status_code == 201 # Created # Creating MySQL source with regex router transformations , no topic prefix, table whitelist r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "mysql_source3", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "id", "table.whitelist": "book", "tasks.max": "1", "connection.url": "${env:MYSQL_CONNECTION_URL}", "transforms": "TotalReplacement", "transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.TotalReplacement.regex": ".*", "transforms.TotalReplacement.replacement": "my-new-topic" } } """, ) assert r.status_code == 201 # Created # Creating MySQL source with query , topic prefix r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "mysql_source4", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "id", "query": "select * from member", "topic.prefix": "query-topic", "tasks.max": "1", "connection.url": "${env:MYSQL_CONNECTION_URL}" } } """, ) assert r.status_code == 201 # Created # Creating MySQL source with ExtractTopic router transformations - source dataset not added r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "mysql_source5", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "id", "table.whitelist": "book", "topic.prefix": "test-mysql-jdbc2-", "tasks.max": "1", "connection.url": "${env:MYSQL_CONNECTION_URL}", "transforms": "changetopic", "transforms.changetopic.type": "io.confluent.connect.transforms.ExtractTopic$Value", "transforms.changetopic.field": "name" } } """, ) assert r.status_code == 201 # Created # Creating MySQL sink connector - not added r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "mysql_sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "insert.mode": "insert", "auto.create": true, "topics": "my-topic", "tasks.max": "1", "connection.url": "${env:MYSQL_CONNECTION_URL}" } } """, ) assert r.status_code == 201 # Created # Creating Debezium MySQL source connector r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "debezium-mysql-connector", "config": { "name": "debezium-mysql-connector", "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "test_mysql", "database.port": "3306", "database.user": "root", "database.password": "rootpwd", "database.server.name": "debezium.topics", "database.history.kafka.bootstrap.servers": "test_broker:9092", "database.history.kafka.topic": "dbhistory.debeziummysql", "include.schema.changes": "false" } } """, ) assert r.status_code == 201 # Created # Creating Postgresql source r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "postgres_source", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "id", "table.whitelist": "member", "topic.prefix": "test-postgres-jdbc-", "tasks.max": "1", "connection.url": "${env:POSTGRES_CONNECTION_URL}" } }""", ) assert r.status_code == 201 # Created # Creating Generic source r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "generic_source", "config": { "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", "kafka.topic": "my-topic", "quickstart": "product", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", "max.interval": 1000, "iterations": 10000000, "tasks.max": "1" } }""", ) r.raise_for_status() assert r.status_code == 201 # Created # Creating MongoDB source r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data=r"""{ "name": "source_mongodb_connector", "config": { "tasks.max": "1", "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://admin:admin@test_mongo:27017", "topic.prefix": "mongodb", "database": "test_db", "collection": "purchases", "copy.existing": true, "copy.existing.namespace.regex": "test_db.purchases", "change.stream.full.document": "updateLookup", "topic.creation.enable": "true", "topic.creation.default.replication.factor": "-1", "topic.creation.default.partitions": "-1", "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter", "key.converter.schemas.enable": false, "value.converter.schemas.enable": false, "output.format.key": "schema", "output.format.value": "json", "output.schema.infer.value": false, "publish.full.document.only":true } }""", ) r.raise_for_status() assert r.status_code == 201 # Created # Give time for connectors to process the table data time.sleep(60) @freeze_time(FROZEN_TIME) @pytest.mark.integration_batch_1 def test_kafka_connect_ingest( loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir ): # Run the metadata ingestion pipeline. config_file = (test_resources_dir / "kafka_connect_to_file.yml").resolve() run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path) # Verify the output. mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / "kafka_connect_mces.json", golden_path=test_resources_dir / "kafka_connect_mces_golden.json", ignore_paths=[], ) @freeze_time(FROZEN_TIME) @pytest.mark.integration_batch_1 def test_kafka_connect_mongosourceconnect_ingest( loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir ): # Run the metadata ingestion pipeline. config_file = (test_resources_dir / "kafka_connect_mongo_to_file.yml").resolve() run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path) # Verify the output. mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / "kafka_connect_mces.json", golden_path=test_resources_dir / "kafka_connect_mongo_mces_golden.json", ignore_paths=[], ) @freeze_time(FROZEN_TIME) @pytest.mark.integration_batch_1 def test_kafka_connect_ingest_stateful( loaded_kafka_connect, pytestconfig, tmp_path, mock_datahub_graph, test_resources_dir ): output_file_name: str = "kafka_connect_before_mces.json" golden_file_name: str = "kafka_connect_before_golden_mces.json" output_file_deleted_name: str = "kafka_connect_after_mces.json" golden_file_deleted_name: str = "kafka_connect_after_golden_mces.json" base_pipeline_config = { "run_id": "kafka-connect-stateful-test", "pipeline_name": "kafka-connect-stateful", "source": { "type": "kafka-connect", "config": { "platform_instance": "connect-instance-1", "connect_uri": KAFKA_CONNECT_SERVER, "connector_patterns": {"allow": [".*"]}, "provided_configs": [ { "provider": "env", "path_key": "MYSQL_CONNECTION_URL", "value": "jdbc:mysql://test_mysql:3306/librarydb", } ], "stateful_ingestion": { "enabled": True, "remove_stale_metadata": True, "fail_safe_threshold": 100.0, "state_provider": { "type": "datahub", "config": {"datahub_api": {"server": GMS_SERVER}}, }, }, }, }, "sink": { "type": "file", "config": {}, }, } pipeline_run1 = None with mock.patch( "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph", mock_datahub_graph, ) as mock_checkpoint: mock_checkpoint.return_value = mock_datahub_graph pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict( # type: ignore base_pipeline_config # type: ignore ) # Set the special properties for this run pipeline_run1_config["source"]["config"]["connector_patterns"]["allow"] = [ "mysql_source1", "mysql_source2", ] pipeline_run1_config["sink"]["config"][ "filename" ] = f"{tmp_path}/{output_file_name}" pipeline_run1 = Pipeline.create(pipeline_run1_config) pipeline_run1.run() pipeline_run1.raise_from_status() pipeline_run1.pretty_print_summary() mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / output_file_name, golden_path=f"{test_resources_dir}/{golden_file_name}", ) checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1) assert checkpoint1 assert checkpoint1.state pipeline_run2 = None with mock.patch( "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph", mock_datahub_graph, ) as mock_checkpoint: mock_checkpoint.return_value = mock_datahub_graph pipeline_run2_config: Dict[str, Dict[str, Dict[str, Any]]] = dict(base_pipeline_config) # type: ignore # Set the special properties for this run pipeline_run1_config["source"]["config"]["connector_patterns"]["allow"] = [ "mysql_source1", ] pipeline_run2_config["sink"]["config"][ "filename" ] = f"{tmp_path}/{output_file_deleted_name}" pipeline_run2 = Pipeline.create(pipeline_run2_config) pipeline_run2.run() pipeline_run2.raise_from_status() pipeline_run2.pretty_print_summary() mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / output_file_deleted_name, golden_path=f"{test_resources_dir}/{golden_file_deleted_name}", ) checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2) assert checkpoint2 assert checkpoint2.state # Validate that all providers have committed successfully. validate_all_providers_have_committed_successfully( pipeline=pipeline_run1, expected_providers=1 ) validate_all_providers_have_committed_successfully( pipeline=pipeline_run2, expected_providers=1 ) # Perform all assertions on the states. The deleted table should not be # part of the second state state1 = cast(GenericCheckpointState, checkpoint1.state) state2 = cast(GenericCheckpointState, checkpoint2.state) difference_pipeline_urns = list( state1.get_urns_not_in(type="dataFlow", other_checkpoint_state=state2) ) assert len(difference_pipeline_urns) == 1 deleted_pipeline_urns: List[str] = [ "urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD)" ] assert sorted(deleted_pipeline_urns) == sorted(difference_pipeline_urns) difference_job_urns = list( state1.get_urns_not_in(type="dataJob", other_checkpoint_state=state2) ) assert len(difference_job_urns) == 3 deleted_job_urns = [ "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD),librarydb.MixedCaseTable)", "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD),librarydb.book)", "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD),librarydb.member)", ] assert sorted(deleted_job_urns) == sorted(difference_job_urns)