import pathlib
import time
from unittest import mock

import pytest
from freezegun import freeze_time

from datahub.ingestion.run.pipeline import Pipeline
from datahub.testing import mce_helpers
from tests.test_helpers.docker_helpers import wait_for_port
from tests.test_helpers.state_helpers import (
    get_current_checkpoint_from_pipeline,
    validate_all_providers_have_committed_successfully,
)

FROZEN_TIME = "2022-08-14 07:00:00"

GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"

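
# Shared helper for the stateful LDAP ingestion tests below: it brings up the
# openldap container via docker compose, runs an LDAP ingestion pipeline with
# stateful ingestion enabled (checkpointing against a mocked DataHubGraph),
# writes the events to a file sink, and compares them to the given golden file.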
def ldap_ingest_common(
    docker_compose_runner,
    pytestconfig,
    tmp_path,
    golden_file_name,
    output_file_name,
    filter,
    mock_datahub_graph,
):
    test_resources_dir = pathlib.Path(pytestconfig.rootpath / "tests/integration/ldap")
    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "ldap"
    ) as docker_services:
        # The openldap container loads the sample data after exposing the port publicly,
        # so we must wait a little bit extra to ensure that the sample data is loaded.
        wait_for_port(docker_services, "openldap", 389)
        # Without this extra wait, the LDAP server can return empty results.
        time.sleep(5)
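
        # Patch the DataHubGraph used by the DataHub checkpointing provider so that
        # checkpoint state is read from / written to the mock instead of a live GMS.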
        with mock.patch(
            "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
            mock_datahub_graph,
        ) as mock_checkpoint:
            mock_checkpoint.return_value = mock_datahub_graph
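
            # Recipe: LDAP source with stateful ingestion (stale-entity removal)
            # enabled, emitting to a local file sink for golden-file comparison.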
            pipeline = Pipeline.create(
                {
                    "run_id": "ldap-test",
                    "pipeline_name": "ldap-test-pipeline",
                    "source": {
                        "type": "ldap",
                        "config": {
                            "ldap_server": "ldap://localhost",
                            "ldap_user": "cn=admin,dc=example,dc=org",
                            "ldap_password": "admin",
                            "base_dn": "dc=example,dc=org",
                            "filter": filter,
                            "attrs_list": ["+", "*"],
                            "stateful_ingestion": {
                                "enabled": True,
                                "remove_stale_metadata": True,
                                "fail_safe_threshold": 100.0,
                                "state_provider": {
                                    "type": "datahub",
                                    "config": {"datahub_api": {"server": GMS_SERVER}},
                                },
                            },
                        },
                    },
                    "sink": {
                        "type": "file",
                        "config": {
                            "filename": f"{tmp_path}/{output_file_name}",
                        },
                    },
                }
            )
            pipeline.run()
            pipeline.raise_from_status()
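
            # Compare the emitted metadata with the checked-in golden file,
            # ignoring volatile timestamp fields.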
            mce_helpers.check_golden_file(
                pytestconfig,
                output_path=tmp_path / output_file_name,
                golden_path=test_resources_dir / golden_file_name,
                ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
            )
            return pipeline


@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_ldap_stateful(
    docker_compose_runner, pytestconfig, tmp_path, mock_time, mock_datahub_graph
):
    golden_file_name: str = "ldap_mces_golden_stateful.json"
    output_file_name: str = "ldap_mces_stateful.json"

    golden_file_deleted_name: str = "ldap_mces_golden_deleted_stateful.json"
    output_file_deleted_name: str = "ldap_mces_deleted_stateful.json"

    golden_file_group: str = "ldap_mces_golden_group_stateful.json"
    output_file_group: str = "ldap_mces_group_stateful.json"

    golden_file_delete_group: str = "ldap_mces_golden_deleted_group_stateful.json"
    output_file_delete_group: str = "ldap_mces_deleted_group_stateful.json"
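
    # Each pair of filters narrows the second run relative to the first:
    # filter1/filter2 drop Homer Simpson (users), filter3/filter4 drop the
    # Finance Department (groups), so stateful ingestion should flag the
    # dropped entities as removed.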
    filter1: str = "(|(cn=Bart Simpson)(cn=Homer Simpson))"
    filter2: str = "(|(cn=Bart Simpson))"
    filter3: str = "(&(objectClass=top)(|(cn=HR Department)(cn=Finance Department)))"
    filter4: str = "(&(objectClass=top)(|(cn=HR Department)))"
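
    # Run 1: ingest both users (Bart and Homer Simpson).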
    pipeline_run1 = ldap_ingest_common(
        docker_compose_runner,
        pytestconfig,
        tmp_path,
        golden_file_name,
        output_file_name,
        filter1,
        mock_datahub_graph,
    )

    checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1)
    assert checkpoint1
    assert checkpoint1.state
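
    # Run 2: ingest with the narrower filter, so Homer Simpson is no longer seen.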
    pipeline_run2 = ldap_ingest_common(
        docker_compose_runner,
        pytestconfig,
        tmp_path,
        golden_file_deleted_name,
        output_file_deleted_name,
        filter2,
        mock_datahub_graph,
    )

    checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2)
    assert checkpoint2
    assert checkpoint2.state
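
    # Both runs should have committed their checkpoint state via the
    # (mocked) DataHub state provider.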
    validate_all_providers_have_committed_successfully(
        pipeline=pipeline_run1, expected_providers=1
    )
    validate_all_providers_have_committed_successfully(
        pipeline=pipeline_run2, expected_providers=1
    )

    state1 = checkpoint1.state
    state2 = checkpoint2.state
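
    # URNs present in run 1 but missing from run 2 are the users that
    # stateful ingestion considers removed; only Homer Simpson should differ.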
    difference_dataset_urns = list(
        state1.get_urns_not_in(type="corpuser", other_checkpoint_state=state2)
    )
    assert len(difference_dataset_urns) == 1
    deleted_dataset_urns = [
        "urn:li:corpuser:hsimpson",
    ]
    assert sorted(deleted_dataset_urns) == sorted(difference_dataset_urns)
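
    # Run 3: ingest both groups (HR and Finance Department).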
    pipeline_run3 = ldap_ingest_common(
        docker_compose_runner,
        pytestconfig,
        tmp_path,
        golden_file_group,
        output_file_group,
        filter3,
        mock_datahub_graph,
    )

    checkpoint3 = get_current_checkpoint_from_pipeline(pipeline_run3)
    assert checkpoint3
    assert checkpoint3.state
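
    # Run 4: ingest only the HR Department, so the Finance Department drops out.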
    pipeline_run4 = ldap_ingest_common(
        docker_compose_runner,
        pytestconfig,
        tmp_path,
        golden_file_delete_group,
        output_file_delete_group,
        filter4,
        mock_datahub_graph,
    )

    checkpoint4 = get_current_checkpoint_from_pipeline(pipeline_run4)
    assert checkpoint4
    assert checkpoint4.state

    state3 = checkpoint3.state
    state4 = checkpoint4.state
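
    # Only the Finance Department group should show up as removed between
    # runs 3 and 4.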
    difference_dataset_urns = list(
        state3.get_urns_not_in(type="corpGroup", other_checkpoint_state=state4)
    )
    assert len(difference_dataset_urns) == 1
    deleted_dataset_urns = [
        "urn:li:corpGroup:Finance Department",
    ]
    assert sorted(deleted_dataset_urns) == sorted(difference_dataset_urns)