diff --git a/metadata-ingestion/src/datahub/ingestion/source/abs/source.py b/metadata-ingestion/src/datahub/ingestion/source/abs/source.py
index 39ebd79c2e..66f268799b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/abs/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/abs/source.py
@@ -8,29 +8,10 @@ import time
 from collections import OrderedDict
 from datetime import datetime
 from pathlib import PurePath
-from typing import Any, Dict, Iterable, List, Optional, Tuple
+from typing import Dict, Iterable, List, Optional, Tuple
 
 import smart_open.compression as so_compression
 from more_itertools import peekable
-from pyspark.sql.types import (
-    ArrayType,
-    BinaryType,
-    BooleanType,
-    ByteType,
-    DateType,
-    DecimalType,
-    DoubleType,
-    FloatType,
-    IntegerType,
-    LongType,
-    MapType,
-    NullType,
-    ShortType,
-    StringType,
-    StructField,
-    StructType,
-    TimestampType,
-)
 from smart_open import open as smart_open
 
 from datahub.emitter.mce_builder import (
@@ -48,7 +29,7 @@ from datahub.ingestion.api.decorators import (
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
+from datahub.ingestion.api.source import MetadataWorkUnitProcessor
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.abs.config import DataLakeSourceConfig, PathSpec
 from datahub.ingestion.source.abs.report import DataLakeSourceReport
@@ -72,22 +53,14 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionSourceBase,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
-    BooleanTypeClass,
-    BytesTypeClass,
-    DateTypeClass,
-    NullTypeClass,
-    NumberTypeClass,
-    RecordTypeClass,
     SchemaField,
     SchemaFieldDataType,
     SchemaMetadata,
     StringTypeClass,
-    TimeTypeClass,
 )
 from datahub.metadata.schema_classes import (
     DataPlatformInstanceClass,
     DatasetPropertiesClass,
-    MapTypeClass,
     OperationClass,
     OperationTypeClass,
     OtherSchemaClass,
@@ -100,55 +73,12 @@ from datahub.utilities.perf_timer import PerfTimer
 logging.getLogger("py4j").setLevel(logging.ERROR)
 logger: logging.Logger = logging.getLogger(__name__)
 
-# for a list of all types, see https://spark.apache.org/docs/3.0.3/api/python/_modules/pyspark/sql/types.html
-_field_type_mapping = {
-    NullType: NullTypeClass,
-    StringType: StringTypeClass,
-    BinaryType: BytesTypeClass,
-    BooleanType: BooleanTypeClass,
-    DateType: DateTypeClass,
-    TimestampType: TimeTypeClass,
-    DecimalType: NumberTypeClass,
-    DoubleType: NumberTypeClass,
-    FloatType: NumberTypeClass,
-    ByteType: BytesTypeClass,
-    IntegerType: NumberTypeClass,
-    LongType: NumberTypeClass,
-    ShortType: NumberTypeClass,
-    ArrayType: NullTypeClass,
-    MapType: MapTypeClass,
-    StructField: RecordTypeClass,
-    StructType: RecordTypeClass,
-}
 PAGE_SIZE = 1000
 
 # Hack to support the .gzip extension with smart_open.
 so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"])
 
 
-def get_column_type(
-    report: SourceReport, dataset_name: str, column_type: str
-) -> SchemaFieldDataType:
-    """
-    Maps known Spark types to datahub types
-    """
-    TypeClass: Any = None
-
-    for field_type, type_class in _field_type_mapping.items():
-        if isinstance(column_type, field_type):
-            TypeClass = type_class
-            break
-
-    # if still not found, report the warning
-    if TypeClass is None:
-        report.report_warning(
-            dataset_name, f"unable to map type {column_type} to metadata schema"
-        )
-        TypeClass = NullTypeClass
-
-    return SchemaFieldDataType(type=TypeClass())
-
-
 # config flags to emit telemetry for
 config_options_to_report = [
     "platform",
diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
index ead86acc29..e2b5f83787 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -849,8 +849,11 @@ def get_column_type(
     # if still not found, report the warning
     if TypeClass is None:
         if column_type:
-            report.report_warning(
-                dataset_name, f"unable to map type {column_type} to metadata schema"
+            report.info(
+                title="Unable to map column types to DataHub types",
+                message="Got an unexpected column type. The column's parsed field type will not be populated.",
+                context=f"{dataset_name} - {column_type}",
+                log=False,
             )
         TypeClass = NullTypeClass
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
index b8c7fd5aa8..f81d06c35e 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -8,32 +8,13 @@ import time
 from collections import OrderedDict
 from datetime import datetime
 from pathlib import PurePath
-from typing import Any, Dict, Iterable, List, Optional, Tuple
+from typing import Dict, Iterable, List, Optional, Tuple
 
 import smart_open.compression as so_compression
 from more_itertools import peekable
 from pyspark.conf import SparkConf
 from pyspark.sql import SparkSession
 from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.types import (
-    ArrayType,
-    BinaryType,
-    BooleanType,
-    ByteType,
-    DateType,
-    DecimalType,
-    DoubleType,
-    FloatType,
-    IntegerType,
-    LongType,
-    MapType,
-    NullType,
-    ShortType,
-    StringType,
-    StructField,
-    StructType,
-    TimestampType,
-)
 from pyspark.sql.utils import AnalysisException
 from smart_open import open as smart_open
 
@@ -52,7 +33,7 @@ from datahub.ingestion.api.decorators import (
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
+from datahub.ingestion.api.source import MetadataWorkUnitProcessor
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.aws.s3_boto_utils import get_s3_tags, list_folders
 from datahub.ingestion.source.aws.s3_util import (
@@ -72,22 +53,13 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionSourceBase,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
-    BooleanTypeClass,
-    BytesTypeClass,
-    DateTypeClass,
-    NullTypeClass,
-    NumberTypeClass,
-    RecordTypeClass,
     SchemaField,
-    SchemaFieldDataType,
     SchemaMetadata,
     StringTypeClass,
-    TimeTypeClass,
 )
 from datahub.metadata.schema_classes import (
     DataPlatformInstanceClass,
     DatasetPropertiesClass,
-    MapTypeClass,
     OperationClass,
     OperationTypeClass,
     OtherSchemaClass,
@@ -101,55 +73,12 @@ from datahub.utilities.perf_timer import PerfTimer
 logging.getLogger("py4j").setLevel(logging.ERROR)
 logger: logging.Logger = logging.getLogger(__name__)
 
-# for a list of all types, see https://spark.apache.org/docs/3.0.3/api/python/_modules/pyspark/sql/types.html
-_field_type_mapping = {
-    NullType: NullTypeClass,
-    StringType: StringTypeClass,
-    BinaryType: BytesTypeClass,
-    BooleanType: BooleanTypeClass,
-    DateType: DateTypeClass,
-    TimestampType: TimeTypeClass,
-    DecimalType: NumberTypeClass,
-    DoubleType: NumberTypeClass,
-    FloatType: NumberTypeClass,
-    ByteType: BytesTypeClass,
-    IntegerType: NumberTypeClass,
-    LongType: NumberTypeClass,
-    ShortType: NumberTypeClass,
-    ArrayType: NullTypeClass,
-    MapType: MapTypeClass,
-    StructField: RecordTypeClass,
-    StructType: RecordTypeClass,
-}
 PAGE_SIZE = 1000
 
 # Hack to support the .gzip extension with smart_open.
 so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"])
 
 
-def get_column_type(
-    report: SourceReport, dataset_name: str, column_type: str
-) -> SchemaFieldDataType:
-    """
-    Maps known Spark types to datahub types
-    """
-    TypeClass: Any = None
-
-    for field_type, type_class in _field_type_mapping.items():
-        if isinstance(column_type, field_type):
-            TypeClass = type_class
-            break
-
-    # if still not found, report the warning
-    if TypeClass is None:
-        report.report_warning(
-            dataset_name, f"unable to map type {column_type} to metadata schema"
-        )
-        TypeClass = NullTypeClass
-
-    return SchemaFieldDataType(type=TypeClass())
-
-
 # config flags to emit telemetry for
 config_options_to_report = [
     "platform",
@@ -490,9 +419,7 @@ class S3Source(StatefulIngestionSourceBase):
             if not is_fieldpath_v2
             else f"[version=2.0].[type=string].{partition_key}",
             nativeDataType="string",
-            type=SchemaFieldDataType(StringTypeClass())
-            if not is_fieldpath_v2
-            else SchemaFieldDataTypeClass(type=StringTypeClass()),
+            type=SchemaFieldDataTypeClass(StringTypeClass()),
             isPartitioningKey=True,
             nullable=True,
             recursive=False,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
index 1fa308eae6..2ab1e6bb41 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -263,8 +263,11 @@ def get_column_type(
         break
 
     if TypeClass is None:
-        sql_report.report_warning(
-            dataset_name, f"unable to map type {column_type!r} to metadata schema"
+        sql_report.info(
+            title="Unable to map column types to DataHub types",
+            message="Got an unexpected column type. The column's parsed field type will not be populated.",
+            context=f"{dataset_name} - {column_type!r}",
+            log=False,
         )
         TypeClass = NullTypeClass
 
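
Reviewer note: the recurring change in dbt_common.py and sql_common.py is the move from the positional report_warning(dataset_name, msg) call to the structured report.info(title=..., message=..., context=..., log=False) form; log=False appears intended to keep these high-volume per-column entries on the report without echoing each one to the console log. Below is a minimal, self-contained sketch of the pattern in isolation. SketchReport and _sketch_type_mapping are hypothetical stand-ins for DataHub's SourceReport and the real type mappings; only the report.info keyword arguments are taken from the hunks above.

    from typing import Any, Dict, Type


    class SketchReport:
        """Hypothetical stand-in for DataHub's SourceReport; only .info() matters here."""

        def info(self, title: str, message: str, context: str, log: bool = True) -> None:
            # The real report records a structured, titled entry; printing here
            # just keeps the sketch runnable on its own.
            print(f"[INFO] {title}: {message} ({context})")


    # Hypothetical mapping; the real sources map SQLAlchemy/dbt types to DataHub type classes.
    _sketch_type_mapping: Dict[Type[Any], str] = {str: "StringTypeClass", int: "NumberTypeClass"}


    def map_column_type(report: SketchReport, dataset_name: str, column_type: Any) -> str:
        for source_type, type_class in _sketch_type_mapping.items():
            if isinstance(column_type, source_type):
                return type_class
        # Structured entry replaces the old positional report_warning(dataset_name, msg)
        # call; the keyword arguments mirror the hunks in this diff.
        report.info(
            title="Unable to map column types to DataHub types",
            message="Got an unexpected column type. The column's parsed field type will not be populated.",
            context=f"{dataset_name} - {column_type!r}",
            log=False,
        )
        return "NullTypeClass"


    if __name__ == "__main__":
        map_column_type(SketchReport(), "db.schema.table", 3.14)  # no float mapping -> report.info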