import time
from datetime import datetime

import pytest
from freezegun import freeze_time
from pydantic import ValidationError

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BucketDuration, get_time_bucket
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.usage.usage_common import (
    DEFAULT_QUERIES_CHARACTER_LIMIT,
    BaseUsageConfig,
    GenericAggregatedDataset,
    convert_usage_aggregation_class,
)
from datahub.metadata.schema_classes import (
    CalendarIntervalClass,
    DatasetFieldUsageCountsClass,
    DatasetUsageStatisticsClass,
    DatasetUserUsageCountsClass,
    FieldUsageCountsClass,
    TimeWindowSizeClass,
    UsageAggregationClass,
    UsageAggregationMetricsClass,
    UserUsageCountsClass,
    WindowDurationClass,
)

_TestTableRef = str

_TestAggregatedDataset = GenericAggregatedDataset[_TestTableRef]
USAGE_ASPECT_NAME = DatasetUsageStatisticsClass.get_aspect_name()
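

# URN builder used by the work-unit tests below; it pins the platform to
# "snowflake", the platform instance to "snowflake-dev", and the env to "DEV"
# so the generated work-unit ids are deterministic.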
def _simple_urn_builder(resource):
    return make_dataset_urn_with_platform_instance(
        "snowflake",
        resource.lower(),
        "snowflake-dev",
        "DEV",
    )


def test_add_one_query_without_columns():
    test_email = "test_email@test.com"
    test_query = "select * from test"
    event_time = datetime(2020, 1, 1)
    floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
    resource = "test_db.test_schema.test_table"

    ta = _TestAggregatedDataset(bucket_start_time=floored_ts, resource=resource)
    ta.add_read_entry(
        test_email,
        test_query,
        [],
    )

    assert ta.queryCount == 1
    assert ta.queryFreq[test_query] == 1
    assert ta.userFreq[test_email] == 1
    assert len(ta.columnFreq) == 0


def test_add_one_query_with_ignored_user():
    test_email = "test_email@test.com"
    test_query = "select * from test"
    event_time = datetime(2020, 1, 1)
    floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
    resource = "test_db.test_schema.test_table"
    user_email_pattern = AllowDenyPattern(deny=["test_email@test.com"])

    ta = _TestAggregatedDataset(
        bucket_start_time=floored_ts,
        resource=resource,
    )
    ta.add_read_entry(
        test_email,
        test_query,
        [],
        user_email_pattern=user_email_pattern,
    )

    assert ta.queryCount == 0
    assert ta.queryFreq[test_query] == 0
    assert ta.userFreq[test_email] == 0
    assert len(ta.columnFreq) == 0


def test_multiple_query_with_ignored_user():
    test_email = "test_email@test.com"
    test_email2 = "test_email2@test.com"
    test_query = "select * from test"
    test_query2 = "select * from test2"
    event_time = datetime(2020, 1, 1)
    floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
    resource = "test_db.test_schema.test_table"
    user_email_pattern = AllowDenyPattern(deny=["test_email@test.com"])

    ta = _TestAggregatedDataset(
        bucket_start_time=floored_ts,
        resource=resource,
    )
    ta.add_read_entry(
        test_email,
        test_query,
        [],
        user_email_pattern=user_email_pattern,
    )
    ta.add_read_entry(
        test_email,
        test_query,
        [],
        user_email_pattern=user_email_pattern,
    )
    ta.add_read_entry(
        test_email2,
        test_query2,
        [],
        user_email_pattern=user_email_pattern,
    )
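
    # Only the entries from the non-denied user should be aggregated, so the
    # counts for the denied test_email / test_query stay at zero.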
    assert ta.queryCount == 1
    assert ta.queryFreq[test_query] == 0
    assert ta.userFreq[test_email] == 0
    assert ta.queryFreq[test_query2] == 1
    assert ta.userFreq[test_email2] == 1
    assert len(ta.columnFreq) == 0


def test_multiple_query_without_columns():
    test_email = "test_email@test.com"
    test_email2 = "test_email2@test.com"
    test_query = "select * from test"
    test_query2 = "select * from test2"
    event_time = datetime(2020, 1, 1)
    floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
    resource = "test_db.test_schema.test_table"

    ta = _TestAggregatedDataset(bucket_start_time=floored_ts, resource=resource)
    ta.add_read_entry(
        test_email,
        test_query,
        [],
    )
    ta.add_read_entry(
        test_email,
        test_query,
        [],
    )
    ta.add_read_entry(
        test_email2,
        test_query2,
        [],
    )

    assert ta.queryCount == 3
    assert ta.queryFreq[test_query] == 2
    assert ta.userFreq[test_email] == 2
    assert ta.queryFreq[test_query2] == 1
    assert ta.userFreq[test_email2] == 1
    assert len(ta.columnFreq) == 0


def test_make_usage_workunit():
    test_email = "test_email@test.com"
    test_query = "select * from test"
    event_time = datetime(2020, 1, 1)
    floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
    resource = "test_db.test_schema.test_table"

    ta = _TestAggregatedDataset(bucket_start_time=floored_ts, resource=resource)
    ta.add_read_entry(
        test_email,
        test_query,
        [],
    )
    wu: MetadataWorkUnit = ta.make_usage_workunit(
        bucket_duration=BucketDuration.DAY,
        resource_urn_builder=_simple_urn_builder,
        top_n_queries=10,
        format_sql_queries=False,
        include_top_n_queries=True,
        queries_character_limit=DEFAULT_QUERIES_CHARACTER_LIMIT,
    )

    ts_timestamp = int(floored_ts.timestamp() * 1000)
    assert (
        wu.id == f"{_simple_urn_builder(resource)}-{USAGE_ASPECT_NAME}-{ts_timestamp}"
    )
    assert isinstance(wu.get_metadata()["metadata"], MetadataChangeProposalWrapper)
    du: DatasetUsageStatisticsClass = wu.get_metadata()["metadata"].aspect
    assert du.totalSqlQueries == 1
    assert du.topSqlQueries
    assert du.topSqlQueries.pop() == test_query


def test_query_formatting():
    test_email = "test_email@test.com"
    test_query = "select * from foo where id in (select id from bar);"
    formatted_test_query: str = "SELECT *\n  FROM foo\n WHERE id IN (\n        SELECT id\n          FROM bar\n       );"
    event_time = datetime(2020, 1, 1)
    floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
    resource = "test_db.test_schema.test_table"

    ta = _TestAggregatedDataset(bucket_start_time=floored_ts, resource=resource)
    ta.add_read_entry(
        test_email,
        test_query,
        [],
    )
    wu: MetadataWorkUnit = ta.make_usage_workunit(
        bucket_duration=BucketDuration.DAY,
        resource_urn_builder=_simple_urn_builder,
        top_n_queries=10,
        format_sql_queries=True,
        include_top_n_queries=True,
        queries_character_limit=DEFAULT_QUERIES_CHARACTER_LIMIT,
    )
    ts_timestamp = int(floored_ts.timestamp() * 1000)
    assert (
        wu.id == f"{_simple_urn_builder(resource)}-{USAGE_ASPECT_NAME}-{ts_timestamp}"
    )
    assert isinstance(wu.get_metadata()["metadata"], MetadataChangeProposalWrapper)
    du: DatasetUsageStatisticsClass = wu.get_metadata()["metadata"].aspect
    assert du.totalSqlQueries == 1
    assert du.topSqlQueries
    assert du.topSqlQueries.pop() == formatted_test_query


def test_query_trimming():
    test_email: str = "test_email@test.com"
    test_query: str = "select * from test where a > 10 and b > 20 order by a asc"
    top_n_queries: int = 10
    queries_character_limit: int = 200
    event_time = datetime(2020, 1, 1)
    floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
    resource = "test_db.test_schema.test_table"

    ta = _TestAggregatedDataset(bucket_start_time=floored_ts, resource=resource)
    ta.add_read_entry(
        test_email,
        test_query,
        [],
    )
    wu: MetadataWorkUnit = ta.make_usage_workunit(
        bucket_duration=BucketDuration.DAY,
        resource_urn_builder=_simple_urn_builder,
        top_n_queries=top_n_queries,
        format_sql_queries=False,
        include_top_n_queries=True,
        queries_character_limit=queries_character_limit,
    )

    ts_timestamp = int(floored_ts.timestamp() * 1000)
    assert (
        wu.id == f"{_simple_urn_builder(resource)}-{USAGE_ASPECT_NAME}-{ts_timestamp}"
    )
    assert isinstance(wu.get_metadata()["metadata"], MetadataChangeProposalWrapper)
    du: DatasetUsageStatisticsClass = wu.get_metadata()["metadata"].aspect
    assert du.totalSqlQueries == 1
    assert du.topSqlQueries
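    # The 200-character budget is spread across top_n_queries=10, so each stored
    # query is trimmed to roughly 20 characters and ends in " ...".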
    assert du.topSqlQueries.pop() == "select * from te ..."


def test_top_n_queries_validator_fails():
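    # A 20-character query budget leaves room for at most one top query, so
    # requesting two should be rejected by the config validator.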
    with pytest.raises(ValidationError) as excinfo:
        BaseUsageConfig(top_n_queries=2, queries_character_limit=20)
    assert "top_n_queries is set to 2 but it can be maximum 1" in str(excinfo.value)


def test_make_usage_workunit_include_top_n_queries():
    test_email = "test_email@test.com"
    test_query = "select * from test"
    event_time = datetime(2020, 1, 1)
    floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
    resource = "test_db.test_schema.test_table"

    ta = _TestAggregatedDataset(bucket_start_time=floored_ts, resource=resource)
    ta.add_read_entry(
        test_email,
        test_query,
        [],
    )
    wu: MetadataWorkUnit = ta.make_usage_workunit(
        bucket_duration=BucketDuration.DAY,
        resource_urn_builder=_simple_urn_builder,
        top_n_queries=10,
        format_sql_queries=False,
        include_top_n_queries=False,
        queries_character_limit=DEFAULT_QUERIES_CHARACTER_LIMIT,
    )

    ts_timestamp = int(floored_ts.timestamp() * 1000)
    assert (
        wu.id == f"{_simple_urn_builder(resource)}-{USAGE_ASPECT_NAME}-{ts_timestamp}"
    )
    assert isinstance(wu.get_metadata()["metadata"], MetadataChangeProposalWrapper)
    du: DatasetUsageStatisticsClass = wu.get_metadata()["metadata"].aspect
    assert du.totalSqlQueries == 1
    assert du.topSqlQueries is None
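

# freeze_time pins time.time(), so the timestamps embedded in the constructed
# UsageAggregationClass and in the expected MetadataChangeProposalWrapper match.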
@freeze_time("2023-01-01 00:00:00")
def test_convert_usage_aggregation_class():
    urn = make_dataset_urn_with_platform_instance(
        "platform", "test_db.test_schema.test_table", None
    )
    usage_aggregation = UsageAggregationClass(
        bucket=int(time.time() * 1000),
        duration=WindowDurationClass.DAY,
        resource=urn,
        metrics=UsageAggregationMetricsClass(
            uniqueUserCount=5,
            users=[
                UserUsageCountsClass(count=3, user="abc", userEmail="abc@acryl.io"),
                UserUsageCountsClass(count=2),
                UserUsageCountsClass(count=1, user="def"),
            ],
            totalSqlQueries=10,
            topSqlQueries=["SELECT * FROM my_table", "SELECT col from a.b.c"],
            fields=[FieldUsageCountsClass("col", 7), FieldUsageCountsClass("col2", 0)],
        ),
    )
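    # The user entry without a `user` urn cannot be attributed to anyone, so it
    # is expected to be dropped from userCounts in the converted aspect.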
    assert convert_usage_aggregation_class(
        usage_aggregation
    ) == MetadataChangeProposalWrapper(
        entityUrn=urn,
        aspect=DatasetUsageStatisticsClass(
            timestampMillis=int(time.time() * 1000),
            eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
            uniqueUserCount=5,
            totalSqlQueries=10,
            topSqlQueries=["SELECT * FROM my_table", "SELECT col from a.b.c"],
            userCounts=[
                DatasetUserUsageCountsClass(
                    user="abc", count=3, userEmail="abc@acryl.io"
                ),
                DatasetUserUsageCountsClass(user="def", count=1),
            ],
            fieldCounts=[
                DatasetFieldUsageCountsClass(fieldPath="col", count=7),
                DatasetFieldUsageCountsClass(fieldPath="col2", count=0),
            ],
        ),
    )
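
    # An aggregation with empty metrics should convert to an aspect carrying
    # only the timestamp and event granularity.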
    empty_urn = make_dataset_urn_with_platform_instance(
        "platform",
        "test_db.test_schema.empty_table",
        None,
    )
    empty_usage_aggregation = UsageAggregationClass(
        bucket=int(time.time() * 1000) - 1000 * 60 * 60 * 24,
        duration=WindowDurationClass.MONTH,
        resource=empty_urn,
        metrics=UsageAggregationMetricsClass(),
    )
    assert convert_usage_aggregation_class(
        empty_usage_aggregation
    ) == MetadataChangeProposalWrapper(
        entityUrn=empty_urn,
        aspect=DatasetUsageStatisticsClass(
            timestampMillis=int(time.time() * 1000) - 1000 * 60 * 60 * 24,
            eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.MONTH),
        ),
    )