#  Copyright 2025 Collate
#  Licensed under the Collate Community License, Version 1.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#  https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""Datalake ingestion integration tests"""
import pytest
from metadata.generated.schema.entity.data.table import DataType, Table
from metadata.ingestion.ometa.models import EntityList
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from .conftest import BUCKET_NAME


class TestDatalake:
    """End-to-end checks for the datalake ingestion and auto-classification runs.

    The OpenMetadata client is injected once per test by the autouse
    ``set_metadata`` fixture below; the actual ingestion work happens in the
    ``run_ingestion`` / ``run_auto_classification`` fixtures from conftest.
    """

    # OpenMetadata API client, populated for every test by ``set_metadata``.
    metadata: OpenMetadata = None
    # Placeholder for the S3 client wired up by the conftest fixtures.
    s3_client = None

    @pytest.fixture(autouse=True)
    def set_metadata(self, metadata):
        # Expose the session-wide client on the instance so every test
        # method can reach it as ``self.metadata``.
        self.metadata = metadata

    @pytest.mark.order(10000)
    def test_ingestion(self, run_ingestion):
        """Verify the S3 ingestion registered the expected tables and columns."""
        # List every table ingested into the integration-test database.
        listing: EntityList[Table] = self.metadata.list_entities(
            entity=Table, params={"database": "datalake_for_integration_tests.default"}
        )  # type: ignore

        tables = listing.entities
        assert len(tables) == 5
        assert {
            "names.json",
            "names.jsonl",
            "new_users.parquet",
            "users/users.csv",
            "profiler_test_.csv",
        } == {table.name.root for table in tables}

        # Any JSON-typed column must have been expanded into child columns.
        for table in tables:
            for col in table.columns:
                if col.dataType == DataType.JSON:
                    assert col.children

    def test_auto_classification(self, run_auto_classification):
        """Also excluding the test for parquet files until the above is fixed"""
        # Fetch each table under test by its fully-qualified name; the file
        # name is quoted inside the FQN because it may contain '/' and '.'.
        tables_by_file = {
            file_name: self.metadata.get_by_name(
                entity=Table,
                fqn=f'datalake_for_integration_tests.default.{BUCKET_NAME}."{file_name}"',
                fields=["tableProfilerConfig"],
            )
            for file_name in ("users/users.csv", "names.json", "names.jsonl")
        }
        # parquet remains deliberately excluded (see docstring); when re-enabled:
        # fqn='datalake_for_integration_tests.default.MyBucket."new_users.parquet"'

        # Each table must have sample rows captured by the classification run.
        for table in tables_by_file.values():
            assert self.metadata.get_sample_data(table).sampleData.rows