mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-16 05:02:59 +00:00
fix(ingestion/iceberg): Improvements to iceberg source (#11987)
This commit is contained in:
parent
ecba2244f0
commit
816fd3dba7
@ -317,5 +317,13 @@
|
|||||||
"displayName": "CassandraDB",
|
"displayName": "CassandraDB",
|
||||||
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/cassandra",
|
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/cassandra",
|
||||||
"recipe": "source:\n type: cassandra\n config:\n # Credentials for on prem cassandra\n contact_point: localhost\n port: 9042\n username: admin\n password: password\n\n # Or\n # Credentials Astra Cloud\n #cloud_config:\n # secure_connect_bundle: Path to Secure Connect Bundle (.zip)\n # token: Application Token\n\n # Optional Allow / Deny extraction of particular keyspaces.\n keyspace_pattern:\n allow: [.*]\n\n # Optional Allow / Deny extraction of particular tables.\n table_pattern:\n allow: [.*]"
|
"recipe": "source:\n type: cassandra\n config:\n # Credentials for on prem cassandra\n contact_point: localhost\n port: 9042\n username: admin\n password: password\n\n # Or\n # Credentials Astra Cloud\n #cloud_config:\n # secure_connect_bundle: Path to Secure Connect Bundle (.zip)\n # token: Application Token\n\n # Optional Allow / Deny extraction of particular keyspaces.\n keyspace_pattern:\n allow: [.*]\n\n # Optional Allow / Deny extraction of particular tables.\n table_pattern:\n allow: [.*]"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"urn": "urn:li:dataPlatform:iceberg",
|
||||||
|
"name": "iceberg",
|
||||||
|
"displayName": "Iceberg",
|
||||||
|
"description": "Ingest databases and tables from any Iceberg catalog implementation",
|
||||||
|
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/iceberg",
|
||||||
|
"recipe": "source:\n type: \"iceberg\"\n config:\n env: dev\n # each thread will open internet connections to fetch manifest files independently, \n # this value needs to be adjusted with ulimit\n processing_threads: 1 \n # a single catalog definition with a form of a dictionary\n catalog: \n demo: # name of the catalog\n type: \"rest\" # other types are available\n uri: \"uri\"\n s3.access-key-id: \"access-key\"\n s3.secret-access-key: \"secret-access-key\"\n s3.region: \"aws-region\"\n profiling:\n enabled: false\n"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
@ -18,6 +18,8 @@ This ingestion source maps the following Source System Concepts to DataHub Conce
|
|||||||
|
|
||||||
## Troubleshooting
|
## Troubleshooting
|
||||||
|
|
||||||
### [Common Issue]
|
### Exceptions while increasing `processing_threads`
|
||||||
|
|
||||||
[Provide description of common issues with this integration and steps to resolve]
|
Each processing thread will open several files/sockets to download manifest files from blob storage. If you experience
|
||||||
|
exceptions appearing when increasing the `processing_threads` configuration parameter, try to increase the limit of open
|
||||||
|
files (e.g. using `ulimit` on Linux).
|
||||||
|
@ -249,7 +249,8 @@ microsoft_common = {"msal>=1.24.0"}
|
|||||||
|
|
||||||
iceberg_common = {
|
iceberg_common = {
|
||||||
# Iceberg Python SDK
|
# Iceberg Python SDK
|
||||||
"pyiceberg>=0.4,<0.7",
|
# Kept at 0.4.0 because higher versions require pydantic>2; bump this dependency as soon as we are fine with that
|
||||||
|
"pyiceberg>=0.4.0",
|
||||||
}
|
}
|
||||||
|
|
||||||
mssql_common = {
|
mssql_common = {
|
||||||
|
@ -9,6 +9,7 @@ from pyiceberg.exceptions import (
|
|||||||
NoSuchIcebergTableError,
|
NoSuchIcebergTableError,
|
||||||
NoSuchNamespaceError,
|
NoSuchNamespaceError,
|
||||||
NoSuchPropertyException,
|
NoSuchPropertyException,
|
||||||
|
NoSuchTableError,
|
||||||
)
|
)
|
||||||
from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
|
from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
|
||||||
from pyiceberg.table import Table
|
from pyiceberg.table import Table
|
||||||
@ -104,7 +105,7 @@ logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(
|
|||||||
@capability(SourceCapability.DESCRIPTIONS, "Enabled by default.")
|
@capability(SourceCapability.DESCRIPTIONS, "Enabled by default.")
|
||||||
@capability(
|
@capability(
|
||||||
SourceCapability.OWNERSHIP,
|
SourceCapability.OWNERSHIP,
|
||||||
"Optionally enabled via configuration by specifying which Iceberg table property holds user or group ownership.",
|
"Automatically ingests ownership information from table properties based on `user_ownership_property` and `group_ownership_property`",
|
||||||
)
|
)
|
||||||
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
|
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
|
||||||
class IcebergSource(StatefulIngestionSourceBase):
|
class IcebergSource(StatefulIngestionSourceBase):
|
||||||
@ -192,9 +193,7 @@ class IcebergSource(StatefulIngestionSourceBase):
|
|||||||
table = thread_local.local_catalog.load_table(dataset_path)
|
table = thread_local.local_catalog.load_table(dataset_path)
|
||||||
time_taken = timer.elapsed_seconds()
|
time_taken = timer.elapsed_seconds()
|
||||||
self.report.report_table_load_time(time_taken)
|
self.report.report_table_load_time(time_taken)
|
||||||
LOGGER.debug(
|
LOGGER.debug(f"Loaded table: {table.name()}, time taken: {time_taken}")
|
||||||
f"Loaded table: {table.identifier}, time taken: {time_taken}"
|
|
||||||
)
|
|
||||||
yield from self._create_iceberg_workunit(dataset_name, table)
|
yield from self._create_iceberg_workunit(dataset_name, table)
|
||||||
except NoSuchPropertyException as e:
|
except NoSuchPropertyException as e:
|
||||||
self.report.report_warning(
|
self.report.report_warning(
|
||||||
@ -206,12 +205,20 @@ class IcebergSource(StatefulIngestionSourceBase):
|
|||||||
)
|
)
|
||||||
except NoSuchIcebergTableError as e:
|
except NoSuchIcebergTableError as e:
|
||||||
self.report.report_warning(
|
self.report.report_warning(
|
||||||
"no-iceberg-table",
|
"not-an-iceberg-table",
|
||||||
f"Failed to create workunit for {dataset_name}. {e}",
|
f"Failed to create workunit for {dataset_name}. {e}",
|
||||||
)
|
)
|
||||||
LOGGER.warning(
|
LOGGER.warning(
|
||||||
f"NoSuchIcebergTableError while processing table {dataset_path}, skipping it.",
|
f"NoSuchIcebergTableError while processing table {dataset_path}, skipping it.",
|
||||||
)
|
)
|
||||||
|
except NoSuchTableError as e:
|
||||||
|
self.report.report_warning(
|
||||||
|
"no-such-table",
|
||||||
|
f"Failed to create workunit for {dataset_name}. {e}",
|
||||||
|
)
|
||||||
|
LOGGER.warning(
|
||||||
|
f"NoSuchTableError while processing table {dataset_path}, skipping it.",
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.report.report_failure("general", f"Failed to create workunit: {e}")
|
self.report.report_failure("general", f"Failed to create workunit: {e}")
|
||||||
LOGGER.exception(
|
LOGGER.exception(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user