test: add smoke test (#2464)

This commit is contained in:
Harshal Sheth 2021-04-29 23:27:03 -07:00 committed by GitHub
parent df9e7c594f
commit 201ffd4979
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 905 additions and 10 deletions

View File

@ -4,14 +4,14 @@ on:
branches:
- master
paths-ignore:
- 'docs/**'
- '**.md'
- "docs/**"
- "**.md"
pull_request:
branches:
- master
paths-ignore:
- 'docs/**'
- '**.md'
- "docs/**"
- "**.md"
release:
types: [published, edited]
@ -26,7 +26,7 @@ jobs:
java-version: 1.8
- uses: actions/setup-python@v2
with:
python-version: '3.6'
python-version: "3.6"
- name: Gradle build (and test)
run: ./gradlew build
- name: Python ingest framework tests
@ -38,3 +38,29 @@ jobs:
job-status: ${{ job.status }}
slack-bot-token: ${{ secrets.SLACK_BOT_TOKEN }}
channel: github-activities
smoke-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8
- uses: actions/setup-python@v2
with:
python-version: "3.6"
- name: Gradle build
run: ./gradlew build -x check -x docs-website:build
- name: Smoke test
run: |
./docker/dev.sh -d
sleep 30
./smoke-test/smoke.sh
- name: Slack failure notification
if: failure() && github.event_name == 'push'
uses: kpritam/slack-job-status-action@v1
with:
job-status: ${{ job.status }}
slack-bot-token: ${{ secrets.SLACK_BOT_TOKEN }}
channel: github-activities

View File

@ -43,5 +43,5 @@ ENV JAVA_OPTS=" \
-Djava.security.auth.login.config=datahub-frontend/conf/jaas.conf \
-Dlogback.configurationFile=datahub-frontend/conf/logback.xml \
-Dlogback.debug=true \
-Dpidfile.path=/datahub-frontend/play.pid"
-Dpidfile.path=/dev/null"
CMD ["datahub-frontend/bin/playBinary"]

View File

@ -28,9 +28,8 @@ REQUIRED_CONTAINERS = [
# "kafka-rest-proxy",
]
MIN_MEMORY_NEEDED = 8 # GB
# docker seems to under-report memory allocated, adding a bit of buffer to account for it
MEMORY_TOLERANCE = 0.2 # GB
# Docker seems to under-report memory allocated, so we also need a bit of buffer to account for it.
MIN_MEMORY_NEEDED = 6.75 # GB
@contextmanager
@ -59,7 +58,7 @@ def check_local_docker_containers() -> List[str]:
# Check total memory.
total_mem_configured = int(client.info()["MemTotal"])
if memory_in_gb(total_mem_configured) + MEMORY_TOLERANCE < MIN_MEMORY_NEEDED:
if memory_in_gb(total_mem_configured) < MIN_MEMORY_NEEDED:
issues.append(
f"Total Docker memory configured {memory_in_gb(total_mem_configured):.2f}GB is below the minimum threshold {MIN_MEMORY_NEEDED}GB"
)

132
smoke-test/.gitignore vendored Normal file
View File

@ -0,0 +1,132 @@
.envrc
.vscode/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/

View File

@ -0,0 +1,3 @@
pytest>=6.2
pytest-dependency>=0.5.1
-e ../metadata-ingestion[datahub-rest,datahub-kafka]

View File

@ -0,0 +1,384 @@
[
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"description": "This dataset shows hourly average border crossing duration for US-Canada and US-Mexico borders starting from 2020-03-16. Hourly trip volume is compared to average trip volume calculated between Feb.1st and Mar.15th, 2020 as a control group in each country.",
"uri": null,
"tags": [],
"customProperties": {}
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times",
"platform": "urn:li:dataPlatform:bigquery",
"version": 0,
"created": {
"time": 1616104630716,
"actor": "urn:li:corpuser:etl",
"impersonator": null
},
"lastModified": {
"time": 1616104630716,
"actor": "urn:li:corpuser:etl",
"impersonator": null
},
"deleted": null,
"dataset": null,
"cluster": null,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": [
{
"fieldPath": "border_id",
"jsonPath": null,
"nullable": true,
"description": "Unique ID of the border crossing",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "port_name",
"jsonPath": null,
"nullable": true,
"description": "Port Name in Canada or Mexico",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "port_name_us",
"jsonPath": null,
"nullable": true,
"description": "Port Name in the US",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "trip_direction",
"jsonPath": null,
"nullable": true,
"description": "Direction of the trip",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "hour_local",
"jsonPath": null,
"nullable": true,
"description": "Local hour of the data",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "Integer()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "date_local",
"jsonPath": null,
"nullable": true,
"description": "Local date of the data",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.DateType": {}
}
},
"nativeDataType": "DATE()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "day_type",
"jsonPath": null,
"nullable": true,
"description": "Weekday/Weekend indicator",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "date_utc",
"jsonPath": null,
"nullable": true,
"description": "UTC date of the data",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.DateType": {}
}
},
"nativeDataType": "DATE()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "hour_utc",
"jsonPath": null,
"nullable": true,
"description": "UTC hour of the data",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "Integer()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "avg_crossing_duration",
"jsonPath": null,
"nullable": true,
"description": "Average border crossing times (in minutes)",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "Float()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "aggregation_method",
"jsonPath": null,
"nullable": true,
"description": "Daily Average: the average is taken for the current LocalHour; Weekly Average: the average is taken for the full week prior to the current LocalDate; Monthly Average: the average is taken for the full month prior to the current LocalDate; Yearly Average: the average is taken for the full year prior to the LocalDate",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "percent_of_baseline_trip_volume",
"jsonPath": null,
"nullable": true,
"description": "Proportion of trips in this time interval as compared to Avg number of trips on the same hour of day in baseline period i.e 1st February 2020 - 15th March 2020. Data is only available for daily aggregation level with valid baseline number.",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "Float()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "border_zone",
"jsonPath": null,
"nullable": true,
"description": "Polygon of the Port in Canada or Mexico",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NullType": {}
}
},
"nativeDataType": "NullType()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "province_code",
"jsonPath": null,
"nullable": true,
"description": "ISO 3166-2 Country-Province code in Canada or Mexico",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "border_zone_us",
"jsonPath": null,
"nullable": true,
"description": "Polygon of the Port in the US",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NullType": {}
}
},
"nativeDataType": "NullType()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "state_code_us",
"jsonPath": null,
"nullable": true,
"description": "ISO 3166-2 Country-State code for US",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "border_latitude",
"jsonPath": null,
"nullable": true,
"description": "Latitude of the border",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "Float()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "border_longitude",
"jsonPath": null,
"nullable": true,
"description": "Longitude of the border",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "Float()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "border_geohash",
"jsonPath": null,
"nullable": true,
"description": "Geohash of the Border Station with level of 7",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "version",
"jsonPath": null,
"nullable": true,
"description": "Version of the table",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
}
],
"primaryKeys": null,
"foreignKeysSpecs": null
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:Geotab",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"active": true,
"displayName": "Geotab",
"email": "Geotab-demo@example.com",
"title": null,
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": "Geotab",
"countryCode": null
}
}
]
}
},
"proposedDelta": null
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:Geotab",
"type": "DATAOWNER",
"source": null
}
],
"lastModified": {
"time": 1616107219522,
"actor": "urn:li:corpuser:datahub",
"impersonator": null
}
}
}
]
}
},
"proposedDelta": null
}
]

26
smoke-test/smoke.sh Executable file
View File

@ -0,0 +1,26 @@
#!/bin/bash
# Runs a basic e2e test. It is not meant to be fully comprehensive,
# but rather should catch obvious bugs before they make it into prod.
#
# Script assumptions:
# - The gradle build has already been run.
# - Python 3.6+ is installed.
# - The metadata-ingestion codegen script has been run.
# - A full DataHub setup is running on localhost with standard ports.
#   The easiest way to do this is by using the quickstart or dev
#   quickstart scripts.

# Resolve the directory containing this script and run from there, so the
# relative paths below (requirements.txt, ../metadata-ingestion) work no
# matter where the script is invoked from.
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
cd "$DIR"

# Fail fast on errors/unset vars and echo each command for CI debugging.
set -euxo pipefail

# Install test dependencies into a throwaway virtualenv local to this dir.
python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip wheel setuptools
pip install -r requirements.txt

# Regenerate the metadata models used by the ingestion framework.
(cd ../metadata-ingestion && ./scripts/codegen.sh)

pytest -vv

325
smoke-test/test_e2e.py Normal file
View File

@ -0,0 +1,325 @@
import time
import pytest
import requests
from datahub.ingestion.run.pipeline import Pipeline
from datahub.check.docker import check_local_docker_containers
GMS_ENDPOINT = "http://localhost:8080"
FRONTEND_ENDPOINT = "http://localhost:9002"
KAFKA_BROKER = "localhost:9092"
bootstrap_sample_data = "../metadata-ingestion/examples/mce_files/bootstrap_mce.json"
bq_sample_data = "./sample_bq_data.json"
restli_default_headers = {
"X-RestLi-Protocol-Version": "2.0.0",
}
kafka_post_ingestion_wait_sec = 60
healthcheck_wait_retries = 20
healthcheck_wait_interval_sec = 15
@pytest.fixture(scope="session")
def wait_for_healthchecks():
    # Poll the local docker containers until every required one reports
    # healthy, sleeping between attempts; fail loudly if the retry budget
    # is exhausted.
    for attempt in range(1, healthcheck_wait_retries + 1):
        if attempt > 1:
            time.sleep(healthcheck_wait_interval_sec)

        issues = check_local_docker_containers()
        if not issues:
            print(f"finished waiting for healthchecks after {attempt} tries")
            yield
            return

    issues_str = "\n".join(f"- {issue}" for issue in issues)
    raise RuntimeError(f"retry limit exceeded while waiting for docker healthchecks\n{issues_str}")
@pytest.mark.dependency()
def test_healthchecks(wait_for_healthchecks):
    # Requesting the wait_for_healthchecks fixture performs the actual
    # verification; this test exists so later tests can depend on it.
    pass
@pytest.mark.dependency(depends=["test_healthchecks"])
def test_ingestion_via_rest(wait_for_healthchecks):
    # Push the bootstrap sample data into DataHub through the REST sink.
    recipe = {
        "source": {
            "type": "file",
            "config": {"filename": bootstrap_sample_data},
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": GMS_ENDPOINT},
        },
    }
    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()
@pytest.mark.dependency(depends=["test_healthchecks"])
def test_ingestion_via_kafka(wait_for_healthchecks):
    # Push the BigQuery sample data into DataHub through the Kafka sink.
    recipe = {
        "source": {
            "type": "file",
            "config": {"filename": bq_sample_data},
        },
        "sink": {
            "type": "datahub-kafka",
            "config": {
                "connection": {
                    "bootstrap": KAFKA_BROKER,
                }
            },
        },
    }
    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()

    # Kafka emission is asynchronous, so wait a bit to let the change
    # events actually be processed before dependent tests query for them.
    time.sleep(kafka_post_ingestion_wait_sec)
@pytest.mark.dependency(depends=["test_ingestion_via_rest", "test_ingestion_via_kafka"])
def test_run_ingestion(wait_for_healthchecks):
    # Aggregation point: downstream tests depend on this single test
    # rather than listing every ingestion test individually.
    pass
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_list_data_platforms():
    # GMS ships with a sizable catalog of built-in data platforms.
    headers = dict(restli_default_headers)
    headers["X-RestLi-Method"] = "get_all"

    response = requests.get(f"{GMS_ENDPOINT}/dataPlatforms", headers=headers)
    response.raise_for_status()

    payload = response.json()
    assert len(payload["elements"]) > 10
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_get_all_users():
    # The ingested sample data includes at least three corp users.
    headers = dict(restli_default_headers)
    headers["X-RestLi-Method"] = "get_all"

    response = requests.get(f"{GMS_ENDPOINT}/corpUsers", headers=headers)
    response.raise_for_status()

    payload = response.json()
    assert len(payload["elements"]) >= 3
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_get_user():
    # Look up a single corp user from the bootstrap sample data by name.
    username = "jdoe"
    url = f"{GMS_ENDPOINT}/corpUsers/($params:(),name:{username})"

    response = requests.get(url, headers=dict(restli_default_headers))
    response.raise_for_status()

    payload = response.json()
    assert payload["username"] == username
    assert payload["info"]["displayName"]
    assert payload["info"]["email"]
@pytest.mark.parametrize(
    "platform,dataset_name,env",
    [
        (
            # This one tests the bootstrap sample data.
            "urn:li:dataPlatform:kafka",
            "SampleKafkaDataset",
            "PROD",
        ),
        (
            # This one tests BigQuery ingestion.
            "urn:li:dataPlatform:bigquery",
            "bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times",
            "PROD",
        ),
    ],
)
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_get_dataset(platform, dataset_name, env):
    """Fetch each ingested dataset from GMS and sanity-check its metadata.

    Bug fix: the parametrized arguments were previously shadowed by
    hard-coded BigQuery values inside the body, so the Kafka case never
    actually exercised the Kafka-ingested dataset.
    """
    urn = f"urn:li:dataset:({platform},{dataset_name},{env})"
    response = requests.get(
        f"{GMS_ENDPOINT}/datasets/($params:(),name:{dataset_name},origin:{env},platform:{requests.utils.quote(platform)})",
        headers={
            **restli_default_headers,
            "X-RestLi-Method": "get",
        },
    )
    response.raise_for_status()
    data = response.json()
    assert data["urn"] == urn
    assert data["name"] == dataset_name
    assert data["platform"] == platform
    assert len(data["schemaMetadata"]["fields"]) >= 2
@pytest.mark.parametrize(
    "query,min_expected_results",
    [
        ("covid", 1),
        ("sample", 3),
    ],
)
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_search_dataset(query, min_expected_results):
    # Full-text search over datasets should surface the ingested entities.
    headers = dict(restli_default_headers)
    headers["X-RestLi-Method"] = "finder"

    response = requests.get(
        f"{GMS_ENDPOINT}/datasets?q=search&input={query}", headers=headers
    )
    response.raise_for_status()

    payload = response.json()
    assert len(payload["elements"]) >= min_expected_results
    assert payload["paging"]["total"] >= min_expected_results
    assert payload["elements"][0]["urn"]
@pytest.fixture(scope="session")
def frontend_session(wait_for_healthchecks):
    # Authenticate against the frontend once and share the logged-in
    # session across all frontend tests in this run.
    session = requests.Session()
    response = session.post(
        f"{FRONTEND_ENDPOINT}/authenticate",
        headers={"Content-Type": "application/json"},
        data='{"username":"datahub", "password":"datahub"}',
    )
    response.raise_for_status()
    yield session
@pytest.mark.dependency(depends=["test_healthchecks"])
def test_frontend_auth(frontend_session):
    # Requesting the frontend_session fixture exercises the login flow;
    # there is nothing further to assert here.
    pass
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_frontend_browse_datasets(frontend_session):
    # Browsing the /prod path should expose the ingested dataset groups.
    response = frontend_session.get(
        f"{FRONTEND_ENDPOINT}/api/v2/browse?type=dataset&path=/prod"
    )
    response.raise_for_status()

    metadata = response.json()["metadata"]
    assert metadata["totalNumEntities"] >= 4
    assert 4 <= len(metadata["groups"]) <= 8
@pytest.mark.parametrize(
    "query,min_expected_results",
    [
        ("covid", 1),
        ("sample", 3),
    ],
)
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_frontend_search_datasets(frontend_session, query, min_expected_results):
    """Search datasets through the frontend API.

    Bug fix: this test was previously also named
    test_frontend_browse_datasets, which collided with the browse test
    above — pytest silently kept only the last definition, so the browse
    test never ran. Renamed to match what it actually does (search).
    """
    response = frontend_session.get(
        f"{FRONTEND_ENDPOINT}/api/v2/search?type=dataset&input={query}"
    )
    response.raise_for_status()
    data = response.json()
    assert len(data["elements"]) >= min_expected_results
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_frontend_list_users(frontend_session):
    # The party-entities endpoint should list the ingested corp users.
    response = frontend_session.get(f"{FRONTEND_ENDPOINT}/api/v1/party/entities")
    response.raise_for_status()

    payload = response.json()
    assert payload["status"] == "ok"
    assert len(payload["userEntities"]) >= 3
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_frontend_user_info(frontend_session):
    # The logged-in user's profile should be fully populated.
    response = frontend_session.get(f"{FRONTEND_ENDPOINT}/api/v1/user/me")
    response.raise_for_status()

    payload = response.json()
    assert payload["status"] == "ok"
    user = payload["user"]
    assert user["userName"] == "datahub"
    assert user["name"]
    assert user["email"]
@pytest.mark.parametrize(
    "platform,dataset_name,env",
    [
        (
            # This one tests the bootstrap sample data.
            "urn:li:dataPlatform:kafka",
            "SampleKafkaDataset",
            "PROD",
        ),
        (
            # This one tests BigQuery ingestion.
            "urn:li:dataPlatform:bigquery",
            "bigquery-public-data.covid19_geotab_mobility_impact.us_border_wait_times",
            "PROD",
        ),
    ],
)
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_frontend_datasets(frontend_session, platform, dataset_name, env):
    """Fetch dataset, schema, and ownership info through the frontend API.

    Bug fix: this test was previously also named test_frontend_user_info,
    which collided with the user-profile test above — pytest silently kept
    only the last definition, so the profile test never ran. Renamed to
    match what it actually does (dataset checks).
    """
    urn = f"urn:li:dataset:({platform},{dataset_name},{env})"

    # Basic dataset info.
    response = frontend_session.get(f"{FRONTEND_ENDPOINT}/api/v2/datasets/{urn}")
    response.raise_for_status()
    data = response.json()
    assert data["nativeName"] == dataset_name
    assert data["fabric"] == env
    assert data["uri"] == urn

    # Schema info.
    response = frontend_session.get(f"{FRONTEND_ENDPOINT}/api/v2/datasets/{urn}/schema")
    response.raise_for_status()
    data = response.json()
    assert len(data["schema"]["columns"]) >= 2

    # Ownership info.
    response = frontend_session.get(f"{FRONTEND_ENDPOINT}/api/v2/datasets/{urn}/owners")
    response.raise_for_status()
    data = response.json()
    assert len(data["owners"]) >= 1