316 lines
12 KiB
Python

import random
import string
from datetime import datetime, timezone
from typing import cast
from unittest import mock
import pytest
from freezegun import freeze_time
from datahub.configuration.common import AllowDenyPattern, DynamicTypedConfig
from datahub.ingestion.glossary.classifier import (
ClassificationConfig,
DynamicTypedClassifierConfig,
)
from datahub.ingestion.glossary.datahub_classifier import (
DataHubClassifierConfig,
InfoTypeConfig,
PredictionFactorsAndWeights,
ValuesFactorConfig,
)
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.run.pipeline_config import PipelineConfig, SourceConfig
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.snowflake.snowflake_config import (
SnowflakeV2Config,
TagOption,
)
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
from datahub.testing import mce_helpers
from tests.integration.snowflake.common import FROZEN_TIME, default_query_results
# Module-level pytest marker: applies ``integration_batch_2`` to every test in this file.
pytestmark = pytest.mark.integration_batch_2
def random_email():
    """Return a random e-mail address on the ``xyz.com`` domain.

    The local part consists of 10 to 15 randomly chosen lowercase ASCII
    letters.
    """
    local_length = random.randint(10, 15)
    local_part = "".join(
        random.choice(string.ascii_lowercase) for _ in range(local_length)
    )
    return local_part + "@xyz.com"
def random_cloud_region():
    """Return a random region-like string such as ``"eu-west-1"``.

    The result has the form ``<area>-<direction>-<number>`` where the number
    is either 1 or 2.
    """
    area = random.choice(["af", "ap", "ca", "eu", "me", "sa", "us"])
    direction = random.choice(["central", "north", "south", "east", "west"])
    number = random.randint(1, 2)
    return f"{area}-{direction}-{number}"
def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
    """Ingest from a mocked Snowflake connection and compare with the golden file.

    ``snowflake.connector.connect`` is patched so that every cursor
    ``execute`` call is answered by ``default_query_results``, and the data
    reader's sample values are replaced with randomly generated columns that
    feed the classifier. After the run the test asserts that no warnings were
    reported, checks the emitted events against ``snowflake_golden.json``
    (ignoring time-dependent paths), and verifies the data-dictionary cache
    miss counters.

    ``mock_time`` and ``mock_datahub_graph`` are not referenced in the body;
    they are requested only so that pytest activates those fixtures (they are
    defined outside this file).
    """
    resources_dir = pytestconfig.rootpath / "tests/integration/snowflake"
    golden_file = resources_dir / "snowflake_golden.json"
    # Run the metadata ingestion pipeline.
    output_file = tmp_path / "snowflake_test_events.json"

    with mock.patch("snowflake.connector.connect") as connect_mock, mock.patch(
        "datahub.ingestion.source.snowflake.snowflake_data_reader.SnowflakeDataReader.get_sample_data_for_table"
    ) as sample_values_mock:
        # Wire connect() -> connection -> cursor, with canned query results.
        connection = mock.MagicMock()
        cursor = mock.MagicMock()
        connect_mock.return_value = connection
        connection.cursor.return_value = cursor
        cursor.execute.side_effect = default_query_results

        # Random sample values for the three columns selected for classification.
        sample_rows = 20
        ages = [random.randint(1, 80) for _ in range(sample_rows)]
        emails = [random_email() for _ in range(sample_rows)]
        regions = [random_cloud_region() for _ in range(sample_rows)]
        sample_values_mock.return_value = {
            "col_1": ages,
            "col_2": emails,
            "col_3": regions,
        }

        # Both info types are predicted from column values only.
        age_info_type = InfoTypeConfig(
            prediction_factors_and_weights=PredictionFactorsAndWeights(
                name=0, values=1, description=0, datatype=0
            )
        )
        cloud_region_info_type = InfoTypeConfig(
            prediction_factors_and_weights=PredictionFactorsAndWeights(
                name=0, description=0, datatype=0, values=1
            ),
            values=ValuesFactorConfig(
                prediction_type="regex",
                regex=[
                    r"(af|ap|ca|eu|me|sa|us)-(central|north|(north(?:east|west))|south|south(?:east|west)|east|west)-\d+"
                ],
            ),
        )
        datahub_classifier_config = DataHubClassifierConfig(
            minimum_values_threshold=10,
            confidence_level_threshold=0.58,
            info_types_config={
                "Age": age_info_type,
                "CloudRegion": cloud_region_info_type,
            },
        )

        classification_config = ClassificationConfig(
            enabled=True,
            column_pattern=AllowDenyPattern(
                allow=[".*col_1$", ".*col_2$", ".*col_3$"]
            ),
            classifiers=[
                DynamicTypedClassifierConfig(
                    type="datahub", config=datahub_classifier_config
                )
            ],
            max_workers=1,
        )
        profiling_config = GEProfilingConfig(
            enabled=True,
            profile_if_updated_since_days=None,
            profile_table_row_limit=None,
            profile_table_size_limit=None,
            profile_table_level_only=True,
        )
        snowflake_config = SnowflakeV2Config(
            account_id="ABC12345.ap-south-1.aws",
            username="TST_USR",
            password="TST_PWD",
            match_fully_qualified_names=True,
            schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
            include_technical_schema=True,
            include_table_lineage=True,
            include_usage_stats=True,
            format_sql_queries=True,
            validate_upstreams_against_patterns=False,
            include_operational_stats=True,
            email_as_user_identifier=True,
            incremental_lineage=False,
            start_time=datetime(2022, 6, 6, tzinfo=timezone.utc),
            end_time=datetime(2022, 6, 7, 7, 17, tzinfo=timezone.utc),
            classification=classification_config,
            profiling=profiling_config,
            extract_tags=TagOption.without_lineage,
        )

        pipeline = Pipeline(
            config=PipelineConfig(
                source=SourceConfig(type="snowflake", config=snowflake_config),
                sink=DynamicTypedConfig(
                    type="file", config={"filename": str(output_file)}
                ),
            )
        )
        pipeline.run()
        pipeline.pretty_print_summary()
        pipeline.raise_from_status()
        assert not pipeline.source.get_report().warnings

        # Verify the output.
        time_dependent_paths = [
            r"root\[\d+\]\['aspect'\]\['json'\]\['timestampMillis'\]",
            r"root\[\d+\]\['aspect'\]\['json'\]\['created'\]",
            r"root\[\d+\]\['aspect'\]\['json'\]\['lastModified'\]",
            r"root\[\d+\]\['aspect'\]\['json'\]\['fields'\]\[\d+\]\['glossaryTerms'\]\['auditStamp'\]\['time'\]",
            r"root\[\d+\]\['systemMetadata'\]",
        ]
        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=output_file,
            golden_path=golden_file,
            ignore_paths=time_dependent_paths,
        )

        report = cast(SnowflakeV2Report, pipeline.source.get_report())
        assert report.data_dictionary_cache is not None
        cache_info = report.data_dictionary_cache.as_obj()
        expected_cache_misses = {
            "get_tables_for_database": 1,
            "get_views_for_database": 1,
            # When streams query specific tables, the query will not be cached
            # resulting in 2 cache misses
            "get_columns_for_schema": 2,
            "get_pk_constraints_for_schema": 1,
            "get_fk_constraints_for_schema": 1,
        }
        for cached_method, expected_misses in expected_cache_misses.items():
            assert (
                cache_info[cached_method]["misses"] == expected_misses
            ), f"unexpected cache miss count for {cached_method}"
def test_snowflake_tags_as_structured_properties(
    pytestconfig, tmp_path, mock_time, mock_datahub_graph
):
    """Ingest with tags extracted as structured properties and compare with the golden file.

    ``snowflake.connector.connect`` is patched so that every cursor
    ``execute`` call is answered by ``default_query_results``. The source is
    configured with ``extract_tags_as_structured_properties=True`` and a
    structured-property deny pattern for ``test_db.test_schema.my_tag_2``;
    lineage, usage and operational stats are switched off. After the run the
    test asserts that no warnings were reported and checks the emitted events
    against ``snowflake_structured_properties_golden.json`` (ignoring
    time-dependent paths).

    ``mock_time`` and ``mock_datahub_graph`` are not referenced in the body;
    they are requested only so that pytest activates those fixtures (they are
    defined outside this file).
    """
    resources_dir = pytestconfig.rootpath / "tests/integration/snowflake"
    golden_file = resources_dir / "snowflake_structured_properties_golden.json"
    # Run the metadata ingestion pipeline.
    output_file = tmp_path / "snowflake_structured_properties_test_events.json"

    with mock.patch("snowflake.connector.connect") as connect_mock:
        # Wire connect() -> connection -> cursor, with canned query results.
        connection = mock.MagicMock()
        cursor = mock.MagicMock()
        connect_mock.return_value = connection
        connection.cursor.return_value = cursor
        cursor.execute.side_effect = default_query_results

        snowflake_config = SnowflakeV2Config(
            extract_tags_as_structured_properties=True,
            structured_property_pattern=AllowDenyPattern(
                deny=["test_db.test_schema.my_tag_2"]
            ),
            extract_tags=TagOption.without_lineage,
            account_id="ABC12345.ap-south-1.aws",
            username="TST_USR",
            password="TST_PWD",
            match_fully_qualified_names=True,
            schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
            include_technical_schema=True,
            include_table_lineage=False,
            include_column_lineage=False,
            include_usage_stats=False,
            include_operational_stats=False,
            structured_properties_template_cache_invalidation_interval=0,
        )
        file_sink = DynamicTypedConfig(
            type="file", config={"filename": str(output_file)}
        )

        pipeline = Pipeline(
            config=PipelineConfig(
                source=SourceConfig(type="snowflake", config=snowflake_config),
                sink=file_sink,
            )
        )
        pipeline.run()
        pipeline.pretty_print_summary()
        pipeline.raise_from_status()
        assert not pipeline.source.get_report().warnings

        # Verify the output.
        time_dependent_paths = [
            r"root\[\d+\]\['aspect'\]\['json'\]\['timestampMillis'\]",
            r"root\[\d+\]\['aspect'\]\['json'\]\['created'\]",
            r"root\[\d+\]\['aspect'\]\['json'\]\['lastModified'\]",
            r"root\[\d+\]\['aspect'\]\['json'\]\['fields'\]\[\d+\]\['glossaryTerms'\]\['auditStamp'\]\['time'\]",
            r"root\[\d+\]\['systemMetadata'\]",
        ]
        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=output_file,
            golden_path=golden_file,
            ignore_paths=time_dependent_paths,
        )
@freeze_time(FROZEN_TIME)
def test_snowflake_private_link_and_incremental_mcps(
    pytestconfig, tmp_path, mock_time, mock_datahub_graph
):
    """Ingest from a privatelink account with incremental properties and compare with the golden file.

    The test runs with the clock frozen at ``FROZEN_TIME``.
    ``snowflake.connector.connect`` is patched so that every cursor
    ``execute`` call is answered by ``default_query_results``. The source
    uses a ``.privatelink`` account id, ``platform_instance="instance1"`` and
    ``incremental_properties=True``. After the run, the emitted events are
    compared against ``snowflake_privatelink_golden.json`` with no ignored
    paths.

    ``mock_time`` and ``mock_datahub_graph`` are not referenced in the body;
    they are requested only so that pytest activates those fixtures (they are
    defined outside this file).
    """
    resources_dir = pytestconfig.rootpath / "tests/integration/snowflake"
    golden_file = resources_dir / "snowflake_privatelink_golden.json"
    # Run the metadata ingestion pipeline.
    output_file = tmp_path / "snowflake_privatelink_test_events.json"

    with mock.patch("snowflake.connector.connect") as connect_mock:
        # Wire connect() -> connection -> cursor, with canned query results.
        connection = mock.MagicMock()
        cursor = mock.MagicMock()
        connect_mock.return_value = connection
        connection.cursor.return_value = cursor
        cursor.execute.side_effect = default_query_results

        snowflake_config = SnowflakeV2Config(
            account_id="ABC12345.ap-south-1.privatelink",
            username="TST_USR",
            password="TST_PWD",
            schema_pattern=AllowDenyPattern(allow=["test_schema"]),
            include_technical_schema=True,
            include_table_lineage=True,
            include_column_lineage=False,
            include_views=True,
            include_usage_stats=False,
            format_sql_queries=True,
            incremental_lineage=False,
            incremental_properties=True,
            include_operational_stats=False,
            platform_instance="instance1",
            start_time=datetime(2022, 6, 6, tzinfo=timezone.utc),
            end_time=datetime(2022, 6, 7, 7, 17, tzinfo=timezone.utc),
        )
        file_sink = DynamicTypedConfig(
            type="file", config={"filename": str(output_file)}
        )

        pipeline = Pipeline(
            config=PipelineConfig(
                source=SourceConfig(type="snowflake", config=snowflake_config),
                sink=file_sink,
            )
        )
        pipeline.run()
        pipeline.pretty_print_summary()
        pipeline.raise_from_status()

        # Verify the output.
        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=output_file,
            golden_path=golden_file,
            ignore_paths=[],
        )