# 296 lines, 9.6 KiB, Python

import json
import pathlib
from pathlib import Path
from typing import Dict, List, Union
from unittest.mock import MagicMock, Mock, patch
from freezegun import freeze_time
from datahub.emitter.mce_builder import make_dataset_urn, make_user_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.redshift.config import RedshiftConfig
from datahub.ingestion.source.redshift.redshift_schema import (
RedshiftTable,
RedshiftView,
)
from datahub.ingestion.source.redshift.report import RedshiftReport
from datahub.ingestion.source.redshift.usage import RedshiftUsageExtractor
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
)
from datahub.metadata.schema_classes import OperationClass, OperationTypeClass
from datahub.testing import mce_helpers
# Fixed timestamp that freezegun pins the clock to for the time-sensitive tests below.
FROZEN_TIME = "2021-09-15 09:00:00"
def test_redshift_usage_config():
    """RedshiftConfig.parse_obj should round-trip the fields we supply."""
    raw_config = {
        "host_port": "xxxxx",
        "database": "xxxxx",
        "username": "xxxxx",
        "password": "xxxxx",
        "email_domain": "xxxxx",
        "include_views": True,
        "include_tables": True,
    }
    config = RedshiftConfig.parse_obj(raw_config)

    # Scalar fields survive parsing unchanged.
    assert config.host_port == "xxxxx"
    assert config.database == "xxxxx"
    assert config.username == "xxxxx"
    assert config.email_domain == "xxxxx"
    # Both inclusion toggles remain enabled.
    assert config.include_views
    assert config.include_tables
@freeze_time(FROZEN_TIME)
@patch("redshift_connector.Cursor")
@patch("redshift_connector.Connection")
def test_redshift_usage_source(mock_connection, mock_cursor, pytestconfig, tmp_path):
    """End-to-end check of RedshiftUsageExtractor against recorded query results.

    Feeds canned usage/operational rows through mocked redshift_connector
    cursors, runs the extractor over a small table inventory, and compares the
    emitted metadata against a golden file.

    NOTE: stacked ``@patch`` decorators inject mocks bottom-up, so the
    innermost patch (``redshift_connector.Connection``) supplies the FIRST
    mock argument. The previous signature had the two names swapped.
    """
    test_resources_dir = pathlib.Path(
        pytestconfig.rootpath / "tests/integration/redshift-usage"
    )
    generate_mcps_path = Path(f"{tmp_path}/redshift_usages.json")

    # Load the canned query results; ``with`` ensures the fixture files are
    # closed (previously they were left open for the process lifetime).
    with open(f"{test_resources_dir}/usage_events_history.json") as usage_file:
        mock_usage_query_result_dict = json.load(usage_file)
    with open(
        f"{test_resources_dir}/operational_events_history.json"
    ) as operational_file:
        mock_operational_query_result_dict = json.load(operational_file)

    mock_cursor.execute.return_value = None
    mock_connection.cursor.return_value = mock_cursor

    # One mock cursor per query the extractor issues. ``fetchmany`` yields a
    # single batch of rows, then an empty batch to signal exhaustion.
    mock_cursor_usage_query = Mock()
    mock_cursor_operational_query = Mock()
    mock_cursor_usage_query.execute.return_value = None
    mock_cursor_operational_query.execute.return_value = None
    mock_cursor_usage_query.fetchmany.side_effect = [
        [list(row.values()) for row in mock_usage_query_result_dict],
        [],
    ]
    mock_cursor_operational_query.fetchmany.side_effect = [
        [list(row.values()) for row in mock_operational_query_result_dict],
        [],
    ]
    # Mimic DB-API ``cursor.description``: a sequence whose first element is
    # the column name, derived from the first fixture row's keys.
    mock_cursor_usage_query.description = [
        [key] for key in mock_usage_query_result_dict[0]
    ]
    mock_cursor_operational_query.description = [
        [key] for key in mock_operational_query_result_dict[0]
    ]
    # The extractor requests the operational-stats cursor first, then usage.
    mock_connection.cursor.side_effect = [
        mock_cursor_operational_query,
        mock_cursor_usage_query,
    ]

    config = RedshiftConfig(host_port="test:1234", email_domain="acryl.io")
    source_report = RedshiftReport()
    usage_extractor = RedshiftUsageExtractor(
        config=config,
        connection=mock_connection,
        report=source_report,
        dataset_urn_builder=lambda table: make_dataset_urn("redshift", table),
    )

    all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]] = {
        "db1": {
            "schema1": [
                RedshiftTable(
                    name="category",
                    schema="schema1",
                    type="BASE TABLE",
                    created=None,
                    comment="",
                ),
            ]
        },
        "dev": {
            "public": [
                RedshiftTable(
                    name="users",
                    schema="public",
                    type="BASE TABLE",
                    created=None,
                    comment="",
                ),
                RedshiftTable(
                    name="orders",
                    schema="public",
                    type="BASE TABLE",
                    created=None,
                    comment="",
                ),
            ]
        },
    }
    mwus = usage_extractor.get_usage_workunits(all_tables=all_tables)
    metadata: List[
        Union[
            MetadataChangeEvent,
            MetadataChangeProposal,
            MetadataChangeProposalWrapper,
        ]
    ] = [mwu.metadata for mwu in mwus]

    # There should be 2 cursor requests (usage aspects - 1, operation aspects - 1).
    assert mock_connection.cursor.call_count == 2
    assert source_report.num_usage_workunits_emitted == 3
    assert source_report.num_operational_stats_workunits_emitted == 3

    write_metadata_file(generate_mcps_path, metadata)
    mce_helpers.check_golden_file(
        pytestconfig=pytestconfig,
        output_path=tmp_path / "redshift_usages.json",
        golden_path=test_resources_dir / "redshift_usages_golden.json",
    )
@freeze_time(FROZEN_TIME)
@patch("redshift_connector.Cursor")
@patch("redshift_connector.Connection")
def test_redshift_usage_filtering(mock_connection, mock_cursor, pytestconfig, tmp_path):
    """Usage extraction should only emit events for tables in ``all_tables``.

    Same mocked-cursor setup as ``test_redshift_usage_source`` but with a
    one-table inventory, compared against a filtered golden file.

    NOTE: stacked ``@patch`` decorators inject mocks bottom-up, so the
    innermost patch (``redshift_connector.Connection``) supplies the FIRST
    mock argument. The previous signature had the two names swapped.
    """
    test_resources_dir = pathlib.Path(
        pytestconfig.rootpath / "tests/integration/redshift-usage"
    )
    generate_mcps_path = Path(f"{tmp_path}/redshift_usages.json")

    # Load the canned query results; ``with`` ensures the fixture files are
    # closed (previously they were left open for the process lifetime).
    with open(f"{test_resources_dir}/usage_events_history.json") as usage_file:
        mock_usage_query_result_dict = json.load(usage_file)
    with open(
        f"{test_resources_dir}/operational_events_history.json"
    ) as operational_file:
        mock_operational_query_result_dict = json.load(operational_file)

    mock_cursor.execute.return_value = None
    mock_connection.cursor.return_value = mock_cursor

    # One mock cursor per query; ``fetchmany`` yields one batch, then an empty
    # batch to signal exhaustion.
    mock_cursor_usage_query = Mock()
    mock_cursor_operational_query = Mock()
    mock_cursor_usage_query.execute.return_value = None
    mock_cursor_operational_query.execute.return_value = None
    mock_cursor_usage_query.fetchmany.side_effect = [
        [list(row.values()) for row in mock_usage_query_result_dict],
        [],
    ]
    mock_cursor_operational_query.fetchmany.side_effect = [
        [list(row.values()) for row in mock_operational_query_result_dict],
        [],
    ]
    # Mimic DB-API ``cursor.description`` using the first fixture row's keys.
    mock_cursor_usage_query.description = [
        [key] for key in mock_usage_query_result_dict[0]
    ]
    mock_cursor_operational_query.description = [
        [key] for key in mock_operational_query_result_dict[0]
    ]
    # The extractor requests the operational-stats cursor first, then usage.
    mock_connection.cursor.side_effect = [
        mock_cursor_operational_query,
        mock_cursor_usage_query,
    ]

    config = RedshiftConfig(host_port="test:1234", email_domain="acryl.io")
    usage_extractor = RedshiftUsageExtractor(
        config=config,
        connection=mock_connection,
        report=RedshiftReport(),
        dataset_urn_builder=lambda table: make_dataset_urn("redshift", table),
    )

    # Only dev.public.users is known; events on other tables must be dropped.
    all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]] = {
        "dev": {
            "public": [
                RedshiftTable(
                    name="users",
                    schema="public",
                    type="BASE TABLE",
                    created=None,
                    comment="",
                ),
            ]
        },
    }
    mwus = usage_extractor.get_usage_workunits(all_tables=all_tables)
    metadata: List[
        Union[
            MetadataChangeEvent,
            MetadataChangeProposal,
            MetadataChangeProposalWrapper,
        ]
    ] = [mwu.metadata for mwu in mwus]

    write_metadata_file(generate_mcps_path, metadata)
    mce_helpers.check_golden_file(
        pytestconfig=pytestconfig,
        output_path=tmp_path / "redshift_usages.json",
        golden_path=test_resources_dir / "redshift_usages_filtered_golden.json",
    )
def load_access_events(test_resources_dir: pathlib.Path) -> List[Dict]:
    """Return the recorded usage events fixture parsed from JSON."""
    events_path = test_resources_dir / "usage_events_history.json"
    with events_path.open() as fp:
        return json.load(fp)
def test_duplicate_operations_dropped():
    """_drop_repeated_operations keeps the first operation seen per dataset
    and discards later repeats on the same dataset."""
    report = RedshiftReport()
    usage_extractor = RedshiftUsageExtractor(
        config=MagicMock(),
        connection=MagicMock(),
        report=report,
        dataset_urn_builder=MagicMock(),
        redundant_run_skip_handler=None,
    )

    actor = make_user_urn("jdoe")
    urn_a = "urn:li:dataset:(urn:li:dataPlatform:redshift,db.schema.tableA,PROD)"
    urn_b = "urn:li:dataset:(urn:li:dataPlatform:redshift,db.schema.tableB,PROD)"

    def _insert_op(urn, ts_seconds, updated_seconds):
        # Build an INSERT operation aspect with second-granularity timestamps.
        return MetadataChangeProposalWrapper(
            entityUrn=urn,
            aspect=OperationClass(
                timestampMillis=ts_seconds * 1000,
                lastUpdatedTimestamp=updated_seconds * 1000,
                actor=actor,
                operationType=OperationTypeClass.INSERT,
            ),
        )

    op_a_first = _insert_op(urn_a, 100, 95)
    op_b_first = _insert_op(urn_b, 101, 94)
    op_a_repeat = _insert_op(urn_a, 102, 90)

    deduped = list(
        usage_extractor._drop_repeated_operations(
            [op_a_first, op_b_first, op_a_repeat]
        )
    )
    # The repeat on tableA is dropped; first occurrences survive in order.
    assert deduped == [op_a_first, op_b_first]