fix(ingest): downgrade column type mapping warning to info (#11115)

Harshal Sheth 2024-08-07 14:57:05 -07:00 committed by GitHub
parent a25df8e6a0
commit d6e46b9bcf
4 changed files with 15 additions and 152 deletions
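
In short, the hunks below swap the legacy two-argument report_warning call for the structured report.info API wherever an unknown column type is encountered, and delete the duplicated Spark-based get_column_type helpers (and their pyspark imports) from the abs and s3 sources. A minimal before/after sketch of the reporting call, assuming a SourceReport-style object named report and placeholder dataset_name / column_type values (names taken from the hunks below, not a verbatim reproduction of any one file):

# Before: a free-form warning keyed only by the dataset name.
report.report_warning(
    dataset_name, f"unable to map type {column_type} to metadata schema"
)

# After: a structured info entry; log=False presumably suppresses the extra
# console log line, leaving only the entry in the ingestion report.
report.info(
    title="Unable to map column types to DataHub types",
    message="Got an unexpected column type. The column's parsed field type will not be populated.",
    context=f"{dataset_name} - {column_type}",
    log=False,
)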

View File

@@ -8,29 +8,10 @@ import time
 from collections import OrderedDict
 from datetime import datetime
 from pathlib import PurePath
-from typing import Any, Dict, Iterable, List, Optional, Tuple
+from typing import Dict, Iterable, List, Optional, Tuple
 import smart_open.compression as so_compression
 from more_itertools import peekable
-from pyspark.sql.types import (
-    ArrayType,
-    BinaryType,
-    BooleanType,
-    ByteType,
-    DateType,
-    DecimalType,
-    DoubleType,
-    FloatType,
-    IntegerType,
-    LongType,
-    MapType,
-    NullType,
-    ShortType,
-    StringType,
-    StructField,
-    StructType,
-    TimestampType,
-)
 from smart_open import open as smart_open
 from datahub.emitter.mce_builder import (
@@ -48,7 +29,7 @@ from datahub.ingestion.api.decorators import (
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
+from datahub.ingestion.api.source import MetadataWorkUnitProcessor
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.abs.config import DataLakeSourceConfig, PathSpec
 from datahub.ingestion.source.abs.report import DataLakeSourceReport
@@ -72,22 +53,14 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionSourceBase,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
-    BooleanTypeClass,
-    BytesTypeClass,
-    DateTypeClass,
-    NullTypeClass,
-    NumberTypeClass,
-    RecordTypeClass,
     SchemaField,
     SchemaFieldDataType,
     SchemaMetadata,
     StringTypeClass,
-    TimeTypeClass,
 )
 from datahub.metadata.schema_classes import (
     DataPlatformInstanceClass,
     DatasetPropertiesClass,
-    MapTypeClass,
     OperationClass,
     OperationTypeClass,
     OtherSchemaClass,
@@ -100,55 +73,12 @@ from datahub.utilities.perf_timer import PerfTimer
 logging.getLogger("py4j").setLevel(logging.ERROR)
 logger: logging.Logger = logging.getLogger(__name__)
-# for a list of all types, see https://spark.apache.org/docs/3.0.3/api/python/_modules/pyspark/sql/types.html
-_field_type_mapping = {
-    NullType: NullTypeClass,
-    StringType: StringTypeClass,
-    BinaryType: BytesTypeClass,
-    BooleanType: BooleanTypeClass,
-    DateType: DateTypeClass,
-    TimestampType: TimeTypeClass,
-    DecimalType: NumberTypeClass,
-    DoubleType: NumberTypeClass,
-    FloatType: NumberTypeClass,
-    ByteType: BytesTypeClass,
-    IntegerType: NumberTypeClass,
-    LongType: NumberTypeClass,
-    ShortType: NumberTypeClass,
-    ArrayType: NullTypeClass,
-    MapType: MapTypeClass,
-    StructField: RecordTypeClass,
-    StructType: RecordTypeClass,
-}
 PAGE_SIZE = 1000
 # Hack to support the .gzip extension with smart_open.
 so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"])
-def get_column_type(
-    report: SourceReport, dataset_name: str, column_type: str
-) -> SchemaFieldDataType:
-    """
-    Maps known Spark types to datahub types
-    """
-    TypeClass: Any = None
-    for field_type, type_class in _field_type_mapping.items():
-        if isinstance(column_type, field_type):
-            TypeClass = type_class
-            break
-    # if still not found, report the warning
-    if TypeClass is None:
-        report.report_warning(
-            dataset_name, f"unable to map type {column_type} to metadata schema"
-        )
-        TypeClass = NullTypeClass
-    return SchemaFieldDataType(type=TypeClass())
 # config flags to emit telemetry for
 config_options_to_report = [
     "platform",

View File

@@ -849,8 +849,11 @@ def get_column_type
     # if still not found, report the warning
     if TypeClass is None:
         if column_type:
-            report.report_warning(
-                dataset_name, f"unable to map type {column_type} to metadata schema"
+            report.info(
+                title="Unable to map column types to DataHub types",
+                message="Got an unexpected column type. The column's parsed field type will not be populated.",
+                context=f"{dataset_name} - {column_type}",
+                log=False,
             )
         TypeClass = NullTypeClass

View File

@@ -8,32 +8,13 @@ import time
 from collections import OrderedDict
 from datetime import datetime
 from pathlib import PurePath
-from typing import Any, Dict, Iterable, List, Optional, Tuple
+from typing import Dict, Iterable, List, Optional, Tuple
 import smart_open.compression as so_compression
 from more_itertools import peekable
 from pyspark.conf import SparkConf
 from pyspark.sql import SparkSession
 from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.types import (
-    ArrayType,
-    BinaryType,
-    BooleanType,
-    ByteType,
-    DateType,
-    DecimalType,
-    DoubleType,
-    FloatType,
-    IntegerType,
-    LongType,
-    MapType,
-    NullType,
-    ShortType,
-    StringType,
-    StructField,
-    StructType,
-    TimestampType,
-)
 from pyspark.sql.utils import AnalysisException
 from smart_open import open as smart_open
@@ -52,7 +33,7 @@ from datahub.ingestion.api.decorators import (
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
+from datahub.ingestion.api.source import MetadataWorkUnitProcessor
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.aws.s3_boto_utils import get_s3_tags, list_folders
 from datahub.ingestion.source.aws.s3_util import (
@@ -72,22 +53,13 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionSourceBase,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
-    BooleanTypeClass,
-    BytesTypeClass,
-    DateTypeClass,
-    NullTypeClass,
-    NumberTypeClass,
-    RecordTypeClass,
     SchemaField,
-    SchemaFieldDataType,
     SchemaMetadata,
     StringTypeClass,
-    TimeTypeClass,
 )
 from datahub.metadata.schema_classes import (
     DataPlatformInstanceClass,
     DatasetPropertiesClass,
-    MapTypeClass,
     OperationClass,
     OperationTypeClass,
     OtherSchemaClass,
@@ -101,55 +73,12 @@ from datahub.utilities.perf_timer import PerfTimer
 logging.getLogger("py4j").setLevel(logging.ERROR)
 logger: logging.Logger = logging.getLogger(__name__)
-# for a list of all types, see https://spark.apache.org/docs/3.0.3/api/python/_modules/pyspark/sql/types.html
-_field_type_mapping = {
-    NullType: NullTypeClass,
-    StringType: StringTypeClass,
-    BinaryType: BytesTypeClass,
-    BooleanType: BooleanTypeClass,
-    DateType: DateTypeClass,
-    TimestampType: TimeTypeClass,
-    DecimalType: NumberTypeClass,
-    DoubleType: NumberTypeClass,
-    FloatType: NumberTypeClass,
-    ByteType: BytesTypeClass,
-    IntegerType: NumberTypeClass,
-    LongType: NumberTypeClass,
-    ShortType: NumberTypeClass,
-    ArrayType: NullTypeClass,
-    MapType: MapTypeClass,
-    StructField: RecordTypeClass,
-    StructType: RecordTypeClass,
-}
 PAGE_SIZE = 1000
 # Hack to support the .gzip extension with smart_open.
 so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"])
-def get_column_type(
-    report: SourceReport, dataset_name: str, column_type: str
-) -> SchemaFieldDataType:
-    """
-    Maps known Spark types to datahub types
-    """
-    TypeClass: Any = None
-    for field_type, type_class in _field_type_mapping.items():
-        if isinstance(column_type, field_type):
-            TypeClass = type_class
-            break
-    # if still not found, report the warning
-    if TypeClass is None:
-        report.report_warning(
-            dataset_name, f"unable to map type {column_type} to metadata schema"
-        )
-        TypeClass = NullTypeClass
-    return SchemaFieldDataType(type=TypeClass())
 # config flags to emit telemetry for
 config_options_to_report = [
     "platform",
@@ -490,9 +419,7 @@ class S3Source(StatefulIngestionSourceBase):
                     if not is_fieldpath_v2
                     else f"[version=2.0].[type=string].{partition_key}",
                     nativeDataType="string",
-                    type=SchemaFieldDataType(StringTypeClass())
-                    if not is_fieldpath_v2
-                    else SchemaFieldDataTypeClass(type=StringTypeClass()),
+                    type=SchemaFieldDataTypeClass(StringTypeClass()),
                     isPartitioningKey=True,
                     nullable=True,
                     recursive=False,
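
For illustration only, a minimal runnable sketch of how the partition-key field is built after this hunk: the type argument is now always SchemaFieldDataTypeClass(StringTypeClass()), with the earlier SchemaFieldDataType-vs-SchemaFieldDataTypeClass branch removed. partition_key and partition_field_path are hypothetical placeholders; the other keyword arguments mirror the hunk above.

from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
from datahub.metadata.schema_classes import SchemaFieldDataTypeClass, StringTypeClass

partition_key = "year"  # hypothetical partition column name
# Hypothetical stand-in for the fieldPath expression chosen above the hunk
# (plain key vs. the "[version=2.0].[type=string].{partition_key}" form).
partition_field_path = f"[version=2.0].[type=string].{partition_key}"

partition_field = SchemaField(
    fieldPath=partition_field_path,
    nativeDataType="string",
    # Always the same type wrapper now, regardless of is_fieldpath_v2.
    type=SchemaFieldDataTypeClass(StringTypeClass()),
    isPartitioningKey=True,
    nullable=True,
    recursive=False,
)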

View File

@@ -263,8 +263,11 @@ def get_column_type
                 break

     if TypeClass is None:
-        sql_report.report_warning(
-            dataset_name, f"unable to map type {column_type!r} to metadata schema"
+        sql_report.info(
+            title="Unable to map column types to DataHub types",
+            message="Got an unexpected column type. The column's parsed field type will not be populated.",
+            context=f"{dataset_name} - {column_type!r}",
+            log=False,
         )
         TypeClass = NullTypeClass