Okta identity source: add stateful ingestion (stale entity removal).

What this patch does, per file:
  * azure_ad.py: imports `capability` / `SourceCapability`, declares the
    DELETION_DETECTION capability on AzureADSource, and corrects the
    stateful_ingestion field description ("PowerBI" -> "Azure AD").
  * okta.py: OktaConfig now extends StatefulIngestionConfigBase and gains a
    `stateful_ingestion` field; OktaSourceReport extends
    StaleEntityRemovalSourceReport; OktaSource extends
    StatefulIngestionSourceBase, creates a StaleEntityRemovalHandler in
    __init__, renames the old get_workunits to get_workunits_internal and
    wraps it with auto_status_aspect / auto_stale_entity_removal.
  * okta_deleted_groups.json: new fixture containing a single group.
  * test_okta.py: factors the recipe and pipeline run into default_recipe()
    and run_ingest(), and adds test_okta_stateful_ingestion.

Review amendments relative to the submitted patch (each replaces exactly one
added line, so hunk line counts are unchanged):
  1. okta.py: use zero-argument super() in OktaSource.__init__, matching the
     line it replaces.
  2. test_okta.py: test_okta_source_custom_user_name_regex sets
     okta_profile_to_username_attr back to "email" (the value the test used
     before the refactor) instead of re-setting the group-name regex to the
     value default_recipe() already provides.
  3. test_okta.py: the local in get_current_checkpoint_from_pipeline is named
     okta_source instead of azure_ad_source.

NOTE(review): blank context lines and indentation were reconstructed from the
hunk headers; confirm against the working tree before applying.

diff --git a/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py b/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py
index 652295c383..e3c790a2f8 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py
@@ -17,11 +17,12 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (  # SourceCapability,; capability,
     SupportStatus,
+    capability,
     config_class,
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.source import SourceReport
+from datahub.ingestion.api.source import SourceCapability, SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.state.sql_common_state import (
     BaseSQLAlchemyCheckpointState,
@@ -152,7 +153,7 @@ class AzureADConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
 
     # Configuration for stateful ingestion
     stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
-        default=None, description="PowerBI Stateful Ingestion Config."
+        default=None, description="Azure AD Stateful Ingestion Config."
     )
 
 
@@ -174,6 +175,9 @@ class AzureADSourceReport(StaleEntityRemovalSourceReport):
 @platform_name("Azure AD")
 @config_class(AzureADConfig)
 @support_status(SupportStatus.CERTIFIED)
+@capability(
+    SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion"
+)
 class AzureADSource(StatefulIngestionSourceBase):
     """
     This plugin extracts the following:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py b/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py
index 615cf58e3f..51e4a43a99 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py
@@ -13,7 +13,6 @@ from okta.models import Group, GroupProfile, User, UserProfile, UserStatus
 from pydantic import validator
 from pydantic.fields import Field
 
-from datahub.configuration import ConfigModel
 from datahub.configuration.common import ConfigurationError
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import PipelineContext
@@ -25,8 +24,19 @@ from datahub.ingestion.api.decorators import (
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.source import Source, SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.ingestion.source.state.sql_common_state import (
+    BaseSQLAlchemyCheckpointState,
+)
+from datahub.ingestion.source.state.stale_entity_removal_handler import (
+    StaleEntityRemovalHandler,
+    StaleEntityRemovalSourceReport,
+    StatefulStaleMetadataRemovalConfig,
+)
+from datahub.ingestion.source.state.stateful_ingestion_base import (
+    StatefulIngestionConfigBase,
+    StatefulIngestionSourceBase,
+)
 from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
     CorpGroupSnapshot,
     CorpUserSnapshot,
@@ -41,11 +51,15 @@ from datahub.metadata.schema_classes import (
     OriginTypeClass,
     StatusClass,
 )
+from datahub.utilities.source_helpers import (
+    auto_stale_entity_removal,
+    auto_status_aspect,
+)
 
 logger = logging.getLogger(__name__)
 
 
-class OktaConfig(ConfigModel):
+class OktaConfig(StatefulIngestionConfigBase):
     # Required: Domain of the Okta deployment. Example: dev-33231928.okta.com
     okta_domain: str = Field(
         description="The location of your Okta Domain, without a protocol. Can be found in Okta Developer console. e.g. dev-33231928.okta.com",
@@ -131,6 +145,11 @@ class OktaConfig(ConfigModel):
         description="Okta search expression (not regex) for ingesting groups. Only one of `okta_groups_filter` and `okta_groups_search` can be set. See (https://developer.okta.com/docs/reference/api/groups/#list-groups-with-search) for more info.",
     )
 
+    # Configuration for stateful ingestion
+    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
+        default=None, description="Okta Stateful Ingestion Config."
+    )
+
     # Optional: Whether to mask sensitive information from workunit ID's. On by default.
     mask_group_id: bool = True
     mask_user_id: bool = True
@@ -153,7 +172,7 @@ class OktaConfig(ConfigModel):
 
 
 @dataclass
-class OktaSourceReport(SourceReport):
+class OktaSourceReport(StaleEntityRemovalSourceReport):
     filtered: List[str] = field(default_factory=list)
 
     def report_filtered(self, name: str) -> None:
@@ -178,7 +197,10 @@ class OktaSourceReport(SourceReport):
 @config_class(OktaConfig)
 @support_status(SupportStatus.CERTIFIED)
 @capability(SourceCapability.DESCRIPTIONS, "Optionally enabled via configuration")
-class OktaSource(Source):
+@capability(
+    SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion"
+)
+class OktaSource(StatefulIngestionSourceBase):
     """
     This plugin extracts the following:
 
@@ -256,18 +278,32 @@ class OktaSource(Source):
 
     """
 
+    config: OktaConfig
+    report: OktaSourceReport
+    okta_client: OktaClient
+    stale_entity_removal_handler: StaleEntityRemovalHandler
+
     @classmethod
     def create(cls, config_dict, ctx):
         config = OktaConfig.parse_obj(config_dict)
         return cls(config, ctx)
 
     def __init__(self, config: OktaConfig, ctx: PipelineContext):
-        super().__init__(ctx)
+        super().__init__(config, ctx)
         self.config = config
         self.report = OktaSourceReport()
         self.okta_client = self._create_okta_client()
 
-    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
+        # Create and register the stateful ingestion use-case handler.
+        self.stale_entity_removal_handler = StaleEntityRemovalHandler(
+            source=self,
+            config=self.config,
+            state_type_class=BaseSQLAlchemyCheckpointState,
+            pipeline_name=ctx.pipeline_name,
+            run_id=ctx.run_id,
+        )
+
+    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         # Step 0: get or create the event loop
         # This method can be called on the main thread or an async thread, so we must create a new loop if one doesn't exist
         # See https://docs.python.org/3/library/asyncio-eventloop.html for more info.
@@ -407,6 +443,12 @@ class OktaSource(Source):
         # Step 4: Close the event loop
         event_loop.close()
 
+    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
+        return auto_stale_entity_removal(
+            self.stale_entity_removal_handler,
+            auto_status_aspect(self.get_workunits_internal()),
+        )
+
     def get_report(self):
         return self.report
 
diff --git a/metadata-ingestion/tests/integration/okta/okta_deleted_groups.json b/metadata-ingestion/tests/integration/okta/okta_deleted_groups.json
new file mode 100644
index 0000000000..f70787ab56
--- /dev/null
+++ b/metadata-ingestion/tests/integration/okta/okta_deleted_groups.json
@@ -0,0 +1,16 @@
+[
+    {
+        "id": "0",
+        "created": "2015-02-06T10:11:28.000Z",
+        "lastUpdated": "2015-10-05T19:16:43.000Z",
+        "lastMembershipUpdated": "2015-11-28T19:15:32.000Z",
+        "objectClass": [
+            "okta:user_group"
+        ],
+        "type": "OKTA_GROUP",
+        "profile": {
+            "name": "All Employees",
+            "description": "All Employees in the Test Company."
+        }
+    }
+]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/integration/okta/test_okta.py b/metadata-ingestion/tests/integration/okta/test_okta.py
index 2ef86c874e..04f78efacf 100644
--- a/metadata-ingestion/tests/integration/okta/test_okta.py
+++ b/metadata-ingestion/tests/integration/okta/test_okta.py
@@ -1,5 +1,7 @@
 import asyncio
 import pathlib
+from functools import partial
+from typing import Optional, cast
 from unittest.mock import Mock, patch
 
 import jsonpickle
@@ -8,12 +10,75 @@ from freezegun import freeze_time
 from okta.models import Group, User
 
 from datahub.ingestion.run.pipeline import Pipeline
-from datahub.ingestion.source.identity.okta import OktaConfig
+from datahub.ingestion.source.identity.okta import OktaConfig, OktaSource
+from datahub.ingestion.source.state.checkpoint import Checkpoint
+from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
 from tests.test_helpers import mce_helpers
+from tests.test_helpers.state_helpers import (
+    validate_all_providers_have_committed_successfully,
+)
 
 FROZEN_TIME = "2020-04-14 07:00:00"
 USER_ID_NOT_IN_GROUPS = "5"
 
+GMS_PORT = 8080
+GMS_SERVER = f"http://localhost:{GMS_PORT}"
+
+
+def default_recipe(output_file_path):
+    return {
+        "run_id": "test-okta-usage",
+        "source": {
+            "type": "okta",
+            "config": {
+                "okta_domain": "mock-domain.okta.com",
+                "okta_api_token": "mock-okta-token",
+                "ingest_users": "True",
+                "ingest_groups": "True",
+                "ingest_group_membership": "True",
+                "okta_profile_to_username_attr": "login",
+                "okta_profile_to_username_regex": "([^@]+)",
+                "okta_profile_to_group_name_attr": "name",
+                "okta_profile_to_group_name_regex": "(.*)",
+                "include_deprovisioned_users": "False",
+                "include_suspended_users": "False",
+                "page_size": "2",
+                "delay_seconds": "0.00",
+            },
+        },
+        "sink": {
+            "type": "file",
+            "config": {
+                "filename": f"{output_file_path}",
+            },
+        },
+    }
+
+
+def run_ingest(
+    mock_datahub_graph,
+    mocked_functions_reference,
+    recipe,
+):
+
+    with patch(
+        "datahub.ingestion.source.identity.okta.OktaClient"
+    ) as MockClient, patch(
+        "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
+        mock_datahub_graph,
+    ) as mock_checkpoint:
+
+        mock_checkpoint.return_value = mock_datahub_graph
+
+        mocked_functions_reference(MockClient=MockClient)
+
+        # Run an Okta usage ingestion run.
+        pipeline = Pipeline.create(recipe)
+        pipeline.run()
+        pipeline.raise_from_status()
+
+        return pipeline
+
 
 def test_okta_config():
     config = OktaConfig.parse_obj(
@@ -39,200 +104,201 @@ def test_okta_config():
 
 
 @freeze_time(FROZEN_TIME)
-def test_okta_source_default_configs(pytestconfig, tmp_path):
+def test_okta_source_default_configs(pytestconfig, mock_datahub_graph, tmp_path):
     test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
 
-    with patch("datahub.ingestion.source.identity.okta.OktaClient") as MockClient:
-        _init_mock_okta_client(test_resources_dir, MockClient)
+    output_file_path = f"{tmp_path}/okta_mces_default_config.json"
 
-        # Run an Okta usage ingestion run.
-        pipeline = Pipeline.create(
-            {
-                "run_id": "test-okta-usage",
-                "source": {
-                    "type": "okta",
-                    "config": {
-                        "okta_domain": "mock-domain.okta.com",
-                        "okta_api_token": "mock-okta-token",
-                        "ingest_users": "True",
-                        "ingest_groups": "True",
-                        "ingest_group_membership": "True",
-                        "okta_profile_to_username_attr": "login",
-                        "okta_profile_to_username_regex": "([^@]+)",
-                        "okta_profile_to_group_name_attr": "name",
-                        "okta_profile_to_group_name_regex": "(.*)",
-                        "include_deprovisioned_users": "False",
-                        "include_suspended_users": "False",
-                        "page_size": "2",
-                        "delay_seconds": "0.00",
-                    },
-                },
-                "sink": {
-                    "type": "file",
-                    "config": {
-                        "filename": f"{tmp_path}/okta_mces_default_config.json",
-                    },
-                },
-            }
-        )
-        pipeline.run()
-        pipeline.raise_from_status()
+    run_ingest(
+        mock_datahub_graph=mock_datahub_graph,
+        mocked_functions_reference=partial(
+            _init_mock_okta_client, test_resources_dir=test_resources_dir
+        ),
+        recipe=default_recipe(output_file_path),
+    )
 
     mce_helpers.check_golden_file(
         pytestconfig,
-        output_path=tmp_path / "okta_mces_default_config.json",
-        golden_path=test_resources_dir / "okta_mces_golden_default_config.json",
+        output_path=output_file_path,
+        golden_path=f"{test_resources_dir}/okta_mces_golden_default_config.json",
     )
 
 
 @freeze_time(FROZEN_TIME)
-def test_okta_source_ingestion_disabled(pytestconfig, tmp_path):
+def test_okta_source_ingestion_disabled(pytestconfig, mock_datahub_graph, tmp_path):
     test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
 
-    with patch("datahub.ingestion.source.identity.okta.OktaClient") as MockClient:
-        _init_mock_okta_client(test_resources_dir, MockClient)
+    output_file_path = f"{tmp_path}/okta_mces_ingestion_disabled.json"
+    new_recipe = default_recipe(output_file_path)
+    new_recipe["source"]["config"]["ingest_users"] = False
+    new_recipe["source"]["config"]["ingest_groups"] = False
+    new_recipe["source"]["config"]["ingest_group_membership"] = False
 
-        # Run an Okta usage ingestion run.
-        pipeline = Pipeline.create(
-            {
-                "run_id": "test-okta-usage",
-                "source": {
-                    "type": "okta",
-                    "config": {
-                        "okta_domain": "mock-domain.okta.com",
-                        "okta_api_token": "mock-okta-token",
-                        "ingest_users": "False",
-                        "ingest_groups": "False",
-                        "ingest_group_membership": "False",
-                        "okta_profile_to_username_attr": "login",
-                        "okta_profile_to_username_regex": "([^@]+)",
-                        "okta_profile_to_group_name_attr": "name",
-                        "okta_profile_to_group_name_regex": "(.*)",
-                        "include_deprovisioned_users": "False",
-                        "include_suspended_users": "False",
-                        "page_size": "2",
-                        "delay_seconds": "0.00",
-                    },
-                },
-                "sink": {
-                    "type": "file",
-                    "config": {
-                        "filename": f"{tmp_path}/okta_mces_ingestion_disabled.json",
-                    },
-                },
-            }
-        )
-        pipeline.run()
-        pipeline.raise_from_status()
+    run_ingest(
+        mock_datahub_graph=mock_datahub_graph,
+        mocked_functions_reference=partial(
+            _init_mock_okta_client, test_resources_dir=test_resources_dir
+        ),
+        recipe=new_recipe,
+    )
 
     mce_helpers.check_golden_file(
         pytestconfig,
-        output_path=tmp_path / "okta_mces_ingestion_disabled.json",
-        golden_path=test_resources_dir / "okta_mces_golden_ingestion_disabled.json",
+        output_path=output_file_path,
+        golden_path=f"{test_resources_dir}/okta_mces_golden_ingestion_disabled.json",
     )
 
 
 @freeze_time(FROZEN_TIME)
 @pytest.mark.asyncio
-def test_okta_source_include_deprovisioned_suspended_users(pytestconfig, tmp_path):
+def test_okta_source_include_deprovisioned_suspended_users(
+    pytestconfig, mock_datahub_graph, tmp_path
+):
     test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
 
-    with patch("datahub.ingestion.source.identity.okta.OktaClient") as MockClient:
-        _init_mock_okta_client(test_resources_dir, MockClient)
+    output_file_path = (
+        f"{tmp_path}/okta_mces_include_deprovisioned_suspended_users.json"
+    )
+    new_recipe = default_recipe(output_file_path)
+    new_recipe["source"]["config"]["include_deprovisioned_users"] = True
+    new_recipe["source"]["config"]["include_suspended_users"] = True
 
-        # Run an Okta usage ingestion run.
-        pipeline = Pipeline.create(
-            {
-                "run_id": "test-okta-usage",
-                "source": {
-                    "type": "okta",
-                    "config": {
-                        "okta_domain": "mock-domain.okta.com",
-                        "okta_api_token": "mock-okta-token",
-                        "ingest_users": "True",
-                        "ingest_groups": "True",
-                        "ingest_group_membership": "True",
-                        "okta_profile_to_username_attr": "login",
-                        "okta_profile_to_username_regex": "([^@]+)",
-                        "okta_profile_to_group_name_attr": "name",
-                        "okta_profile_to_group_name_regex": "(.*)",
-                        "include_deprovisioned_users": "True",
-                        "include_suspended_users": "True",
-                        "page_size": "2",
-                        "delay_seconds": "0.00",
-                    },
-                },
-                "sink": {
-                    "type": "file",
-                    "config": {
-                        "filename": f"{tmp_path}/okta_mces_include_deprovisioned_suspended_users.json",
-                    },
-                },
-            }
-        )
-        pipeline.run()
-        pipeline.raise_from_status()
+    run_ingest(
+        mock_datahub_graph=mock_datahub_graph,
+        mocked_functions_reference=partial(
+            _init_mock_okta_client, test_resources_dir=test_resources_dir
+        ),
+        recipe=new_recipe,
+    )
 
     mce_helpers.check_golden_file(
         pytestconfig,
-        output_path=tmp_path / "okta_mces_include_deprovisioned_suspended_users.json",
-        golden_path=test_resources_dir
-        / "okta_mces_golden_include_deprovisioned_suspended_users.json",
+        output_path=output_file_path,
+        golden_path=f"{test_resources_dir}/okta_mces_golden_include_deprovisioned_suspended_users.json",
     )
 
 
 @freeze_time(FROZEN_TIME)
 @pytest.mark.asyncio
-def test_okta_source_custom_user_name_regex(pytestconfig, tmp_path):
+def test_okta_source_custom_user_name_regex(pytestconfig, mock_datahub_graph, tmp_path):
     test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
 
-    with patch("datahub.ingestion.source.identity.okta.OktaClient") as MockClient:
-        _init_mock_okta_client(test_resources_dir, MockClient)
+    output_file_path = f"{tmp_path}/okta_mces_custom_user_name_regex.json"
+    new_recipe = default_recipe(output_file_path)
+    new_recipe["source"]["config"]["okta_profile_to_username_regex"] = "(.*)"
+    new_recipe["source"]["config"]["okta_profile_to_username_attr"] = "email"
 
-        # Run an Okta usage ingestion run.
-        pipeline = Pipeline.create(
-            {
-                "run_id": "test-okta-usage",
-                "source": {
-                    "type": "okta",
-                    "config": {
-                        "okta_domain": "mock-domain.okta.com",
-                        "okta_api_token": "mock-okta-token",
-                        "ingest_users": "True",
-                        "ingest_groups": "True",
-                        "ingest_group_membership": "True",
-                        "okta_profile_to_username_attr": "email",
-                        "okta_profile_to_username_regex": "(.*)",
-                        "okta_profile_to_group_name_attr": "name",
-                        "okta_profile_to_group_name_regex": "(.*)",
-                        "include_deprovisioned_users": "False",
-                        "include_suspended_users": "False",
-                        "page_size": "2",
-                        "delay_seconds": "0.00",
-                    },
-                },
-                "sink": {
-                    "type": "file",
-                    "config": {
-                        "filename": f"{tmp_path}/okta_mces_custom_user_name_regex.json",
-                    },
-                },
-            }
-        )
-        pipeline.run()
-        pipeline.raise_from_status()
+    run_ingest(
+        mock_datahub_graph=mock_datahub_graph,
+        mocked_functions_reference=partial(
+            _init_mock_okta_client, test_resources_dir=test_resources_dir
+        ),
+        recipe=new_recipe,
+    )
 
     mce_helpers.check_golden_file(
         pytestconfig,
-        output_path=tmp_path / "okta_mces_custom_user_name_regex.json",
-        golden_path=test_resources_dir / "okta_mces_golden_custom_user_name_regex.json",
+        output_path=output_file_path,
+        golden_path=f"{test_resources_dir}/okta_mces_golden_custom_user_name_regex.json",
+    )
+
+
+def get_current_checkpoint_from_pipeline(
+    pipeline: Pipeline,
+) -> Optional[Checkpoint[GenericCheckpointState]]:
+    okta_source = cast(OktaSource, pipeline.source)
+    return okta_source.get_current_checkpoint(
+        okta_source.stale_entity_removal_handler.job_id
+    )
+
+
+@freeze_time(FROZEN_TIME)
+def test_okta_stateful_ingestion(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
+    test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
+
+    output_file_path = f"{tmp_path}/temporary_mces.json"
+    new_recipe = default_recipe(output_file_path)
+
+    new_recipe["pipeline_name"] = "okta_execution"
+    new_recipe["source"]["config"]["stateful_ingestion"] = {
+        "enabled": True,
+        "state_provider": {
+            "type": "datahub",
+            "config": {"datahub_api": {"server": GMS_SERVER}},
+        },
+    }
+
+    pipeline1 = run_ingest(
+        mock_datahub_graph=mock_datahub_graph,
+        mocked_functions_reference=partial(
+            _init_mock_okta_client, test_resources_dir=test_resources_dir
+        ),
+        recipe=new_recipe,
+    )
+
+    checkpoint1 = get_current_checkpoint_from_pipeline(pipeline1)
+    assert checkpoint1
+    assert checkpoint1.state
+
+    # Create new event loop as last one is closed because of previous ingestion run
+    event_loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(event_loop)
+
+    pipeline2 = run_ingest(
+        mock_datahub_graph=mock_datahub_graph,
+        mocked_functions_reference=partial(
+            overwrite_group_in_mocked_data, test_resources_dir=test_resources_dir
+        ),
+        recipe=new_recipe,
+    )
+    checkpoint2 = get_current_checkpoint_from_pipeline(pipeline2)
+    assert checkpoint2
+    assert checkpoint2.state
+    #
+    # Validate that all providers have committed successfully.
+    validate_all_providers_have_committed_successfully(
+        pipeline=pipeline1, expected_providers=1
+    )
+    validate_all_providers_have_committed_successfully(
+        pipeline=pipeline2, expected_providers=1
+    )
+
+    # Perform all assertions on the states. The deleted group should not be
+    # part of the second state
+    state1 = checkpoint1.state
+    state2 = checkpoint2.state
+
+    difference_group_urns = list(
+        state1.get_urns_not_in(type="corpGroup", other_checkpoint_state=state2)
+    )
+
+    assert len(difference_group_urns) == 1
+    assert difference_group_urns == ["urn:li:corpGroup:Engineering"]
+
+
+def overwrite_group_in_mocked_data(test_resources_dir, MockClient):
+    _init_mock_okta_client(
+        test_resources_dir,
+        MockClient,
+        mock_groups_json=test_resources_dir / "okta_deleted_groups.json",
     )
 
 
 # Initializes a Mock Okta Client to return users from okta_users.json and groups from okta_groups.json.
-def _init_mock_okta_client(test_resources_dir, MockClient):
-    okta_users_json_file = test_resources_dir / "okta_users.json"
-    okta_groups_json_file = test_resources_dir / "okta_groups.json"
+def _init_mock_okta_client(
+    test_resources_dir, MockClient, mock_users_json=None, mock_groups_json=None
+):
+
+    okta_users_json_file = (
+        test_resources_dir / "okta_users.json"
+        if mock_users_json is None
+        else mock_users_json
+    )
+    okta_groups_json_file = (
+        test_resources_dir / "okta_groups.json"
+        if mock_groups_json is None
+        else mock_groups_json
+    )
 
     # Add mock Okta API responses.
     with okta_users_json_file.open() as okta_users_json: