# Viewer residue from extraction (not part of the module): 381 lines, 13 KiB, Python.
# Raw | Permalink | Normal View | History

import asyncio
import pathlib
from functools import partial
from unittest.mock import Mock, patch
import jsonpickle
import pytest
from freezegun import freeze_time
from okta.models import Group, User
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.identity.okta import OktaConfig
from tests.test_helpers import mce_helpers
from tests.test_helpers.state_helpers import (
get_current_checkpoint_from_pipeline,
validate_all_providers_have_committed_successfully,
)
# Timestamp handed to the freeze_time decorators on the ingestion tests below.
FROZEN_TIME = "2020-04-14 07:00:00"
# Id of the user that the mock client filters out of the first page of every
# group's member listing.
USER_ID_NOT_IN_GROUPS = "5"
# Server address used for the "datahub" state provider in the
# stateful-ingestion test.
GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"
def default_recipe(output_file_path):
    """Build the baseline Okta-to-file ingestion recipe used by these tests.

    Args:
        output_file_path: Location the file sink should write to. It is
            converted with ``str()`` before being placed in the recipe.

    Returns:
        A newly built recipe dict. Every call creates fresh nested dicts, so
        callers can override individual settings without affecting other
        recipes.
    """
    # Values are deliberately kept as strings, exactly as in the original
    # recipe. NOTE(review): presumably this relies on the config model
    # coercing them to bool/int/float -- confirm before changing the types.
    source_config = {
        "okta_domain": "mock-domain.okta.com",
        "okta_api_token": "mock-okta-token",
        "ingest_users": "True",
        "ingest_groups": "True",
        "ingest_group_membership": "True",
        "okta_profile_to_username_attr": "login",
        "okta_profile_to_username_regex": "([^@]+)",
        "okta_profile_to_group_name_attr": "name",
        "okta_profile_to_group_name_regex": "(.*)",
        "include_deprovisioned_users": "False",
        "include_suspended_users": "False",
        "page_size": "2",
        "delay_seconds": "0.00",
    }
    return {
        "run_id": "test-okta-usage",
        "source": {
            "type": "okta",
            "config": source_config,
        },
        "sink": {
            "type": "file",
            "config": {
                "filename": str(output_file_path),
            },
        },
    }
def run_ingest(
    mock_datahub_graph,
    mocked_functions_reference,
    recipe,
):
    """Execute one ingestion pipeline with the Okta client and graph patched.

    Args:
        mock_datahub_graph: Object patched in place of ``DataHubGraph`` in the
            checkpointing-provider module; its ``return_value`` is set to
            itself.
        mocked_functions_reference: Callable invoked as
            ``mocked_functions_reference(MockClient=...)`` with the patched
            ``OktaClient`` so it can install canned responses.
        recipe: Recipe dict handed to ``Pipeline.create``.

    Returns:
        The pipeline, after ``run()`` and ``raise_from_status()`` completed.
    """
    okta_client_target = "datahub.ingestion.source.identity.okta.OktaClient"
    graph_target = (
        "datahub.ingestion.source.state_provider."
        "datahub_ingestion_checkpointing_provider.DataHubGraph"
    )
    with patch(okta_client_target) as MockClient:
        with patch(graph_target, mock_datahub_graph) as mock_checkpoint:
            mock_checkpoint.return_value = mock_datahub_graph
            mocked_functions_reference(MockClient=MockClient)

            # Run an Okta usage ingestion run.
            pipeline = Pipeline.create(recipe)
            pipeline.run()
            pipeline.raise_from_status()
            return pipeline
def test_okta_config():
    """Parse a config with only the two required fields and check the defaults."""
    required_only = {"okta_domain": "test.okta.com", "okta_api_token": "test-token"}
    config = OktaConfig.parse_obj(required_only)

    # Required settings are passed through unchanged.
    assert config.okta_domain == "test.okta.com"
    assert config.okta_api_token == "test-token"

    # Ingestion toggles default to enabled.
    assert config.ingest_users is True
    assert config.ingest_groups is True
    assert config.ingest_group_membership is True

    # Default profile-attribute mappings.
    assert config.okta_profile_to_username_attr == "email"
    assert config.okta_profile_to_username_regex == "(.*)"
    assert config.okta_profile_to_group_name_attr == "name"
    assert config.okta_profile_to_group_name_regex == "(.*)"

    # Deprovisioned and suspended users are excluded by default.
    assert config.include_deprovisioned_users is False
    assert config.include_suspended_users is False

    # Default paging settings.
    assert config.page_size == 100
    assert config.delay_seconds == 0.01
@freeze_time(FROZEN_TIME)
def test_okta_source_default_configs(pytestconfig, mock_datahub_graph, tmp_path):
    """Ingest with the unmodified default recipe and compare to the golden file."""
    resources: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
    mces_path = f"{tmp_path}/okta_mces_default_config.json"
    configure_client = partial(_init_mock_okta_client, test_resources_dir=resources)

    run_ingest(
        mock_datahub_graph=mock_datahub_graph,
        mocked_functions_reference=configure_client,
        recipe=default_recipe(mces_path),
    )

    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=mces_path,
        golden_path=f"{resources}/okta_mces_golden_default_config.json",
    )
@freeze_time(FROZEN_TIME)
def test_okta_source_ingest_groups_users(pytestconfig, mock_datahub_graph, tmp_path):
    """Ingest with ``ingest_users`` off and ``ingest_groups_users`` on.

    The emitted file is compared with the matching golden file.
    """
    resources: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
    mces_path = f"{tmp_path}/okta_mces_ingest_groups_users.json"

    recipe = default_recipe(mces_path)
    recipe["source"]["config"].update(
        {
            "ingest_users": False,
            "ingest_groups": True,
            "ingest_groups_users": True,
        }
    )

    configure_client = partial(_init_mock_okta_client, test_resources_dir=resources)
    run_ingest(
        mock_datahub_graph=mock_datahub_graph,
        mocked_functions_reference=configure_client,
        recipe=recipe,
    )

    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=mces_path,
        golden_path=f"{resources}/okta_mces_golden_ingest_groups_users.json",
    )
@freeze_time(FROZEN_TIME)
def test_okta_source_ingestion_disabled(pytestconfig, mock_datahub_graph, tmp_path):
    """Ingest with users, groups and group membership all switched off.

    The emitted file is compared with the matching golden file.
    """
    resources: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
    mces_path = f"{tmp_path}/okta_mces_ingestion_disabled.json"

    recipe = default_recipe(mces_path)
    source_config = recipe["source"]["config"]
    for toggle in ("ingest_users", "ingest_groups", "ingest_group_membership"):
        source_config[toggle] = False

    configure_client = partial(_init_mock_okta_client, test_resources_dir=resources)
    run_ingest(
        mock_datahub_graph=mock_datahub_graph,
        mocked_functions_reference=configure_client,
        recipe=recipe,
    )

    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=mces_path,
        golden_path=f"{resources}/okta_mces_golden_ingestion_disabled.json",
    )
# NOTE(review): the asyncio marker sits on a synchronous test; it is kept as-is
# because it may influence event-loop setup -- confirm whether it is needed.
@freeze_time(FROZEN_TIME)
@pytest.mark.asyncio
def test_okta_source_include_deprovisioned_suspended_users(
    pytestconfig, mock_datahub_graph, tmp_path
):
    """Ingest with deprovisioned and suspended users both included.

    The emitted file is compared with the matching golden file.
    """
    resources: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
    mces_path = f"{tmp_path}/okta_mces_include_deprovisioned_suspended_users.json"

    recipe = default_recipe(mces_path)
    source_config = recipe["source"]["config"]
    source_config["include_deprovisioned_users"] = True
    source_config["include_suspended_users"] = True

    configure_client = partial(_init_mock_okta_client, test_resources_dir=resources)
    run_ingest(
        mock_datahub_graph=mock_datahub_graph,
        mocked_functions_reference=configure_client,
        recipe=recipe,
    )

    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=mces_path,
        golden_path=f"{resources}/okta_mces_golden_include_deprovisioned_suspended_users.json",
    )
# NOTE(review): the asyncio marker sits on a synchronous test; it is kept as-is
# because it may influence event-loop setup -- confirm whether it is needed.
@freeze_time(FROZEN_TIME)
@pytest.mark.asyncio
def test_okta_source_custom_user_name_regex(pytestconfig, mock_datahub_graph, tmp_path):
    """Ingest with both name regexes set to ``(.*)``.

    For the username regex this replaces the default recipe's ``([^@]+)``.
    The emitted file is compared with the matching golden file.
    """
    resources: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
    mces_path = f"{tmp_path}/okta_mces_custom_user_name_regex.json"

    recipe = default_recipe(mces_path)
    source_config = recipe["source"]["config"]
    source_config["okta_profile_to_username_regex"] = "(.*)"
    source_config["okta_profile_to_group_name_regex"] = "(.*)"

    configure_client = partial(_init_mock_okta_client, test_resources_dir=resources)
    run_ingest(
        mock_datahub_graph=mock_datahub_graph,
        mocked_functions_reference=configure_client,
        recipe=recipe,
    )

    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=mces_path,
        golden_path=f"{resources}/okta_mces_golden_custom_user_name_regex.json",
    )
@freeze_time(FROZEN_TIME)
def test_okta_stateful_ingestion(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
    """Run two stateful ingestions and check the group removed in between.

    The first run uses the default mocked data; the second run swaps the
    groups fixture for ``okta_deleted_groups.json``. The checkpoint states of
    the two runs must differ by exactly the ``Engineering`` corpGroup urn.
    """
    resources: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
    mces_path = f"{tmp_path}/temporary_mces.json"

    recipe = default_recipe(mces_path)
    recipe["pipeline_name"] = "okta_execution"
    recipe["source"]["config"]["stateful_ingestion"] = {
        "enabled": True,
        "state_provider": {
            "type": "datahub",
            "config": {"datahub_api": {"server": GMS_SERVER}},
        },
    }

    # First run: full set of mocked groups.
    first_pipeline = run_ingest(
        mock_datahub_graph=mock_datahub_graph,
        mocked_functions_reference=partial(
            _init_mock_okta_client, test_resources_dir=resources
        ),
        recipe=recipe,
    )
    first_checkpoint = get_current_checkpoint_from_pipeline(first_pipeline)
    assert first_checkpoint
    assert first_checkpoint.state

    # Create new event loop as last one is closed because of previous ingestion run
    fresh_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(fresh_loop)

    # Second run: groups come from the "deleted groups" fixture.
    second_pipeline = run_ingest(
        mock_datahub_graph=mock_datahub_graph,
        mocked_functions_reference=partial(
            overwrite_group_in_mocked_data, test_resources_dir=resources
        ),
        recipe=recipe,
    )
    second_checkpoint = get_current_checkpoint_from_pipeline(second_pipeline)
    assert second_checkpoint
    assert second_checkpoint.state

    # Validate that all providers have committed successfully.
    validate_all_providers_have_committed_successfully(
        pipeline=first_pipeline, expected_providers=1
    )
    validate_all_providers_have_committed_successfully(
        pipeline=second_pipeline, expected_providers=1
    )

    # Perform all assertions on the states. The deleted group should not be
    # part of the second state
    first_state = first_checkpoint.state
    second_state = second_checkpoint.state
    removed_group_urns = list(
        first_state.get_urns_not_in(
            type="corpGroup", other_checkpoint_state=second_state
        )
    )
    assert len(removed_group_urns) == 1
    assert removed_group_urns == ["urn:li:corpGroup:Engineering"]
def overwrite_group_in_mocked_data(test_resources_dir, MockClient):
    """Configure the mock Okta client with ``okta_deleted_groups.json`` as groups.

    Users still come from the default users fixture, because no users file
    is passed on.
    """
    deleted_groups_fixture = test_resources_dir / "okta_deleted_groups.json"
    _init_mock_okta_client(
        test_resources_dir=test_resources_dir,
        MockClient=MockClient,
        mock_groups_json=deleted_groups_fixture,
    )
def _resolved_future(result):
    """Return an ``asyncio.Future`` that is already completed with *result*."""
    future = asyncio.Future()  # type: asyncio.Future
    future.set_result(result)
    return future


def _mock_paged_response(last_page):
    """Return a mock response object that serves exactly one more page."""
    response = Mock()
    # First has_next() call reports another page; the second ends pagination.
    response.has_next.side_effect = [True, False]
    # next() resolves to an (items, err) tuple.
    response.next.return_value = _resolved_future((last_page, None))
    return response


def _init_mock_okta_client(
    test_resources_dir, MockClient, mock_users_json=None, mock_groups_json=None
):
    """Initialize a mock Okta client that serves users and groups from JSON.

    Users are read from ``okta_users.json`` and groups from
    ``okta_groups.json`` inside *test_resources_dir* unless an override file
    is given. Every listing is served in two pages: all items but the last
    in the initial response, then the last item via ``resp.next()``.

    Args:
        test_resources_dir: Path-like directory holding the default fixtures.
        MockClient: The patched ``OktaClient`` class; the instance it returns
            is configured in place.
        mock_users_json: Optional path of an alternative users fixture.
        mock_groups_json: Optional path of an alternative groups fixture.
    """
    okta_users_json_file = (
        mock_users_json
        if mock_users_json is not None
        else test_resources_dir / "okta_users.json"
    )
    okta_groups_json_file = (
        mock_groups_json
        if mock_groups_json is not None
        else test_resources_dir / "okta_groups.json"
    )

    # Load the fixtures and turn the JSON dicts into Okta model objects.
    with okta_users_json_file.open() as okta_users_json:
        reference_users = jsonpickle.decode(okta_users_json.read())
    users = [User(user_json) for user_json in reference_users]

    with okta_groups_json_file.open() as okta_groups_json:
        reference_groups = jsonpickle.decode(okta_groups_json.read())
    groups = [Group(group_json) for group_json in reference_groups]

    # Calling a mock class always yields the same return_value, so a single
    # lookup configures the instance that the source will receive.
    client = MockClient()

    # list_users resolves to (users, resp, err).
    client.list_users.return_value = _resolved_future(
        (users[0:-1], _mock_paged_response([users[-1]]), None)
    )

    # list_groups resolves to (groups, resp, err).
    client.list_groups.return_value = _resolved_future(
        (groups[0:-1], _mock_paged_response([groups[-1]]), None)
    )

    # The first page of every group's members leaves out the user whose id
    # is USER_ID_NOT_IN_GROUPS.
    # NOTE(review): the follow-up page is always [users[-1]]; whether that is
    # the excluded user depends on the fixture -- confirm.
    first_page_members = [
        user for user in users if user.id != USER_ID_NOT_IN_GROUPS
    ]

    # One independent (users, resp, err) future per group in the sample data,
    # consumed in order by successive list_group_users calls. Each gets its
    # own list copy and its own response mock.
    client.list_group_users.side_effect = [
        _resolved_future(
            (list(first_page_members), _mock_paged_response([users[-1]]), None)
        )
        for _ in groups
    ]