import logging
import subprocess
from typing import Any, Dict, List, Optional, cast
from unittest import mock

import jpype
import jpype.imports
import pytest
import requests
from freezegun import freeze_time

from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.kafka_connect.kafka_connect import SinkTopicFilter
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.testing import mce_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.state_helpers import (
    get_current_checkpoint_from_pipeline,
    validate_all_providers_have_committed_successfully,
)

logger = logging.getLogger(__name__)

pytestmark = pytest.mark.integration_batch_1

FROZEN_TIME = "2021-10-25 13:00:00"
GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"
KAFKA_CONNECT_SERVER = "http://localhost:28083"
KAFKA_CONNECT_ENDPOINT = f"{KAFKA_CONNECT_SERVER}/connectors"


def check_connectors_ready(
    server_url: str = "http://localhost:28083", only_plugins: bool = False
) -> bool:
    """
    Check if Kafka Connect is fully initialized with plugins installed and all
    connectors are in a RUNNING state.

    Args:
        server_url: The base URL of the Kafka Connect REST API
        only_plugins: If True, only check if the connector plugins are installed

    Returns:
        bool: True if all connectors are running, False otherwise
    """
    try:
        # Check connector plugins are installed
        response = requests.get(f"{server_url}/connector-plugins")
        logger.debug(
            f"check-connectors-ready: connector-plugins: {response.status_code} {response.json()}"
        )
        response.raise_for_status()
        if not response.json():
            return False

        if only_plugins:
            return True

        # Get list of all connectors
        connectors_response = requests.get(f"{server_url}/connectors")
        logger.debug(
            f"check-connectors-ready: connector: {connectors_response.status_code} {connectors_response.json()}"
        )
        connectors_response.raise_for_status()
        connectors = connectors_response.json()
        if not connectors:  # Empty list means no connectors yet
            return False

        for connector in connectors:
            # Based on experience, these connectors can be in FAILED state and still work for tests:
            if connector in ["mysql_sink", "bigquery-sink-connector"]:
                logger.debug(
                    f"check-connectors-ready: skipping validation for {connector} as it can be in FAILED state for tests"
                )
                continue

            # Check status of each connector
            status_response = requests.get(
                f"{server_url}/connectors/{connector}/status"
            )
            logger.debug(
                f"check-connectors-ready: connector {connector}: {status_response.status_code} {status_response.json()}"
            )
            status_response.raise_for_status()
            status = status_response.json()
            if status.get("connector", {}).get("state") != "RUNNING":
                logger.debug(
                    f"check-connectors-ready: connector {connector} is not running"
                )
                return False

            # Check all tasks are running
            for task in status.get("tasks", []):
                if task.get("state") != "RUNNING":
                    logger.debug(
                        f"check-connectors-ready: connector {connector} task {task} is not running"
                    )
                    return False

            # Check topics were provisioned
            topics_response = requests.get(
                f"{server_url}/connectors/{connector}/topics"
            )
            topics_response.raise_for_status()
            topics_data = topics_response.json()
            if topics_data and topics_data.get(connector, {}).get("topics"):
                logger.debug(
                    f"Connector {connector} topics: {topics_data[connector]['topics']}"
                )
            else:
                logger.debug(f"Connector {connector} topics not found yet!")
                return False

        return True
    except Exception as e:
        # This will catch any exception and return False
        logger.debug(f"check-connectors-ready: exception: {e}")
{e}") return False @pytest.fixture(scope="module") def kafka_connect_runner(docker_compose_runner, pytestconfig, test_resources_dir): test_resources_dir_kafka = pytestconfig.rootpath / "tests/integration/kafka" # Share Compose configurations between files and projects # https://docs.docker.com/compose/extends/ docker_compose_file = [ str(test_resources_dir_kafka / "docker-compose.yml"), str(test_resources_dir / "docker-compose.override.yml"), ] with docker_compose_runner(docker_compose_file, "kafka-connect") as docker_services: # We rely on Docker health checks to confirm all services are up & healthy # However healthcheck for test_connect service is not very trustable, so # a double and more robust check here is needed docker_services.wait_until_responsive( timeout=300, pause=10, check=lambda: check_connectors_ready( KAFKA_CONNECT_SERVER, only_plugins=True ), ) yield docker_services @pytest.fixture(scope="module") def test_resources_dir(pytestconfig): return pytestconfig.rootpath / "tests/integration/kafka-connect" @pytest.fixture(scope="module") def loaded_kafka_connect(kafka_connect_runner): print("Initializing MongoDB replica set...") command = "docker exec test_mongo mongosh test_db -f /scripts/mongo-init.js" ret = subprocess.run(command, shell=True, capture_output=True) assert ret.returncode == 0 # Creating MySQL source with no transformations , only topic prefix r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "mysql_source1", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "id", "topic.prefix": "test-mysql-jdbc-", "tasks.max": "1", "connection.url": "${env:MYSQL_CONNECTION_URL}" } } """, ) assert r.status_code == 201 # Created # Creating MySQL source with regex router transformations , only topic prefix r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "mysql_source2", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "id", "tasks.max": "1", "connection.url": "${env:MYSQL_CONNECTION_URL}", "transforms": "TotalReplacement", "transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.TotalReplacement.regex": ".*(book)", "transforms.TotalReplacement.replacement": "my-new-topic-$1" } } """, ) assert r.status_code == 201 # Created # Creating MySQL source with regex router transformations , no topic prefix, table whitelist r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "mysql_source3", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "id", "table.whitelist": "book", "tasks.max": "1", "connection.url": "${env:MYSQL_CONNECTION_URL}", "transforms": "TotalReplacement", "transforms.TotalReplacement.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.TotalReplacement.regex": ".*", "transforms.TotalReplacement.replacement": "my-new-topic" } } """, ) assert r.status_code == 201 # Created # Creating MySQL source with query , topic prefix r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "mysql_source4", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "id", "query": "select * from 
member", "topic.prefix": "query-topic", "tasks.max": "1", "connection.url": "jdbc:mysql://foo:datahub@test_mysql:${env:MYSQL_PORT}/${env:MYSQL_DB}" } } """, ) assert r.status_code == 201 # Created # Creating MySQL source with ExtractTopic router transformations - source dataset not added r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "mysql_source5", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "id", "table.whitelist": "book", "topic.prefix": "test-mysql-jdbc2-", "tasks.max": "1", "connection.url": "${env:MYSQL_CONNECTION_URL}", "transforms": "changetopic", "transforms.changetopic.type": "io.confluent.connect.transforms.ExtractTopic$Value", "transforms.changetopic.field": "name" } } """, ) assert r.status_code == 201 # Created # Creating MySQL sink connector - not added r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "mysql_sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "insert.mode": "insert", "auto.create": true, "topics": "my-topic", "tasks.max": "1", "connection.url": "${env:MYSQL_CONNECTION_URL}" } } """, ) assert r.status_code == 201 # Created # Creating Debezium MySQL source connector r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "debezium-mysql-connector", "config": { "name": "debezium-mysql-connector", "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "test_mysql", "database.port": "3306", "database.user": "root", "database.password": "rootpwd", "database.server.name": "debezium.topics", "database.history.kafka.bootstrap.servers": "test_broker:9092", "database.history.kafka.topic": "dbhistory.debeziummysql", "database.allowPublicKeyRetrieval": "true", "include.schema.changes": "false" } } """, ) assert r.status_code == 201 # Created # Creating Postgresql source r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "postgres_source", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "id", "table.whitelist": "member", "topic.prefix": "test-postgres-jdbc-", "tasks.max": "1", "connection.url": "${env:POSTGRES_CONNECTION_URL}" } }""", ) assert r.status_code == 201 # Created # Creating Generic source r = requests.post( KAFKA_CONNECT_ENDPOINT, headers={"Content-Type": "application/json"}, data="""{ "name": "generic_source", "config": { "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector", "kafka.topic": "my-topic", "quickstart": "product", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", "max.interval": 1000, "iterations": 10000000, "tasks.max": "1" } }""", ) r.raise_for_status() assert r.status_code == 201 # Created print("Populating MongoDB with test data...") # we populate the database before creating the connector # and we use copy_existing mode to copy the data from the database to the kafka topic # so ingestion is consistent and deterministic command = "docker exec test_mongo mongosh test_db -f /scripts/mongo-populate.js" ret = subprocess.run(command, shell=True, capture_output=True) assert ret.returncode == 0 # Creating MongoDB source r = requests.post( 
        KAFKA_CONNECT_ENDPOINT,
        headers={"Content-Type": "application/json"},
        data=r"""{
            "name": "source_mongodb_connector",
            "config": {
                "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
                "connection.uri": "mongodb://test_mongo:27017",
                "topic.prefix": "mongodb",
                "database": "test_db",
                "collection": "purchases",
                "startup.mode": "copy_existing"
            }
        }""",
    )
    r.raise_for_status()
    assert r.status_code == 201  # Created

    # Creating S3 sink connector
    r = requests.post(
        KAFKA_CONNECT_ENDPOINT,
        headers={"Content-Type": "application/json"},
        data=r"""{
            "name": "confluent_s3_sink_connector",
            "config": {
                "aws.access.key.id": "x",
                "aws.secret.access.key": "x",
                "tasks.max": "1",
                "max.interval": 5000,
                "connector.class": "io.confluent.connect.s3.S3SinkConnector",
                "s3.region": "ap-southeast-2",
                "s3.bucket.name": "test-bucket",
                "s3.compression.type": "gzip",
                "store.url": "${env:S3_ENDPOINT_URL}",
                "storage.class": "io.confluent.connect.s3.storage.S3Storage",
                "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
                "flush.size": 100,
                "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
                "locale": "en_AU",
                "timezone": "UTC",
                "timestamp.extractor": "Record",
                "topics": "my-topic",
                "key.converter": "org.apache.kafka.connect.json.JsonConverter",
                "value.converter": "org.apache.kafka.connect.json.JsonConverter",
                "key.converter.schemas.enable": false,
                "value.converter.schemas.enable": false
            }
        }""",
    )
    r.raise_for_status()
    assert r.status_code == 201

    # Creating BigQuery sink connector
    r = requests.post(
        KAFKA_CONNECT_ENDPOINT,
        headers={"Content-Type": "application/json"},
        data="""{
            "name": "bigquery-sink-connector",
            "config": {
                "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
                "autoCreateTables": "true",
                "transforms.TableNameTransformation.type": "org.apache.kafka.connect.transforms.RegexRouter",
                "transforms.TableNameTransformation.replacement": "my_dest_table_name",
                "topics": "kafka-topic-name",
                "transforms.TableNameTransformation.regex": ".*",
                "transforms": "TableNameTransformation",
                "name": "bigquery-sink-connector",
                "project": "my-gcp-project",
                "defaultDataset": "mybqdataset",
                "datasets": "kafka-topic-name=mybqdataset"
            }
        }
        """,
    )
    assert r.status_code == 201  # Created

    # Connectors should be ready to process data thanks to Docker health checks
    print("Waiting for Kafka Connect connectors to initialize and process data...")
    kafka_connect_runner.wait_until_responsive(
        timeout=120,
        pause=10,
        check=lambda: check_connectors_ready(KAFKA_CONNECT_SERVER, only_plugins=False),
    )
    print("Kafka Connect connectors are ready!")


@freeze_time(FROZEN_TIME)
def test_kafka_connect_ingest(
    loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir
):
    # Run the metadata ingestion pipeline.
    config_file = (test_resources_dir / "kafka_connect_to_file.yml").resolve()
    run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)

    # Verify the output.
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=tmp_path / "kafka_connect_mces.json",
        golden_path=test_resources_dir / "kafka_connect_mces_golden.json",
        ignore_paths=[],
    )


@freeze_time(FROZEN_TIME)
def test_kafka_connect_mongosourceconnect_ingest(
    loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir
):
    # Run the metadata ingestion pipeline.
    config_file = (test_resources_dir / "kafka_connect_mongo_to_file.yml").resolve()
    run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)

    # Verify the output.
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=tmp_path / "kafka_connect_mces.json",
        golden_path=test_resources_dir / "kafka_connect_mongo_mces_golden.json",
        ignore_paths=[],
    )


@freeze_time(FROZEN_TIME)
def test_kafka_connect_s3sink_ingest(
    loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir
):
    # Run the metadata ingestion pipeline.
    config_file = (test_resources_dir / "kafka_connect_s3sink_to_file.yml").resolve()
    run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)

    # Verify the output.
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=tmp_path / "kafka_connect_mces.json",
        golden_path=test_resources_dir / "kafka_connect_s3sink_mces_golden.json",
        ignore_paths=[],
    )


@freeze_time(FROZEN_TIME)
def test_kafka_connect_ingest_stateful(
    loaded_kafka_connect, pytestconfig, tmp_path, mock_datahub_graph, test_resources_dir
):
    output_file_name: str = "kafka_connect_before_mces.json"
    golden_file_name: str = "kafka_connect_before_golden_mces.json"
    output_file_deleted_name: str = "kafka_connect_after_mces.json"
    golden_file_deleted_name: str = "kafka_connect_after_golden_mces.json"

    base_pipeline_config = {
        "run_id": "kafka-connect-stateful-test",
        "pipeline_name": "kafka-connect-stateful",
        "source": {
            "type": "kafka-connect",
            "config": {
                "platform_instance": "connect-instance-1",
                "connect_uri": KAFKA_CONNECT_SERVER,
                "connector_patterns": {"allow": [".*"]},
                "provided_configs": [
                    {
                        "provider": "env",
                        "path_key": "MYSQL_CONNECTION_URL",
                        "value": "jdbc:mysql://test_mysql:3306/librarydb",
                    },
                    {
                        "provider": "env",
                        "path_key": "MYSQL_PORT",
                        "value": "3306",
                    },
                    {
                        "provider": "env",
                        "path_key": "MYSQL_DB",
                        "value": "librarydb",
                    },
                ],
                "stateful_ingestion": {
                    "enabled": True,
                    "remove_stale_metadata": True,
                    "fail_safe_threshold": 100.0,
                    "state_provider": {
                        "type": "datahub",
                        "config": {"datahub_api": {"server": GMS_SERVER}},
                    },
                },
            },
        },
        "sink": {
            "type": "file",
            "config": {},
        },
    }

    pipeline_run1 = None
    with mock.patch(
        "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
        mock_datahub_graph,
    ) as mock_checkpoint:
        mock_checkpoint.return_value = mock_datahub_graph

        pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict(  # type: ignore
            base_pipeline_config  # type: ignore
        )
        # Set the special properties for this run
        pipeline_run1_config["source"]["config"]["connector_patterns"]["allow"] = [
            "mysql_source1",
            "mysql_source2",
        ]
        pipeline_run1_config["sink"]["config"]["filename"] = (
            f"{tmp_path}/{output_file_name}"
        )
        pipeline_run1 = Pipeline.create(pipeline_run1_config)
        pipeline_run1.run()
        pipeline_run1.raise_from_status()
        pipeline_run1.pretty_print_summary()

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=tmp_path / output_file_name,
            golden_path=f"{test_resources_dir}/{golden_file_name}",
        )

    checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1)
    assert checkpoint1
    assert checkpoint1.state

    pipeline_run2 = None
    with mock.patch(
        "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
        mock_datahub_graph,
    ) as mock_checkpoint:
        mock_checkpoint.return_value = mock_datahub_graph

        pipeline_run2_config: Dict[str, Dict[str, Dict[str, Any]]] = dict(
            base_pipeline_config  # type: ignore
        )
        # Set the special properties for this run
        pipeline_run2_config["source"]["config"]["connector_patterns"]["allow"] = [
            "mysql_source1",
        ]
        pipeline_run2_config["sink"]["config"]["filename"] = (
            f"{tmp_path}/{output_file_deleted_name}"
        )
        pipeline_run2 = Pipeline.create(pipeline_run2_config)
        pipeline_run2.run()
        pipeline_run2.raise_from_status()
        pipeline_run2.pretty_print_summary()

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=tmp_path / output_file_deleted_name,
            golden_path=f"{test_resources_dir}/{golden_file_deleted_name}",
        )

    checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2)
    assert checkpoint2
    assert checkpoint2.state

    # Validate that all providers have committed successfully.
    validate_all_providers_have_committed_successfully(
        pipeline=pipeline_run1, expected_providers=1
    )
    validate_all_providers_have_committed_successfully(
        pipeline=pipeline_run2, expected_providers=1
    )

    # Perform all assertions on the states. The connector removed from the second
    # run's allow list (mysql_source2) and its tables should not be part of the
    # second state.
    state1 = cast(GenericCheckpointState, checkpoint1.state)
    state2 = cast(GenericCheckpointState, checkpoint2.state)

    difference_pipeline_urns = list(
        state1.get_urns_not_in(type="dataFlow", other_checkpoint_state=state2)
    )
    assert len(difference_pipeline_urns) == 1
    deleted_pipeline_urns: List[str] = [
        "urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD)"
    ]
    assert sorted(deleted_pipeline_urns) == sorted(difference_pipeline_urns)

    difference_job_urns = list(
        state1.get_urns_not_in(type="dataJob", other_checkpoint_state=state2)
    )
    assert len(difference_job_urns) == 3
    deleted_job_urns = [
        "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD),librarydb.MixedCaseTable)",
        "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD),librarydb.book)",
        "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD),librarydb.member)",
    ]
    assert sorted(deleted_job_urns) == sorted(difference_job_urns)


def register_mock_api(request_mock: Any, override_data: Optional[dict] = None) -> None:
    api_vs_response = {
        "http://localhost:28083": {
            "method": "GET",
            "status_code": 200,
            "json": {
                "version": "7.4.0-ccs",
                "commit": "30969fa33c185e880b9e02044761dfaac013151d",
                "kafka_cluster_id": "MDgRZlZhSZ-4fXhwRR79bw",
            },
        },
    }

    api_vs_response.update(override_data or {})

    for url in api_vs_response:
        request_mock.register_uri(
            api_vs_response[url]["method"],
            url,
            json=api_vs_response[url]["json"],
            status_code=api_vs_response[url]["status_code"],
        )


@freeze_time(FROZEN_TIME)
def test_kafka_connect_snowflake_sink_ingest(
    pytestconfig, tmp_path, mock_time, requests_mock
):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/kafka-connect"
    override_data = {
        "http://localhost:28083/connectors": {
            "method": "GET",
            "status_code": 200,
            "json": ["snowflake_sink1"],
        },
        "http://localhost:28083/connectors/snowflake_sink1": {
            "method": "GET",
            "status_code": 200,
            "json": {
                "name": "snowflake_sink1",
                "config": {
                    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
                    "snowflake.database.name": "kafka_db",
                    "snowflake.schema.name": "kafka_schema",
                    "snowflake.topic2table.map": "topic1:table1",
                    "tasks.max": "1",
                    "topics": "topic1,_topic+2",
                    "snowflake.user.name": "kafka_connector_user_1",
                    "snowflake.private.key": "rrSnqU=",
                    "name": "snowflake_sink1",
                    "snowflake.url.name": "bcaurux-lc62744.snowflakecomputing.com:443",
                },
                "tasks": [{"connector": "snowflake_sink1", "task": 0}],
                "type": "sink",
            },
        },
        "http://localhost:28083/connectors/snowflake_sink1/topics": {
            "method": "GET",
            "status_code": 200,
            "json": {
                "snowflake_sink1": {"topics": ["topic1", "_topic+2", "extra_old_topic"]}
            },
        },
    }

    register_mock_api(request_mock=requests_mock, override_data=override_data)

    pipeline = Pipeline.create(
        {
"kafka-connect-test", "source": { "type": "kafka-connect", "config": { "platform_instance": "connect-instance-1", "connect_uri": KAFKA_CONNECT_SERVER, "connector_patterns": { "allow": [ "snowflake_sink1", ] }, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/kafka_connect_snowflake_sink_mces.json", }, }, } ) pipeline.run() pipeline.raise_from_status() golden_file = "kafka_connect_snowflake_sink_mces_golden.json" mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / "kafka_connect_snowflake_sink_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @freeze_time(FROZEN_TIME) def test_kafka_connect_bigquery_sink_ingest( loaded_kafka_connect, pytestconfig, tmp_path, test_resources_dir ): # Run the metadata ingestion pipeline. config_file = ( test_resources_dir / "kafka_connect_bigquery_sink_to_file.yml" ).resolve() run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path) # Verify the output. mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / "kafka_connect_mces.json", golden_path=test_resources_dir / "kafka_connect_bigquery_sink_mces_golden.json", ignore_paths=[], ) def test_filter_stale_topics_topics_list(): """ Test case for filter_stale_topics method when sink_config has 'topics' key. """ # Create an instance of SinkTopicFilter sink_filter = SinkTopicFilter() # Set up test data processed_topics = ["topic1", "topic2", "topic3", "topic4"] sink_config = {"topics": "topic1,topic3,topic5"} # Call the method under test result = sink_filter.filter_stale_topics(processed_topics, sink_config) # Assert the expected result expected_result = ["topic1", "topic3"] assert result == expected_result, f"Expected {expected_result}, but got {result}" def test_filter_stale_topics_regex_filtering(): """ Test filter_stale_topics when using topics.regex for filtering. """ if not jpype.isJVMStarted(): jpype.startJVM() # Create an instance of SinkTopicFilter sink_filter = SinkTopicFilter() # Set up test data processed_topics = ["topic1", "topic2", "other_topic", "test_topic"] sink_config = {"topics.regex": "topic.*"} # Call the method under test result = sink_filter.filter_stale_topics(processed_topics, sink_config) # Assert the result matches the expected filtered topics assert result == ["topic1", "topic2"] def test_filter_stale_topics_no_topics_config(): """ Test filter_stale_topics when using neither topics.regex not topics Ideally, this will never happen for kafka-connect sink connector """ # Create an instance of SinkTopicFilter sink_filter = SinkTopicFilter() # Set up test data processed_topics = ["topic1", "topic2", "other_topic", "test_topic"] sink_config = {"X": "Y"} # Call the method under test result = sink_filter.filter_stale_topics(processed_topics, sink_config) # Assert the result matches the expected filtered topics assert result == processed_topics