import json
import pathlib
from pathlib import Path
from typing import Dict, List, Union
from unittest.mock import MagicMock, Mock, patch

from freezegun import freeze_time

from datahub.emitter.mce_builder import make_dataset_urn, make_user_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.redshift.config import RedshiftConfig
from datahub.ingestion.source.redshift.redshift_schema import (
    RedshiftTable,
    RedshiftView,
)
from datahub.ingestion.source.redshift.report import RedshiftReport
from datahub.ingestion.source.redshift.usage import RedshiftUsageExtractor
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
    MetadataChangeEvent,
    MetadataChangeProposal,
)
from datahub.metadata.schema_classes import OperationClass, OperationTypeClass
from datahub.testing import mce_helpers

FROZEN_TIME = "2021-09-15 09:00:00"


def test_redshift_usage_config():
    config = RedshiftConfig.parse_obj(
        dict(
            host_port="xxxxx",
            database="xxxxx",
            username="xxxxx",
            password="xxxxx",
            email_domain="xxxxx",
            include_views=True,
            include_tables=True,
        )
    )

    assert config.host_port == "xxxxx"
    assert config.database == "xxxxx"
    assert config.username == "xxxxx"
    assert config.email_domain == "xxxxx"
    assert config.include_views
    assert config.include_tables


@freeze_time(FROZEN_TIME)
@patch("redshift_connector.Cursor")
@patch("redshift_connector.Connection")
def test_redshift_usage_source(mock_cursor, mock_connection, pytestconfig, tmp_path):
    test_resources_dir = pathlib.Path(
        pytestconfig.rootpath / "tests/integration/redshift-usage"
    )
    generate_mcps_path = Path(f"{tmp_path}/redshift_usages.json")
    mock_usage_query_result = open(f"{test_resources_dir}/usage_events_history.json")
    mock_operational_query_result = open(
        f"{test_resources_dir}/operational_events_history.json"
    )
    mock_usage_query_result_dict = json.load(mock_usage_query_result)
    mock_operational_query_result_dict = json.load(mock_operational_query_result)

    mock_cursor.execute.return_value = None
    mock_connection.cursor.return_value = mock_cursor

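    # One mock cursor per query: fetchmany yields the fixture rows once and then an
    # empty list to end iteration, while description supplies DB-API style column
    # names taken from the fixture's first row.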
    mock_cursor_usage_query = Mock()
    mock_cursor_operational_query = Mock()
    mock_cursor_usage_query.execute.return_value = None
    mock_cursor_operational_query.execute.return_value = None
    mock_cursor_usage_query.fetchmany.side_effect = [
        [list(row.values()) for row in mock_usage_query_result_dict],
        [],
    ]
    mock_cursor_operational_query.fetchmany.side_effect = [
        [list(row.values()) for row in mock_operational_query_result_dict],
        [],
    ]

    mock_cursor_usage_query.description = [
        [key] for key in mock_usage_query_result_dict[0]
    ]
    mock_cursor_operational_query.description = [
        [key] for key in mock_operational_query_result_dict[0]
    ]

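    # The extractor asks the connection for a fresh cursor per query; the side_effect
    # order below assumes the operational query runs before the usage query.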
    mock_connection.cursor.side_effect = [
        mock_cursor_operational_query,
        mock_cursor_usage_query,
    ]

    config = RedshiftConfig(host_port="test:1234", email_domain="acryl.io")
    source_report = RedshiftReport()
    usage_extractor = RedshiftUsageExtractor(
        config=config,
        connection=mock_connection,
        report=source_report,
        dataset_urn_builder=lambda table: make_dataset_urn("redshift", table),
    )

    all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]] = {
        "db1": {
            "schema1": [
                RedshiftTable(
                    name="category",
                    schema="schema1",
                    type="BASE TABLE",
                    created=None,
                    comment="",
                ),
            ]
        },
        "dev": {
            "public": [
                RedshiftTable(
                    name="users",
                    schema="public",
                    type="BASE TABLE",
                    created=None,
                    comment="",
                ),
                RedshiftTable(
                    name="orders",
                    schema="public",
                    type="BASE TABLE",
                    created=None,
                    comment="",
                ),
            ]
        },
    }
    mwus = usage_extractor.get_usage_workunits(all_tables=all_tables)
    metadata: List[
        Union[
            MetadataChangeEvent,
            MetadataChangeProposal,
            MetadataChangeProposalWrapper,
        ]
    ] = []

    for mwu in mwus:
        metadata.append(mwu.metadata)

    # There should be 2 cursor calls: one for the usage query and one for the
    # operational stats query.
    assert mock_connection.cursor.call_count == 2
    assert source_report.num_usage_workunits_emitted == 3
    assert source_report.num_operational_stats_workunits_emitted == 3

    write_metadata_file(generate_mcps_path, metadata)
    mce_helpers.check_golden_file(
        pytestconfig=pytestconfig,
        output_path=tmp_path / "redshift_usages.json",
        golden_path=test_resources_dir / "redshift_usages_golden.json",
    )


@freeze_time(FROZEN_TIME)
@patch("redshift_connector.Cursor")
@patch("redshift_connector.Connection")
def test_redshift_usage_filtering(mock_cursor, mock_connection, pytestconfig, tmp_path):
    test_resources_dir = pathlib.Path(
        pytestconfig.rootpath / "tests/integration/redshift-usage"
    )
    generate_mcps_path = Path(f"{tmp_path}/redshift_usages.json")
    mock_usage_query_result = open(f"{test_resources_dir}/usage_events_history.json")
    mock_operational_query_result = open(
        f"{test_resources_dir}/operational_events_history.json"
    )
    mock_usage_query_result_dict = json.load(mock_usage_query_result)
    mock_operational_query_result_dict = json.load(mock_operational_query_result)

    mock_cursor.execute.return_value = None
    mock_connection.cursor.return_value = mock_cursor

    mock_cursor_usage_query = Mock()
    mock_cursor_operational_query = Mock()
    mock_cursor_usage_query.execute.return_value = None
    mock_cursor_operational_query.execute.return_value = None
    mock_cursor_usage_query.fetchmany.side_effect = [
        [list(row.values()) for row in mock_usage_query_result_dict],
        [],
    ]
    mock_cursor_operational_query.fetchmany.side_effect = [
        [list(row.values()) for row in mock_operational_query_result_dict],
        [],
    ]

    mock_cursor_usage_query.description = [
        [key] for key in mock_usage_query_result_dict[0]
    ]
    mock_cursor_operational_query.description = [
        [key] for key in mock_operational_query_result_dict[0]
    ]

    mock_connection.cursor.side_effect = [
        mock_cursor_operational_query,
        mock_cursor_usage_query,
    ]

    config = RedshiftConfig(host_port="test:1234", email_domain="acryl.io")
    usage_extractor = RedshiftUsageExtractor(
        config=config,
        connection=mock_connection,
        report=RedshiftReport(),
        dataset_urn_builder=lambda table: make_dataset_urn("redshift", table),
    )

    all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]] = {
        "dev": {
            "public": [
                RedshiftTable(
                    name="users",
                    schema="public",
                    type="BASE TABLE",
                    created=None,
                    comment="",
                ),
            ]
        },
    }
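
    # Only dev.public.users is registered above, so events in the fixtures that
    # reference other tables are expected to be filtered out (compare against the
    # *_filtered_golden.json file below).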
    mwus = usage_extractor.get_usage_workunits(all_tables=all_tables)
    metadata: List[
        Union[
            MetadataChangeEvent,
            MetadataChangeProposal,
            MetadataChangeProposalWrapper,
        ]
    ] = []

    for mwu in mwus:
        metadata.append(mwu.metadata)

    write_metadata_file(generate_mcps_path, metadata)
    mce_helpers.check_golden_file(
        pytestconfig=pytestconfig,
        output_path=tmp_path / "redshift_usages.json",
        golden_path=test_resources_dir / "redshift_usages_filtered_golden.json",
    )


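# Helper for reading the usage-events fixture as a list of dicts.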
def load_access_events(test_resources_dir: pathlib.Path) -> List[Dict]:
    access_events_history_file = test_resources_dir / "usage_events_history.json"
    with access_events_history_file.open() as access_events_json:
        access_events = json.loads(access_events_json.read())
    return access_events


def test_duplicate_operations_dropped():
    report = RedshiftReport()
    usage_extractor = RedshiftUsageExtractor(
        config=MagicMock(),
        connection=MagicMock(),
        report=report,
        dataset_urn_builder=MagicMock(),
        redundant_run_skip_handler=None,
    )

    user = make_user_urn("jdoe")
    urnA = "urn:li:dataset:(urn:li:dataPlatform:redshift,db.schema.tableA,PROD)"
    urnB = "urn:li:dataset:(urn:li:dataPlatform:redshift,db.schema.tableB,PROD)"

    opA1 = MetadataChangeProposalWrapper(
        entityUrn=urnA,
        aspect=OperationClass(
            timestampMillis=100 * 1000,
            lastUpdatedTimestamp=95 * 1000,
            actor=user,
            operationType=OperationTypeClass.INSERT,
        ),
    )
    opB1 = MetadataChangeProposalWrapper(
        entityUrn=urnB,
        aspect=OperationClass(
            timestampMillis=101 * 1000,
            lastUpdatedTimestamp=94 * 1000,
            actor=user,
            operationType=OperationTypeClass.INSERT,
        ),
    )
    opA2 = MetadataChangeProposalWrapper(
        entityUrn=urnA,
        aspect=OperationClass(
            timestampMillis=102 * 1000,
            lastUpdatedTimestamp=90 * 1000,
            actor=user,
            operationType=OperationTypeClass.INSERT,
        ),
    )

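    # opA2 repeats an INSERT by the same user on urnA shortly after opA1, so it is
    # expected to be dropped as a duplicate; opB1 targets a different table and is kept.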
    dedups = list(usage_extractor._drop_repeated_operations([opA1, opB1, opA2]))
    assert dedups == [
        opA1,
        opB1,
    ]