GEN-1412: Implement load test logic (#19155)

* feat: implemented load test logic

* style: ran python linting

* fix: added locust dependency in test

* fix: skip locust in 3.8 as not supported

* fix: update gcsfs version

* fix: revert gcsfs versionning

* fix: fix gcsf version to 2023.10

* fix: dagster graphql and gx versions

* fix: dagster version to 1.8 for py8 compatibility

* fix: fix clickhouse to 0.2 as 0.3 requires SQA 2+

* fix: revert changes from main

* fix: revert changes compared to main
This commit is contained in:
Teddy 2025-04-24 16:08:38 +02:00 committed by GitHub
parent 4fe9c247e8
commit 63a55437ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 444 additions and 2 deletions

1
.gitignore vendored
View File

@ -103,6 +103,7 @@ ingestion/requirements.txt
ingestion/.python-version
ingestion/venv2/**
.python-version
ingestion/tests/load/summaries/*.csv
# MLFlow
mlruns/

View File

@ -13,6 +13,7 @@
Python Dependencies
"""
import sys
from typing import Dict, List, Set
from setuptools import setup
@ -189,7 +190,7 @@ plugins: Dict[str, Set[str]] = {
},
"clickhouse": {
"clickhouse-driver~=0.2",
"clickhouse-sqlalchemy~=0.2",
"clickhouse-sqlalchemy~=0.2.0",
DATA_DIFF["clickhouse"],
},
"dagster": {
@ -197,7 +198,7 @@ plugins: Dict[str, Set[str]] = {
VERSIONS["pymysql"],
"psycopg2-binary",
VERSIONS["geoalchemy2"],
"dagster_graphql~=1.1",
"dagster_graphql>=1.8.0",
},
"dbt": {
"google-cloud",
@ -438,6 +439,9 @@ test = {
*plugins["bigquery"],
}
if sys.version_info >= (3, 9):
test.add("locust~=2.32.0")
e2e_test = {
# playwright dependencies
"pytest-playwright",

View File

@ -0,0 +1,22 @@
"""Base class for resource tests."""
from locust.contrib.fasthttp import FastHttpSession
from requests.auth import AuthBase
class BearerAuth(AuthBase):
    """Requests auth helper that attaches a Bearer token to every request."""

    def __init__(self, token):
        # Access token returned by the login endpoint.
        self.token = token

    def __call__(self, r):
        # AuthBase contract: mutate the outgoing request and return it.
        r.headers["authorization"] = "Bearer " + self.token
        return r
def login_user(client: FastHttpSession) -> BearerAuth:
    """Log in as the default admin user and return a reusable auth object.

    Args:
        client: locust HTTP session used to issue the login request
    Returns:
        BearerAuth wrapping the access token from the login response
    """
    credentials = {"email": "admin@open-metadata.org", "password": "YWRtaW4="}
    resp = client.post("/api/v1/users/login", json=credentials)
    return BearerAuth(resp.json().get("accessToken"))

View File

@ -0,0 +1,129 @@
## Adding a new resource to load tests
Add a new `*.py` file to `test_resources/tasks`. The naming does not matter, but we use the resource name as defined in Java, separated by `_` (e.g. `TestCaseResource` becomes `test_case_tasks.py`).
In your newly created file, you'll need to import at minimum 1 package
```python
from locust import task, TaskSet
```
`task` will be used as a decorator to define our task that will run as part of our load test. `TaskSet` will be inherited by our task set class.
Here is an example of a locust task definition. The integer argument in `@task` gives a specific weight to the task (i.e. increases the probability of it being run)
```
class TestCaseResultTasks(TaskSet):
"""Test case result resource load test"""
def _list_test_case_results(self, start_ts: int, end_ts: int, days_range: str):
"""List test case results for a given time range
Args:
start_ts (int): start timestamp
end_ts (int): end timestamp
range (str):
"""
for test_case in self.test_cases:
fqn = test_case.get("fullyQualifiedName")
if fqn:
self.client.get(
f"{TEST_CASE_RESULT_RESOURCE_PATH}/{fqn}",
params={ # type: ignore
"startTs": start_ts,
"endTs": end_ts,
},
auth=self.bearer,
name=f"{TEST_CASE_RESULT_RESOURCE_PATH}/[fqn]/{days_range}"
)
@task(3)
def list_test_case_results_30_days(self):
"""List test case results for the last 30 days. Weighted 3"""
now = datetime.now()
last_30_days = int((now - timedelta(days=30)).timestamp() * 1000)
self._list_test_case_results(last_30_days, int(now.timestamp() * 1000), "30_days")
```
Notice how we use `self.client.get` to perform the request. This is provided by locust `HttpSession`. If the request needs to be authenticated, you can use `auth=self.bearer`. You will need to first define `self.bearer`, you can achieve this using the `on_start` hook from locust.
```python
from _openmetadata_testutils.helpers.login_user import login_user
class TestCaseResultTasks(TaskSet):
"""Test case result resource load test"""
[...]
def on_start(self):
"""Get a list of test cases to fetch results for"""
self.bearer = login_user(self.client)
resp = self.client.get(f"{TEST_CASE_RESOURCE_PATH}", params={"limit": 100}, auth=self.bearer)
json = resp.json()
self.test_cases = json.get("data", [])
```
**IMPORTANT**
You MUST define a `def stop(self)` method in your `TaskSet` class as shown below so that control is given back to the parent user class.
```python
class TestCaseResultTasks(TaskSet):
"""Test case result resource load test"""
[...]
@task
def stop(self):
self.interrupt()
```
If your request contains a parameter (i.e. `/api/v1/dataQuality/testCases/testCaseResults/{fqn}`) you can name your request so all the requests sent will be grouped together, like this
```python
self.client.get(
f"{TEST_CASE_RESULT_RESOURCE_PATH}/{fqn}",
params={ # type: ignore
"startTs": start_ts,
"endTs": end_ts,
},
auth=self.bearer,
name=f"{TEST_CASE_RESULT_RESOURCE_PATH}/[fqn]/{days_range}"
)
```
Notice the argument `name=f"{TEST_CASE_RESULT_RESOURCE_PATH}/[fqn]/{days_range}"`, this will define under which name the requests will be grouped. Example of statistics summary below grouped by the request `name`
```csv
Type,Name,Request Count,Failure Count,Median Response Time,Average Response Time,Min Response Time,Max Response Time,Average Content Size,Requests/s,Failures/s,50%,66%,75%,80%,90%,95%,98%,99%,99.9%,99.99%,100%
GET,/api/v1/dataQuality/testCases/testCaseResults/[fqn]/60_days,3510,0,13,16.2354597524217,5.146791999997902,100.67633299999557,84567.57407407407,49.30531562959204,0.0,13,17,20,21,28,35,45,56,92,100,100
```
As a final step in `test_resources/manifest.yaml` add the resources, the metrics and the thresholds you want to test.
```yaml
/api/v1/dataQuality/testCases/testCaseResults/[fqn]/30_days:
type: GET
99%: 100
/api/v1/dataQuality/testCases/testCaseResults/[fqn]/60_days:
type: GET
99%: 100
```
This will test that our GET requests for the defined resources complete 99% of the time in less than 100 milliseconds (0.1 seconds).
Below is a list of all the metrics you can use:
- Request Count
- Failure Count
- Median Response Time
- Average Response Time
- Min Response Time
- Max Response Time
- Average Content Size
- Requests/s
- Failures/s
- 50%
- 66%
- 75%
- 80%
- 90%
- 95%
- 98%
- 99%
- 99.9%
- 99.99%
- 100%

View File

View File

View File

@ -0,0 +1,73 @@
"""Run test case result resource load test"""
import csv
import os
import sys
from pathlib import Path
from unittest import TestCase, skipIf
import yaml
from ingestion.tests.load.utils import run_load_test
def run_all_resources(summary_file: str, locust_file: str):
    """Launch the locust load test covering every resource task set.

    Args:
        summary_file: path prefix where locust writes its CSV summaries
        locust_file: locust entry-point file defining the user classes
    """
    # User count and run time are overridable from the environment.
    user_count = os.getenv("LOCUST_USER", "50")
    run_time = os.getenv("LOCUST_RUNTIME", "1m")
    cli_args = [
        "locust",
        "--headless",
        "-H",
        "http://localhost:8585",
        "--user",
        user_count,
        "--spawn-rate",
        "1",
        "-f",
        str(locust_file),
        "--run-time",
        run_time,
        "--only-summary",
        "--csv",
        str(summary_file),
    ]
    run_load_test(cli_args)
class TestAllResources(TestCase):
    """Run the load test against all resources and assert manifest thresholds."""

    @skipIf(sys.version_info < (3, 9), "locust is not supported on python 3.8")
    def test_all_resources(self):
        """Run locust, then compare every metric listed in ``manifest.yaml``
        against the statistics CSV written by locust."""
        directory = Path(__file__).parent
        test_resources_dir = directory.joinpath("test_resources")
        locust_file = test_resources_dir.joinpath("all_resources.py")
        summary_file = directory.parent.joinpath("load/summaries/all_")
        manifest_file = test_resources_dir.joinpath("manifest.yaml")

        run_all_resources(str(summary_file), str(locust_file))

        with open(manifest_file, "r", encoding="utf-8") as f:
            manifest = yaml.safe_load(f)

        with open(str(summary_file) + "_stats.csv", "r", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for row in reader:
                name = row.get("Name")
                if name not in manifest:
                    continue
                resource = manifest.get(name)
                type_ = resource.get("type")
                if type_ != row.get("Type"):
                    continue
                for metric, threshold in resource.items():
                    # "type" selects the HTTP verb; it is not a metric
                    # threshold, so never compare it against the CSV row.
                    if metric == "type":
                        continue
                    with self.subTest(stat=metric, resource=name, type_=type_):
                        stat = row.get(metric)
                        if stat:
                            self.assertLessEqual(
                                int(stat),
                                threshold,
                                msg=f"{metric} for {name} is greater than threshold",
                            )

View File

@ -0,0 +1,50 @@
"""Test class to run all resources load test"""
import importlib.util
import inspect
import logging
from pathlib import Path
from typing import List
from locust import HttpUser, TaskSet, constant
TASKS_DIR = "tasks"

logger = logging.getLogger(__name__)


def get_all_tasks_set() -> List:
    """Discover every class defined in the modules of the ``tasks`` directory.

    Returns:
        list of classes declared directly in each task module; helper
        modules prefixed with ``base_`` are skipped.
    """
    resource_classes = []
    tasks_dir = Path(__file__).parent.joinpath(TASKS_DIR)
    for file_path in tasks_dir.glob("*.py"):
        # Compare against the file NAME, not the full path: the original
        # check `str(file_path).startswith("base_")` never matched because
        # an absolute path never starts with "base_", so base modules
        # were loaded as task sets by mistake.
        if file_path.name.startswith("base_"):
            continue
        module_name = file_path.stem
        spec = importlib.util.spec_from_file_location(module_name, str(file_path))
        if not spec:
            logger.error(f"Could not load module {module_name}")
            continue
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # type: ignore
        # Keep only classes defined in the module itself, not imports.
        for _, obj in inspect.getmembers(module, inspect.isclass):
            if obj.__module__ == module_name:
                resource_classes.append(obj)
    return resource_classes
class AllResources(TaskSet):
    """Task set aggregating every resource task set discovered on disk."""

    @classmethod
    def set_tasks(cls):
        """Populate ``cls.tasks`` with all discovered task-set classes."""
        cls.tasks = set(get_all_tasks_set())
class All(HttpUser):
    """Locust user that exercises every resource task set against a local server."""

    # Default target; overridable with locust's -H flag.
    host = "http://localhost:8585"
    wait_time = constant(1)  # closed workload
    # Discover task sets at class-definition time, before locust
    # instantiates any user.
    AllResources.set_tasks()
    tasks = [AllResources]

View File

@ -0,0 +1,19 @@
# Description: This file contains the manifest for the test resources.
# Any metric available in the `summaries/all__stats.csv` file can be added as a key of a resource.
# times should be expressed in milliseconds (e.g. 1000ms = 1s)
/api/v1/dataQuality/testCases/testCaseResults/[fqn]/30_days:
type: GET
99%: 1000
/api/v1/dataQuality/testCases/testCaseResults/[fqn]/60_days:
type: GET
99%: 1000
/api/v1/dataQuality/testCases/testCaseResults/[fqn]/180_days:
type: GET
99%: 1000
/api/v1/dataQuality/testCases?limit=10:
type: GET
99%: 6000

View File

@ -0,0 +1,81 @@
"""Load test for the test case result resources"""
from datetime import datetime, timedelta
from locust import TaskSet, task
from _openmetadata_testutils.helpers.login_user import login_user
TEST_CASE_RESULT_RESOURCE_PATH = "/api/v1/dataQuality/testCases/testCaseResults"
TEST_CASE_RESOURCE_PATH = "/api/v1/dataQuality/testCases"
class TestCaseResultTasks(TaskSet):
    """Load-test tasks for the test case result resource."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Filled by on_start with the test cases whose results are fetched.
        self.test_cases = []

    @staticmethod
    def _time_window(days: int):
        """Return (start, end) epoch-millisecond bounds for the last *days* days."""
        now = datetime.now()
        start = now - timedelta(days=days)
        return int(start.timestamp() * 1000), int(now.timestamp() * 1000)

    def _list_test_case_results(self, start_ts: int, end_ts: int, days_range: str):
        """List test case results for a given time range.

        Args:
            start_ts (int): start timestamp (epoch milliseconds)
            end_ts (int): end timestamp (epoch milliseconds)
            days_range (str): label used to group the requests in locust stats
        """
        for test_case in self.test_cases:
            fqn = test_case.get("fullyQualifiedName")
            if not fqn:
                continue
            self.client.get(
                f"{TEST_CASE_RESULT_RESOURCE_PATH}/{fqn}",
                params={"startTs": start_ts, "endTs": end_ts},  # type: ignore
                auth=self.bearer,
                name=f"{TEST_CASE_RESULT_RESOURCE_PATH}/[fqn]/{days_range}",
            )

    @task(3)
    def list_test_case_results_30_days(self):
        """List test case results for the last 30 days. Weighted 3"""
        start, end = self._time_window(30)
        self._list_test_case_results(start, end, "30_days")

    @task(2)
    def list_test_case_results_60_days(self):
        """List test case results for the last 60 days. Weighted 2"""
        start, end = self._time_window(60)
        self._list_test_case_results(start, end, "60_days")

    @task
    def list_test_case_results_180_days(self):
        """List test case results for the last 180 days"""
        start, end = self._time_window(180)
        self._list_test_case_results(start, end, "180_days")

    @task
    def stop(self):
        """Hand control back to the parent user class."""
        self.interrupt()

    def on_start(self):
        """Log in and collect the test cases to fetch results for."""
        self.bearer = login_user(self.client)
        resp = self.client.get(
            f"{TEST_CASE_RESOURCE_PATH}",
            params={"limit": 100},
            auth=self.bearer,
            name=f"{TEST_CASE_RESOURCE_PATH}?limit=100",
        )
        self.test_cases = resp.json().get("data", [])

View File

@ -0,0 +1,41 @@
"""Load test for the test case resources"""
from locust import TaskSet, task
from _openmetadata_testutils.helpers.login_user import login_user
TEST_CASE_RESOURCE_PATH = "/api/v1/dataQuality/testCases"
class TestCaseTasks(TaskSet):
    """Load-test tasks for the test case resource."""

    def _list_test_cases(self):
        """Walk through every page of the test case listing, 10 at a time."""
        query = {"limit": 10}
        while True:
            resp = self.client.get(
                f"{TEST_CASE_RESOURCE_PATH}",
                params=query,
                auth=self.bearer,
                name=f"{TEST_CASE_RESOURCE_PATH}?limit=10",
            )
            cursor = resp.json().get("paging", {}).get("after")
            if not cursor:
                break
            query = {"limit": 10, "after": cursor}

    @task(2)
    def list_test_cases(self):
        """List test cases. Weighted 2"""
        self._list_test_cases()

    @task
    def stop(self):
        """Hand control back to the parent user class."""
        self.interrupt()

    def on_start(self):
        """Authenticate before running any task."""
        self.bearer = login_user(self.client)

View File

@ -0,0 +1,22 @@
"""Utils functions for load testing."""
import sys
from typing import List
import pytest
from locust import main
TEST_CASE_RESOURCE_PATH = "/api/v1/dataQuality/testCases"
TEST_CASE_RESULT_RESOURCE_PATH = "/api/v1/dataQuality/testCases/testCaseResults"
def run_load_test(args: List[str]):
    """Invoke locust's CLI entry point with *args* and assert a clean exit.

    Locust reads its configuration from ``sys.argv``, so it is temporarily
    replaced and then restored, even if the run fails.
    """
    saved_argv = sys.argv
    try:
        sys.argv = args
        # locust's main() always exits via SystemExit; 0 means all good.
        with pytest.raises(SystemExit) as excinfo:
            main.main()
        assert excinfo.value.code == 0
    finally:
        sys.argv = saved_argv