# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Datalake ingestion integration tests"""
import os
from copy import deepcopy
from unittest import TestCase
import boto3
import botocore
import pytest
from moto import mock_s3
from metadata.generated.schema.entity.data.table import DataType, Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.models import EntityList
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
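
# Fixtures shared by every test below: the mocked bucket name and the
# ingestion workflow configuration pointing the Datalake connector at it.
# The credentials are fakes consumed by moto, and the JWT token is a
# local-development admin token; nothing here touches real infrastructure.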
BUCKET_NAME = "MyBucket"

INGESTION_CONFIG = {
    "source": {
        "type": "datalake",
        "serviceName": "datalake_for_integration_tests",
        "serviceConnection": {
            "config": {
                "type": "Datalake",
                "configSource": {
                    "securityConfig": {
                        "awsAccessKeyId": "fake_access_key",
                        "awsSecretAccessKey": "fake_secret_key",
                        "awsRegion": "us-west-1",
                    }
                },
                "bucketName": BUCKET_NAME,
            }
        },
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {
                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
            },
        }
    },
}
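

# moto's @mock_s3 decorator patches boto3/botocore for the duration of each
# test, so every S3 call below talks to an in-memory bucket instead of AWS.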
@mock_s3
class DatalakeTestE2E(TestCase):
    """datalake profiler E2E test"""

    @classmethod
    def setUpClass(cls) -> None:
        server_config = OpenMetadataConnection(
            hostPort="http://localhost:8585/api",
            authProvider="openmetadata",
            securityConfig=OpenMetadataJWTClientConfig(
                jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
            ),
        )  # type: ignore
        cls.metadata = OpenMetadata(server_config)
    def setUp(self) -> None:
        # Mock our S3 bucket and ingest a file.
        # Reset boto3's default session so the mocked credentials are picked up
        boto3.DEFAULT_SESSION = None
        self.client = boto3.client(
            "s3",
            region_name="us-west-1",
        )

        # Check that we are not running our test against a real bucket
        try:
            s3 = boto3.resource(
                "s3",
                region_name="us-west-1",
                aws_access_key_id="fake_access_key",
                aws_secret_access_key="fake_secret_key",
            )
            s3.meta.client.head_bucket(Bucket=BUCKET_NAME)
        except botocore.exceptions.ClientError:
            pass
        else:
            err = f"{BUCKET_NAME} should not exist."
            raise EnvironmentError(err)

        self.client.create_bucket(
            Bucket=BUCKET_NAME,
            CreateBucketConfiguration={"LocationConstraint": "us-west-1"},
        )

        # Upload every file under the local resources/ directory to the mocked
        # bucket, keeping the relative path as the S3 key
        current_dir = os.path.dirname(__file__)
        resources_dir = os.path.join(current_dir, "resources")
        resources_paths = [
            os.path.join(path, filename)
            for path, _, files in os.walk(resources_dir)
            for filename in files
        ]
        self.s3_keys = []
        for path in resources_paths:
            key = os.path.relpath(path, resources_dir)
            self.s3_keys.append(key)
            self.client.upload_file(Filename=path, Bucket=BUCKET_NAME, Key=key)
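
    # The order markers guarantee that metadata ingestion runs before the
    # profiler, which expects the ingested tables to already exist.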
    @pytest.mark.order(10000)
    def test_ingestion(self):
        """test ingestion of datalake data"""
        # Ingest our S3 data
        ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG)
        ingestion_workflow.execute()
        ingestion_workflow.raise_from_status()
        ingestion_workflow.stop()

        resp: EntityList[Table] = self.metadata.list_entities(
            entity=Table, params={"database": "datalake_for_integration_tests.default"}
        )  # type: ignore

        # One table per file uploaded in setUp
        entities = resp.entities
        self.assertEqual(len(entities), 3)
        names = [entity.name.__root__ for entity in entities]
        self.assertListEqual(
            sorted(["names.json", "new_users.parquet", "users.csv"]), sorted(names)
        )

        # JSON columns should have been expanded into child columns
        for entity in entities:
            columns = entity.columns
            for column in columns:
                if column.dataType == DataType.JSON:
                    assert column.children
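
    # The profiler run reuses the ingestion config, switching the source
    # config type to "Profiler" and adding the orm-profiler processor.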
    @pytest.mark.order(10001)
    def test_profiler(self):
        """Test profiler ingestion"""
        workflow_config = deepcopy(INGESTION_CONFIG)
        workflow_config["source"]["sourceConfig"]["config"].update(
            {
                "type": "Profiler",
            }
        )
        workflow_config["processor"] = {
            "type": "orm-profiler",
            "config": {},
        }

        profiler_workflow = ProfilerWorkflow.create(workflow_config)
        profiler_workflow.execute()
        status = profiler_workflow.result_status()
        profiler_workflow.stop()
        assert status == 0
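
        # File names contain dots, so they are quoted inside the fully
        # qualified table names below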
        csv_ = self.metadata.get_by_name(
            entity=Table,
            fqn='datalake_for_integration_tests.default.MyBucket."users.csv"',
            fields=["tableProfilerConfig"],
        )
        parquet_ = self.metadata.get_by_name(
            entity=Table,
            fqn='datalake_for_integration_tests.default.MyBucket."new_users.parquet"',
            fields=["tableProfilerConfig"],
        )
        json_ = self.metadata.get_by_name(
            entity=Table,
            fqn='datalake_for_integration_tests.default.MyBucket."names.json"',
            fields=["tableProfilerConfig"],
        )

        # The profiler should have stored sample rows for every file format
        csv_sample_data = self.metadata.get_sample_data(csv_)
        parquet_sample_data = self.metadata.get_sample_data(parquet_)
        json_sample_data = self.metadata.get_sample_data(json_)

        assert csv_sample_data.sampleData.rows
        assert parquet_sample_data.sampleData.rows
        assert json_sample_data.sampleData.rows