fix(ingest): update athena type mapping (#9061)

This commit is contained in:
Harshal Sheth 2023-10-24 19:59:42 -07:00 committed by GitHub
parent 9a59c452bf
commit 916235d31a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 16 additions and 11 deletions

View File

@ -37,7 +37,7 @@ from datahub.ingestion.source.sql.sql_utils import (
gen_database_key,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
from datahub.metadata.schema_classes import RecordTypeClass
from datahub.metadata.schema_classes import MapTypeClass, RecordTypeClass
from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column
from datahub.utilities.sqlalchemy_type_converter import (
MapType,
@ -46,7 +46,9 @@ from datahub.utilities.sqlalchemy_type_converter import (
logger = logging.getLogger(__name__)
assert STRUCT, "required type modules are not available"
register_custom_type(STRUCT, RecordTypeClass)
register_custom_type(MapType, MapTypeClass)
class CustomAthenaRestDialect(AthenaRestDialect):

View File

@ -80,7 +80,6 @@ from datahub.metadata.schema_classes import (
DatasetLineageTypeClass,
DatasetPropertiesClass,
GlobalTagsClass,
MapTypeClass,
SubTypesClass,
TagAssociationClass,
UpstreamClass,
@ -90,7 +89,6 @@ from datahub.telemetry import telemetry
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombinerReport
from datahub.utilities.sqlalchemy_type_converter import MapType
if TYPE_CHECKING:
from datahub.ingestion.source.ge_data_profiler import (
@ -140,6 +138,7 @@ class SqlWorkUnit(MetadataWorkUnit):
_field_type_mapping: Dict[Type[TypeEngine], Type] = {
# Note: to add dialect-specific types to this mapping, use the `register_custom_type` function.
types.Integer: NumberTypeClass,
types.Numeric: NumberTypeClass,
types.Boolean: BooleanTypeClass,
@ -156,8 +155,6 @@ _field_type_mapping: Dict[Type[TypeEngine], Type] = {
types.DATETIME: TimeTypeClass,
types.TIMESTAMP: TimeTypeClass,
types.JSON: RecordTypeClass,
# additional type definitions that are used by the Athena source
MapType: MapTypeClass, # type: ignore
# Because the postgresql dialect is used internally by many other dialects,
# we add some postgres types here. This is ok to do because the postgresql
# dialect is built-in to sqlalchemy.

View File

@ -7,7 +7,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import (
BytesType,
DateType,
EnumType,
MapType as MapTypeAvro,
MapType,
NullType,
NumberType,
RecordType,
@ -15,7 +15,6 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import (
TimeType,
UnionType,
)
from datahub.utilities.sqlalchemy_type_converter import MapType
# these can be obtained by running `select format_type(oid, null),* from pg_type;`
# we've omitted the types without a meaningful DataHub type (e.g. postgres-specific types, index vectors, etc.)
@ -364,7 +363,7 @@ TRINO_SQL_TYPES_MAP: Dict[str, Any] = {
"time": TimeType,
"timestamp": TimeType,
"row": RecordType,
"map": MapTypeAvro,
"map": MapType,
"array": ArrayType,
}

View File

@ -4,7 +4,6 @@ import uuid
from typing import Any, Dict, List, Optional, Type, Union
from sqlalchemy import types
from sqlalchemy_bigquery import STRUCT
from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
@ -12,6 +11,12 @@ from datahub.metadata.schema_classes import NullTypeClass, SchemaFieldDataTypeCl
logger = logging.getLogger(__name__)
try:
# This is used for both BigQuery and Athena.
from sqlalchemy_bigquery import STRUCT
except ImportError:
STRUCT = None
class MapType(types.TupleType):
# Wrapper class around SQLAlchemy's TupleType to increase compatibility with DataHub
@ -42,7 +47,9 @@ class SqlAlchemyColumnToAvroConverter:
) -> Dict[str, Any]:
"""Determines the concrete AVRO schema type for a SQLalchemy-typed column"""
if type(column_type) in cls.PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE.keys():
if isinstance(
column_type, tuple(cls.PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE.keys())
):
return {
"type": cls.PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE[type(column_type)],
"native_data_type": str(column_type),
@ -88,7 +95,7 @@ class SqlAlchemyColumnToAvroConverter:
"key_type": cls.get_avro_type(column_type=key_type, nullable=nullable),
"key_native_data_type": str(key_type),
}
if isinstance(column_type, STRUCT):
if STRUCT and isinstance(column_type, STRUCT):
fields = []
for field_def in column_type._STRUCT_fields:
field_name, field_type = field_def