feat(ingestion/iceberg): Refactor iceberg connector (#12921)

skrydal 2025-03-28 18:34:36 +01:00 committed by GitHub
parent d4812d7b05
commit b7bb0b056c
8 changed files with 708 additions and 169 deletions

File: datahub/emitter/mcp_builder.py

@@ -117,6 +117,14 @@ class ContainerKey(DatahubKey):
PlatformKey = ContainerKey
class NamespaceKey(ContainerKey):
"""
For Iceberg namespaces (databases/schemas)
"""
namespace: str
class DatabaseKey(ContainerKey):
database: str
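
For illustration, a minimal sketch of how the new key is used to mint a container URN (mirroring the connector code further down in this diff; the namespace value here is hypothetical):

from datahub.emitter.mce_builder import make_container_urn
from datahub.emitter.mcp_builder import NamespaceKey

# The URN is a deterministic GUID derived from the key fields, so the same
# namespace/platform/instance/env always maps to the same container.
namespace_urn = make_container_urn(
    NamespaceKey(
        namespace="nyc",  # hypothetical namespace
        platform="iceberg",
        instance=None,  # platform_instance, if configured
        env="PROD",
    )
)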

File: datahub/ingestion/source/common/subtypes.py

@@ -45,6 +45,7 @@ class DatasetContainerSubTypes(StrEnum):
GCS_BUCKET = "GCS bucket"
ABS_CONTAINER = "ABS container"
KEYSPACE = "Keyspace" # Cassandra
NAMESPACE = "Namespace" # Iceberg
class BIContainerSubTypes(StrEnum):

File: datahub/ingestion/source/iceberg/iceberg.py

@@ -38,6 +38,7 @@ from pyiceberg.types import (
)
from datahub.emitter.mce_builder import (
make_container_urn,
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
@@ -45,6 +46,7 @@ from datahub.emitter.mce_builder import (
make_user_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import NamespaceKey
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
@@ -57,6 +59,10 @@ from datahub.ingestion.api.decorators import (
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.extractor import schema_util
from datahub.ingestion.source.common.subtypes import (
DatasetContainerSubTypes,
DatasetSubTypes,
)
from datahub.ingestion.source.iceberg.iceberg_common import (
IcebergSourceConfig,
IcebergSourceReport,
@@ -68,19 +74,22 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import (
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import Status
from datahub.metadata.com.linkedin.pegasus2avro.common import Status, SubTypes
from datahub.metadata.com.linkedin.pegasus2avro.container import ContainerProperties
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
OtherSchema,
SchemaField,
SchemaMetadata,
)
from datahub.metadata.schema_classes import (
ContainerClass,
DataPlatformInstanceClass,
DatasetPropertiesClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
TimeStampClass,
_Aspect,
)
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor
@@ -119,9 +128,10 @@ class IcebergSource(StatefulIngestionSourceBase):
[pyiceberg library](https://py.iceberg.apache.org/).
"""
platform: str = "iceberg"
def __init__(self, config: IcebergSourceConfig, ctx: PipelineContext) -> None:
super().__init__(config, ctx)
self.platform: str = "iceberg"
self.report: IcebergSourceReport = IcebergSourceReport()
self.config: IcebergSourceConfig = config
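
For orientation, a minimal sketch of the pyiceberg calls this source is built on (catalog properties here are hypothetical; the list/load calls are the ones used throughout this diff):

from pyiceberg.catalog import load_catalog

# Assumes a REST catalog reachable at the given URI (hypothetical values).
catalog = load_catalog("test", **{"type": "rest", "uri": "http://localhost:8181"})
for namespace in catalog.list_namespaces():  # tuple identifiers, e.g. ("nyc",)
    for table_id in catalog.list_tables(namespace):
        table = catalog.load_table(table_id)  # may raise NoSuchTableError, etc.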
@@ -138,13 +148,12 @@ class IcebergSource(StatefulIngestionSourceBase):
).workunit_processor,
]
def _get_datasets(self, catalog: Catalog) -> Iterable[Identifier]:
def _get_namespaces(self, catalog: Catalog) -> Iterable[Identifier]:
namespaces = catalog.list_namespaces()
LOGGER.debug(
f"Retrieved {len(namespaces)} namespaces, first 10: {namespaces[:10]}"
)
self.report.report_no_listed_namespaces(len(namespaces))
tables_count = 0
for namespace in namespaces:
namespace_repr = ".".join(namespace)
if not self.config.namespace_pattern.allowed(namespace_repr):
@@ -153,6 +162,14 @@ class IcebergSource(StatefulIngestionSourceBase):
)
self.report.report_dropped(f"{namespace_repr}.*")
continue
yield namespace
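
As context for the filtering above, namespaces are matched against the source config's namespace_pattern; a minimal config sketch, with hypothetical values and the catalog shape borrowed from the unit tests in this commit:

from datahub.ingestion.source.iceberg.iceberg_common import IcebergSourceConfig

# Hypothetical recipe values; namespaces that fail the allow pattern are
# reported as dropped ("<namespace>.*") and skipped.
config = IcebergSourceConfig.parse_obj(
    {
        "catalog": {"test": {"type": "rest"}},
        "namespace_pattern": {"allow": ["^nyc$"], "deny": []},
    }
)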
def _get_datasets(
self, catalog: Catalog, namespaces: Iterable[Tuple[Identifier, str]]
) -> Iterable[Tuple[Identifier, str]]:
LOGGER.debug("Starting to retrieve tables")
tables_count = 0
for namespace, namespace_urn in namespaces:
try:
tables = catalog.list_tables(namespace)
tables_count += len(tables)
@@ -162,29 +179,27 @@ class IcebergSource(StatefulIngestionSourceBase):
self.report.report_listed_tables_for_namespace(
".".join(namespace), len(tables)
)
yield from tables
except NoSuchNamespaceError:
self.report.report_warning(
"no-such-namespace",
f"Couldn't list tables for namespace {namespace} due to NoSuchNamespaceError exception",
)
LOGGER.warning(
f"NoSuchNamespaceError exception while trying to get list of tables from namespace {namespace}, skipping it",
yield from [(table, namespace_urn) for table in tables]
except NoSuchNamespaceError as e:
self.report.warning(
title="No such namespace",
message="Skipping the missing namespace.",
context=str(namespace),
exc=e,
)
except Exception as e:
self.report.report_failure(
"listing-tables-exception",
f"Couldn't list tables for namespace {namespace} due to {e}",
)
LOGGER.exception(
f"Unexpected exception while trying to get list of tables for namespace {namespace}, skipping it"
title="Error when processing a namespace",
message="Skipping the namespace due to errors while processing it.",
context=str(namespace),
exc=e,
)
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
thread_local = threading.local()
def _try_processing_dataset(
dataset_path: Tuple[str, ...], dataset_name: str
dataset_path: Tuple[str, ...], dataset_name: str, namespace_urn: str
) -> Iterable[MetadataWorkUnit]:
try:
if not hasattr(thread_local, "local_catalog"):
@@ -200,56 +215,66 @@ class IcebergSource(StatefulIngestionSourceBase):
time_taken, dataset_name, table.metadata_location
)
LOGGER.debug(f"Loaded table: {table.name()}, time taken: {time_taken}")
yield from self._create_iceberg_workunit(dataset_name, table)
except NoSuchPropertyException as e:
self.report.report_warning(
"table-property-missing",
f"Failed to create workunit for {dataset_name}. {e}",
dataset_urn: str = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
LOGGER.warning(
f"NoSuchPropertyException while processing table {dataset_path}, skipping it.",
for aspect in self._create_iceberg_table_aspects(
dataset_name, table, namespace_urn
):
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=aspect
).as_workunit()
except NoSuchPropertyException as e:
self.report.warning(
title="Unable to process table",
message="Table was not processed due to expected property missing (table is probably not an iceberg table).",
context=dataset_name,
exc=e,
)
except NoSuchIcebergTableError as e:
self.report.report_warning(
"not-an-iceberg-table",
f"Failed to create workunit for {dataset_name}. {e}",
)
LOGGER.warning(
f"NoSuchIcebergTableError while processing table {dataset_path}, skipping it.",
self.report.warning(
title="Skipped non-iceberg table",
message="Table was recognized as non-iceberg and skipped.",
context=dataset_name,
exc=e,
)
except NoSuchTableError as e:
self.report.report_warning(
"no-such-table",
f"Failed to create workunit for {dataset_name}. {e}",
)
LOGGER.warning(
f"NoSuchTableError while processing table {dataset_path}, skipping it.",
self.report.warning(
title="Table not found",
message="Table was returned by the catalog in the list of table but catalog can't find its details, table was skipped.",
context=dataset_name,
exc=e,
)
except FileNotFoundError as e:
self.report.report_warning(
"file-not-found",
f"Encountered FileNotFoundError when trying to read manifest file for {dataset_name}. {e}",
)
LOGGER.warning(
f"FileNotFoundError while processing table {dataset_path}, skipping it."
self.report.warning(
title="Manifest file not found",
message="Couldn't find manifest file to read for the table, skipping it.",
context=dataset_name,
exc=e,
)
except ServerError as e:
self.report.report_warning(
"iceberg-rest-server-error",
f"Iceberg Rest Catalog returned 500 status due to an unhandled exception for {dataset_name}. Exception: {e}",
)
LOGGER.warning(
f"Iceberg Rest Catalog server error (500 status) encountered when processing table {dataset_path}, skipping it."
self.report.warning(
title="Iceberg REST Server Error",
message="Iceberg returned 500 HTTP status when trying to process a table, skipping it.",
context=dataset_name,
exc=e,
)
except ValueError as e:
if "Could not initialize FileIO" not in str(e):
raise
self.report.warning(
"Could not initialize FileIO",
f"Could not initialize FileIO for {dataset_path} due to: {e}",
title="Could not initialize FileIO",
message="Could not initialize FileIO for a table (are you using custom FileIO?). Skipping the table.",
context=dataset_name,
exc=e,
)
def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
def _process_dataset(
dataset_path: Identifier, namespace_urn: str
) -> Iterable[MetadataWorkUnit]:
try:
LOGGER.debug(f"Processing dataset for path {dataset_path}")
dataset_name = ".".join(dataset_path)
@@ -261,66 +286,88 @@ class IcebergSource(StatefulIngestionSourceBase):
)
return
yield from _try_processing_dataset(dataset_path, dataset_name)
yield from _try_processing_dataset(
dataset_path, dataset_name, namespace_urn
)
except Exception as e:
self.report.report_failure(
"general",
f"Failed to create workunit for dataset {dataset_path}: {e}",
)
LOGGER.exception(
f"Exception while processing table {dataset_path}, skipping it.",
title="Error when processing a table",
message="Skipping the table due to errors when processing it.",
context=str(dataset_path),
exc=e,
)
try:
catalog = self.config.get_catalog()
except Exception as e:
self.report.report_failure("get-catalog", f"Failed to get catalog: {e}")
self.report.report_failure(
title="Failed to initialize catalog object",
message="Couldn't start the ingestion due to failure to initialize catalog object.",
exc=e,
)
return
try:
namespace_ids = self._get_namespaces(catalog)
namespaces: List[Tuple[Identifier, str]] = []
for namespace in namespace_ids:
namespace_repr = ".".join(namespace)
LOGGER.debug(f"Processing namespace {namespace_repr}")
namespace_urn = make_container_urn(
NamespaceKey(
namespace=namespace_repr,
platform=self.platform,
instance=self.config.platform_instance,
env=self.config.env,
)
)
namespaces.append((namespace, namespace_urn))
for aspect in self._create_iceberg_namespace_aspects(namespace):
yield MetadataChangeProposalWrapper(
entityUrn=namespace_urn, aspect=aspect
).as_workunit()
LOGGER.debug("Namespaces ingestion completed")
except Exception as e:
self.report.report_failure(
title="Failed to list namespaces",
message="Couldn't start the ingestion due to a failure to process the list of the namespaces",
exc=e,
)
return
for wu in ThreadedIteratorExecutor.process(
worker_func=_process_dataset,
args_list=[(dataset_path,) for dataset_path in self._get_datasets(catalog)],
args_list=[
(dataset_path, namespace_urn)
for dataset_path, namespace_urn in self._get_datasets(
catalog, namespaces
)
],
max_workers=self.config.processing_threads,
):
yield wu
def _create_iceberg_workunit(
self, dataset_name: str, table: Table
) -> Iterable[MetadataWorkUnit]:
def _create_iceberg_table_aspects(
self, dataset_name: str, table: Table, namespace_urn: str
) -> Iterable[_Aspect]:
with PerfTimer() as timer:
self.report.report_table_scanned(dataset_name)
LOGGER.debug(f"Processing table {dataset_name}")
dataset_urn: str = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=Status(removed=False)
).as_workunit()
yield Status(removed=False)
yield SubTypes(typeNames=[DatasetSubTypes.TABLE])
dataset_properties = self._get_dataset_properties_aspect(
dataset_name, table
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=dataset_properties
).as_workunit()
yield self._get_dataset_properties_aspect(dataset_name, table)
dataset_ownership = self._get_ownership_aspect(table)
if dataset_ownership:
LOGGER.debug(
f"Adding ownership: {dataset_ownership} to the dataset {dataset_name}"
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=dataset_ownership
).as_workunit()
yield dataset_ownership
schema_metadata = self._create_schema_metadata(dataset_name, table)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=schema_metadata
).as_workunit()
yield self._get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
yield self._create_schema_metadata(dataset_name, table)
yield self._get_dataplatform_instance_aspect()
yield ContainerClass(container=str(namespace_urn))
self.report.report_table_processing_time(
timer.elapsed_seconds(), dataset_name, table.metadata_location
@@ -328,7 +375,7 @@ class IcebergSource(StatefulIngestionSourceBase):
if self.config.is_profiling_enabled():
profiler = IcebergProfiler(self.report, self.config.profiling)
yield from profiler.profile_table(dataset_name, dataset_urn, table)
yield from profiler.profile_table(dataset_name, table)
def _get_partition_aspect(self, table: Table) -> Optional[str]:
"""Extracts partition information from the provided table and returns a JSON array representing the [partition spec](https://iceberg.apache.org/spec/?#partition-specs) of the table.
@@ -367,9 +414,11 @@ class IcebergSource(StatefulIngestionSourceBase):
]
)
except Exception as e:
self.report.report_warning(
"extract-partition",
f"Failed to extract partition spec from Iceberg table {table.name()} due to error: {str(e)}",
self.report.warning(
title="Failed to extract partition information",
message="Failed to extract partition information for a table. Table metadata will be ingested without it.",
context=str(table.name()),
exc=e,
)
return None
@@ -435,18 +484,15 @@ class IcebergSource(StatefulIngestionSourceBase):
)
return OwnershipClass(owners=owners) if owners else None
def _get_dataplatform_instance_aspect(self, dataset_urn: str) -> MetadataWorkUnit:
return MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=make_dataplatform_instance_urn(
self.platform, self.config.platform_instance
)
if self.config.platform_instance
else None,
),
).as_workunit()
def _get_dataplatform_instance_aspect(self) -> DataPlatformInstanceClass:
return DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=make_dataplatform_instance_urn(
self.platform, self.config.platform_instance
)
if self.config.platform_instance
else None,
)
def _create_schema_metadata(
self, dataset_name: str, table: Table
@@ -475,6 +521,17 @@ class IcebergSource(StatefulIngestionSourceBase):
def get_report(self) -> SourceReport:
return self.report
def _create_iceberg_namespace_aspects(
self, namespace: Identifier
) -> Iterable[_Aspect]:
namespace_repr = ".".join(namespace)
yield Status(removed=False)
yield ContainerProperties(
name=namespace_repr, qualifiedName=namespace_repr, env=self.config.env
)
yield SubTypes(typeNames=[DatasetContainerSubTypes.NAMESPACE])
yield self._get_dataplatform_instance_aspect()
class ToAvroSchemaIcebergVisitor(SchemaVisitorPerPrimitiveType[Dict[str, Any]]):
"""Implementation of a visitor to build an Avro schema as a dictionary from an Iceberg schema."""

File: datahub/ingestion/source/iceberg/iceberg_profiler.py

@@ -1,5 +1,5 @@
import logging
from typing import Any, Callable, Dict, Iterable, Union, cast
from typing import Any, Callable, Dict, Iterable, Optional, cast
from pyiceberg.conversions import from_bytes
from pyiceberg.schema import Schema
@@ -24,8 +24,6 @@ from pyiceberg.utils.datetime import (
)
from datahub.emitter.mce_builder import get_sys_time
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.iceberg.iceberg_common import (
IcebergProfilingConfig,
IcebergSourceReport,
@@ -33,6 +31,7 @@ from datahub.ingestion.source.iceberg.iceberg_common import (
from datahub.metadata.schema_classes import (
DatasetFieldProfileClass,
DatasetProfileClass,
_Aspect,
)
from datahub.utilities.perf_timer import PerfTimer
@@ -86,9 +85,8 @@ class IcebergProfiler:
def profile_table(
self,
dataset_name: str,
dataset_urn: str,
table: Table,
) -> Iterable[MetadataWorkUnit]:
) -> Iterable[_Aspect]:
"""This method will profile the supplied Iceberg table by looking at the table's manifest.
The overall profile of the table is aggregated from the individual manifest files.
@@ -167,11 +165,11 @@
)
total_count += data_file.record_count
except Exception as e:
# Catch any errors that arise from attempting to read the Iceberg table's manifests
# This will prevent stateful ingestion from being blocked by an error (profiling is not critical)
self.report.report_warning(
"profiling",
f"Error while profiling dataset {dataset_name}: {e}",
self.report.warning(
title="Error when profiling a table",
message="Skipping profiling of the table due to errors",
context=dataset_name,
exc=e,
)
if row_count:
# Iterating through fieldPaths introduces unwanted stats for list element fields...
@@ -211,14 +209,11 @@
f"Finished profiling of dataset: {dataset_name} in {time_taken}"
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=dataset_profile,
).as_workunit()
yield dataset_profile
def _render_value(
self, dataset_name: str, value_type: IcebergType, value: Any
) -> Union[str, None]:
) -> Optional[str]:
try:
if isinstance(value_type, TimestampType):
return to_human_timestamp(value)
@@ -230,9 +225,17 @@
return to_human_time(value)
return str(value)
except Exception as e:
self.report.report_warning(
"profiling",
f"Error in dataset {dataset_name} when profiling a {value_type} field with value {value}: {e}",
self.report.warning(
title="Couldn't render value when profiling a table",
message="Encountered error, when trying to redner a value for table profile.",
context=str(
{
"value": value,
"value_type": value_type,
"dataset_name": dataset_name,
}
),
exc=e,
)
return None

File: golden MCE JSON (run with platform instance, pipeline "test_pipeline")

@@ -1,4 +1,41 @@
[
{
"entityType": "container",
"entityUrn": "urn:li:container:76a48cd3253f4ce45b6e65bfd4e96d5d",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00-mbclry",
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:76a48cd3253f4ce45b6e65bfd4e96d5d",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "nyc",
"qualifiedName": "nyc",
"env": "PROD"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00-mbclry",
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,test_platform_instance.nyc.another_taxis,PROD)",
@@ -51,6 +88,65 @@
"pipelineName": "test_pipeline"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:76a48cd3253f4ce45b6e65bfd4e96d5d",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:iceberg,test_platform_instance)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:iceberg,test_platform_instance)"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00-mbclry",
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:76a48cd3253f4ce45b6e65bfd4e96d5d",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:iceberg",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:iceberg,test_platform_instance)"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00-mbclry",
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:76a48cd3253f4ce45b6e65bfd4e96d5d",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Namespace"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00-mbclry",
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,test_platform_instance.nyc.another_taxis,PROD)",
@@ -229,5 +325,67 @@
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,test_platform_instance.nyc.another_taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Table"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00-mbclry",
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,test_platform_instance.nyc.another_taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:76a48cd3253f4ce45b6e65bfd4e96d5d"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00-mbclry",
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,test_platform_instance.nyc.another_taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:iceberg,test_platform_instance)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:iceberg,test_platform_instance)"
},
{
"id": "urn:li:container:76a48cd3253f4ce45b6e65bfd4e96d5d",
"urn": "urn:li:container:76a48cd3253f4ce45b6e65bfd4e96d5d"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00-mbclry",
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
}
]

File: golden MCE JSON (runId "iceberg-test")

@@ -1,4 +1,39 @@
[
{
"entityType": "container",
"entityUrn": "urn:li:container:25384e01fcde1063114925e9661766e7",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:25384e01fcde1063114925e9661766e7",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "nyc",
"qualifiedName": "nyc",
"env": "PROD"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
@@ -49,6 +84,56 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:25384e01fcde1063114925e9661766e7",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:25384e01fcde1063114925e9661766e7",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:iceberg"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:25384e01fcde1063114925e9661766e7",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Namespace"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
@@ -206,5 +291,60 @@
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:25384e01fcde1063114925e9661766e7"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Table"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:25384e01fcde1063114925e9661766e7",
"urn": "urn:li:container:25384e01fcde1063114925e9661766e7"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
}
]

File: golden MCE JSON (runId "iceberg-test", second variant)

@@ -1,4 +1,39 @@
[
{
"entityType": "container",
"entityUrn": "urn:li:container:25384e01fcde1063114925e9661766e7",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:25384e01fcde1063114925e9661766e7",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"json": {
"customProperties": {},
"name": "nyc",
"qualifiedName": "nyc",
"env": "PROD"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
@@ -49,6 +84,56 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:25384e01fcde1063114925e9661766e7",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:25384e01fcde1063114925e9661766e7",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:iceberg"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:25384e01fcde1063114925e9661766e7",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Namespace"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
@@ -270,5 +355,60 @@
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Table"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:25384e01fcde1063114925e9661766e7"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:iceberg,nyc.taxis,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:container:25384e01fcde1063114925e9661766e7",
"urn": "urn:li:container:25384e01fcde1063114925e9661766e7"
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-test",
"lastRunId": "no-run-id-provided"
}
}
]

File: Iceberg source unit tests

@@ -63,6 +63,9 @@ from datahub.metadata.schema_classes import (
TimeTypeClass,
)
MCPS_PER_TABLE = 6 # assuming no profiling
MCPS_PER_NAMESPACE = 4
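# How these counts follow from the refactored source (see
# _create_iceberg_table_aspects / _create_iceberg_namespace_aspects above):
#   per table: status, subTypes, datasetProperties, schemaMetadata,
#     dataPlatformInstance, container -> 6 (ownership is emitted only when
#     present on the table; profiling adds more when enabled)
#   per namespace: status, containerProperties, subTypes,
#     dataPlatformInstance -> 4
# Container URNs asserted below are deterministic GUIDs derived from the
# NamespaceKey fields, which is why they can be hard-coded.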
def with_iceberg_source(processing_threads: int = 1, **kwargs: Any) -> IcebergSource:
catalog = {"test": {"type": "rest"}}
@@ -573,9 +576,11 @@ def test_exception_while_listing_namespaces() -> None:
mock_catalog = MockCatalogExceptionListingNamespaces({})
with patch(
"datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
) as get_catalog, pytest.raises(Exception, match="Test exception"):
) as get_catalog:
get_catalog.return_value = mock_catalog
[*source.get_workunits_internal()]
wus = [*source.get_workunits_internal()]
assert len(wus) == 0
assert source.report.failures.total_elements == 1
def test_known_exception_while_listing_tables() -> None:
@@ -658,21 +663,28 @@ def test_known_exception_while_listing_tables() -> None:
) as get_catalog:
get_catalog.return_value = mock_catalog
wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
assert len(wu) == 5 * 4 # ingested 5 tables (4 MCPs each), despite exception
# ingested 5 tables (6 MCPs each) and 5 namespaces (4 MCPs each), despite exception
expected_wu_urns = [
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table2,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table3,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceC.table4,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceD.table5,PROD)",
] * MCPS_PER_TABLE + [
"urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
"urn:li:container:9cb5e87ec392b231720f23bf00d6f6aa",
"urn:li:container:74727446a56420d80ff3b1abf2a18087",
"urn:li:container:3f9a24213cca64ab22e409d1b9a94789",
"urn:li:container:38a0583b0305ec5066cb708199f6848c",
] * MCPS_PER_NAMESPACE
assert len(wu) == len(expected_wu_urns)
urns = []
for unit in wu:
assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
urns.append(unit.metadata.entityUrn)
TestCase().assertCountEqual(
urns,
[
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table2,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table3,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceC.table4,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceD.table5,PROD)",
]
* 4,
expected_wu_urns,
)
assert source.report.warnings.total_elements == 1
assert source.report.failures.total_elements == 0
@@ -759,21 +771,28 @@ def test_unknown_exception_while_listing_tables() -> None:
) as get_catalog:
get_catalog.return_value = mock_catalog
wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
assert len(wu) == 5 * 4 # ingested 5 tables (4 MCPs each), despite exception
# ingested 5 tables (6 MCPs each) and 5 namespaces (4 MCPs each), despite exception
expected_wu_urns = [
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table2,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table3,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceC.table4,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceD.table5,PROD)",
] * MCPS_PER_TABLE + [
"urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
"urn:li:container:be99158f9640329f4394315e1d8dacf3",
"urn:li:container:74727446a56420d80ff3b1abf2a18087",
"urn:li:container:3f9a24213cca64ab22e409d1b9a94789",
"urn:li:container:38a0583b0305ec5066cb708199f6848c",
] * MCPS_PER_NAMESPACE
assert len(wu) == len(expected_wu_urns)
urns = []
for unit in wu:
assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
urns.append(unit.metadata.entityUrn)
TestCase().assertCountEqual(
urns,
[
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table2,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceB.table3,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceC.table4,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceD.table5,PROD)",
]
* 4,
expected_wu_urns,
)
assert source.report.warnings.total_elements == 0
assert source.report.failures.total_elements == 1
@@ -806,17 +825,21 @@ def test_proper_run_with_multiple_namespaces() -> None:
) as get_catalog:
get_catalog.return_value = mock_catalog
wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
assert len(wu) == 1 * 4 # only one table processed (4 MCPs)
# ingested 1 table (6 MCPs) and 2 namespaces (4 MCPs each)
expected_wu_urns = [
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
] * MCPS_PER_TABLE + [
"urn:li:container:74727446a56420d80ff3b1abf2a18087",
"urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
] * MCPS_PER_NAMESPACE
assert len(wu) == len(expected_wu_urns)
urns = []
for unit in wu:
assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
urns.append(unit.metadata.entityUrn)
TestCase().assertCountEqual(
urns,
[
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
]
* 4,
expected_wu_urns,
)
@@ -937,18 +960,21 @@ def test_filtering() -> None:
) as get_catalog:
get_catalog.return_value = mock_catalog
wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
assert len(wu) == 2 * 4
# ingested 2 tables (6 MCPs each) and 1 namespace (4 MCPs)
expected_wu_urns = [
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespace1.table_xyz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespace1.JKLtable,PROD)",
] * MCPS_PER_TABLE + [
"urn:li:container:075fc8fdac17b0eb3482e73052e875f1",
] * MCPS_PER_NAMESPACE
assert len(wu) == len(expected_wu_urns)
urns = []
for unit in wu:
assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
urns.append(unit.metadata.entityUrn)
TestCase().assertCountEqual(
urns,
[
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespace1.table_xyz,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespace1.JKLtable,PROD)",
]
* 4,
expected_wu_urns,
)
assert source.report.tables_scanned == 2
@@ -1039,20 +1065,23 @@ def test_handle_expected_exceptions() -> None:
) as get_catalog:
get_catalog.return_value = mock_catalog
wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
assert len(wu) == 4 * 4
# ingested 4 tables (6 MCPs each) and 1 namespace (4 MCPs)
expected_wu_urns = [
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table2,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table3,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
] * MCPS_PER_TABLE + [
"urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
] * MCPS_PER_NAMESPACE
assert len(wu) == len(expected_wu_urns)
urns = []
for unit in wu:
assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
urns.append(unit.metadata.entityUrn)
TestCase().assertCountEqual(
urns,
[
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table2,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table3,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
]
* 4,
expected_wu_urns,
)
assert source.report.warnings.total_elements == 6
assert source.report.failures.total_elements == 0
@@ -1129,20 +1158,23 @@ def test_handle_unexpected_exceptions() -> None:
) as get_catalog:
get_catalog.return_value = mock_catalog
wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
assert len(wu) == 4 * 4
# ingested 4 tables (6 MCPs each) and 1 namespace (4 MCPs)
expected_wu_urns = [
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table2,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table3,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
] * MCPS_PER_TABLE + [
"urn:li:container:390e031441265aae5b7b7ae8d51b0c1f",
] * MCPS_PER_NAMESPACE
assert len(wu) == len(expected_wu_urns)
urns = []
for unit in wu:
assert isinstance(unit.metadata, MetadataChangeProposalWrapper)
urns.append(unit.metadata.entityUrn)
TestCase().assertCountEqual(
urns,
[
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table1,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table2,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table3,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
]
* 4,
expected_wu_urns,
)
assert source.report.warnings.total_elements == 0
assert source.report.failures.total_elements == 1
@@ -1152,7 +1184,7 @@ def test_handle_unexpected_exceptions() -> None:
TestCase().assertCountEqual(
failures[0].context,
[
"Failed to create workunit for dataset ('namespaceA', 'table6'): Other value exception",
"Failed to create workunit for dataset ('namespaceA', 'table5'): ",
"('namespaceA', 'table6') <class 'ValueError'>: Other value exception",
"('namespaceA', 'table5') <class 'Exception'>: ",
],
)