diff --git a/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py b/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py index 002608e119..2b6229d94f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py +++ b/metadata-ingestion/src/datahub/ingestion/source/identity/okta.py @@ -5,7 +5,7 @@ import urllib from collections import defaultdict from dataclasses import dataclass, field from time import sleep -from typing import Dict, Iterable, List, Optional, Union +from typing import Dict, Iterable, List, Optional, Set, Union import nest_asyncio from okta.client import Client as OktaClient @@ -77,6 +77,10 @@ class OktaConfig(StatefulIngestionConfigBase, ConfigModel): default=True, description="Whether group membership should be ingested into DataHub. ingest_groups must be True if this is True.", ) + ingest_groups_users: bool = Field( + default=False, + description="Only ingest users belonging to the selected groups. This option is only useful when `ingest_users` is set to False and `ingest_group_membership` to True.", + ) # Optional: Customize the mapping to DataHub Username from an attribute appearing in the Okta User # profile. Reference: https://developer.okta.com/docs/reference/api/users/ @@ -344,6 +348,7 @@ class OktaSource(StatefulIngestionSourceBase): aspect=StatusClass(removed=False), ).as_workunit() + okta_users: Set[User] = set() # Step 2: Populate GroupMembership Aspects for CorpUsers datahub_corp_user_urn_to_group_membership: Dict[str, GroupMembershipClass] = ( defaultdict(lambda: GroupMembershipClass(groups=[])) @@ -372,6 +377,9 @@ class OktaSource(StatefulIngestionSourceBase): self.report.report_failure("okta_user_mapping", error_str) continue + if self.config.ingest_groups_users: + okta_users.add(okta_user) + # Update the GroupMembership aspect for this group member. 
datahub_corp_user_urn_to_group_membership[ datahub_corp_user_urn @@ -379,7 +387,10 @@ class OktaSource(StatefulIngestionSourceBase): # Step 3: Produce MetadataWorkUnits for CorpUsers. if self.config.ingest_users: - okta_users = self._get_okta_users(event_loop) + # All users are ingested, so replace the group-derived set with everyone. + okta_users = set(self._get_okta_users(event_loop)) + + if okta_users: filtered_okta_users = filter(self._filter_okta_user, okta_users) datahub_corp_user_snapshots = self._map_okta_users(filtered_okta_users) for user_count, datahub_corp_user_snapshot in enumerate( diff --git a/metadata-ingestion/tests/integration/okta/okta_mces_golden_ingest_groups_users.json b/metadata-ingestion/tests/integration/okta/okta_mces_golden_ingest_groups_users.json new file mode 100644 index 0000000000..fad38cb339 --- /dev/null +++ b/metadata-ingestion/tests/integration/okta/okta_mces_golden_ingest_groups_users.json @@ -0,0 +1,251 @@ +[ +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.CorpGroupSnapshot": { + "urn": "urn:li:corpGroup:All%20Employees", + "aspects": [ + { + "com.linkedin.pegasus2avro.identity.CorpGroupInfo": { + "displayName": "All Employees", + "admins": [], + "members": [], + "groups": [], + "description": "All Employees in the Test Company." 
+ } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "test-okta-usage", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "corpGroup", + "entityUrn": "urn:li:corpGroup:All%20Employees", + "changeType": "UPSERT", + "aspectName": "origin", + "aspect": { + "json": { + "type": "EXTERNAL", + "externalType": "OKTA" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "test-okta-usage", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "corpGroup", + "entityUrn": "urn:li:corpGroup:All%20Employees", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "test-okta-usage", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.CorpGroupSnapshot": { + "urn": "urn:li:corpGroup:Engineering", + "aspects": [ + { + "com.linkedin.pegasus2avro.identity.CorpGroupInfo": { + "displayName": "Engineering", + "admins": [], + "members": [], + "groups": [], + "description": "Engineering team!" 
+ } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "test-okta-usage", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "corpGroup", + "entityUrn": "urn:li:corpGroup:Engineering", + "changeType": "UPSERT", + "aspectName": "origin", + "aspect": { + "json": { + "type": "EXTERNAL", + "externalType": "OKTA" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "test-okta-usage", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "corpGroup", + "entityUrn": "urn:li:corpGroup:Engineering", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "test-okta-usage", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": { + "urn": "urn:li:corpuser:mary.jane", + "aspects": [ + { + "com.linkedin.pegasus2avro.identity.CorpUserInfo": { + "customProperties": {}, + "active": true, + "displayName": "Mary Jane", + "email": "mary.jane@test.com", + "title": "Software Engineer", + "departmentName": "Engineering", + "firstName": "Mary", + "lastName": "Jane", + "fullName": "Mary Jane", + "countryCode": "us" + } + }, + { + "com.linkedin.pegasus2avro.identity.GroupMembership": { + "groups": [ + "urn:li:corpGroup:All%20Employees", + "urn:li:corpGroup:All%20Employees", + "urn:li:corpGroup:Engineering", + "urn:li:corpGroup:Engineering" + ] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "test-okta-usage", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:mary.jane", + "changeType": "UPSERT", + "aspectName": "origin", + "aspect": { + "json": { + "type": "EXTERNAL", + "externalType": "OKTA" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "test-okta-usage", + "lastRunId": "no-run-id-provided" + } 
+}, +{ + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:mary.jane", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "test-okta-usage", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": { + "urn": "urn:li:corpuser:john.doe", + "aspects": [ + { + "com.linkedin.pegasus2avro.identity.CorpUserInfo": { + "customProperties": {}, + "active": true, + "displayName": "JDoe", + "email": "john.doe@test.com", + "firstName": "John", + "lastName": "Doe", + "fullName": "John Doe" + } + }, + { + "com.linkedin.pegasus2avro.identity.GroupMembership": { + "groups": [ + "urn:li:corpGroup:All%20Employees", + "urn:li:corpGroup:Engineering" + ] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "test-okta-usage", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:john.doe", + "changeType": "UPSERT", + "aspectName": "origin", + "aspect": { + "json": { + "type": "EXTERNAL", + "externalType": "OKTA" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "test-okta-usage", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:john.doe", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "test-okta-usage", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/okta/test_okta.py b/metadata-ingestion/tests/integration/okta/test_okta.py index 10148273c9..444bb6f10c 100644 --- a/metadata-ingestion/tests/integration/okta/test_okta.py +++ b/metadata-ingestion/tests/integration/okta/test_okta.py @@ -120,6 +120,32 @@ def 
test_okta_source_default_configs(pytestconfig, mock_datahub_graph, tmp_path) ) +@freeze_time(FROZEN_TIME) +def test_okta_source_ingest_groups_users(pytestconfig, mock_datahub_graph, tmp_path): + """Check golden output for ingest_users=False with ingest_groups_users=True.""" + resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta" + output_path = f"{tmp_path}/okta_mces_ingest_groups_users.json" + + recipe = default_recipe(output_path) + source_config = recipe["source"]["config"] + source_config["ingest_users"] = False + source_config["ingest_groups"] = True + source_config["ingest_groups_users"] = True + + init_client = partial(_init_mock_okta_client, test_resources_dir=resources_dir) + run_ingest( + mock_datahub_graph=mock_datahub_graph, + mocked_functions_reference=init_client, + recipe=recipe, + ) + + mce_helpers.check_golden_file( + pytestconfig, + output_path=output_path, + golden_path=f"{resources_dir}/okta_mces_golden_ingest_groups_users.json", + ) + + @freeze_time(FROZEN_TIME) def test_okta_source_ingestion_disabled(pytestconfig, mock_datahub_graph, tmp_path): test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"