2025-04-03 10:39:47 +05:30

865 lines
27 KiB
Python

# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test iceberg source
"""
import uuid
from copy import deepcopy
from unittest import TestCase
from unittest.mock import patch
import pyiceberg.exceptions
from pyiceberg.catalog.hive import HiveCatalog
from pyiceberg.partitioning import PartitionField
from pyiceberg.schema import Schema
from pyiceberg.table import Table as PyIcebergTable
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import (
BinaryType,
BooleanType,
DateType,
DecimalType,
DoubleType,
FixedType,
FloatType,
IntegerType,
ListType,
LongType,
MapType,
NestedField,
StringType,
StructType,
TimestampType,
TimestamptzType,
TimeType,
UUIDType,
)
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.table import (
Column,
Constraint,
DataType,
PartitionColumnDetails,
PartitionIntervalTypes,
TablePartition,
TableType,
)
from metadata.generated.schema.type.basic import (
EntityName,
FullyQualifiedEntityName,
Markdown,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.iceberg.metadata import IcebergSource
from metadata.utils import fqn
MOCK_COLUMN_MAP = {
"binary": {
"iceberg": NestedField(
field_id=1,
name="binary",
field_type=BinaryType(),
required=False,
doc="Binary",
),
"ometa": Column(
name="binary",
description="Binary",
dataType=DataType.BINARY,
dataTypeDisplay=str(BinaryType()),
dataLength=0,
),
},
"boolean": {
"iceberg": NestedField(
field_id=2,
name="boolean",
field_type=BooleanType(),
required=False,
),
"ometa": Column(
name="boolean",
dataType=DataType.BOOLEAN,
dataTypeDisplay=str(BooleanType()),
),
},
"date": {
"iceberg": NestedField(
field_id=3,
name="date",
field_type=DateType(),
required=True,
),
"ometa": Column(
name="date",
dataType=DataType.DATE,
dataTypeDisplay=str(DateType()),
constraint=Constraint.NOT_NULL,
),
},
"decimal": {
"iceberg": NestedField(
field_id=4,
name="decimal",
field_type=DecimalType(9, 3),
required=False,
),
"ometa": Column(
name="decimal",
dataType=DataType.DECIMAL,
dataTypeDisplay=str(DecimalType(9, 3)),
precision=9,
scale=3,
),
},
"double": {
"iceberg": NestedField(
field_id=5,
name="double",
field_type=DoubleType(),
required=False,
),
"ometa": Column(
name="double",
dataType=DataType.DOUBLE,
dataTypeDisplay=str(DoubleType()),
),
},
"fixed": {
"iceberg": NestedField(
field_id=6,
name="fixed",
field_type=FixedType(10),
required=False,
),
"ometa": Column(
name="fixed",
dataType=DataType.FIXED,
dataTypeDisplay=str(FixedType(10)),
),
},
"float": {
"iceberg": NestedField(
field_id=7,
name="float",
field_type=FloatType(),
required=False,
),
"ometa": Column(
name="float",
dataType=DataType.FLOAT,
dataTypeDisplay=str(FloatType()),
),
},
"integer": {
"iceberg": NestedField(
field_id=8,
name="integer",
field_type=IntegerType(),
required=False,
),
"ometa": Column(
name="integer",
dataType=DataType.INT,
dataTypeDisplay=str(IntegerType()),
),
},
"list": {
"iceberg": NestedField(
field_id=9,
name="list",
field_type=ListType(
element_id=1,
element_type=IntegerType(),
),
required=False,
),
"ometa": Column(
name="list",
dataType=DataType.ARRAY,
dataTypeDisplay=str(
ListType(
element_id=1,
element_type=IntegerType(),
)
),
arrayDataType=DataType.INT,
),
},
"long": {
"iceberg": NestedField(
field_id=10,
name="long",
field_type=LongType(),
required=False,
),
"ometa": Column(
name="long",
dataType=DataType.LONG,
dataTypeDisplay=str(LongType()),
),
},
"map": {
"iceberg": NestedField(
field_id=11,
name="map",
field_type=MapType(
key_id=1,
key_type=StringType(),
value_id=2,
value_type=IntegerType(),
),
required=False,
),
"ometa": Column(
name="map",
dataType=DataType.MAP,
dataTypeDisplay=str(
MapType(
key_id=1,
key_type=StringType(),
value_id=2,
value_type=IntegerType(),
)
),
),
},
"string": {
"iceberg": NestedField(
field_id=12,
name="string",
field_type=StringType(),
required=False,
),
"ometa": Column(
name="string",
dataType=DataType.STRING,
dataTypeDisplay=str(StringType()),
),
},
"struct": {
"iceberg": NestedField(
field_id=13,
name="struct",
field_type=StructType(
fields=(
NestedField(
field_id=100,
name="nested1",
field_type=IntegerType(),
required=False,
doc="nested #1",
),
NestedField(
field_id=101,
name="nested2",
field_type=StringType(),
required=True,
),
)
),
required=False,
),
"ometa": Column(
name="struct",
dataType=DataType.STRUCT,
dataTypeDisplay=str(
StructType(
fields=(
NestedField(
field_id=100,
name="nested1",
field_type=IntegerType(),
required=False,
doc="nested #1",
),
NestedField(
field_id=101,
name="nested2",
field_type=StringType(),
required=True,
),
)
)
),
children=[
Column(
name="nested1",
description="nested #1",
dataType=DataType.INT,
dataTypeDisplay=str(IntegerType()),
),
Column(
name="nested2",
dataType=DataType.STRING,
dataTypeDisplay=str(StringType()),
constraint=Constraint.NOT_NULL,
),
],
),
},
"time": {
"iceberg": NestedField(
field_id=14,
name="time",
field_type=TimeType(),
required=False,
),
"ometa": Column(
name="time",
dataType=DataType.TIME,
dataTypeDisplay=str(TimeType()),
),
},
"timestamp": {
"iceberg": NestedField(
field_id=15,
name="timestamp",
field_type=TimestampType(),
required=False,
),
"ometa": Column(
name="timestamp",
dataType=DataType.TIMESTAMP,
dataTypeDisplay=str(TimestampType()),
),
},
"timestamptz": {
"iceberg": NestedField(
field_id=16,
name="timestamptz",
field_type=TimestamptzType(),
required=False,
),
"ometa": Column(
name="timestamptz",
dataType=DataType.TIMESTAMPZ,
dataTypeDisplay=str(TimestamptzType()),
),
},
"uuid": {
"iceberg": NestedField(
field_id=17,
name="uuid",
field_type=UUIDType(),
required=False,
),
"ometa": Column(
name="uuid",
dataType=DataType.UUID,
dataTypeDisplay=str(UUIDType()),
),
},
}
MOCK_HIVE_CONFIG = {
"source": {
"type": "iceberg",
"serviceName": "test_iceberg",
"serviceConnection": {
"config": {
"type": "Iceberg",
"catalog": {
"name": "Batata",
"connection": {"uri": "thrift://localhost:9083"},
},
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {"jwtToken": "iceberg"},
}
},
}
MOCK_REST_CONFIG = {
"source": {
"type": "iceberg",
"serviceName": "test_iceberg",
"serviceConnection": {
"config": {
"type": "Iceberg",
"catalog": {
"name": "Batata",
"connection": {"uri": "http://localhost:8181"},
},
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {"jwtToken": "iceberg"},
}
},
}
MOCK_GLUE_CONFIG = {
"source": {
"type": "iceberg",
"serviceName": "test_iceberg",
"serviceConnection": {
"config": {
"type": "Iceberg",
"catalog": {
"name": "Batata",
"connection": {
"awsConfig": {
"awsAccessKeyId": "access_key",
"awsSecretAccessKey": "secret",
"awsRegion": "us-east-2",
"awsSessionToken": "token",
},
},
},
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {"jwtToken": "iceberg"},
}
},
}
MOCK_DYNAMO_CONFIG = {
"source": {
"type": "iceberg",
"serviceName": "test_iceberg",
"serviceConnection": {
"config": {
"type": "Iceberg",
"catalog": {
"name": "Batata",
"connection": {
"tableName": "table",
"awsConfig": {
"endPointURL": "http://example.com",
"awsAccessKeyId": "access_key",
"awsSecretAccessKey": "secret",
"awsRegion": "us-east-2",
"awsSessionToken": "token",
},
},
},
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {"jwtToken": "iceberg"},
}
},
}
MOCK_NAMESPACES = [
("namespace1",),
("namespace2", "foo"),
]
EXPECTED_NAMESPACES_NAMES = [
"namespace1",
"namespace2.foo",
]
MOCK_TABLE_LIST = [
("namespace1", "table1"),
("namespace1", "table2", "foo"),
("namespace2", "table3"),
]
EXPECTED_TABLE_LIST = [
("table1", TableType.Regular),
("table2.foo", TableType.Regular),
("table3", TableType.Regular),
]
class IcebergUnitTest(TestCase):
"""
Validate how we work with the Iceberg connector
"""
@patch(
"metadata.ingestion.source.database.database_service.DatabaseServiceSource.test_connection"
)
def __init__(self, methodName, test_connection) -> None:
super().__init__(methodName)
test_connection.return_value = False
for config in [
MOCK_DYNAMO_CONFIG,
MOCK_GLUE_CONFIG,
MOCK_REST_CONFIG,
MOCK_HIVE_CONFIG,
]:
self.config = parse_workflow_config_gracefully(config)
self.iceberg = IcebergSource.create(
config["source"],
OpenMetadata(self.config.workflowConfig.openMetadataServerConfig),
)
self.iceberg.context.get().database_service = "test_iceberg"
self.iceberg.context.get().database = "default"
self.iceberg.context.get().database_schema = "schema"
def test_create(self):
"""
An invalid config raises an error
"""
not_looker_source = {
"type": "mysql",
"serviceName": "mysql_local",
"serviceConnection": {
"config": {
"type": "Mysql",
"username": "openmetadata_user",
"authType": {"password": "openmetadata_password"},
"hostPort": "localhost:3306",
"databaseSchema": "openmetadata_db",
}
},
"sourceConfig": {
"config": {
"type": "DatabaseMetadata",
}
},
}
self.assertRaises(
InvalidSourceException,
IcebergSource.create,
not_looker_source,
self.config.workflowConfig.openMetadataServerConfig,
)
def test_get_database_name(self):
"""
Asserts 'get_database_name' returns:
- databaseName when it is set
- 'default' when it is not set
"""
# Iceberg connector without databaseName set returns 'default'
self.assertEqual(next(self.iceberg.get_database_names()), "default")
# Iceberg connector with databaseName set returns the databaseName
iceberg_source_with_database_name = deepcopy(self.iceberg)
iceberg_source_with_database_name.service_connection.catalog.databaseName = (
"myDatabase"
)
self.assertEqual(
next(iceberg_source_with_database_name.get_database_names()), "myDatabase"
)
def test_yield_database(self):
"""
Assert 'yield_database' returns the proper 'CreateDatabaseRequest'
"""
database_name = "database"
expected = CreateDatabaseRequest(
name="database", service=self.iceberg.context.get().database_service
)
self.assertEqual(
next(self.iceberg.yield_database(database_name)).right, expected
)
def test_get_database_schema_names(self):
"""
Mock the Catalog and check the namespaces are transformed as expected,
from tuples to strings.
"""
with patch.object(HiveCatalog, "list_namespaces", return_value=MOCK_NAMESPACES):
for i, namespace in enumerate(self.iceberg.get_database_schema_names()):
self.assertEqual(namespace, EXPECTED_NAMESPACES_NAMES[i])
def raise_something_bad():
raise RuntimeError("Something bad")
with patch.object(
HiveCatalog, "list_namespaces", side_effect=raise_something_bad
):
self.assertRaises(Exception, IcebergSource.get_database_schema_names)
def test_yield_database_schema(self):
"""
Assert 'yield_database_schema' returns the propert 'CreateDatabaseSchemaRequest'
"""
schema_name = "MySchema"
fully_qualified_database_name = "FullyQualifiedDatabaseName"
expected = CreateDatabaseSchemaRequest(
name=schema_name,
database=fully_qualified_database_name,
)
with patch.object(fqn, "build", return_value=fully_qualified_database_name):
self.assertEqual(
next(self.iceberg.yield_database_schema(schema_name)).right, expected
)
def test_get_tables_name_and_type(self):
"""
Assert 'get_tables_name_and_type' returns the proper table name and type.
"""
# Happy Path scenario
class LoadTableMock:
data = iter(MOCK_TABLE_LIST)
def name(self):
return next(self.data)
with patch.object(
HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST
), patch.object(HiveCatalog, "load_table", return_value=LoadTableMock()):
for i, table in enumerate(self.iceberg.get_tables_name_and_type()):
self.assertEqual(table, EXPECTED_TABLE_LIST[i])
# When pyiceberg.exceptions.NoSuchIcebergTableError is raised
# Then nothing is yield.
def raise_no_such_iceberg_table():
raise pyiceberg.exceptions.NoSuchIcebergTableError()
with patch.object(
HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST
), patch.object(
HiveCatalog, "load_table", side_effect=raise_no_such_iceberg_table
):
self.assertEqual(len(list(self.iceberg.get_tables_name_and_type())), 0)
# When pyiceberg.exceptions.NoSuchTableError is raised
# Then nothing is yield.
def raise_no_such_table():
raise pyiceberg.exceptions.NoSuchTableError()
with patch.object(
HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST
), patch.object(HiveCatalog, "load_table", side_effect=raise_no_such_table):
self.assertEqual(len(list(self.iceberg.get_tables_name_and_type())), 0)
def test_get_owner_ref(self):
"""
Asserts 'get_owner_ref' returns:
- None if there is no Owner
- EntityReference if there is an Owner
"""
table_name = "table"
# When the Owner is present on the PyIceberg Table
# Then EntityReference needs to be searched for
ref = EntityReferenceList(root=[EntityReference(id=uuid.uuid4(), type="user")])
iceberg_table_with_owner = {
"identifier": (
self.iceberg.context.get().database,
self.iceberg.context.get().database_schema,
table_name,
),
"metadata": TableMetadataV2.model_validate(
{
"location": "foo",
"last_column_id": 1,
"format_version": 2,
"schemas": [
Schema(
fields=tuple(
MOCK_COLUMN_MAP[field]["iceberg"]
for field in MOCK_COLUMN_MAP.keys()
)
)
],
"partition_spec": [],
"partition_specs": [
{
"fields": (
PartitionField(
source_id=1,
field_id=1000,
transform=IdentityTransform(),
name="boolean",
),
)
}
],
"properties": {"owner": "myself"},
}
),
"metadata_location": "bar",
"io": "pyiceberg.io.pyarrow.PyArrowFileIO",
"catalog": self.iceberg.connection_obj,
}
self.iceberg.context.get().iceberg_table = PyIcebergTable(
**iceberg_table_with_owner
)
with patch.object(OpenMetadata, "get_reference_by_email", return_value=ref):
self.assertEqual(
self.iceberg.get_owner_ref(table_name),
ref,
)
# When the Owner is not present on the PyIceberg Table
# Then None is returned
iceberg_table_without_owner = {
"identifier": (
self.iceberg.context.get().database,
self.iceberg.context.get().database_schema,
table_name,
),
"metadata": TableMetadataV2.model_validate(
{
"location": "foo",
"last_column_id": 1,
"format_version": 2,
"schemas": [
Schema(
fields=tuple(
MOCK_COLUMN_MAP[field]["iceberg"]
for field in MOCK_COLUMN_MAP.keys()
)
)
],
"partition_spec": [],
"partition_specs": [
{
"fields": (
PartitionField(
source_id=1,
field_id=1000,
transform=IdentityTransform(),
name="boolean",
),
)
}
],
"properties": {},
}
),
"metadata_location": "bar",
"io": "pyiceberg.io.pyarrow.PyArrowFileIO",
"catalog": self.iceberg.connection_obj,
}
self.iceberg.context.get().iceberg_table = PyIcebergTable(
**iceberg_table_without_owner
)
self.assertIsNone(
self.iceberg.get_owner_ref(table_name),
)
def test_yield_table(self):
table_name = "table_name"
table_type = TableType.Regular
iceberg_table = {
"identifier": (
self.iceberg.context.get().database,
self.iceberg.context.get().database_schema,
table_name,
),
"metadata": TableMetadataV2.model_validate(
{
"location": "foo",
"current-schema-id": 0,
"last_column_id": 1,
"format_version": 2,
"schemas": [
Schema(
fields=tuple(
MOCK_COLUMN_MAP[field]["iceberg"]
for field in MOCK_COLUMN_MAP.keys()
)
)
],
"partition_spec": [],
"partition_specs": [
{
"fields": (
PartitionField(
source_id=1,
field_id=1000,
transform=IdentityTransform(),
name="boolean",
),
)
}
],
"properties": {"owner": "myself", "comment": "Table Description"},
}
),
"metadata_location": "bar",
"io": "pyiceberg.io.pyarrow.PyArrowFileIO",
"catalog": self.iceberg.connection_obj,
}
fq_database_schema = "FullyQualifiedDatabaseSchema"
ref = EntityReferenceList(root=[EntityReference(id=uuid.uuid4(), type="user")])
self.iceberg.context.get().iceberg_table = PyIcebergTable(**iceberg_table)
expected = CreateTableRequest(
name=EntityName(table_name),
tableType=table_type,
description=Markdown("Table Description"),
owners=ref,
columns=[
MOCK_COLUMN_MAP[field]["ometa"] for field in MOCK_COLUMN_MAP.keys()
],
tablePartition=TablePartition(
columns=[
PartitionColumnDetails(
columnName="binary",
intervalType=PartitionIntervalTypes.COLUMN_VALUE,
interval=None,
)
]
),
databaseSchema=FullyQualifiedEntityName(fq_database_schema),
)
with patch.object(
OpenMetadata, "get_reference_by_email", return_value=ref
), patch.object(fqn, "build", return_value=fq_database_schema):
result = next(self.iceberg.yield_table((table_name, table_type))).right
self.assertEqual(result, expected)