import subprocess
import time
from copy import deepcopy
from typing import Any, Dict, List, Optional, cast
from unittest import mock
import pytest
import requests
from freezegun import freeze_time
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from tests.test_helpers import mce_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import wait_for_port
from tests.test_helpers.state_helpers import (
validate_all_providers_have_committed_successfully,
)
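# Tests freeze time to this instant so emitted timestamps stay stable across runs
# and match the golden files.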
FROZEN_TIME = "2021-10-25 13:00:00"
GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"
KAFKA_CONNECT_SERVER = "http://localhost:28083"
KAFKA_CONNECT_ENDPOINT = f"{KAFKA_CONNECT_SERVER}/connectors"
def is_mysql_up(container_name: str, port: int) -> bool:
"""A cheap way to figure out if mysql is responsive on a container"""
cmd = f"docker logs {container_name} 2>&1 | grep '/var/run/mysqld/mysqld.sock' | grep {port}"
ret = subprocess.run(
cmd,
shell=True,
)
return ret.returncode == 0
@pytest.fixture(scope="module")
def kafka_connect_runner(docker_compose_runner, pytestconfig, test_resources_dir):
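    """Spin up the shared Kafka compose stack plus the kafka-connect overrides,
    then wait for MySQL, the broker, and the Connect REST API to come up."""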
test_resources_dir_kafka = pytestconfig.rootpath / "tests/integration/kafka"
# Share Compose configurations between files and projects
# https://docs.docker.com/compose/extends/
docker_compose_file = [
str(test_resources_dir_kafka / "docker-compose.yml"),
str(test_resources_dir / "docker-compose.override.yml"),
]
with docker_compose_runner(
docker_compose_file, "kafka-connect", cleanup=False
) as docker_services:
wait_for_port(
docker_services,
"test_mysql",
3306,
timeout=120,
checker=lambda: is_mysql_up("test_mysql", 3306),
)
with docker_compose_runner(docker_compose_file, "kafka-connect") as docker_services:
# We sometimes run into issues where the broker fails to come up on the first try because
# of all the other processes that are running. By running docker compose twice, we can
# avoid some test flakes. How does this work? The "key" is the same between both
# calls to the docker_compose_runner and the first one sets cleanup=False.
wait_for_port(docker_services, "test_broker", 29092, timeout=120)
wait_for_port(docker_services, "test_connect", 28083, timeout=120)
docker_services.wait_until_responsive(
timeout=30,
pause=1,
check=lambda: requests.get(
KAFKA_CONNECT_ENDPOINT,
).status_code
== 200,
)
yield docker_services
@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
return pytestconfig.rootpath / "tests/integration/kafka-connect"
@pytest.fixture(scope="module")
def loaded_kafka_connect(kafka_connect_runner):
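    """Register all source/sink connectors under test with the Connect REST API
    and give them time to snapshot the seeded table data."""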
    # Give the containers a moment to finish initializing.
    time.sleep(10)
    # Initialize the MongoDB replica set.
command = (
'docker exec test_mongo mongo admin -u admin -p admin --eval "rs.initiate();"'
)
ret = subprocess.run(
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
assert ret.returncode == 0
    # Creating MySQL source with no transformations, only topic prefix
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_source1",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "test-mysql-jdbc-",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}"
}
}
""",
)
assert r.status_code == 201 # Created
    # Creating MySQL source with regex router transformations, no topic prefix
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_source2",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}",
"transforms": "TotalReplacement",
"transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TotalReplacement.regex": ".*(book)",
"transforms.TotalReplacement.replacement": "my-new-topic-$1"
}
}
""",
)
assert r.status_code == 201 # Created
    # Creating MySQL source with regex router transformations, no topic prefix, table whitelist
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_source3",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "book",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}",
"transforms": "TotalReplacement",
"transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.TotalReplacement.regex": ".*",
"transforms.TotalReplacement.replacement": "my-new-topic"
}
}
""",
)
assert r.status_code == 201 # Created
    # Creating MySQL source with query, topic prefix
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_source4",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"query": "select * from member",
"topic.prefix": "query-topic",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}"
}
}
""",
)
assert r.status_code == 201 # Created
# Creating MySQL source with ExtractTopic router transformations - source dataset not added
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_source5",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "book",
"topic.prefix": "test-mysql-jdbc2-",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}",
"transforms": "changetopic",
"transforms.changetopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.changetopic.field": "name"
}
}
""",
)
assert r.status_code == 201 # Created
# Creating MySQL sink connector - not added
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "mysql_sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"insert.mode": "insert",
"auto.create": true,
"topics": "my-topic",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}"
}
}
""",
)
assert r.status_code == 201 # Created
# Creating Debezium MySQL source connector
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "debezium-mysql-connector",
"config": {
"name": "debezium-mysql-connector",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "test_mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "rootpwd",
"database.server.name": "debezium.topics",
"database.history.kafka.bootstrap.servers": "test_broker:9092",
"database.history.kafka.topic": "dbhistory.debeziummysql",
"include.schema.changes": "false"
}
}
""",
)
assert r.status_code == 201 # Created
    # Creating PostgreSQL source
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "postgres_source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "member",
"topic.prefix": "test-postgres-jdbc-",
"tasks.max": "1",
"connection.url": "${env:POSTGRES_CONNECTION_URL}"
}
}""",
)
assert r.status_code == 201 # Created
# Creating Generic source
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data="""{
"name": "generic_source",
"config": {
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "my-topic",
"quickstart": "product",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"max.interval": 1000,
"iterations": 10000000,
"tasks.max": "1"
}
}""",
)
r.raise_for_status()
assert r.status_code == 201 # Created
# Creating MongoDB source
r = requests.post(
KAFKA_CONNECT_ENDPOINT,
headers={"Content-Type": "application/json"},
data=r"""{
"name": "source_mongodb_connector",
"config": {
"tasks.max": "1",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://admin:admin@test_mongo:27017",
"topic.prefix": "mongodb",
"database": "test_db",
"collection": "purchases",
"copy.existing": true,
"copy.existing.namespace.regex": "test_db.purchases",
"change.stream.full.document": "updateLookup",
"topic.creation.enable": "true",
"topic.creation.default.replication.factor": "-1",
"topic.creation.default.partitions": "-1",
"output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false,
"output.format.key": "schema",
"output.format.value": "json",
"output.schema.infer.value": false,
"publish.full.document.only":true
}
}""",
)
r.raise_for_status()
assert r.status_code == 201 # Created
# Give time for connectors to process the table data
time.sleep(60)
@freeze_time(FROZEN_TIME)
@pytest.mark.integration_batch_1
def test_kafka_connect_ingest(
loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir
):
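    """Ingest connector metadata to a file sink and compare against the golden file."""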
# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "kafka_connect_to_file.yml").resolve()
run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)
# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "kafka_connect_mces.json",
golden_path=test_resources_dir / "kafka_connect_mces_golden.json",
ignore_paths=[],
)
@freeze_time(FROZEN_TIME)
@pytest.mark.integration_batch_1
def test_kafka_connect_mongosourceconnect_ingest(
loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir
):
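    """Ingest the MongoDB connector metadata and compare against its golden file."""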
# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "kafka_connect_mongo_to_file.yml").resolve()
run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)
# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "kafka_connect_mces.json",
golden_path=test_resources_dir / "kafka_connect_mongo_mces_golden.json",
ignore_paths=[],
)
@freeze_time(FROZEN_TIME)
@pytest.mark.integration_batch_1
def test_kafka_connect_ingest_stateful(
loaded_kafka_connect, pytestconfig, tmp_path, mock_datahub_graph, test_resources_dir
):
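    """Two-run stateful ingestion test: run 1 ingests mysql_source1 and mysql_source2;
    run 2 allows only mysql_source1, so mysql_source2's dataFlow/dataJob urns should
    appear only in the first checkpoint state."""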
output_file_name: str = "kafka_connect_before_mces.json"
golden_file_name: str = "kafka_connect_before_golden_mces.json"
output_file_deleted_name: str = "kafka_connect_after_mces.json"
golden_file_deleted_name: str = "kafka_connect_after_golden_mces.json"
base_pipeline_config = {
"run_id": "kafka-connect-stateful-test",
"pipeline_name": "kafka-connect-stateful",
"source": {
"type": "kafka-connect",
"config": {
"platform_instance": "connect-instance-1",
"connect_uri": KAFKA_CONNECT_SERVER,
"connector_patterns": {"allow": [".*"]},
"provided_configs": [
{
"provider": "env",
"path_key": "MYSQL_CONNECTION_URL",
"value": "jdbc:mysql://test_mysql:3306/librarydb",
}
],
"stateful_ingestion": {
"enabled": True,
"remove_stale_metadata": True,
"fail_safe_threshold": 100.0,
"state_provider": {
"type": "datahub",
"config": {"datahub_api": {"server": GMS_SERVER}},
},
},
},
},
"sink": {
"type": "file",
"config": {},
},
}
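    # Each run below takes its own deep copy of the base config and narrows the
    # connector allow-list, simulating the deletion of a connector between runs.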
pipeline_run1 = None
with mock.patch(
"datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
mock_datahub_graph,
) as mock_checkpoint:
mock_checkpoint.return_value = mock_datahub_graph
        pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = deepcopy(  # type: ignore
            base_pipeline_config
        )
# Set the special properties for this run
pipeline_run1_config["source"]["config"]["connector_patterns"]["allow"] = [
"mysql_source1",
"mysql_source2",
]
pipeline_run1_config["sink"]["config"][
"filename"
] = f"{tmp_path}/{output_file_name}"
pipeline_run1 = Pipeline.create(pipeline_run1_config)
pipeline_run1.run()
pipeline_run1.raise_from_status()
pipeline_run1.pretty_print_summary()
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / output_file_name,
golden_path=f"{test_resources_dir}/{golden_file_name}",
)
checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1)
assert checkpoint1
assert checkpoint1.state
pipeline_run2 = None
with mock.patch(
"datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
mock_datahub_graph,
) as mock_checkpoint:
mock_checkpoint.return_value = mock_datahub_graph
        pipeline_run2_config: Dict[str, Dict[str, Dict[str, Any]]] = deepcopy(  # type: ignore
            base_pipeline_config
        )
        # Set the special properties for this run: mysql_source2 is dropped from
        # the allow-list, so its entities should be detected as stale.
        pipeline_run2_config["source"]["config"]["connector_patterns"]["allow"] = [
            "mysql_source1",
        ]
pipeline_run2_config["sink"]["config"][
"filename"
] = f"{tmp_path}/{output_file_deleted_name}"
pipeline_run2 = Pipeline.create(pipeline_run2_config)
pipeline_run2.run()
pipeline_run2.raise_from_status()
pipeline_run2.pretty_print_summary()
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / output_file_deleted_name,
golden_path=f"{test_resources_dir}/{golden_file_deleted_name}",
)
checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2)
assert checkpoint2
assert checkpoint2.state
# Validate that all providers have committed successfully.
validate_all_providers_have_committed_successfully(
pipeline=pipeline_run1, expected_providers=1
)
validate_all_providers_have_committed_successfully(
pipeline=pipeline_run2, expected_providers=1
)
    # Perform all assertions on the states. The removed connector (mysql_source2)
    # should not be part of the second state.
state1 = cast(GenericCheckpointState, checkpoint1.state)
state2 = cast(GenericCheckpointState, checkpoint2.state)
difference_pipeline_urns = list(
state1.get_urns_not_in(type="dataFlow", other_checkpoint_state=state2)
)
assert len(difference_pipeline_urns) == 1
deleted_pipeline_urns: List[str] = [
"urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD)"
]
assert sorted(deleted_pipeline_urns) == sorted(difference_pipeline_urns)
difference_job_urns = list(
state1.get_urns_not_in(type="dataJob", other_checkpoint_state=state2)
)
assert len(difference_job_urns) == 3
deleted_job_urns = [
"urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD),librarydb.MixedCaseTable)",
"urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD),librarydb.book)",
"urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD),librarydb.member)",
]
assert sorted(deleted_job_urns) == sorted(difference_job_urns)
def get_current_checkpoint_from_pipeline(
pipeline: Pipeline,
) -> Optional[Checkpoint]:
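    """Return the stale-entity-removal checkpoint currently held by the pipeline's
    kafka-connect source, if any."""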
from datahub.ingestion.source.kafka_connect import KafkaConnectSource
kafka_connect_source = cast(KafkaConnectSource, pipeline.source)
return kafka_connect_source.get_current_checkpoint(
kafka_connect_source.stale_entity_removal_handler.job_id
)