feat(ingest/dynamoDB): flatten struct fields (#9852)

Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
This commit is contained in:
Tony Ouyang 2024-02-15 13:40:03 -08:00 committed by GitHub
parent bbd818a64e
commit ae1806fefa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 260 additions and 40 deletions

View File

@ -1,6 +1,6 @@
import logging
from dataclasses import dataclass, field
from typing import Any, Counter, Dict, Iterable, List, Optional, Type, Union
from typing import Any, Counter, Dict, Iterable, List, Optional, Tuple, Type, Union
import boto3
import pydantic
@ -61,6 +61,7 @@ MAX_ITEMS_TO_RETRIEVE = 100
PAGE_SIZE = 100
MAX_SCHEMA_SIZE = 300
MAX_PRIMARY_KEYS_SIZE = 100
FIELD_DELIMITER = "."
logger: logging.Logger = logging.getLogger(__name__)
@ -285,13 +286,13 @@ class DynamoDBSource(StatefulIngestionSourceBase):
dynamodb_client: BaseClient,
region: str,
table_name: str,
) -> Dict[str, SchemaDescription]:
) -> Dict[Tuple[str, ...], SchemaDescription]:
"""
This will use the dynamodb client to scan the given table to retrieve items with pagination,
and construct the schema of this table by reading the attributes of the retrieved items
"""
paginator = dynamodb_client.get_paginator("scan")
schema: Dict[str, SchemaDescription] = {}
schema: Dict[Tuple[str, ...], SchemaDescription] = {}
"""
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Paginator.Scan
Note that the behavior of the pagination does not align with the documentation according to https://stackoverflow.com/questions/39201093/how-to-use-boto3-pagination
@ -323,7 +324,7 @@ class DynamoDBSource(StatefulIngestionSourceBase):
dynamodb_client: Any,
region: str,
table_name: str,
schema: Dict[str, SchemaDescription],
schema: Dict[Tuple[str, ...], SchemaDescription],
) -> None:
"""
It will look up in the config include_table_item dict to see if "region.table_name" exists as key,
@ -358,7 +359,9 @@ class DynamoDBSource(StatefulIngestionSourceBase):
self.construct_schema_from_items(items, schema)
def construct_schema_from_items(
slef, items: List[Dict[str, Dict]], schema: Dict[str, SchemaDescription]
self,
items: List[Dict[str, Dict]],
schema: Dict[Tuple[str, ...], SchemaDescription],
) -> None:
"""
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.scan
@ -367,35 +370,58 @@ class DynamoDBSource(StatefulIngestionSourceBase):
we are writing our own construct schema method, take the attribute name as key and SchemaDescription as value
"""
for document in items:
# the key is the attribute name and the value is a dict with only one entry,
# whose key is the data type and value is the data
for key, value in document.items():
if value is not None:
data_type = list(value.keys())[0]
if key not in schema:
schema[key] = {
"types": Counter(data_type),
"count": 1,
# It seems we don't have collapsed field name so we are using attribute name here
"delimited_name": key,
"type": data_type,
"nullable": False,
}
else:
# update the type count
schema[key]["types"].update({data_type: 1})
schema[key]["count"] += 1
# if we found an attribute name with different attribute type, we consider this attribute type as "mixed"
field_types = schema[key]["types"]
if len(field_types.keys()) > 1:
schema[key]["type"] = "mixed"
self.append_schema(schema, document)
def append_schema(
    self,
    schema: Dict[Tuple[str, ...], SchemaDescription],
    document: Dict[str, Dict],
    parent_field_path: Tuple[str, ...] = (),
) -> None:
    """
    Merge the attributes of one item (or one nested map) into ``schema``.

    Each attribute in ``document`` maps its name to a single-entry dict whose
    key is the data type tag and whose value is the data. Map attributes
    (type tag "M") are expanded recursively, so every nested attribute gets
    its own entry keyed by the full path from the item root; the map itself
    is recorded as well.

    :param schema: accumulator keyed by field path tuple; updated in place
    :param document: attribute name -> {data type tag: data}
    :param parent_field_path: path of the enclosing map, empty at the item root
    """
    for key, value in document.items():
        # A missing or empty wrapper carries no type tag, so there is nothing to record.
        if not value:
            continue

        data_type = next(iter(value))
        attribute_value = value[data_type]
        current_field_path = parent_field_path + (key,)
        # NOTE(review): DynamoDB's low-level attribute format encodes null as
        # {"NULL": True}, so the payload itself is normally not None - confirm
        # against the boto3 AttributeValue reference.
        is_null = data_type == "NULL" or attribute_value is None

        # Handle nested maps by recursive calls
        if data_type == "M" and isinstance(attribute_value, dict):
            logger.debug(
                "expanding nested fields for map, current_field_path: %s",
                current_field_path,
            )
            self.append_schema(schema, attribute_value, current_field_path)

        if current_field_path not in schema:
            schema[current_field_path] = {
                "types": Counter({data_type: 1}),
                "count": 1,
                # Flattened name: the path segments joined with FIELD_DELIMITER
                "delimited_name": FIELD_DELIMITER.join(current_field_path),
                "type": data_type,
                # Record a null seen on the very first sighting too
                "nullable": is_null,
            }
        else:
            field_description = schema[current_field_path]
            field_description["types"].update({data_type: 1})
            field_description["count"] += 1
            # if we found an attribute name with different attribute type, we consider this attribute type as "mixed"
            if len(field_description["types"]) > 1:
                field_description["type"] = "mixed"
            if is_null:
                # Mark as nullable if null encountered
                field_description["nullable"] = True

        logger.debug(
            "append schema with field_path: %s and type: %s",
            current_field_path,
            schema[current_field_path]["types"],
        )
def construct_schema_metadata(
self,
table_name: str,
dataset_urn: str,
dataset_properties: DatasetPropertiesClass,
schema: Dict[str, SchemaDescription],
schema: Dict[Tuple[str, ...], SchemaDescription],
primary_key_dict: Dict[str, str],
) -> SchemaMetadata:
""" "
@ -407,20 +433,23 @@ class DynamoDBSource(StatefulIngestionSourceBase):
canonical_schema: List[SchemaField] = []
schema_size = len(schema.values())
table_fields = list(schema.values())
if schema_size > MAX_SCHEMA_SIZE:
# downsample the schema, using frequency as the sort key
self.report.report_warning(
key=dataset_urn,
reason=f"Downsampling the table schema because MAX_SCHEMA_SIZE threshold is {MAX_SCHEMA_SIZE}",
)
# Add this information to the custom properties so user can know they are looking at down sampled schema
dataset_properties.customProperties["schema.downsampled"] = "True"
dataset_properties.customProperties["schema.totalFields"] = f"{schema_size}"
# append each schema field (sort so output is consistent)
# append each schema field, schema will be sorted by count descending and delimited_name ascending and sliced to only include MAX_SCHEMA_SIZE items
for schema_field in sorted(
table_fields,
key=lambda x: x["delimited_name"],
key=lambda x: (
-x["count"],
x["delimited_name"],
), # Negate `count` for descending order, `delimited_name` stays the same for ascending
)[0:MAX_SCHEMA_SIZE]:
field_path = schema_field["delimited_name"]
native_data_type = self.get_native_type(schema_field["type"], table_name)

View File

@ -46,6 +46,18 @@
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "contactNumbers",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.ArrayType": {}
}
},
"nativeDataType": "List",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "partitionKey",
"nullable": false,
@ -59,6 +71,78 @@
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.RecordType": {}
}
},
"nativeDataType": "Map",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services.hours",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.RecordType": {}
}
},
"nativeDataType": "Map",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services.hours.close",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "String",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services.hours.open",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "String",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services.parking",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.BooleanType": {}
}
},
"nativeDataType": "Boolean",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services.wifi",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "String",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "zip",
"nullable": true,
@ -76,7 +160,8 @@
},
"systemMetadata": {
"lastObserved": 1693396800000,
"runId": "dynamodb-test"
"runId": "dynamodb-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -95,7 +180,8 @@
},
"systemMetadata": {
"lastObserved": 1693396800000,
"runId": "dynamodb-test"
"runId": "dynamodb-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -111,7 +197,8 @@
},
"systemMetadata": {
"lastObserved": 1693396800000,
"runId": "dynamodb-test"
"runId": "dynamodb-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -126,7 +213,8 @@
},
"systemMetadata": {
"lastObserved": 1693396800000,
"runId": "dynamodb-test"
"runId": "dynamodb-test",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -46,6 +46,18 @@
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "contactNumbers",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.ArrayType": {}
}
},
"nativeDataType": "List",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "partitionKey",
"nullable": false,
@ -59,6 +71,78 @@
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.RecordType": {}
}
},
"nativeDataType": "Map",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services.hours",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.RecordType": {}
}
},
"nativeDataType": "Map",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services.hours.close",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "String",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services.hours.open",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "String",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services.parking",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.BooleanType": {}
}
},
"nativeDataType": "Boolean",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "services.wifi",
"nullable": true,
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "String",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "zip",
"nullable": true,
@ -76,7 +160,8 @@
},
"systemMetadata": {
"lastObserved": 1693396800000,
"runId": "dynamodb-test"
"runId": "dynamodb-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -95,7 +180,8 @@
},
"systemMetadata": {
"lastObserved": 1693396800000,
"runId": "dynamodb-test"
"runId": "dynamodb-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -111,7 +197,8 @@
},
"systemMetadata": {
"lastObserved": 1693396800000,
"runId": "dynamodb-test"
"runId": "dynamodb-test",
"lastRunId": "no-run-id-provided"
}
},
{
@ -126,7 +213,8 @@
},
"systemMetadata": {
"lastObserved": 1693396800000,
"runId": "dynamodb-test"
"runId": "dynamodb-test",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -15,7 +15,7 @@ FROZEN_TIME = "2023-08-30 12:00:00"
@freeze_time(FROZEN_TIME)
@mock_dynamodb
@pytest.mark.integration
def test_dynamodb(pytestconfig, tmp_path, mock_time):
def test_dynamodb(pytestconfig, tmp_path):
boto3.setup_default_session()
client = boto3.client("dynamodb", region_name="us-west-2")
client.create_table(
@ -35,6 +35,21 @@ def test_dynamodb(pytestconfig, tmp_path, mock_time):
"city": {"S": "San Francisco"},
"address": {"S": "1st Market st"},
"zip": {"N": "94000"},
"contactNumbers": { # List type
"L": [
{"S": "+14150000000"},
{"S": "+14151111111"},
]
},
"services": { # Map type
"M": {
"parking": {"BOOL": True},
"wifi": {"S": "Free"},
"hours": { # Map type inside Map for nested structure
"M": {"open": {"S": "08:00"}, "close": {"S": "22:00"}}
},
}
},
},
)