fix(ingest/iceberg): update iceberg source to support newer versions of pyiceberg at runtime (#10614)

This commit is contained in:
Eric L (CCCS) 2024-06-04 12:45:29 -04:00 committed by GitHub
parent fcab544f17
commit c04b3bc2e4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 709 additions and 592 deletions

View File

@ -10,7 +10,7 @@ This ingestion source maps the following Source System Concepts to DataHub Conce
| Source Concept | DataHub Concept | Notes |
| -- | -- | -- |
| `iceberg` | [Data Platform](docs/generated/metamodel/entities/dataPlatform.md) | |
| Table | [Dataset](docs/generated/metamodel/entities/dataset.md) | Each Iceberg table maps to a Dataset named using the parent folders. If a table is stored under `my/namespace/table`, the dataset name will be `my.namespace.table`. If a [Platform Instance](https://datahubproject.io/docs/platform-instances/) is configured, it will be used as a prefix: `<platform_instance>.my.namespace.table`. |
| Table | [Dataset](docs/generated/metamodel/entities/dataset.md) | An Iceberg table is registered inside a catalog using a name, where the catalog is responsible for creating, dropping and renaming tables. Catalogs manage a collection of tables that are usually grouped into namespaces. The name of a table is mapped to a Dataset name. If a [Platform Instance](https://datahubproject.io/docs/platform-instances/) is configured, it will be used as a prefix: `<platform_instance>.my.namespace.table`. |
| [Table property](https://iceberg.apache.org/docs/latest/configuration/#table-properties) | [User (a.k.a CorpUser)](docs/generated/metamodel/entities/corpuser.md) | The value of a table property can be used as the name of a CorpUser owner. This table property name can be configured with the source option `user_ownership_property`. |
| [Table property](https://iceberg.apache.org/docs/latest/configuration/#table-properties) | CorpGroup | The value of a table property can be used as the name of a CorpGroup owner. This table property name can be configured with the source option `group_ownership_property`. |
| Table parent folders (excluding [warehouse catalog location](https://iceberg.apache.org/docs/latest/configuration/#catalog-properties)) | Container | Available in a future release |

View File

@ -3,17 +3,25 @@ source:
config:
env: PROD
catalog:
name: my_iceberg_catalog
type: rest
# Catalog configuration follows pyiceberg's documentation (https://py.iceberg.apache.org/configuration)
config:
# REST catalog configuration example using S3 storage
my_rest_catalog:
type: rest
# Catalog configuration follows pyiceberg's documentation (https://py.iceberg.apache.org/configuration)
uri: http://localhost:8181
s3.access-key-id: admin
s3.secret-access-key: password
s3.region: us-east-1
warehouse: s3a://warehouse/wh/
s3.endpoint: http://localhost:9000
platform_instance: my_iceberg_catalog
# SQL catalog configuration example using Azure datalake storage and a PostgreSQL database
# my_sql_catalog:
# type: sql
# uri: postgresql+psycopg2://user:password@sqldatabase.postgres.database.azure.com:5432/icebergcatalog
# adlfs.tenant-id: <Azure tenant ID>
# adlfs.account-name: <Azure storage account name>
# adlfs.client-id: <Azure Client/Application ID>
# adlfs.client-secret: <Azure Client Secret>
platform_instance: my_rest_catalog
table_pattern:
allow:
- marketing.*
@ -21,5 +29,4 @@ source:
enabled: true
sink:
# sink configs
# sink configs

View File

@ -225,11 +225,7 @@ microsoft_common = {"msal==1.22.0"}
iceberg_common = {
# Iceberg Python SDK
"pyiceberg~=0.4",
# We currently pin to pydantic v1, since we only test against pydantic v1 in CI.
# However, we should remove this once we fix compatibility with newer versions
# of pyiceberg, which depend on pydantic v2.
*pydantic_no_v2,
"pyiceberg>=0.4,<0.7",
}
mssql_common = {
@ -797,6 +793,7 @@ See the [DataHub docs](https://datahubproject.io/docs/metadata-ingestion).
"datahub-kafka",
"sync-file-emitter",
"sql-parser",
"iceberg",
}
else set()
)

View File

@ -4,6 +4,7 @@ import uuid
from typing import Any, Dict, Iterable, List, Optional
from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchIcebergTableError
from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
from pyiceberg.table import Table
from pyiceberg.typedef import Identifier
@ -76,6 +77,9 @@ from datahub.metadata.schema_classes import (
)
LOGGER = logging.getLogger(__name__)
logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(
logging.WARNING
)
@platform_name("Iceberg")
@ -134,9 +138,7 @@ class IcebergSource(StatefulIngestionSourceBase):
catalog = self.config.get_catalog()
except Exception as e:
LOGGER.error("Failed to get catalog", exc_info=True)
self.report.report_failure(
"get-catalog", f"Failed to get catalog {self.config.catalog.name}: {e}"
)
self.report.report_failure("get-catalog", f"Failed to get catalog: {e}")
return
for dataset_path in self._get_datasets(catalog):
@ -150,7 +152,7 @@ class IcebergSource(StatefulIngestionSourceBase):
# Try to load an Iceberg table. The path might not contain one, in which case NoSuchIcebergTableError is raised and handled below.
table = catalog.load_table(dataset_path)
yield from self._create_iceberg_workunit(dataset_name, table)
except Exception as e:
except NoSuchIcebergTableError as e:
self.report.report_failure("general", f"Failed to create workunit: {e}")
LOGGER.exception(
f"Exception while processing table {dataset_path}, skipping it.",
@ -175,6 +177,7 @@ class IcebergSource(StatefulIngestionSourceBase):
custom_properties = table.metadata.properties.copy()
custom_properties["location"] = table.metadata.location
custom_properties["format-version"] = str(table.metadata.format_version)
custom_properties["partition-spec"] = str(self._get_partition_aspect(table))
if table.current_snapshot():
custom_properties["snapshot-id"] = str(table.current_snapshot().snapshot_id)
custom_properties["manifest-list"] = table.current_snapshot().manifest_list
@ -204,6 +207,49 @@ class IcebergSource(StatefulIngestionSourceBase):
profiler = IcebergProfiler(self.report, self.config.profiling)
yield from profiler.profile_table(dataset_name, dataset_urn, table)
def _get_partition_aspect(self, table: Table) -> Optional[str]:
"""Extracts partition information from the provided table and returns a JSON array representing the [partition spec](https://iceberg.apache.org/spec/?#partition-specs) of the table.
Each element of the returned array represents a field in the [partition spec](https://iceberg.apache.org/spec/?#partition-specs) that follows [Appendix-C](https://iceberg.apache.org/spec/?#appendix-c-json-serialization) of the Iceberg specification.
Extra information has been added to this spec to make the information more user-friendly.
Since Datahub does not have a place in its model to store this information, it is saved as a JSON string and displayed as a table property.
Here is an example:
```json
"partition-spec": "[{\"name\": \"timeperiod_loaded\", \"transform\": \"identity\", \"source\": \"timeperiod_loaded\", \"source-id\": 19, \"source-type\": \"date\", \"field-id\": 1000}]",
```
Args:
table (Table): The Iceberg table to extract partition spec from.
Returns:
str: JSON representation of the partition spec of the provided table (empty array if table is not partitioned) or `None` if an error occured.
"""
try:
return json.dumps(
[
{
"name": partition.name,
"transform": str(partition.transform),
"source": str(
table.schema().find_column_name(partition.source_id)
),
"source-id": partition.source_id,
"source-type": str(
table.schema().find_type(partition.source_id)
),
"field-id": partition.field_id,
}
for partition in table.spec().fields
]
)
except Exception as e:
self.report.report_warning(
"extract-partition",
f"Failed to extract partition spec from Iceberg table {table.name()} due to error: {str(e)}",
)
return None
def _get_ownership_aspect(self, table: Table) -> Optional[OwnershipClass]:
owners = []
if self.config.user_ownership_property:
@ -432,6 +478,25 @@ class ToAvroSchemaIcebergVisitor(SchemaVisitorPerPrimitiveType[Dict[str, Any]]):
"native_data_type": str(timestamp_type),
}
# pyiceberg >= 0.5.0 renamed the visitor hook visit_timestampz() to visit_timestamptz().
# visit_timestamptz() is therefore required with pyiceberg >= 0.5.0 and is essentially a duplicate of visit_timestampz().
# Once Datahub can upgrade its pyiceberg dependency to >=0.5.0, the visit_timestampz() function can be safely removed.
def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> Dict[str, Any]:
    """Map an Iceberg timestamp-with-timezone type to its Avro field description.

    Avro supports 2 types of timestamp:
    - Timestamp: independent of a particular timezone or calendar (TZ information is lost)
    - Local Timestamp: represents a timestamp in a local timezone, regardless of what
      specific time zone is considered local

    Args:
        timestamptz_type (TimestamptzType): The Iceberg type being visited; only its
            string form is used, as the native data type.

    Returns:
        Dict[str, Any]: Avro definition of a `long` with the `timestamp-micros` logical type.
    """
    avro_definition: Dict[str, Any] = {"type": "long"}
    # `local-timestamp-micros` is deliberately not emitted: Avro's Python implementation (1.11.0)
    # does not support it, even though it exists in the spec.
    # See bug report: https://issues.apache.org/jira/browse/AVRO-3476 and PR https://github.com/apache/avro/pull/1634
    avro_definition["logicalType"] = "timestamp-micros"
    avro_definition["native_data_type"] = str(timestamptz_type)
    return avro_definition
def visit_timestampz(self, timestamptz_type: TimestamptzType) -> Dict[str, Any]:
# Avro supports 2 types of timestamp:
# - Timestamp: independent of a particular timezone or calendar (TZ information is lost)

View File

@ -1,7 +1,8 @@
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from pydantic import Field
from pydantic import Field, validator
from pyiceberg.catalog import Catalog, load_catalog
from datahub.configuration.common import AllowDenyPattern, ConfigModel
@ -18,6 +19,8 @@ from datahub.ingestion.source_config.operation_config import (
is_profiling_enabled,
)
logger = logging.getLogger(__name__)
class IcebergProfilingConfig(ConfigModel):
enabled: bool = Field(
@ -50,32 +53,14 @@ class IcebergProfilingConfig(ConfigModel):
# include_field_sample_values: bool = True
class IcebergCatalogConfig(ConfigModel):
"""
Iceberg catalog config.
https://py.iceberg.apache.org/configuration/
"""
name: str = Field(
default="default",
description="Name of catalog",
)
type: str = Field(
description="Type of catalog. See [PyIceberg](https://py.iceberg.apache.org/configuration/) for list of possible values.",
)
config: Dict[str, str] = Field(
description="Catalog specific configuration. See [PyIceberg documentation](https://py.iceberg.apache.org/configuration/) for details.",
)
class IcebergSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
# Override the stateful_ingestion config param with the Iceberg custom stateful ingestion config in the IcebergSourceConfig
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description="Iceberg Stateful Ingestion Config."
)
catalog: IcebergCatalogConfig = Field(
description="Catalog configuration where to find Iceberg tables. See [pyiceberg's catalog configuration details](https://py.iceberg.apache.org/configuration/).",
# The catalog configuration is using a dictionary to be open and flexible. All the keys and values are handled by pyiceberg. This will future-proof any configuration change done by pyiceberg.
catalog: Dict[str, Dict[str, str]] = Field(
description="Catalog configuration where to find Iceberg tables. Only one catalog specification is supported. The format is the same as [pyiceberg's catalog configuration](https://py.iceberg.apache.org/configuration/), where the catalog name is specified as the object name and attributes are set as key-value pairs.",
)
table_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
@ -91,6 +76,45 @@ class IcebergSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin)
)
profiling: IcebergProfilingConfig = IcebergProfilingConfig()
@validator("catalog", pre=True, always=True)
def handle_deprecated_catalog_format(cls, value):
    """Convert the deprecated catalog layout into the current one before field validation.

    The deprecated layout is a flat mapping with an optional `name` (which used to
    default to `"default"`), a `type` string and a `config` mapping:
    `{"name": "my_catalog", "type": "rest", "config": {"uri": "..."}}`.
    The current layout uses the catalog name as the single key:
    `{"my_catalog": {"type": "rest", "uri": "..."}}`.

    Args:
        value: Raw `catalog` value as provided in the recipe.

    Returns:
        The catalog configuration in the current layout, or `value` unchanged when it
        does not look like the deprecated layout (already current, or invalid and left
        for the regular field validation to reject).
    """
    # Once support for deprecated format is dropped, we can remove this validator.
    # In the current layout every top-level value is a mapping, so a string `type`
    # next to a mapping `config` can only be the deprecated layout. `name` is not
    # required for the detection because it was optional in the deprecated layout.
    if (
        isinstance(value, dict)
        and isinstance(value.get("type"), str)
        and isinstance(value.get("config"), dict)
    ):
        logger.warning(
            "The catalog configuration format you are using is deprecated and will be removed in a future version. Please update to the new format.",
        )
        # Same default the deprecated catalog config used when `name` was omitted.
        catalog_name = value.get("name", "default")
        return {catalog_name: {"type": value["type"], **value["config"]}}
    # In case the input is already the new format or is invalid
    return value
@validator("catalog")
def validate_catalog_size(cls, value):
    """Ensure the catalog mapping holds exactly one, non-empty catalog definition.

    Args:
        value: Mapping of catalog name to that catalog's configuration.

    Returns:
        The same mapping, unchanged, when it is acceptable.

    Raises:
        ValueError: If the mapping does not contain exactly one entry, or if the
            configuration of that entry is empty or is not a dictionary.
    """
    if len(value) != 1:
        raise ValueError("The catalog must contain exactly one entry.")

    # Exactly one entry is present at this point, so unpack it directly.
    ((catalog_name, settings),) = value.items()
    if settings and isinstance(settings, dict):
        return value

    raise ValueError(
        f"The catalog configuration for '{catalog_name}' must not be empty and should be a dictionary with at least one key-value pair."
    )
def is_profiling_enabled(self) -> bool:
return self.profiling.enabled and is_profiling_enabled(
self.profiling.operation_config
@ -102,9 +126,12 @@ class IcebergSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin)
Returns:
Catalog: Iceberg catalog instance.
"""
return load_catalog(
name=self.catalog.name, **{"type": self.catalog.type, **self.catalog.config}
)
if not self.catalog:
raise ValueError("No catalog configuration found")
# Retrieve the dict associated with the one catalog entry
catalog_name, catalog_config = next(iter(self.catalog.items()))
return load_catalog(name=catalog_name, **catalog_config)
@dataclass

View File

@ -1,184 +1,189 @@
[
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,test_platform_instance.nyc.another_taxis,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"owner": "root",
"created-at": "2023-07-04T14:23:10.457317300Z",
"write.format.default": "parquet",
"location": "s3a://warehouse/wh/nyc/another_taxis",
"format-version": "1",
"snapshot-id": "6904764113937987369",
"manifest-list": "s3a://warehouse/wh/nyc/another_taxis/metadata/snap-6904764113937987369-1-f18ce54a-d59c-461a-a066-9d3085ccf2f2.avro"
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,test_platform_instance.nyc.another_taxis,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"owner": "root",
"created-at": "2024-05-22T14:09:15.234903700Z",
"write.format.default": "parquet",
"location": "s3a://warehouse/wh/nyc/another_taxis",
"format-version": "1",
"partition-spec": "[{\"name\": \"trip_date\", \"transform\": \"identity\", \"source\": \"trip_date\", \"source-id\": 2, \"source-type\": \"timestamptz\", \"field-id\": 1000}]",
"snapshot-id": "1706020810864905360",
"manifest-list": "s3a://warehouse/wh/nyc/another_taxis/metadata/snap-1706020810864905360-1-90ad8346-ac1b-4e73-bb30-dfd9b0b0e0dc.avro"
},
"tags": []
}
},
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:root",
"type": "TECHNICAL_OWNER"
},
"tags": []
}
},
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:root",
"type": "TECHNICAL_OWNER"
},
{
"owner": "urn:li:corpGroup:root",
"type": "TECHNICAL_OWNER"
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
{
"owner": "urn:li:corpGroup:root",
"type": "TECHNICAL_OWNER"
}
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "nyc.another_taxis",
"platform": "urn:li:dataPlatform:iceberg",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
"rawSchema": "table {\n 1: vendor_id: optional long\n 2: trip_date: optional timestamptz\n 3: trip_id: optional long\n 4: trip_distance: optional float\n 5: fare_amount: optional double\n 6: store_and_fwd_flag: optional string\n}"
}
},
"fields": [
{
"fieldPath": "[version=2.0].[type=struct].[type=long].vendor_id",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "long",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"long\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=long].trip_date",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamptz",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"logicalType\": \"timestamp-micros\", \"native_data_type\": \"timestamptz\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=long].trip_id",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "long",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"long\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=float].trip_distance",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "float",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"float\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=double].fare_amount",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "double",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"double\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=string].store_and_fwd_flag",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
}
]
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00"
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "nyc.another_taxis",
"platform": "urn:li:dataPlatform:iceberg",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
"rawSchema": "table {\n 1: vendor_id: optional long\n 2: trip_date: optional timestamptz\n 3: trip_id: optional long\n 4: trip_distance: optional float\n 5: fare_amount: optional double\n 6: store_and_fwd_flag: optional string\n}"
}
},
"fields": [
{
"fieldPath": "[version=2.0].[type=struct].[type=long].vendor_id",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "long",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"long\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=long].trip_date",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamptz",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"logicalType\": \"timestamp-micros\", \"native_data_type\": \"timestamptz\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=long].trip_id",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "long",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"long\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=float].trip_distance",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "float",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"float\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=double].fare_amount",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "double",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"double\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=string].store_and_fwd_flag",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
}
]
}
}
]
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,test_platform_instance.nyc.another_taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:iceberg",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:iceberg,test_platform_instance)"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,test_platform_instance.nyc.taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": true
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00"
}
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
]
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,test_platform_instance.nyc.another_taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:iceberg",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:iceberg,test_platform_instance)"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,test_platform_instance.nyc.taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": true
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -1,153 +1,156 @@
[
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"owner": "root",
"created-at": "2023-06-12T17:32:17.227545005Z",
"write.format.default": "parquet",
"location": "s3a://warehouse/wh/nyc/taxis",
"format-version": "1",
"snapshot-id": "2505818429184337337",
"manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-2505818429184337337-1-a64915c4-afc8-40e3-97a7-98b072b42e10.avro"
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"owner": "root",
"created-at": "2024-05-22T14:08:04.001538500Z",
"write.format.default": "parquet",
"location": "s3a://warehouse/wh/nyc/taxis",
"format-version": "1",
"partition-spec": "[{\"name\": \"trip_date\", \"transform\": \"identity\", \"source\": \"trip_date\", \"source-id\": 2, \"source-type\": \"timestamptz\", \"field-id\": 1000}]",
"snapshot-id": "5259199139271057622",
"manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-5259199139271057622-1-24dca7b8-d437-458e-ae91-df1d3e30bdc8.avro"
},
"tags": []
}
},
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:root",
"type": "TECHNICAL_OWNER"
},
"tags": []
}
},
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:root",
"type": "TECHNICAL_OWNER"
},
{
"owner": "urn:li:corpGroup:root",
"type": "TECHNICAL_OWNER"
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
{
"owner": "urn:li:corpGroup:root",
"type": "TECHNICAL_OWNER"
}
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "nyc.taxis",
"platform": "urn:li:dataPlatform:iceberg",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
"rawSchema": "table {\n 1: vendor_id: optional long\n 2: trip_date: optional timestamptz\n 3: trip_id: optional long\n 4: trip_distance: optional float\n 5: fare_amount: optional double\n 6: store_and_fwd_flag: optional string\n}"
}
},
"fields": [
{
"fieldPath": "[version=2.0].[type=struct].[type=long].vendor_id",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "long",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"long\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=long].trip_date",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamptz",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"logicalType\": \"timestamp-micros\", \"native_data_type\": \"timestamptz\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=long].trip_id",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "long",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"long\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=float].trip_distance",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "float",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"float\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=double].fare_amount",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "double",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"double\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=string].store_and_fwd_flag",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
}
]
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test"
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "nyc.taxis",
"platform": "urn:li:dataPlatform:iceberg",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
"rawSchema": "table {\n 1: vendor_id: optional long\n 2: trip_date: optional timestamptz\n 3: trip_id: optional long\n 4: trip_distance: optional float\n 5: fare_amount: optional double\n 6: store_and_fwd_flag: optional string\n}"
}
},
"fields": [
{
"fieldPath": "[version=2.0].[type=struct].[type=long].vendor_id",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "long",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"long\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=long].trip_date",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamptz",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"logicalType\": \"timestamp-micros\", \"native_data_type\": \"timestamptz\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=long].trip_id",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "long",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"long\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=float].trip_distance",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "float",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"float\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=double].fare_amount",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "double",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"double\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=string].store_and_fwd_flag",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
}
]
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
]
}
]

View File

@ -1,216 +1,220 @@
[
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"owner": "root",
"created-at": "2023-06-12T17:33:25.422993540Z",
"write.format.default": "parquet",
"location": "s3a://warehouse/wh/nyc/taxis",
"format-version": "1",
"snapshot-id": "2585047006374307840",
"manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-2585047006374307840-1-2e2bef19-40d1-4ad1-8fad-e57783477710.avro"
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"owner": "root",
"created-at": "2024-05-22T14:10:22.926080700Z",
"write.format.default": "parquet",
"location": "s3a://warehouse/wh/nyc/taxis",
"format-version": "1",
"partition-spec": "[{\"name\": \"trip_date\", \"transform\": \"identity\", \"source\": \"trip_date\", \"source-id\": 2, \"source-type\": \"timestamptz\", \"field-id\": 1000}]",
"snapshot-id": "564034874306625146",
"manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-564034874306625146-1-562a1705-d774-4e0a-baf0-1988bcc7be72.avro"
},
"tags": []
}
},
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:root",
"type": "TECHNICAL_OWNER"
},
"tags": []
}
},
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:root",
"type": "TECHNICAL_OWNER"
},
{
"owner": "urn:li:corpGroup:root",
"type": "TECHNICAL_OWNER"
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
{
"owner": "urn:li:corpGroup:root",
"type": "TECHNICAL_OWNER"
}
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "nyc.taxis",
"platform": "urn:li:dataPlatform:iceberg",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
"rawSchema": "table {\n 1: vendor_id: optional long\n 2: trip_date: optional timestamptz\n 3: trip_id: optional long\n 4: trip_distance: optional float\n 5: fare_amount: optional double\n 6: store_and_fwd_flag: optional string\n}"
}
},
"fields": [
{
"fieldPath": "[version=2.0].[type=struct].[type=long].vendor_id",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "long",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"long\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=long].trip_date",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamptz",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"logicalType\": \"timestamp-micros\", \"native_data_type\": \"timestamptz\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=long].trip_id",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "long",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"long\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=float].trip_distance",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "float",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"float\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=double].fare_amount",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "double",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"double\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=string].store_and_fwd_flag",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
}
]
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test"
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "nyc.taxis",
"platform": "urn:li:dataPlatform:iceberg",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
"rawSchema": "table {\n 1: vendor_id: optional long\n 2: trip_date: optional timestamptz\n 3: trip_id: optional long\n 4: trip_distance: optional float\n 5: fare_amount: optional double\n 6: store_and_fwd_flag: optional string\n}"
}
},
"fields": [
{
"fieldPath": "[version=2.0].[type=struct].[type=long].vendor_id",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "long",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"long\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=long].trip_date",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamptz",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"logicalType\": \"timestamp-micros\", \"native_data_type\": \"timestamptz\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=long].trip_id",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "long",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"long\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=float].trip_distance",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "float",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"float\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=double].fare_amount",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "double",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"double\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=string].store_and_fwd_flag",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
}
]
}
}
]
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProfile",
"aspect": {
"json": {
"timestampMillis": 1586847600000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"rowCount": 5,
"columnCount": 6,
"fieldProfiles": [
{
"fieldPath": "vendor_id",
"nullCount": 0,
"nullProportion": 0.0,
"min": "1",
"max": "3"
},
{
"fieldPath": "trip_date",
"nullCount": 0,
"nullProportion": 0.0,
"min": "2000-01-01T12:00:00+00:00",
"max": "2000-01-04T12:00:00+00:00"
},
{
"fieldPath": "trip_id",
"nullCount": 0,
"nullProportion": 0.0,
"min": "1000371",
"max": "1000375"
},
{
"fieldPath": "trip_distance",
"nullCount": 0,
"nullProportion": 0.0,
"min": "0.0",
"max": "8.399999618530273"
},
{
"fieldPath": "fare_amount",
"nullCount": 0,
"nullProportion": 0.0,
"min": "0.0",
"max": "42.13"
},
{
"fieldPath": "store_and_fwd_flag",
"nullCount": 0,
"nullProportion": 0.0
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test"
}
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
]
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProfile",
"aspect": {
"json": {
"timestampMillis": 1586847600000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"rowCount": 5,
"columnCount": 6,
"fieldProfiles": [
{
"fieldPath": "vendor_id",
"nullCount": 0,
"nullProportion": 0.0,
"min": "1",
"max": "3"
},
{
"fieldPath": "trip_date",
"nullCount": 0,
"nullProportion": 0.0,
"min": "2000-01-01T12:00:00+00:00",
"max": "2000-01-04T12:00:00+00:00"
},
{
"fieldPath": "trip_id",
"nullCount": 0,
"nullProportion": 0.0,
"min": "1000371",
"max": "1000375"
},
{
"fieldPath": "trip_distance",
"nullCount": 0,
"nullProportion": 0.0,
"min": "0.0",
"max": "8.399999618530273"
},
{
"fieldPath": "fare_amount",
"nullCount": 0,
"nullProportion": 0.0,
"min": "0.0",
"max": "42.13"
},
{
"fieldPath": "store_and_fwd_flag",
"nullCount": 0,
"nullProportion": 0.0
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
}
]

View File

@ -4,9 +4,8 @@ source:
type: iceberg
config:
catalog:
name: default
type: rest
config:
default:
type: rest
uri: http://localhost:8181
s3.access-key-id: admin
s3.secret-access-key: password

View File

@ -4,9 +4,8 @@ source:
type: iceberg
config:
catalog:
name: default
type: rest
config:
default:
type: rest
uri: http://localhost:8181
s3.access-key-id: admin
s3.secret-access-key: password

View File

@ -79,9 +79,8 @@ def test_iceberg_stateful_ingest(
"type": "iceberg",
"config": {
"catalog": {
"name": "default",
"type": "rest",
"config": {
"default": {
"type": "rest",
"uri": "http://localhost:8181",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",

View File

@ -34,7 +34,6 @@ from datahub.ingestion.source.iceberg.iceberg import (
IcebergSource,
IcebergSourceConfig,
)
from datahub.ingestion.source.iceberg.iceberg_common import IcebergCatalogConfig
from datahub.metadata.com.linkedin.pegasus2avro.schema import ArrayType, SchemaField
from datahub.metadata.schema_classes import (
ArrayTypeClass,
@ -50,9 +49,7 @@ from datahub.metadata.schema_classes import (
def with_iceberg_source() -> IcebergSource:
catalog: IcebergCatalogConfig = IcebergCatalogConfig(
name="test", type="rest", config={}
)
catalog = {"test": {"type": "rest"}}
return IcebergSource(
ctx=PipelineContext(run_id="iceberg-source-test"),
config=IcebergSourceConfig(catalog=catalog),
@ -95,14 +92,29 @@ def test_config_catalog_not_configured():
"""
Test when an Iceberg catalog is provided, but not properly configured.
"""
with pytest.raises(ValidationError):
IcebergCatalogConfig() # type: ignore
with pytest.raises(ValidationError, match="conf"):
IcebergCatalogConfig(type="a type") # type: ignore
# When no catalog configuration is provided, the config should be invalid
with pytest.raises(ValidationError, match="type"):
IcebergCatalogConfig(conf={}) # type: ignore
IcebergSourceConfig(catalog={}) # type: ignore
# When a catalog name is provided without configuration, the config should be invalid
with pytest.raises(ValidationError):
IcebergSourceConfig(catalog={"test": {}})
def test_config_deprecated_catalog_configuration():
    """
    A catalog supplied in the deprecated ``name`` / ``type`` / ``config`` layout
    should be converted to the current scheme: the resulting catalog mapping is
    keyed by the catalog name and carries the type together with every entry of
    the old ``config`` section.
    """
    legacy_catalog = {
        "name": "test",
        "type": "rest",
        "config": {"uri": "http://a.uri.test", "another_prop": "another_value"},
    }

    source_config = IcebergSourceConfig(catalog=legacy_catalog)

    # Look the migrated entry up once and check each expected property on it.
    test_catalog = source_config.catalog["test"]
    assert test_catalog is not None
    assert test_catalog["type"] == "rest"
    assert test_catalog["uri"] == "http://a.uri.test"
    assert test_catalog["another_prop"] == "another_value"
def test_config_for_tests():