mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-07 17:23:11 +00:00
386 lines
13 KiB
Python
386 lines
13 KiB
Python
import json
|
|
import pathlib
|
|
from functools import partial
|
|
from typing import List
|
|
from unittest.mock import patch
|
|
|
|
from freezegun import freeze_time
|
|
|
|
from datahub.ingestion.run.pipeline import Pipeline
|
|
from datahub.ingestion.source.identity.azure_ad import AzureADConfig
|
|
from tests.test_helpers import mce_helpers
|
|
from tests.test_helpers.state_helpers import (
|
|
get_current_checkpoint_from_pipeline,
|
|
validate_all_providers_have_committed_successfully,
|
|
)
|
|
|
|
FROZEN_TIME = "2021-08-24 09:00:00"
|
|
GMS_PORT = 8080
|
|
GMS_SERVER = f"http://localhost:{GMS_PORT}"
|
|
|
|
|
|
def default_recipe(tmp_path, output_file_name="azure_ad_mces_default_config.json"):
|
|
return {
|
|
"run_id": "test-azure-ad",
|
|
"source": {
|
|
"type": "azure-ad",
|
|
"config": {
|
|
"client_id": "00000000-0000-0000-0000-000000000000",
|
|
"tenant_id": "00000000-0000-0000-0000-000000000000",
|
|
"client_secret": "client_secret",
|
|
"redirect": "https://login.microsoftonline.com/common/oauth2/nativeclient",
|
|
"authority": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000",
|
|
"token_url": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token",
|
|
"graph_url": "https://graph.microsoft.com/v1.0",
|
|
"ingest_group_membership": True,
|
|
"ingest_groups": True,
|
|
"ingest_users": True,
|
|
},
|
|
},
|
|
"sink": {
|
|
"type": "file",
|
|
"config": {
|
|
"filename": f"{tmp_path}/{output_file_name}",
|
|
},
|
|
},
|
|
}
|
|
|
|
|
|
def run_ingest(
|
|
pytestconfig,
|
|
mock_datahub_graph,
|
|
mocked_functions_reference,
|
|
recipe,
|
|
):
|
|
test_resources_dir: pathlib.Path = (
|
|
pytestconfig.rootpath / "tests/integration/azure_ad"
|
|
)
|
|
|
|
with patch(
|
|
"datahub.ingestion.source.identity.azure_ad.AzureADSource.get_token"
|
|
) as mock_token, patch(
|
|
"datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_users"
|
|
) as mock_users, patch(
|
|
"datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_groups"
|
|
) as mock_groups, patch(
|
|
"datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_group_members"
|
|
) as mock_group_users, patch(
|
|
"datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
|
|
mock_datahub_graph,
|
|
) as mock_checkpoint:
|
|
mock_checkpoint.return_value = mock_datahub_graph
|
|
|
|
mocked_functions_reference(
|
|
test_resources_dir, mock_token, mock_users, mock_groups, mock_group_users
|
|
)
|
|
|
|
# Run an azure usage ingestion run.
|
|
pipeline = Pipeline.create(recipe)
|
|
pipeline.run()
|
|
pipeline.raise_from_status()
|
|
return pipeline
|
|
|
|
|
|
def load_test_resources(test_resources_dir):
|
|
azure_ad_users_json_file = test_resources_dir / "azure_ad_users.json"
|
|
azure_ad_groups_json_file = test_resources_dir / "azure_ad_groups.json"
|
|
azure_ad_nested_group_json_file = test_resources_dir / "azure_ad_nested_group.json"
|
|
azure_ad_nested_groups_members_json_file = (
|
|
test_resources_dir / "azure_ad_nested_groups_members.json"
|
|
)
|
|
|
|
with azure_ad_users_json_file.open() as azure_ad_users_json:
|
|
reference_users = json.loads(azure_ad_users_json.read())
|
|
|
|
with azure_ad_groups_json_file.open() as azure_ad_groups_json:
|
|
reference_groups = json.loads(azure_ad_groups_json.read())
|
|
|
|
with azure_ad_nested_group_json_file.open() as azure_ad_nested_group_json:
|
|
reference_nested_group = json.loads(azure_ad_nested_group_json.read())
|
|
|
|
with (
|
|
azure_ad_nested_groups_members_json_file.open()
|
|
) as azure_ad_nested_groups_users_json:
|
|
reference_nested_groups_users = json.loads(
|
|
azure_ad_nested_groups_users_json.read()
|
|
)
|
|
|
|
return (
|
|
reference_users,
|
|
reference_groups,
|
|
reference_nested_group,
|
|
reference_nested_groups_users,
|
|
)
|
|
|
|
|
|
#
|
|
# Azure offers a tool called 'graph-explorer' that you can use to generate fake data,
|
|
# or use your tenant's Azure AD to generate real data:
|
|
# https://developer.microsoft.com/graph/graph-explorer
|
|
#
|
|
def mocked_functions(
|
|
test_resources_dir,
|
|
mock_token,
|
|
mock_users,
|
|
mock_groups,
|
|
mock_groups_users,
|
|
return_nested_group=False,
|
|
):
|
|
# mock token response
|
|
mock_token.return_value = "xxxxxxxx"
|
|
|
|
# mock users and groups response
|
|
users, groups, nested_group, nested_group_members = load_test_resources(
|
|
test_resources_dir
|
|
)
|
|
mock_users.return_value = iter(list([users]))
|
|
mock_groups.return_value = (
|
|
iter(list([nested_group])) if return_nested_group else iter(list([groups]))
|
|
)
|
|
|
|
# For simplicity, each user is placed in ALL groups.
|
|
# Create a separate response mock for each group in our sample data.
|
|
# mock_groups_users.return_value = [users]
|
|
def mocked_group_members(azure_ad_group: dict) -> List:
|
|
group_id = azure_ad_group.get("id")
|
|
if group_id == "00000000-0000-0000-0000-000000000000":
|
|
return [users]
|
|
if group_id == "00000000-0000-0000-0000-0000000000001":
|
|
return [users]
|
|
if group_id == "00000000-0000-0000-0000-0000000000002":
|
|
return [users[0:1]]
|
|
if group_id == "99999999-9999-9999-9999-999999999999":
|
|
return [nested_group_members]
|
|
raise ValueError(f"Unexpected Azure AD group ID {group_id}")
|
|
|
|
mock_groups_users.side_effect = mocked_group_members
|
|
|
|
|
|
def overwrite_group_in_mocked_data(
|
|
test_resources_dir,
|
|
mock_token,
|
|
mock_users,
|
|
mock_groups,
|
|
mock_groups_users,
|
|
return_nested_group=False,
|
|
):
|
|
"""
|
|
This function will work similar to mocked_functions except it will overwrite mock_groups to test azure-ad stateful
|
|
ingestion
|
|
"""
|
|
mocked_functions(
|
|
test_resources_dir=test_resources_dir,
|
|
mock_token=mock_token,
|
|
mock_users=mock_users,
|
|
mock_groups=mock_groups,
|
|
mock_groups_users=mock_groups_users,
|
|
return_nested_group=return_nested_group,
|
|
)
|
|
# overwrite groups
|
|
azure_ad_groups_json_file = (
|
|
test_resources_dir / "azure_ad_groups_deleted_groupDisplayName3.json"
|
|
)
|
|
|
|
with azure_ad_groups_json_file.open() as azure_ad_groups_json:
|
|
reference_groups = json.loads(azure_ad_groups_json.read())
|
|
|
|
mock_groups.return_value = iter(list([reference_groups]))
|
|
|
|
|
|
def test_azure_ad_config():
|
|
config = AzureADConfig.parse_obj(
|
|
dict(
|
|
client_id="00000000-0000-0000-0000-000000000000",
|
|
tenant_id="00000000-0000-0000-0000-000000000000",
|
|
client_secret="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
|
|
redirect="https://login.microsoftonline.com/common/oauth2/nativeclient",
|
|
authority="https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000",
|
|
token_url="https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token",
|
|
graph_url="https://graph.microsoft.com/v1.0",
|
|
ingest_users=True,
|
|
ingest_groups=True,
|
|
ingest_group_membership=True,
|
|
)
|
|
)
|
|
|
|
# Sanity on required configurations
|
|
assert config.client_id == "00000000-0000-0000-0000-000000000000"
|
|
assert config.tenant_id == "00000000-0000-0000-0000-000000000000"
|
|
assert config.client_secret == "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
|
|
assert (
|
|
config.redirect
|
|
== "https://login.microsoftonline.com/common/oauth2/nativeclient"
|
|
)
|
|
assert (
|
|
config.authority
|
|
== "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000"
|
|
)
|
|
assert (
|
|
config.token_url
|
|
== "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token"
|
|
)
|
|
assert config.graph_url == "https://graph.microsoft.com/v1.0"
|
|
|
|
# assert on defaults
|
|
assert config.ingest_users
|
|
assert config.ingest_groups
|
|
assert config.ingest_group_membership
|
|
|
|
|
|
@freeze_time(FROZEN_TIME)
|
|
def test_azure_ad_source_default_configs(pytestconfig, mock_datahub_graph, tmp_path):
|
|
test_resources_dir: pathlib.Path = (
|
|
pytestconfig.rootpath / "tests/integration/azure_ad"
|
|
)
|
|
|
|
run_ingest(
|
|
pytestconfig=pytestconfig,
|
|
mock_datahub_graph=mock_datahub_graph,
|
|
recipe=default_recipe(tmp_path),
|
|
mocked_functions_reference=mocked_functions,
|
|
)
|
|
|
|
mce_helpers.check_golden_file(
|
|
pytestconfig,
|
|
output_path=tmp_path / "azure_ad_mces_default_config.json",
|
|
golden_path=test_resources_dir / "azure_ad_mces_golden_default_config.json",
|
|
)
|
|
|
|
|
|
@freeze_time(FROZEN_TIME)
|
|
def test_azure_ad_source_empty_group_membership(
|
|
pytestconfig, mock_datahub_graph, tmp_path
|
|
):
|
|
test_resources_dir: pathlib.Path = (
|
|
pytestconfig.rootpath / "tests/integration/azure_ad"
|
|
)
|
|
|
|
output_file_name = "azure_ad_mces_no_groups_mcp.json"
|
|
new_recipe = default_recipe(tmp_path, output_file_name)
|
|
|
|
new_recipe["source"]["config"]["ingest_group_membership"] = False
|
|
|
|
run_ingest(
|
|
pytestconfig=pytestconfig,
|
|
mock_datahub_graph=mock_datahub_graph,
|
|
recipe=new_recipe,
|
|
mocked_functions_reference=mocked_functions,
|
|
)
|
|
|
|
mce_helpers.check_golden_file(
|
|
pytestconfig,
|
|
output_path=f"{tmp_path}/{output_file_name}",
|
|
golden_path=test_resources_dir / "azure_ad_mces_no_groups_golden_mcp.json",
|
|
)
|
|
|
|
|
|
@freeze_time(FROZEN_TIME)
|
|
def test_azure_ad_source_nested_groups(pytestconfig, mock_datahub_graph, tmp_path):
|
|
test_resources_dir: pathlib.Path = (
|
|
pytestconfig.rootpath / "tests/integration/azure_ad"
|
|
)
|
|
|
|
output_file_name = "azure_ad_mces_nested_groups.json"
|
|
new_recipe = default_recipe(tmp_path, output_file_name)
|
|
|
|
new_recipe["source"]["config"]["ingest_users"] = False
|
|
|
|
include_nested_group_in_mock = partial(mocked_functions, return_nested_group=True)
|
|
|
|
run_ingest(
|
|
pytestconfig=pytestconfig,
|
|
mock_datahub_graph=mock_datahub_graph,
|
|
recipe=new_recipe,
|
|
mocked_functions_reference=include_nested_group_in_mock,
|
|
)
|
|
|
|
mce_helpers.check_golden_file(
|
|
pytestconfig,
|
|
output_path=f"{tmp_path}/{output_file_name}",
|
|
golden_path=test_resources_dir / "azure_ad_mces_golden_nested_groups.json",
|
|
)
|
|
|
|
|
|
@freeze_time(FROZEN_TIME)
|
|
def test_azure_source_ingestion_disabled(pytestconfig, mock_datahub_graph, tmp_path):
|
|
test_resources_dir: pathlib.Path = (
|
|
pytestconfig.rootpath / "tests/integration/azure_ad"
|
|
)
|
|
|
|
output_file_name = "azure_ad_mces_ingestion_disabled.json"
|
|
new_recipe = default_recipe(tmp_path, output_file_name)
|
|
|
|
new_recipe["source"]["config"]["ingest_group_membership"] = False
|
|
new_recipe["source"]["config"]["ingest_groups"] = False
|
|
new_recipe["source"]["config"]["ingest_users"] = False
|
|
|
|
run_ingest(
|
|
pytestconfig=pytestconfig,
|
|
mock_datahub_graph=mock_datahub_graph,
|
|
recipe=new_recipe,
|
|
mocked_functions_reference=mocked_functions,
|
|
)
|
|
|
|
mce_helpers.check_golden_file(
|
|
pytestconfig,
|
|
output_path=f"{tmp_path}/{output_file_name}",
|
|
golden_path=test_resources_dir / "azure_ad_mces_golden_ingestion_disabled.json",
|
|
)
|
|
|
|
|
|
@freeze_time(FROZEN_TIME)
|
|
def test_azure_ad_stateful_ingestion(
|
|
pytestconfig, tmp_path, mock_time, mock_datahub_graph
|
|
):
|
|
new_recipe = default_recipe(tmp_path)
|
|
|
|
new_recipe["pipeline_name"] = "azure_ad_execution"
|
|
new_recipe["source"]["config"]["stateful_ingestion"] = {
|
|
"enabled": True,
|
|
"state_provider": {
|
|
"type": "datahub",
|
|
"config": {"datahub_api": {"server": GMS_SERVER}},
|
|
},
|
|
}
|
|
|
|
pipeline1 = run_ingest(
|
|
pytestconfig=pytestconfig,
|
|
mock_datahub_graph=mock_datahub_graph,
|
|
recipe=new_recipe,
|
|
mocked_functions_reference=mocked_functions,
|
|
)
|
|
|
|
checkpoint1 = get_current_checkpoint_from_pipeline(pipeline1)
|
|
assert checkpoint1
|
|
assert checkpoint1.state
|
|
|
|
pipeline2 = run_ingest(
|
|
pytestconfig=pytestconfig,
|
|
mock_datahub_graph=mock_datahub_graph,
|
|
recipe=new_recipe,
|
|
mocked_functions_reference=overwrite_group_in_mocked_data,
|
|
)
|
|
checkpoint2 = get_current_checkpoint_from_pipeline(pipeline2)
|
|
assert checkpoint2
|
|
assert checkpoint2.state
|
|
|
|
# Validate that all providers have committed successfully.
|
|
validate_all_providers_have_committed_successfully(
|
|
pipeline=pipeline1, expected_providers=1
|
|
)
|
|
validate_all_providers_have_committed_successfully(
|
|
pipeline=pipeline2, expected_providers=1
|
|
)
|
|
|
|
# Perform all assertions on the states. The deleted Dashboard should not be
|
|
# part of the second state
|
|
state1 = checkpoint1.state
|
|
state2 = checkpoint2.state
|
|
|
|
difference_dashboard_urns = list(
|
|
state1.get_urns_not_in(type="corpGroup", other_checkpoint_state=state2)
|
|
)
|
|
|
|
assert len(difference_dashboard_urns) == 1
|
|
assert difference_dashboard_urns == ["urn:li:corpGroup:groupDisplayName3"]
|