feat(ingestion/okta): okta stateful ingestion (#7736)

Co-authored-by: MohdSiddiqueBagwan <mohdsiddique.bagwan@gslab.com>
Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
Co-authored-by: John Joyce <john@acryl.io>
This commit is contained in:
mohdsiddique 2023-04-08 02:14:32 +05:30 committed by GitHub
parent 5fd7981532
commit 5e145cbb2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 290 additions and 162 deletions

View File

@ -17,11 +17,12 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import ( # SourceCapability,; capability,
SupportStatus,
capability,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.source import SourceCapability, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.sql_common_state import (
BaseSQLAlchemyCheckpointState,
@ -152,7 +153,7 @@ class AzureADConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
# Configuration for stateful ingestion
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description="PowerBI Stateful Ingestion Config."
default=None, description="Azure AD Stateful Ingestion Config."
)
@ -174,6 +175,9 @@ class AzureADSourceReport(StaleEntityRemovalSourceReport):
@platform_name("Azure AD")
@config_class(AzureADConfig)
@support_status(SupportStatus.CERTIFIED)
@capability(
SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion"
)
class AzureADSource(StatefulIngestionSourceBase):
"""
This plugin extracts the following:

View File

@ -13,7 +13,6 @@ from okta.models import Group, GroupProfile, User, UserProfile, UserStatus
from pydantic import validator
from pydantic.fields import Field
from datahub.configuration import ConfigModel
from datahub.configuration.common import ConfigurationError
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
@ -25,8 +24,19 @@ from datahub.ingestion.api.decorators import (
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.sql_common_state import (
BaseSQLAlchemyCheckpointState,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
CorpGroupSnapshot,
CorpUserSnapshot,
@ -41,11 +51,15 @@ from datahub.metadata.schema_classes import (
OriginTypeClass,
StatusClass,
)
from datahub.utilities.source_helpers import (
auto_stale_entity_removal,
auto_status_aspect,
)
logger = logging.getLogger(__name__)
class OktaConfig(ConfigModel):
class OktaConfig(StatefulIngestionConfigBase):
# Required: Domain of the Okta deployment. Example: dev-33231928.okta.com
okta_domain: str = Field(
description="The location of your Okta Domain, without a protocol. Can be found in Okta Developer console. e.g. dev-33231928.okta.com",
@ -131,6 +145,11 @@ class OktaConfig(ConfigModel):
description="Okta search expression (not regex) for ingesting groups. Only one of `okta_groups_filter` and `okta_groups_search` can be set. See (https://developer.okta.com/docs/reference/api/groups/#list-groups-with-search) for more info.",
)
# Configuration for stateful ingestion
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description="Okta Stateful Ingestion Config."
)
# Optional: Whether to mask sensitive information from workunit ID's. On by default.
mask_group_id: bool = True
mask_user_id: bool = True
@ -153,7 +172,7 @@ class OktaConfig(ConfigModel):
@dataclass
class OktaSourceReport(SourceReport):
class OktaSourceReport(StaleEntityRemovalSourceReport):
filtered: List[str] = field(default_factory=list)
def report_filtered(self, name: str) -> None:
@ -178,7 +197,10 @@ class OktaSourceReport(SourceReport):
@config_class(OktaConfig)
@support_status(SupportStatus.CERTIFIED)
@capability(SourceCapability.DESCRIPTIONS, "Optionally enabled via configuration")
class OktaSource(Source):
@capability(
SourceCapability.DELETION_DETECTION, "Optionally enabled via stateful_ingestion"
)
class OktaSource(StatefulIngestionSourceBase):
"""
This plugin extracts the following:
@ -256,18 +278,32 @@ class OktaSource(Source):
"""
config: OktaConfig
report: OktaSourceReport
okta_client: OktaClient
stale_entity_removal_handler: StaleEntityRemovalHandler
@classmethod
def create(cls, config_dict, ctx):
config = OktaConfig.parse_obj(config_dict)
return cls(config, ctx)
def __init__(self, config: OktaConfig, ctx: PipelineContext):
super().__init__(ctx)
super(OktaSource, self).__init__(config, ctx)
self.config = config
self.report = OktaSourceReport()
self.okta_client = self._create_okta_client()
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
# Create and register the stateful ingestion use-case handler.
self.stale_entity_removal_handler = StaleEntityRemovalHandler(
source=self,
config=self.config,
state_type_class=BaseSQLAlchemyCheckpointState,
pipeline_name=ctx.pipeline_name,
run_id=ctx.run_id,
)
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
# Step 0: get or create the event loop
# This method can be called on the main thread or an async thread, so we must create a new loop if one doesn't exist
# See https://docs.python.org/3/library/asyncio-eventloop.html for more info.
@ -407,6 +443,12 @@ class OktaSource(Source):
# Step 4: Close the event loop
event_loop.close()
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_stale_entity_removal(
self.stale_entity_removal_handler,
auto_status_aspect(self.get_workunits_internal()),
)
def get_report(self):
return self.report

View File

@ -0,0 +1,16 @@
[
{
"id": "0",
"created": "2015-02-06T10:11:28.000Z",
"lastUpdated": "2015-10-05T19:16:43.000Z",
"lastMembershipUpdated": "2015-11-28T19:15:32.000Z",
"objectClass": [
"okta:user_group"
],
"type": "OKTA_GROUP",
"profile": {
"name": "All Employees",
"description": "All Employees in the Test Company."
}
}
]

View File

@ -1,5 +1,7 @@
import asyncio
import pathlib
from functools import partial
from typing import Optional, cast
from unittest.mock import Mock, patch
import jsonpickle
@ -8,12 +10,75 @@ from freezegun import freeze_time
from okta.models import Group, User
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.identity.okta import OktaConfig
from datahub.ingestion.source.identity.okta import OktaConfig, OktaSource
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from tests.test_helpers import mce_helpers
from tests.test_helpers.state_helpers import (
validate_all_providers_have_committed_successfully,
)
FROZEN_TIME = "2020-04-14 07:00:00"
USER_ID_NOT_IN_GROUPS = "5"
GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"
def default_recipe(output_file_path):
return {
"run_id": "test-okta-usage",
"source": {
"type": "okta",
"config": {
"okta_domain": "mock-domain.okta.com",
"okta_api_token": "mock-okta-token",
"ingest_users": "True",
"ingest_groups": "True",
"ingest_group_membership": "True",
"okta_profile_to_username_attr": "login",
"okta_profile_to_username_regex": "([^@]+)",
"okta_profile_to_group_name_attr": "name",
"okta_profile_to_group_name_regex": "(.*)",
"include_deprovisioned_users": "False",
"include_suspended_users": "False",
"page_size": "2",
"delay_seconds": "0.00",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{output_file_path}",
},
},
}
def run_ingest(
mock_datahub_graph,
mocked_functions_reference,
recipe,
):
with patch(
"datahub.ingestion.source.identity.okta.OktaClient"
) as MockClient, patch(
"datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
mock_datahub_graph,
) as mock_checkpoint:
mock_checkpoint.return_value = mock_datahub_graph
mocked_functions_reference(MockClient=MockClient)
# Run an Okta usage ingestion run.
pipeline = Pipeline.create(recipe)
pipeline.run()
pipeline.raise_from_status()
return pipeline
def test_okta_config():
config = OktaConfig.parse_obj(
@ -39,200 +104,201 @@ def test_okta_config():
@freeze_time(FROZEN_TIME)
def test_okta_source_default_configs(pytestconfig, tmp_path):
def test_okta_source_default_configs(pytestconfig, mock_datahub_graph, tmp_path):
test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
with patch("datahub.ingestion.source.identity.okta.OktaClient") as MockClient:
_init_mock_okta_client(test_resources_dir, MockClient)
output_file_path = f"{tmp_path}/okta_mces_default_config.json"
# Run an Okta usage ingestion run.
pipeline = Pipeline.create(
{
"run_id": "test-okta-usage",
"source": {
"type": "okta",
"config": {
"okta_domain": "mock-domain.okta.com",
"okta_api_token": "mock-okta-token",
"ingest_users": "True",
"ingest_groups": "True",
"ingest_group_membership": "True",
"okta_profile_to_username_attr": "login",
"okta_profile_to_username_regex": "([^@]+)",
"okta_profile_to_group_name_attr": "name",
"okta_profile_to_group_name_regex": "(.*)",
"include_deprovisioned_users": "False",
"include_suspended_users": "False",
"page_size": "2",
"delay_seconds": "0.00",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/okta_mces_default_config.json",
},
},
}
)
pipeline.run()
pipeline.raise_from_status()
run_ingest(
mock_datahub_graph=mock_datahub_graph,
mocked_functions_reference=partial(
_init_mock_okta_client, test_resources_dir=test_resources_dir
),
recipe=default_recipe(output_file_path),
)
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "okta_mces_default_config.json",
golden_path=test_resources_dir / "okta_mces_golden_default_config.json",
output_path=output_file_path,
golden_path=f"{test_resources_dir}/okta_mces_golden_default_config.json",
)
@freeze_time(FROZEN_TIME)
def test_okta_source_ingestion_disabled(pytestconfig, tmp_path):
def test_okta_source_ingestion_disabled(pytestconfig, mock_datahub_graph, tmp_path):
test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
with patch("datahub.ingestion.source.identity.okta.OktaClient") as MockClient:
_init_mock_okta_client(test_resources_dir, MockClient)
output_file_path = f"{tmp_path}/okta_mces_ingestion_disabled.json"
new_recipe = default_recipe(output_file_path)
new_recipe["source"]["config"]["ingest_users"] = False
new_recipe["source"]["config"]["ingest_groups"] = False
new_recipe["source"]["config"]["ingest_group_membership"] = False
# Run an Okta usage ingestion run.
pipeline = Pipeline.create(
{
"run_id": "test-okta-usage",
"source": {
"type": "okta",
"config": {
"okta_domain": "mock-domain.okta.com",
"okta_api_token": "mock-okta-token",
"ingest_users": "False",
"ingest_groups": "False",
"ingest_group_membership": "False",
"okta_profile_to_username_attr": "login",
"okta_profile_to_username_regex": "([^@]+)",
"okta_profile_to_group_name_attr": "name",
"okta_profile_to_group_name_regex": "(.*)",
"include_deprovisioned_users": "False",
"include_suspended_users": "False",
"page_size": "2",
"delay_seconds": "0.00",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/okta_mces_ingestion_disabled.json",
},
},
}
)
pipeline.run()
pipeline.raise_from_status()
run_ingest(
mock_datahub_graph=mock_datahub_graph,
mocked_functions_reference=partial(
_init_mock_okta_client, test_resources_dir=test_resources_dir
),
recipe=new_recipe,
)
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "okta_mces_ingestion_disabled.json",
golden_path=test_resources_dir / "okta_mces_golden_ingestion_disabled.json",
output_path=output_file_path,
golden_path=f"{test_resources_dir}/okta_mces_golden_ingestion_disabled.json",
)
@freeze_time(FROZEN_TIME)
@pytest.mark.asyncio
def test_okta_source_include_deprovisioned_suspended_users(pytestconfig, tmp_path):
def test_okta_source_include_deprovisioned_suspended_users(
pytestconfig, mock_datahub_graph, tmp_path
):
test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
with patch("datahub.ingestion.source.identity.okta.OktaClient") as MockClient:
_init_mock_okta_client(test_resources_dir, MockClient)
output_file_path = (
f"{tmp_path}/okta_mces_include_deprovisioned_suspended_users.json"
)
new_recipe = default_recipe(output_file_path)
new_recipe["source"]["config"]["include_deprovisioned_users"] = True
new_recipe["source"]["config"]["include_suspended_users"] = True
# Run an Okta usage ingestion run.
pipeline = Pipeline.create(
{
"run_id": "test-okta-usage",
"source": {
"type": "okta",
"config": {
"okta_domain": "mock-domain.okta.com",
"okta_api_token": "mock-okta-token",
"ingest_users": "True",
"ingest_groups": "True",
"ingest_group_membership": "True",
"okta_profile_to_username_attr": "login",
"okta_profile_to_username_regex": "([^@]+)",
"okta_profile_to_group_name_attr": "name",
"okta_profile_to_group_name_regex": "(.*)",
"include_deprovisioned_users": "True",
"include_suspended_users": "True",
"page_size": "2",
"delay_seconds": "0.00",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/okta_mces_include_deprovisioned_suspended_users.json",
},
},
}
)
pipeline.run()
pipeline.raise_from_status()
run_ingest(
mock_datahub_graph=mock_datahub_graph,
mocked_functions_reference=partial(
_init_mock_okta_client, test_resources_dir=test_resources_dir
),
recipe=new_recipe,
)
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "okta_mces_include_deprovisioned_suspended_users.json",
golden_path=test_resources_dir
/ "okta_mces_golden_include_deprovisioned_suspended_users.json",
output_path=output_file_path,
golden_path=f"{test_resources_dir}/okta_mces_golden_include_deprovisioned_suspended_users.json",
)
@freeze_time(FROZEN_TIME)
@pytest.mark.asyncio
def test_okta_source_custom_user_name_regex(pytestconfig, tmp_path):
def test_okta_source_custom_user_name_regex(pytestconfig, mock_datahub_graph, tmp_path):
test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
with patch("datahub.ingestion.source.identity.okta.OktaClient") as MockClient:
_init_mock_okta_client(test_resources_dir, MockClient)
output_file_path = f"{tmp_path}/okta_mces_custom_user_name_regex.json"
new_recipe = default_recipe(output_file_path)
new_recipe["source"]["config"]["okta_profile_to_username_regex"] = "(.*)"
new_recipe["source"]["config"]["okta_profile_to_group_name_regex"] = "(.*)"
# Run an Okta usage ingestion run.
pipeline = Pipeline.create(
{
"run_id": "test-okta-usage",
"source": {
"type": "okta",
"config": {
"okta_domain": "mock-domain.okta.com",
"okta_api_token": "mock-okta-token",
"ingest_users": "True",
"ingest_groups": "True",
"ingest_group_membership": "True",
"okta_profile_to_username_attr": "email",
"okta_profile_to_username_regex": "(.*)",
"okta_profile_to_group_name_attr": "name",
"okta_profile_to_group_name_regex": "(.*)",
"include_deprovisioned_users": "False",
"include_suspended_users": "False",
"page_size": "2",
"delay_seconds": "0.00",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/okta_mces_custom_user_name_regex.json",
},
},
}
)
pipeline.run()
pipeline.raise_from_status()
run_ingest(
mock_datahub_graph=mock_datahub_graph,
mocked_functions_reference=partial(
_init_mock_okta_client, test_resources_dir=test_resources_dir
),
recipe=new_recipe,
)
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "okta_mces_custom_user_name_regex.json",
golden_path=test_resources_dir / "okta_mces_golden_custom_user_name_regex.json",
output_path=output_file_path,
golden_path=f"{test_resources_dir}/okta_mces_golden_custom_user_name_regex.json",
)
def get_current_checkpoint_from_pipeline(
pipeline: Pipeline,
) -> Optional[Checkpoint[GenericCheckpointState]]:
azure_ad_source = cast(OktaSource, pipeline.source)
return azure_ad_source.get_current_checkpoint(
azure_ad_source.stale_entity_removal_handler.job_id
)
@freeze_time(FROZEN_TIME)
def test_okta_stateful_ingestion(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
test_resources_dir: pathlib.Path = pytestconfig.rootpath / "tests/integration/okta"
output_file_path = f"{tmp_path}/temporary_mces.json"
new_recipe = default_recipe(output_file_path)
new_recipe["pipeline_name"] = "okta_execution"
new_recipe["source"]["config"]["stateful_ingestion"] = {
"enabled": True,
"state_provider": {
"type": "datahub",
"config": {"datahub_api": {"server": GMS_SERVER}},
},
}
pipeline1 = run_ingest(
mock_datahub_graph=mock_datahub_graph,
mocked_functions_reference=partial(
_init_mock_okta_client, test_resources_dir=test_resources_dir
),
recipe=new_recipe,
)
checkpoint1 = get_current_checkpoint_from_pipeline(pipeline1)
assert checkpoint1
assert checkpoint1.state
# Create new event loop as last one is closed because of previous ingestion run
event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(event_loop)
pipeline2 = run_ingest(
mock_datahub_graph=mock_datahub_graph,
mocked_functions_reference=partial(
overwrite_group_in_mocked_data, test_resources_dir=test_resources_dir
),
recipe=new_recipe,
)
checkpoint2 = get_current_checkpoint_from_pipeline(pipeline2)
assert checkpoint2
assert checkpoint2.state
#
# Validate that all providers have committed successfully.
validate_all_providers_have_committed_successfully(
pipeline=pipeline1, expected_providers=1
)
validate_all_providers_have_committed_successfully(
pipeline=pipeline2, expected_providers=1
)
# Perform all assertions on the states. The deleted group should not be
# part of the second state
state1 = checkpoint1.state
state2 = checkpoint2.state
difference_group_urns = list(
state1.get_urns_not_in(type="corpGroup", other_checkpoint_state=state2)
)
assert len(difference_group_urns) == 1
assert difference_group_urns == ["urn:li:corpGroup:Engineering"]
def overwrite_group_in_mocked_data(test_resources_dir, MockClient):
_init_mock_okta_client(
test_resources_dir,
MockClient,
mock_groups_json=test_resources_dir / "okta_deleted_groups.json",
)
# Initializes a Mock Okta Client to return users from okta_users.json and groups from okta_groups.json.
def _init_mock_okta_client(test_resources_dir, MockClient):
okta_users_json_file = test_resources_dir / "okta_users.json"
okta_groups_json_file = test_resources_dir / "okta_groups.json"
def _init_mock_okta_client(
test_resources_dir, MockClient, mock_users_json=None, mock_groups_json=None
):
okta_users_json_file = (
test_resources_dir / "okta_users.json"
if mock_users_json is None
else mock_users_json
)
okta_groups_json_file = (
test_resources_dir / "okta_groups.json"
if mock_groups_json is None
else mock_groups_json
)
# Add mock Okta API responses.
with okta_users_json_file.open() as okta_users_json: