OpenMetadata/ingestion/tests/unit/test_ometa_to_dataframe.py
2025-04-03 10:39:47 +05:30

112 lines
4.4 KiB
Python

# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test Ometa Dataframe utility tests"""
import os
import unittest
from unittest.mock import patch
import pyarrow.parquet as pq
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
from metadata.mixins.pandas.pandas_mixin import PandasInterfaceMixin
from metadata.readers.dataframe.reader_factory import SupportedTypes
from .topology.database.test_datalake import mock_datalake_config
# Directory that contains this test module; the parquet fixture sits beside it.
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))

# Read the parquet fixture once, at import time, and convert it to pandas.
_fixture_path = os.path.join(ROOT_DIR, "test_ometa_to_dataframe.parquet")
resp_parquet_file = pq.ParquetFile(_fixture_path).read().to_pandas()

# List-wrapped form of the fixture, used as the mocked fetch_dataframe result.
method_resp_file = [resp_parquet_file]
class TestStringMethods(unittest.TestCase):
    """Tests for dataframe retrieval through ``PandasInterfaceMixin``.

    Every test patches a ``fetch_dataframe`` reference so that it returns
    either the parquet fixture or ``None``.
    """

    # Identifier reused for the table and for its schema reference.
    ENTITY_ID = "cec14ccf-123f-4271-8c90-0ae54cc4227e"

    @staticmethod
    def _create_datalake_source():
        """Build a ``DatalakeSource`` from the shared mock datalake config."""
        config = OpenMetadataWorkflowConfig.model_validate(mock_datalake_config)
        return DatalakeSource.create(
            mock_datalake_config["source"],
            config.workflowConfig.openMetadataServerConfig,
        )

    @classmethod
    def _build_table(cls, file_format):
        """Return a column-less ``Table`` named ``test`` with ``file_format``."""
        return Table(
            id=cls.ENTITY_ID,
            columns=[],
            name="test",
            databaseSchema=EntityReference(
                name="Test",
                id=cls.ENTITY_ID,
                type="databaseSchema",
            ),
            fileFormat=file_format,
        )

    def test_dl_column_parser(self):
        """The patched ``fetch_dataframe`` hands back a list."""
        with patch(
            "metadata.utils.datalake.datalake_utils.fetch_dataframe",
            return_value=method_resp_file,
        ) as exec_mock_method:
            # NOTE(review): this calls the mock itself, so it only checks the
            # configured return value, not any production code path.
            resp = exec_mock_method("key", "string")

        self.assertIsInstance(resp, list)

    @patch(
        "metadata.ingestion.source.database.database_service.DatabaseServiceSource.test_connection"
    )
    def test_get_dataframes(self, test_connection):
        """``get_dataframes`` returns the list produced by ``fetch_dataframe``.

        ``test_connection`` is the mock injected by the ``@patch`` decorator;
        it is not used directly.
        """
        with patch(
            "metadata.mixins.pandas.pandas_mixin.fetch_dataframe",
            return_value=[resp_parquet_file],
        ):
            datalake_source = self._create_datalake_source()
            resp = PandasInterfaceMixin().get_dataframes(
                service_connection_config=datalake_source.service_connection,
                table=self._build_table(SupportedTypes.PARQUET.value),
                client=None,
            )

        # Check the type first so a wrong return type fails with a clear message.
        self.assertIsInstance(resp, list)
        self.assertEqual(resp, method_resp_file)

    @patch(
        "metadata.ingestion.source.database.database_service.DatabaseServiceSource.test_connection"
    )
    def test_get_dataframes_fail(self, test_connection):
        """``get_dataframes`` raises ``TypeError`` when nothing is fetched.

        ``test_connection`` is the mock injected by the ``@patch`` decorator;
        it is not used directly.
        """
        with patch(
            "metadata.mixins.pandas.pandas_mixin.fetch_dataframe",
            return_value=None,
        ):
            # Setup stays outside assertRaises so that a TypeError raised while
            # building the source or table is not mistaken for the expected one.
            datalake_source = self._create_datalake_source()
            table = self._build_table(None)

            with self.assertRaises(TypeError) as context:
                PandasInterfaceMixin().get_dataframes(
                    service_connection_config=datalake_source.service_connection,
                    table=table,
                    client=None,
                )

        # Must run after the assertRaises block: statements following the
        # raising call inside that block would never execute.
        self.assertEqual(context.exception.args[0], "Couldn't fetch test")