mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-06 05:43:44 +00:00
552 lines
18 KiB
Python
552 lines
18 KiB
Python
import json
|
|
|
|
import pytest
|
|
import tenacity
|
|
|
|
from tests.utils import get_sleep_info
|
|
|
|
# Retry pacing shared by the polling helpers in this module: `sleep_sec` is fed
# to tenacity.wait_fixed and `sleep_times` to tenacity.stop_after_attempt.
sleep_sec, sleep_times = get_sleep_info()
|
|
|
|
|
|
def _get_ingestionSources(auth_session):
    """Fetch the first page of ingestion sources via the GraphQL API.

    Posts a ``listIngestionSources`` query (start 0, count 20) to
    ``<frontend_url>/api/v2/graphql`` and validates the shape of the reply.

    Args:
        auth_session: Session-like object; only its ``post(url, json=...)``
            and ``frontend_url()`` members are used here.

    Returns:
        The decoded JSON response, whose ``data.listIngestionSources.total``
        has been checked to be non-null.

    Raises:
        AssertionError: If the response is empty, reports GraphQL errors,
            carries no ``data``, or has a null ``total``.
        Exception: Whatever ``response.raise_for_status()`` raises for an
            unsuccessful HTTP status.
    """
    json_q = {
        "query": """query listIngestionSources($input: ListIngestionSourcesInput!) {\n
            listIngestionSources(input: $input) {\n
              start\n
              count\n
              total\n
              ingestionSources {\n
                urn\n
              }\n
            }\n
        }""",
        "variables": {"input": {"start": "0", "count": "20"}},
    }

    response = auth_session.post(
        f"{auth_session.frontend_url()}/api/v2/graphql", json=json_q
    )
    response.raise_for_status()
    res_data = response.json()

    assert res_data, "listIngestionSources returned an empty response"
    # Check for GraphQL errors before touching `data`: an error reply usually
    # has no usable data, and failing there first would hide the server message.
    assert "errors" not in res_data, f"listIngestionSources failed: {res_data}"
    assert res_data["data"], f"listIngestionSources returned no data: {res_data}"
    assert res_data["data"]["listIngestionSources"]["total"] is not None, (
        f"listIngestionSources returned a null total: {res_data}"
    )
    return res_data
|
|
|
|
|
|
@tenacity.retry(
    wait=tenacity.wait_fixed(sleep_sec), stop=tenacity.stop_after_attempt(sleep_times)
)
def _ensure_ingestion_source_count(auth_session, expected_count):
    """Poll until the ingestion-source total equals ``expected_count``.

    Each attempt lists the ingestion sources and compares the reported total;
    the tenacity decorator re-runs the check up to ``sleep_times`` times with
    ``sleep_sec`` seconds between attempts.

    Args:
        auth_session: Session-like object forwarded to ``_get_ingestionSources``.
        expected_count: Total the server is expected to report.

    Returns:
        The total reported by the server (equal to ``expected_count``).
    """
    listing = _get_ingestionSources(auth_session)["data"]["listIngestionSources"]
    observed_total = listing["total"]
    assert observed_total == expected_count
    return observed_total
|
|
|
|
|
|
@tenacity.retry(
    stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_secret_increased(auth_session, before_count):
    """Poll until the total number of secrets is exactly ``before_count + 1``.

    Each attempt posts a ``listSecrets`` query (start 0, count 20) and checks
    the reported total; the tenacity decorator re-runs the check up to
    ``sleep_times`` times with ``sleep_sec`` seconds between attempts.

    Args:
        auth_session: Session-like object; only its ``post(url, json=...)``
            and ``frontend_url()`` members are used here.
        before_count: Secret total observed before the new secret was created.

    Returns:
        None. Failure is signalled by the assertions raising.
    """
    json_q = {
        "query": """query listSecrets($input: ListSecretsInput!) {\n
            listSecrets(input: $input) {\n
              start\n
              count\n
              total\n
              secrets {\n
                urn\n
                name\n
              }\n
            }\n
        }""",
        "variables": {"input": {"start": "0", "count": "20"}},
    }

    response = auth_session.post(
        f"{auth_session.frontend_url()}/api/v2/graphql", json=json_q
    )
    response.raise_for_status()
    res_data = response.json()

    assert res_data, "listSecrets returned an empty response"
    # Check for GraphQL errors before touching `data`, so the server's message
    # is what shows up in the failure instead of an opaque null dereference.
    assert "errors" not in res_data, f"listSecrets failed: {res_data}"
    assert res_data["data"], f"listSecrets returned no data: {res_data}"
    assert res_data["data"]["listSecrets"]["total"] is not None, (
        f"listSecrets returned a null total: {res_data}"
    )

    # Assert that there are more secrets now.
    after_count = res_data["data"]["listSecrets"]["total"]
    assert after_count == before_count + 1, (
        f"expected {before_count + 1} secrets, server reports {after_count}"
    )
|
|
|
|
|
|
@tenacity.retry(
    stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_secret_not_present(auth_session):
    """Poll until ``getSecretValues`` no longer returns the ``SMOKE_TEST`` secret.

    Each attempt asks the server for the value of ``SMOKE_TEST`` and asserts
    that no entry with that name comes back; the tenacity decorator re-runs
    the check up to ``sleep_times`` times with ``sleep_sec`` seconds between
    attempts.

    Args:
        auth_session: Session-like object; only its ``post(url, json=...)``
            and ``frontend_url()`` members are used here.

    Returns:
        None. Failure is signalled by the assertions raising.
    """
    # Get the secret value back
    json_q = {
        "query": """query getSecretValues($input: GetSecretValuesInput!) {\n
            getSecretValues(input: $input) {\n
              name\n
              value\n
            }\n
        }""",
        "variables": {"input": {"secrets": ["SMOKE_TEST"]}},
    }

    response = auth_session.post(
        f"{auth_session.frontend_url()}/api/v2/graphql", json=json_q
    )
    response.raise_for_status()
    res_data = response.json()

    assert res_data, "getSecretValues returned an empty response"
    # Check for GraphQL errors before touching `data`, so the server's message
    # is what shows up in the failure.
    assert "errors" not in res_data, f"getSecretValues failed: {res_data}"
    assert res_data["data"], f"getSecretValues returned no data: {res_data}"
    assert res_data["data"]["getSecretValues"] is not None, (
        f"getSecretValues returned null: {res_data}"
    )

    secret_values = res_data["data"]["getSecretValues"]
    secret_value_arr = [x for x in secret_values if x["name"] == "SMOKE_TEST"]
    assert len(secret_value_arr) == 0, "SMOKE_TEST secret is still present"
|
|
|
|
|
|
@tenacity.retry(
    stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_ingestion_source_present(
    auth_session, ingestion_source_urn, num_execs=None
):
    """Poll until an ingestion source is resolvable, optionally with executions.

    Each attempt queries ``ingestionSource(urn)`` together with its first
    execution request and prints the raw response. The tenacity decorator
    re-runs the check up to ``sleep_times`` times with ``sleep_sec`` seconds
    between attempts.

    Args:
        auth_session: Session-like object; only its ``post(url, json=...)``
            and ``frontend_url()`` members are used here.
        ingestion_source_urn: URN of the ingestion source to look up.
        num_execs: When not ``None``, the minimum ``executions.total`` the
            source must report for the attempt to succeed.

    Returns:
        The decoded JSON response with a non-null ``data.ingestionSource``.
    """
    json_q = {
        "query": """query ingestionSource($urn: String!) {\n
            ingestionSource(urn: $urn) {\n
              executions(start: 0, count: 1) {\n
                  start\n
                  count\n
                  total\n
                  executionRequests {\n
                      urn\n
                  }\n
              }\n
            }\n
        }""",
        "variables": {"urn": ingestion_source_urn},
    }

    response = auth_session.post(
        f"{auth_session.frontend_url()}/api/v2/graphql", json=json_q
    )
    response.raise_for_status()
    res_data = response.json()
    print(res_data)

    assert res_data, "ingestionSource returned an empty response"
    # Check for GraphQL errors before touching `data`, so the server's message
    # is what shows up in the failure.
    assert "errors" not in res_data, f"ingestionSource failed: {res_data}"
    assert res_data["data"], f"ingestionSource returned no data: {res_data}"
    assert res_data["data"]["ingestionSource"] is not None, (
        f"ingestion source {ingestion_source_urn} not found: {res_data}"
    )

    if num_execs is not None:
        ingestion_source = res_data["data"]["ingestionSource"]
        assert ingestion_source["executions"]["total"] >= num_execs, (
            f"expected at least {num_execs} executions: {ingestion_source}"
        )

    return res_data
|
|
|
|
|
|
@tenacity.retry(
    stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
)
def _ensure_execution_request_present(auth_session, execution_request_urn):
    """Poll until an execution request can be fetched by URN.

    Each attempt queries ``executionRequest(urn)`` with its ``input`` (task and
    arguments) and ``result`` (status, start time, duration) fields. The
    tenacity decorator re-runs the check up to ``sleep_times`` times with
    ``sleep_sec`` seconds between attempts.

    Args:
        auth_session: Session-like object; only its ``post(url, json=...)``
            and ``frontend_url()`` members are used here.
        execution_request_urn: URN of the execution request to look up.

    Returns:
        The decoded JSON response with a non-null ``data.executionRequest``.
    """
    json_q = {
        "query": """query executionRequest($urn: String!) {\n
            executionRequest(urn: $urn) {\n
              urn\n
              input {\n
                task\n
                arguments {\n
                  key\n
                  value\n
                }\n
              }\n
              result {\n
                status\n
                startTimeMs\n
                durationMs\n
              }\n
            }\n
        }""",
        "variables": {"urn": execution_request_urn},
    }

    response = auth_session.post(
        f"{auth_session.frontend_url()}/api/v2/graphql", json=json_q
    )
    response.raise_for_status()
    res_data = response.json()

    assert res_data, "executionRequest returned an empty response"
    # Check for GraphQL errors before touching `data`, so the server's message
    # is what shows up in the failure.
    assert "errors" not in res_data, f"executionRequest failed: {res_data}"
    assert res_data["data"], f"executionRequest returned no data: {res_data}"
    assert res_data["data"]["executionRequest"] is not None, (
        f"execution request {execution_request_urn} not found: {res_data}"
    )
    return res_data
|
|
|
|
|
|
def test_create_list_get_remove_secret(auth_session):
    """Exercise the secret lifecycle: list, create, update, read back, delete.

    The ``SMOKE_TEST`` secret created here is deleted in a ``finally`` block so
    that a failure part-way through does not leave it behind on the server.

    Args:
        auth_session: Session-like fixture; only its ``post(url, json=...)``
            and ``frontend_url()`` members are used directly.
    """

    def run_graphql(query, variables, field):
        """POST one GraphQL operation and return the validated JSON response.

        Fails if the response is empty, reports errors, has no ``data``, or
        has a null ``data[field]``.
        """
        response = auth_session.post(
            f"{auth_session.frontend_url()}/api/v2/graphql",
            json={"query": query, "variables": variables},
        )
        response.raise_for_status()
        res_data = response.json()

        assert res_data, f"{field}: empty GraphQL response"
        # Errors are checked before `data` so the server's message is what
        # shows up when an operation is rejected.
        assert "errors" not in res_data, f"{field} failed: {res_data}"
        assert res_data["data"], f"{field}: response has no data: {res_data}"
        assert res_data["data"][field] is not None, f"{field} is null: {res_data}"
        return res_data

    # Get count of existing secrets
    res_data = run_graphql(
        """query listSecrets($input: ListSecretsInput!) {\n
            listSecrets(input: $input) {\n
              start\n
              count\n
              total\n
              secrets {\n
                urn\n
                name\n
              }\n
            }\n
        }""",
        {"input": {"start": "0", "count": "20"}},
        "listSecrets",
    )
    before_count = res_data["data"]["listSecrets"]["total"]
    assert before_count is not None, f"listSecrets returned a null total: {res_data}"

    # Create new secret
    res_data = run_graphql(
        """mutation createSecret($input: CreateSecretInput!) {\n
            createSecret(input: $input)
        }""",
        {"input": {"name": "SMOKE_TEST", "value": "mytestvalue"}},
        "createSecret",
    )
    secret_urn = res_data["data"]["createSecret"]

    try:
        # Get new count of secrets
        _ensure_secret_increased(auth_session, before_count)

        # Update existing secret
        res_data = run_graphql(
            """mutation updateSecret($input: UpdateSecretInput!) {\n
                updateSecret(input: $input)
            }""",
            {
                "input": {
                    "urn": secret_urn,
                    "name": "SMOKE_TEST",
                    "value": "mytestvalue.updated",
                }
            },
            "updateSecret",
        )
        secret_urn = res_data["data"]["updateSecret"]

        # Get the secret value back
        res_data = run_graphql(
            """query getSecretValues($input: GetSecretValuesInput!) {\n
                getSecretValues(input: $input) {\n
                  name\n
                  value\n
                }\n
            }""",
            {"input": {"secrets": ["SMOKE_TEST"]}},
            "getSecretValues",
        )
        print(res_data)

        secret_values = res_data["data"]["getSecretValues"]
        matching = [x for x in secret_values if x["name"] == "SMOKE_TEST"]
        # Explicit check instead of indexing [0] blindly, so a missing secret
        # fails with a readable message rather than an IndexError.
        assert matching, f"SMOKE_TEST not returned by getSecretValues: {secret_values}"
        assert matching[0]["value"] == "mytestvalue.updated"
    finally:
        # Now cleanup and remove the secret, even if a step above failed.
        run_graphql(
            """mutation deleteSecret($urn: String!) {\n
                deleteSecret(urn: $urn)
            }""",
            {"urn": secret_urn},
            "deleteSecret",
        )

    # Re-fetch the secret values and see that they are not there.
    _ensure_secret_not_present(auth_session)
|
|
|
|
|
|
@pytest.mark.dependency()
def test_create_list_get_remove_ingestion_source(auth_session):
    """Exercise the ingestion-source lifecycle: create, list, fetch, delete.

    The ingestion source created here is deleted in a ``finally`` block so
    that a failure part-way through does not leave it behind on the server.

    Args:
        auth_session: Session-like fixture; only its ``post(url, json=...)``
            and ``frontend_url()`` members are used directly.
    """

    def run_graphql(query, variables, field):
        """POST one GraphQL operation and return the validated JSON response.

        Fails if the response is empty, reports errors, has no ``data``, or
        has a null ``data[field]``.
        """
        response = auth_session.post(
            f"{auth_session.frontend_url()}/api/v2/graphql",
            json={"query": query, "variables": variables},
        )
        response.raise_for_status()
        res_data = response.json()

        assert res_data, f"{field}: empty GraphQL response"
        # Errors are checked before `data` so the server's message is what
        # shows up when an operation is rejected.
        assert "errors" not in res_data, f"{field} failed: {res_data}"
        assert res_data["data"], f"{field}: response has no data: {res_data}"
        assert res_data["data"][field] is not None, f"{field} is null: {res_data}"
        return res_data

    # Recipe sent on creation and expected back verbatim when fetching.
    recipe = '{"source":{"type":"mysql","config":{"include_tables":true,"database":null,"password":"${MYSQL_PASSWORD}","profiling":{"enabled":false},"host_port":null,"include_views":true,"username":"${MYSQL_USERNAME}"}},"pipeline_name":"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927"}'

    # Get count of existing ingestion sources
    res_data = _get_ingestionSources(auth_session)
    before_count = res_data["data"]["listIngestionSources"]["total"]

    # Create new ingestion source
    res_data = run_graphql(
        """mutation createIngestionSource($input: UpdateIngestionSourceInput!) {\n
            createIngestionSource(input: $input)
        }""",
        {
            "input": {
                "name": "My Test Ingestion Source",
                "type": "mysql",
                "description": "My ingestion source description",
                "schedule": {"interval": "*/60 * * * *", "timezone": "UTC"},
                "config": {
                    "recipe": recipe,
                    "version": "0.8.18",
                    "executorId": "mytestexecutor",
                },
            }
        },
        "createIngestionSource",
    )
    ingestion_source_urn = res_data["data"]["createIngestionSource"]

    try:
        # Assert that there are more ingestion sources now.
        after_count = _ensure_ingestion_source_count(auth_session, before_count + 1)

        # Get the ingestion source back
        res_data = run_graphql(
            """query ingestionSource($urn: String!) {\n
                ingestionSource(urn: $urn) {\n
                  urn\n
                  type\n
                  name\n
                  schedule {\n
                    timezone\n
                    interval\n
                  }\n
                  config {\n
                    recipe\n
                    executorId\n
                    version\n
                  }\n
                }\n
            }""",
            {"urn": ingestion_source_urn},
            "ingestionSource",
        )

        ingestion_source = res_data["data"]["ingestionSource"]
        assert ingestion_source["urn"] == ingestion_source_urn
        assert ingestion_source["type"] == "mysql"
        assert ingestion_source["name"] == "My Test Ingestion Source"
        assert ingestion_source["schedule"]["interval"] == "*/60 * * * *"
        assert ingestion_source["schedule"]["timezone"] == "UTC"
        assert ingestion_source["config"]["recipe"] == recipe
        assert ingestion_source["config"]["executorId"] == "mytestexecutor"
        assert ingestion_source["config"]["version"] == "0.8.18"
    finally:
        # Now cleanup and remove the ingestion source, even if a step above
        # failed.
        res_data = run_graphql(
            """mutation deleteIngestionSource($urn: String!) {\n
                deleteIngestionSource(urn: $urn)
            }""",
            {"urn": ingestion_source_urn},
            "deleteIngestionSource",
        )
        print(res_data)

    # Ensure the ingestion source has been removed.
    _ensure_ingestion_source_count(auth_session, after_count - 1)
|
|
|
|
|
|
@pytest.mark.dependency(
    depends=[
        "test_create_list_get_remove_ingestion_source",
    ]
)
def test_create_list_get_ingestion_execution_request(auth_session):
    """Create an ingestion source, request an execution, and inspect the request.

    Verifies that the execution request is listed under the source, can be
    fetched directly by URN, carries the expected ``RUN_INGEST`` input
    arguments, and has no result yet. The ingestion source is deleted in a
    ``finally`` block so a failure part-way through does not leave it behind
    on the server.

    Args:
        auth_session: Session-like fixture; only its ``post(url, json=...)``
            and ``frontend_url()`` members are used directly.
    """

    def run_graphql(query, variables, field):
        """POST one GraphQL operation and return the validated JSON response.

        Fails if the response is empty, reports errors, has no ``data``, or
        has a null ``data[field]``.
        """
        response = auth_session.post(
            f"{auth_session.frontend_url()}/api/v2/graphql",
            json={"query": query, "variables": variables},
        )
        response.raise_for_status()
        res_data = response.json()

        assert res_data, f"{field}: empty GraphQL response"
        # Errors are checked before `data` so the server's message is what
        # shows up when an operation is rejected.
        assert "errors" not in res_data, f"{field} failed: {res_data}"
        assert res_data["data"], f"{field}: response has no data: {res_data}"
        assert res_data["data"][field] is not None, f"res_data was {res_data}"
        return res_data

    # Recipe sent on creation; its "source" section is compared against the
    # execution request's recipe argument below.
    recipe = '{"source":{"type":"mysql","config":{"include_tables":true,"database":null,"password":"${MYSQL_PASSWORD}","profiling":{"enabled":false},"host_port":null,"include_views":true,"username":"${MYSQL_USERNAME}"}},"pipeline_name":"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927"}'

    # Create new ingestion source
    res_data = run_graphql(
        """mutation createIngestionSource($input: UpdateIngestionSourceInput!) {\n
            createIngestionSource(input: $input)
        }""",
        {
            "input": {
                "name": "My Test Ingestion Source",
                "type": "mysql",
                "description": "My ingestion source description",
                "schedule": {"interval": "*/5 * * * *", "timezone": "UTC"},
                "config": {
                    "recipe": recipe,
                    "version": "0.8.18",
                    "executorId": "mytestexecutor",
                },
            }
        },
        "createIngestionSource",
    )
    ingestion_source_urn = res_data["data"]["createIngestionSource"]

    try:
        # Create a request to execute the ingestion source
        res_data = run_graphql(
            """mutation createIngestionExecutionRequest($input: CreateIngestionExecutionRequestInput!) {\n
                createIngestionExecutionRequest(input: $input)
            }""",
            {"input": {"ingestionSourceUrn": ingestion_source_urn}},
            "createIngestionExecutionRequest",
        )
        execution_request_urn = res_data["data"]["createIngestionExecutionRequest"]

        res_data = _ensure_ingestion_source_present(
            auth_session, ingestion_source_urn, 1
        )
        ingestion_source = res_data["data"]["ingestionSource"]

        assert (
            ingestion_source["executions"]["executionRequests"][0]["urn"]
            == execution_request_urn
        )

        # Get the ingestion request back via direct lookup
        res_data = _ensure_execution_request_present(
            auth_session, execution_request_urn
        )
        execution_request = res_data["data"]["executionRequest"]
        assert execution_request["urn"] == execution_request_urn

        # Verify input
        arguments = execution_request["input"]["arguments"]
        assert execution_request["input"]["task"] == "RUN_INGEST"
        assert len(arguments) == 3
        assert arguments[0]["key"] == "recipe"
        # Only the "source" section is compared, parsed as JSON on both sides.
        assert (
            json.loads(arguments[0]["value"])["source"]
            == json.loads(recipe)["source"]
        )
        assert arguments[1]["key"] == "version"
        assert arguments[1]["value"] == "0.8.18"
        assert arguments[2] == {
            "key": "debug_mode",
            "value": "false",
        }

        # Verify no result
        assert execution_request["result"] is None
    finally:
        # Now cleanup and remove the ingestion source, even if a step above
        # failed.
        run_graphql(
            """mutation deleteIngestionSource($urn: String!) {\n
                deleteIngestionSource(urn: $urn)
            }""",
            {"urn": ingestion_source_urn},
            "deleteIngestionSource",
        )
|