feat(cli): add user add command (#15011)

This commit is contained in:
Aseem Bansal 2025-10-16 20:36:59 +05:30 committed by GitHub
parent b3d354bf89
commit 45dda5e33a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 1173 additions and 9 deletions

View File

@ -27,6 +27,13 @@ If you are using git worktrees then exclude this as that might cause git related
./gradlew ... -x generateGitPropertiesGlobal
```
**IMPORTANT: Verifying Python code changes:**
- **ALWAYS use `./gradlew :metadata-ingestion:lintFix`** to verify Python code changes
- **NEVER use `python3 -m py_compile`** - it doesn't catch style issues or type errors
- lintFix runs ruff formatting and fixing automatically, ensuring code quality
- For smoke-test changes, the lintFix command will also check those files
**Development setup:**
```bash

View File

@ -612,9 +612,11 @@ datahub dataset upsert -f dataset.yaml
### user (User Entity)
The `user` command allows you to interact with the User entity.
It currently supports the `upsert` operation, which can be used to create a new user or update an existing one.
For detailed information, please refer to [Creating Users and Groups with Datahub CLI](/docs/api/tutorials/owners.md#upsert-users).
The `user` command allows you to interact with the User entity in DataHub. It supports two main operations:
#### upsert
Create or update users from a YAML file. For detailed information, please refer to [Creating Users and Groups with Datahub CLI](/docs/api/tutorials/owners.md#upsert-users).
```shell
datahub user upsert -f users.yaml
@ -638,6 +640,32 @@ An example of `users.yaml` would look like as in [bar.user.dhub.yaml](https://gi
picture_link: "https://raw.githubusercontent.com/datahub-project/datahub/master/datahub-web-react/src/images/datahub-logo-color-stable.svg"
```
#### add
Create a native DataHub user with email/password authentication. This command creates users who can log in directly to DataHub using their email and password.
```shell
# Create a user with a role
datahub user add --email user@example.com --display-name "John Doe" --password --role Admin
# Create a user without a role
datahub user add --email user@example.com --display-name "Jane Smith" --password
```
**Options:**
- `--email` (required): User's email address, which will be used as their login ID
- `--display-name` (required): User's full display name
- `--password` (required): Flag to prompt for password input (password will be hidden during entry)
- `--role` (optional): Role to assign to the user. Valid values are `Admin`, `Editor`, or `Reader` (case-insensitive)
**Notes:**
- The command will check if a user with the specified email already exists and exit if found
- Passwords are entered securely via a hidden prompt and require confirmation
- If role assignment fails, the user will still be created but without the role
- Requires admin permissions to execute
### group (Group Entity)
The `group` command allows you to interact with the Group entity.

View File

@ -117,7 +117,6 @@ def load_client_config() -> DatahubClientConfig:
datahub_config: DatahubClientConfig = DatahubConfig.parse_obj(
client_config_dict
).gms
return datahub_config
except ValidationError as e:
click.echo(f"Error loading your {CONDENSED_DATAHUB_CONFIG_PATH}")

View File

@ -1,13 +1,15 @@
import logging
import pathlib
from pathlib import Path
from typing import Optional
import click
from click_default_group import DefaultGroup
from datahub.api.entities.corpuser.corpuser import CorpUser, CorpUserGenerationConfig
from datahub.cli.specific.file_loader import load_file
from datahub.ingestion.graph.client import get_default_graph
from datahub.configuration.common import OperationalError
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
from datahub.ingestion.graph.config import ClientMode
from datahub.upgrade import upgrade
@ -55,3 +57,172 @@ def upsert(file: Path, override_editable: bool) -> None:
f"Update failed for id {user_config.get('id')}. due to {e}",
fg="red",
)
def validate_user_id_options(
user_id: Optional[str], email_as_id: bool, email: str
) -> str:
"""
Validate user ID options and return the final user ID to use.
Args:
user_id: Optional explicit user ID
email_as_id: Whether to use email as the user ID
email: User's email address
Returns:
The final user ID to use for the URN
Raises:
ValueError: If validation fails (neither or both options provided)
"""
if not user_id and not email_as_id:
raise ValueError("Must specify either --id or --email-as-id flag")
if user_id and email_as_id:
raise ValueError("Cannot specify both --id and --email-as-id flag")
if email_as_id:
return email
assert user_id is not None
return user_id
def create_native_user_in_datahub(
graph: DataHubGraph,
user_id: str,
email: str,
display_name: str,
password: str,
role: Optional[str] = None,
) -> str:
"""
Create a native DataHub user.
Args:
graph: DataHubGraph client
user_id: User identifier (used in URN)
email: User's email address
display_name: User's full display name
password: User's password
role: Optional role to assign (Admin, Editor, or Reader)
Returns:
The created user's URN
Raises:
ValueError: If user already exists or role is invalid
OperationalError: If user creation fails due to API/network errors
"""
user_urn = f"urn:li:corpuser:{user_id}"
if graph.exists(user_urn):
raise ValueError(f"User with ID {user_id} already exists (urn: {user_urn})")
created_user_urn = graph.create_native_user(
user_id=user_id,
email=email,
display_name=display_name,
password=password,
role=role,
)
return created_user_urn
@user.command(name="add")
@click.option("--id", "user_id", type=str, help="User identifier (used in URN)")
@click.option("--email", required=True, type=str, help="User's email address")
@click.option(
"--email-as-id",
is_flag=True,
default=False,
help="Use email address as user ID (alternative to --id)",
)
@click.option(
"--display-name", required=True, type=str, help="User's full display name"
)
@click.option(
"--password",
is_flag=True,
default=False,
help="Prompt for password (hidden input)",
)
@click.option(
"--role",
required=False,
type=click.Choice(
["Admin", "Editor", "Reader", "admin", "editor", "reader"], case_sensitive=False
),
help="Optional role to assign (Admin, Editor, or Reader)",
)
@upgrade.check_upgrade
def add(
user_id: str,
email: str,
email_as_id: bool,
display_name: str,
password: bool,
role: str,
) -> None:
"""Create a native DataHub user with email/password authentication"""
try:
final_user_id = validate_user_id_options(user_id, email_as_id, email)
except ValueError as e:
click.secho(f"Error: {str(e)}", fg="red")
raise SystemExit(1) from e
if not password:
click.secho(
"Error: --password flag is required to prompt for password input",
fg="red",
)
raise SystemExit(1)
password_value = click.prompt(
"Enter password", hide_input=True, confirmation_prompt=True
)
with get_default_graph(ClientMode.CLI) as graph:
try:
created_user_urn = create_native_user_in_datahub(
graph, final_user_id, email, display_name, password_value, role
)
if role:
click.secho(
f"Successfully created user {final_user_id} with role {role.capitalize()} (URN: {created_user_urn})",
fg="green",
)
else:
click.secho(
f"Successfully created user {final_user_id} (URN: {created_user_urn})",
fg="green",
)
except ValueError as e:
click.secho(f"Error: {str(e)}", fg="red")
raise SystemExit(1) from e
except OperationalError as e:
error_msg = e.message if hasattr(e, "message") else str(e.args[0])
click.secho(f"Error: {error_msg}", fg="red")
if hasattr(e, "info") and e.info:
logger.debug(f"Error details: {e.info}")
if "status_code" in e.info:
click.secho(f" HTTP Status: {e.info['status_code']}", fg="red")
if "response_text" in e.info:
click.secho(
f" Response: {e.info['response_text'][:200]}", fg="red"
)
click.secho(
"\nTip: Run with DATAHUB_DEBUG=1 environment variable for detailed logs",
fg="yellow",
)
raise SystemExit(1) from e
except Exception as e:
click.secho(f"Unexpected error: {str(e)}", fg="red")
logger.exception("Unexpected error during user creation")
raise SystemExit(1) from e

View File

@ -30,6 +30,7 @@ from typing_extensions import deprecated
from datahub._codegen.aspect import _Aspect
from datahub.cli import config_utils
from datahub.cli.cli_utils import guess_frontend_url_from_gms_url
from datahub.configuration.common import ConfigModel, GraphError, OperationalError
from datahub.emitter.aspect import TIMESERIES_ASPECT_MAP
from datahub.emitter.mce_builder import DEFAULT_ENV, Aspect
@ -2071,6 +2072,202 @@ class DataHubGraph(DatahubRestEmitter, EntityVersioningAPI):
return res["reportAssertionResult"]
def _get_invite_token(self) -> str:
"""
Retrieve an invite token for user creation.
Returns:
Invite token string
Raises:
OperationalError: If invite token retrieval fails
"""
get_invite_token_query = """
query getInviteToken($input: GetInviteTokenInput!) {
getInviteToken(input: $input) {
inviteToken
}
}
"""
try:
invite_token_response = self.execute_graphql(
query=get_invite_token_query,
variables={"input": {}},
)
invite_token = invite_token_response.get("getInviteToken", {}).get(
"inviteToken"
)
if not invite_token:
raise OperationalError(
"Failed to retrieve invite token. Ensure you have admin permissions.",
{},
)
return invite_token
except Exception as e:
raise OperationalError(
f"Failed to retrieve invite token: {str(e)}", {}
) from e
def _create_user_with_token(
self,
user_urn: str,
email: str,
display_name: str,
password: str,
invite_token: str,
) -> None:
"""
Create a user using the signup endpoint.
Args:
user_urn: User URN (urn:li:corpuser:{user_id})
email: User's email address
display_name: Full display name for the user
password: User's password
invite_token: Invite token for user creation
Raises:
OperationalError: If user creation fails
"""
frontend_url = guess_frontend_url_from_gms_url(self._gms_server)
signup_url = f"{frontend_url}/signUp"
signup_payload = {
"userUrn": user_urn,
"email": email,
"fullName": display_name,
"password": password,
"title": "Other",
"inviteToken": invite_token,
}
logger.debug(
f"Creating user with URN={user_urn}, email={email} at URL: {signup_url}"
)
logger.debug(
f"Signup payload: {json.dumps({**signup_payload, 'password': '***'})}"
)
try:
response = self._session.post(signup_url, json=signup_payload)
logger.debug(f"Response status code: {response.status_code}")
logger.debug(f"Response headers: {dict(response.headers)}")
logger.debug(f"Response content length: {len(response.text)}")
response.raise_for_status()
# The /signUp endpoint returns 200 with empty body on success
logger.debug("User created successfully")
except HTTPError as http_err:
error_details = {
"url": signup_url,
"status_code": response.status_code,
"response_text": response.text[:500],
}
try:
error_json = response.json()
error_details["error_response"] = error_json
error_msg = error_json.get("message", str(http_err))
except JSONDecodeError:
error_msg = f"HTTP {response.status_code}: {response.text[:200]}"
raise OperationalError(
f"Failed to create user: {error_msg}",
error_details,
) from http_err
except Exception as e:
raise OperationalError(
f"Failed to create user: {str(e)}",
{"url": signup_url, "error_type": type(e).__name__},
) from e
def _assign_role_to_user(self, user_urn: str, role: str) -> None:
"""
Assign a role to a user.
Args:
user_urn: User URN
role: Role to assign (Admin, Editor, or Reader)
Raises:
ValueError: If role is invalid
"""
normalized_role = role.capitalize()
valid_roles = ["Admin", "Editor", "Reader"]
if normalized_role not in valid_roles:
raise ValueError(
f"Invalid role '{role}'. Must be one of: {', '.join(valid_roles)}"
)
role_urn = f"urn:li:dataHubRole:{normalized_role}"
batch_assign_role_mutation = """
mutation batchAssignRole($input: BatchAssignRoleInput!) {
batchAssignRole(input: $input)
}
"""
try:
self.execute_graphql(
query=batch_assign_role_mutation,
variables={"input": {"roleUrn": role_urn, "actors": [user_urn]}},
)
except Exception as e:
logger.warning(f"Role assignment failed for user {user_urn}: {str(e)}")
raise
def create_native_user(
self,
user_id: str,
email: str,
display_name: str,
password: str,
role: Optional[str] = None,
) -> str:
"""
Create a native DataHub user with email/password authentication.
Args:
user_id: User identifier (will be used in the URN)
email: User's email address
display_name: Full display name for the user
password: User's password
role: Optional role to assign (Admin, Editor, or Reader)
Returns:
User URN of the created user (urn:li:corpuser:{user_id})
Raises:
OperationalError: If user creation fails
ValueError: If role is invalid
"""
# Validate role before creating user
if role:
normalized_role = role.capitalize()
valid_roles = ["Admin", "Editor", "Reader"]
if normalized_role not in valid_roles:
raise ValueError(
f"Invalid role '{role}'. Must be one of: {', '.join(valid_roles)}"
)
user_urn = f"urn:li:corpuser:{user_id}"
invite_token = self._get_invite_token()
self._create_user_with_token(
user_urn, email, display_name, password, invite_token
)
if role:
try:
self._assign_role_to_user(user_urn, role)
except Exception as e:
logger.warning(
f"User {email} created successfully, but role assignment failed: {str(e)}"
)
return user_urn
def close(self) -> None:
self._make_schema_resolver.cache_clear()
super().close()

View File

@ -179,6 +179,18 @@ class _LogBuffer:
return text
class _ResilientStreamHandler(logging.StreamHandler):
"""StreamHandler that gracefully handles closed streams."""
def emit(self, record: logging.LogRecord) -> None:
try:
super().emit(record)
except (ValueError, OSError):
# Stream was closed (e.g., during pytest teardown)
# Silently ignore to prevent test failures
pass
class _BufferLogHandler(logging.Handler):
def __init__(self, storage: _LogBuffer) -> None:
super().__init__()
@ -201,7 +213,11 @@ class _BufferLogHandler(logging.Handler):
def _remove_all_handlers(logger: logging.Logger) -> None:
for handler in logger.handlers[:]:
logger.removeHandler(handler)
handler.close()
try:
handler.close()
except (ValueError, OSError):
# Handler stream may already be closed (e.g., during pytest teardown)
pass
_log_buffer = _LogBuffer(maxlen=IN_MEMORY_LOG_BUFFER_SIZE)
@ -226,7 +242,7 @@ def configure_logging(debug: bool, log_file: Optional[str] = None) -> Iterator[N
with contextlib.ExitStack() as stack:
# Create stdout handler.
stream_handler = logging.StreamHandler()
stream_handler = _ResilientStreamHandler()
stream_handler.addFilter(_DatahubLogFilter(debug=debug))
stream_handler.setFormatter(_stream_formatter)
@ -237,7 +253,7 @@ def configure_logging(debug: bool, log_file: Optional[str] = None) -> Iterator[N
tee = TeeIO(sys.stdout, file)
stack.enter_context(contextlib.redirect_stdout(tee)) # type: ignore
file_handler = logging.StreamHandler(file)
file_handler = _ResilientStreamHandler(file)
file_handler.addFilter(_DatahubLogFilter(debug=True))
file_handler.setFormatter(_default_formatter)
else:

View File

@ -0,0 +1 @@
# Init file for user CLI tests

View File

@ -0,0 +1,172 @@
from unittest.mock import MagicMock
import pytest
from datahub.cli.specific.user_cli import (
create_native_user_in_datahub,
validate_user_id_options,
)
from datahub.configuration.common import OperationalError
class TestValidateUserIdOptions:
def test_valid_with_user_id(self):
"""Test validation succeeds when --id is provided."""
result = validate_user_id_options(
user_id="jdoe", email_as_id=False, email="john@example.com"
)
assert result == "jdoe"
def test_valid_with_email_as_id(self):
"""Test validation succeeds when --email-as-id is provided."""
result = validate_user_id_options(
user_id=None, email_as_id=True, email="john@example.com"
)
assert result == "john@example.com"
def test_error_when_neither_provided(self):
"""Test validation fails when neither --id nor --email-as-id is provided."""
with pytest.raises(ValueError) as exc_info:
validate_user_id_options(
user_id=None, email_as_id=False, email="john@example.com"
)
assert (
"must specify either --id or --email-as-id" in str(exc_info.value).lower()
)
def test_error_when_both_provided(self):
"""Test validation fails when both --id and --email-as-id are provided."""
with pytest.raises(ValueError) as exc_info:
validate_user_id_options(
user_id="jdoe", email_as_id=True, email="john@example.com"
)
assert "cannot specify both" in str(exc_info.value).lower()
class TestCreateNativeUserInDatahub:
def test_successful_user_creation(self):
"""Test successful user creation without role."""
mock_graph = MagicMock()
mock_graph.exists.return_value = False
mock_graph.create_native_user.return_value = "urn:li:corpuser:jdoe"
result = create_native_user_in_datahub(
graph=mock_graph,
user_id="jdoe",
email="john@example.com",
display_name="John Doe",
password="securepass123",
role=None,
)
assert result == "urn:li:corpuser:jdoe"
mock_graph.exists.assert_called_once_with("urn:li:corpuser:jdoe")
mock_graph.create_native_user.assert_called_once_with(
user_id="jdoe",
email="john@example.com",
display_name="John Doe",
password="securepass123",
role=None,
)
def test_successful_user_creation_with_role(self):
"""Test successful user creation with role."""
mock_graph = MagicMock()
mock_graph.exists.return_value = False
mock_graph.create_native_user.return_value = "urn:li:corpuser:jdoe"
result = create_native_user_in_datahub(
graph=mock_graph,
user_id="jdoe",
email="john@example.com",
display_name="John Doe",
password="securepass123",
role="Admin",
)
assert result == "urn:li:corpuser:jdoe"
mock_graph.create_native_user.assert_called_once_with(
user_id="jdoe",
email="john@example.com",
display_name="John Doe",
password="securepass123",
role="Admin",
)
def test_user_already_exists(self):
"""Test error when user already exists."""
mock_graph = MagicMock()
mock_graph.exists.return_value = True
with pytest.raises(ValueError) as exc_info:
create_native_user_in_datahub(
graph=mock_graph,
user_id="jdoe",
email="john@example.com",
display_name="John Doe",
password="securepass123",
role=None,
)
assert "already exists" in str(exc_info.value).lower()
assert "jdoe" in str(exc_info.value)
mock_graph.create_native_user.assert_not_called()
def test_invalid_role(self):
"""Test error handling for invalid role."""
mock_graph = MagicMock()
mock_graph.exists.return_value = False
mock_graph.create_native_user.side_effect = ValueError(
"Invalid role 'InvalidRole'"
)
with pytest.raises(ValueError) as exc_info:
create_native_user_in_datahub(
graph=mock_graph,
user_id="jdoe",
email="john@example.com",
display_name="John Doe",
password="securepass123",
role="InvalidRole",
)
assert "invalid role" in str(exc_info.value).lower()
def test_operational_error(self):
"""Test handling of OperationalError from graph client."""
mock_graph = MagicMock()
mock_graph.exists.return_value = False
mock_graph.create_native_user.side_effect = OperationalError(
"Failed to create user: Network error", {"status_code": 500}
)
with pytest.raises(OperationalError) as exc_info:
create_native_user_in_datahub(
graph=mock_graph,
user_id="jdoe",
email="john@example.com",
display_name="John Doe",
password="securepass123",
role=None,
)
assert "network error" in str(exc_info.value).lower()
def test_email_as_user_id(self):
"""Test user creation where email is used as user ID."""
mock_graph = MagicMock()
mock_graph.exists.return_value = False
mock_graph.create_native_user.return_value = "urn:li:corpuser:john@example.com"
result = create_native_user_in_datahub(
graph=mock_graph,
user_id="john@example.com", # Email as ID
email="john@example.com",
display_name="John Doe",
password="securepass123",
role="Editor",
)
assert result == "urn:li:corpuser:john@example.com"
mock_graph.exists.assert_called_once_with("urn:li:corpuser:john@example.com")
mock_graph.create_native_user.assert_called_once()

View File

@ -0,0 +1,245 @@
from unittest.mock import MagicMock, patch
import pytest
from datahub.configuration.common import OperationalError
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
@patch("datahub.emitter.rest_emitter.DataHubRestEmitter.test_connection")
def test_create_native_user_success(mock_test_connection):
mock_test_connection.return_value = {}
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
email = "test@example.com"
display_name = "Test User"
password = "testpassword123"
with (
patch.object(graph, "execute_graphql") as mock_graphql,
patch.object(graph._session, "post") as mock_session_post,
):
mock_graphql.return_value = {
"getInviteToken": {"inviteToken": "test-token-123"}
}
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.text = ""
mock_session_post.return_value = mock_response
user_urn = graph.create_native_user(
user_id=email, email=email, display_name=display_name, password=password
)
assert user_urn == f"urn:li:corpuser:{email}"
assert mock_graphql.call_count == 1
assert mock_session_post.call_count == 1
signup_call_args = mock_session_post.call_args
assert signup_call_args[1]["json"]["email"] == email
assert signup_call_args[1]["json"]["fullName"] == display_name
assert signup_call_args[1]["json"]["password"] == password
assert signup_call_args[1]["json"]["inviteToken"] == "test-token-123"
@patch("datahub.emitter.rest_emitter.DataHubRestEmitter.test_connection")
def test_create_native_user_with_role(mock_test_connection):
mock_test_connection.return_value = {}
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
email = "admin@example.com"
display_name = "Admin User"
password = "adminpass123"
role = "admin"
with (
patch.object(graph, "execute_graphql") as mock_graphql,
patch.object(graph._session, "post") as mock_session_post,
):
mock_graphql.side_effect = [
{"getInviteToken": {"inviteToken": "test-token-123"}},
{"batchAssignRole": True},
]
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.text = ""
mock_session_post.return_value = mock_response
user_urn = graph.create_native_user(
user_id=email,
email=email,
display_name=display_name,
password=password,
role=role,
)
assert user_urn == f"urn:li:corpuser:{email}"
assert mock_graphql.call_count == 2
assert mock_session_post.call_count == 1
role_call = mock_graphql.call_args_list[1]
assert (
role_call[1]["variables"]["input"]["roleUrn"] == "urn:li:dataHubRole:Admin"
)
assert role_call[1]["variables"]["input"]["actors"] == [user_urn]
@patch("datahub.emitter.rest_emitter.DataHubRestEmitter.test_connection")
def test_create_native_user_role_normalization(mock_test_connection):
mock_test_connection.return_value = {}
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
email = "editor@example.com"
display_name = "Editor User"
password = "editorpass123"
test_cases = [
("ADMIN", "Admin"),
("admin", "Admin"),
("Admin", "Admin"),
("EDITOR", "Editor"),
("editor", "Editor"),
("READER", "Reader"),
("reader", "Reader"),
]
for input_role, expected_role in test_cases:
with (
patch.object(graph, "execute_graphql") as mock_graphql,
patch.object(graph._session, "post") as mock_session_post,
):
mock_graphql.side_effect = [
{"getInviteToken": {"inviteToken": "test-token-123"}},
{"batchAssignRole": True},
]
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.text = ""
mock_session_post.return_value = mock_response
graph.create_native_user(
user_id=email,
email=email,
display_name=display_name,
password=password,
role=input_role,
)
role_call = mock_graphql.call_args_list[1]
assert (
role_call[1]["variables"]["input"]["roleUrn"]
== f"urn:li:dataHubRole:{expected_role}"
)
@patch("datahub.emitter.rest_emitter.DataHubRestEmitter.test_connection")
def test_create_native_user_invalid_role(mock_test_connection):
mock_test_connection.return_value = {}
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
email = "test@example.com"
display_name = "Test User"
password = "testpass123"
invalid_role = "InvalidRole"
with (
patch.object(graph, "execute_graphql") as mock_graphql,
patch.object(graph._session, "post") as mock_session_post,
):
mock_graphql.return_value = {
"getInviteToken": {"inviteToken": "test-token-123"}
}
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.text = ""
mock_session_post.return_value = mock_response
with pytest.raises(ValueError, match="Invalid role"):
graph.create_native_user(
user_id=email,
email=email,
display_name=display_name,
password=password,
role=invalid_role,
)
@patch("datahub.emitter.rest_emitter.DataHubRestEmitter.test_connection")
def test_create_native_user_no_invite_token(mock_test_connection):
mock_test_connection.return_value = {}
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
email = "test@example.com"
display_name = "Test User"
password = "testpass123"
with patch.object(graph, "execute_graphql") as mock_graphql:
mock_graphql.return_value = {"getInviteToken": {}}
with pytest.raises(OperationalError, match="invite token"):
graph.create_native_user(
user_id=email, email=email, display_name=display_name, password=password
)
@patch("datahub.emitter.rest_emitter.DataHubRestEmitter.test_connection")
def test_create_native_user_signup_failure(mock_test_connection):
mock_test_connection.return_value = {}
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
email = "test@example.com"
display_name = "Test User"
password = "testpass123"
with (
patch.object(graph, "execute_graphql") as mock_graphql,
patch.object(graph._session, "post") as mock_session_post,
):
mock_graphql.return_value = {
"getInviteToken": {"inviteToken": "test-token-123"}
}
mock_session_post.side_effect = Exception("Backend error")
with pytest.raises(OperationalError, match="Failed to create user"):
graph.create_native_user(
user_id=email, email=email, display_name=display_name, password=password
)
@patch("datahub.emitter.rest_emitter.DataHubRestEmitter.test_connection")
def test_create_native_user_role_assignment_failure(mock_test_connection):
mock_test_connection.return_value = {}
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
email = "test@example.com"
display_name = "Test User"
password = "testpass123"
role = "admin"
with (
patch.object(graph, "execute_graphql") as mock_graphql,
patch.object(graph._session, "post") as mock_session_post,
patch("datahub.ingestion.graph.client.logger") as mock_logger,
):
mock_graphql.side_effect = [
{"getInviteToken": {"inviteToken": "test-token-123"}},
Exception("Role assignment failed"),
]
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.text = ""
mock_session_post.return_value = mock_response
user_urn = graph.create_native_user(
user_id=email,
email=email,
display_name=display_name,
password=password,
role=role,
)
assert user_urn == f"urn:li:corpuser:{email}"
# Should be called twice: once in _assign_role_to_user and once in create_native_user
assert mock_logger.warning.call_count == 2
warning_messages = [str(call) for call in mock_logger.warning.call_args_list]
assert any("role assignment failed" in msg.lower() for msg in warning_messages)

View File

@ -4,7 +4,7 @@ import pytest
from typing import List, Optional, Tuple
from _pytest.nodes import Item
import requests
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph, get_default_graph
from tests.test_result_msg import send_message
from tests.utils import (
@ -19,6 +19,8 @@ from tests.utils import (
# Disable telemetry
os.environ["DATAHUB_TELEMETRY_ENABLED"] = "false"
# Suppress logging manager to prevent I/O errors during pytest teardown
os.environ["DATAHUB_SUPPRESS_LOGGING_MANAGER"] = "1"
def build_auth_session():
@ -53,6 +55,18 @@ def openapi_graph_client(auth_session) -> DataHubGraph:
return build_graph_client(auth_session, openapi_ingestion=True)
@pytest.fixture(scope="function", autouse=True)
def clear_graph_cache():
"""Clear the get_default_graph LRU cache before each test.
This ensures that tests using run_datahub_cmd() with custom environment
variables get a fresh DataHubGraph instance instead of a cached one with
stale credentials.
"""
get_default_graph.cache_clear()
yield
def _ingest_cleanup_data_impl(
auth_session,
graph_client,

View File

@ -0,0 +1,314 @@
import logging
import uuid
from typing import Any, List
import pytest
from datahub.ingestion.graph.client import DataHubGraph
from tests.consistency_utils import wait_for_writes_to_sync
from tests.utils import run_datahub_cmd
logger = logging.getLogger(__name__)
def generate_test_email() -> str:
"""Generate a unique email for testing to avoid conflicts."""
return f"test-user-{uuid.uuid4()}@example.com"
def datahub_user_add(
auth_session: Any,
email: str,
display_name: str,
password: str,
role: str | None = None,
user_id: str | None = None,
email_as_id: bool = False,
) -> Any:
"""Run the datahub user add command."""
add_args: List[str] = [
"user",
"add",
"--email",
email,
"--display-name",
display_name,
"--password",
]
if user_id:
add_args.extend(["--id", user_id])
elif email_as_id:
add_args.append("--email-as-id")
if role:
add_args.extend(["--role", role])
result = run_datahub_cmd(
add_args,
input=f"{password}\n{password}\n",
env={
"DATAHUB_GMS_URL": auth_session.gms_url(),
"DATAHUB_GMS_TOKEN": auth_session.gms_token(),
},
)
return result
@pytest.fixture
def test_users_cleanup(auth_session: Any, graph_client: DataHubGraph):
"""Fixture to track and clean up test users."""
created_users: List[str] = []
def add_user_to_cleanup(email: str):
user_urn = f"urn:li:corpuser:{email}"
created_users.append(user_urn)
yield add_user_to_cleanup
for user_urn in created_users:
try:
graph_client.hard_delete_entity(user_urn)
except Exception as e:
logger.warning(f"Failed to clean up user {user_urn}: {e}")
if created_users:
wait_for_writes_to_sync()
def test_user_add_without_role(auth_session: Any, test_users_cleanup: Any) -> None:
"""Test creating a user without specifying a role."""
email = generate_test_email()
display_name = "Test User No Role"
password = "testpassword123"
test_users_cleanup(email)
result = datahub_user_add(
auth_session, email, display_name, password, email_as_id=True
)
assert result.exit_code == 0
assert "Successfully created user" in result.output
assert email in result.output
assert "URN:" in result.output
wait_for_writes_to_sync()
def test_user_add_with_admin_role(auth_session: Any, test_users_cleanup: Any) -> None:
"""Test creating a user with Admin role."""
email = generate_test_email()
display_name = "Test Admin User"
password = "adminpass123"
test_users_cleanup(email)
result = datahub_user_add(
auth_session, email, display_name, password, role="Admin", email_as_id=True
)
assert result.exit_code == 0
assert "Successfully created user" in result.output
assert email in result.output
assert "Admin" in result.output
assert "URN:" in result.output
wait_for_writes_to_sync()
def test_user_add_with_editor_role(auth_session: Any, test_users_cleanup: Any) -> None:
"""Test creating a user with Editor role."""
email = generate_test_email()
display_name = "Test Editor User"
password = "editorpass123"
test_users_cleanup(email)
result = datahub_user_add(
auth_session, email, display_name, password, role="editor", email_as_id=True
)
assert result.exit_code == 0
assert "Successfully created user" in result.output
assert email in result.output
assert "Editor" in result.output
assert "URN:" in result.output
wait_for_writes_to_sync()
def test_user_add_with_reader_role(auth_session: Any, test_users_cleanup: Any) -> None:
"""Test creating a user with Reader role."""
email = generate_test_email()
display_name = "Test Reader User"
password = "readerpass123"
test_users_cleanup(email)
result = datahub_user_add(
auth_session, email, display_name, password, role="READER", email_as_id=True
)
assert result.exit_code == 0
assert "Successfully created user" in result.output
assert email in result.output
assert "Reader" in result.output
assert "URN:" in result.output
wait_for_writes_to_sync()
def test_user_add_duplicate_user(auth_session: Any, test_users_cleanup: Any) -> None:
"""Test that creating a duplicate user returns an error."""
email = generate_test_email()
display_name = "Test Duplicate User"
password = "duplicatepass123"
test_users_cleanup(email)
first_result = datahub_user_add(
auth_session, email, display_name, password, email_as_id=True
)
assert first_result.exit_code == 0
wait_for_writes_to_sync()
second_result = datahub_user_add(
auth_session, email, display_name, password, email_as_id=True
)
assert second_result.exit_code == 1
assert "already exists" in second_result.output.lower()
assert email in second_result.output
def test_user_add_without_password_flag(auth_session: Any) -> None:
"""Test that not providing --password flag results in error."""
email = generate_test_email()
display_name = "Test User No Password Flag"
add_args: List[str] = [
"user",
"add",
"--email",
email,
"--display-name",
display_name,
"--email-as-id",
]
result = run_datahub_cmd(
add_args,
env={
"DATAHUB_GMS_URL": auth_session.gms_url(),
"DATAHUB_GMS_TOKEN": auth_session.gms_token(),
},
)
assert result.exit_code == 1
assert "password flag is required" in result.output.lower()
def test_user_add_with_explicit_id(auth_session: Any, test_users_cleanup: Any) -> None:
"""Test creating a user with explicit --id option."""
email = generate_test_email()
user_id = f"testuser_{uuid.uuid4().hex[:8]}"
display_name = "Test User With ID"
password = "testpassword123"
# Cleanup with the user ID, not email
test_users_cleanup(user_id)
result = datahub_user_add(
auth_session, email, display_name, password, user_id=user_id
)
assert result.exit_code == 0
assert "Successfully created user" in result.output
assert user_id in result.output
assert "URN:" in result.output
wait_for_writes_to_sync()
def test_user_add_without_id_or_flag(auth_session: Any) -> None:
"""Test that not providing --id or --email-as-id results in error."""
email = generate_test_email()
display_name = "Test User No ID"
add_args: List[str] = [
"user",
"add",
"--email",
email,
"--display-name",
display_name,
"--password",
]
result = run_datahub_cmd(
add_args,
input="password123\npassword123\n",
env={
"DATAHUB_GMS_URL": auth_session.gms_url(),
"DATAHUB_GMS_TOKEN": auth_session.gms_token(),
},
)
assert result.exit_code == 1
assert "must specify either --id or --email-as-id" in result.output.lower()
def test_user_add_with_both_id_and_flag(auth_session: Any) -> None:
"""Test that providing both --id and --email-as-id results in error."""
email = generate_test_email()
display_name = "Test User Both Options"
add_args: List[str] = [
"user",
"add",
"--email",
email,
"--id",
"testuser",
"--email-as-id",
"--display-name",
display_name,
"--password",
]
result = run_datahub_cmd(
add_args,
input="password123\npassword123\n",
env={
"DATAHUB_GMS_URL": auth_session.gms_url(),
"DATAHUB_GMS_TOKEN": auth_session.gms_token(),
},
)
assert result.exit_code == 1
assert "cannot specify both --id and --email-as-id" in result.output.lower()
def test_user_add_id_different_from_email(
auth_session: Any, test_users_cleanup: Any
) -> None:
"""Test creating a user where ID is different from email."""
email = "john.doe@company.com"
user_id = "jdoe"
display_name = "John Doe"
password = "securepass123"
test_users_cleanup(user_id)
result = datahub_user_add(
auth_session, email, display_name, password, user_id=user_id, role="Editor"
)
assert result.exit_code == 0
assert "Successfully created user" in result.output
assert user_id in result.output
assert "Editor" in result.output
assert f"urn:li:corpuser:{user_id}" in result.output
wait_for_writes_to_sync()