From 043f1e06016f66b00102f60b53ca6f9c6876c1b0 Mon Sep 17 00:00:00 2001 From: Onkar Ravgan Date: Wed, 18 Jan 2023 18:59:16 +0530 Subject: [PATCH] Dbt e2e test (#9767) * Added base files * dbt e2e tests * Added tag condition based on UI feedback * Added owner and lineage tests Co-authored-by: Pere Miquel Brull --- .../ingestion/source/database/dbt/metadata.py | 11 +- ingestion/tests/cli_e2e/dbt/redshift/dbt.yaml | 19 ++ .../tests/cli_e2e/dbt/redshift/redshift.yaml | 24 +++ ingestion/tests/cli_e2e/test_cli_dbt_base.py | 189 ++++++++++++++++++ .../tests/cli_e2e/test_cli_dbt_redshift.py | 84 ++++++++ 5 files changed, 324 insertions(+), 3 deletions(-) create mode 100644 ingestion/tests/cli_e2e/dbt/redshift/dbt.yaml create mode 100644 ingestion/tests/cli_e2e/dbt/redshift/redshift.yaml create mode 100644 ingestion/tests/cli_e2e/test_cli_dbt_base.py create mode 100644 ingestion/tests/cli_e2e/test_cli_dbt_redshift.py diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index 3cfed3f236e..141f804c51f 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -89,6 +89,11 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods self.metadata_config = metadata_config self.metadata = OpenMetadata(metadata_config) self.report = SQLSourceStatus() + self.tag_classification_name = ( + self.source_config.dbtClassificationName + if self.source_config.dbtClassificationName + else "dbtTags" + ) @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): @@ -148,7 +153,7 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods tagFQN=fqn.build( self.metadata, entity_type=Tag, - classification_name=self.source_config.dbtClassificationName, + classification_name=self.tag_classification_name, tag_name=tag.replace(".", ""), ), 
labelType=LabelType.Automated, @@ -266,11 +271,11 @@ class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods for tag_label in dbt_tag_labels or []: yield OMetaTagAndClassification( classification_request=CreateClassificationRequest( - name=self.source_config.dbtClassificationName, + name=self.tag_classification_name, description="dbt classification", ), tag_request=CreateTagRequest( - classification=self.source_config.dbtClassificationName, + classification=self.tag_classification_name, name=tag_label.tagFQN.__root__.split(".")[1], description="dbt Tags", ), diff --git a/ingestion/tests/cli_e2e/dbt/redshift/dbt.yaml b/ingestion/tests/cli_e2e/dbt/redshift/dbt.yaml new file mode 100644 index 00000000000..d3a054f3a50 --- /dev/null +++ b/ingestion/tests/cli_e2e/dbt/redshift/dbt.yaml @@ -0,0 +1,19 @@ +source: + type: dbt + serviceName: local_redshift + sourceConfig: + config: + type: DBT + dbtConfigSource: + dbtCatalogHttpPath: $E2E_REDSHIFT_DBT_CATALOG_HTTP_FILE_PATH + dbtManifestHttpPath: $E2E_REDSHIFT_DBT_MANIFEST_HTTP_FILE_PATH + dbtRunResultsHttpPath: $E2E_REDSHIFT_DBT_RUN_RESULTS_HTTP_FILE_PATH +sink: + type: metadata-rest + config: {} +workflowConfig: + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: openmetadata + securityConfig: + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" diff --git a/ingestion/tests/cli_e2e/dbt/redshift/redshift.yaml 
b/ingestion/tests/cli_e2e/dbt/redshift/redshift.yaml new file mode 100644 index 00000000000..f9b9860a36c --- /dev/null +++ b/ingestion/tests/cli_e2e/dbt/redshift/redshift.yaml @@ -0,0 +1,24 @@ +source: + type: redshift + serviceName: local_redshift + serviceConnection: + config: + hostPort: $E2E_REDSHIFT_HOST_PORT + username: $E2E_REDSHIFT_USERNAME + password: $E2E_REDSHIFT_PASSWORD + database: $E2E_REDSHIFT_DATABASE + type: Redshift + sourceConfig: + config: + schemaFilterPattern: + includes: + - dbt_jaffle +sink: + type: metadata-rest + config: {} +workflowConfig: + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: openmetadata + securityConfig: + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" diff --git a/ingestion/tests/cli_e2e/test_cli_dbt_base.py b/ingestion/tests/cli_e2e/test_cli_dbt_base.py new file mode 100644 index 00000000000..2a8caba2454 --- /dev/null +++ b/ingestion/tests/cli_e2e/test_cli_dbt_base.py @@ -0,0 +1,189 @@ +# Copyright 2022 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Test DBT with CLI
+"""
+import os
+import re
+from abc import abstractmethod
+from contextlib import redirect_stdout
+from io import StringIO
+from pathlib import Path
+from typing import List
+from unittest import TestCase
+
+import pytest
+
+from metadata.cmd import metadata
+from metadata.config.common import load_config_file
+from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.tests.testCase import TestCase as OMTestCase
+from metadata.generated.schema.tests.testSuite import TestSuite
+from metadata.ingestion.api.sink import SinkStatus
+from metadata.ingestion.api.source import SourceStatus
+from metadata.ingestion.api.workflow import Workflow
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+
+PATH_TO_RESOURCES = os.path.dirname(os.path.realpath(__file__))
+
+
+class CliDBTBase(TestCase):
+    """
+    Wrapper class holding the shared dbt CLI E2E suite.
+
+    NOTE(review): the suite is nested, presumably so that the abstract
+    suite is not collected and run on its own - confirm.
+    """
+
+    class TestSuite(TestCase):
+        """
+        Ordered dbt E2E flow driven through the `metadata` CLI.
+
+        The flow runs the connector ingestion, then the dbt ingestion, and
+        finally checks the resulting tables, dbt test cases and lineage.
+        Subclasses set `openmetadata`, `dbt_file_path` and
+        `config_file_path` and implement the abstract hooks below.
+        """
+
+        # Created once at class-definition time, hence shared by every
+        # subclass and every test; always read it via `_drain_catcher`.
+        catcher = StringIO()
+        # Client used to query the server once the ingestions have run
+        openmetadata: OpenMetadata
+        # Workflow config passed to the CLI for the dbt ingestion
+        dbt_file_path: str
+        # Workflow config passed to the CLI for the connector ingestion
+        config_file_path: str
+
+        def _drain_catcher(self) -> str:
+            """
+            Return everything captured so far and reset the shared buffer.
+            """
+            result = self.catcher.getvalue()
+            # truncate() does not move the stream position. Without the
+            # seek() the next write would start at the old offset and the
+            # gap would be filled with NUL characters.
+            self.catcher.seek(0)
+            self.catcher.truncate()
+            return result
+
+        # 1. deploy vanilla ingestion
+        @pytest.mark.order(1)
+        def test_connector_ingestion(self) -> None:
+            """
+            Run the connector workflow and validate its statuses.
+            """
+            # run ingest with dbt tables
+            self.run_command(file_path=self.config_file_path)
+            result = self._drain_catcher()
+            sink_status, source_status = self.retrieve_statuses(result)
+            self.assert_for_vanilla_ingestion(source_status, sink_status)
+
+        # 2. deploy dbt ingestion
+        @pytest.mark.order(2)
+        def test_dbt_ingestion(self) -> None:
+            """
+            Run the dbt workflow and validate its statuses.
+            """
+            # run the dbt ingestion
+            self.run_command(file_path=self.dbt_file_path)
+            result = self._drain_catcher()
+            sink_status, source_status = self.retrieve_statuses(result)
+            self.assert_for_dbt_ingestion(source_status, sink_status)
+
+        # 3. run tests on dbt ingestion
+        @pytest.mark.order(3)
+        def test_entities(self) -> None:
+            """
+            Check that each dbt table carries a populated data model,
+            descriptions, owners and tags.
+            """
+            for table_fqn in self.fqn_dbt_tables():
+                table: Table = self.openmetadata.get_by_name(
+                    entity=Table, fqn=table_fqn, fields="*"
+                )
+                # Fail with a readable message instead of an AttributeError
+                self.assertIsNotNone(table, f"Table {table_fqn} not found")
+                data_model = table.dataModel
+                self.assertIsNotNone(data_model, f"No data model on {table_fqn}")
+                self.assertGreater(len(data_model.columns), 0)
+                self.assertIsNotNone(data_model.rawSql)
+                self.assertIsNotNone(data_model.sql)
+                self.assertIsNotNone(data_model.upstream)
+                self.assertIsNotNone(data_model.description)
+                self.assertIsNotNone(table.description)
+                self.assertIsNotNone(data_model.owner)
+                self.assertIsNotNone(table.owner)
+                self.assertGreater(len(data_model.tags), 0)
+                self.assertGreater(len(table.tags), 0)
+
+        # 4. run tests on dbt test cases and test results
+        @pytest.mark.order(4)
+        def test_dbt_test_cases(self) -> None:
+            """
+            Check the number of test cases listed for "DBT TEST SUITE".
+            """
+            test_suite: TestSuite = self.openmetadata.get_by_name(
+                entity=TestSuite, fqn="DBT TEST SUITE"
+            )
+            self.assertIsNotNone(test_suite, "DBT TEST SUITE not found")
+
+            test_case_entity_list = self.openmetadata.list_entities(
+                entity=OMTestCase,
+                fields=["testSuite", "entityLink", "testDefinition"],
+                params={"testSuiteId": test_suite.id.__root__},
+            )
+            self.assertEqual(len(test_case_entity_list.entities), 23)
+
+        # 5. test dbt lineage
+        @pytest.mark.order(5)
+        def test_lineage(self) -> None:
+            """
+            Check that each dbt table reports at least 4 upstream edges.
+            """
+            for table_fqn in self.fqn_dbt_tables():
+                lineage = self.retrieve_lineage(table_fqn)
+                self.assertGreaterEqual(len(lineage["upstreamEdges"]), 4)
+
+        def run_command(self, file_path: str, command: str = "ingest"):
+            """
+            Invoke the `metadata` CLI as `<command> -c <file_path>`.
+
+            stdout is redirected into `catcher`. The call is asserted to
+            end with SystemExit.
+            """
+            args = [command, "-c", file_path]
+            with redirect_stdout(self.catcher), self.assertRaises(SystemExit):
+                metadata(args)
+
+        def retrieve_statuses(self, result):
+            """
+            Parse both statuses out of the captured CLI output.
+
+            Returns the tuple `(sink_status, source_status)`, sink first.
+            """
+            source_status: SourceStatus = self.extract_source_status(result)
+            sink_status: SinkStatus = self.extract_sink_status(result)
+            return sink_status, source_status
+
+        def retrieve_lineage(self, table_name_fqn: str) -> dict:
+            """
+            Fetch the lineage of a table by FQN, 3 levels up and down.
+            """
+            return self.openmetadata.client.get(
+                f"/lineage/table/name/{table_name_fqn}?upstreamDepth=3&downstreamDepth=3"
+            )
+
+        @staticmethod
+        def get_workflow(connector: str) -> Workflow:
+            """
+            Build a Workflow from `dbt/<connector>/<connector>.yaml`,
+            resolved next to this file.
+            """
+            config_file = Path(PATH_TO_RESOURCES) / "dbt" / connector / f"{connector}.yaml"
+            config_dict = load_config_file(config_file)
+            return Workflow.create(config_dict)
+
+        @staticmethod
+        def extract_source_status(output) -> SourceStatus:
+            """
+            Extract the text following "Source Status:" from the CLI
+            output and parse it into a SourceStatus.
+
+            The capture stops at "Processor Status:" when that section is
+            present, otherwise at "Sink Status:". Raises IndexError if no
+            source status section is found.
+            """
+            output_clean = output.replace("\n", " ")
+            output_clean = re.sub(" +", " ", output_clean)
+            # ANSI escape sequences are replaced by a space here
+            output_clean_ansi = re.compile(r"\x1b[^m]*m")
+            output_clean = output_clean_ansi.sub(" ", output_clean)
+            if re.match(".* Processor Status: .*", output_clean):
+                output_clean = re.findall(
+                    "Source Status: (.*?) Processor Status: .*", output_clean.strip()
+                )
+            else:
+                output_clean = re.findall(
+                    "Source Status: (.*?) Sink Status: .*", output_clean.strip()
+                )
+            # NOTE(review): eval() runs whatever the CLI printed. This is
+            # only tolerable because the output comes from our own workflow
+            # run. Consider ast.literal_eval if the printed status only
+            # holds literals - confirm before switching.
+            return SourceStatus.parse_obj(eval(output_clean[0].strip()))
+
+        @staticmethod
+        def extract_sink_status(output) -> SinkStatus:
+            """
+            Extract the text between "Sink Status:" and "Workflow finished"
+            from the CLI output and parse it into a SinkStatus.
+
+            Raises IndexError if no sink status section is found.
+            """
+            output_clean = output.replace("\n", " ")
+            output_clean = re.sub(" +", " ", output_clean)
+            # Unlike extract_source_status, ANSI sequences are removed
+            # outright here rather than replaced by a space.
+            output_clean_ansi = re.compile(r"\x1b[^m]*m")
+            output_clean = output_clean_ansi.sub("", output_clean)
+            output_clean = re.findall(
+                ".* Sink Status: (.*?) Workflow finished.*", output_clean.strip()
+            )[0].strip()
+            # NOTE(review): same eval() caveat as in extract_source_status
+            return SinkStatus.parse_obj(eval(output_clean))
+
+        # The hooks below are marked abstract for readers only: this class
+        # does not use ABCMeta, so the NotImplementedError is the real guard.
+
+        @staticmethod
+        @abstractmethod
+        def get_connector_name() -> str:
+            """
+            Name of the connector under test.
+            """
+            raise NotImplementedError()
+
+        @staticmethod
+        @abstractmethod
+        def expected_tables() -> int:
+            """
+            Table count that subclass assertions compare against.
+            """
+            raise NotImplementedError()
+
+        @staticmethod
+        @abstractmethod
+        def expected_records() -> int:
+            """
+            Record count that subclass assertions compare against.
+            """
+            raise NotImplementedError()
+
+        @staticmethod
+        @abstractmethod
+        def fqn_dbt_tables() -> List[str]:
+            """
+            FQNs of the tables checked by test_entities and test_lineage.
+            """
+            raise NotImplementedError()
+
+        @abstractmethod
+        def assert_for_vanilla_ingestion(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ) -> None:
+            """
+            Assertions run on the statuses of the connector ingestion.
+            """
+            raise NotImplementedError()
+
+        @abstractmethod
+        def assert_for_dbt_ingestion(
+            self, source_status: SourceStatus, sink_status: SinkStatus
+        ) -> None:
+            """
+            Assertions run on the statuses of the dbt ingestion.
+            """
+            raise NotImplementedError()
diff --git a/ingestion/tests/cli_e2e/test_cli_dbt_redshift.py b/ingestion/tests/cli_e2e/test_cli_dbt_redshift.py
new file mode 100644
index 00000000000..ec56dc2694c
--- /dev/null
+++ b/ingestion/tests/cli_e2e/test_cli_dbt_redshift.py
@@ -0,0 +1,84 @@
+# Copyright 2022 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Test dbt ingestion on Redshift with CLI
+"""
+from pathlib import Path
+from typing import List
+
+from sqlalchemy.engine import Engine
+
+from metadata.ingestion.api.sink import SinkStatus
+from metadata.ingestion.api.source import SourceStatus
+from metadata.ingestion.api.workflow import Workflow
+
+from .test_cli_dbt_base import PATH_TO_RESOURCES, CliDBTBase
+
+
+class DbtCliTest(CliDBTBase.TestSuite):
+    """
+    Redshift implementation of the dbt CLI E2E suite.
+    """
+
+    # Engine taken from the connector workflow source in setUpClass
+    engine: Engine
+
+    @classmethod
+    def setUpClass(cls) -> None:
+        """
+        Create the connector workflow once and derive from it the engine,
+        the OpenMetadata client and the two config file paths.
+        """
+        connector = cls.get_connector_name()
+        workflow: Workflow = cls.get_workflow(connector)
+        cls.engine = workflow.source.engine
+        cls.openmetadata = workflow.source.metadata
+        resources = Path(PATH_TO_RESOURCES) / "dbt" / connector
+        cls.config_file_path = str(resources / f"{connector}.yaml")
+        cls.dbt_file_path = str(resources / "dbt.yaml")
+
+    @classmethod
+    def tearDownClass(cls) -> None:
+        """
+        Dispose the engine once, mirroring its creation in setUpClass.
+        """
+        cls.engine.dispose()
+
+    @staticmethod
+    def get_connector_name() -> str:
+        """
+        Connector name, also the config directory under `dbt/`.
+        """
+        return "redshift"
+
+    @staticmethod
+    def expected_tables() -> int:
+        """
+        Lower bound used by assert_for_vanilla_ingestion.
+        """
+        return 9
+
+    @staticmethod
+    def expected_records() -> int:
+        """
+        Lower bound on sink records used by assert_for_dbt_ingestion.
+        """
+        return 72
+
+    @staticmethod
+    def fqn_dbt_tables() -> List[str]:
+        """
+        FQNs of the tables checked by the entity and lineage tests.
+        """
+        return [
+            "local_redshift.dev.dbt_jaffle.customers",
+            "local_redshift.dev.dbt_jaffle.orders",
+        ]
+
+    def assert_for_vanilla_ingestion(
+        self, source_status: SourceStatus, sink_status: SinkStatus
+    ) -> None:
+        """
+        Status checks for the connector ingestion: no failures or
+        warnings, exactly 5 filtered entries, and at least
+        `expected_tables()` source successes.
+        """
+        self.assertEqual(len(source_status.failures), 0)
+        self.assertEqual(len(source_status.warnings), 0)
+        self.assertEqual(len(source_status.filtered), 5)
+        self.assertGreaterEqual(len(source_status.success), self.expected_tables())
+        self.assertEqual(len(sink_status.failures), 0)
+        self.assertEqual(len(sink_status.warnings), 0)
+        # Strictly greater: the sink must hold more records than tables
+        self.assertGreater(len(sink_status.records), self.expected_tables())
+
+    def assert_for_dbt_ingestion(
+        self, source_status: SourceStatus, sink_status: SinkStatus
+    ) -> None:
+        """
+        Status checks for the dbt ingestion: no failures, warnings or
+        filtered entries, and at least `expected_records()` sink records.
+        """
+        self.assertEqual(len(source_status.failures), 0)
+        self.assertEqual(len(source_status.warnings), 0)
+        self.assertEqual(len(source_status.filtered), 0)
+        # NOTE(review): a length is never negative, so this check cannot
+        # fail. Tighten it once the expected success count is known.
+        self.assertGreaterEqual(len(source_status.success), 0)
+        self.assertEqual(len(sink_status.failures), 0)
+        self.assertEqual(len(sink_status.warnings), 0)
+        self.assertGreaterEqual(len(sink_status.records), self.expected_records())