Add CLI E2E tests for MySQL (#8041)

* Add CLI E2E tests for MySQL

* Fix setup.py and pylint

* Add missing doc and update code after pylint refactor
Nahuel 2022-10-10 11:36:20 +02:00 committed by GitHub
parent 13b76dfd88
commit 5c499d2a7a
13 changed files with 840 additions and 17 deletions

.github/workflows/py-cli-e2e-tests.yml

@@ -0,0 +1,68 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: py-cli-e2e-tests
on:
schedule:
- cron: '0 0 * * *'
jobs:
py-cli-e2e-tests:
runs-on: ubuntu-latest
strategy:
matrix:
py-version: ['3.9']
e2e-test: ['mysql']
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up JDK 11
uses: actions/setup-java@v2
with:
java-version: '11'
distribution: 'adopt'
- name: Set up Python ${{ matrix.py-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.py-version }}
- name: Install Ubuntu dependencies
run: |
          sudo apt-get install -y unixodbc-dev python3-venv librdkafka-dev gcc libsasl2-dev build-essential libssl-dev libffi-dev \
            libevent-dev python3-dev
- name: Generate models
run: |
python3 -m venv env
source env/bin/activate
sudo make install_antlr_cli
make install_dev generate
- name: Install open-metadata dependencies
run: |
source env/bin/activate
make install_all install_test
- name: Start Server and Ingest Sample Data
env:
INGESTION_DEPENDENCY: "mysql,elasticsearch"
run: ./docker/run_local_docker.sh -m no-ui
timeout-minutes: 30
- name: Run Python Tests
env:
E2E_TEST: ${{ matrix.e2e-test }}
run: |
source env/bin/activate
python -m pytest -c ingestion/setup.cfg ingestion/tests/cli_e2e/test_cli_$E2E_TEST.py


@@ -76,3 +76,29 @@ jobs:
run: |
source env/bin/activate
make run_python_tests
# we have to pass these args values since we are working with the 'pull_request_target' trigger
- name: Push Results in PR to Sonar
uses: sonarsource/sonarcloud-github-action@master
if: ${{ github.event_name == 'pull_request_target' && matrix.py-version == '3.9' }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.INGESTION_SONAR_SECRET }}
with:
projectBaseDir: ingestion/
args: >
-Dproject.settings=ingestion/sonar-project.properties
-Dsonar.pullrequest.key=${{ github.event.pull_request.number }}
-Dsonar.pullrequest.branch=${{ github.head_ref }}
-Dsonar.pullrequest.github.repository=OpenMetadata
-Dsonar.scm.revision=${{ github.event.pull_request.head.sha }}
-Dsonar.pullrequest.provider=github
- name: Push Results to Sonar
uses: sonarsource/sonarcloud-github-action@master
if: ${{ github.event_name == 'push' && matrix.py-version == '3.9' }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.INGESTION_SONAR_SECRET }}
with:
projectBaseDir: ingestion/


@@ -146,6 +146,7 @@ test = {
     "pylint",
     "pytest==7.0.0",
     "pytest-cov",
+    "pytest-order",
     "faker",
     "coverage",
     # sklearn integration
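`pytest-order` is pulled in because the new E2E suite chains state across tests (ingest first, then profile, then delete) and needs them to run in a fixed order. A minimal sketch of the mechanism, with hypothetical test names:

```python
import pytest


@pytest.mark.order(1)
def test_create_state():  # hypothetical: runs first and sets up shared state
    pass


@pytest.mark.order(2)
def test_use_state():  # hypothetical: runs second and relies on test_create_state
    pass
```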


@@ -12,9 +12,11 @@
Abstract Sink definition to build a Workflow
"""
from abc import ABCMeta, abstractmethod
-from dataclasses import dataclass, field
+from dataclasses import dataclass
from typing import Any, Generic, List
+from pydantic import BaseModel
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
@@ -23,12 +25,10 @@ from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.status import Status

# pylint: disable=duplicate-code
-@dataclass
-class SinkStatus(Status):
-    records: List[str] = field(default_factory=list)
-    warnings: List[Any] = field(default_factory=list)
-    failures: List[Any] = field(default_factory=list)
+class SinkStatus(BaseModel, Status):
+    records: List[str] = []
+    warnings: List[Any] = []
+    failures: List[Any] = []

    def records_written(self, record: str) -> None:
        self.records.append(record)
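Turning `SinkStatus` into a pydantic model is what lets the new E2E suite rebuild a status object from the CLI's printed summary via `parse_obj` (see `extract_sink_status` below); pydantic also deep-copies the `[]` defaults per instance, so the mutable defaults are safe here. A small illustration with made-up values:

```python
from metadata.ingestion.api.sink import SinkStatus

# hypothetical payload mimicking the dict the CLI prints at the end of a run
status = SinkStatus.parse_obj(
    {"records": ["table: persons"], "warnings": [], "failures": []}
)
assert status.records == ["table: persons"]
```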


@@ -11,7 +11,6 @@
"""
Generic source to build SQL connectors.
"""
-import traceback
from abc import ABC
from copy import deepcopy
@@ -450,6 +449,7 @@ class CommonDbSourceService(
    def close(self):
        if self.connection is not None:
            self.connection.close()
+        self.engine.dispose()

    def fetch_table_tags(
        self, table_name: str, schema_name: str, inspector: Inspector
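Calling `Engine.dispose()` in `close()` releases the whole connection pool, not just the single checked-out connection; without it, repeated workflow runs (as in the E2E suite) would leak pooled sockets. A generic SQLAlchemy sketch of the lifecycle, with a hypothetical DSN:

```python
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://user:password@localhost:3306/db")  # hypothetical DSN
connection = engine.connect()
# ... run queries ...
connection.close()  # returns this connection to the pool
engine.dispose()    # shuts down the pool itself, closing all remaining connections
```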


@@ -200,18 +200,13 @@ class SQLSourceStatus(SourceStatus):
    Reports the source status after ingestion
    """

    success: List[str] = list()
    failures: List[str] = list()
    warnings: List[str] = list()
    filtered: List[str] = list()

    def scanned(self, record: str) -> None:
        self.success.append(record)
        logger.info(f"Scanned [{record}]")

    def filter(self, record: str, err: str) -> None:
-        self.filtered.append(record)
-        logger.warning(f"Filtered [{record}] due to {err}")
+        logger.info(f"Filtered [{record}] due to {err}")
+        self.filtered.append({record: err})


class DatabaseServiceSource(DBTMixin, TopologyRunnerMixin, Source, ABC):
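With this change each filtered entry also records why it was skipped, which is what the filter assertions in the new E2E suite count. A sketch of the resulting shape, assuming a `SQLSourceStatus` instance named `status` and hypothetical values:

```python
# hypothetical record and reason
status.filter("local_mysql.default.openmetadata_db", "Schema pattern not allowed")
# filtered now holds {record: reason} mappings instead of bare record names
assert status.filtered == [{"local_mysql.default.openmetadata_db": "Schema pattern not allowed"}]
```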


@@ -0,0 +1,96 @@
# E2E CLI tests

Currently, this suite runs CLI E2E tests for database connectors.

- `test_cli_db_base` has 8 test definitions for database connectors. It is an abstract class.
- `test_cli_db_base_common` is another abstract class for those connectors whose sources implement the `CommonDbSourceService` class.
  - It partially implements some methods from `test_cli_db_base`.
- `test_cli_{connector}` is the specific connector test. More tests, apart from the ones implemented by `test_cli_db_base`, can be run inside this class.

## How to add a database connector

1. Use `test_cli_mysql.py` as an example. Your connector's E2E CLI test must follow the naming convention `test_cli_{connector}.py`, and the test
class must extend `CliCommonDB.TestSuite` if the connector's source implements the `CommonDbSourceService` class; otherwise, it must extend `CliDBBase.TestSuite`.
2. Add an ingestion YAML file with the service configuration and its credentials. Use a Dockerized environment when possible; otherwise, remember to use environment
variables for sensitive information when relying on external resources. On each test, the YAML file is modified by the `build_yaml` method, which creates
a copy of the file and prepares it for the test (a sketch of this step follows the guide). This way, we avoid adding (and maintaining) an extra YAML file for each test.
3. The `{connector}` name must be added to the list of connectors in the GitHub Action `.github/workflows/py-cli-e2e-tests.yml`:
```yaml
jobs:
py-cli-e2e-tests:
runs-on: ubuntu-latest
strategy:
matrix:
py-version: ['3.9']
e2e-test: ['mysql', '{connector}']
```
4. If it is a database connector whose source implements the `CommonDbSourceService` class, these methods must be overridden:
```python
# the connector name
@staticmethod
def get_connector_name() -> str:
    return "{connector}"

# create a table and a view associated with it, and add some rows to the table, using the SQLAlchemy engine
def create_table_and_view(self) -> None:
    pass

# delete the view and the table created using the SQLAlchemy engine
def delete_table_and_view(self) -> None:
    pass

# expected number of tables to be ingested
@staticmethod
def expected_tables() -> int:
    pass

# number of rows added to the created table
def inserted_rows_count(self) -> int:
    pass

# FQN of the created table
@staticmethod
def fqn_created_table() -> str:
    pass

# list of schema patterns to be included in the schema filters
@staticmethod
def get_includes_schemas() -> List[str]:
    pass

# list of table patterns to be included in the table filters
@staticmethod
def get_includes_tables() -> List[str]:
    pass

# list of table patterns to be excluded in the table filters
@staticmethod
def get_excludes_tables() -> List[str]:
    pass

# expected number of schemas to be filtered by the includes patterns (get_includes_schemas)
@staticmethod
def expected_filtered_schema_includes() -> int:
    pass

# expected number of schemas to be filtered by the excludes patterns (also built from get_includes_schemas)
@staticmethod
def expected_filtered_schema_excludes() -> int:
    pass

# expected number of tables to be filtered by the includes patterns (get_includes_tables)
@staticmethod
def expected_filtered_table_includes() -> int:
    pass

# expected number of tables to be filtered by the excludes patterns (also built from get_includes_tables)
@staticmethod
def expected_filtered_table_excludes() -> int:
    pass

# expected number of entities filtered when mixing filters (get_includes_schemas, get_includes_tables, get_excludes_tables)
@staticmethod
def expected_filtered_mix() -> int:
    pass
```
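For reference, this is roughly what `build_yaml` does to the connector YAML for the profiler test; the dict keys come from `test_cli_db_base.py` below, and the file names are illustrative:

```python
import yaml

with open("mysql.yaml") as f:  # the connector config committed with the test
    config = yaml.safe_load(f)

# swap the metadata sourceConfig for a profiler one and attach the processor
config["source"]["sourceConfig"] = {
    "config": {"type": "Profiler", "generateSampleData": True}
}
config["processor"] = {"type": "orm-profiler", "config": {}}

with open("test.yaml", "w") as f:  # the copy the tests actually run against
    yaml.dump(config, f)
```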


@@ -0,0 +1,29 @@
source:
type: mysql
serviceName: local_mysql
serviceConnection:
config:
type: Mysql
username: openmetadata_user
password: openmetadata_password
hostPort: localhost:3306
databaseSchema: openmetadata_db
connectionOptions: {}
connectionArguments: {}
sourceConfig:
config:
markDeletedTables: true
includeTables: true
includeViews: true
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
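This file is read twice during a test run: by the `metadata` CLI itself, and directly in `setUpClass` to grab the SQLAlchemy engine and the OpenMetadata client. The latter path, condensed from `test_cli_db_base.py` and `test_cli_db_base_common.py` below:

```python
from pathlib import Path

from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow

config_dict = load_config_file(
    Path("ingestion/tests/cli_e2e/database/mysql/mysql.yaml")
)
workflow = Workflow.create(config_dict)
engine = workflow.source.engine          # used to create/drop the test table and view
openmetadata = workflow.source.metadata  # used to fetch entities for the assertions
```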


@@ -0,0 +1,340 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test database connectors with CLI
"""
import os
import re
from abc import abstractmethod
from enum import Enum
from pathlib import Path
from typing import List
from unittest import TestCase
import pytest
import yaml
from click.testing import CliRunner, Result
from metadata.cmd import metadata
from metadata.config.common import load_config_file
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.api.sink import SinkStatus
from metadata.ingestion.api.source import SourceStatus
from metadata.ingestion.api.workflow import Workflow
from metadata.ingestion.ometa.ometa_api import OpenMetadata
PATH_TO_RESOURCES = os.path.dirname(os.path.realpath(__file__))
class E2EType(Enum):
INGEST = "ingest"
PROFILER = "profiler"
INGEST_FILTER_SCHEMA = "ingest-filter-schema"
INGEST_FILTER_TABLE = "ingest-filter-table"
INGEST_FILTER_MIX = "ingest-filter-mix"
class CliDBBase(TestCase):
class TestSuite(TestCase):
runner = CliRunner()
openmetadata: OpenMetadata
test_file_path: str
config_file_path: str
# 1. deploy vanilla ingestion
@pytest.mark.order(1)
def test_vanilla_ingestion(self) -> None:
# build config file for ingest
self.build_config_file(E2EType.INGEST)
# run ingest with new tables
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_for_vanilla_ingestion(source_status, sink_status)
# 2. create a new table + deploy ingestion with views, sample data, and profiler
@pytest.mark.order(2)
def test_create_table_with_profiler(self) -> None:
# delete table in case it exists
self.delete_table_and_view()
# create a table and a view
self.create_table_and_view()
# build config file for ingest
self.build_config_file()
# run ingest with new tables
self.run_command()
# build config file for profiler
self.build_config_file(E2EType.PROFILER)
# run profiler with new tables
result = self.run_command("profile")
sink_status, source_status = self.retrieve_statuses(result)
self.assert_for_table_with_profiler(source_status, sink_status)
# 3. delete the new table + deploy marking tables as deleted
@pytest.mark.order(3)
def test_delete_table_is_marked_as_deleted(self) -> None:
# delete table created in previous test
self.delete_table_and_view()
# build config file for ingest
self.build_config_file()
# run ingest
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_for_delete_table_is_marked_as_deleted(
source_status, sink_status
)
# 4. vanilla ingestion + include schema filter pattern
@pytest.mark.order(4)
def test_schema_filter_includes(self) -> None:
# build config file for ingest with filters
self.build_config_file(
E2EType.INGEST_FILTER_SCHEMA, {"includes": self.get_includes_schemas()}
)
# run ingest
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_filtered_schemas_includes(source_status, sink_status)
# 5. vanilla ingestion + exclude schema filter pattern
@pytest.mark.order(5)
def test_schema_filter_excludes(self) -> None:
# build config file for ingest with filters
self.build_config_file(
E2EType.INGEST_FILTER_SCHEMA, {"excludes": self.get_includes_schemas()}
)
# run ingest
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_filtered_schemas_excludes(source_status, sink_status)
# 6. Vanilla ingestion + include table filter pattern
@pytest.mark.order(6)
def test_table_filter_includes(self) -> None:
# build config file for ingest with filters
self.build_config_file(
E2EType.INGEST_FILTER_TABLE, {"includes": self.get_includes_tables()}
)
# run ingest
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_filtered_tables_includes(source_status, sink_status)
        # 7. Vanilla ingestion + exclude table filter pattern
@pytest.mark.order(7)
def test_table_filter_excludes(self) -> None:
# build config file for ingest with filters
self.build_config_file(
E2EType.INGEST_FILTER_TABLE, {"excludes": self.get_includes_tables()}
)
# run ingest
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_filtered_tables_excludes(source_status, sink_status)
# 8. Vanilla ingestion mixing filters
@pytest.mark.order(8)
def test_table_filter_mix(self) -> None:
# build config file for ingest with filters
self.build_config_file(
E2EType.INGEST_FILTER_MIX,
{
"schema": {"includes": self.get_includes_schemas()},
"table": {
"includes": self.get_includes_tables(),
"excludes": self.get_excludes_tables(),
},
},
)
# run ingest
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_filtered_mix(source_status, sink_status)
# 9. Run usage
@pytest.mark.order(9)
def test_usage(self) -> None:
# to be implemented
pass
# 10. Run queries in the source (creates, inserts, views) and ingest metadata & Lineage
@pytest.mark.order(10)
def test_lineage(self) -> None:
# to be implemented
pass
def run_command(self, command: str = "ingest") -> Result:
args = f"{command} -c {self.test_file_path}"
result = self.runner.invoke(
metadata, args=args.split(" "), catch_exceptions=False
)
self.assertEqual(0, result.exit_code)
return result
def build_config_file(
self, test_type: E2EType = E2EType.INGEST, extra_args: dict = None
) -> None:
with open(self.config_file_path) as f:
config_yaml = yaml.safe_load(f)
config_yaml = self.build_yaml(config_yaml, test_type, extra_args)
with open(self.test_file_path, "w") as w:
yaml.dump(config_yaml, w)
def retrieve_statuses(self, result):
source_status: SourceStatus = self.extract_source_status(result.output)
sink_status: SinkStatus = self.extract_sink_status(result.output)
return sink_status, source_status
def retrieve_table(self, table_name_fqn: str) -> Table:
return self.openmetadata.get_by_name(entity=Table, fqn=table_name_fqn)
def retrieve_sample_data(self, table_name_fqn: str) -> Table:
return self.openmetadata.get_by_name(
entity=Table, fqn=table_name_fqn, fields=["sampleData"]
)
def retrieve_lineage(self, table_name_fqn: str) -> dict:
return self.openmetadata.client.get(
f"/lineage/table/name/{table_name_fqn}?upstreamDepth=3&downstreamDepth=3"
)
@staticmethod
def get_workflow(connector: str) -> Workflow:
config_file = Path(
PATH_TO_RESOURCES + f"/database/{connector}/{connector}.yaml"
)
config_dict = load_config_file(config_file)
return Workflow.create(config_dict)
@staticmethod
def extract_source_status(output) -> SourceStatus:
output_clean = output.replace("\n", " ")
output_clean = re.sub(" +", " ", output_clean)
if re.match(".* Processor Status: .*", output_clean):
output_clean = re.findall(
"Source Status: (.*?) Processor Status: .*", output_clean.strip()
)[0].strip()
else:
output_clean = re.findall(
"Source Status: (.*?) Sink Status: .*", output_clean.strip()
)[0].strip()
return SourceStatus.parse_obj(eval(output_clean))
@staticmethod
def extract_sink_status(output) -> SinkStatus:
output_clean = output.replace("\n", " ")
output_clean = re.sub(" +", " ", output_clean)
output_clean = re.findall(
".* Sink Status: (.*?) Workflow finished.*", output_clean.strip()
)[0].strip()
return SinkStatus.parse_obj(eval(output_clean))
@staticmethod
@abstractmethod
def get_connector_name() -> str:
raise NotImplementedError()
@abstractmethod
def create_table_and_view(self) -> None:
raise NotImplementedError()
@abstractmethod
def delete_table_and_view(self) -> None:
raise NotImplementedError()
@abstractmethod
def assert_for_vanilla_ingestion(
self, source_status: SourceStatus, sink_status: SinkStatus
) -> None:
raise NotImplementedError()
@abstractmethod
def assert_for_table_with_profiler(
self, source_status: SourceStatus, sink_status: SinkStatus
):
raise NotImplementedError()
@abstractmethod
def assert_for_delete_table_is_marked_as_deleted(
self, source_status: SourceStatus, sink_status: SinkStatus
):
raise NotImplementedError()
@abstractmethod
def assert_filtered_schemas_includes(
self, source_status: SourceStatus, sink_status: SinkStatus
):
raise NotImplementedError()
@abstractmethod
def assert_filtered_schemas_excludes(
self, source_status: SourceStatus, sink_status: SinkStatus
):
raise NotImplementedError()
@abstractmethod
def assert_filtered_tables_includes(
self, source_status: SourceStatus, sink_status: SinkStatus
):
raise NotImplementedError()
@abstractmethod
def assert_filtered_tables_excludes(
self, source_status: SourceStatus, sink_status: SinkStatus
):
raise NotImplementedError()
@abstractmethod
def assert_filtered_mix(
self, source_status: SourceStatus, sink_status: SinkStatus
):
raise NotImplementedError()
@staticmethod
@abstractmethod
def get_includes_schemas() -> List[str]:
raise NotImplementedError()
@staticmethod
@abstractmethod
def get_includes_tables() -> List[str]:
raise NotImplementedError()
@staticmethod
@abstractmethod
def get_excludes_tables() -> List[str]:
raise NotImplementedError()
@staticmethod
def build_yaml(config_yaml: dict, test_type: E2EType, extra_args: dict):
if test_type == E2EType.PROFILER:
del config_yaml["source"]["sourceConfig"]["config"]
config_yaml["source"]["sourceConfig"] = {
"config": {"type": "Profiler", "generateSampleData": True}
}
config_yaml["processor"] = {"type": "orm-profiler", "config": {}}
if test_type == E2EType.INGEST_FILTER_SCHEMA:
config_yaml["source"]["sourceConfig"]["config"][
"schemaFilterPattern"
] = extra_args
if test_type == E2EType.INGEST_FILTER_TABLE:
config_yaml["source"]["sourceConfig"]["config"][
"tableFilterPattern"
] = extra_args
if test_type == E2EType.INGEST_FILTER_MIX:
config_yaml["source"]["sourceConfig"]["config"][
"schemaFilterPattern"
] = extra_args["schema"]
config_yaml["source"]["sourceConfig"]["config"][
"tableFilterPattern"
] = extra_args["table"]
return config_yaml
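`run_command` is a thin wrapper over Click's test runner; the equivalent direct invocation looks like this (a sketch, using the `test.yaml` copy that `build_yaml` writes):

```python
from click.testing import CliRunner

from metadata.cmd import metadata

runner = CliRunner()
result = runner.invoke(metadata, ["ingest", "-c", "test.yaml"], catch_exceptions=False)
assert result.exit_code == 0
# result.output carries the "Source Status: ... Sink Status: ..." summary
# that extract_source_status and extract_sink_status parse with regexes
```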


@@ -0,0 +1,162 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
CLI E2E tests for database connectors whose sources extend `CommonDbSourceService`
"""
from abc import ABC, abstractmethod
from pathlib import Path
from sqlalchemy.engine import Engine
from metadata.ingestion.api.sink import SinkStatus
from metadata.ingestion.api.source import SourceStatus
from metadata.ingestion.api.workflow import Workflow
from .test_cli_db_base import PATH_TO_RESOURCES, CliDBBase
class CliCommonDB:
class TestSuite(CliDBBase.TestSuite, ABC):
engine: Engine
@classmethod
def setUpClass(cls) -> None:
connector = cls.get_connector_name()
workflow: Workflow = cls.get_workflow(connector)
cls.engine = workflow.source.engine
cls.openmetadata = workflow.source.metadata
cls.config_file_path = str(
Path(PATH_TO_RESOURCES + f"/database/{connector}/{connector}.yaml")
)
cls.test_file_path = str(
Path(PATH_TO_RESOURCES + f"/database/{connector}/test.yaml")
)
def tearDown(self) -> None:
self.engine.dispose()
def assert_for_vanilla_ingestion(
self, source_status: SourceStatus, sink_status: SinkStatus
) -> None:
self.assertTrue(len(source_status.failures) == 0)
self.assertTrue(len(source_status.warnings) == 0)
self.assertTrue(len(source_status.filtered) == 0)
self.assertTrue(len(source_status.success) >= self.expected_tables())
self.assertTrue(len(sink_status.failures) == 0)
self.assertTrue(len(sink_status.warnings) == 0)
self.assertTrue(len(sink_status.records) > self.expected_tables())
def assert_for_table_with_profiler(
self, source_status: SourceStatus, sink_status: SinkStatus
):
self.assertTrue(len(source_status.failures) == 0)
self.assertTrue(len(source_status.success) > self.expected_tables())
self.assertTrue(len(sink_status.failures) == 0)
self.assertTrue(len(sink_status.records) > self.expected_tables())
sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData
lineage = self.retrieve_lineage(self.fqn_created_table())
self.assertTrue(len(sample_data.columns) == self.inserted_rows_count())
self.assertTrue(
len(lineage["downstreamEdges"][0]["lineageDetails"]["columnsLineage"])
== self.inserted_rows_count()
)
def assert_for_delete_table_is_marked_as_deleted(
self, source_status: SourceStatus, sink_status: SinkStatus
):
self.assertEqual(self.retrieve_table(self.fqn_created_table()), None)
def assert_filtered_schemas_includes(
self, source_status: SourceStatus, sink_status: SinkStatus
):
self.assertTrue((len(source_status.failures) == 0))
self.assertTrue(
(
len(source_status.filtered)
== self.expected_filtered_schema_includes()
)
)
def assert_filtered_schemas_excludes(
self, source_status: SourceStatus, sink_status: SinkStatus
):
self.assertTrue((len(source_status.failures) == 0))
self.assertTrue(
(
len(source_status.filtered)
== self.expected_filtered_schema_excludes()
)
)
def assert_filtered_tables_includes(
self, source_status: SourceStatus, sink_status: SinkStatus
):
self.assertTrue((len(source_status.failures) == 0))
self.assertTrue(
(len(source_status.filtered) == self.expected_filtered_table_includes())
)
def assert_filtered_tables_excludes(
self, source_status: SourceStatus, sink_status: SinkStatus
):
self.assertTrue((len(source_status.failures) == 0))
self.assertTrue(
(len(source_status.filtered) == self.expected_filtered_table_excludes())
)
def assert_filtered_mix(
self, source_status: SourceStatus, sink_status: SinkStatus
):
self.assertTrue((len(source_status.failures) == 0))
self.assertTrue(
(len(source_status.filtered) == self.expected_filtered_mix())
)
@staticmethod
@abstractmethod
def expected_tables() -> int:
raise NotImplementedError()
@abstractmethod
def inserted_rows_count(self) -> int:
raise NotImplementedError()
@staticmethod
@abstractmethod
def fqn_created_table() -> str:
raise NotImplementedError()
@staticmethod
@abstractmethod
def expected_filtered_schema_includes() -> int:
raise NotImplementedError()
@staticmethod
@abstractmethod
def expected_filtered_schema_excludes() -> int:
raise NotImplementedError()
@staticmethod
@abstractmethod
def expected_filtered_table_includes() -> int:
raise NotImplementedError()
@staticmethod
@abstractmethod
def expected_filtered_table_excludes() -> int:
raise NotImplementedError()
@staticmethod
@abstractmethod
def expected_filtered_mix() -> int:
raise NotImplementedError()


@@ -0,0 +1,107 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test the MySQL connector with the CLI
"""
from typing import List
from .test_cli_db_base_common import CliCommonDB
class MysqlCliTest(CliCommonDB.TestSuite):
create_table_query: str = """
CREATE TABLE persons (
person_id int,
full_name varchar(255)
)
"""
create_view_query: str = """
CREATE VIEW view_persons AS
SELECT *
FROM openmetadata_db.persons;
"""
insert_data_queries: List[str] = [
"INSERT INTO persons (person_id, full_name) VALUES (1,'Peter Parker');",
"INSERT INTO persons (person_id, full_name) VALUES (1, 'Clark Kent');",
]
drop_table_query: str = """
DROP TABLE IF EXISTS openmetadata_db.persons;
"""
drop_view_query: str = """
DROP VIEW IF EXISTS openmetadata_db.view_persons;
"""
@staticmethod
def get_connector_name() -> str:
return "mysql"
def create_table_and_view(self) -> None:
with self.engine.connect() as connection:
connection.execute(self.create_table_query)
for insert_query in self.insert_data_queries:
connection.execute(insert_query)
connection.execute(self.create_view_query)
connection.close()
def delete_table_and_view(self) -> None:
with self.engine.connect() as connection:
connection.execute(self.drop_view_query)
connection.execute(self.drop_table_query)
connection.close()
@staticmethod
def expected_tables() -> int:
return 44
def inserted_rows_count(self) -> int:
return len(self.insert_data_queries)
@staticmethod
def fqn_created_table() -> str:
return "local_mysql.default.openmetadata_db.persons"
@staticmethod
def get_includes_schemas() -> List[str]:
return ["openmetadata_db.*"]
@staticmethod
def get_includes_tables() -> List[str]:
return ["entity_*"]
@staticmethod
def get_excludes_tables() -> List[str]:
return [".*bot.*"]
@staticmethod
def expected_filtered_schema_includes() -> int:
return 0
@staticmethod
def expected_filtered_schema_excludes() -> int:
return 1
@staticmethod
def expected_filtered_table_includes() -> int:
return 40
@staticmethod
def expected_filtered_table_excludes() -> int:
return 4
@staticmethod
def expected_filtered_mix() -> int:
return 40
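The expected counts above are tied to the sample `openmetadata_db` schema that the local Docker environment ships. A hedged sketch for sanity-checking `expected_tables()` against a running instance (credentials from `mysql.yaml` above; the `pymysql` driver is an assumption):

```python
from sqlalchemy import create_engine, inspect

engine = create_engine(
    "mysql+pymysql://openmetadata_user:openmetadata_password@localhost:3306/openmetadata_db"
)
print(len(inspect(engine).get_table_names()))  # baseline should line up with expected_tables()
engine.dispose()
```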


@@ -13,7 +13,6 @@
Test AWS SSM Secrets Manager
"""
import json
-from abc import ABC
from typing import Any, Dict, List
from unittest.mock import Mock

@@ -24,7 +23,7 @@ from metadata.utils.secrets.aws_ssm_secrets_manager import AWSSSMSecretsManager
from .test_aws_based_secrets_manager import AWSBasedSecretsManager

-class TestAWSSecretsManager(AWSBasedSecretsManager.TestCase, ABC):
+class TestAWSSecretsManager(AWSBasedSecretsManager.TestCase):
def build_secret_manager(
self,
mocked_get_client: Mock,