Teddy 63a55437ae
GEN-1412: Implement load test logic (#19155)
* feat: implemented load test logic

* style: ran python linting

* fix: added locust dependency in test

* fix: skip locust in 3.8 as not supported

* fix: update gcsfs version

* fix: revert gcsfs versioning

* fix: fix gcsfs version to 2023.10

* fix: dagster graphql and gx versions

* fix: dagster version to 1.8 for py8 compatibility

* fix: fix clickhouse to 0.2 as 0.3 requires SQA 2+

* fix: revert changes from main

* fix: revert changes compared to main
2025-04-24 16:08:38 +02:00

42 lines
1.2 KiB
Python

"""Load test for the test case resources"""
from locust import TaskSet, task
from _openmetadata_testutils.helpers.login_user import login_user
# REST path of the test case collection that the load test tasks below request.
TEST_CASE_RESOURCE_PATH = "/api/v1/dataQuality/testCases"
class TestCaseTasks(TaskSet):
    """Load test tasks targeting the test case resource."""

    def on_start(self):
        """Log in and keep the result for use as ``auth`` on later requests."""
        self.bearer = login_user(self.client)

    def _list_test_cases(self):
        """Fetch test cases page by page until no ``after`` cursor is returned.

        Every page is requested with ``limit=10`` and reported under the same
        request name, so all pages share a single stats entry.
        """
        stats_name = f"{TEST_CASE_RESOURCE_PATH}?limit=10"
        query = {"limit": 10}
        while True:
            page = self.client.get(
                TEST_CASE_RESOURCE_PATH,
                params=query,
                auth=self.bearer,
                name=stats_name,
            )
            cursor = page.json().get("paging", {}).get("after")
            if not cursor:
                # No further page advertised by the server: done.
                break
            query = {"limit": 10, "after": cursor}

    @task(2)
    def list_test_cases(self):
        """Walk through every page of test cases. Task weight: 2."""
        self._list_test_cases()

    @task
    def stop(self):
        """Interrupt this task set by calling ``self.interrupt()``."""
        self.interrupt()