mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-29 12:13:01 +00:00
Run black formatting on tests
This commit is contained in:
parent
2307c59296
commit
a87161cad7
@ -4,6 +4,4 @@ import os
|
||||
# See https://stackoverflow.com/a/33515264.
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), 'test_helpers'))
|
||||
|
||||
pytest_plugins = [
|
||||
"tests.integration.fixtures.sql_fixtures"
|
||||
]
|
||||
pytest_plugins = ["tests.integration.fixtures.sql_fixtures"]
|
||||
|
||||
@ -1,21 +1,28 @@
|
||||
import os
|
||||
import pytest
|
||||
|
||||
|
||||
def is_responsive(container: str, port: int):
|
||||
ret = os.system(f"docker exec {container} /setup/wait-for-it.sh localhost:{port}")
|
||||
return ret == 0
|
||||
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def docker_compose_file(pytestconfig):
|
||||
return os.path.join(str(pytestconfig.rootdir), "tests/integration/", "docker-compose.yml")
|
||||
return os.path.join(
|
||||
str(pytestconfig.rootdir), "tests/integration/", "docker-compose.yml"
|
||||
)
|
||||
|
||||
|
||||
def wait_for_db(docker_services, container_name, container_port):
|
||||
port = docker_services.port_for(container_name, container_port)
|
||||
docker_services.wait_until_responsive(
|
||||
timeout=30.0, pause=0.1, check=lambda: is_responsive(container_name, container_port))
|
||||
timeout=30.0,
|
||||
pause=0.1,
|
||||
check=lambda: is_responsive(container_name, container_port),
|
||||
)
|
||||
import time
|
||||
|
||||
time.sleep(5)
|
||||
return port
|
||||
|
||||
@ -24,8 +31,7 @@ def wait_for_db(docker_services, container_name, container_port):
|
||||
def sql_server(docker_ip, docker_services):
|
||||
return wait_for_db(docker_services, "testsqlserver", 1433)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def mysql(docker_ip, docker_services):
|
||||
return wait_for_db(docker_services, "testmysql", 3306)
|
||||
|
||||
|
||||
|
||||
@ -5,15 +5,16 @@ import subprocess
|
||||
import mce_helpers
|
||||
|
||||
|
||||
|
||||
def test_ingest(mysql, pytestconfig, tmp_path):
|
||||
test_resources_dir = pytestconfig.rootpath / "tests/integration/mysql"
|
||||
|
||||
config_file=(test_resources_dir / "mysql_to_file.yml").resolve()
|
||||
ingest_command=f'cd {tmp_path} && gometa-ingest -c {config_file}'
|
||||
config_file = (test_resources_dir / "mysql_to_file.yml").resolve()
|
||||
ingest_command = f'cd {tmp_path} && gometa-ingest -c {config_file}'
|
||||
ret = os.system(ingest_command)
|
||||
assert ret == 0
|
||||
|
||||
output = mce_helpers.load_json_file(str(tmp_path / "mysql_mces.json"))
|
||||
golden = mce_helpers.load_json_file(str(test_resources_dir / "mysql_mce_golden.json"))
|
||||
golden = mce_helpers.load_json_file(
|
||||
str(test_resources_dir / "mysql_mce_golden.json")
|
||||
)
|
||||
mce_helpers.assert_mces_equal(output, golden)
|
||||
|
||||
@ -4,15 +4,20 @@ import subprocess
|
||||
|
||||
|
||||
import time
|
||||
|
||||
|
||||
def test_ingest(sql_server, pytestconfig):
|
||||
docker="docker"
|
||||
docker = "docker"
|
||||
command = f"{docker} exec testsqlserver /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P 'test!Password' -d master -i /setup/setup.sql"
|
||||
ret = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
ret = subprocess.run(
|
||||
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
||||
)
|
||||
assert ret.returncode == 0
|
||||
config_file=os.path.join(str(pytestconfig.rootdir), "tests/integration/sql_server", "mssql_to_file.yml")
|
||||
config_file = os.path.join(
|
||||
str(pytestconfig.rootdir), "tests/integration/sql_server", "mssql_to_file.yml"
|
||||
)
|
||||
# delete the output directory. TODO: move to a better way to create an output test fixture
|
||||
os.system("rm -rf output")
|
||||
ingest_command=f'gometa-ingest -c {config_file}'
|
||||
ingest_command = f'gometa-ingest -c {config_file}'
|
||||
ret = os.system(ingest_command)
|
||||
assert ret == 0
|
||||
|
||||
|
||||
@ -5,11 +5,12 @@ def test_allow_all():
|
||||
pattern = AllowDenyPattern.allow_all()
|
||||
assert pattern.allowed("foo.table") == True
|
||||
|
||||
|
||||
def test_deny_all():
|
||||
pattern = AllowDenyPattern(allow=[], deny=[".*"])
|
||||
assert pattern.allowed("foo.table") == False
|
||||
|
||||
|
||||
def test_single_table():
|
||||
pattern = AllowDenyPattern(allow=["foo.mytable"])
|
||||
assert pattern.allowed("foo.mytable") == True
|
||||
|
||||
|
||||
@ -7,6 +7,7 @@ def test_sources_not_abstract():
|
||||
for cls in source_class_mapping.values():
|
||||
assert not inspect.isabstract(cls)
|
||||
|
||||
|
||||
def test_sinks_not_abstract():
|
||||
for cls in sink_class_mapping.values():
|
||||
assert not inspect.isabstract(cls)
|
||||
|
||||
@ -7,14 +7,14 @@ from gometa.ingestion.api.common import RecordEnvelope, PipelineContext
|
||||
|
||||
|
||||
class KafkaSinkTest(unittest.TestCase):
|
||||
|
||||
@patch("gometa.ingestion.sink.datahub_kafka.PipelineContext")
|
||||
@patch("gometa.ingestion.sink.datahub_kafka.SerializingProducer")
|
||||
def test_kafka_sink_config(self, mock_producer, mock_context):
|
||||
mock_producer_instance = mock_producer.return_value
|
||||
kafka_sink = DatahubKafkaSink.create({'connection': {'bootstrap': 'foobar:9092'}}, mock_context)
|
||||
assert mock_producer.call_count == 1 #constructor should be called
|
||||
|
||||
mock_producer_instance = mock_producer.return_value
|
||||
kafka_sink = DatahubKafkaSink.create(
|
||||
{'connection': {'bootstrap': 'foobar:9092'}}, mock_context
|
||||
)
|
||||
assert mock_producer.call_count == 1 # constructor should be called
|
||||
|
||||
def validate_kafka_callback(self, mock_k_callback, record_envelope, write_callback):
|
||||
assert mock_k_callback.call_count == 1 # KafkaCallback constructed
|
||||
@ -22,25 +22,28 @@ class KafkaSinkTest(unittest.TestCase):
|
||||
assert constructor_args[1] == record_envelope
|
||||
assert constructor_args[2] == write_callback
|
||||
|
||||
|
||||
@patch("gometa.ingestion.sink.datahub_kafka.PipelineContext")
|
||||
@patch("gometa.ingestion.sink.datahub_kafka.SerializingProducer")
|
||||
@patch("gometa.ingestion.sink.datahub_kafka.KafkaCallback")
|
||||
def test_kafka_sink_write(self, mock_k_callback, mock_producer, mock_context):
|
||||
mock_producer_instance = mock_producer.return_value
|
||||
mock_producer_instance = mock_producer.return_value
|
||||
mock_k_callback_instance = mock_k_callback.return_value
|
||||
callback = MagicMock(spec=WriteCallback)
|
||||
kafka_sink = DatahubKafkaSink.create({'connection': {'bootstrap': 'foobar:9092'}}, mock_context)
|
||||
kafka_sink = DatahubKafkaSink.create(
|
||||
{'connection': {'bootstrap': 'foobar:9092'}}, mock_context
|
||||
)
|
||||
re = RecordEnvelope(record="test", metadata={})
|
||||
kafka_sink.write_record_async(re, callback)
|
||||
assert mock_producer_instance.poll.call_count == 1 # poll() called once
|
||||
self.validate_kafka_callback(mock_k_callback, re, callback) # validate kafka callback was constructed appropriately
|
||||
|
||||
assert mock_producer_instance.poll.call_count == 1 # poll() called once
|
||||
self.validate_kafka_callback(
|
||||
mock_k_callback, re, callback
|
||||
) # validate kafka callback was constructed appropriately
|
||||
|
||||
# validate that confluent_kafka.Producer.produce was called with the right arguments
|
||||
args, kwargs = mock_producer_instance.produce.call_args
|
||||
created_callback = kwargs["on_delivery"]
|
||||
assert created_callback == mock_k_callback_instance.kafka_callback
|
||||
|
||||
|
||||
## Todo: Test that kafka producer is configured correctly
|
||||
|
||||
@patch("gometa.ingestion.sink.datahub_kafka.PipelineContext")
|
||||
@ -50,11 +53,13 @@ class KafkaSinkTest(unittest.TestCase):
|
||||
kafka_sink = DatahubKafkaSink.create({}, mock_context)
|
||||
kafka_sink.close()
|
||||
mock_producer_instance.flush.assert_called_once()
|
||||
|
||||
|
||||
@patch("gometa.ingestion.sink.datahub_kafka.RecordEnvelope")
|
||||
@patch("gometa.ingestion.sink.datahub_kafka.WriteCallback")
|
||||
def test_kafka_callback_class(self, mock_w_callback, mock_re):
|
||||
callback = KafkaCallback(SinkReport(), record_envelope = mock_re, write_callback = mock_w_callback)
|
||||
callback = KafkaCallback(
|
||||
SinkReport(), record_envelope=mock_re, write_callback=mock_w_callback
|
||||
)
|
||||
mock_error = MagicMock()
|
||||
mock_message = MagicMock()
|
||||
callback.kafka_callback(mock_error, mock_message)
|
||||
@ -62,4 +67,3 @@ class KafkaSinkTest(unittest.TestCase):
|
||||
mock_w_callback.on_failure.called_with(mock_re, None, {"error", mock_error})
|
||||
callback.kafka_callback(None, mock_message)
|
||||
mock_w_callback.on_success.called_once_with(mock_re, {"msg", mock_message})
|
||||
|
||||
|
||||
@ -8,11 +8,12 @@ from unittest.mock import patch, MagicMock
|
||||
|
||||
|
||||
class KafkaSourceTest(unittest.TestCase):
|
||||
|
||||
@patch("gometa.ingestion.source.kafka.confluent_kafka.Consumer")
|
||||
def test_kafka_source_configuration(self, mock_kafka):
|
||||
ctx = PipelineContext(run_id='test')
|
||||
kafka_source = KafkaSource.create({'connection': {'bootstrap': 'foobar:9092'}}, ctx)
|
||||
kafka_source = KafkaSource.create(
|
||||
{'connection': {'bootstrap': 'foobar:9092'}}, ctx
|
||||
)
|
||||
assert mock_kafka.call_count == 1
|
||||
|
||||
@patch("gometa.ingestion.source.kafka.confluent_kafka.Consumer")
|
||||
@ -20,10 +21,12 @@ class KafkaSourceTest(unittest.TestCase):
|
||||
mock_kafka_instance = mock_kafka.return_value
|
||||
mock_cluster_metadata = MagicMock()
|
||||
mock_cluster_metadata.topics = ["foobar", "bazbaz"]
|
||||
mock_kafka_instance.list_topics.return_value=mock_cluster_metadata
|
||||
mock_kafka_instance.list_topics.return_value = mock_cluster_metadata
|
||||
|
||||
ctx = PipelineContext(run_id='test')
|
||||
kafka_source = KafkaSource.create({'connection': {'bootstrap': 'localhost:9092'}}, ctx)
|
||||
kafka_source = KafkaSource.create(
|
||||
{'connection': {'bootstrap': 'localhost:9092'}}, ctx
|
||||
)
|
||||
workunits = []
|
||||
for w in kafka_source.get_workunits():
|
||||
workunits.append(w)
|
||||
@ -39,10 +42,12 @@ class KafkaSourceTest(unittest.TestCase):
|
||||
mock_kafka_instance = mock_kafka.return_value
|
||||
mock_cluster_metadata = MagicMock()
|
||||
mock_cluster_metadata.topics = ["test", "foobar", "bazbaz"]
|
||||
mock_kafka_instance.list_topics.return_value=mock_cluster_metadata
|
||||
mock_kafka_instance.list_topics.return_value = mock_cluster_metadata
|
||||
|
||||
ctx = PipelineContext(run_id='test1')
|
||||
kafka_source = KafkaSource.create({'topic': 'test', 'connection': {'bootstrap': 'localhost:9092'}}, ctx)
|
||||
kafka_source = KafkaSource.create(
|
||||
{'topic': 'test', 'connection': {'bootstrap': 'localhost:9092'}}, ctx
|
||||
)
|
||||
assert kafka_source.source_config.topic == "test"
|
||||
workunits = [w for w in kafka_source.get_workunits()]
|
||||
|
||||
@ -52,7 +57,9 @@ class KafkaSourceTest(unittest.TestCase):
|
||||
|
||||
mock_cluster_metadata.topics = ["test", "test2", "bazbaz"]
|
||||
ctx = PipelineContext(run_id='test2')
|
||||
kafka_source = KafkaSource.create({'topic': 'test.*', 'connection': {'bootstrap': 'localhost:9092'}}, ctx)
|
||||
kafka_source = KafkaSource.create(
|
||||
{'topic': 'test.*', 'connection': {'bootstrap': 'localhost:9092'}}, ctx
|
||||
)
|
||||
workunits = [w for w in kafka_source.get_workunits()]
|
||||
assert len(workunits) == 2
|
||||
|
||||
@ -60,7 +67,8 @@ class KafkaSourceTest(unittest.TestCase):
|
||||
def test_close(self, mock_kafka):
|
||||
mock_kafka_instance = mock_kafka.return_value
|
||||
ctx = PipelineContext(run_id='test')
|
||||
kafka_source = KafkaSource.create({'topic': 'test', 'connection': {'bootstrap': 'localhost:9092'}}, ctx)
|
||||
kafka_source = KafkaSource.create(
|
||||
{'topic': 'test', 'connection': {'bootstrap': 'localhost:9092'}}, ctx
|
||||
)
|
||||
kafka_source.close()
|
||||
assert mock_kafka_instance.close.call_count == 1
|
||||
|
||||
|
||||
@ -2,7 +2,8 @@ import pytest
|
||||
import json
|
||||
import mce_helpers
|
||||
|
||||
basic_1 = json.loads("""[
|
||||
basic_1 = json.loads(
|
||||
"""[
|
||||
{
|
||||
"auditHeader": null,
|
||||
"proposedSnapshot": {
|
||||
@ -45,10 +46,12 @@ basic_1 = json.loads("""[
|
||||
},
|
||||
"proposedDelta": null
|
||||
}
|
||||
]""")
|
||||
]"""
|
||||
)
|
||||
|
||||
# Timestamps changed from basic_1 but same otherwise.
|
||||
basic_2 = json.loads("""[
|
||||
basic_2 = json.loads(
|
||||
"""[
|
||||
{
|
||||
"auditHeader": null,
|
||||
"proposedSnapshot": {
|
||||
@ -91,10 +94,12 @@ basic_2 = json.loads("""[
|
||||
},
|
||||
"proposedDelta": null
|
||||
}
|
||||
]""")
|
||||
]"""
|
||||
)
|
||||
|
||||
# Dataset owner changed from basic_2.
|
||||
basic_3 = json.loads("""[
|
||||
basic_3 = json.loads(
|
||||
"""[
|
||||
{
|
||||
"auditHeader": null,
|
||||
"proposedSnapshot": {
|
||||
@ -137,15 +142,19 @@ basic_3 = json.loads("""[
|
||||
},
|
||||
"proposedDelta": null
|
||||
}
|
||||
]""")
|
||||
]"""
|
||||
)
|
||||
|
||||
|
||||
def test_basic_diff_same():
|
||||
mce_helpers.assert_mces_equal(basic_1, basic_2)
|
||||
|
||||
|
||||
def test_basic_diff_only_owner_change():
|
||||
with pytest.raises(AssertionError):
|
||||
mce_helpers.assert_mces_equal(basic_2, basic_3)
|
||||
|
||||
|
||||
def test_basic_diff_owner_change():
|
||||
with pytest.raises(AssertionError):
|
||||
mce_helpers.assert_mces_equal(basic_1, basic_3)
|
||||
|
||||
@ -10,14 +10,10 @@ class PipelineTest(unittest.TestCase):
|
||||
def test_configure(self, mock_sink, mock_source):
|
||||
pipeline = Pipeline(
|
||||
{
|
||||
"source": {
|
||||
"type": "kafka",
|
||||
"kafka": {"bootstrap": "localhost:9092"},
|
||||
},
|
||||
"source": {"type": "kafka", "kafka": {"bootstrap": "localhost:9092"},},
|
||||
"sink": {"type": "console"},
|
||||
}
|
||||
)
|
||||
pipeline.run()
|
||||
mock_source.assert_called_once()
|
||||
mock_sink.assert_called_once()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user