diff --git a/.github/workflows/py-cli-e2e-tests.yml b/.github/workflows/py-cli-e2e-tests.yml
new file mode 100644
index 00000000000..0d6e4d8fcc4
--- /dev/null
+++ b/.github/workflows/py-cli-e2e-tests.yml
@@ -0,0 +1,68 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: py-cli-e2e-tests
+on:
+  schedule:
+    - cron: '0 0 * * *'
+
+jobs:
+  py-cli-e2e-tests:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        py-version: ['3.9']
+        e2e-test: ['mysql']
+
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v2
+
+      - name: Set up JDK 11
+        uses: actions/setup-java@v2
+        with:
+          java-version: '11'
+          distribution: 'adopt'
+
+      - name: Set up Python ${{ matrix.py-version }}
+        uses: actions/setup-python@v2
+        with:
+          python-version: ${{ matrix.py-version }}
+
+      - name: Install Ubuntu dependencies
+        run: |
+          sudo apt-get install -y unixodbc-dev python3-venv librdkafka-dev gcc libsasl2-dev build-essential libssl-dev libffi-dev \
+            libevent-dev python3-dev
+
+      - name: Generate models
+        run: |
+          python3 -m venv env
+          source env/bin/activate
+          sudo make install_antlr_cli
+          make install_dev generate
+
+      - name: Install open-metadata dependencies
+        run: |
+          source env/bin/activate
+          make install_all install_test
+
+      - name: Start Server and Ingest Sample Data
+        env:
+          INGESTION_DEPENDENCY: "mysql,elasticsearch"
+        run: ./docker/run_local_docker.sh -m no-ui
+        timeout-minutes: 30
+
+      - name: Run Python Tests
+        env:
+          E2E_TEST: ${{ matrix.e2e-test }}
+        run: |
+          source env/bin/activate
+          python -m pytest -c ingestion/setup.cfg ingestion/tests/cli_e2e/test_cli_$E2E_TEST.py
diff --git a/.github/workflows/py-tests.yml b/.github/workflows/py-tests.yml
index d2daadd2b66..0b677ac7efd 100644
--- a/.github/workflows/py-tests.yml
+++ b/.github/workflows/py-tests.yml
@@ -76,3 +76,29 @@ jobs:
         run: |
           source env/bin/activate
           make run_python_tests
+
+      # we have to pass these argument values explicitly since we are working with the 'pull_request_target' trigger
+      - name: Push Results in PR to Sonar
+        uses: sonarsource/sonarcloud-github-action@master
+        if: ${{ github.event_name == 'pull_request_target' && matrix.py-version == '3.9' }}
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+          SONAR_TOKEN: ${{ secrets.INGESTION_SONAR_SECRET }}
+        with:
+          projectBaseDir: ingestion/
+          args: >
+            -Dproject.settings=ingestion/sonar-project.properties
+            -Dsonar.pullrequest.key=${{ github.event.pull_request.number }}
+            -Dsonar.pullrequest.branch=${{ github.head_ref }}
+            -Dsonar.pullrequest.github.repository=OpenMetadata
+            -Dsonar.scm.revision=${{ github.event.pull_request.head.sha }}
+            -Dsonar.pullrequest.provider=github
+
+      - name: Push Results to Sonar
+        uses: sonarsource/sonarcloud-github-action@master
+        if: ${{ github.event_name == 'push' && matrix.py-version == '3.9' }}
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+          SONAR_TOKEN: ${{ secrets.INGESTION_SONAR_SECRET }}
+        with:
+          projectBaseDir: ingestion/
diff --git a/ingestion/setup.py b/ingestion/setup.py
index c798ef8b332..43c9b46cba2 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -146,6 +146,7 @@ test = {
     "pylint",
     "pytest==7.0.0",
     "pytest-cov",
+    "pytest-order",
     "faker",
     "coverage",
     # sklearn integration
diff --git a/ingestion/src/metadata/ingestion/api/sink.py b/ingestion/src/metadata/ingestion/api/sink.py
index 78f9eaf79b3..ac7c5564cf9 100644
--- a/ingestion/src/metadata/ingestion/api/sink.py
+++ b/ingestion/src/metadata/ingestion/api/sink.py
@@ -12,9 +12,11 @@ Abstract Sink definition to build a Workflow
 """
 from abc import ABCMeta, abstractmethod
-from dataclasses import dataclass, field
+from dataclasses import dataclass
 from typing import Any, Generic, List
 
+from pydantic import BaseModel
+
 from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
     OpenMetadataConnection,
 )
@@ -23,12 +25,10 @@ from metadata.ingestion.api.common import Entity
 from metadata.ingestion.api.status import Status
 
 
-# pylint: disable=duplicate-code
-@dataclass
-class SinkStatus(Status):
-    records: List[str] = field(default_factory=list)
-    warnings: List[Any] = field(default_factory=list)
-    failures: List[Any] = field(default_factory=list)
+class SinkStatus(BaseModel, Status):
+    records: List[str] = []
+    warnings: List[Any] = []
+    failures: List[Any] = []
 
     def records_written(self, record: str) -> None:
         self.records.append(record)
diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py
index cb4ffe37bc1..f3896b3b087 100644
--- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py
+++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py
@@ -11,7 +11,6 @@
 """
 Generic source to build SQL connectors.
 """
-
 import traceback
 from abc import ABC
 from copy import deepcopy
@@ -450,6 +449,7 @@ class CommonDbSourceService(
     def close(self):
         if self.connection is not None:
             self.connection.close()
+        self.engine.dispose()
 
     def fetch_table_tags(
         self, table_name: str, schema_name: str, inspector: Inspector
diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py
index f0267f635c4..22cffd15fcf 100644
--- a/ingestion/src/metadata/ingestion/source/database/database_service.py
+++ b/ingestion/src/metadata/ingestion/source/database/database_service.py
@@ -200,18 +200,13 @@ class SQLSourceStatus(SourceStatus):
     Reports the source status after ingestion
     """
 
-    success: List[str] = list()
-    failures: List[str] = list()
-    warnings: List[str] = list()
-    filtered: List[str] = list()
-
     def scanned(self, record: str) -> None:
         self.success.append(record)
         logger.info(f"Scanned [{record}]")
 
     def filter(self, record: str, err: str) -> None:
-        self.filtered.append(record)
-        logger.warning(f"Filtered [{record}] due to {err}")
+        logger.info(f"Filtered [{record}] due to {err}")
+        self.filtered.append({record: err})
 
 
 class DatabaseServiceSource(DBTMixin, TopologyRunnerMixin, Source, ABC):
diff --git a/ingestion/tests/cli_e2e/README.md b/ingestion/tests/cli_e2e/README.md
new file mode 100644
index 00000000000..05dc447376a
--- /dev/null
+++ b/ingestion/tests/cli_e2e/README.md
@@ -0,0 +1,96 @@
+# E2E CLI tests
+
+Currently, this suite runs CLI tests for database connectors.
+
+- `test_cli_db_base` is an abstract class with 8 test definitions for database connectors.
+- `test_cli_db_base_common` is another abstract class for connectors whose sources implement the `CommonDbSourceService` class.
+  It partially implements some methods from `test_cli_db_base`.
+- `test_cli_{connector}` is the specific connector test. Tests beyond the ones defined in `test_cli_db_base` can also be added to this class.
+
+## How to add a database connector
+
+1. Use `test_cli_mysql.py` as an example. Your connector E2E CLI test must follow the naming convention `test_cli_{connector}.py`, and the test
+class must extend `CliCommonDB.TestSuite` if the connector's source implements the `CommonDbSourceService` class; otherwise, it must extend `CliDBBase.TestSuite`.
+
+2. Add an ingestion YAML file with the service configuration and its credentials. Use a Dockerized environment when possible; otherwise, remember to use environment
+variables for sensitive information when relying on external resources. On each test, the YAML file will be modified by the `build_yaml` method, which creates
+a copy of the file and prepares it for the tests. This way, we avoid adding (and maintaining) an extra YAML file for each test.
+
+3. The `{connector}` name must be added to the list of connectors in the GitHub Action: `.github/workflows/py-cli-e2e-tests.yml`
+
+```yaml
+jobs:
+  py-cli-e2e-tests:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        py-version: ['3.9']
+        e2e-test: ['mysql', '{connector}']
+```
+
+4. If it is a database connector whose source implements the `CommonDbSourceService` class, these methods must be overridden:
+
+```python
+    # the connector name
+    @staticmethod
+    def get_connector_name() -> str:
+        return "{connector}"
+
+    # create a table and a view associated with it using the SQLAlchemy engine, and add some rows to the table
+    def create_table_and_view(self) -> None:
+        pass
+
+    # delete the view and table created using the SQLAlchemy engine
+    def delete_table_and_view(self) -> None:
+        pass
+
+    # expected number of tables to be ingested
+    @staticmethod
+    def expected_tables() -> int:
+        pass
+
+    # number of rows added to the created table
+    def inserted_rows_count(self) -> int:
+        pass
+
+    # FQN of the created table
+    @staticmethod
+    def fqn_created_table() -> str:
+        pass
+
+    # list of schema patterns to be included in the schema filters
+    @staticmethod
+    def get_includes_schemas() -> List[str]:
+        pass
+
+    # list of table patterns to be included in the table filters
+    @staticmethod
+    def get_includes_tables() -> List[str]:
+        pass
+
+    # list of table patterns to be excluded from the table filters
+    @staticmethod
+    def get_excludes_tables() -> List[str]:
+        pass
+
+    # expected number of schemas to be filtered with the use of includes (get_includes_schemas)
+    @staticmethod
+    def expected_filtered_schema_includes() -> int:
+        pass
+
+    # expected number of schemas to be filtered with the use of excludes (get_includes_schemas)
+    @staticmethod
+    def expected_filtered_schema_excludes() -> int:
+        pass
+
+    # expected number of tables to be filtered with the use of includes (get_includes_tables)
+    @staticmethod
+    def expected_filtered_table_includes() -> int:
+        pass
+
+    # expected number of tables to be filtered with the use of excludes (get_includes_tables)
+    @staticmethod
+    def expected_filtered_table_excludes() -> int:
+        pass
+
+    # expected number of filtered entities with the use of a mix of filters (get_includes_schemas, get_includes_tables, get_excludes_tables)
+    @staticmethod
+    def expected_filtered_mix() -> int:
+        pass
+```
diff --git a/ingestion/tests/cli_e2e/__init__.py b/ingestion/tests/cli_e2e/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/ingestion/tests/cli_e2e/database/mysql/mysql.yaml b/ingestion/tests/cli_e2e/database/mysql/mysql.yaml
new file mode 100644
index 00000000000..ab216156fb5
--- /dev/null
+++ b/ingestion/tests/cli_e2e/database/mysql/mysql.yaml
@@ -0,0 +1,29 @@
+source:
+  type: mysql
+  serviceName: local_mysql
+  serviceConnection:
+    config:
+      type: Mysql
+      username: openmetadata_user
+      password: openmetadata_password
+      hostPort: localhost:3306
+      databaseSchema: openmetadata_db
+      connectionOptions: {}
+      connectionArguments: {}
+  sourceConfig:
+    config:
+      markDeletedTables: true
+      includeTables: true
+      includeViews: true
+      type: DatabaseMetadata
+sink:
+  type: metadata-rest
+  config: {}
+workflowConfig:
+  openMetadataServerConfig:
+    hostPort: http://localhost:8585/api
+    authProvider: openmetadata
+    securityConfig:
+      jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
+
+
diff --git a/ingestion/tests/cli_e2e/test_cli_db_base.py b/ingestion/tests/cli_e2e/test_cli_db_base.py
new file mode 100644
index 00000000000..e5842551a5f
--- /dev/null
+++ b/ingestion/tests/cli_e2e/test_cli_db_base.py
@@ -0,0 +1,340 @@
+# Copyright 2022 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Test database connectors with CLI
+"""
+import os
+import re
+from abc import abstractmethod
+from enum import Enum
+from pathlib import Path
+from typing import List
+from unittest import TestCase
+
+import pytest
+import yaml
+from click.testing import CliRunner, Result
+
+from metadata.cmd import metadata
+from metadata.config.common import load_config_file
+from metadata.generated.schema.entity.data.table import Table
+from metadata.ingestion.api.sink import SinkStatus
+from metadata.ingestion.api.source import SourceStatus
+from metadata.ingestion.api.workflow import Workflow
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+
+PATH_TO_RESOURCES = os.path.dirname(os.path.realpath(__file__))
+
+
+class E2EType(Enum):
+    INGEST = "ingest"
+    PROFILER = "profiler"
+    INGEST_FILTER_SCHEMA = "ingest-filter-schema"
+    INGEST_FILTER_TABLE = "ingest-filter-table"
+    INGEST_FILTER_MIX = "ingest-filter-mix"
+
+
+class CliDBBase(TestCase):
+    class TestSuite(TestCase):
+
+        runner = CliRunner()
+        openmetadata: OpenMetadata
+        test_file_path: str
+        config_file_path: str
+
+        # 1. deploy vanilla ingestion
+        @pytest.mark.order(1)
+        def test_vanilla_ingestion(self) -> None:
+            # build config file for ingest
+            self.build_config_file(E2EType.INGEST)
+            # run ingest with new tables
+            result = self.run_command()
+            sink_status, source_status = self.retrieve_statuses(result)
+            self.assert_for_vanilla_ingestion(source_status, sink_status)
+
+        # 2. create a new table + deploy ingestion with views, sample data, and profiler
+        @pytest.mark.order(2)
+        def test_create_table_with_profiler(self) -> None:
+            # delete the table in case it exists
+            self.delete_table_and_view()
+            # create a table and a view
+            self.create_table_and_view()
+            # build config file for ingest
+            self.build_config_file()
+            # run ingest with the new tables
+            self.run_command()
+            # build config file for profiler
+            self.build_config_file(E2EType.PROFILER)
+            # run profiler on the new tables
+            result = self.run_command("profile")
+            sink_status, source_status = self.retrieve_statuses(result)
+            self.assert_for_table_with_profiler(source_status, sink_status)
+
+        # 3. delete the new table + deploy marking tables as deleted
+        @pytest.mark.order(3)
+        def test_delete_table_is_marked_as_deleted(self) -> None:
+            # delete the table created in the previous test
+            self.delete_table_and_view()
+            # build config file for ingest
+            self.build_config_file()
+            # run ingest
+            result = self.run_command()
+            sink_status, source_status = self.retrieve_statuses(result)
+            self.assert_for_delete_table_is_marked_as_deleted(
+                source_status, sink_status
+            )
+
+        # 4. vanilla ingestion + include schema filter pattern
+        @pytest.mark.order(4)
+        def test_schema_filter_includes(self) -> None:
+            # build config file for ingest with filters
+            self.build_config_file(
+                E2EType.INGEST_FILTER_SCHEMA, {"includes": self.get_includes_schemas()}
+            )
+            # run ingest
+            result = self.run_command()
+            sink_status, source_status = self.retrieve_statuses(result)
+            self.assert_filtered_schemas_includes(source_status, sink_status)
+
+        # 5. vanilla ingestion + exclude schema filter pattern
+        @pytest.mark.order(5)
+        def test_schema_filter_excludes(self) -> None:
+            # build config file for ingest with filters
+            self.build_config_file(
+                E2EType.INGEST_FILTER_SCHEMA, {"excludes": self.get_includes_schemas()}
+            )
+            # run ingest
+            result = self.run_command()
+            sink_status, source_status = self.retrieve_statuses(result)
+            self.assert_filtered_schemas_excludes(source_status, sink_status)
+
+        # 6. vanilla ingestion + include table filter pattern
+        @pytest.mark.order(6)
+        def test_table_filter_includes(self) -> None:
+            # build config file for ingest with filters
+            self.build_config_file(
+                E2EType.INGEST_FILTER_TABLE, {"includes": self.get_includes_tables()}
+            )
+            # run ingest
+            result = self.run_command()
+            sink_status, source_status = self.retrieve_statuses(result)
+            self.assert_filtered_tables_includes(source_status, sink_status)
+
+        # 7. vanilla ingestion + exclude table filter pattern
+        @pytest.mark.order(7)
+        def test_table_filter_excludes(self) -> None:
+            # build config file for ingest with filters
+            self.build_config_file(
+                E2EType.INGEST_FILTER_TABLE, {"excludes": self.get_includes_tables()}
+            )
+            # run ingest
+            result = self.run_command()
+            sink_status, source_status = self.retrieve_statuses(result)
+            self.assert_filtered_tables_excludes(source_status, sink_status)
+
+        # 8. vanilla ingestion mixing schema and table filters
+        @pytest.mark.order(8)
+        def test_table_filter_mix(self) -> None:
+            # build config file for ingest with filters
+            self.build_config_file(
+                E2EType.INGEST_FILTER_MIX,
+                {
+                    "schema": {"includes": self.get_includes_schemas()},
+                    "table": {
+                        "includes": self.get_includes_tables(),
+                        "excludes": self.get_excludes_tables(),
+                    },
+                },
+            )
+            # run ingest
+            result = self.run_command()
+            sink_status, source_status = self.retrieve_statuses(result)
+            self.assert_filtered_mix(source_status, sink_status)
+
+        # 9. Run usage
+        @pytest.mark.order(9)
+        def test_usage(self) -> None:
+            # to be implemented
+            pass
+
+        # 10. Run queries in the source (creates, inserts, views) and ingest metadata & lineage
+        @pytest.mark.order(10)
+        def test_lineage(self) -> None:
+            # to be implemented
+            pass
+
+        def run_command(self, command: str = "ingest") -> Result:
+            args = f"{command} -c {self.test_file_path}"
+            result = self.runner.invoke(
+                metadata, args=args.split(" "), catch_exceptions=False
+            )
+            self.assertEqual(0, result.exit_code)
+            return result
+
+        def build_config_file(
+            self, test_type: E2EType = E2EType.INGEST, extra_args: dict = None
+        ) -> None:
+            with open(self.config_file_path) as f:
+                config_yaml = yaml.safe_load(f)
+                config_yaml = self.build_yaml(config_yaml, test_type, extra_args)
+                with open(self.test_file_path, "w") as w:
+                    yaml.dump(config_yaml, w)
+
+        def retrieve_statuses(self, result):
+            source_status: SourceStatus = self.extract_source_status(result.output)
+            sink_status: SinkStatus = self.extract_sink_status(result.output)
+            return sink_status, source_status
+
+        def retrieve_table(self, table_name_fqn: str) -> Table:
+            return self.openmetadata.get_by_name(entity=Table, fqn=table_name_fqn)
+
+        def retrieve_sample_data(self, table_name_fqn: str) -> Table:
+            return self.openmetadata.get_by_name(
+                entity=Table, fqn=table_name_fqn, fields=["sampleData"]
+            )
+
+        def retrieve_lineage(self, table_name_fqn: str) -> dict:
+            return self.openmetadata.client.get(
+                f"/lineage/table/name/{table_name_fqn}?upstreamDepth=3&downstreamDepth=3"
+            )
+
+        @staticmethod
+        def get_workflow(connector: str) -> Workflow:
+            config_file = Path(
+                PATH_TO_RESOURCES + f"/database/{connector}/{connector}.yaml"
+            )
+            config_dict = load_config_file(config_file)
+            return Workflow.create(config_dict)
+
+        @staticmethod
+        def extract_source_status(output) -> SourceStatus:
+            output_clean = output.replace("\n", " ")
+            output_clean = re.sub(" +", " ", output_clean)
+            if re.match(".* Processor Status: .*", output_clean):
+                output_clean = re.findall(
+                    "Source Status: (.*?) Processor Status: .*", output_clean.strip()
+                )[0].strip()
+            else:
+                output_clean = re.findall(
+                    "Source Status: (.*?) Sink Status: .*", output_clean.strip()
+                )[0].strip()
+            return SourceStatus.parse_obj(eval(output_clean))
+
+        @staticmethod
+        def extract_sink_status(output) -> SinkStatus:
+            output_clean = output.replace("\n", " ")
+            output_clean = re.sub(" +", " ", output_clean)
+            output_clean = re.findall(
+                ".* Sink Status: (.*?) Workflow finished.*", output_clean.strip()
+            )[0].strip()
+            return SinkStatus.parse_obj(eval(output_clean))
+
+        @staticmethod
+        @abstractmethod
+        def get_connector_name() -> str:
+            raise NotImplementedError()
+
+        @abstractmethod
+        def create_table_and_view(self) -> None:
+            raise NotImplementedError()
+
+        @abstractmethod
+        def delete_table_and_view(self) -> None:
+            raise NotImplementedError()
+
+        @abstractmethod
+        def assert_for_vanilla_ingestion(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ) -> None:
+            raise NotImplementedError()
+
+        @abstractmethod
+        def assert_for_table_with_profiler(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ):
+            raise NotImplementedError()
+
+        @abstractmethod
+        def assert_for_delete_table_is_marked_as_deleted(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ):
+            raise NotImplementedError()
+
+        @abstractmethod
+        def assert_filtered_schemas_includes(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ):
+            raise NotImplementedError()
+
+        @abstractmethod
+        def assert_filtered_schemas_excludes(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ):
+            raise NotImplementedError()
+
+        @abstractmethod
+        def assert_filtered_tables_includes(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ):
+            raise NotImplementedError()
+
+        @abstractmethod
+        def assert_filtered_tables_excludes(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ):
+            raise NotImplementedError()
+
+        @abstractmethod
+        def assert_filtered_mix(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ):
+            raise NotImplementedError()
+
+        @staticmethod
+        @abstractmethod
+        def get_includes_schemas() -> List[str]:
+            raise NotImplementedError()
+
+        @staticmethod
+        @abstractmethod
+        def get_includes_tables() -> List[str]:
+            raise NotImplementedError()
+
+        @staticmethod
+        @abstractmethod
+        def get_excludes_tables() -> List[str]:
+            raise NotImplementedError()
+
+        @staticmethod
+        def build_yaml(config_yaml: dict, test_type: E2EType, extra_args: dict):
+            if test_type == E2EType.PROFILER:
+                del config_yaml["source"]["sourceConfig"]["config"]
+                config_yaml["source"]["sourceConfig"] = {
+                    "config": {"type": "Profiler", "generateSampleData": True}
+                }
+                config_yaml["processor"] = {"type": "orm-profiler", "config": {}}
+            if test_type == E2EType.INGEST_FILTER_SCHEMA:
+                config_yaml["source"]["sourceConfig"]["config"][
+                    "schemaFilterPattern"
+                ] = extra_args
+            if test_type == E2EType.INGEST_FILTER_TABLE:
+                config_yaml["source"]["sourceConfig"]["config"][
+                    "tableFilterPattern"
+                ] = extra_args
+            if test_type == E2EType.INGEST_FILTER_MIX:
+                config_yaml["source"]["sourceConfig"]["config"][
+                    "schemaFilterPattern"
+                ] = extra_args["schema"]
+                config_yaml["source"]["sourceConfig"]["config"][
+                    "tableFilterPattern"
+                ] = extra_args["table"]
+            return config_yaml
diff --git a/ingestion/tests/cli_e2e/test_cli_db_base_common.py b/ingestion/tests/cli_e2e/test_cli_db_base_common.py
new file mode 100644
index 00000000000..19f42dfa883
--- /dev/null
+++ b/ingestion/tests/cli_e2e/test_cli_db_base_common.py
@@ -0,0 +1,162 @@
+# Copyright 2022 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Test database connectors whose sources extend `CommonDbSourceService` with CLI
+"""
+from abc import ABC, abstractmethod
+from pathlib import Path
+
+from sqlalchemy.engine import Engine
+
+from metadata.ingestion.api.sink import SinkStatus
+from metadata.ingestion.api.source import SourceStatus
+from metadata.ingestion.api.workflow import Workflow
+
+from .test_cli_db_base import PATH_TO_RESOURCES, CliDBBase
+
+
+class CliCommonDB:
+    class TestSuite(CliDBBase.TestSuite, ABC):
+
+        engine: Engine
+
+        @classmethod
+        def setUpClass(cls) -> None:
+            connector = cls.get_connector_name()
+            workflow: Workflow = cls.get_workflow(connector)
+            cls.engine = workflow.source.engine
+            cls.openmetadata = workflow.source.metadata
+            cls.config_file_path = str(
+                Path(PATH_TO_RESOURCES + f"/database/{connector}/{connector}.yaml")
+            )
+            cls.test_file_path = str(
+                Path(PATH_TO_RESOURCES + f"/database/{connector}/test.yaml")
+            )
+
+        def tearDown(self) -> None:
+            self.engine.dispose()
+
+        def assert_for_vanilla_ingestion(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ) -> None:
+            self.assertEqual(len(source_status.failures), 0)
+            self.assertEqual(len(source_status.warnings), 0)
+            self.assertEqual(len(source_status.filtered), 0)
+            self.assertGreaterEqual(len(source_status.success), self.expected_tables())
+            self.assertEqual(len(sink_status.failures), 0)
+            self.assertEqual(len(sink_status.warnings), 0)
+            self.assertGreater(len(sink_status.records), self.expected_tables())
+
+        def assert_for_table_with_profiler(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ):
+            self.assertEqual(len(source_status.failures), 0)
+            self.assertGreater(len(source_status.success), self.expected_tables())
+            self.assertEqual(len(sink_status.failures), 0)
+            self.assertGreater(len(sink_status.records), self.expected_tables())
+            sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData
+            lineage = self.retrieve_lineage(self.fqn_created_table())
+            self.assertEqual(len(sample_data.rows), self.inserted_rows_count())
+            self.assertEqual(
+                len(lineage["downstreamEdges"][0]["lineageDetails"]["columnsLineage"]),
+                self.inserted_rows_count(),
+            )
+
+        def assert_for_delete_table_is_marked_as_deleted(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ):
+            self.assertIsNone(self.retrieve_table(self.fqn_created_table()))
+
+        def assert_filtered_schemas_includes(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ):
+            self.assertEqual(len(source_status.failures), 0)
+            self.assertEqual(
+                len(source_status.filtered), self.expected_filtered_schema_includes()
+            )
+
+        def assert_filtered_schemas_excludes(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ):
+            self.assertEqual(len(source_status.failures), 0)
+            self.assertEqual(
+                len(source_status.filtered), self.expected_filtered_schema_excludes()
+            )
+
+        def assert_filtered_tables_includes(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ):
+            self.assertEqual(len(source_status.failures), 0)
+            self.assertEqual(
+                len(source_status.filtered), self.expected_filtered_table_includes()
+            )
+
+        def assert_filtered_tables_excludes(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ):
+            self.assertEqual(len(source_status.failures), 0)
+            self.assertEqual(
+                len(source_status.filtered), self.expected_filtered_table_excludes()
+            )
+
+        def assert_filtered_mix(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ):
+            self.assertEqual(len(source_status.failures), 0)
+            self.assertEqual(len(source_status.filtered), self.expected_filtered_mix())
+
+        @staticmethod
+        @abstractmethod
+        def expected_tables() -> int:
+            raise NotImplementedError()
+
+        @abstractmethod
+        def inserted_rows_count(self) -> int:
+            raise NotImplementedError()
+
+        @staticmethod
+        @abstractmethod
+        def fqn_created_table() -> str:
+            raise NotImplementedError()
+
+        @staticmethod
+        @abstractmethod
+        def expected_filtered_schema_includes() -> int:
+            raise NotImplementedError()
+
+        @staticmethod
+        @abstractmethod
+        def expected_filtered_schema_excludes() -> int:
+            raise NotImplementedError()
+
+        @staticmethod
+        @abstractmethod
+        def expected_filtered_table_includes() -> int:
+            raise NotImplementedError()
+
+        @staticmethod
+        @abstractmethod
+        def expected_filtered_table_excludes() -> int:
+            raise NotImplementedError()
+
+        @staticmethod
+        @abstractmethod
+        def expected_filtered_mix() -> int:
+            raise NotImplementedError()
diff --git a/ingestion/tests/cli_e2e/test_cli_mysql.py b/ingestion/tests/cli_e2e/test_cli_mysql.py
new file mode 100644
index 00000000000..afb3af6b15b
--- /dev/null
+++ b/ingestion/tests/cli_e2e/test_cli_mysql.py
@@ -0,0 +1,107 @@
+# Copyright 2022 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Test MySQL connector with CLI
+"""
+from typing import List
+
+from .test_cli_db_base_common import CliCommonDB
+
+
+class MysqlCliTest(CliCommonDB.TestSuite):
+
+    create_table_query: str = """
+        CREATE TABLE persons (
+            person_id int,
+            full_name varchar(255)
+        )
+    """
+
+    create_view_query: str = """
+        CREATE VIEW view_persons AS
+            SELECT *
+            FROM openmetadata_db.persons;
+    """
+
+    insert_data_queries: List[str] = [
+        "INSERT INTO persons (person_id, full_name) VALUES (1, 'Peter Parker');",
+        "INSERT INTO persons (person_id, full_name) VALUES (2, 'Clark Kent');",
+    ]
+
+    drop_table_query: str = """
+        DROP TABLE IF EXISTS openmetadata_db.persons;
+    """
+
+    drop_view_query: str = """
+        DROP VIEW IF EXISTS openmetadata_db.view_persons;
+    """
+
+    @staticmethod
+    def get_connector_name() -> str:
+        return "mysql"
+
+    def create_table_and_view(self) -> None:
+        # the context manager closes the connection on exit; no explicit close() needed
+        with self.engine.connect() as connection:
+            connection.execute(self.create_table_query)
+            for insert_query in self.insert_data_queries:
+                connection.execute(insert_query)
+            connection.execute(self.create_view_query)
+
+    def delete_table_and_view(self) -> None:
+        with self.engine.connect() as connection:
+            connection.execute(self.drop_view_query)
+            connection.execute(self.drop_table_query)
+
+    @staticmethod
+    def expected_tables() -> int:
+        return 44
+
+    def inserted_rows_count(self) -> int:
+        return len(self.insert_data_queries)
+
+    @staticmethod
+    def fqn_created_table() -> str:
+        return "local_mysql.default.openmetadata_db.persons"
+
+    @staticmethod
+    def get_includes_schemas() -> List[str]:
+        return ["openmetadata_db.*"]
+
+    @staticmethod
+    def get_includes_tables() -> List[str]:
+        return ["entity_*"]
+
+    @staticmethod
+    def get_excludes_tables() -> List[str]:
+        return [".*bot.*"]
+
+    @staticmethod
+    def expected_filtered_schema_includes() -> int:
+        return 0
+
+    @staticmethod
+    def expected_filtered_schema_excludes() -> int:
+        return 1
+
+    @staticmethod
+    def expected_filtered_table_includes() -> int:
+        return 40
+
+    @staticmethod
+    def expected_filtered_table_excludes() -> int:
+        return 4
+
+    @staticmethod
+    def expected_filtered_mix() -> int:
+        return 40
diff --git a/ingestion/tests/unit/metadata/utils/secrets/test_aws_ssm_secrets_manager.py b/ingestion/tests/unit/metadata/utils/secrets/test_aws_ssm_secrets_manager.py
index 0c49047062d..6ce036d2916 100644
--- a/ingestion/tests/unit/metadata/utils/secrets/test_aws_ssm_secrets_manager.py
+++ b/ingestion/tests/unit/metadata/utils/secrets/test_aws_ssm_secrets_manager.py
@@ -13,7 +13,6 @@
 Test AWS SSM Secrets Manager
 """
 import json
-from abc import ABC
 from typing import Any, Dict, List
 from unittest.mock import Mock
 
@@ -24,7 +23,7 @@ from metadata.utils.secrets.aws_ssm_secrets_manager import AWSSSMSecretsManager
 from .test_aws_based_secrets_manager import AWSBasedSecretsManager
 
 
-class TestAWSSecretsManager(AWSBasedSecretsManager.TestCase, ABC):
+class TestAWSSecretsManager(AWSBasedSecretsManager.TestCase):
     def build_secret_manager(
         self,
         mocked_get_client: Mock,
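Reviewer note: to make the recipe in `ingestion/tests/cli_e2e/README.md` concrete, here is a minimal sketch of what a second connector suite could look like. This `test_cli_postgres.py` is hypothetical — the service name, schema, FQN, expected counts, and filter patterns below are illustrative assumptions, not values from this PR; a real suite would derive them from its own ingestion YAML and sample database.

```python
# Hypothetical sketch only: a new connector test following the README recipe.
# All concrete values (service name, FQN, counts, patterns) are assumptions.
from typing import List

from .test_cli_db_base_common import CliCommonDB


class PostgresCliTest(CliCommonDB.TestSuite):

    create_table_query: str = """
        CREATE TABLE orders (
            order_id int,
            order_name varchar(255)
        )
    """

    create_view_query: str = """
        CREATE VIEW view_orders AS SELECT * FROM orders
    """

    insert_data_queries: List[str] = [
        "INSERT INTO orders (order_id, order_name) VALUES (1, 'first')",
        "INSERT INTO orders (order_id, order_name) VALUES (2, 'second')",
    ]

    drop_table_query: str = "DROP TABLE IF EXISTS orders"
    drop_view_query: str = "DROP VIEW IF EXISTS view_orders"

    @staticmethod
    def get_connector_name() -> str:
        return "postgres"  # resolves database/postgres/postgres.yaml

    def create_table_and_view(self) -> None:
        with self.engine.connect() as connection:
            connection.execute(self.create_table_query)
            for insert_query in self.insert_data_queries:
                connection.execute(insert_query)
            connection.execute(self.create_view_query)

    def delete_table_and_view(self) -> None:
        with self.engine.connect() as connection:
            connection.execute(self.drop_view_query)
            connection.execute(self.drop_table_query)

    @staticmethod
    def expected_tables() -> int:
        return 10  # assumed table count of the sample database

    def inserted_rows_count(self) -> int:
        return len(self.insert_data_queries)

    @staticmethod
    def fqn_created_table() -> str:
        return "local_postgres.default.public.orders"  # assumed FQN

    @staticmethod
    def get_includes_schemas() -> List[str]:
        return ["public.*"]

    @staticmethod
    def get_includes_tables() -> List[str]:
        return ["orders"]

    @staticmethod
    def get_excludes_tables() -> List[str]:
        return [".*view.*"]

    @staticmethod
    def expected_filtered_schema_includes() -> int:
        return 0

    @staticmethod
    def expected_filtered_schema_excludes() -> int:
        return 1

    @staticmethod
    def expected_filtered_table_includes() -> int:
        return 9

    @staticmethod
    def expected_filtered_table_excludes() -> int:
        return 1

    @staticmethod
    def expected_filtered_mix() -> int:
        return 9
```

The counts returned by the `expected_*` methods are what tie a suite to its sample database, so they must be kept in sync whenever the seed data or filter patterns change.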