Fixes 16595: Parse Iceberg REST table FQN from identifier (#16596)

This commit is contained in:
Matt Chamberlin 2024-06-11 05:37:57 -04:00 committed by GitHub
parent 070bd2570e
commit 5236950b6d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 8 additions and 11 deletions

View File

@ -37,7 +37,7 @@ class IcebergRestCatalog(IcebergCatalogBase):
"'connection' is not an instance of 'RestCatalogConnection'" "'connection' is not an instance of 'RestCatalogConnection'"
) )
if catalog.connection.credential: if catalog.connection.credential and catalog.connection.credential.clientSecret:
client_id = catalog.connection.credential.clientId.get_secret_value() client_id = catalog.connection.credential.clientId.get_secret_value()
client_secret = ( client_secret = (
catalog.connection.credential.clientSecret.get_secret_value() catalog.connection.credential.clientSecret.get_secret_value()

View File

@ -37,14 +37,14 @@ def namespace_to_str(namespace: tuple[str]) -> str:
return ".".join(namespace) return ".".join(namespace)
def get_table_name_as_str(table: pyiceberg.table.Table) -> str: def get_table_name_as_str(table: Tuple[str]) -> str:
"""Returns the Table Name as Tring from a PyIceberg Table. """Returns the Table Name as String from a PyIceberg table identifier tuple.
The PyIceberg table name is returned as tuple and we turn them into a String The PyIceberg table name is returned as tuple and we turn them into a String
concatenating the items with a '.' in between. concatenating the items with a '.' in between.
""" """
# We are skipping the first item because it is the schema name. # We are skipping the first item because it is the schema name.
return ".".join(table.name()[1:]) return ".".join(table[1:])
def get_column_from_partition( def get_column_from_partition(
@ -94,7 +94,7 @@ def get_column_partition_type(
def get_owner_from_table( def get_owner_from_table(
table: pyiceberg.table.Table, property_key: str table: pyiceberg.table.Table, property_key: str
) -> Optional[str]: ) -> Optional[str]:
"""Retrives the owner information from given Table Property.""" """Retrieves the owner information from given Table Property."""
return table.properties.get(property_key) return table.properties.get(property_key)

View File

@ -181,7 +181,8 @@ class IcebergSource(DatabaseServiceSource):
for table_identifier in self.iceberg.list_tables(namespace): for table_identifier in self.iceberg.list_tables(namespace):
try: try:
table = self.iceberg.load_table(table_identifier) table = self.iceberg.load_table(table_identifier)
table_name = get_table_name_as_str(table) # extract table name from table identifier, which does not include catalog name
table_name = get_table_name_as_str(table_identifier)
table_fqn = fqn.build( table_fqn = fqn.build(
self.metadata, self.metadata,
entity_type=Table, entity_type=Table,

View File

@ -30,11 +30,7 @@
"format": "password" "format": "password"
} }
}, },
"additionalProperties": false, "additionalProperties": false
"required": [
"clientId",
"clientSecret"
]
}, },
"token": { "token": {
"title": "Token", "title": "Token",