diff --git a/.github/workflows/py-cli-e2e-tests.yml b/.github/workflows/py-cli-e2e-tests.yml
index b8ca7685088..7ca0b31b8d7 100644
--- a/.github/workflows/py-cli-e2e-tests.yml
+++ b/.github/workflows/py-cli-e2e-tests.yml
@@ -25,7 +25,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        e2e-test: ['bigquery', 'dbt_redshift', 'metabase', 'mssql', 'mysql', 'redash', 'snowflake', 'tableau', 'powerbi', 'vertica', 'python', 'redshift', 'hive', 'quicksight']
+        e2e-test: ['bigquery', 'dbt_redshift', 'metabase', 'mssql', 'mysql', 'redash', 'snowflake', 'tableau', 'powerbi', 'vertica', 'python', 'redshift', 'hive', 'quicksight', 'datalake_s3']
     environment: test
     steps:
@@ -44,10 +44,10 @@ jobs:
         python-version: 3.9

     - name: configure aws credentials
-      if: matrix.e2e-test == 'quicksight'
+      if: contains(fromJSON('["quicksight", "datalake_s3"]'), matrix.e2e-test)
       uses: aws-actions/configure-aws-credentials@v1
       with:
-        role-to-assume: ${{ secrets.E2E_QUICKSIGHT_IAM_ROLE_ARN }}
+        role-to-assume: ${{ secrets.E2E_AWS_IAM_ROLE_ARN }}
         role-session-name: github-ci-aws-e2e-tests
         aws-region: ${{ secrets.E2E_AWS_REGION }}
@@ -134,6 +134,9 @@ jobs:
         E2E_HIVE_AUTH: ${{ secrets.E2E_HIVE_AUTH }}
         E2E_QUICKSIGHT_AWS_ACCOUNTID: ${{ secrets.E2E_QUICKSIGHT_AWS_ACCOUNTID }}
         E2E_QUICKSIGHT_REGION: ${{ secrets.E2E_QUICKSIGHT_REGION }}
+        E2E_DATALAKE_S3_BUCKET_NAME: ${{ secrets.E2E_DATALAKE_S3_BUCKET_NAME }}
+        E2E_DATALAKE_S3_PREFIX: ${{ secrets.E2E_DATALAKE_S3_PREFIX }}
+        E2E_DATALAKE_S3_REGION: ${{ secrets.E2E_DATALAKE_S3_REGION }}
       run: |
         source env/bin/activate
         export SITE_CUSTOMIZE_PATH=$(python -c "import site; import os; from pathlib import Path; print(os.path.relpath(site.getsitepackages()[0], str(Path.cwd())))")/sitecustomize.py
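Note on the workflow step above: GitHub Actions expressions have no array-literal syntax, so the matrix guard must go through `contains(fromJSON(...))`, and secret values are only reachable via the `secrets` context. When running the suite locally rather than in CI, the same variables have to be exported by hand; a small hypothetical preflight helper (not part of this PR) makes a missing one fail fast instead of surfacing later as an opaque S3 connection error:

```python
import os

# Hypothetical local preflight check, mirroring the env block the CI job
# injects from repository secrets.
REQUIRED_VARS = (
    "E2E_DATALAKE_S3_BUCKET_NAME",
    "E2E_DATALAKE_S3_PREFIX",
    "E2E_DATALAKE_S3_REGION",
)

missing = [name for name in REQUIRED_VARS if not os.environ.get(name)]
if missing:
    raise SystemExit(f"Missing e2e environment variables: {', '.join(missing)}")
```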
diff --git a/ingestion/tests/cli_e2e/database/datalake_s3/datalake_s3.yaml b/ingestion/tests/cli_e2e/database/datalake_s3/datalake_s3.yaml
new file mode 100644
index 00000000000..7b45d84fbc9
--- /dev/null
+++ b/ingestion/tests/cli_e2e/database/datalake_s3/datalake_s3.yaml
@@ -0,0 +1,25 @@
+source:
+  serviceConnection:
+    config:
+      bucketName: $E2E_DATALAKE_S3_BUCKET_NAME
+      prefix: $E2E_DATALAKE_S3_PREFIX
+      configSource:
+        securityConfig:
+          awsRegion: $E2E_DATALAKE_S3_REGION
+      type: Datalake
+  serviceName: aws_datalake
+  sourceConfig:
+    config:
+      type: DatabaseMetadata
+  type: datalake
+sink:
+  config: {}
+  type: metadata-rest
+workflowConfig:
+  loggerLevel: DEBUG
+  openMetadataServerConfig:
+    authProvider: openmetadata
+    hostPort: http://localhost:8585/api
+    securityConfig:
+      jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
+
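The `$E2E_DATALAKE_S3_*` placeholders in this config are resolved from the environment when the workflow file is loaded. As a rough sketch of that substitution step, assuming `os.path.expandvars`-style expansion (the exact loader behavior lives in the ingestion package):

```python
import os
from pathlib import Path

# Rough sketch of the placeholder substitution, assuming
# os.path.expandvars-style expansion; the path is the config file
# added in this PR.
raw = Path(
    "ingestion/tests/cli_e2e/database/datalake_s3/datalake_s3.yaml"
).read_text()
resolved = os.path.expandvars(raw)
# yaml.safe_load(resolved) would now see the concrete bucket name,
# prefix, and region instead of the $E2E_DATALAKE_S3_* placeholders.
```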
diff --git a/ingestion/tests/cli_e2e/test_cli_datalake.py b/ingestion/tests/cli_e2e/test_cli_datalake.py
index acc645e97d2..1a957d8f2b2 100644
--- a/ingestion/tests/cli_e2e/test_cli_datalake.py
+++ b/ingestion/tests/cli_e2e/test_cli_datalake.py
@@ -12,74 +12,79 @@
 """
 Test Datalake connector with CLI
 """
+import urllib.parse
+from pathlib import Path
 from typing import List

+import pytest
+
+from metadata.ingestion.api.sink import SinkStatus
+from metadata.ingestion.api.source import SourceStatus
+from metadata.ingestion.api.workflow import Workflow
+
+from .base.e2e_types import E2EType
+from .base.test_cli import PATH_TO_RESOURCES
 from .common.test_cli_db import CliCommonDB
 from .common_e2e_sqa_mixins import SQACommonMethods


-class MysqlCliTest(CliCommonDB.TestSuite, SQACommonMethods):
-    create_table_query: str = """
-    CREATE TABLE persons (
-        person_id int,
-        full_name varchar(255)
+class DatalakeCliTest(CliCommonDB.TestSuite):
+    @classmethod
+    def setUpClass(cls) -> None:
+        connector = cls.get_connector_name()
+        workflow: Workflow = cls.get_workflow(
+            test_type=cls.get_test_type(), connector=connector
+        )
+        cls.openmetadata = workflow.source.metadata
+        cls.config_file_path = str(
+            Path(PATH_TO_RESOURCES + f"/database/{connector}/{connector}.yaml")
+        )
+        cls.test_file_path = str(
+            Path(PATH_TO_RESOURCES + f"/database/{connector}/test.yaml")
         )
-    """
-    create_view_query: str = """
-    CREATE VIEW view_persons AS
-        SELECT *
-        FROM openmetadata_db.persons;
-    """

+    def tearDown(self) -> None:
+        pass

-    insert_data_queries: List[str] = [
-        "INSERT INTO persons (person_id, full_name) VALUES (1,'Peter Parker');",
-        "INSERT INTO persons (person_id, full_name) VALUES (1, 'Clark Kent');",
-    ]
+    def create_table_and_view(self):
+        pass

-    drop_table_query: str = """
-        DROP TABLE IF EXISTS openmetadata_db.persons;
-    """
-
-    drop_view_query: str = """
-        DROP VIEW IF EXISTS openmetadata_db.view_persons;
-    """
+    def delete_table_and_view(self):
+        pass

     @staticmethod
     def get_connector_name() -> str:
-        return "mysql"
-
-    def create_table_and_view(self) -> None:
-        SQACommonMethods.create_table_and_view(self)
-
-    def delete_table_and_view(self) -> None:
-        SQACommonMethods.delete_table_and_view(self)
+        return "datalake_s3"

     @staticmethod
     def expected_tables() -> int:
-        return 49
+        return 7

     def inserted_rows_count(self) -> int:
-        return len(self.insert_data_queries)
+        return 100

     def view_column_lineage_count(self) -> int:
-        return 2
+        pass

     @staticmethod
     def fqn_created_table() -> str:
-        return "local_mysql.default.openmetadata_db.persons"
+        return 'aws_datalake.default.aws-datalake-e2e."sales/sales.csv"'
+
+    @staticmethod
+    def fqn_deleted_table() -> None:
+        return None

     @staticmethod
     def get_includes_schemas() -> List[str]:
-        return ["openmetadata_db.*"]
+        return ["aws-datalake-e2e"]

     @staticmethod
     def get_includes_tables() -> List[str]:
-        return ["entity_*"]
+        return [".*example.*"]

     @staticmethod
     def get_excludes_tables() -> List[str]:
-        return [".*bot.*"]
+        return [".*test.*"]

     @staticmethod
     def expected_filtered_schema_includes() -> int:
@@ -87,16 +92,42 @@ class MysqlCliTest(CliCommonDB.TestSuite, SQACommonMethods):

     @staticmethod
     def expected_filtered_schema_excludes() -> int:
-        return 1
+        return 0

     @staticmethod
     def expected_filtered_table_includes() -> int:
-        return 48
+        return 7

     @staticmethod
     def expected_filtered_table_excludes() -> int:
-        return 4
+        return 1

     @staticmethod
     def expected_filtered_mix() -> int:
-        return 48
+        return 7
+
+    def retrieve_lineage(self, entity_fqn: str) -> dict:
+        return self.openmetadata.client.get(
+            f"/lineage/table/name/{urllib.parse.quote_plus(entity_fqn)}?upstreamDepth=3&downstreamDepth=3"
+        )
+
+    @pytest.mark.order(2)
+    def test_create_table_with_profiler(self) -> None:
+        # delete table in case it exists
+        self.delete_table_and_view()
+        # create a table and a view
+        self.create_table_and_view()
+        # build config file for ingest
+        self.build_config_file()
+        # run ingest with new tables
+        self.run_command()
+        # build config file for profiler
+        self.build_config_file(
+            E2EType.PROFILER,
+            # Otherwise the sampling here takes too long
+            extra_args={"profileSample": 5, "includes": self.get_includes_schemas()},
+        )
+        # run profiler with new tables
+        result = self.run_command("profile")
+        sink_status, source_status = self.retrieve_statuses(result)
+        self.assert_for_table_with_profiler(source_status, sink_status)
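Two details in the test class are easy to miss. First, `fqn_created_table` quotes the last FQN segment because the S3 object key `sales/sales.csv` contains a dot and a slash, which would otherwise be misread as name separators. Second, the `retrieve_lineage` override percent-encodes the whole FQN with `quote_plus` before placing it in the URL path of the lineage endpoint. A quick illustration of what that encoding produces:

```python
import urllib.parse

fqn = 'aws_datalake.default.aws-datalake-e2e."sales/sales.csv"'

# quote_plus escapes the quotes and the slash so the FQN survives as a
# single path segment in the lineage request URL.
print(urllib.parse.quote_plus(fqn))
# aws_datalake.default.aws-datalake-e2e.%22sales%2Fsales.csv%22
```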