feat(ingestion): azure-ad stateful ingestion (#7701)

This commit is contained in:
mohdsiddique 2023-03-29 21:50:31 +05:30 committed by GitHub
parent f348113b38
commit c0f7ba2f85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 695 additions and 253 deletions

View File

@ -4,14 +4,14 @@ import re
import urllib
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Dict, Generator, Iterable, List
from typing import Any, Dict, Generator, Iterable, List, Optional
import click
import requests
from pydantic.fields import Field
from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.emitter.mce_builder import make_group_urn, make_user_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
@ -21,8 +21,20 @@ from datahub.ingestion.api.decorators import ( # SourceCapability,; capability,
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.sql_common_state import (
BaseSQLAlchemyCheckpointState,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
CorpGroupSnapshot,
CorpUserSnapshot,
@ -36,11 +48,16 @@ from datahub.metadata.schema_classes import (
OriginTypeClass,
StatusClass,
)
from datahub.utilities.source_helpers import (
auto_stale_entity_removal,
auto_status_aspect,
auto_workunit_reporter,
)
logger = logging.getLogger(__name__)
class AzureADConfig(ConfigModel):
class AzureADConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
"""Config to create a token and connect to Azure AD instance"""
# Required
@ -133,9 +150,14 @@ class AzureADConfig(ConfigModel):
description="Whether workunit ID's for users should be masked to avoid leaking sensitive information.",
)
# Configuration for stateful ingestion
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description="PowerBI Stateful Ingestion Config."
)
@dataclass
class AzureADSourceReport(SourceReport):
class AzureADSourceReport(StaleEntityRemovalSourceReport):
filtered: List[str] = field(default_factory=list)
filtered_tracking: bool = field(default=True, repr=False)
filtered_count: int = field(default=0)
@ -152,7 +174,7 @@ class AzureADSourceReport(SourceReport):
@platform_name("Azure AD")
@config_class(AzureADConfig)
@support_status(SupportStatus.CERTIFIED)
class AzureADSource(Source):
class AzureADSource(StatefulIngestionSourceBase):
"""
This plugin extracts the following:
@ -227,13 +249,21 @@ class AzureADSource(Source):
"""
config: AzureADConfig
report: AzureADSourceReport
token_data: dict
token: str
selected_azure_ad_groups: list
azure_ad_groups_users: list
stale_entity_removal_handler: StaleEntityRemovalHandler
@classmethod
def create(cls, config_dict, ctx):
config = AzureADConfig.parse_obj(config_dict)
return cls(config, ctx)
def __init__(self, config: AzureADConfig, ctx: PipelineContext):
super().__init__(ctx)
super(AzureADSource, self).__init__(config, ctx)
self.config = config
self.report = AzureADSourceReport(
filtered_tracking=self.config.filtered_tracking
@ -249,6 +279,14 @@ class AzureADSource(Source):
self.token = self.get_token()
self.selected_azure_ad_groups: list = []
self.azure_ad_groups_users: list = []
# Create and register the stateful ingestion use-case handler.
self.stale_entity_removal_handler = StaleEntityRemovalHandler(
source=self,
config=self.config,
state_type_class=BaseSQLAlchemyCheckpointState,
pipeline_name=ctx.pipeline_name,
run_id=ctx.run_id,
)
def get_token(self):
token_response = requests.post(self.config.token_url, data=self.token_data)
@ -265,7 +303,7 @@ class AzureADSource(Source):
click.echo("Error: Token response invalid")
exit()
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
# for future developers: The actual logic of this ingestion wants to be executed, in order:
# 1) the groups
# 2) the groups' memberships
@ -363,6 +401,14 @@ class AzureADSource(Source):
datahub_corp_user_urn_to_group_membership,
)
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_stale_entity_removal(
self.stale_entity_removal_handler,
auto_workunit_reporter(
self.report, auto_status_aspect(self.get_workunits_internal())
),
)
def _add_group_members_to_group_membership(
self,
parent_corp_group_urn: str,
@ -486,6 +532,7 @@ class AzureADSource(Source):
f"Response status code: {str(response.status_code)}. "
f"Response content: {str(response.content)}"
)
logger.debug(f"URL = {url}")
logger.error(error_str)
self.report.report_failure("_get_azure_ad_data_", error_str)
continue

View File

@ -0,0 +1,70 @@
[
{
"id": "00000000-0000-0000-0000-000000000000",
"deletedDateTime": null,
"classification": null,
"createdDateTime": "2021-08-20 11: 00: 00",
"creationOptions": [],
"description": null,
"displayName": "groupDisplayName1",
"expirationDateTime": null,
"groupTypes": [],
"isAssignableToRole": null,
"mail": null,
"mailEnabled": false,
"mailNickname": "groupDisplayName1",
"membershipRule": null,
"membershipRuleProcessingState": null,
"onPremisesDomainName": null,
"onPremisesLastSyncDateTime": null,
"onPremisesNetBiosName": null,
"onPremisesSamAccountName": null,
"onPremisesSecurityIdentifier": null,
"onPremisesSyncEnabled": null,
"preferredDataLocation": null,
"preferredLanguage": null,
"proxyAddresses": [],
"renewedDateTime": "2021-08-20 11:00:00",
"resourceBehaviorOptions": [],
"resourceProvisioningOptions": [],
"securityEnabled": true,
"securityIdentifier": "xxxxx",
"theme": null,
"visibility": null,
"onPremisesProvisioningErrors": []
},
{
"id": "00000000-0000-0000-0000-0000000000001",
"deletedDateTime": null,
"classification": null,
"createdDateTime": "2021-08-20 11: 00: 00",
"creationOptions": [],
"description": "This is an interesting description",
"displayName": "groupDisplayName2",
"expirationDateTime": null,
"groupTypes": [],
"isAssignableToRole": null,
"mail": "groupDisplayName2@onmicrosoft.com",
"mailEnabled": false,
"mailNickname": "groupDisplayName2",
"membershipRule": null,
"membershipRuleProcessingState": null,
"onPremisesDomainName": null,
"onPremisesLastSyncDateTime": null,
"onPremisesNetBiosName": null,
"onPremisesSamAccountName": null,
"onPremisesSecurityIdentifier": null,
"onPremisesSyncEnabled": null,
"preferredDataLocation": null,
"preferredLanguage": null,
"proxyAddresses": [],
"renewedDateTime": "2021-08-20 11:00:00",
"resourceBehaviorOptions": [],
"resourceProvisioningOptions": [],
"securityEnabled": true,
"securityIdentifier": "xxxxx",
"theme": null,
"visibility": null,
"onPremisesProvisioningErrors": []
}
]

View File

@ -0,0 +1,282 @@
[
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpGroupSnapshot": {
"urn": "urn:li:corpGroup:groupDisplayName1",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpGroupInfo": {
"displayName": "groupDisplayName1",
"admins": [],
"members": [],
"groups": []
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-azure-ad"
}
},
{
"entityType": "corpGroup",
"entityUrn": "urn:li:corpGroup:groupDisplayName1",
"changeType": "UPSERT",
"aspectName": "origin",
"aspect": {
"json": {
"type": "EXTERNAL",
"externalType": "AZURE_AD"
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-azure-ad"
}
},
{
"entityType": "corpGroup",
"entityUrn": "urn:li:corpGroup:groupDisplayName1",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-azure-ad"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpGroupSnapshot": {
"urn": "urn:li:corpGroup:groupDisplayName2",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpGroupInfo": {
"displayName": "groupDisplayName2",
"email": "groupDisplayName2@onmicrosoft.com",
"admins": [],
"members": [],
"groups": [],
"description": "This is an interesting description"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-azure-ad"
}
},
{
"entityType": "corpGroup",
"entityUrn": "urn:li:corpGroup:groupDisplayName2",
"changeType": "UPSERT",
"aspectName": "origin",
"aspect": {
"json": {
"type": "EXTERNAL",
"externalType": "AZURE_AD"
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-azure-ad"
}
},
{
"entityType": "corpGroup",
"entityUrn": "urn:li:corpGroup:groupDisplayName2",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-azure-ad"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpGroupSnapshot": {
"urn": "urn:li:corpGroup:groupDisplayName3",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpGroupInfo": {
"displayName": "groupDisplayName3",
"email": "groupDisplayName3@onmicrosoft.com",
"admins": [],
"members": [],
"groups": [],
"description": "This is an interesting description"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-azure-ad"
}
},
{
"entityType": "corpGroup",
"entityUrn": "urn:li:corpGroup:groupDisplayName3",
"changeType": "UPSERT",
"aspectName": "origin",
"aspect": {
"json": {
"type": "EXTERNAL",
"externalType": "AZURE_AD"
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-azure-ad"
}
},
{
"entityType": "corpGroup",
"entityUrn": "urn:li:corpGroup:groupDisplayName3",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-azure-ad"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:johngreen@acryl.io",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"customProperties": {},
"active": true,
"displayName": "John Green",
"email": "johngreen@acryl.io",
"firstName": "John",
"lastName": "Green",
"fullName": "John Green"
}
},
{
"com.linkedin.pegasus2avro.identity.GroupMembership": {
"groups": []
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-azure-ad"
}
},
{
"entityType": "corpuser",
"entityUrn": "urn:li:corpuser:johngreen@acryl.io",
"changeType": "UPSERT",
"aspectName": "origin",
"aspect": {
"json": {
"type": "EXTERNAL",
"externalType": "AZURE_AD"
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-azure-ad"
}
},
{
"entityType": "corpuser",
"entityUrn": "urn:li:corpuser:johngreen@acryl.io",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-azure-ad"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:adamhall@acryl.io",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"customProperties": {},
"active": true,
"displayName": "Adam Hall",
"email": "adamhall@acryl.io",
"firstName": "Adam",
"lastName": "Hall",
"fullName": "Adam Hall"
}
},
{
"com.linkedin.pegasus2avro.identity.GroupMembership": {
"groups": []
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-azure-ad"
}
},
{
"entityType": "corpuser",
"entityUrn": "urn:li:corpuser:adamhall@acryl.io",
"changeType": "UPSERT",
"aspectName": "origin",
"aspect": {
"json": {
"type": "EXTERNAL",
"externalType": "AZURE_AD"
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-azure-ad"
}
},
{
"entityType": "corpuser",
"entityUrn": "urn:li:corpuser:adamhall@acryl.io",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-azure-ad"
}
}
]

View File

@ -1,59 +1,58 @@
import json
import pathlib
from typing import List
from functools import partial
from typing import List, Optional, cast
from unittest.mock import patch
from freezegun import freeze_time
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.identity.azure_ad import AzureADConfig
from datahub.ingestion.source.identity.azure_ad import AzureADConfig, AzureADSource
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from tests.test_helpers import mce_helpers
from tests.test_helpers.state_helpers import (
validate_all_providers_have_committed_successfully,
)
FROZEN_TIME = "2021-08-24 09:00:00"
GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"
def test_azure_ad_config():
config = AzureADConfig.parse_obj(
dict(
client_id="00000000-0000-0000-0000-000000000000",
tenant_id="00000000-0000-0000-0000-000000000000",
client_secret="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
redirect="https://login.microsoftonline.com/common/oauth2/nativeclient",
authority="https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000",
token_url="https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token",
graph_url="https://graph.microsoft.com/v1.0",
ingest_users=True,
ingest_groups=True,
ingest_group_membership=True,
)
)
# Sanity on required configurations
assert config.client_id == "00000000-0000-0000-0000-000000000000"
assert config.tenant_id == "00000000-0000-0000-0000-000000000000"
assert config.client_secret == "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
assert (
config.redirect
== "https://login.microsoftonline.com/common/oauth2/nativeclient"
)
assert (
config.authority
== "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000"
)
assert (
config.token_url
== "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token"
)
assert config.graph_url == "https://graph.microsoft.com/v1.0"
# assert on defaults
assert config.ingest_users
assert config.ingest_groups
assert config.ingest_group_membership
def default_recipe(tmp_path, output_file_name="azure_ad_mces_default_config.json"):
return {
"run_id": "test-azure-ad",
"source": {
"type": "azure-ad",
"config": {
"client_id": "00000000-0000-0000-0000-000000000000",
"tenant_id": "00000000-0000-0000-0000-000000000000",
"client_secret": "client_secret",
"redirect": "https://login.microsoftonline.com/common/oauth2/nativeclient",
"authority": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000",
"token_url": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token",
"graph_url": "https://graph.microsoft.com/v1.0",
"ingest_group_membership": True,
"ingest_groups": True,
"ingest_users": True,
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/{output_file_name}",
},
},
}
@freeze_time(FROZEN_TIME)
def test_azure_ad_source_default_configs(pytestconfig, tmp_path):
def run_ingest(
pytestconfig,
mock_datahub_graph,
mocked_functions_reference,
recipe,
):
test_resources_dir: pathlib.Path = (
pytestconfig.rootpath / "tests/integration/azure_ad"
)
@ -66,216 +65,22 @@ def test_azure_ad_source_default_configs(pytestconfig, tmp_path):
"datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_groups"
) as mock_groups, patch(
"datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_group_members"
) as mock_group_users:
mocked_functions(
test_resources_dir, mock_token, mock_users, mock_groups, mock_group_users
)
# Run an azure usage ingestion run.
pipeline = Pipeline.create(
{
"run_id": "test-azure-ad",
"source": {
"type": "azure-ad",
"config": {
"client_id": "00000000-0000-0000-0000-000000000000",
"tenant_id": "00000000-0000-0000-0000-000000000000",
"client_secret": "client_secret",
"redirect": "https://login.microsoftonline.com/common/oauth2/nativeclient",
"authority": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000",
"token_url": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token",
"graph_url": "https://graph.microsoft.com/v1.0",
"ingest_group_membership": True,
"ingest_groups": True,
"ingest_users": True,
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/azure_ad_mces_default_config.json",
},
},
}
)
pipeline.run()
pipeline.raise_from_status()
) as mock_group_users, patch(
"datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
mock_datahub_graph,
) as mock_checkpoint:
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "azure_ad_mces_default_config.json",
golden_path=test_resources_dir / "azure_ad_mces_golden_default_config.json",
)
mock_checkpoint.return_value = mock_datahub_graph
@freeze_time(FROZEN_TIME)
def test_azure_ad_source_empty_group_membership(pytestconfig, tmp_path):
test_resources_dir: pathlib.Path = (
pytestconfig.rootpath / "tests/integration/azure_ad"
)
with patch(
"datahub.ingestion.source.identity.azure_ad.AzureADSource.get_token"
) as mock_token, patch(
"datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_users"
) as mock_users, patch(
"datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_groups"
) as mock_groups, patch(
"datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_group_members"
) as mock_group_users:
mocked_functions(
test_resources_dir, mock_token, mock_users, mock_groups, mock_group_users
)
# Run an azure usage ingestion run.
pipeline = Pipeline.create(
{
"run_id": "test-azure-ad",
"source": {
"type": "azure-ad",
"config": {
"client_id": "00000000-0000-0000-0000-000000000002",
"tenant_id": "00000000-0000-0000-0000-000000000002",
"client_secret": "client_secret",
"redirect": "https://login.microsoftonline.com/common/oauth2/nativeclient",
"authority": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000",
"token_url": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token",
"graph_url": "https://graph.microsoft.com/v1.0",
"ingest_group_membership": True,
"ingest_groups": True,
"ingest_users": True,
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/azure_ad_mces_default_config.json",
},
},
}
)
pipeline.run()
pipeline.raise_from_status()
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "azure_ad_mces_default_config.json",
golden_path=test_resources_dir / "azure_ad_mces_golden_default_config.json",
)
@freeze_time(FROZEN_TIME)
def test_azure_ad_source_nested_groups(pytestconfig, tmp_path):
test_resources_dir: pathlib.Path = (
pytestconfig.rootpath / "tests/integration/azure_ad"
)
with patch(
"datahub.ingestion.source.identity.azure_ad.AzureADSource.get_token"
) as mock_token, patch(
"datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_users"
) as mock_users, patch(
"datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_groups"
) as mock_groups, patch(
"datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_group_members"
) as mock_group_users:
mocked_functions(
test_resources_dir,
mock_token,
mock_users,
mock_groups,
mock_group_users,
True,
)
# Run an azure usage ingestion run.
pipeline = Pipeline.create(
{
"run_id": "test-azure-ad",
"source": {
"type": "azure-ad",
"config": {
"client_id": "00000000-0000-0000-0000-000000000000",
"tenant_id": "00000000-0000-0000-0000-000000000000",
"client_secret": "client_secret",
"redirect": "https://login.microsoftonline.com/common/oauth2/nativeclient",
"authority": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000",
"token_url": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token",
"graph_url": "https://graph.microsoft.com/v1.0",
"ingest_group_membership": True,
"ingest_groups": True,
"ingest_users": False,
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/azure_ad_mces_nested_groups.json",
},
},
}
)
pipeline.run()
pipeline.raise_from_status()
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "azure_ad_mces_nested_groups.json",
golden_path=test_resources_dir / "azure_ad_mces_golden_nested_groups.json",
)
@freeze_time(FROZEN_TIME)
def test_azure_source_ingestion_disabled(pytestconfig, tmp_path):
test_resources_dir: pathlib.Path = (
pytestconfig.rootpath / "tests/integration/azure_ad"
)
with patch(
"datahub.ingestion.source.identity.azure_ad.AzureADSource.get_token"
) as mock_token, patch(
"datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_users"
) as mock_users, patch(
"datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_groups"
) as mock_groups, patch(
"datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_group_members"
) as mock_group_users:
mocked_functions(
mocked_functions_reference(
test_resources_dir, mock_token, mock_users, mock_groups, mock_group_users
)
# Run an Azure usage ingestion run.
pipeline = Pipeline.create(
{
"run_id": "test-azure-ad",
"source": {
"type": "azure-ad",
"config": {
"client_id": "00000000-0000-0000-0000-000000000000",
"tenant_id": "00000000-0000-0000-0000-000000000000",
"client_secret": "client_secret",
"redirect": "https://login.microsoftonline.com/common/oauth2/nativeclient",
"authority": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000",
"token_url": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token",
"graph_url": "https://graph.microsoft.com/v1.0",
"ingest_group_membership": "False",
"ingest_groups": "False",
"ingest_users": "False",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/azure_ad_mces_ingestion_disabled.json",
},
},
}
)
# Run an azure usage ingestion run.
pipeline = Pipeline.create(recipe)
pipeline.run()
pipeline.raise_from_status()
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "azure_ad_mces_ingestion_disabled.json",
golden_path=test_resources_dir / "azure_ad_mces_golden_ingestion_disabled.json",
)
return pipeline
def load_test_resources(test_resources_dir):
@ -349,3 +154,241 @@ def mocked_functions(
raise ValueError(f"Unexpected Azure AD group ID {group_id}")
mock_groups_users.side_effect = mocked_group_members
def overwrite_group_in_mocked_data(
test_resources_dir,
mock_token,
mock_users,
mock_groups,
mock_groups_users,
return_nested_group=False,
):
"""
This function will work similar to mocked_functions except it will overwrite mock_groups to test azure-ad stateful
ingestion
"""
mocked_functions(
test_resources_dir=test_resources_dir,
mock_token=mock_token,
mock_users=mock_users,
mock_groups=mock_groups,
mock_groups_users=mock_groups_users,
return_nested_group=return_nested_group,
)
# overwrite groups
azure_ad_groups_json_file = (
test_resources_dir / "azure_ad_groups_deleted_groupDisplayName3.json"
)
with azure_ad_groups_json_file.open() as azure_ad_groups_json:
reference_groups = json.loads(azure_ad_groups_json.read())
mock_groups.return_value = iter(list([reference_groups]))
def test_azure_ad_config():
config = AzureADConfig.parse_obj(
dict(
client_id="00000000-0000-0000-0000-000000000000",
tenant_id="00000000-0000-0000-0000-000000000000",
client_secret="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
redirect="https://login.microsoftonline.com/common/oauth2/nativeclient",
authority="https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000",
token_url="https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token",
graph_url="https://graph.microsoft.com/v1.0",
ingest_users=True,
ingest_groups=True,
ingest_group_membership=True,
)
)
# Sanity on required configurations
assert config.client_id == "00000000-0000-0000-0000-000000000000"
assert config.tenant_id == "00000000-0000-0000-0000-000000000000"
assert config.client_secret == "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
assert (
config.redirect
== "https://login.microsoftonline.com/common/oauth2/nativeclient"
)
assert (
config.authority
== "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000"
)
assert (
config.token_url
== "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token"
)
assert config.graph_url == "https://graph.microsoft.com/v1.0"
# assert on defaults
assert config.ingest_users
assert config.ingest_groups
assert config.ingest_group_membership
@freeze_time(FROZEN_TIME)
def test_azure_ad_source_default_configs(pytestconfig, mock_datahub_graph, tmp_path):
test_resources_dir: pathlib.Path = (
pytestconfig.rootpath / "tests/integration/azure_ad"
)
run_ingest(
pytestconfig=pytestconfig,
mock_datahub_graph=mock_datahub_graph,
recipe=default_recipe(tmp_path),
mocked_functions_reference=mocked_functions,
)
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "azure_ad_mces_default_config.json",
golden_path=test_resources_dir / "azure_ad_mces_golden_default_config.json",
)
@freeze_time(FROZEN_TIME)
def test_azure_ad_source_empty_group_membership(
pytestconfig, mock_datahub_graph, tmp_path
):
test_resources_dir: pathlib.Path = (
pytestconfig.rootpath / "tests/integration/azure_ad"
)
output_file_name = "azure_ad_mces_no_groups_mcp.json"
new_recipe = default_recipe(tmp_path, output_file_name)
new_recipe["source"]["config"]["ingest_group_membership"] = False
run_ingest(
pytestconfig=pytestconfig,
mock_datahub_graph=mock_datahub_graph,
recipe=new_recipe,
mocked_functions_reference=mocked_functions,
)
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/{output_file_name}",
golden_path=test_resources_dir / "azure_ad_mces_no_groups_golden_mcp.json",
)
@freeze_time(FROZEN_TIME)
def test_azure_ad_source_nested_groups(pytestconfig, mock_datahub_graph, tmp_path):
test_resources_dir: pathlib.Path = (
pytestconfig.rootpath / "tests/integration/azure_ad"
)
output_file_name = "azure_ad_mces_nested_groups.json"
new_recipe = default_recipe(tmp_path, output_file_name)
new_recipe["source"]["config"]["ingest_users"] = False
include_nested_group_in_mock = partial(mocked_functions, return_nested_group=True)
run_ingest(
pytestconfig=pytestconfig,
mock_datahub_graph=mock_datahub_graph,
recipe=new_recipe,
mocked_functions_reference=include_nested_group_in_mock,
)
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/{output_file_name}",
golden_path=test_resources_dir / "azure_ad_mces_golden_nested_groups.json",
)
@freeze_time(FROZEN_TIME)
def test_azure_source_ingestion_disabled(pytestconfig, mock_datahub_graph, tmp_path):
test_resources_dir: pathlib.Path = (
pytestconfig.rootpath / "tests/integration/azure_ad"
)
output_file_name = "azure_ad_mces_ingestion_disabled.json"
new_recipe = default_recipe(tmp_path, output_file_name)
new_recipe["source"]["config"]["ingest_group_membership"] = False
new_recipe["source"]["config"]["ingest_groups"] = False
new_recipe["source"]["config"]["ingest_users"] = False
run_ingest(
pytestconfig=pytestconfig,
mock_datahub_graph=mock_datahub_graph,
recipe=new_recipe,
mocked_functions_reference=mocked_functions,
)
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/{output_file_name}",
golden_path=test_resources_dir / "azure_ad_mces_golden_ingestion_disabled.json",
)
def get_current_checkpoint_from_pipeline(
pipeline: Pipeline,
) -> Optional[Checkpoint[GenericCheckpointState]]:
azure_ad_source = cast(AzureADSource, pipeline.source)
return azure_ad_source.get_current_checkpoint(
azure_ad_source.stale_entity_removal_handler.job_id
)
@freeze_time(FROZEN_TIME)
def test_azure_ad_stateful_ingestion(
pytestconfig, tmp_path, mock_time, mock_datahub_graph
):
new_recipe = default_recipe(tmp_path)
new_recipe["pipeline_name"] = "azure_ad_execution"
new_recipe["source"]["config"]["stateful_ingestion"] = {
"enabled": True,
"state_provider": {
"type": "datahub",
"config": {"datahub_api": {"server": GMS_SERVER}},
},
}
pipeline1 = run_ingest(
pytestconfig=pytestconfig,
mock_datahub_graph=mock_datahub_graph,
recipe=new_recipe,
mocked_functions_reference=mocked_functions,
)
checkpoint1 = get_current_checkpoint_from_pipeline(pipeline1)
assert checkpoint1
assert checkpoint1.state
pipeline2 = run_ingest(
pytestconfig=pytestconfig,
mock_datahub_graph=mock_datahub_graph,
recipe=new_recipe,
mocked_functions_reference=overwrite_group_in_mocked_data,
)
checkpoint2 = get_current_checkpoint_from_pipeline(pipeline2)
assert checkpoint2
assert checkpoint2.state
# Validate that all providers have committed successfully.
validate_all_providers_have_committed_successfully(
pipeline=pipeline1, expected_providers=1
)
validate_all_providers_have_committed_successfully(
pipeline=pipeline2, expected_providers=1
)
# Perform all assertions on the states. The deleted Dashboard should not be
# part of the second state
state1 = checkpoint1.state
state2 = checkpoint2.state
difference_dashboard_urns = list(
state1.get_urns_not_in(type="corpGroup", other_checkpoint_state=state2)
)
assert len(difference_dashboard_urns) == 1
assert difference_dashboard_urns == ["urn:li:corpGroup:groupDisplayName3"]