# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test iceberg source
"""
import uuid
from copy import deepcopy
from unittest import TestCase
from unittest.mock import patch

import pyiceberg.exceptions
from pyiceberg.catalog.hive import HiveCatalog
from pyiceberg.partitioning import PartitionField
from pyiceberg.schema import Schema
from pyiceberg.table import Table as PyIcebergTable
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import (
    BinaryType,
    BooleanType,
    DateType,
    DecimalType,
    DoubleType,
    FixedType,
    FloatType,
    IntegerType,
    ListType,
    LongType,
    MapType,
    NestedField,
    StringType,
    StructType,
    TimestampType,
    TimestamptzType,
    TimeType,
    UUIDType,
)

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
    CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.table import (
    Column,
    Constraint,
    DataType,
    PartitionColumnDetails,
    PartitionIntervalTypes,
    TablePartition,
    TableType,
)
from metadata.generated.schema.type.basic import (
    EntityName,
    FullyQualifiedEntityName,
    Markdown,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.iceberg.metadata import IcebergSource
from metadata.utils import fqn

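# Each entry pairs the PyIceberg NestedField fed to the mocked catalog ("iceberg")
# with the OpenMetadata Column the connector is expected to build from it ("ometa").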
MOCK_COLUMN_MAP = {
    "binary": {
        "iceberg": NestedField(
            field_id=1,
            name="binary",
            field_type=BinaryType(),
            required=False,
            doc="Binary",
        ),
        "ometa": Column(
            name="binary",
            description="Binary",
            dataType=DataType.BINARY,
            dataTypeDisplay=str(BinaryType()),
            dataLength=0,
        ),
    },
    "boolean": {
        "iceberg": NestedField(
            field_id=2,
            name="boolean",
            field_type=BooleanType(),
            required=False,
        ),
        "ometa": Column(
            name="boolean",
            dataType=DataType.BOOLEAN,
            dataTypeDisplay=str(BooleanType()),
        ),
    },
    "date": {
        "iceberg": NestedField(
            field_id=3,
            name="date",
            field_type=DateType(),
            required=True,
        ),
        "ometa": Column(
            name="date",
            dataType=DataType.DATE,
            dataTypeDisplay=str(DateType()),
            constraint=Constraint.NOT_NULL,
        ),
    },
    "decimal": {
        "iceberg": NestedField(
            field_id=4,
            name="decimal",
            field_type=DecimalType(9, 3),
            required=False,
        ),
        "ometa": Column(
            name="decimal",
            dataType=DataType.DECIMAL,
            dataTypeDisplay=str(DecimalType(9, 3)),
            precision=9,
            scale=3,
        ),
    },
    "double": {
        "iceberg": NestedField(
            field_id=5,
            name="double",
            field_type=DoubleType(),
            required=False,
        ),
        "ometa": Column(
            name="double",
            dataType=DataType.DOUBLE,
            dataTypeDisplay=str(DoubleType()),
        ),
    },
    "fixed": {
        "iceberg": NestedField(
            field_id=6,
            name="fixed",
            field_type=FixedType(10),
            required=False,
        ),
        "ometa": Column(
            name="fixed",
            dataType=DataType.FIXED,
            dataTypeDisplay=str(FixedType(10)),
        ),
    },
    "float": {
        "iceberg": NestedField(
            field_id=7,
            name="float",
            field_type=FloatType(),
            required=False,
        ),
        "ometa": Column(
            name="float",
            dataType=DataType.FLOAT,
            dataTypeDisplay=str(FloatType()),
        ),
    },
    "integer": {
        "iceberg": NestedField(
            field_id=8,
            name="integer",
            field_type=IntegerType(),
            required=False,
        ),
        "ometa": Column(
            name="integer",
            dataType=DataType.INT,
            dataTypeDisplay=str(IntegerType()),
        ),
    },
    "list": {
        "iceberg": NestedField(
            field_id=9,
            name="list",
            field_type=ListType(
                element_id=1,
                element_type=IntegerType(),
            ),
            required=False,
        ),
        "ometa": Column(
            name="list",
            dataType=DataType.ARRAY,
            dataTypeDisplay=str(
                ListType(
                    element_id=1,
                    element_type=IntegerType(),
                )
            ),
            arrayDataType=DataType.INT,
        ),
    },
    "long": {
        "iceberg": NestedField(
            field_id=10,
            name="long",
            field_type=LongType(),
            required=False,
        ),
        "ometa": Column(
            name="long",
            dataType=DataType.LONG,
            dataTypeDisplay=str(LongType()),
        ),
    },
    "map": {
        "iceberg": NestedField(
            field_id=11,
            name="map",
            field_type=MapType(
                key_id=1,
                key_type=StringType(),
                value_id=2,
                value_type=IntegerType(),
            ),
            required=False,
        ),
        "ometa": Column(
            name="map",
            dataType=DataType.MAP,
            dataTypeDisplay=str(
                MapType(
                    key_id=1,
                    key_type=StringType(),
                    value_id=2,
                    value_type=IntegerType(),
                )
            ),
        ),
    },
    "string": {
        "iceberg": NestedField(
            field_id=12,
            name="string",
            field_type=StringType(),
            required=False,
        ),
        "ometa": Column(
            name="string",
            dataType=DataType.STRING,
            dataTypeDisplay=str(StringType()),
        ),
    },
    "struct": {
        "iceberg": NestedField(
            field_id=13,
            name="struct",
            field_type=StructType(
                fields=(
                    NestedField(
                        field_id=100,
                        name="nested1",
                        field_type=IntegerType(),
                        required=False,
                        doc="nested #1",
                    ),
                    NestedField(
                        field_id=101,
                        name="nested2",
                        field_type=StringType(),
                        required=True,
                    ),
                )
            ),
            required=False,
        ),
        "ometa": Column(
            name="struct",
            dataType=DataType.STRUCT,
            dataTypeDisplay=str(
                StructType(
                    fields=(
                        NestedField(
                            field_id=100,
                            name="nested1",
                            field_type=IntegerType(),
                            required=False,
                            doc="nested #1",
                        ),
                        NestedField(
                            field_id=101,
                            name="nested2",
                            field_type=StringType(),
                            required=True,
                        ),
                    )
                )
            ),
            children=[
                Column(
                    name="nested1",
                    description="nested #1",
                    dataType=DataType.INT,
                    dataTypeDisplay=str(IntegerType()),
                ),
                Column(
                    name="nested2",
                    dataType=DataType.STRING,
                    dataTypeDisplay=str(StringType()),
                    constraint=Constraint.NOT_NULL,
                ),
            ],
        ),
    },
    "time": {
        "iceberg": NestedField(
            field_id=14,
            name="time",
            field_type=TimeType(),
            required=False,
        ),
        "ometa": Column(
            name="time",
            dataType=DataType.TIME,
            dataTypeDisplay=str(TimeType()),
        ),
    },
    "timestamp": {
        "iceberg": NestedField(
            field_id=15,
            name="timestamp",
            field_type=TimestampType(),
            required=False,
        ),
        "ometa": Column(
            name="timestamp",
            dataType=DataType.TIMESTAMP,
            dataTypeDisplay=str(TimestampType()),
        ),
    },
    "timestamptz": {
        "iceberg": NestedField(
            field_id=16,
            name="timestamptz",
            field_type=TimestamptzType(),
            required=False,
        ),
        "ometa": Column(
            name="timestamptz",
            dataType=DataType.TIMESTAMPZ,
            dataTypeDisplay=str(TimestamptzType()),
        ),
    },
    "uuid": {
        "iceberg": NestedField(
            field_id=17,
            name="uuid",
            field_type=UUIDType(),
            required=False,
        ),
        "ometa": Column(
            name="uuid",
            dataType=DataType.UUID,
            dataTypeDisplay=str(UUIDType()),
        ),
    },
}

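# Workflow configurations for the catalog backends exercised in __init__
# (Hive, REST, Glue and DynamoDB); only the catalog connection block differs.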
MOCK_HIVE_CONFIG = {
    "source": {
        "type": "iceberg",
        "serviceName": "test_iceberg",
        "serviceConnection": {
            "config": {
                "type": "Iceberg",
                "catalog": {
                    "name": "Batata",
                    "connection": {"uri": "thrift://localhost:9083"},
                },
            }
        },
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {"jwtToken": "iceberg"},
        }
    },
}

MOCK_REST_CONFIG = {
    "source": {
        "type": "iceberg",
        "serviceName": "test_iceberg",
        "serviceConnection": {
            "config": {
                "type": "Iceberg",
                "catalog": {
                    "name": "Batata",
                    "connection": {"uri": "http://localhost:8181"},
                },
            }
        },
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {"jwtToken": "iceberg"},
        }
    },
}

MOCK_GLUE_CONFIG = {
    "source": {
        "type": "iceberg",
        "serviceName": "test_iceberg",
        "serviceConnection": {
            "config": {
                "type": "Iceberg",
                "catalog": {
                    "name": "Batata",
                    "connection": {
                        "awsConfig": {
                            "awsAccessKeyId": "access_key",
                            "awsSecretAccessKey": "secret",
                            "awsRegion": "us-east-2",
                            "awsSessionToken": "token",
                        },
                    },
                },
            }
        },
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {"jwtToken": "iceberg"},
        }
    },
}

MOCK_DYNAMO_CONFIG = {
    "source": {
        "type": "iceberg",
        "serviceName": "test_iceberg",
        "serviceConnection": {
            "config": {
                "type": "Iceberg",
                "catalog": {
                    "name": "Batata",
                    "connection": {
                        "tableName": "table",
                        "awsConfig": {
                            "endPointURL": "http://example.com",
                            "awsAccessKeyId": "access_key",
                            "awsSecretAccessKey": "secret",
                            "awsRegion": "us-east-2",
                            "awsSessionToken": "token",
                        },
                    },
                },
            }
        },
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {"jwtToken": "iceberg"},
        }
    },
}

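# Namespaces as returned by the mocked catalog, and the schema names the source
# is expected to derive from them (namespace parts joined with a dot).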
MOCK_NAMESPACES = [
    ("namespace1",),
    ("namespace2", "foo"),
]

EXPECTED_NAMESPACES_NAMES = [
    "namespace1",
    "namespace2.foo",
]

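# Table identifiers as returned by the mocked catalog, and the (name, type)
# tuples the source is expected to yield for them.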
MOCK_TABLE_LIST = [
    ("namespace1", "table1"),
    ("namespace1", "table2", "foo"),
    ("namespace2", "table3"),
]

EXPECTED_TABLE_LIST = [
    ("table1", TableType.Regular),
    ("table2.foo", TableType.Regular),
    ("table3", TableType.Regular),
]


class IcebergUnitTest(TestCase):
    """
    Validate how we work with the Iceberg connector
    """

    @patch(
        "metadata.ingestion.source.database.database_service.DatabaseServiceSource.test_connection"
    )
    def __init__(self, methodName, test_connection) -> None:
        super().__init__(methodName)
        test_connection.return_value = False

        for config in [
            MOCK_DYNAMO_CONFIG,
            MOCK_GLUE_CONFIG,
            MOCK_REST_CONFIG,
            MOCK_HIVE_CONFIG,
        ]:
            self.config = parse_workflow_config_gracefully(config)
            self.iceberg = IcebergSource.create(
                config["source"],
                OpenMetadata(self.config.workflowConfig.openMetadataServerConfig),
            )

        self.iceberg.context.get().database_service = "test_iceberg"
        self.iceberg.context.get().database = "default"
        self.iceberg.context.get().database_schema = "schema"

    def test_create(self):
        """
        An invalid config raises an error
        """
        not_looker_source = {
            "type": "mysql",
            "serviceName": "mysql_local",
            "serviceConnection": {
                "config": {
                    "type": "Mysql",
                    "username": "openmetadata_user",
                    "authType": {"password": "openmetadata_password"},
                    "hostPort": "localhost:3306",
                    "databaseSchema": "openmetadata_db",
                }
            },
            "sourceConfig": {
                "config": {
                    "type": "DatabaseMetadata",
                }
            },
        }

        self.assertRaises(
            InvalidSourceException,
            IcebergSource.create,
            not_looker_source,
            self.config.workflowConfig.openMetadataServerConfig,
        )

    def test_get_database_name(self):
        """
        Asserts 'get_database_names' returns:
        - databaseName when it is set
        - 'default' when it is not set
        """

        # Iceberg connector without databaseName set returns 'default'
        self.assertEqual(next(self.iceberg.get_database_names()), "default")

        # Iceberg connector with databaseName set returns the databaseName
        iceberg_source_with_database_name = deepcopy(self.iceberg)
        iceberg_source_with_database_name.service_connection.catalog.databaseName = (
            "myDatabase"
        )

        self.assertEqual(
            next(iceberg_source_with_database_name.get_database_names()), "myDatabase"
        )

    def test_yield_database(self):
        """
        Assert 'yield_database' returns the proper 'CreateDatabaseRequest'
        """

        database_name = "database"

        expected = CreateDatabaseRequest(
            name="database", service=self.iceberg.context.get().database_service
        )
        self.assertEqual(
            next(self.iceberg.yield_database(database_name)).right, expected
        )

    def test_get_database_schema_names(self):
        """
        Mock the Catalog and check the namespaces are transformed as expected,
        from tuples to strings.
        """
        with patch.object(HiveCatalog, "list_namespaces", return_value=MOCK_NAMESPACES):
            for i, namespace in enumerate(self.iceberg.get_database_schema_names()):
                self.assertEqual(namespace, EXPECTED_NAMESPACES_NAMES[i])

        def raise_something_bad():
            raise RuntimeError("Something bad")

        with patch.object(
            HiveCatalog, "list_namespaces", side_effect=raise_something_bad
        ):
            self.assertRaises(Exception, IcebergSource.get_database_schema_names)

    def test_yield_database_schema(self):
        """
        Assert 'yield_database_schema' returns the proper 'CreateDatabaseSchemaRequest'
        """
        schema_name = "MySchema"
        fully_qualified_database_name = "FullyQualifiedDatabaseName"

        expected = CreateDatabaseSchemaRequest(
            name=schema_name,
            database=fully_qualified_database_name,
        )

        with patch.object(fqn, "build", return_value=fully_qualified_database_name):
            self.assertEqual(
                next(self.iceberg.yield_database_schema(schema_name)).right, expected
            )

    def test_get_tables_name_and_type(self):
        """
        Assert 'get_tables_name_and_type' returns the proper table name and type.
        """

        # Happy Path scenario
        class LoadTableMock:
            data = iter(MOCK_TABLE_LIST)

            def name(self):
                return next(self.data)

        with patch.object(
            HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST
        ), patch.object(HiveCatalog, "load_table", return_value=LoadTableMock()):
            for i, table in enumerate(self.iceberg.get_tables_name_and_type()):
                self.assertEqual(table, EXPECTED_TABLE_LIST[i])

        # When pyiceberg.exceptions.NoSuchIcebergTableError is raised
        # Then nothing is yielded.
        def raise_no_such_iceberg_table():
            raise pyiceberg.exceptions.NoSuchIcebergTableError()

        with patch.object(
            HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST
        ), patch.object(
            HiveCatalog, "load_table", side_effect=raise_no_such_iceberg_table
        ):
            self.assertEqual(len(list(self.iceberg.get_tables_name_and_type())), 0)

        # When pyiceberg.exceptions.NoSuchTableError is raised
        # Then nothing is yielded.
        def raise_no_such_table():
            raise pyiceberg.exceptions.NoSuchTableError()

        with patch.object(
            HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST
        ), patch.object(HiveCatalog, "load_table", side_effect=raise_no_such_table):
            self.assertEqual(len(list(self.iceberg.get_tables_name_and_type())), 0)

    def test_get_owner_ref(self):
        """
        Asserts 'get_owner_ref' returns:
        - None if there is no Owner
        - EntityReference if there is an Owner
        """
        table_name = "table"

        # When the Owner is present on the PyIceberg Table
        # Then EntityReference needs to be searched for
        ref = EntityReferenceList(root=[EntityReference(id=uuid.uuid4(), type="user")])

        iceberg_table_with_owner = {
            "identifier": (
                self.iceberg.context.get().database,
                self.iceberg.context.get().database_schema,
                table_name,
            ),
            "metadata": TableMetadataV2.model_validate(
                {
                    "location": "foo",
                    "last_column_id": 1,
                    "format_version": 2,
                    "schemas": [
                        Schema(
                            fields=tuple(
                                MOCK_COLUMN_MAP[field]["iceberg"]
                                for field in MOCK_COLUMN_MAP.keys()
                            )
                        )
                    ],
                    "partition_spec": [],
                    "partition_specs": [
                        {
                            "fields": (
                                PartitionField(
                                    source_id=1,
                                    field_id=1000,
                                    transform=IdentityTransform(),
                                    name="boolean",
                                ),
                            )
                        }
                    ],
                    "properties": {"owner": "myself"},
                }
            ),
            "metadata_location": "bar",
            "io": "pyiceberg.io.pyarrow.PyArrowFileIO",
            "catalog": self.iceberg.connection_obj,
        }

        self.iceberg.context.get().iceberg_table = PyIcebergTable(
            **iceberg_table_with_owner
        )

        with patch.object(OpenMetadata, "get_reference_by_email", return_value=ref):
            self.assertEqual(
                self.iceberg.get_owner_ref(table_name),
                ref,
            )

        # When the Owner is not present on the PyIceberg Table
        # Then None is returned
        iceberg_table_without_owner = {
            "identifier": (
                self.iceberg.context.get().database,
                self.iceberg.context.get().database_schema,
                table_name,
            ),
            "metadata": TableMetadataV2.model_validate(
                {
                    "location": "foo",
                    "last_column_id": 1,
                    "format_version": 2,
                    "schemas": [
                        Schema(
                            fields=tuple(
                                MOCK_COLUMN_MAP[field]["iceberg"]
                                for field in MOCK_COLUMN_MAP.keys()
                            )
                        )
                    ],
                    "partition_spec": [],
                    "partition_specs": [
                        {
                            "fields": (
                                PartitionField(
                                    source_id=1,
                                    field_id=1000,
                                    transform=IdentityTransform(),
                                    name="boolean",
                                ),
                            )
                        }
                    ],
                    "properties": {},
                }
            ),
            "metadata_location": "bar",
            "io": "pyiceberg.io.pyarrow.PyArrowFileIO",
            "catalog": self.iceberg.connection_obj,
        }
        self.iceberg.context.get().iceberg_table = PyIcebergTable(
            **iceberg_table_without_owner
        )

        self.assertIsNone(
            self.iceberg.get_owner_ref(table_name),
        )

    def test_yield_table(self):
        table_name = "table_name"
        table_type = TableType.Regular

        iceberg_table = {
            "identifier": (
                self.iceberg.context.get().database,
                self.iceberg.context.get().database_schema,
                table_name,
            ),
            "metadata": TableMetadataV2.model_validate(
                {
                    "location": "foo",
                    "current-schema-id": 0,
                    "last_column_id": 1,
                    "format_version": 2,
                    "schemas": [
                        Schema(
                            fields=tuple(
                                MOCK_COLUMN_MAP[field]["iceberg"]
                                for field in MOCK_COLUMN_MAP.keys()
                            )
                        )
                    ],
                    "partition_spec": [],
                    "partition_specs": [
                        {
                            "fields": (
                                PartitionField(
                                    source_id=1,
                                    field_id=1000,
                                    transform=IdentityTransform(),
                                    name="boolean",
                                ),
                            )
                        }
                    ],
                    "properties": {"owner": "myself", "comment": "Table Description"},
                }
            ),
            "metadata_location": "bar",
            "io": "pyiceberg.io.pyarrow.PyArrowFileIO",
            "catalog": self.iceberg.connection_obj,
        }

        fq_database_schema = "FullyQualifiedDatabaseSchema"

        ref = EntityReferenceList(root=[EntityReference(id=uuid.uuid4(), type="user")])
        self.iceberg.context.get().iceberg_table = PyIcebergTable(**iceberg_table)

        expected = CreateTableRequest(
            name=EntityName(table_name),
            tableType=table_type,
            description=Markdown("Table Description"),
            owners=ref,
            columns=[
                MOCK_COLUMN_MAP[field]["ometa"] for field in MOCK_COLUMN_MAP.keys()
            ],
            tablePartition=TablePartition(
                columns=[
                    PartitionColumnDetails(
                        columnName="binary",
                        intervalType=PartitionIntervalTypes.COLUMN_VALUE,
                        interval=None,
                    )
                ]
            ),
            databaseSchema=FullyQualifiedEntityName(fq_database_schema),
        )

        with patch.object(
            OpenMetadata, "get_reference_by_email", return_value=ref
        ), patch.object(fqn, "build", return_value=fq_database_schema):
            result = next(self.iceberg.yield_table((table_name, table_type))).right

            self.assertEqual(result, expected)