
import json
import pathlib
from pathlib import Path
from typing import Dict, List, Union
from unittest.mock import MagicMock, Mock, patch
from freezegun import freeze_time
from datahub.emitter.mce_builder import make_dataset_urn, make_user_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.redshift.config import RedshiftConfig
from datahub.ingestion.source.redshift.redshift_schema import (
RedshiftTable,
RedshiftView,
)
from datahub.ingestion.source.redshift.report import RedshiftReport
from datahub.ingestion.source.redshift.usage import RedshiftUsageExtractor
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
)
from datahub.metadata.schema_classes import OperationClass, OperationTypeClass
from datahub.testing import mce_helpers
FROZEN_TIME = "2021-09-15 09:00:00"
def test_redshift_usage_config():
    """RedshiftConfig round-trips the connection/usage fields given to parse_obj."""
    raw_config = {
        "host_port": "xxxxx",
        "database": "xxxxx",
        "username": "xxxxx",
        "password": "xxxxx",
        "email_domain": "xxxxx",
        "include_views": True,
        "include_tables": True,
    }
    config = RedshiftConfig.parse_obj(raw_config)

    assert config.host_port == "xxxxx"
    assert config.database == "xxxxx"
    assert config.username == "xxxxx"
    assert config.email_domain == "xxxxx"
    assert config.include_views
    assert config.include_tables
@freeze_time(FROZEN_TIME)
@patch("redshift_connector.Cursor")
@patch("redshift_connector.Connection")
def test_redshift_usage_source(mock_cursor, mock_connection, pytestconfig, tmp_path):
    """End-to-end check of RedshiftUsageExtractor against recorded query fixtures.

    Replays recorded usage and operational query results through mocked
    redshift_connector cursors, runs the extractor over a small table map,
    and compares the emitted metadata against a golden file.
    """
    test_resources_dir = pathlib.Path(
        pytestconfig.rootpath / "tests/integration/redshift-usage"
    )
    generate_mcps_path = Path(f"{tmp_path}/redshift_usages.json")
    # Load recorded query results; use context managers so the fixture file
    # handles are closed deterministically (they were previously leaked).
    with open(f"{test_resources_dir}/usage_events_history.json") as usage_file:
        mock_usage_query_result_dict = json.load(usage_file)
    with open(
        f"{test_resources_dir}/operational_events_history.json"
    ) as operational_file:
        mock_operational_query_result_dict = json.load(operational_file)
    mock_cursor.execute.return_value = None
    mock_connection.cursor.return_value = mock_cursor
    mock_cursor_usage_query = Mock()
    mock_cursor_operational_query = Mock()
    mock_cursor_usage_query.execute.return_value = None
    mock_cursor_operational_query.execute.return_value = None
    # fetchmany yields one full batch of rows, then an empty batch to signal
    # the end of the result set.
    mock_cursor_usage_query.fetchmany.side_effect = [
        [list(row.values()) for row in mock_usage_query_result_dict],
        [],
    ]
    mock_cursor_operational_query.fetchmany.side_effect = [
        [list(row.values()) for row in mock_operational_query_result_dict],
        [],
    ]
    # DB-API cursor.description: one [column_name, ...] entry per column,
    # derived from the keys of the first fixture row.
    mock_cursor_usage_query.description = [
        [key] for key in mock_usage_query_result_dict[0]
    ]
    mock_cursor_operational_query.description = [
        [key] for key in mock_operational_query_result_dict[0]
    ]
    # The extractor requests the operational cursor first, then the usage one.
    mock_connection.cursor.side_effect = [
        mock_cursor_operational_query,
        mock_cursor_usage_query,
    ]
    config = RedshiftConfig(host_port="test:1234", email_domain="acryl.io")
    source_report = RedshiftReport()
    usage_extractor = RedshiftUsageExtractor(
        config=config,
        connection=mock_connection,
        report=source_report,
        dataset_urn_builder=lambda table: make_dataset_urn("redshift", table),
    )
    all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]] = {
        "db1": {
            "schema1": [
                RedshiftTable(
                    name="category",
                    schema="schema1",
                    type="BASE TABLE",
                    created=None,
                    comment="",
                ),
            ]
        },
        "dev": {
            "public": [
                RedshiftTable(
                    name="users",
                    schema="public",
                    type="BASE TABLE",
                    created=None,
                    comment="",
                ),
                RedshiftTable(
                    name="orders",
                    schema="public",
                    type="BASE TABLE",
                    created=None,
                    comment="",
                ),
            ]
        },
    }
    mwus = usage_extractor.get_usage_workunits(all_tables=all_tables)
    metadata: List[
        Union[
            MetadataChangeEvent,
            MetadataChangeProposal,
            MetadataChangeProposalWrapper,
        ]
    ] = []
    for mwu in mwus:
        metadata.append(mwu.metadata)
    # There should be 2 calls (usage aspects -1, operation aspects -1).
    assert mock_connection.cursor.call_count == 2
    assert source_report.num_usage_workunits_emitted == 3
    assert source_report.num_operational_stats_workunits_emitted == 3
    write_metadata_file(generate_mcps_path, metadata)
    mce_helpers.check_golden_file(
        pytestconfig=pytestconfig,
        output_path=tmp_path / "redshift_usages.json",
        golden_path=test_resources_dir / "redshift_usages_golden.json",
    )
@freeze_time(FROZEN_TIME)
@patch("redshift_connector.Cursor")
@patch("redshift_connector.Connection")
def test_redshift_usage_filtering(mock_cursor, mock_connection, pytestconfig, tmp_path):
    """Verify that usage events for tables absent from all_tables are filtered out.

    Replays the same recorded fixtures as test_redshift_usage_source but only
    registers one table, then compares against the filtered golden file.
    """
    test_resources_dir = pathlib.Path(
        pytestconfig.rootpath / "tests/integration/redshift-usage"
    )
    generate_mcps_path = Path(f"{tmp_path}/redshift_usages.json")
    # Load recorded query results; use context managers so the fixture file
    # handles are closed deterministically (they were previously leaked).
    with open(f"{test_resources_dir}/usage_events_history.json") as usage_file:
        mock_usage_query_result_dict = json.load(usage_file)
    with open(
        f"{test_resources_dir}/operational_events_history.json"
    ) as operational_file:
        mock_operational_query_result_dict = json.load(operational_file)
    mock_cursor.execute.return_value = None
    mock_connection.cursor.return_value = mock_cursor
    mock_cursor_usage_query = Mock()
    mock_cursor_operational_query = Mock()
    mock_cursor_usage_query.execute.return_value = None
    mock_cursor_operational_query.execute.return_value = None
    # fetchmany yields one full batch of rows, then an empty batch to signal
    # the end of the result set.
    mock_cursor_usage_query.fetchmany.side_effect = [
        [list(row.values()) for row in mock_usage_query_result_dict],
        [],
    ]
    mock_cursor_operational_query.fetchmany.side_effect = [
        [list(row.values()) for row in mock_operational_query_result_dict],
        [],
    ]
    # DB-API cursor.description: one [column_name, ...] entry per column,
    # derived from the keys of the first fixture row.
    mock_cursor_usage_query.description = [
        [key] for key in mock_usage_query_result_dict[0]
    ]
    mock_cursor_operational_query.description = [
        [key] for key in mock_operational_query_result_dict[0]
    ]
    # The extractor requests the operational cursor first, then the usage one.
    mock_connection.cursor.side_effect = [
        mock_cursor_operational_query,
        mock_cursor_usage_query,
    ]
    config = RedshiftConfig(host_port="test:1234", email_domain="acryl.io")
    usage_extractor = RedshiftUsageExtractor(
        config=config,
        connection=mock_connection,
        report=RedshiftReport(),
        dataset_urn_builder=lambda table: make_dataset_urn("redshift", table),
    )
    all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]] = {
        "dev": {
            "public": [
                RedshiftTable(
                    name="users",
                    schema="public",
                    type="BASE TABLE",
                    created=None,
                    comment="",
                ),
            ]
        },
    }
    mwus = usage_extractor.get_usage_workunits(all_tables=all_tables)
    metadata: List[
        Union[
            MetadataChangeEvent,
            MetadataChangeProposal,
            MetadataChangeProposalWrapper,
        ]
    ] = []
    for mwu in mwus:
        metadata.append(mwu.metadata)
    write_metadata_file(generate_mcps_path, metadata)
    mce_helpers.check_golden_file(
        pytestconfig=pytestconfig,
        output_path=tmp_path / "redshift_usages.json",
        golden_path=test_resources_dir / "redshift_usages_filtered_golden.json",
    )
def load_access_events(test_resources_dir: pathlib.Path) -> List[Dict]:
    """Load the recorded usage access-events fixture as a list of dicts."""
    events_path = test_resources_dir / "usage_events_history.json"
    with events_path.open() as events_fp:
        return json.load(events_fp)
def test_duplicate_operations_dropped():
    """A repeated operation on the same urn shortly after the first is dropped;
    the first occurrence per urn is kept."""
    report = RedshiftReport()
    extractor = RedshiftUsageExtractor(
        config=MagicMock(),
        connection=MagicMock(),
        report=report,
        dataset_urn_builder=MagicMock(),
        redundant_run_skip_handler=None,
    )

    actor = make_user_urn("jdoe")
    urn_a = "urn:li:dataset:(urn:li:dataPlatform:redshift,db.schema.tableA,PROD)"
    urn_b = "urn:li:dataset:(urn:li:dataPlatform:redshift,db.schema.tableB,PROD)"

    def _insert_op(urn: str, ts_secs: int, updated_secs: int) -> MetadataChangeProposalWrapper:
        # Build an INSERT operation aspect at the given second offsets.
        return MetadataChangeProposalWrapper(
            entityUrn=urn,
            aspect=OperationClass(
                timestampMillis=ts_secs * 1000,
                lastUpdatedTimestamp=updated_secs * 1000,
                actor=actor,
                operationType=OperationTypeClass.INSERT,
            ),
        )

    op_a_first = _insert_op(urn_a, 100, 95)
    op_b_first = _insert_op(urn_b, 101, 94)
    op_a_repeat = _insert_op(urn_a, 102, 90)

    deduped = list(
        extractor._drop_repeated_operations([op_a_first, op_b_first, op_a_repeat])
    )
    assert deduped == [
        op_a_first,
        op_b_first,
    ]