mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-07-04 15:45:42 +00:00
112 lines
4.4 KiB
Python
112 lines
4.4 KiB
Python
# Copyright 2025 Collate
|
|
# Licensed under the Collate Community License, Version 1.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Test Ometa Dataframe utility tests"""
|
|
import os
|
|
import unittest
|
|
from unittest.mock import patch
|
|
|
|
import pyarrow.parquet as pq
|
|
|
|
from metadata.generated.schema.entity.data.table import Table
|
|
from metadata.generated.schema.metadataIngestion.workflow import (
|
|
OpenMetadataWorkflowConfig,
|
|
)
|
|
from metadata.generated.schema.type.entityReference import EntityReference
|
|
from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
|
|
from metadata.mixins.pandas.pandas_mixin import PandasInterfaceMixin
|
|
from metadata.readers.dataframe.reader_factory import SupportedTypes
|
|
|
|
from .topology.database.test_datalake import mock_datalake_config
|
|
|
|
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
resp_parquet_file = (
|
|
pq.ParquetFile(os.path.join(ROOT_DIR, "test_ometa_to_dataframe.parquet"))
|
|
.read()
|
|
.to_pandas()
|
|
)
|
|
method_resp_file = [resp_parquet_file]
|
|
|
|
|
|
class TestStringMethods(unittest.TestCase):
|
|
def test_dl_column_parser(self):
|
|
with patch(
|
|
"metadata.utils.datalake.datalake_utils.fetch_dataframe",
|
|
return_value=method_resp_file,
|
|
) as exec_mock_method:
|
|
resp = exec_mock_method("key", "string")
|
|
assert type(resp) == list
|
|
|
|
@patch(
|
|
"metadata.ingestion.source.database.database_service.DatabaseServiceSource.test_connection"
|
|
)
|
|
def test_get_dataframes(self, test_connection):
|
|
with patch(
|
|
"metadata.mixins.pandas.pandas_mixin.fetch_dataframe",
|
|
return_value=[resp_parquet_file],
|
|
):
|
|
config = OpenMetadataWorkflowConfig.model_validate(mock_datalake_config)
|
|
datalake_source = DatalakeSource.create(
|
|
mock_datalake_config["source"],
|
|
config.workflowConfig.openMetadataServerConfig,
|
|
)
|
|
resp = PandasInterfaceMixin().get_dataframes(
|
|
service_connection_config=datalake_source.service_connection,
|
|
table=Table(
|
|
id="cec14ccf-123f-4271-8c90-0ae54cc4227e",
|
|
columns=[],
|
|
name="test",
|
|
databaseSchema=EntityReference(
|
|
name="Test",
|
|
id="cec14ccf-123f-4271-8c90-0ae54cc4227e",
|
|
type="databaseSchema",
|
|
),
|
|
fileFormat=SupportedTypes.PARQUET.value,
|
|
),
|
|
client=None,
|
|
)
|
|
|
|
assert resp == method_resp_file
|
|
assert type(resp) == list
|
|
|
|
@patch(
|
|
"metadata.ingestion.source.database.database_service.DatabaseServiceSource.test_connection"
|
|
)
|
|
def test_get_dataframes_fail(self, test_connection):
|
|
with patch(
|
|
"metadata.mixins.pandas.pandas_mixin.fetch_dataframe",
|
|
return_value=None,
|
|
):
|
|
with self.assertRaises(TypeError) as context:
|
|
config = OpenMetadataWorkflowConfig.model_validate(mock_datalake_config)
|
|
datalake_source = DatalakeSource.create(
|
|
mock_datalake_config["source"],
|
|
config.workflowConfig.openMetadataServerConfig,
|
|
)
|
|
PandasInterfaceMixin().get_dataframes(
|
|
service_connection_config=datalake_source.service_connection,
|
|
table=Table(
|
|
id="cec14ccf-123f-4271-8c90-0ae54cc4227e",
|
|
columns=[],
|
|
name="test",
|
|
databaseSchema=EntityReference(
|
|
name="Test",
|
|
id="cec14ccf-123f-4271-8c90-0ae54cc4227e",
|
|
type="databaseSchema",
|
|
),
|
|
fileFormat=None,
|
|
),
|
|
client=None,
|
|
)
|
|
|
|
self.assertEqual(context.exception.args[0], "Couldn't fetch test")
|