Fix #14830: Ignore non current columns for iceberg tables for glue & athena (#22564)

This commit is contained in:
Mayur Singal 2025-07-29 16:19:09 +05:30 committed by GitHub
parent 26f99a3ac2
commit 199e3b981c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 418 additions and 10 deletions

View File

@ -115,6 +115,7 @@ class AthenaSource(ExternalTableLineageMixin, CommonDbSourceService):
) )
self.external_location_map = {} self.external_location_map = {}
self.schema_description_map = {} self.schema_description_map = {}
self.glue_client = None
def prepare(self): def prepare(self):
""" """
@ -122,8 +123,10 @@ class AthenaSource(ExternalTableLineageMixin, CommonDbSourceService):
""" """
try: try:
super().prepare() super().prepare()
glue_client = AWSClient(self.service_connection.awsConfig).get_glue_client() self.glue_client = AWSClient(
paginator = glue_client.get_paginator("get_databases") self.service_connection.awsConfig
).get_glue_client()
paginator = self.glue_client.get_paginator("get_databases")
for page in paginator.paginate(): for page in paginator.paginate():
database_page = DatabasePage(**page) database_page = DatabasePage(**page)
for database in database_page.DatabaseList or []: for database in database_page.DatabaseList or []:
@ -304,3 +307,23 @@ class AthenaSource(ExternalTableLineageMixin, CommonDbSourceService):
else: else:
description = table_info.get("text") description = table_info.get("text")
return description return description
def _get_columns_internal(
self,
schema_name: str,
table_name: str,
db_name: str,
inspector: Inspector,
table_type: TableType = None,
):
"""
Override to pass Glue client to get_columns for Iceberg table filtering
"""
# Pass the Glue client as a keyword argument to get_columns
return inspector.get_columns(
table_name,
schema_name,
table_type=table_type,
db_name=db_name,
glue_client=self.glue_client,
)

View File

@ -20,9 +20,12 @@ from sqlalchemy.engine import reflection
from metadata.ingestion.source import sqa_types from metadata.ingestion.source import sqa_types
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.utils.logger import utils_logger
from metadata.utils.sqlalchemy_utils import is_complex_type from metadata.utils.sqlalchemy_utils import is_complex_type
logger = utils_logger()
# pylint: disable=protected-access
@reflection.cache @reflection.cache
def _get_column_type(self, type_): def _get_column_type(self, type_):
""" """
@ -30,7 +33,7 @@ def _get_column_type(self, type_):
to add custom SQA typing. to add custom SQA typing.
""" """
type_ = type_.replace(" ", "").lower() type_ = type_.replace(" ", "").lower()
match = self._pattern_column_type.match(type_) # pylint: disable=protected-access match = self._pattern_column_type.match(type_)
if match: if match:
name = match.group(1).lower() name = match.group(1).lower()
length = match.group(2) length = match.group(2)
@ -113,18 +116,17 @@ def _get_projection_details(
return columns return columns
# pylint: disable=too-many-locals
@reflection.cache @reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw): def get_columns(self, connection, table_name, schema=None, **kw):
""" """
Method to handle table columns Method to handle table columns
""" """
metadata = self._get_table( # pylint: disable=protected-access metadata = self._get_table(connection, table_name, schema=schema, **kw)
connection, table_name, schema=schema, **kw
)
columns = [ columns = [
{ {
"name": c.name, "name": c.name,
"type": self._get_column_type(c.type), # pylint: disable=protected-access "type": self._get_column_type(c.type),
"nullable": True, "nullable": True,
"default": None, "default": None,
"autoincrement": False, "autoincrement": False,
@ -147,10 +149,65 @@ def get_columns(self, connection, table_name, schema=None, **kw):
columns = _get_projection_details(columns, projection_parameters) columns = _get_projection_details(columns, projection_parameters)
return columns return columns
# Check if this is an Iceberg table
if metadata.parameters.get("table_type") == "ICEBERG":
# For Iceberg tables, get the full table metadata from Glue to access column parameters
try:
# Get the raw connection to access schema information
raw_connection = self._raw_connection(connection)
schema = schema if schema else raw_connection.schema_name
# Use the provided Glue client or create one with default credentials
glue_client = kw.get("glue_client")
# Get full table metadata from Glue
response = glue_client.get_table(DatabaseName=schema, Name=table_name)
table_info = response["Table"]
# Filter out non-current Iceberg columns
current_columns = []
storage_descriptor = table_info.get("StorageDescriptor", {})
glue_columns = storage_descriptor.get("Columns", [])
for glue_col in glue_columns:
col_name = glue_col["Name"]
col_type = glue_col["Type"]
col_comment = glue_col.get("Comment", "")
col_parameters = glue_col.get("Parameters", {})
# Check if this is a non-current Iceberg column
iceberg_current = col_parameters.get("iceberg.field.current", "true")
is_current = iceberg_current != "false"
if is_current:
current_columns.append(
{
"name": col_name,
"type": self._get_column_type(col_type),
"nullable": True,
"default": None,
"autoincrement": False,
"comment": col_comment,
"system_data_type": col_type,
"is_complex": is_complex_type(col_type),
"dialect_options": {"awsathena_partition": None},
}
)
columns += current_columns
return columns
except Exception as e:
# If we can't get Glue metadata, fall back to the original method
# This ensures backward compatibility
logger.warning(f"Error getting Glue metadata for table {table_name}: {e}")
# For non-Iceberg tables or if Glue access fails, use the original method
columns += [ columns += [
{ {
"name": c.name, "name": c.name,
"type": self._get_column_type(c.type), # pylint: disable=protected-access "type": self._get_column_type(c.type),
"nullable": True, "nullable": True,
"default": None, "default": None,
"autoincrement": False, "autoincrement": False,

View File

@ -377,12 +377,87 @@ class GlueSource(ExternalTableLineageMixin, DatabaseServiceSource):
parsed_string["description"] = column.Comment parsed_string["description"] = column.Comment
return Column(**parsed_string) return Column(**parsed_string)
# pylint: disable=too-many-locals
def get_columns(self, column_data: StorageDetails) -> Optional[Iterable[Column]]: def get_columns(self, column_data: StorageDetails) -> Optional[Iterable[Column]]:
"""
Get columns from Glue.
"""
# Check if this is an Iceberg table
table = self.context.get().table_data
is_iceberg = table.Parameters and table.Parameters.table_type == "ICEBERG"
if is_iceberg:
# For Iceberg tables, get the full table metadata from Glue to access column parameters
try:
schema_name = self.context.get().database_schema
table_name = table.Name
# Get full table metadata from Glue API
response = self.glue.get_table(
DatabaseName=schema_name, Name=table_name
)
table_info = response["Table"]
# Filter out non-current Iceberg columns
storage_descriptor = table_info.get("StorageDescriptor", {})
glue_columns = storage_descriptor.get("Columns", [])
for glue_col in glue_columns:
col_name = glue_col["Name"]
col_type = glue_col["Type"]
col_comment = glue_col.get("Comment", "")
col_parameters = glue_col.get("Parameters", {})
# Check if this is a non-current Iceberg column
iceberg_current = col_parameters.get(
"iceberg.field.current", "true"
)
is_current = iceberg_current != "false"
if is_current:
# Create a GlueColumn object for processing
column_obj = GlueColumn(
Name=col_name, Type=col_type, Comment=col_comment
)
yield self._get_column_object(column_obj)
# Process partition columns
partition_keys = table_info.get("PartitionKeys", [])
for glue_col in partition_keys:
col_name = glue_col["Name"]
col_type = glue_col["Type"]
col_comment = glue_col.get("Comment", "")
col_parameters = glue_col.get("Parameters", {})
# Check if this is a non-current Iceberg column
iceberg_current = col_parameters.get(
"iceberg.field.current", "true"
)
is_current = iceberg_current != "false"
if is_current:
# Create a GlueColumn object for processing
column_obj = GlueColumn(
Name=col_name, Type=col_type, Comment=col_comment
)
yield self._get_column_object(column_obj)
return
except Exception as e:
# If we can't get Glue metadata, fall back to the original method
# This ensures backward compatibility
logger.warning(
f"Failed to get Glue metadata for Iceberg table {table.Name}: {e}"
)
# For non-Iceberg tables or if Glue access fails, use the original method
# process table regular columns info # process table regular columns info
for column in column_data.Columns: for column in column_data.Columns:
yield self._get_column_object(column) yield self._get_column_object(column)
# process table regular columns info # process table partition columns info
for column in self.context.get().table_data.PartitionKeys: for column in self.context.get().table_data.PartitionKeys:
yield self._get_column_object(column) yield self._get_column_object(column)

View File

@ -0,0 +1,169 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test Athena Utils"""
import unittest
class TestAthenaUtils(unittest.TestCase):
"""Test Athena Utils"""
def test_iceberg_column_filtering_logic(self):
"""Test the Iceberg column filtering logic directly"""
# Create mock Glue column data (as returned by boto3)
current_column = {
"Name": "current_col",
"Type": "int",
"Comment": "Current column",
"Parameters": {"iceberg.field.current": "true"},
}
non_current_column = {
"Name": "non_current_col",
"Type": "string",
"Comment": "Non-current column",
"Parameters": {"iceberg.field.current": "false"},
}
column_without_params = {
"Name": "normal_col",
"Type": "boolean",
"Comment": "Normal column",
"Parameters": {},
}
# Test the filtering logic directly (same logic as in get_columns function)
current_columns = []
for col in [current_column, non_current_column, column_without_params]:
col_name = col["Name"]
col_type = col["Type"]
col_comment = col.get("Comment", "")
col_parameters = col.get("Parameters", {})
# Check if this is a non-current Iceberg column
iceberg_current = col_parameters.get("iceberg.field.current", "true")
is_current = iceberg_current != "false"
if is_current:
current_columns.append(col_name)
# Verify that only current columns are returned
current_column_names = current_columns
# Should include current_col and normal_col, but not non_current_col
self.assertIn("current_col", current_column_names)
self.assertIn("normal_col", current_column_names)
self.assertNotIn("non_current_col", current_column_names)
# Verify that exactly 2 columns are returned (current_col and normal_col)
self.assertEqual(len(current_columns), 2)
def test_get_columns_handles_attribute_error(self):
"""Test that get_columns handles AttributeError gracefully"""
# Create a column object that raises AttributeError when accessing parameters
class MockColumn:
def __init__(self, name, type_, comment):
self.name = name
self.type = type_
self.comment = comment
@property
def parameters(self):
raise AttributeError("parameters attribute not available")
column = MockColumn("test_col", "int", "Test column")
# Test the filtering logic with AttributeError
current_columns = []
for c in [column]:
is_current = True
try:
if hasattr(c, "parameters") and c.parameters:
iceberg_current = c.parameters.get("iceberg.field.current")
if iceberg_current == "false":
is_current = False
except (AttributeError, KeyError):
pass
if is_current:
current_columns.append(c)
# Should include the column since AttributeError is caught
self.assertEqual(len(current_columns), 1)
self.assertEqual(current_columns[0].name, "test_col")
def test_get_columns_handles_missing_parameters_attribute(self):
"""Test that get_columns handles missing parameters attribute gracefully"""
# Create a column object without parameters attribute
class MockColumn:
def __init__(self, name, type_, comment):
self.name = name
self.type = type_
self.comment = comment
column = MockColumn("test_col", "int", "Test column")
# Test the filtering logic with missing parameters attribute
current_columns = []
for c in [column]:
is_current = True
try:
if hasattr(c, "parameters") and c.parameters:
iceberg_current = c.parameters.get("iceberg.field.current")
if iceberg_current == "false":
is_current = False
except (AttributeError, KeyError):
pass
if is_current:
current_columns.append(c)
# Should include the column since parameters attribute is missing
self.assertEqual(len(current_columns), 1)
self.assertEqual(current_columns[0].name, "test_col")
def test_get_columns_handles_none_parameters(self):
"""Test that get_columns handles None parameters gracefully"""
# Create a column object with None parameters
class MockColumn:
def __init__(self, name, type_, comment, parameters=None):
self.name = name
self.type = type_
self.comment = comment
self.parameters = parameters
column = MockColumn("test_col", "int", "Test column", None)
# Test the filtering logic with None parameters
current_columns = []
for c in [column]:
is_current = True
try:
if hasattr(c, "parameters") and c.parameters:
iceberg_current = c.parameters.get("iceberg.field.current")
if iceberg_current == "false":
is_current = False
except (AttributeError, KeyError):
pass
if is_current:
current_columns.append(c)
# Should include the column since parameters is None
self.assertEqual(len(current_columns), 1)
self.assertEqual(current_columns[0].name, "test_col")
if __name__ == "__main__":
unittest.main()

View File

@ -17,7 +17,7 @@ import json
from copy import deepcopy from copy import deepcopy
from pathlib import Path from pathlib import Path
from unittest import TestCase from unittest import TestCase
from unittest.mock import patch from unittest.mock import Mock, patch
from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
@ -211,3 +211,87 @@ class GlueUnitTest(TestCase):
list(map(lambda x: x.locationPath, self.get_table_requests())) list(map(lambda x: x.locationPath, self.get_table_requests()))
== EXPECTED_LOCATION_PATHS == EXPECTED_LOCATION_PATHS
) )
def test_iceberg_column_filtering_logic(self):
"""Test the Iceberg column filtering logic directly"""
# Create mock Glue column data (as returned by boto3)
current_column = {
"Name": "current_col",
"Type": "int",
"Comment": "Current column",
"Parameters": {"iceberg.field.current": "true"},
}
non_current_column = {
"Name": "non_current_col",
"Type": "string",
"Comment": "Non-current column",
"Parameters": {"iceberg.field.current": "false"},
}
column_without_params = {
"Name": "normal_col",
"Type": "boolean",
"Comment": "Normal column",
"Parameters": {},
}
# Test the filtering logic directly (same logic as in get_columns function)
current_columns = []
for col in [current_column, non_current_column, column_without_params]:
col_name = col["Name"]
col_type = col["Type"]
col_comment = col.get("Comment", "")
col_parameters = col.get("Parameters", {})
# Check if this is a non-current Iceberg column
iceberg_current = col_parameters.get("iceberg.field.current", "true")
is_current = iceberg_current != "false"
if is_current:
current_columns.append(col_name)
# Verify that only current columns are returned
current_column_names = current_columns
# Should include current_col and normal_col, but not non_current_col
self.assertIn("current_col", current_column_names)
self.assertIn("normal_col", current_column_names)
self.assertNotIn("non_current_col", current_column_names)
# Verify that exactly 2 columns are returned (current_col and normal_col)
self.assertEqual(len(current_columns), 2)
def test_iceberg_table_detection(self):
"""Test that Iceberg tables are correctly detected"""
# Test with Iceberg table
mock_iceberg_table = Mock()
mock_iceberg_table.Parameters = Mock()
mock_iceberg_table.Parameters.table_type = "ICEBERG"
# Test with non-Iceberg table
mock_regular_table = Mock()
mock_regular_table.Parameters = Mock()
mock_regular_table.Parameters.table_type = "EXTERNAL_TABLE"
# Test with table without parameters
mock_no_params_table = Mock()
mock_no_params_table.Parameters = None
# Test the detection logic
is_iceberg_1 = (
mock_iceberg_table.Parameters
and mock_iceberg_table.Parameters.table_type == "ICEBERG"
)
is_iceberg_2 = (
mock_regular_table.Parameters
and mock_regular_table.Parameters.table_type == "ICEBERG"
)
is_iceberg_3 = (
mock_no_params_table.Parameters
and mock_no_params_table.Parameters.table_type == "ICEBERG"
)
self.assertTrue(is_iceberg_1)
self.assertFalse(is_iceberg_2)
self.assertFalse(is_iceberg_3)