# mirror of https://github.com/datahub-project/datahub.git
# synced 2025-08-06 00:08:09 +00:00
# (414 lines, 15 KiB, Python)
"""
|
|
Tests for the DataHub tracking API (v1) that verify events are properly tracked and delivered to different destinations.
|
|
|
|
This test suite validates that events sent to the /openapi/v1/tracking/track endpoint are:
|
|
1. Successfully processed by the server
|
|
2. Correctly delivered to configured tracking destinations:
|
|
- Mixpanel: Verifies events are sent to Mixpanel's JQL API
|
|
- Kafka: Verifies events are published to the DataHubUsageEvent_v1 topic
|
|
- Elasticsearch: Verifies events are indexed and searchable
|
|
|
|
Each test creates a unique event with a custom identifier to ensure test isolation and uses appropriate
|
|
wait times and retries to account for asynchronous processing in the different systems.
|
|
|
|
Note: Some tests may be skipped if required configuration (e.g., Mixpanel API secret) is not available.
|
|
"""
|
|
|
|
# Standard library
import json
import os
import time
from datetime import datetime, timezone

# Third-party
import pytest
import requests
from confluent_kafka import Consumer, KafkaError
from dotenv import load_dotenv

# Project-local
from tests.utils import get_kafka_broker_url

# Pull optional settings (e.g. MIXPANEL_API_SECRET) from a local .env file, if present.
load_dotenv()

# Usage-event type exercised by every test in this module.
EVENT_NAME = "PageViewEvent"


def test_tracking_api_mixpanel(auth_session, graph_client):
    """Test that we can post events to the tracking endpoint and verify they are sent to Mixpanel.

    Posts a uniquely-tagged PageViewEvent, then polls Mixpanel's JQL API (up to 20
    attempts, 1s apart) until the event shows up. Skipped when the Mixpanel secret
    is missing or Mixpanel is disabled in the server config.
    """

    # Check if Mixpanel API secret is available in environment variables
    api_secret = os.environ.get("MIXPANEL_API_SECRET")
    if not api_secret:
        pytest.skip("MIXPANEL_API_SECRET environment variable not set, skipping test")

    # Check if Mixpanel is enabled in server configuration
    graph_client.test_connection()  # Hack: Needed to ensure that the server config is loaded
    if not graph_client.server_config.get("telemetry", {}).get(
        "enabledMixpanel", False
    ):
        pytest.skip("Mixpanel is disabled in server configuration, skipping test")

    # Test configuration
    base_url = auth_session.gms_url()  # Use the authenticated GMS URL

    # Create a test event with a unique identifier so this run is distinguishable
    unique_id = f"test_{datetime.now().strftime('%Y%m%d%H%M%S')}"
    print(f"\nLooking for test event with customField: {unique_id}")
    now = datetime.now(timezone.utc)
    test_event = {
        "type": EVENT_NAME,
        "entityType": "dataset",
        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,example_dataset,PROD)",
        "actorUrn": "urn:li:corpuser:test_user",
        "timestamp": int(now.timestamp() * 1000),  # Unix epoch milliseconds
        "customField": unique_id,  # Use unique ID to identify our test event
    }

    # Post the event to the tracking endpoint
    response = auth_session.post(
        f"{base_url}/openapi/v1/tracking/track", json=test_event
    )

    # Print the response for debugging
    print(f"\nResponse status: {response.status_code}")
    print(f"Response body: {response.text}")

    # Verify the response
    assert response.status_code == 200, f"Failed to post event: {response.text}"

    # Wait a moment for the event to be processed by Mixpanel
    print("\nWaiting for event to be processed by Mixpanel...")
    time.sleep(10)

    # Query Mixpanel's JQL API to retrieve our test event
    # Note: This requires a service account with access to JQL
    project_id = os.environ.get(
        "MIXPANEL_PROJECT_ID", "3653440"
    )  # Allow project ID to be configurable too

    # JQL query: pull PageViewEvents and filter down to our unique customField
    jql_query = f"""
    function main() {{
    return Events({{
        from_date: '2024-01-01',
        to_date: '2025-12-31',
        event_selectors: [{{event: '{EVENT_NAME}'}}]
    }}).filter(function(event) {{
        return event.properties.customField == '{unique_id}';
    }});
    }}
    """

    # URL with project ID
    url = f"https://mixpanel.com/api/query/jql?project_id={project_id}"

    # Headers
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
    }

    # Payload
    payload = {
        "script": jql_query,
    }

    # Poll Mixpanel up to max_attempts times (1s apart) before giving up;
    # ingestion is asynchronous, so the first queries may come back empty.
    max_attempts = 20
    for attempt in range(max_attempts):
        # Send request with Basic Auth (API secret as username, empty password)
        response = requests.post(
            url, data=payload, headers=headers, auth=(api_secret, "")
        )

        # Print response
        print(f"\nMixpanel JQL response status: {response.status_code}")
        print(response.text)

        if response.status_code == 200:
            events = response.json()
            print(f"\nFound {len(events)} matching events in Mixpanel")

            if len(events) > 0:
                # Verify the event properties
                event = events[0]
                properties = event.get("properties", {})

                assert properties.get("type") == EVENT_NAME, (
                    "Mixpanel event type doesn't match"
                )
                assert properties.get("entityType") == "dataset", (
                    "Mixpanel entity type doesn't match"
                )
                assert (
                    properties.get("entityUrn")
                    == "urn:li:dataset:(urn:li:dataPlatform:bigquery,example_dataset,PROD)"
                ), "Mixpanel entity URN doesn't match"
                assert properties.get("actorUrn") == "urn:li:corpuser:test_user", (
                    "Mixpanel actor URN doesn't match"
                )
                assert properties.get("customField") == unique_id, (
                    "Mixpanel custom field doesn't match"
                )
                print("\nSuccessfully verified event in Mixpanel")
                return
            else:
                print(
                    "\nNo matching events found in Mixpanel... waiting 1 second and trying again"
                )
        else:
            print(f"\nFailed to query Mixpanel: {response.text}")
            # Only give up on a query error once every retry is exhausted.
            if attempt == max_attempts - 1:
                raise Exception(f"\nFailed to query Mixpanel: {response.text}")

        time.sleep(1)

    # All retries exhausted without finding the event — fail loudly.
    raise Exception("Failed to verify event in Mixpanel")


def test_tracking_api_kafka(auth_session):
    """Test that we can post events to the tracking endpoint and verify they are sent to Kafka (DUE)."""

    # Resolve endpoints: authenticated GMS URL plus the broker address on port 9092.
    base_url = auth_session.gms_url()
    kafka_bootstrap_servers = get_kafka_broker_url().replace(
        ":29092", ":9092"
    )
    due_topic = "DataHubUsageEvent_v1"  # usage-event topic consumed below

    # Unique marker lets us pick this run's event out of unrelated traffic.
    unique_id = f"test_{datetime.now().strftime('%Y%m%d%H%M%S')}"
    print(f"\nLooking for test event with customField: {unique_id}")
    now = datetime.now(timezone.utc)
    test_event = {
        "type": EVENT_NAME,
        "entityType": "dataset",
        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,example_dataset,PROD)",
        "actorUrn": "urn:li:corpuser:test_user",
        "timestamp": int(now.timestamp() * 1000),  # Unix epoch milliseconds
        "customField": unique_id,
    }

    # Subscribe BEFORE posting so the event cannot slip past the consumer.
    group_id = f"test_tracking_consumer_{unique_id}"
    print(
        f"\nCreating Kafka consumer for topic {due_topic} with group_id {group_id}..."
    )
    due_consumer = Consumer(
        {
            "bootstrap.servers": kafka_bootstrap_servers,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": True,
        }
    )
    due_consumer.subscribe([due_topic])
    print(f"Subscribed to topic: {due_topic}")

    # Give the subscription a moment to settle before producing.
    print("Waiting for consumer to be ready...")
    time.sleep(2)

    # Record a timestamp (millis) just before the POST, for debugging.
    start_time = int(time.time() * 1000)
    print(f"Starting timestamp: {start_time}")

    # Fire the event at the tracking endpoint.
    response = auth_session.post(
        f"{base_url}/openapi/v1/tracking/track", json=test_event
    )
    print(f"\nResponse status: {response.status_code}")
    print(f"Response body: {response.text}")
    assert response.status_code == 200, f"Failed to post event: {response.text}"

    print(f"\nWaiting for messages on topic {due_topic}...")

    # Drain the topic for up to `timeout` seconds, collecting decoded payloads
    # and stopping early once our tagged event shows up.
    messages = []
    polling_began = time.time()
    timeout = 10  # seconds
    while time.time() - polling_began < timeout:
        record = due_consumer.poll(1.0)
        if record is None:
            time.sleep(0.1)  # brief pause between empty polls
            continue
        if record.error():
            if record.error().code() == KafkaError._PARTITION_EOF:
                continue  # benign: reached end of a partition
            print(f"Consumer error: {record.error()}")
            break

        try:
            payload = json.loads(record.value().decode("utf-8"))
            print(f"Found message: {payload}")
            messages.append(payload)
            if (
                payload.get("type") == EVENT_NAME
                and payload.get("customField") == unique_id
            ):
                print(f"Found our test event: {payload}")
                break
        except Exception as e:
            # Tolerate undecodable records; keep draining.
            print(f"Error processing message: {e}")
            continue

    # Close the consumer
    due_consumer.close()

    # Something must have been consumed at all.
    assert len(messages) > 0, "No messages found in Kafka"

    # Locate our event among everything drained and check its fields.
    found_event = False
    for payload in messages:
        if not isinstance(payload, dict):
            continue
        print(f"Found message with customField: {payload.get('customField')}")
        if (
            payload.get("type") == EVENT_NAME
            and payload.get("customField") == unique_id
        ):
            found_event = True
            # Verify other fields match
            assert payload.get("entityType") == "dataset", (
                "Entity type doesn't match"
            )
            assert (
                payload.get("entityUrn")
                == "urn:li:dataset:(urn:li:dataPlatform:bigquery,example_dataset,PROD)"
            ), "Entity URN doesn't match"
            assert payload.get("actorUrn") == "urn:li:corpuser:test_user", (
                "Actor URN doesn't match"
            )
            break

    assert found_event, "Test event not found in Kafka messages"


def test_tracking_api_elasticsearch(auth_session):
    """Test that we can post events to the tracking endpoint and verify they are indexed in Elasticsearch.

    Posts a uniquely-tagged PageViewEvent, waits for the asynchronous
    Kafka -> Elasticsearch pipeline, then queries the usage-event index for it.
    Connectivity/query failures against Elasticsearch are tolerated (logged,
    not failed), but verification mismatches always fail the test.
    """

    # Test configuration
    base_url = auth_session.gms_url()  # Use the authenticated GMS URL

    # Create a test event with a unique identifier for test isolation
    unique_id = f"test_{datetime.now().strftime('%Y%m%d%H%M%S')}"
    print(f"\nLooking for test event with customField: {unique_id}")
    now = datetime.now(timezone.utc)

    # Create a test event that mimics a real browser-emitted PageViewEvent
    test_event = {
        "title": "Acryl DataHub",
        "url": "http://localhost:9002/",
        "path": "/",
        "hash": "",
        "search": "",
        "width": 2827,
        "height": 1272,
        "prevPathname": "",
        "type": "PageViewEvent",
        "actorUrn": "urn:li:corpuser:test_user",
        "timestamp": int(now.timestamp() * 1000),  # Unix epoch milliseconds
        "date": now.strftime("%a %b %d %Y %H:%M:%S GMT%z"),
        "userAgent": "Mozilla/5.0 (Test Browser)",
        "browserId": unique_id,  # Use unique ID to identify our test event
        "origin": "http://localhost:9002",
        "isThemeV2Enabled": True,
        "userPersona": "urn:li:dataHubPersona:businessUser",
        "corp_user_name": "Test User",
        "corp_user_username": "test_user",
        "customField": unique_id,  # Additional field to identify our test event
    }

    # Post the event to the tracking endpoint
    response = auth_session.post(
        f"{base_url}/openapi/v1/tracking/track", json=test_event
    )

    # Print the response for debugging
    print(f"\nResponse status: {response.status_code}")
    print(f"Response body: {response.text}")

    # Verify the response
    assert response.status_code == 200, f"Failed to post event: {response.text}"

    # Wait a moment for the event to be processed by Kafka and indexed in Elasticsearch
    print("\nWaiting for event to be processed and indexed in Elasticsearch...")
    time.sleep(10)  # Give more time for Elasticsearch indexing

    # Query Elasticsearch to retrieve our test event
    es_url = os.environ.get("ELASTICSEARCH_URL", "http://localhost:9200")
    es_index = os.environ.get("ELASTICSEARCH_INDEX", "datahub_usage_event")

    # Match on both unique markers plus the event type
    es_query = {
        "query": {
            "bool": {
                "must": [
                    {"term": {"browserId": unique_id}},
                    {"term": {"customField": unique_id}},
                    {"term": {"type": "PageViewEvent"}},
                ]
            }
        }
    }

    try:
        es_response = requests.post(f"{es_url}/{es_index}/_search", json=es_query)
        print(f"\nElasticsearch response status: {es_response.status_code}")

        if es_response.status_code == 200:
            result = es_response.json()
            hits = result.get("hits", {}).get("hits", [])
            print(f"\nFound {len(hits)} matching events in Elasticsearch")

            # Verify we found our test event
            assert len(hits) > 0, "No matching events found in Elasticsearch"

            # Verify the event properties
            event = hits[0].get("_source", {})

            assert event.get("type") == "PageViewEvent", (
                "Elasticsearch event type doesn't match"
            )
            assert event.get("actorUrn") == "urn:li:corpuser:test_user", (
                "Elasticsearch actor URN doesn't match"
            )
            assert event.get("browserId") == unique_id, (
                "Elasticsearch browser ID doesn't match"
            )
            assert event.get("customField") == unique_id, (
                "Elasticsearch custom field doesn't match"
            )

            print("\nSuccessfully verified event in Elasticsearch")
        else:
            print(f"\nFailed to query Elasticsearch: {es_response.text}")
            # Don't fail the test if Elasticsearch query fails, as this might be due to configuration
            # in the test environment
    except AssertionError:
        # Verification mismatches are real failures — previously they were
        # swallowed by the broad Exception handler below. Re-raise them.
        raise
    except Exception as e:
        print(f"\nError querying Elasticsearch: {str(e)}")
        # Don't fail the test on ES connectivity/query errors