dify/api/tests/unit_tests/libs/test_passport.py

206 lines
8.4 KiB
Python

from datetime import UTC, datetime, timedelta
from unittest.mock import patch
import jwt
import pytest
from werkzeug.exceptions import Unauthorized
from libs.passport import PassportService
class TestPassportService:
"""Test PassportService JWT operations"""
@pytest.fixture
def passport_service(self):
"""Create PassportService instance with test secret key"""
with patch("libs.passport.dify_config") as mock_config:
mock_config.SECRET_KEY = "test-secret-key-for-testing"
return PassportService()
@pytest.fixture
def another_passport_service(self):
"""Create another PassportService instance with different secret key"""
with patch("libs.passport.dify_config") as mock_config:
mock_config.SECRET_KEY = "another-secret-key-for-testing"
return PassportService()
# Core functionality tests
def test_should_issue_and_verify_token(self, passport_service):
"""Test complete JWT lifecycle: issue and verify"""
payload = {"user_id": "123", "app_code": "test-app"}
token = passport_service.issue(payload)
# Verify token format
assert isinstance(token, str)
assert len(token.split(".")) == 3 # JWT format: header.payload.signature
# Verify token content
decoded = passport_service.verify(token)
assert decoded == payload
def test_should_handle_different_payload_types(self, passport_service):
"""Test issuing and verifying tokens with different payload types"""
test_cases = [
{"string": "value"},
{"number": 42},
{"float": 3.14},
{"boolean": True},
{"null": None},
{"array": [1, 2, 3]},
{"nested": {"key": "value"}},
{"unicode": "中文测试"},
{"emoji": "🔐"},
{}, # Empty payload
]
for payload in test_cases:
token = passport_service.issue(payload)
decoded = passport_service.verify(token)
assert decoded == payload
# Security tests
def test_should_reject_modified_token(self, passport_service):
"""Test that any modification to token invalidates it"""
token = passport_service.issue({"user": "test"})
# Test multiple modification points
test_positions = [0, len(token) // 3, len(token) // 2, len(token) - 1]
for pos in test_positions:
if pos < len(token) and token[pos] != ".":
# Change one character
tampered = token[:pos] + ("X" if token[pos] != "X" else "Y") + token[pos + 1 :]
with pytest.raises(Unauthorized):
passport_service.verify(tampered)
def test_should_reject_token_with_different_secret_key(self, passport_service, another_passport_service):
"""Test key isolation - token from one service should not work with another"""
payload = {"user_id": "123", "app_code": "test-app"}
token = passport_service.issue(payload)
with pytest.raises(Unauthorized) as exc_info:
another_passport_service.verify(token)
assert str(exc_info.value) == "401 Unauthorized: Invalid token signature."
def test_should_use_hs256_algorithm(self, passport_service):
"""Test that HS256 algorithm is used for signing"""
payload = {"test": "data"}
token = passport_service.issue(payload)
# Decode header without relying on JWT internals
# Use jwt.get_unverified_header which is a public API
header = jwt.get_unverified_header(token)
assert header["alg"] == "HS256"
def test_should_reject_token_with_wrong_algorithm(self, passport_service):
"""Test rejection of token signed with different algorithm"""
payload = {"user_id": "123"}
# Create token with different algorithm
with patch("libs.passport.dify_config") as mock_config:
mock_config.SECRET_KEY = "test-secret-key-for-testing"
# Create token with HS512 instead of HS256
wrong_alg_token = jwt.encode(payload, mock_config.SECRET_KEY, algorithm="HS512")
# Should fail because service expects HS256
# InvalidAlgorithmError is now caught by PyJWTError handler
with pytest.raises(Unauthorized) as exc_info:
passport_service.verify(wrong_alg_token)
assert str(exc_info.value) == "401 Unauthorized: Invalid token."
# Exception handling tests
def test_should_handle_invalid_tokens(self, passport_service):
"""Test handling of various invalid token formats"""
invalid_tokens = [
("not.a.token", "Invalid token."),
("invalid-jwt-format", "Invalid token."),
("xxx.yyy.zzz", "Invalid token."),
("a.b", "Invalid token."), # Missing signature
("", "Invalid token."), # Empty string
(" ", "Invalid token."), # Whitespace
(None, "Invalid token."), # None value
# Malformed base64
("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.INVALID_BASE64!@#$.signature", "Invalid token."),
]
for invalid_token, expected_message in invalid_tokens:
with pytest.raises(Unauthorized) as exc_info:
passport_service.verify(invalid_token)
assert expected_message in str(exc_info.value)
def test_should_reject_expired_token(self, passport_service):
"""Test rejection of expired token"""
past_time = datetime.now(UTC) - timedelta(hours=1)
payload = {"user_id": "123", "exp": past_time.timestamp()}
with patch("libs.passport.dify_config") as mock_config:
mock_config.SECRET_KEY = "test-secret-key-for-testing"
token = jwt.encode(payload, mock_config.SECRET_KEY, algorithm="HS256")
with pytest.raises(Unauthorized) as exc_info:
passport_service.verify(token)
assert str(exc_info.value) == "401 Unauthorized: Token has expired."
# Configuration tests
def test_should_handle_empty_secret_key(self):
"""Test behavior when SECRET_KEY is empty"""
with patch("libs.passport.dify_config") as mock_config:
mock_config.SECRET_KEY = ""
service = PassportService()
# Empty secret key should still work but is insecure
payload = {"test": "data"}
token = service.issue(payload)
decoded = service.verify(token)
assert decoded == payload
def test_should_handle_none_secret_key(self):
"""Test behavior when SECRET_KEY is None"""
with patch("libs.passport.dify_config") as mock_config:
mock_config.SECRET_KEY = None
service = PassportService()
payload = {"test": "data"}
# JWT library will raise TypeError when secret is None
with pytest.raises((TypeError, jwt.exceptions.InvalidKeyError)):
service.issue(payload)
# Boundary condition tests
def test_should_handle_large_payload(self, passport_service):
"""Test handling of large payload"""
# Test with 100KB instead of 1MB for faster tests
large_data = "x" * (100 * 1024)
payload = {"data": large_data}
token = passport_service.issue(payload)
decoded = passport_service.verify(token)
assert decoded["data"] == large_data
def test_should_handle_special_characters_in_payload(self, passport_service):
"""Test handling of special characters in payload"""
special_payloads = [
{"special": "!@#$%^&*()"},
{"quotes": 'He said "Hello"'},
{"backslash": "path\\to\\file"},
{"newline": "line1\nline2"},
{"unicode": "🔐🔑🛡️"},
{"mixed": "Test123!@#中文🔐"},
]
for payload in special_payloads:
token = passport_service.issue(payload)
decoded = passport_service.verify(token)
assert decoded == payload
def test_should_catch_generic_pyjwt_errors(self, passport_service):
"""Test that generic PyJWTError exceptions are caught and converted to Unauthorized"""
# Mock jwt.decode to raise a generic PyJWTError
with patch("libs.passport.jwt.decode") as mock_decode:
mock_decode.side_effect = jwt.exceptions.PyJWTError("Generic JWT error")
with pytest.raises(Unauthorized) as exc_info:
passport_service.verify("some-token")
assert str(exc_info.value) == "401 Unauthorized: Invalid token."