datahub/smoke-test/conftest.py

import os
from typing import List, Tuple

import pytest
import requests
from _pytest.nodes import Item

from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from tests.test_result_msg import send_message
from tests.utils import (
    TestSessionWrapper,
    get_frontend_session,
    wait_for_healthcheck_util,
)

# Disable telemetry
os.environ["DATAHUB_TELEMETRY_ENABLED"] = "false"
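# (Presumably this stops the smoke tests from emitting client usage events;
# DATAHUB_TELEMETRY_ENABLED is read by the DataHub Python client's telemetry module.)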


def build_auth_session():
    wait_for_healthcheck_util(requests)
    return TestSessionWrapper(get_frontend_session())
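

# Session-scoped fixture: a single authenticated frontend session is shared by
# the entire test run and torn down via destroy() when the run ends.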
@pytest.fixture(scope="session")
def auth_session():
    auth_session = build_auth_session()
    yield auth_session
    auth_session.destroy()
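

# Build a DataHubGraph client against the same GMS endpoint and token that the
# authenticated session resolved (the cookie dump is presumably a debugging aid).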
def build_graph_client(auth_session):
    print(auth_session.cookies)
    graph: DataHubGraph = DataHubGraph(
        config=DatahubClientConfig(
            server=auth_session.gms_url(), token=auth_session.gms_token()
        )
    )
    return graph


@pytest.fixture(scope="session")
def graph_client(auth_session) -> DataHubGraph:
    return build_graph_client(auth_session)
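

# pytest_sessionfinish is a standard pytest hook that fires after the whole run;
# here it forwards the exit status to the reporting helper in tests.test_result_msg.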
def pytest_sessionfinish(session, exitstatus):
    """Called once the whole test run finishes; reports the exit status."""
    send_message(exitstatus)
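

# get_batch_start_end computes the half-open [start, end) slice of the sorted
# test list that this batch should run. Worked example (hypothetical numbers):
# num_tests=10, BATCH_COUNT=3, BATCH_NUMBER=2 gives batch_size = round(10 / 3) = 3
# and a provisional slice of [6, 9); because batch 2 is the last batch, the end
# is extended to 10, so it runs tests[6:10] and no test is dropped.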
def get_batch_start_end(num_tests: int) -> Tuple[int, int]:
    batch_count = int(os.getenv("BATCH_COUNT", "1"))
    batch_number = int(os.getenv("BATCH_NUMBER", "0"))
    if batch_count <= 0 or batch_count > num_tests:
        raise ValueError(
            f"Invalid batch count {batch_count}: must be > 0 and <= {num_tests} (num_tests)"
        )
    if batch_number >= batch_count:
        raise ValueError(
            f"Invalid batch number {batch_number}: must be less than {batch_count} (zero-based index)"
        )
    batch_size = round(num_tests / batch_count)
    batch_start = batch_size * batch_number
    batch_end = batch_start + batch_size
    # We must have exactly as many batches as specified by BATCH_COUNT, so the
    # last batch absorbs whatever remainder the rounding above leaves over.
    if batch_number == batch_count - 1:
        batch_end = num_tests
    print(f"Running tests for batch {batch_number} of {batch_count}")
    return batch_start, batch_end


def pytest_collection_modifyitems(
    session: pytest.Session, config: pytest.Config, items: List[Item]
) -> None:
    if os.getenv("TEST_STRATEGY") == "cypress":
        # We launch Cypress via pytest, but it needs its own batching mechanism
        # at the Cypress level.
        return
    # If the BATCH_COUNT and BATCH_NUMBER env vars are set, split the collected
    # tests into batches and run only the BATCH_NUMBER-th batch, enabling
    # multiple parallel launches. The current implementation assumes all tests
    # have equal weight; TODO: weighted batching could make batch costs more
    # equal. This is effectively a no-op when BATCH_COUNT=1.
    start_index, end_index = get_batch_start_end(num_tests=len(items))
    items.sort(key=lambda x: x.nodeid)  # keep the ordering stable across batches
    # Replace the collected items with just this batch's slice.
    print(f"Running tests {start_index}-{end_index}")
    items[:] = items[start_index:end_index]
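

# Example invocation (hypothetical command line): run the second of three
# parallel batches (BATCH_NUMBER is zero-based):
#   BATCH_COUNT=3 BATCH_NUMBER=1 pytest tests/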