mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-24 08:58:06 +00:00
Snowflake E2E Tests (#8781)
This commit is contained in:
parent
8c250f5f80
commit
07aed10a7e
7
.github/workflows/py-cli-e2e-tests.yml
vendored
7
.github/workflows/py-cli-e2e-tests.yml
vendored
@ -21,7 +21,7 @@ jobs:
|
|||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
py-version: ['3.9']
|
py-version: ['3.9']
|
||||||
e2e-test: ['mysql', 'bigquery']
|
e2e-test: ['mysql', 'bigquery', 'snowflake']
|
||||||
environment: e2e
|
environment: e2e
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
@ -73,6 +73,11 @@ jobs:
|
|||||||
E2E_BQ_PRIVATE_KEY_ID: ${{ secrets.E2E_BQ_PRIVATE_KEY_ID }}
|
E2E_BQ_PRIVATE_KEY_ID: ${{ secrets.E2E_BQ_PRIVATE_KEY_ID }}
|
||||||
E2E_BQ_CLIENT_EMAIL: ${{ secrets.E2E_BQ_CLIENT_EMAIL }}
|
E2E_BQ_CLIENT_EMAIL: ${{ secrets.E2E_BQ_CLIENT_EMAIL }}
|
||||||
E2E_BQ_CLIENT_ID: ${{ secrets.E2E_BQ_CLIENT_ID }}
|
E2E_BQ_CLIENT_ID: ${{ secrets.E2E_BQ_CLIENT_ID }}
|
||||||
|
E2E_SNOWFLAKE_PASSWORD: ${{ secrets.E2E_SNOWFLAKE_PASSWORD }}
|
||||||
|
E2E_SNOWFLAKE_USERNAME: ${{ secrets.E2E_SNOWFLAKE_USERNAME }}
|
||||||
|
E2E_SNOWFLAKE_ACCOUNT: ${{ secrets.E2E_SNOWFLAKE_ACCOUNT }}
|
||||||
|
E2E_SNOWFLAKE_DATABASE: ${{ secrets.E2E_SNOWFLAKE_DATABASE }}
|
||||||
|
E2E_SNOWFLAKE_WAREHOUSE: ${{ secrets.E2E_SNOWFLAKE_WAREHOUSE }}
|
||||||
run: |
|
run: |
|
||||||
source env/bin/activate
|
source env/bin/activate
|
||||||
python -m pytest -c ingestion/setup.cfg ingestion/tests/cli_e2e/test_cli_$E2E_TEST.py
|
python -m pytest -c ingestion/setup.cfg ingestion/tests/cli_e2e/test_cli_$E2E_TEST.py
|
||||||
|
11
ingestion/tests/cli_e2e/database/snowflake/queries.sql
Normal file
11
ingestion/tests/cli_e2e/database/snowflake/queries.sql
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
DROP DATABASE IF EXISTS E2E_DB;
|
||||||
|
CREATE OR REPLACE DATABASE E2E_DB;
|
||||||
|
USE E2E_DB;
|
||||||
|
CREATE OR REPLACE SCHEMA e2e_test;
|
||||||
|
CREATE OR REPLACE TABLE e2e_test.regions(region_id INT PRIMARY KEY,region_name VARCHAR(25));
|
||||||
|
CREATE OR REPLACE TABLE e2e_test.countries(country_id CHAR(2) PRIMARY KEY,country_name VARCHAR (40),region_id INT NOT NULL);
|
||||||
|
CREATE OR REPLACE TABLE e2e_test.locations(e2e_testlocation_id INT PRIMARY KEY,e2e_teststreet_address VARCHAR (40),e2e_testpostal_code VARCHAR (12),e2e_testcity VARCHAR (30) NOT NULL,e2e_teststate_province VARCHAR (25),e2e_testcountry_id CHAR (2) NOT NULL);
|
||||||
|
CREATE OR REPLACE TABLE e2e_test.jobs(e2e_testjob_id INT PRIMARY KEY,e2e_testjob_title VARCHAR (35) NOT NULL,e2e_testmin_salary DECIMAL (8, 2),e2e_testmax_salary DECIMAL (8, 2));
|
||||||
|
CREATE OR REPLACE TABLE e2e_test.test_departments(e2e_testdepartment_id INT PRIMARY KEY,e2e_testdepartment_name VARCHAR (30) NOT NULL,e2e_testlocation_id INT);
|
||||||
|
CREATE OR REPLACE TABLE e2e_test.test_employees(e2e_testemployee_id INT PRIMARY KEY,e2e_testfirst_name VARCHAR (20),e2e_testlast_name VARCHAR (25) NOT NULL,e2e_testemail VARCHAR (100) NOT NULL,e2e_testphone_number VARCHAR (20),e2e_testhire_date DATE NOT NULL,e2e_testjob_id INT NOT NULL,e2e_testsalary DECIMAL (8, 2) NOT NULL,e2e_testmanager_id INT,e2e_testdepartment_id INT);
|
||||||
|
CREATE OR REPLACE TABLE e2e_test.test_dependents(e2e_testdependent_id INT PRIMARY KEY,e2e_testfirst_name VARCHAR (50) NOT NULL,e2e_testlast_name VARCHAR (50) NOT NULL,e2e_testrelationship VARCHAR (25) NOT NULL,e2e_testemployee_id INT NOT NULL);
|
34
ingestion/tests/cli_e2e/database/snowflake/snowflake.yaml
Normal file
34
ingestion/tests/cli_e2e/database/snowflake/snowflake.yaml
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
source:
|
||||||
|
type: snowflake
|
||||||
|
serviceName: local_snowflake
|
||||||
|
serviceConnection:
|
||||||
|
config:
|
||||||
|
username: $E2E_SNOWFLAKE_USERNAME
|
||||||
|
password: $E2E_SNOWFLAKE_PASSWORD
|
||||||
|
account: $E2E_SNOWFLAKE_ACCOUNT
|
||||||
|
warehouse: $E2E_SNOWFLAKE_WAREHOUSE
|
||||||
|
database: $E2E_SNOWFLAKE_DATABASE
|
||||||
|
type: Snowflake
|
||||||
|
connectionOptions: {}
|
||||||
|
connectionArguments: {}
|
||||||
|
sourceConfig:
|
||||||
|
config:
|
||||||
|
markDeletedTables: true
|
||||||
|
includeTables: true
|
||||||
|
includeViews: true
|
||||||
|
type: DatabaseMetadata
|
||||||
|
schemaFilterPattern:
|
||||||
|
excludes:
|
||||||
|
- information_schema.*
|
||||||
|
|
||||||
|
sink:
|
||||||
|
type: metadata-rest
|
||||||
|
config: {}
|
||||||
|
workflowConfig:
|
||||||
|
openMetadataServerConfig:
|
||||||
|
hostPort: http://localhost:8585/api
|
||||||
|
authProvider: openmetadata
|
||||||
|
securityConfig:
|
||||||
|
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
|
||||||
|
|
||||||
|
|
@ -32,11 +32,16 @@ from metadata.ingestion.api.sink import SinkStatus
|
|||||||
from metadata.ingestion.api.source import SourceStatus
|
from metadata.ingestion.api.source import SourceStatus
|
||||||
from metadata.ingestion.api.workflow import Workflow
|
from metadata.ingestion.api.workflow import Workflow
|
||||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||||
|
from metadata.utils.constants import UTF_8
|
||||||
|
|
||||||
PATH_TO_RESOURCES = os.path.dirname(os.path.realpath(__file__))
|
PATH_TO_RESOURCES = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
|
|
||||||
class E2EType(Enum):
|
class E2EType(Enum):
|
||||||
|
"""
|
||||||
|
E2E Type Enum Class
|
||||||
|
"""
|
||||||
|
|
||||||
INGEST = "ingest"
|
INGEST = "ingest"
|
||||||
PROFILER = "profiler"
|
PROFILER = "profiler"
|
||||||
INGEST_FILTER_SCHEMA = "ingest-filter-schema"
|
INGEST_FILTER_SCHEMA = "ingest-filter-schema"
|
||||||
@ -45,7 +50,15 @@ class E2EType(Enum):
|
|||||||
|
|
||||||
|
|
||||||
class CliDBBase(TestCase):
|
class CliDBBase(TestCase):
|
||||||
class TestSuite(TestCase):
|
"""
|
||||||
|
CLI DB Base class
|
||||||
|
"""
|
||||||
|
|
||||||
|
class TestSuite(TestCase): # pylint: disable=too-many-public-methods
|
||||||
|
"""
|
||||||
|
TestSuite class to define test structure
|
||||||
|
"""
|
||||||
|
|
||||||
catcher = StringIO()
|
catcher = StringIO()
|
||||||
openmetadata: OpenMetadata
|
openmetadata: OpenMetadata
|
||||||
test_file_path: str
|
test_file_path: str
|
||||||
@ -206,11 +219,11 @@ class CliDBBase(TestCase):
|
|||||||
def build_config_file(
|
def build_config_file(
|
||||||
self, test_type: E2EType = E2EType.INGEST, extra_args: dict = None
|
self, test_type: E2EType = E2EType.INGEST, extra_args: dict = None
|
||||||
) -> None:
|
) -> None:
|
||||||
with open(self.config_file_path) as f:
|
with open(self.config_file_path, encoding=UTF_8) as config_file:
|
||||||
config_yaml = yaml.safe_load(f)
|
config_yaml = yaml.safe_load(config_file)
|
||||||
config_yaml = self.build_yaml(config_yaml, test_type, extra_args)
|
config_yaml = self.build_yaml(config_yaml, test_type, extra_args)
|
||||||
with open(self.test_file_path, "w") as w:
|
with open(self.test_file_path, "w", encoding=UTF_8) as test_file:
|
||||||
yaml.dump(config_yaml, w)
|
yaml.dump(config_yaml, test_file)
|
||||||
|
|
||||||
def retrieve_statuses(self, result):
|
def retrieve_statuses(self, result):
|
||||||
source_status: SourceStatus = self.extract_source_status(result)
|
source_status: SourceStatus = self.extract_source_status(result)
|
||||||
@ -253,7 +266,9 @@ class CliDBBase(TestCase):
|
|||||||
output_clean = re.findall(
|
output_clean = re.findall(
|
||||||
"Source Status: (.*?) Sink Status: .*", output_clean.strip()
|
"Source Status: (.*?) Sink Status: .*", output_clean.strip()
|
||||||
)
|
)
|
||||||
return SourceStatus.parse_obj(eval(output_clean[0].strip()))
|
return SourceStatus.parse_obj(
|
||||||
|
eval(output_clean[0].strip()) # pylint: disable=eval-used
|
||||||
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def extract_sink_status(output) -> SinkStatus:
|
def extract_sink_status(output) -> SinkStatus:
|
||||||
@ -264,7 +279,7 @@ class CliDBBase(TestCase):
|
|||||||
output_clean = re.findall(
|
output_clean = re.findall(
|
||||||
".* Sink Status: (.*?) Workflow finished.*", output_clean.strip()
|
".* Sink Status: (.*?) Workflow finished.*", output_clean.strip()
|
||||||
)[0].strip()
|
)[0].strip()
|
||||||
return SinkStatus.parse_obj(eval(output_clean))
|
return SinkStatus.parse_obj(eval(output_clean)) # pylint: disable=eval-used
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
@ -344,10 +359,17 @@ class CliDBBase(TestCase):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def build_yaml(config_yaml: dict, test_type: E2EType, extra_args: dict):
|
def build_yaml(config_yaml: dict, test_type: E2EType, extra_args: dict):
|
||||||
|
"""
|
||||||
|
Build yaml as per E2EType
|
||||||
|
"""
|
||||||
if test_type == E2EType.PROFILER:
|
if test_type == E2EType.PROFILER:
|
||||||
del config_yaml["source"]["sourceConfig"]["config"]
|
del config_yaml["source"]["sourceConfig"]["config"]
|
||||||
config_yaml["source"]["sourceConfig"] = {
|
config_yaml["source"]["sourceConfig"] = {
|
||||||
"config": {"type": "Profiler", "generateSampleData": True}
|
"config": {
|
||||||
|
"type": "Profiler",
|
||||||
|
"generateSampleData": True,
|
||||||
|
"profileSample": 1,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
config_yaml["processor"] = {"type": "orm-profiler", "config": {}}
|
config_yaml["processor"] = {"type": "orm-profiler", "config": {}}
|
||||||
if test_type == E2EType.INGEST_FILTER_SCHEMA:
|
if test_type == E2EType.INGEST_FILTER_SCHEMA:
|
||||||
|
124
ingestion/tests/cli_e2e/test_cli_snowflake.py
Normal file
124
ingestion/tests/cli_e2e/test_cli_snowflake.py
Normal file
@ -0,0 +1,124 @@
|
|||||||
|
# Copyright 2022 Collate
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
Test Snowflake connector with CLI
|
||||||
|
"""
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from metadata.ingestion.api.sink import SinkStatus
|
||||||
|
from metadata.ingestion.api.source import SourceStatus
|
||||||
|
|
||||||
|
from .test_cli_db_base_common import CliCommonDB
|
||||||
|
|
||||||
|
|
||||||
|
class SnowflakeCliTest(CliCommonDB.TestSuite):
|
||||||
|
"""
|
||||||
|
Snowflake CLI Tests
|
||||||
|
"""
|
||||||
|
|
||||||
|
create_table_query: str = """
|
||||||
|
CREATE TABLE E2E_DB.e2e_test.persons (
|
||||||
|
person_id int,
|
||||||
|
full_name varchar(255)
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
|
||||||
|
create_view_query: str = """
|
||||||
|
CREATE VIEW E2E_DB.e2e_test.view_persons AS
|
||||||
|
SELECT person_id, full_name
|
||||||
|
FROM e2e_test.persons;
|
||||||
|
"""
|
||||||
|
|
||||||
|
insert_data_queries: List[str] = [
|
||||||
|
"INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (1,'Peter Parker');",
|
||||||
|
"INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (1, 'Clark Kent');",
|
||||||
|
]
|
||||||
|
|
||||||
|
drop_table_query: str = """
|
||||||
|
DROP TABLE IF EXISTS E2E_DB.e2e_test.persons;
|
||||||
|
"""
|
||||||
|
|
||||||
|
drop_view_query: str = """
|
||||||
|
DROP VIEW IF EXISTS E2E_DB.e2e_test.view_persons;
|
||||||
|
"""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_connector_name() -> str:
|
||||||
|
return "snowflake"
|
||||||
|
|
||||||
|
def assert_for_vanilla_ingestion(
|
||||||
|
self, source_status: SourceStatus, sink_status: SinkStatus
|
||||||
|
) -> None:
|
||||||
|
self.assertTrue(len(source_status.failures) == 0)
|
||||||
|
self.assertTrue(len(source_status.warnings) == 0)
|
||||||
|
self.assertTrue(len(source_status.filtered) == 1)
|
||||||
|
self.assertTrue(len(source_status.success) >= self.expected_tables())
|
||||||
|
self.assertTrue(len(sink_status.failures) == 0)
|
||||||
|
self.assertTrue(len(sink_status.warnings) == 0)
|
||||||
|
self.assertTrue(len(sink_status.records) > self.expected_tables())
|
||||||
|
|
||||||
|
def create_table_and_view(self) -> None:
|
||||||
|
with self.engine.connect() as connection:
|
||||||
|
connection.execute(self.create_table_query)
|
||||||
|
for insert_query in self.insert_data_queries:
|
||||||
|
connection.execute(insert_query)
|
||||||
|
connection.execute(self.create_view_query)
|
||||||
|
connection.close()
|
||||||
|
|
||||||
|
def delete_table_and_view(self) -> None:
|
||||||
|
with self.engine.connect() as connection:
|
||||||
|
connection.execute(self.drop_view_query)
|
||||||
|
connection.execute(self.drop_table_query)
|
||||||
|
connection.close()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def expected_tables() -> int:
|
||||||
|
return 7
|
||||||
|
|
||||||
|
def inserted_rows_count(self) -> int:
|
||||||
|
return len(self.insert_data_queries)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def fqn_created_table() -> str:
|
||||||
|
return "local_snowflake.E2E_DB.E2E_TEST.PERSONS"
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_includes_schemas() -> List[str]:
|
||||||
|
return ["e2e_test.*"]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_includes_tables() -> List[str]:
|
||||||
|
return ["^test.*"]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_excludes_tables() -> List[str]:
|
||||||
|
return [".*ons"]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def expected_filtered_schema_includes() -> int:
|
||||||
|
return 2
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def expected_filtered_schema_excludes() -> int:
|
||||||
|
return 1
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def expected_filtered_table_includes() -> int:
|
||||||
|
return 5
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def expected_filtered_table_excludes() -> int:
|
||||||
|
return 4
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def expected_filtered_mix() -> int:
|
||||||
|
return 6
|
Loading…
x
Reference in New Issue
Block a user