From c0f7ba2f8551cc8f12957f3daaa47dcad2a92996 Mon Sep 17 00:00:00 2001 From: mohdsiddique Date: Wed, 29 Mar 2023 21:50:31 +0530 Subject: [PATCH] feat(ingestion): azure-ad stateful ingestion (#7701) --- .../ingestion/source/identity/azure_ad.py | 63 ++- ...e_ad_groups_deleted_groupDisplayName3.json | 70 +++ .../azure_ad_mces_no_groups_golden_mcp.json | 282 +++++++++ .../integration/azure_ad/test_azure_ad.py | 533 ++++++++++-------- 4 files changed, 695 insertions(+), 253 deletions(-) create mode 100644 metadata-ingestion/tests/integration/azure_ad/azure_ad_groups_deleted_groupDisplayName3.json create mode 100644 metadata-ingestion/tests/integration/azure_ad/azure_ad_mces_no_groups_golden_mcp.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py b/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py index 5675cd195d..652295c383 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py +++ b/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py @@ -4,14 +4,14 @@ import re import urllib from collections import defaultdict from dataclasses import dataclass, field -from typing import Any, Dict, Generator, Iterable, List +from typing import Any, Dict, Generator, Iterable, List, Optional import click import requests from pydantic.fields import Field -from datahub.configuration import ConfigModel from datahub.configuration.common import AllowDenyPattern +from datahub.configuration.source_common import DatasetSourceConfigMixin from datahub.emitter.mce_builder import make_group_urn, make_user_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext @@ -21,8 +21,20 @@ from datahub.ingestion.api.decorators import ( # SourceCapability,; capability, platform_name, support_status, ) -from datahub.ingestion.api.source import Source, SourceReport +from datahub.ingestion.api.source import SourceReport from 
datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.state.sql_common_state import ( + BaseSQLAlchemyCheckpointState, +) +from datahub.ingestion.source.state.stale_entity_removal_handler import ( + StaleEntityRemovalHandler, + StaleEntityRemovalSourceReport, + StatefulStaleMetadataRemovalConfig, +) +from datahub.ingestion.source.state.stateful_ingestion_base import ( + StatefulIngestionConfigBase, + StatefulIngestionSourceBase, +) from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import ( CorpGroupSnapshot, CorpUserSnapshot, @@ -36,11 +48,16 @@ from datahub.metadata.schema_classes import ( OriginTypeClass, StatusClass, ) +from datahub.utilities.source_helpers import ( + auto_stale_entity_removal, + auto_status_aspect, + auto_workunit_reporter, +) logger = logging.getLogger(__name__) -class AzureADConfig(ConfigModel): +class AzureADConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin): """Config to create a token and connect to Azure AD instance""" # Required @@ -133,9 +150,14 @@ class AzureADConfig(ConfigModel): description="Whether workunit ID's for users should be masked to avoid leaking sensitive information.", ) + # Configuration for stateful ingestion + stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field( + default=None, description="PowerBI Stateful Ingestion Config." 
+ ) + @dataclass -class AzureADSourceReport(SourceReport): +class AzureADSourceReport(StaleEntityRemovalSourceReport): filtered: List[str] = field(default_factory=list) filtered_tracking: bool = field(default=True, repr=False) filtered_count: int = field(default=0) @@ -152,7 +174,7 @@ class AzureADSourceReport(SourceReport): @platform_name("Azure AD") @config_class(AzureADConfig) @support_status(SupportStatus.CERTIFIED) -class AzureADSource(Source): +class AzureADSource(StatefulIngestionSourceBase): """ This plugin extracts the following: @@ -227,13 +249,21 @@ class AzureADSource(Source): """ + config: AzureADConfig + report: AzureADSourceReport + token_data: dict + token: str + selected_azure_ad_groups: list + azure_ad_groups_users: list + stale_entity_removal_handler: StaleEntityRemovalHandler + @classmethod def create(cls, config_dict, ctx): config = AzureADConfig.parse_obj(config_dict) return cls(config, ctx) def __init__(self, config: AzureADConfig, ctx: PipelineContext): - super().__init__(ctx) + super(AzureADSource, self).__init__(config, ctx) self.config = config self.report = AzureADSourceReport( filtered_tracking=self.config.filtered_tracking @@ -249,6 +279,14 @@ class AzureADSource(Source): self.token = self.get_token() self.selected_azure_ad_groups: list = [] self.azure_ad_groups_users: list = [] + # Create and register the stateful ingestion use-case handler. 
+ self.stale_entity_removal_handler = StaleEntityRemovalHandler( + source=self, + config=self.config, + state_type_class=BaseSQLAlchemyCheckpointState, + pipeline_name=ctx.pipeline_name, + run_id=ctx.run_id, + ) def get_token(self): token_response = requests.post(self.config.token_url, data=self.token_data) @@ -265,7 +303,7 @@ class AzureADSource(Source): click.echo("Error: Token response invalid") exit() - def get_workunits(self) -> Iterable[MetadataWorkUnit]: + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: # for future developers: The actual logic of this ingestion wants to be executed, in order: # 1) the groups # 2) the groups' memberships @@ -363,6 +401,14 @@ class AzureADSource(Source): datahub_corp_user_urn_to_group_membership, ) + def get_workunits(self) -> Iterable[MetadataWorkUnit]: + return auto_stale_entity_removal( + self.stale_entity_removal_handler, + auto_workunit_reporter( + self.report, auto_status_aspect(self.get_workunits_internal()) + ), + ) + def _add_group_members_to_group_membership( self, parent_corp_group_urn: str, @@ -486,6 +532,7 @@ class AzureADSource(Source): f"Response status code: {str(response.status_code)}. 
" f"Response content: {str(response.content)}" ) + logger.debug(f"URL = {url}") logger.error(error_str) self.report.report_failure("_get_azure_ad_data_", error_str) continue diff --git a/metadata-ingestion/tests/integration/azure_ad/azure_ad_groups_deleted_groupDisplayName3.json b/metadata-ingestion/tests/integration/azure_ad/azure_ad_groups_deleted_groupDisplayName3.json new file mode 100644 index 0000000000..e4b9ecfa75 --- /dev/null +++ b/metadata-ingestion/tests/integration/azure_ad/azure_ad_groups_deleted_groupDisplayName3.json @@ -0,0 +1,70 @@ +[ + { + "id": "00000000-0000-0000-0000-000000000000", + "deletedDateTime": null, + "classification": null, + "createdDateTime": "2021-08-20 11: 00: 00", + "creationOptions": [], + "description": null, + "displayName": "groupDisplayName1", + "expirationDateTime": null, + "groupTypes": [], + "isAssignableToRole": null, + "mail": null, + "mailEnabled": false, + "mailNickname": "groupDisplayName1", + "membershipRule": null, + "membershipRuleProcessingState": null, + "onPremisesDomainName": null, + "onPremisesLastSyncDateTime": null, + "onPremisesNetBiosName": null, + "onPremisesSamAccountName": null, + "onPremisesSecurityIdentifier": null, + "onPremisesSyncEnabled": null, + "preferredDataLocation": null, + "preferredLanguage": null, + "proxyAddresses": [], + "renewedDateTime": "2021-08-20 11:00:00", + "resourceBehaviorOptions": [], + "resourceProvisioningOptions": [], + "securityEnabled": true, + "securityIdentifier": "xxxxx", + "theme": null, + "visibility": null, + "onPremisesProvisioningErrors": [] + }, + { + "id": "00000000-0000-0000-0000-0000000000001", + "deletedDateTime": null, + "classification": null, + "createdDateTime": "2021-08-20 11: 00: 00", + "creationOptions": [], + "description": "This is an interesting description", + "displayName": "groupDisplayName2", + "expirationDateTime": null, + "groupTypes": [], + "isAssignableToRole": null, + "mail": "groupDisplayName2@onmicrosoft.com", + "mailEnabled": false, + 
"mailNickname": "groupDisplayName2", + "membershipRule": null, + "membershipRuleProcessingState": null, + "onPremisesDomainName": null, + "onPremisesLastSyncDateTime": null, + "onPremisesNetBiosName": null, + "onPremisesSamAccountName": null, + "onPremisesSecurityIdentifier": null, + "onPremisesSyncEnabled": null, + "preferredDataLocation": null, + "preferredLanguage": null, + "proxyAddresses": [], + "renewedDateTime": "2021-08-20 11:00:00", + "resourceBehaviorOptions": [], + "resourceProvisioningOptions": [], + "securityEnabled": true, + "securityIdentifier": "xxxxx", + "theme": null, + "visibility": null, + "onPremisesProvisioningErrors": [] + } +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/azure_ad/azure_ad_mces_no_groups_golden_mcp.json b/metadata-ingestion/tests/integration/azure_ad/azure_ad_mces_no_groups_golden_mcp.json new file mode 100644 index 0000000000..b3463e2587 --- /dev/null +++ b/metadata-ingestion/tests/integration/azure_ad/azure_ad_mces_no_groups_golden_mcp.json @@ -0,0 +1,282 @@ +[ +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.CorpGroupSnapshot": { + "urn": "urn:li:corpGroup:groupDisplayName1", + "aspects": [ + { + "com.linkedin.pegasus2avro.identity.CorpGroupInfo": { + "displayName": "groupDisplayName1", + "admins": [], + "members": [], + "groups": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-azure-ad" + } +}, +{ + "entityType": "corpGroup", + "entityUrn": "urn:li:corpGroup:groupDisplayName1", + "changeType": "UPSERT", + "aspectName": "origin", + "aspect": { + "json": { + "type": "EXTERNAL", + "externalType": "AZURE_AD" + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-azure-ad" + } +}, +{ + "entityType": "corpGroup", + "entityUrn": "urn:li:corpGroup:groupDisplayName1", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + 
"lastObserved": 1629795600000, + "runId": "test-azure-ad" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.CorpGroupSnapshot": { + "urn": "urn:li:corpGroup:groupDisplayName2", + "aspects": [ + { + "com.linkedin.pegasus2avro.identity.CorpGroupInfo": { + "displayName": "groupDisplayName2", + "email": "groupDisplayName2@onmicrosoft.com", + "admins": [], + "members": [], + "groups": [], + "description": "This is an interesting description" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-azure-ad" + } +}, +{ + "entityType": "corpGroup", + "entityUrn": "urn:li:corpGroup:groupDisplayName2", + "changeType": "UPSERT", + "aspectName": "origin", + "aspect": { + "json": { + "type": "EXTERNAL", + "externalType": "AZURE_AD" + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-azure-ad" + } +}, +{ + "entityType": "corpGroup", + "entityUrn": "urn:li:corpGroup:groupDisplayName2", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-azure-ad" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.CorpGroupSnapshot": { + "urn": "urn:li:corpGroup:groupDisplayName3", + "aspects": [ + { + "com.linkedin.pegasus2avro.identity.CorpGroupInfo": { + "displayName": "groupDisplayName3", + "email": "groupDisplayName3@onmicrosoft.com", + "admins": [], + "members": [], + "groups": [], + "description": "This is an interesting description" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-azure-ad" + } +}, +{ + "entityType": "corpGroup", + "entityUrn": "urn:li:corpGroup:groupDisplayName3", + "changeType": "UPSERT", + "aspectName": "origin", + "aspect": { + "json": { + "type": "EXTERNAL", + "externalType": "AZURE_AD" + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": 
"test-azure-ad" + } +}, +{ + "entityType": "corpGroup", + "entityUrn": "urn:li:corpGroup:groupDisplayName3", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-azure-ad" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": { + "urn": "urn:li:corpuser:johngreen@acryl.io", + "aspects": [ + { + "com.linkedin.pegasus2avro.identity.CorpUserInfo": { + "customProperties": {}, + "active": true, + "displayName": "John Green", + "email": "johngreen@acryl.io", + "firstName": "John", + "lastName": "Green", + "fullName": "John Green" + } + }, + { + "com.linkedin.pegasus2avro.identity.GroupMembership": { + "groups": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-azure-ad" + } +}, +{ + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:johngreen@acryl.io", + "changeType": "UPSERT", + "aspectName": "origin", + "aspect": { + "json": { + "type": "EXTERNAL", + "externalType": "AZURE_AD" + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-azure-ad" + } +}, +{ + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:johngreen@acryl.io", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-azure-ad" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": { + "urn": "urn:li:corpuser:adamhall@acryl.io", + "aspects": [ + { + "com.linkedin.pegasus2avro.identity.CorpUserInfo": { + "customProperties": {}, + "active": true, + "displayName": "Adam Hall", + "email": "adamhall@acryl.io", + "firstName": "Adam", + "lastName": "Hall", + "fullName": "Adam Hall" + } + }, + { + "com.linkedin.pegasus2avro.identity.GroupMembership": { + "groups": [] + } + } + ] + } + }, + 
"systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-azure-ad" + } +}, +{ + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:adamhall@acryl.io", + "changeType": "UPSERT", + "aspectName": "origin", + "aspect": { + "json": { + "type": "EXTERNAL", + "externalType": "AZURE_AD" + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-azure-ad" + } +}, +{ + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:adamhall@acryl.io", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1629795600000, + "runId": "test-azure-ad" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/azure_ad/test_azure_ad.py b/metadata-ingestion/tests/integration/azure_ad/test_azure_ad.py index 43133f6bd3..42b9da3b98 100644 --- a/metadata-ingestion/tests/integration/azure_ad/test_azure_ad.py +++ b/metadata-ingestion/tests/integration/azure_ad/test_azure_ad.py @@ -1,59 +1,58 @@ import json import pathlib -from typing import List +from functools import partial +from typing import List, Optional, cast from unittest.mock import patch from freezegun import freeze_time from datahub.ingestion.run.pipeline import Pipeline -from datahub.ingestion.source.identity.azure_ad import AzureADConfig +from datahub.ingestion.source.identity.azure_ad import AzureADConfig, AzureADSource +from datahub.ingestion.source.state.checkpoint import Checkpoint +from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState from tests.test_helpers import mce_helpers +from tests.test_helpers.state_helpers import ( + validate_all_providers_have_committed_successfully, +) FROZEN_TIME = "2021-08-24 09:00:00" +GMS_PORT = 8080 +GMS_SERVER = f"http://localhost:{GMS_PORT}" -def test_azure_ad_config(): - config = AzureADConfig.parse_obj( - dict( - client_id="00000000-0000-0000-0000-000000000000", - 
tenant_id="00000000-0000-0000-0000-000000000000", - client_secret="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", - redirect="https://login.microsoftonline.com/common/oauth2/nativeclient", - authority="https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000", - token_url="https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token", - graph_url="https://graph.microsoft.com/v1.0", - ingest_users=True, - ingest_groups=True, - ingest_group_membership=True, - ) - ) - - # Sanity on required configurations - assert config.client_id == "00000000-0000-0000-0000-000000000000" - assert config.tenant_id == "00000000-0000-0000-0000-000000000000" - assert config.client_secret == "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" - assert ( - config.redirect - == "https://login.microsoftonline.com/common/oauth2/nativeclient" - ) - assert ( - config.authority - == "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000" - ) - assert ( - config.token_url - == "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token" - ) - assert config.graph_url == "https://graph.microsoft.com/v1.0" - - # assert on defaults - assert config.ingest_users - assert config.ingest_groups - assert config.ingest_group_membership +def default_recipe(tmp_path, output_file_name="azure_ad_mces_default_config.json"): + return { + "run_id": "test-azure-ad", + "source": { + "type": "azure-ad", + "config": { + "client_id": "00000000-0000-0000-0000-000000000000", + "tenant_id": "00000000-0000-0000-0000-000000000000", + "client_secret": "client_secret", + "redirect": "https://login.microsoftonline.com/common/oauth2/nativeclient", + "authority": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000", + "token_url": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token", + "graph_url": "https://graph.microsoft.com/v1.0", + "ingest_group_membership": True, + "ingest_groups": True, + "ingest_users": True, + }, + }, 
+ "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/{output_file_name}", + }, + }, + } -@freeze_time(FROZEN_TIME) -def test_azure_ad_source_default_configs(pytestconfig, tmp_path): +def run_ingest( + pytestconfig, + mock_datahub_graph, + mocked_functions_reference, + recipe, +): test_resources_dir: pathlib.Path = ( pytestconfig.rootpath / "tests/integration/azure_ad" ) @@ -66,216 +65,22 @@ def test_azure_ad_source_default_configs(pytestconfig, tmp_path): "datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_groups" ) as mock_groups, patch( "datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_group_members" - ) as mock_group_users: - mocked_functions( - test_resources_dir, mock_token, mock_users, mock_groups, mock_group_users - ) - # Run an azure usage ingestion run. - pipeline = Pipeline.create( - { - "run_id": "test-azure-ad", - "source": { - "type": "azure-ad", - "config": { - "client_id": "00000000-0000-0000-0000-000000000000", - "tenant_id": "00000000-0000-0000-0000-000000000000", - "client_secret": "client_secret", - "redirect": "https://login.microsoftonline.com/common/oauth2/nativeclient", - "authority": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000", - "token_url": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token", - "graph_url": "https://graph.microsoft.com/v1.0", - "ingest_group_membership": True, - "ingest_groups": True, - "ingest_users": True, - }, - }, - "sink": { - "type": "file", - "config": { - "filename": f"{tmp_path}/azure_ad_mces_default_config.json", - }, - }, - } - ) - pipeline.run() - pipeline.raise_from_status() + ) as mock_group_users, patch( + "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph", + mock_datahub_graph, + ) as mock_checkpoint: - mce_helpers.check_golden_file( - pytestconfig, - output_path=tmp_path / "azure_ad_mces_default_config.json", - golden_path=test_resources_dir 
/ "azure_ad_mces_golden_default_config.json", - ) + mock_checkpoint.return_value = mock_datahub_graph - -@freeze_time(FROZEN_TIME) -def test_azure_ad_source_empty_group_membership(pytestconfig, tmp_path): - test_resources_dir: pathlib.Path = ( - pytestconfig.rootpath / "tests/integration/azure_ad" - ) - - with patch( - "datahub.ingestion.source.identity.azure_ad.AzureADSource.get_token" - ) as mock_token, patch( - "datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_users" - ) as mock_users, patch( - "datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_groups" - ) as mock_groups, patch( - "datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_group_members" - ) as mock_group_users: - mocked_functions( - test_resources_dir, mock_token, mock_users, mock_groups, mock_group_users - ) - # Run an azure usage ingestion run. - pipeline = Pipeline.create( - { - "run_id": "test-azure-ad", - "source": { - "type": "azure-ad", - "config": { - "client_id": "00000000-0000-0000-0000-000000000002", - "tenant_id": "00000000-0000-0000-0000-000000000002", - "client_secret": "client_secret", - "redirect": "https://login.microsoftonline.com/common/oauth2/nativeclient", - "authority": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000", - "token_url": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token", - "graph_url": "https://graph.microsoft.com/v1.0", - "ingest_group_membership": True, - "ingest_groups": True, - "ingest_users": True, - }, - }, - "sink": { - "type": "file", - "config": { - "filename": f"{tmp_path}/azure_ad_mces_default_config.json", - }, - }, - } - ) - pipeline.run() - pipeline.raise_from_status() - - mce_helpers.check_golden_file( - pytestconfig, - output_path=tmp_path / "azure_ad_mces_default_config.json", - golden_path=test_resources_dir / "azure_ad_mces_golden_default_config.json", - ) - - -@freeze_time(FROZEN_TIME) -def 
test_azure_ad_source_nested_groups(pytestconfig, tmp_path): - test_resources_dir: pathlib.Path = ( - pytestconfig.rootpath / "tests/integration/azure_ad" - ) - - with patch( - "datahub.ingestion.source.identity.azure_ad.AzureADSource.get_token" - ) as mock_token, patch( - "datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_users" - ) as mock_users, patch( - "datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_groups" - ) as mock_groups, patch( - "datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_group_members" - ) as mock_group_users: - mocked_functions( - test_resources_dir, - mock_token, - mock_users, - mock_groups, - mock_group_users, - True, - ) - # Run an azure usage ingestion run. - pipeline = Pipeline.create( - { - "run_id": "test-azure-ad", - "source": { - "type": "azure-ad", - "config": { - "client_id": "00000000-0000-0000-0000-000000000000", - "tenant_id": "00000000-0000-0000-0000-000000000000", - "client_secret": "client_secret", - "redirect": "https://login.microsoftonline.com/common/oauth2/nativeclient", - "authority": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000", - "token_url": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token", - "graph_url": "https://graph.microsoft.com/v1.0", - "ingest_group_membership": True, - "ingest_groups": True, - "ingest_users": False, - }, - }, - "sink": { - "type": "file", - "config": { - "filename": f"{tmp_path}/azure_ad_mces_nested_groups.json", - }, - }, - } - ) - pipeline.run() - pipeline.raise_from_status() - - mce_helpers.check_golden_file( - pytestconfig, - output_path=tmp_path / "azure_ad_mces_nested_groups.json", - golden_path=test_resources_dir / "azure_ad_mces_golden_nested_groups.json", - ) - - -@freeze_time(FROZEN_TIME) -def test_azure_source_ingestion_disabled(pytestconfig, tmp_path): - test_resources_dir: pathlib.Path = ( - pytestconfig.rootpath / "tests/integration/azure_ad" - ) - - 
with patch( - "datahub.ingestion.source.identity.azure_ad.AzureADSource.get_token" - ) as mock_token, patch( - "datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_users" - ) as mock_users, patch( - "datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_groups" - ) as mock_groups, patch( - "datahub.ingestion.source.identity.azure_ad.AzureADSource._get_azure_ad_group_members" - ) as mock_group_users: - mocked_functions( + mocked_functions_reference( test_resources_dir, mock_token, mock_users, mock_groups, mock_group_users ) - # Run an Azure usage ingestion run. - pipeline = Pipeline.create( - { - "run_id": "test-azure-ad", - "source": { - "type": "azure-ad", - "config": { - "client_id": "00000000-0000-0000-0000-000000000000", - "tenant_id": "00000000-0000-0000-0000-000000000000", - "client_secret": "client_secret", - "redirect": "https://login.microsoftonline.com/common/oauth2/nativeclient", - "authority": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000", - "token_url": "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token", - "graph_url": "https://graph.microsoft.com/v1.0", - "ingest_group_membership": "False", - "ingest_groups": "False", - "ingest_users": "False", - }, - }, - "sink": { - "type": "file", - "config": { - "filename": f"{tmp_path}/azure_ad_mces_ingestion_disabled.json", - }, - }, - } - ) + # Run an azure usage ingestion run. 
+ pipeline = Pipeline.create(recipe) pipeline.run() pipeline.raise_from_status() - - mce_helpers.check_golden_file( - pytestconfig, - output_path=tmp_path / "azure_ad_mces_ingestion_disabled.json", - golden_path=test_resources_dir / "azure_ad_mces_golden_ingestion_disabled.json", - ) + return pipeline def load_test_resources(test_resources_dir): @@ -349,3 +154,241 @@ def mocked_functions( raise ValueError(f"Unexpected Azure AD group ID {group_id}") mock_groups_users.side_effect = mocked_group_members + + +def overwrite_group_in_mocked_data( + test_resources_dir, + mock_token, + mock_users, + mock_groups, + mock_groups_users, + return_nested_group=False, +): + """ + This function will work similar to mocked_functions except it will overwrite mock_groups to test azure-ad stateful + ingestion + """ + mocked_functions( + test_resources_dir=test_resources_dir, + mock_token=mock_token, + mock_users=mock_users, + mock_groups=mock_groups, + mock_groups_users=mock_groups_users, + return_nested_group=return_nested_group, + ) + # overwrite groups + azure_ad_groups_json_file = ( + test_resources_dir / "azure_ad_groups_deleted_groupDisplayName3.json" + ) + + with azure_ad_groups_json_file.open() as azure_ad_groups_json: + reference_groups = json.loads(azure_ad_groups_json.read()) + + mock_groups.return_value = iter(list([reference_groups])) + + +def test_azure_ad_config(): + config = AzureADConfig.parse_obj( + dict( + client_id="00000000-0000-0000-0000-000000000000", + tenant_id="00000000-0000-0000-0000-000000000000", + client_secret="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", + redirect="https://login.microsoftonline.com/common/oauth2/nativeclient", + authority="https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000", + token_url="https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token", + graph_url="https://graph.microsoft.com/v1.0", + ingest_users=True, + ingest_groups=True, + ingest_group_membership=True, + ) + ) + + # Sanity on required 
configurations + assert config.client_id == "00000000-0000-0000-0000-000000000000" + assert config.tenant_id == "00000000-0000-0000-0000-000000000000" + assert config.client_secret == "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + assert ( + config.redirect + == "https://login.microsoftonline.com/common/oauth2/nativeclient" + ) + assert ( + config.authority + == "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000" + ) + assert ( + config.token_url + == "https://login.microsoftonline.com/00000000-0000-0000-0000-000000000000/oauth2/token" + ) + assert config.graph_url == "https://graph.microsoft.com/v1.0" + + # assert on defaults + assert config.ingest_users + assert config.ingest_groups + assert config.ingest_group_membership + + +@freeze_time(FROZEN_TIME) +def test_azure_ad_source_default_configs(pytestconfig, mock_datahub_graph, tmp_path): + test_resources_dir: pathlib.Path = ( + pytestconfig.rootpath / "tests/integration/azure_ad" + ) + + run_ingest( + pytestconfig=pytestconfig, + mock_datahub_graph=mock_datahub_graph, + recipe=default_recipe(tmp_path), + mocked_functions_reference=mocked_functions, + ) + + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / "azure_ad_mces_default_config.json", + golden_path=test_resources_dir / "azure_ad_mces_golden_default_config.json", + ) + + +@freeze_time(FROZEN_TIME) +def test_azure_ad_source_empty_group_membership( + pytestconfig, mock_datahub_graph, tmp_path +): + test_resources_dir: pathlib.Path = ( + pytestconfig.rootpath / "tests/integration/azure_ad" + ) + + output_file_name = "azure_ad_mces_no_groups_mcp.json" + new_recipe = default_recipe(tmp_path, output_file_name) + + new_recipe["source"]["config"]["ingest_group_membership"] = False + + run_ingest( + pytestconfig=pytestconfig, + mock_datahub_graph=mock_datahub_graph, + recipe=new_recipe, + mocked_functions_reference=mocked_functions, + ) + + mce_helpers.check_golden_file( + pytestconfig, + 
output_path=f"{tmp_path}/{output_file_name}", + golden_path=test_resources_dir / "azure_ad_mces_no_groups_golden_mcp.json", + ) + + +@freeze_time(FROZEN_TIME) +def test_azure_ad_source_nested_groups(pytestconfig, mock_datahub_graph, tmp_path): + test_resources_dir: pathlib.Path = ( + pytestconfig.rootpath / "tests/integration/azure_ad" + ) + + output_file_name = "azure_ad_mces_nested_groups.json" + new_recipe = default_recipe(tmp_path, output_file_name) + + new_recipe["source"]["config"]["ingest_users"] = False + + include_nested_group_in_mock = partial(mocked_functions, return_nested_group=True) + + run_ingest( + pytestconfig=pytestconfig, + mock_datahub_graph=mock_datahub_graph, + recipe=new_recipe, + mocked_functions_reference=include_nested_group_in_mock, + ) + + mce_helpers.check_golden_file( + pytestconfig, + output_path=f"{tmp_path}/{output_file_name}", + golden_path=test_resources_dir / "azure_ad_mces_golden_nested_groups.json", + ) + + +@freeze_time(FROZEN_TIME) +def test_azure_source_ingestion_disabled(pytestconfig, mock_datahub_graph, tmp_path): + test_resources_dir: pathlib.Path = ( + pytestconfig.rootpath / "tests/integration/azure_ad" + ) + + output_file_name = "azure_ad_mces_ingestion_disabled.json" + new_recipe = default_recipe(tmp_path, output_file_name) + + new_recipe["source"]["config"]["ingest_group_membership"] = False + new_recipe["source"]["config"]["ingest_groups"] = False + new_recipe["source"]["config"]["ingest_users"] = False + + run_ingest( + pytestconfig=pytestconfig, + mock_datahub_graph=mock_datahub_graph, + recipe=new_recipe, + mocked_functions_reference=mocked_functions, + ) + + mce_helpers.check_golden_file( + pytestconfig, + output_path=f"{tmp_path}/{output_file_name}", + golden_path=test_resources_dir / "azure_ad_mces_golden_ingestion_disabled.json", + ) + + +def get_current_checkpoint_from_pipeline( + pipeline: Pipeline, +) -> Optional[Checkpoint[GenericCheckpointState]]: + azure_ad_source = cast(AzureADSource, pipeline.source) 
+ return azure_ad_source.get_current_checkpoint( + azure_ad_source.stale_entity_removal_handler.job_id + ) + + +@freeze_time(FROZEN_TIME) +def test_azure_ad_stateful_ingestion( + pytestconfig, tmp_path, mock_time, mock_datahub_graph +): + new_recipe = default_recipe(tmp_path) + + new_recipe["pipeline_name"] = "azure_ad_execution" + new_recipe["source"]["config"]["stateful_ingestion"] = { + "enabled": True, + "state_provider": { + "type": "datahub", + "config": {"datahub_api": {"server": GMS_SERVER}}, + }, + } + + pipeline1 = run_ingest( + pytestconfig=pytestconfig, + mock_datahub_graph=mock_datahub_graph, + recipe=new_recipe, + mocked_functions_reference=mocked_functions, + ) + + checkpoint1 = get_current_checkpoint_from_pipeline(pipeline1) + assert checkpoint1 + assert checkpoint1.state + + pipeline2 = run_ingest( + pytestconfig=pytestconfig, + mock_datahub_graph=mock_datahub_graph, + recipe=new_recipe, + mocked_functions_reference=overwrite_group_in_mocked_data, + ) + checkpoint2 = get_current_checkpoint_from_pipeline(pipeline2) + assert checkpoint2 + assert checkpoint2.state + + # Validate that all providers have committed successfully. + validate_all_providers_have_committed_successfully( + pipeline=pipeline1, expected_providers=1 + ) + validate_all_providers_have_committed_successfully( + pipeline=pipeline2, expected_providers=1 + ) + + # Perform all assertions on the states. The deleted group should not be + # part of the second state + state1 = checkpoint1.state + state2 = checkpoint2.state + + difference_dashboard_urns = list( + state1.get_urns_not_in(type="corpGroup", other_checkpoint_state=state2) + ) + + assert len(difference_dashboard_urns) == 1 + assert difference_dashboard_urns == ["urn:li:corpGroup:groupDisplayName3"]