2025-04-03 10:39:47 +05:30

91 lines
3.3 KiB
Python

# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Datalake ingestion integration tests"""
import pytest
from metadata.generated.schema.entity.data.table import DataType, Table
from metadata.ingestion.ometa.models import EntityList
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from .conftest import BUCKET_NAME
class TestDatalake:
    """End-to-end checks for datalake ingestion and auto classification.

    Each test requests a workflow fixture (``run_ingestion`` or
    ``run_auto_classification``) and then reads the resulting entities back
    through the ``metadata`` client to verify what was stored.
    """

    # Client used by every test; assigned by the ``set_metadata`` fixture.
    metadata: OpenMetadata = None
    # Not referenced by the tests in this class; kept so the class
    # attributes stay unchanged for anything that may set or read it.
    s3_client = None

    @pytest.fixture(autouse=True)
    def set_metadata(self, metadata):
        """Store the ``metadata`` fixture on the instance before each test."""
        self.metadata = metadata

    @pytest.mark.order(10000)
    def test_ingestion(self, run_ingestion):
        """Verify the tables registered under the datalake database.

        ``run_ingestion`` is requested only as a fixture dependency; its
        value is not used here (presumably it runs the ingestion workflow --
        confirm in conftest).
        """
        expected_names = {
            "names.json",
            "names.jsonl",
            "new_users.parquet",
            "users/users.csv",
            "profiler_test_.csv",
        }

        # Read back the tables stored for the integration-test database.
        resp: EntityList[Table] = self.metadata.list_entities(
            entity=Table, params={"database": "datalake_for_integration_tests.default"}
        )  # type: ignore
        entities = resp.entities
        names = [entity.name.root for entity in entities]

        # The count is checked separately from the set comparison so that
        # duplicated table names are caught as well. It is derived from the
        # expected set so the two cannot drift apart.
        assert len(entities) == len(
            expected_names
        ), f"Expected {len(expected_names)} tables, found {len(entities)}: {names}"
        assert expected_names == set(names)

        # Every JSON-typed column must expose children.
        for entity in entities:
            for column in entity.columns:
                if column.dataType == DataType.JSON:
                    assert (
                        column.children
                    ), f"JSON column {column.name} of {entity.name.root} has no children"

    def test_auto_classification(self, run_auto_classification):
        """Verify sample data rows exist for the CSV, JSON and JSONL tables.

        ``run_auto_classification`` is requested only as a fixture
        dependency; its value is not used here.
        """
        # TODO: "new_users.parquet" is intentionally left out of this check;
        # add it to the tuple below once parquet files can be covered again.
        file_names = ("users/users.csv", "names.json", "names.jsonl")

        for file_name in file_names:
            fqn = f'datalake_for_integration_tests.default.{BUCKET_NAME}."{file_name}"'
            table = self.metadata.get_by_name(
                entity=Table,
                fqn=fqn,
                fields=["tableProfilerConfig"],
            )
            # Fail with a readable message instead of an AttributeError
            # further down if the lookup yields nothing.
            assert table is not None, f"Table not found: {fqn}"

            sample = self.metadata.get_sample_data(table)
            assert sample is not None, f"No sample data response for {fqn}"
            assert sample.sampleData is not None, f"No sample data stored for {fqn}"
            assert sample.sampleData.rows, f"Sample data has no rows for {fqn}"