import json
import os
from datetime import datetime

import pytest

from datahub.configuration.common import ConfigurationError
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.sql.bigquery import BigQueryConfig, BigQuerySource
from datahub.ingestion.source.usage.bigquery_usage import BigQueryTableRef


def test_bigquery_uri():
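    # A config with only a project_id should yield the plain BigQuery SQLAlchemy URI.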
    config = BigQueryConfig.parse_obj(
        {
            "project_id": "test-project",
        }
    )
    assert config.get_sql_alchemy_url() == "bigquery://test-project"


def test_bigquery_uri_with_credential():
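    # An inline credential should be written out to a temporary service-account
    # key file (config._credentials_path) without changing the URI.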
    expected_credential_json = {
        "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "client_email": "test@acryl.io",
        "client_id": "test_client-id",
        "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test@acryl.io",
        "private_key": "random_private_key",
        "private_key_id": "test-private-key",
        "project_id": "test-project",
        "token_uri": "https://oauth2.googleapis.com/token",
        "type": "service_account",
    }

    config = BigQueryConfig.parse_obj(
        {
            "project_id": "test-project",
            "credential": {
                "project_id": "test-project",
                "private_key_id": "test-private-key",
                "private_key": "random_private_key",
                "client_email": "test@acryl.io",
                "client_id": "test_client-id",
            },
        }
    )

    try:
        assert config.get_sql_alchemy_url() == "bigquery://test-project"
        assert config._credentials_path

        # Read back the temporary key file the config wrote; the context manager
        # closes it, so no explicit close() is needed.
        with open(config._credentials_path) as json_file:
            json_credential = json.load(json_file)

        credential = json.dumps(json_credential, sort_keys=True)
        expected_credential = json.dumps(expected_credential_json, sort_keys=True)
        assert expected_credential == credential
    except AssertionError as e:
        # Clean up the temporary credentials file before re-raising.
        if config._credentials_path:
            os.unlink(str(config._credentials_path))
        raise e


def test_simple_upstream_table_generation():
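    # A plain (non-temporary) upstream table should be reported unchanged.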
    a: BigQueryTableRef = BigQueryTableRef(
        project="test-project", dataset="test-dataset", table="a"
    )
    b: BigQueryTableRef = BigQueryTableRef(
        project="test-project", dataset="test-dataset", table="b"
    )

    config = BigQueryConfig.parse_obj(
        {
            "project_id": "test-project",
        }
    )
    source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
    source.lineage_metadata = {str(a): set([str(b)])}
    upstreams = source.get_upstream_tables(str(a), [])
    assert list(upstreams) == [b]


def test_error_on_missing_config():
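    # Enabling use_exported_bigquery_audit_metadata without the settings it
    # depends on is an incomplete config and should fail validation.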
    with pytest.raises(ConfigurationError):
        BigQueryConfig.parse_obj(
            {
                "project_id": "test-project",
                "use_exported_bigquery_audit_metadata": True,
            }
        )


def test_upstream_table_generation_with_temporary_table_without_temp_upstream():
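    # b lives in an underscore-prefixed (temporary) dataset and has no upstream
    # of its own, so no upstream should be reported at all.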
    a: BigQueryTableRef = BigQueryTableRef(
        project="test-project", dataset="test-dataset", table="a"
    )
    b: BigQueryTableRef = BigQueryTableRef(
        project="test-project", dataset="_temp-dataset", table="b"
    )

    config = BigQueryConfig.parse_obj(
        {
            "project_id": "test-project",
        }
    )
    source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
    source.lineage_metadata = {str(a): set([str(b)])}
    upstreams = source.get_upstream_tables(str(a), [])
    assert list(upstreams) == []


def test_upstream_table_generation_with_temporary_table_with_temp_upstream():
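    # Lineage should be traced through the temporary table b to its real upstream c.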
    a: BigQueryTableRef = BigQueryTableRef(
        project="test-project", dataset="test-dataset", table="a"
    )
    b: BigQueryTableRef = BigQueryTableRef(
        project="test-project", dataset="_temp-dataset", table="b"
    )
    c: BigQueryTableRef = BigQueryTableRef(
        project="test-project", dataset="test-dataset", table="c"
    )

    config = BigQueryConfig.parse_obj(
        {
            "project_id": "test-project",
        }
    )
    source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
    source.lineage_metadata = {str(a): set([str(b)]), str(b): set([str(c)])}
    upstreams = source.get_upstream_tables(str(a), [])
    assert list(upstreams) == [c]


def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstream():
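    # Temp tables b and d should be resolved transitively, surfacing the real
    # upstreams c and e.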
    a: BigQueryTableRef = BigQueryTableRef(
        project="test-project", dataset="test-dataset", table="a"
    )
    b: BigQueryTableRef = BigQueryTableRef(
        project="test-project", dataset="_temp-dataset", table="b"
    )
    c: BigQueryTableRef = BigQueryTableRef(
        project="test-project", dataset="test-dataset", table="c"
    )
    d: BigQueryTableRef = BigQueryTableRef(
        project="test-project", dataset="_test-dataset", table="d"
    )
    e: BigQueryTableRef = BigQueryTableRef(
        project="test-project", dataset="test-dataset", table="e"
    )

    config = BigQueryConfig.parse_obj(
        {
            "project_id": "test-project",
        }
    )
    source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
    source.lineage_metadata = {
        str(a): set([str(b)]),
        str(b): set([str(c), str(d)]),
        str(d): set([str(e)]),
    }
    upstreams = source.get_upstream_tables(str(a), [])
    # list.sort() returns None, so the original `list(upstreams).sort() == [c, e].sort()`
    # compared None to None and always passed; compare sorted copies instead.
    assert sorted(upstreams, key=str) == sorted([c, e], key=str)


def test_bq_get_profile_candidate_query_all_params():
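    # With day, size, and row limits all set, every filter should appear in the
    # candidate query. The expected string (including the trailing comma after
    # row_count) must match the generator's output verbatim, since the assertion
    # is exact string equality.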
    config = BigQueryConfig.parse_obj(
        {
            "profiling": {
                "profile_if_updated_since_days": 1,
                "profile_table_size_limit": 5,
                "profile_table_row_limit": 50000,
            }
        }
    )
    source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
    threshold_time = datetime.fromtimestamp(1648876349)
    expected_query = (
        "SELECT table_id, size_bytes, last_modified_time, row_count, FROM `dataset_foo.__TABLES__` WHERE "
        "row_count<50000 and ROUND(size_bytes/POW(10,9),2)<5 and last_modified_time>=1648876349000 "
    )
    query = source.generate_profile_candidate_query(threshold_time, "dataset_foo")
    assert query == expected_query


def test_bq_get_profile_candidate_query_no_day_limit():
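    # Without a day limit, the last_modified_time filter should be omitted.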
    config = BigQueryConfig.parse_obj(
        {
            "profiling": {
                "profile_if_updated_since_days": None,
                "profile_table_size_limit": 5,
                "profile_table_row_limit": 50000,
            }
        }
    )
    source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
    expected_query = (
        "SELECT table_id, size_bytes, last_modified_time, row_count, FROM `dataset_foo.__TABLES__` WHERE "
        "row_count<50000 and ROUND(size_bytes/POW(10,9),2)<5 "
    )
    query = source.generate_profile_candidate_query(None, "dataset_foo")
    assert query == expected_query


def test_bq_get_profile_candidate_query_no_size_limit():
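    # Without a size limit, the size_bytes filter should be omitted.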
    config = BigQueryConfig.parse_obj(
        {
            "profiling": {
                "profile_if_updated_since_days": 1,
                "profile_table_size_limit": None,
                "profile_table_row_limit": 50000,
            }
        }
    )
    source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
    threshold_time = datetime.fromtimestamp(1648876349)
    expected_query = (
        "SELECT table_id, size_bytes, last_modified_time, row_count, FROM `dataset_foo.__TABLES__` WHERE "
        "row_count<50000 and last_modified_time>=1648876349000 "
    )
    query = source.generate_profile_candidate_query(threshold_time, "dataset_foo")
    assert query == expected_query


def test_bq_get_profile_candidate_query_no_row_limit():
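    # Without a row limit, the row_count filter should be omitted.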
    config = BigQueryConfig.parse_obj(
        {
            "profiling": {
                "profile_if_updated_since_days": 1,
                "profile_table_size_limit": 5,
                "profile_table_row_limit": None,
            }
        }
    )
    source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
    threshold_time = datetime.fromtimestamp(1648876349)
    expected_query = (
        "SELECT table_id, size_bytes, last_modified_time, row_count, FROM `dataset_foo.__TABLES__` WHERE "
        "ROUND(size_bytes/POW(10,9),2)<5 and last_modified_time>=1648876349000 "
    )
    query = source.generate_profile_candidate_query(threshold_time, "dataset_foo")
    assert query == expected_query


def test_bq_get_profile_candidate_query_all_null():
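    # With no limits configured there is nothing to filter on, so no candidate
    # query should be generated at all.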
    config = BigQueryConfig.parse_obj(
        {
            "profiling": {
                "profile_if_updated_since_days": None,
                "profile_table_size_limit": None,
                "profile_table_row_limit": None,
            }
        }
    )
    source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
    expected_query = ""
    query = source.generate_profile_candidate_query(None, "dataset_foo")
    assert query == expected_query


def test_bq_get_profile_candidate_query_only_row():
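    # Only the row limit is set, so the query should contain just the row_count filter.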
    config = BigQueryConfig.parse_obj(
        {
            "profiling": {
                "profile_if_updated_since_days": None,
                "profile_table_size_limit": None,
                "profile_table_row_limit": 50000,
            }
        }
    )
    source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
    expected_query = (
        "SELECT table_id, size_bytes, last_modified_time, row_count, FROM `dataset_foo.__TABLES__` WHERE "
        "row_count<50000 "
    )
    query = source.generate_profile_candidate_query(None, "dataset_foo")
    assert query == expected_query


def test_bq_get_profile_candidate_query_only_days():
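    # Only the day limit is set, so the query should contain just the
    # last_modified_time filter.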
    config = BigQueryConfig.parse_obj(
        {
            "profiling": {
                "profile_if_updated_since_days": 1,
                "profile_table_size_limit": None,
                "profile_table_row_limit": None,
            }
        }
    )
    source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
    threshold_time = datetime.fromtimestamp(1648876349)
    expected_query = (
        "SELECT table_id, size_bytes, last_modified_time, row_count, FROM `dataset_foo.__TABLES__` WHERE "
        "last_modified_time>=1648876349000 "
    )
    query = source.generate_profile_candidate_query(threshold_time, "dataset_foo")
    assert query == expected_query


def test_bq_get_profile_candidate_query_only_size():
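    # Only the size limit is set, so the query should contain just the size filter.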
    config = BigQueryConfig.parse_obj(
        {
            "profiling": {
                "profile_if_updated_since_days": None,
                "profile_table_size_limit": 5,
                "profile_table_row_limit": None,
            }
        }
    )
    source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test"))
    expected_query = (
        "SELECT table_id, size_bytes, last_modified_time, row_count, FROM `dataset_foo.__TABLES__` WHERE "
        "ROUND(size_bytes/POW(10,9),2)<5 "
    )
    query = source.generate_profile_candidate_query(None, "dataset_foo")
    assert query == expected_query