feat(okta): adds ingest_groups_users config parameter (#12371)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
Sergio Gómez Villamor 2025-02-13 12:00:07 +01:00 committed by GitHub
parent 08a5183267
commit b20211536c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 290 additions and 2 deletions

View File

@ -5,7 +5,7 @@ import urllib
from collections import defaultdict
from dataclasses import dataclass, field
from time import sleep
from typing import Dict, Iterable, List, Optional, Union
from typing import Dict, Iterable, List, Optional, Set, Union
import nest_asyncio
from okta.client import Client as OktaClient
@ -77,6 +77,10 @@ class OktaConfig(StatefulIngestionConfigBase, ConfigModel):
default=True,
description="Whether group membership should be ingested into DataHub. ingest_groups must be True if this is True.",
)
ingest_groups_users: bool = Field(
default=True,
description="Only ingest users belonging to the selected groups. This option is only useful when `ingest_users` is set to False and `ingest_group_membership` to True.",
)
# Optional: Customize the mapping to DataHub Username from an attribute appearing in the Okta User
# profile. Reference: https://developer.okta.com/docs/reference/api/users/
@ -344,6 +348,7 @@ class OktaSource(StatefulIngestionSourceBase):
aspect=StatusClass(removed=False),
).as_workunit()
okta_users: Set[User] = set()
# Step 2: Populate GroupMembership Aspects for CorpUsers
datahub_corp_user_urn_to_group_membership: Dict[str, GroupMembershipClass] = (
defaultdict(lambda: GroupMembershipClass(groups=[]))
@ -372,6 +377,9 @@ class OktaSource(StatefulIngestionSourceBase):
self.report.report_failure("okta_user_mapping", error_str)
continue
if self.config.ingest_groups_users:
okta_users.add(okta_user)
# Update the GroupMembership aspect for this group member.
datahub_corp_user_urn_to_group_membership[
datahub_corp_user_urn
@ -379,7 +387,10 @@ class OktaSource(StatefulIngestionSourceBase):
# Step 3: Produce MetadataWorkUnits for CorpUsers.
if self.config.ingest_users:
okta_users = self._get_okta_users(event_loop)
# we can just throw away collected okta users so far and fetch them all
okta_users = set(self._get_okta_users(event_loop))
if okta_users:
filtered_okta_users = filter(self._filter_okta_user, okta_users)
datahub_corp_user_snapshots = self._map_okta_users(filtered_okta_users)
for user_count, datahub_corp_user_snapshot in enumerate(

View File

@ -0,0 +1,251 @@
[
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpGroupSnapshot": {
"urn": "urn:li:corpGroup:All%20Employees",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpGroupInfo": {
"displayName": "All Employees",
"admins": [],
"members": [],
"groups": [],
"description": "All Employees in the Test Company."
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "test-okta-usage",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "corpGroup",
"entityUrn": "urn:li:corpGroup:All%20Employees",
"changeType": "UPSERT",
"aspectName": "origin",
"aspect": {
"json": {
"type": "EXTERNAL",
"externalType": "OKTA"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "test-okta-usage",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "corpGroup",
"entityUrn": "urn:li:corpGroup:All%20Employees",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "test-okta-usage",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpGroupSnapshot": {
"urn": "urn:li:corpGroup:Engineering",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpGroupInfo": {
"displayName": "Engineering",
"admins": [],
"members": [],
"groups": [],
"description": "Engineering team!"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "test-okta-usage",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "corpGroup",
"entityUrn": "urn:li:corpGroup:Engineering",
"changeType": "UPSERT",
"aspectName": "origin",
"aspect": {
"json": {
"type": "EXTERNAL",
"externalType": "OKTA"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "test-okta-usage",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "corpGroup",
"entityUrn": "urn:li:corpGroup:Engineering",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "test-okta-usage",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:mary.jane",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"customProperties": {},
"active": true,
"displayName": "Mary Jane",
"email": "mary.jane@test.com",
"title": "Software Engineer",
"departmentName": "Engineering",
"firstName": "Mary",
"lastName": "Jane",
"fullName": "Mary Jane",
"countryCode": "us"
}
},
{
"com.linkedin.pegasus2avro.identity.GroupMembership": {
"groups": [
"urn:li:corpGroup:All%20Employees",
"urn:li:corpGroup:All%20Employees",
"urn:li:corpGroup:Engineering",
"urn:li:corpGroup:Engineering"
]
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "test-okta-usage",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "corpuser",
"entityUrn": "urn:li:corpuser:mary.jane",
"changeType": "UPSERT",
"aspectName": "origin",
"aspect": {
"json": {
"type": "EXTERNAL",
"externalType": "OKTA"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "test-okta-usage",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "corpuser",
"entityUrn": "urn:li:corpuser:mary.jane",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "test-okta-usage",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:john.doe",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"customProperties": {},
"active": true,
"displayName": "JDoe",
"email": "john.doe@test.com",
"firstName": "John",
"lastName": "Doe",
"fullName": "John Doe"
}
},
{
"com.linkedin.pegasus2avro.identity.GroupMembership": {
"groups": [
"urn:li:corpGroup:All%20Employees",
"urn:li:corpGroup:Engineering"
]
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "test-okta-usage",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "corpuser",
"entityUrn": "urn:li:corpuser:john.doe",
"changeType": "UPSERT",
"aspectName": "origin",
"aspect": {
"json": {
"type": "EXTERNAL",
"externalType": "OKTA"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "test-okta-usage",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "corpuser",
"entityUrn": "urn:li:corpuser:john.doe",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "test-okta-usage",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -120,6 +120,32 @@ def test_okta_source_default_configs(pytestconfig, mock_datahub_graph, tmp_path)
)
@freeze_time(FROZEN_TIME)
def test_okta_source_ingest_groups_users(pytestconfig, mock_datahub_graph, tmp_path):
test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
output_file_path = f"{tmp_path}/okta_mces_ingest_groups_users.json"
new_recipe = default_recipe(output_file_path)
new_recipe["source"]["config"]["ingest_users"] = False
new_recipe["source"]["config"]["ingest_groups"] = True
new_recipe["source"]["config"]["ingest_groups_users"] = True
run_ingest(
mock_datahub_graph=mock_datahub_graph,
mocked_functions_reference=partial(
_init_mock_okta_client, test_resources_dir=test_resources_dir
),
recipe=new_recipe,
)
mce_helpers.check_golden_file(
pytestconfig,
output_path=output_file_path,
golden_path=f"{test_resources_dir}/okta_mces_golden_ingest_groups_users.json",
)
@freeze_time(FROZEN_TIME)
def test_okta_source_ingestion_disabled(pytestconfig, mock_datahub_graph, tmp_path):
test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"