# datahub/smoke-test/tests/managed_ingestion/managed_ingestion_test.py
# (552 lines, 18 KiB, Python)

import json
import pytest
import tenacity
from tests.utils import get_sleep_info
# Polling parameters for the ``@tenacity.retry`` decorators in this module:
# ``sleep_sec`` is the fixed wait between attempts (``wait_fixed``) and
# ``sleep_times`` the maximum number of attempts (``stop_after_attempt``).
sleep_sec, sleep_times = get_sleep_info()
def _get_ingestionSources(auth_session):
    """List ingestion sources through the GraphQL API and return the raw response.

    Requests the first page (``start=0``, ``count=20``) of
    ``listIngestionSources``; the callers in this module only read
    ``["data"]["listIngestionSources"]["total"]``.

    Args:
        auth_session: authenticated session exposing ``frontend_url()`` and
            ``post(url, json=...)``.

    Returns:
        The decoded JSON response body.

    Raises:
        AssertionError: if the body is empty, reports GraphQL ``errors``, has
            an empty ``data`` payload, or a null ``total``.
        Whatever ``response.raise_for_status()`` raises for an HTTP error status.
    """
    json_q = {
        "query": """query listIngestionSources($input: ListIngestionSourcesInput!) {\n
            listIngestionSources(input: $input) {\n
                start\n
                count\n
                total\n
                ingestionSources {\n
                    urn\n
                }\n
            }\n
        }""",
        "variables": {"input": {"start": "0", "count": "20"}},
    }
    response = auth_session.post(
        f"{auth_session.frontend_url()}/api/v2/graphql", json=json_q
    )
    response.raise_for_status()
    res_data = response.json()
    assert res_data
    # Check "errors" before indexing into "data": a GraphQL error response may
    # carry a null "data" (or a null field), and indexing into it first would
    # fail with a TypeError/bare assertion that hides the server's messages.
    assert "errors" not in res_data, f"GraphQL errors: {res_data['errors']}"
    assert res_data["data"]
    assert res_data["data"]["listIngestionSources"]["total"] is not None
    return res_data
@tenacity.retry(
    wait=tenacity.wait_fixed(sleep_sec),
    stop=tenacity.stop_after_attempt(sleep_times),
)
def _ensure_ingestion_source_count(auth_session, expected_count):
    """Assert that the ingestion-source ``total`` equals ``expected_count``.

    Retried by tenacity (up to ``sleep_times`` attempts, ``sleep_sec`` seconds
    apart) — presumably because the listing can lag behind a preceding
    create/delete.

    Returns:
        The observed total (equal to ``expected_count`` on success).
    """
    listing = _get_ingestionSources(auth_session)["data"]["listIngestionSources"]
    observed = listing["total"]
    assert observed == expected_count
    return observed
@tenacity.retry(
    stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_secret_increased(auth_session, before_count):
    """Assert that the secret ``total`` is exactly ``before_count + 1``.

    Despite the name, this checks for exactly one additional secret, not
    merely "more". Retried by tenacity (up to ``sleep_times`` attempts,
    ``sleep_sec`` seconds apart) — presumably so a just-created secret has
    time to show up in the listing.

    Args:
        auth_session: authenticated session exposing ``frontend_url()`` and
            ``post(url, json=...)``.
        before_count: the ``listSecrets`` total observed before the create.

    Raises:
        tenacity.RetryError: when every attempt fails; the last attempt's
            exception (normally an AssertionError) is chained as its cause.
    """
    json_q = {
        "query": """query listSecrets($input: ListSecretsInput!) {\n
            listSecrets(input: $input) {\n
                start\n
                count\n
                total\n
                secrets {\n
                    urn\n
                    name\n
                }\n
            }\n
        }""",
        "variables": {"input": {"start": "0", "count": "20"}},
    }
    response = auth_session.post(
        f"{auth_session.frontend_url()}/api/v2/graphql", json=json_q
    )
    response.raise_for_status()
    res_data = response.json()
    assert res_data
    # Check "errors" before indexing into "data" so a failed call reports the
    # server's error messages instead of a TypeError on a null field.
    assert "errors" not in res_data, f"GraphQL errors: {res_data['errors']}"
    assert res_data["data"]
    assert res_data["data"]["listSecrets"]["total"] is not None
    # Assert that there is exactly one more secret now.
    after_count = res_data["data"]["listSecrets"]["total"]
    assert after_count == before_count + 1
@tenacity.retry(
    stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_secret_not_present(auth_session):
    """Assert that ``getSecretValues`` returns no entry named ``SMOKE_TEST``.

    Retried by tenacity (up to ``sleep_times`` attempts, ``sleep_sec`` seconds
    apart) — presumably because a just-deleted secret may still be returned
    briefly.

    Args:
        auth_session: authenticated session exposing ``frontend_url()`` and
            ``post(url, json=...)``.

    Raises:
        tenacity.RetryError: when every attempt fails; the last attempt's
            exception (normally an AssertionError) is chained as its cause.
    """
    # Ask for the secret value back.
    json_q = {
        "query": """query getSecretValues($input: GetSecretValuesInput!) {\n
            getSecretValues(input: $input) {\n
                name\n
                value\n
            }\n
        }""",
        "variables": {"input": {"secrets": ["SMOKE_TEST"]}},
    }
    response = auth_session.post(
        f"{auth_session.frontend_url()}/api/v2/graphql", json=json_q
    )
    response.raise_for_status()
    res_data = response.json()
    assert res_data
    # Check "errors" before indexing into "data" so a failed call reports the
    # server's error messages instead of a TypeError on a null field.
    assert "errors" not in res_data, f"GraphQL errors: {res_data['errors']}"
    assert res_data["data"]
    assert res_data["data"]["getSecretValues"] is not None
    secret_values = res_data["data"]["getSecretValues"]
    matching_names = [x["name"] for x in secret_values if x["name"] == "SMOKE_TEST"]
    assert matching_names == []
@tenacity.retry(
    stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_ingestion_source_present(
    auth_session, ingestion_source_urn, num_execs=None
):
    """Fetch an ingestion source together with its first execution request.

    Queries ``ingestionSource(urn)`` with ``executions(start: 0, count: 1)``,
    so at most one execution-request URN is returned. Retried by tenacity (up
    to ``sleep_times`` attempts, ``sleep_sec`` seconds apart).

    Args:
        auth_session: authenticated session exposing ``frontend_url()`` and
            ``post(url, json=...)``.
        ingestion_source_urn: URN of the ingestion source to fetch.
        num_execs: when not ``None``, require ``executions.total >= num_execs``.

    Returns:
        The decoded JSON response body.

    Raises:
        tenacity.RetryError: when every attempt fails; the last attempt's
            exception (normally an AssertionError) is chained as its cause.
    """
    json_q = {
        "query": """query ingestionSource($urn: String!) {\n
            ingestionSource(urn: $urn) {\n
                executions(start: 0, count: 1) {\n
                    start\n
                    count\n
                    total\n
                    executionRequests {\n
                        urn\n
                    }\n
                }\n
            }\n
        }""",
        "variables": {"urn": ingestion_source_urn},
    }
    response = auth_session.post(
        f"{auth_session.frontend_url()}/api/v2/graphql", json=json_q
    )
    response.raise_for_status()
    res_data = response.json()
    # Debug aid: pytest shows captured stdout for failing tests.
    print(res_data)
    assert res_data
    # Check "errors" before indexing into "data" so a failed call reports the
    # server's error messages instead of a TypeError on a null field.
    assert "errors" not in res_data, f"GraphQL errors: {res_data['errors']}"
    assert res_data["data"]
    assert res_data["data"]["ingestionSource"] is not None
    if num_execs is not None:
        ingestion_source = res_data["data"]["ingestionSource"]
        assert ingestion_source["executions"]["total"] >= num_execs
    return res_data
@tenacity.retry(
    stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_execution_request_present(auth_session, execution_request_urn):
    """Fetch an execution request by URN, retrying until it is returned.

    The query selects the request's ``urn``, its ``input`` (``task`` plus
    key/value ``arguments``) and its ``result`` (``status``, ``startTimeMs``,
    ``durationMs``). Retried by tenacity (up to ``sleep_times`` attempts,
    ``sleep_sec`` seconds apart).

    Args:
        auth_session: authenticated session exposing ``frontend_url()`` and
            ``post(url, json=...)``.
        execution_request_urn: URN of the execution request to fetch.

    Returns:
        The decoded JSON response body.

    Raises:
        tenacity.RetryError: when every attempt fails; the last attempt's
            exception (normally an AssertionError) is chained as its cause.
    """
    json_q = {
        "query": """query executionRequest($urn: String!) {\n
            executionRequest(urn: $urn) {\n
                urn\n
                input {\n
                    task\n
                    arguments {\n
                        key\n
                        value\n
                    }\n
                }\n
                result {\n
                    status\n
                    startTimeMs\n
                    durationMs\n
                }\n
            }\n
        }""",
        "variables": {"urn": execution_request_urn},
    }
    response = auth_session.post(
        f"{auth_session.frontend_url()}/api/v2/graphql", json=json_q
    )
    response.raise_for_status()
    res_data = response.json()
    assert res_data
    # Check "errors" before indexing into "data" so a failed call reports the
    # server's error messages instead of a TypeError on a null field.
    assert "errors" not in res_data, f"GraphQL errors: {res_data['errors']}"
    assert res_data["data"]
    assert res_data["data"]["executionRequest"] is not None
    return res_data
def test_create_list_get_remove_secret(auth_session):
    """Round-trip the ``SMOKE_TEST`` secret: create, list, update, read, delete.

    Once the secret has been created, any later failure triggers a
    best-effort ``deleteSecret`` in the ``finally`` clause, so a failed run
    does not leave ``SMOKE_TEST`` behind for the next run (NOTE(review): a
    leftover secret with the same name would likely make the next
    ``createSecret`` fail — confirm against the server).
    """

    def _graphql(query, variables):
        """POST one GraphQL request; return the decoded, error-free response."""
        response = auth_session.post(
            f"{auth_session.frontend_url()}/api/v2/graphql",
            json={"query": query, "variables": variables},
        )
        response.raise_for_status()
        res_data = response.json()
        assert res_data
        # Check "errors" before "data" so a failed call reports the server's
        # error messages rather than a bare assertion on a null "data".
        assert "errors" not in res_data, f"GraphQL errors: {res_data['errors']}"
        assert res_data["data"]
        return res_data

    delete_query = """mutation deleteSecret($urn: String!) {\n
        deleteSecret(urn: $urn)
    }"""

    # Get count of existing secrets
    res_data = _graphql(
        """query listSecrets($input: ListSecretsInput!) {\n
            listSecrets(input: $input) {\n
                start\n
                count\n
                total\n
                secrets {\n
                    urn\n
                    name\n
                }\n
            }\n
        }""",
        {"input": {"start": "0", "count": "20"}},
    )
    assert res_data["data"]["listSecrets"]["total"] is not None
    before_count = res_data["data"]["listSecrets"]["total"]

    # Create new secret
    res_data = _graphql(
        """mutation createSecret($input: CreateSecretInput!) {\n
            createSecret(input: $input)
        }""",
        {"input": {"name": "SMOKE_TEST", "value": "mytestvalue"}},
    )
    assert res_data["data"]["createSecret"] is not None
    secret_urn = res_data["data"]["createSecret"]

    deleted = False
    try:
        # Get new count of secrets
        _ensure_secret_increased(auth_session, before_count)

        # Update existing secret
        res_data = _graphql(
            """mutation updateSecret($input: UpdateSecretInput!) {\n
                updateSecret(input: $input)
            }""",
            {
                "input": {
                    "urn": secret_urn,
                    "name": "SMOKE_TEST",
                    "value": "mytestvalue.updated",
                }
            },
        )
        assert res_data["data"]["updateSecret"] is not None
        secret_urn = res_data["data"]["updateSecret"]

        # Get the secret value back
        res_data = _graphql(
            """query getSecretValues($input: GetSecretValuesInput!) {\n
                getSecretValues(input: $input) {\n
                    name\n
                    value\n
                }\n
            }""",
            {"input": {"secrets": ["SMOKE_TEST"]}},
        )
        print(res_data)
        assert res_data["data"]["getSecretValues"] is not None
        secret_values = res_data["data"]["getSecretValues"]
        secret_value = next(
            (x for x in secret_values if x["name"] == "SMOKE_TEST"), None
        )
        # Explicit check instead of indexing [0], which would raise IndexError.
        assert secret_value is not None, "SMOKE_TEST missing from getSecretValues"
        assert secret_value["value"] == "mytestvalue.updated"

        # Now cleanup and remove the secret
        res_data = _graphql(delete_query, {"urn": secret_urn})
        assert res_data["data"]["deleteSecret"] is not None
        deleted = True
    finally:
        if not deleted:
            # Best-effort cleanup after an earlier failure. No raise_for_status
            # or assertions here, so an error response from this delete does
            # not replace the failure that got us here.
            auth_session.post(
                f"{auth_session.frontend_url()}/api/v2/graphql",
                json={"query": delete_query, "variables": {"urn": secret_urn}},
            )

    # Re-fetch the secret values and see that they are not there.
    _ensure_secret_not_present(auth_session)
@pytest.mark.dependency()
def test_create_list_get_remove_ingestion_source(auth_session):
    """Create a ``mysql`` ingestion source, read it back, then delete it.

    Checks that the source count goes up by one after the create and back down
    by one after the delete. Once the source has been created, any later
    failure triggers a best-effort ``deleteIngestionSource`` in the
    ``finally`` clause so a failed run does not leave the source behind.
    """

    def _graphql(query, variables):
        """POST one GraphQL request; return the decoded, error-free response."""
        response = auth_session.post(
            f"{auth_session.frontend_url()}/api/v2/graphql",
            json={"query": query, "variables": variables},
        )
        response.raise_for_status()
        res_data = response.json()
        assert res_data
        # Check "errors" before "data" so a failed call reports the server's
        # error messages rather than a bare assertion on a null "data".
        assert "errors" not in res_data, f"GraphQL errors: {res_data['errors']}"
        assert res_data["data"]
        return res_data

    # The same recipe is sent on create and expected back on read.
    recipe = '{"source":{"type":"mysql","config":{"include_tables":true,"database":null,"password":"${MYSQL_PASSWORD}","profiling":{"enabled":false},"host_port":null,"include_views":true,"username":"${MYSQL_USERNAME}"}},"pipeline_name":"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927"}'
    delete_query = """mutation deleteIngestionSource($urn: String!) {\n
        deleteIngestionSource(urn: $urn)
    }"""

    # Get count of existing ingestion sources
    res_data = _get_ingestionSources(auth_session)
    before_count = res_data["data"]["listIngestionSources"]["total"]

    # Create new ingestion source
    res_data = _graphql(
        """mutation createIngestionSource($input: UpdateIngestionSourceInput!) {\n
            createIngestionSource(input: $input)
        }""",
        {
            "input": {
                "name": "My Test Ingestion Source",
                "type": "mysql",
                "description": "My ingestion source description",
                "schedule": {"interval": "*/60 * * * *", "timezone": "UTC"},
                "config": {
                    "recipe": recipe,
                    "version": "0.8.18",
                    "executorId": "mytestexecutor",
                },
            }
        },
    )
    assert res_data["data"]["createIngestionSource"] is not None
    ingestion_source_urn = res_data["data"]["createIngestionSource"]

    deleted = False
    try:
        # Assert that there are more ingestion sources now.
        after_count = _ensure_ingestion_source_count(auth_session, before_count + 1)

        # Get the ingestion source back
        res_data = _graphql(
            """query ingestionSource($urn: String!) {\n
                ingestionSource(urn: $urn) {\n
                    urn\n
                    type\n
                    name\n
                    schedule {\n
                        timezone\n
                        interval\n
                    }\n
                    config {\n
                        recipe\n
                        executorId\n
                        version\n
                    }\n
                }\n
            }""",
            {"urn": ingestion_source_urn},
        )
        assert res_data["data"]["ingestionSource"] is not None
        ingestion_source = res_data["data"]["ingestionSource"]
        assert ingestion_source["urn"] == ingestion_source_urn
        assert ingestion_source["type"] == "mysql"
        assert ingestion_source["name"] == "My Test Ingestion Source"
        assert ingestion_source["schedule"]["interval"] == "*/60 * * * *"
        assert ingestion_source["schedule"]["timezone"] == "UTC"
        assert ingestion_source["config"]["recipe"] == recipe
        assert ingestion_source["config"]["executorId"] == "mytestexecutor"
        assert ingestion_source["config"]["version"] == "0.8.18"

        # Now cleanup and remove the ingestion source
        res_data = _graphql(delete_query, {"urn": ingestion_source_urn})
        assert res_data["data"]["deleteIngestionSource"] is not None
        deleted = True
    finally:
        if not deleted:
            # Best-effort cleanup after an earlier failure. No raise_for_status
            # or assertions here, so an error response from this delete does
            # not replace the failure that got us here.
            auth_session.post(
                f"{auth_session.frontend_url()}/api/v2/graphql",
                json={
                    "query": delete_query,
                    "variables": {"urn": ingestion_source_urn},
                },
            )

    # Ensure the ingestion source has been removed.
    _ensure_ingestion_source_count(auth_session, after_count - 1)
@pytest.mark.dependency(
    depends=[
        "test_create_list_get_remove_ingestion_source",
    ]
)
def test_create_list_get_ingestion_execution_request(auth_session):
    """Create an ingestion source, request an execution, and inspect the request.

    Verifies that the new request is listed on the source, that a direct
    lookup returns it with a ``RUN_INGEST`` task and the expected arguments,
    and that it has no ``result``. Once the source has been created, any later
    failure triggers a best-effort ``deleteIngestionSource`` in the
    ``finally`` clause, so a failed run does not leave behind a source
    scheduled ``*/5 * * * *``.
    """

    def _graphql(query, variables):
        """POST one GraphQL request; return the decoded, error-free response."""
        response = auth_session.post(
            f"{auth_session.frontend_url()}/api/v2/graphql",
            json={"query": query, "variables": variables},
        )
        response.raise_for_status()
        res_data = response.json()
        assert res_data
        # Check "errors" before "data" so a failed call reports the server's
        # error messages rather than a bare assertion on a null "data".
        assert "errors" not in res_data, f"GraphQL errors: {res_data['errors']}"
        assert res_data["data"]
        return res_data

    recipe = '{"source":{"type":"mysql","config":{"include_tables":true,"database":null,"password":"${MYSQL_PASSWORD}","profiling":{"enabled":false},"host_port":null,"include_views":true,"username":"${MYSQL_USERNAME}"}},"pipeline_name":"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927"}'
    delete_query = """mutation deleteIngestionSource($urn: String!) {\n
        deleteIngestionSource(urn: $urn)
    }"""

    # Create new ingestion source
    res_data = _graphql(
        """mutation createIngestionSource($input: UpdateIngestionSourceInput!) {\n
            createIngestionSource(input: $input)
        }""",
        {
            "input": {
                "name": "My Test Ingestion Source",
                "type": "mysql",
                "description": "My ingestion source description",
                "schedule": {"interval": "*/5 * * * *", "timezone": "UTC"},
                "config": {
                    "recipe": recipe,
                    "version": "0.8.18",
                    "executorId": "mytestexecutor",
                },
            }
        },
    )
    assert res_data["data"]["createIngestionSource"] is not None
    ingestion_source_urn = res_data["data"]["createIngestionSource"]

    deleted = False
    try:
        # Create a request to execute the ingestion source
        res_data = _graphql(
            """mutation createIngestionExecutionRequest($input: CreateIngestionExecutionRequestInput!) {\n
                createIngestionExecutionRequest(input: $input)
            }""",
            {"input": {"ingestionSourceUrn": ingestion_source_urn}},
        )
        assert res_data["data"]["createIngestionExecutionRequest"] is not None, (
            f"res_data was {res_data}"
        )
        execution_request_urn = res_data["data"]["createIngestionExecutionRequest"]

        # The new request must be the (single) execution returned for the source.
        res_data = _ensure_ingestion_source_present(
            auth_session, ingestion_source_urn, 1
        )
        ingestion_source = res_data["data"]["ingestionSource"]
        assert (
            ingestion_source["executions"]["executionRequests"][0]["urn"]
            == execution_request_urn
        )

        # Get the ingestion request back via direct lookup
        res_data = _ensure_execution_request_present(
            auth_session, execution_request_urn
        )
        execution_request = res_data["data"]["executionRequest"]
        assert execution_request["urn"] == execution_request_urn

        # Verify input: recipe, version and debug_mode arguments, in that order.
        assert execution_request["input"]["task"] == "RUN_INGEST"
        arguments = execution_request["input"]["arguments"]
        assert len(arguments) == 3
        assert arguments[0]["key"] == "recipe"
        # Only the "source" section of the recipe is compared; other top-level
        # keys (e.g. "pipeline_name") are not checked.
        assert (
            json.loads(arguments[0]["value"])["source"]
            == json.loads(recipe)["source"]
        )
        assert arguments[1]["key"] == "version"
        assert arguments[1]["value"] == "0.8.18"
        assert arguments[2] == {
            "key": "debug_mode",
            "value": "false",
        }

        # Verify no result
        assert execution_request["result"] is None

        # Now cleanup and remove the ingestion source
        res_data = _graphql(delete_query, {"urn": ingestion_source_urn})
        assert res_data["data"]["deleteIngestionSource"] is not None
        deleted = True
    finally:
        if not deleted:
            # Best-effort cleanup after an earlier failure. No raise_for_status
            # or assertions here, so an error response from this delete does
            # not replace the failure that got us here.
            auth_session.post(
                f"{auth_session.frontend_url()}/api/v2/graphql",
                json={
                    "query": delete_query,
                    "variables": {"urn": ingestion_source_urn},
                },
            )