145 lines
5.2 KiB
Python
Raw Permalink Normal View History

import pathlib
import boto3
import pytest
from freezegun import freeze_time
from moto import mock_dynamodb
from datahub.ingestion.glossary.classification_mixin import ClassificationConfig
from datahub.ingestion.glossary.classifier import DynamicTypedClassifierConfig
from datahub.ingestion.glossary.datahub_classifier import (
DataHubClassifierConfig,
InfoTypeConfig,
PredictionFactorsAndWeights,
)
from datahub.ingestion.run.pipeline import Pipeline
from datahub.testing import mce_helpers
test_resources_dir = pathlib.Path(__file__).parent
FROZEN_TIME = "2023-08-30 12:00:00"
@freeze_time(FROZEN_TIME)
@mock_dynamodb
@pytest.mark.integration
def test_dynamodb(pytestconfig, tmp_path):
boto3.setup_default_session()
client = boto3.client("dynamodb", region_name="us-west-2")
client.create_table(
TableName="Location",
KeySchema=[
{"AttributeName": "partitionKey", "KeyType": "HASH"},
{"AttributeName": "city", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "partitionKey", "AttributeType": "S"},
{"AttributeName": "city", "AttributeType": "S"},
],
ProvisionedThroughput={"ReadCapacityUnits": 10, "WriteCapacityUnits": 10},
)
client.put_item(
TableName="Location",
Item={
"partitionKey": {"S": "1"},
"city": {"S": "San Francisco"},
"address": {"S": "1st Market st"},
"zip": {"N": "94000"},
"contactNumbers": { # List type
"L": [
{"S": "+14150000000"},
{"S": "+14151111111"},
]
},
"services": { # Map type
"M": {
"parking": {"BOOL": True},
"wifi": {"S": "Free"},
"hours": { # Map type inside Map for nested structure
"M": {"open": {"S": "08:00"}, "close": {"S": "22:00"}}
},
}
},
},
)
pipeline_default_platform_instance = Pipeline.create(
{
"run_id": "dynamodb-test",
"source": {
"type": "dynamodb",
"config": {
"aws_access_key_id": "test",
"aws_secret_access_key": "test",
2024-05-02 16:02:40 -07:00
"aws_session_token": "test",
"aws_region": "us-west-2",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/dynamodb_default_platform_instance_mces.json",
},
},
}
)
pipeline_default_platform_instance.run()
pipeline_default_platform_instance.raise_from_status()
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/dynamodb_default_platform_instance_mces.json",
golden_path=test_resources_dir
/ "dynamodb_default_platform_instance_mces_golden.json",
ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
)
pipeline_with_platform_instance = Pipeline.create(
{
"run_id": "dynamodb-test",
"source": {
"type": "dynamodb",
"config": {
"platform_instance": "dynamodb_test",
"aws_access_key_id": "test",
"aws_secret_access_key": "test",
2024-05-02 16:02:40 -07:00
"aws_session_token": "test",
"aws_region": "us-west-2",
"classification": ClassificationConfig(
enabled=True,
classifiers=[
DynamicTypedClassifierConfig(
type="datahub",
config=DataHubClassifierConfig(
minimum_values_threshold=1,
info_types_config={
"Phone_Number": InfoTypeConfig(
prediction_factors_and_weights=PredictionFactorsAndWeights(
name=0.7,
description=0,
datatype=0,
values=0.3,
)
)
},
),
)
],
),
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/dynamodb_platform_instance_mces.json",
},
},
}
)
pipeline_with_platform_instance.run()
pipeline_with_platform_instance.raise_from_status()
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/dynamodb_platform_instance_mces.json",
golden_path=test_resources_dir / "dynamodb_platform_instance_mces_golden.json",
ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
)