mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-03 14:13:06 +00:00
Add Datalake E2E (#12323)
This commit is contained in:
parent
79d3e5d05c
commit
b3ebe3f2ea
9
.github/workflows/py-cli-e2e-tests.yml
vendored
9
.github/workflows/py-cli-e2e-tests.yml
vendored
@ -25,7 +25,7 @@ jobs:
|
|||||||
strategy:
|
strategy:
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
matrix:
|
matrix:
|
||||||
e2e-test: ['bigquery', 'dbt_redshift', 'metabase', 'mssql', 'mysql', 'redash', 'snowflake', 'tableau', 'powerbi', 'vertica', 'python', 'redshift', 'hive', 'quicksight']
|
e2e-test: ['bigquery', 'dbt_redshift', 'metabase', 'mssql', 'mysql', 'redash', 'snowflake', 'tableau', 'powerbi', 'vertica', 'python', 'redshift', 'hive', 'quicksight', 'datalake_s3']
|
||||||
environment: test
|
environment: test
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
@ -44,10 +44,10 @@ jobs:
|
|||||||
python-version: 3.9
|
python-version: 3.9
|
||||||
|
|
||||||
- name: configure aws credentials
|
- name: configure aws credentials
|
||||||
if: matrix.e2e-test == 'quicksight'
|
if: contains(fromJSON('["quicksight","datalake_s3"]'), matrix.e2e-test)
|
||||||
uses: aws-actions/configure-aws-credentials@v1
|
uses: aws-actions/configure-aws-credentials@v1
|
||||||
with:
|
with:
|
||||||
role-to-assume: ${{ secrets.E2E_QUICKSIGHT_IAM_ROLE_ARN }}
|
role-to-assume: ${{ secrets.E2E_AWS_IAM_ROLE_ARN }}
|
||||||
role-session-name: github-ci-aws-e2e-tests
|
role-session-name: github-ci-aws-e2e-tests
|
||||||
aws-region: ${{ secrets.E2E_AWS_REGION }}
|
aws-region: ${{ secrets.E2E_AWS_REGION }}
|
||||||
|
|
||||||
@ -134,6 +134,9 @@ jobs:
|
|||||||
E2E_HIVE_AUTH: ${{ secrets.E2E_HIVE_AUTH }}
|
E2E_HIVE_AUTH: ${{ secrets.E2E_HIVE_AUTH }}
|
||||||
E2E_QUICKSIGHT_AWS_ACCOUNTID: ${{ secrets.E2E_QUICKSIGHT_AWS_ACCOUNTID }}
|
E2E_QUICKSIGHT_AWS_ACCOUNTID: ${{ secrets.E2E_QUICKSIGHT_AWS_ACCOUNTID }}
|
||||||
E2E_QUICKSIGHT_REGION: ${{ secrets.E2E_QUICKSIGHT_REGION }}
|
E2E_QUICKSIGHT_REGION: ${{ secrets.E2E_QUICKSIGHT_REGION }}
|
||||||
|
E2E_DATALAKE_S3_BUCKET_NAME: ${{ secrets.E2E_DATALAKE_S3_BUCKET_NAME }}
|
||||||
|
E2E_DATALAKE_S3_PREFIX: ${{ secrets.E2E_DATALAKE_S3_PREFIX }}
|
||||||
|
E2E_DATALAKE_S3_REGION: ${{ secrets.E2E_DATALAKE_S3_REGION }}
|
||||||
run: |
|
run: |
|
||||||
source env/bin/activate
|
source env/bin/activate
|
||||||
export SITE_CUSTOMIZE_PATH=$(python -c "import site; import os; from pathlib import Path; print(os.path.relpath(site.getsitepackages()[0], str(Path.cwd())))")/sitecustomize.py
|
export SITE_CUSTOMIZE_PATH=$(python -c "import site; import os; from pathlib import Path; print(os.path.relpath(site.getsitepackages()[0], str(Path.cwd())))")/sitecustomize.py
|
||||||
|
@ -0,0 +1,25 @@
|
|||||||
|
source:
|
||||||
|
serviceConnection:
|
||||||
|
config:
|
||||||
|
bucketName: $E2E_DATALAKE_S3_BUCKET_NAME
|
||||||
|
prefix: $E2E_DATALAKE_S3_PREFIX
|
||||||
|
configSource:
|
||||||
|
securityConfig:
|
||||||
|
awsRegion: $E2E_DATALAKE_S3_REGION
|
||||||
|
type: Datalake
|
||||||
|
serviceName: aws_datalake
|
||||||
|
sourceConfig:
|
||||||
|
config:
|
||||||
|
type: DatabaseMetadata
|
||||||
|
type: datalake
|
||||||
|
sink:
|
||||||
|
config: {}
|
||||||
|
type: metadata-rest
|
||||||
|
workflowConfig:
|
||||||
|
loggerLevel: DEBUG
|
||||||
|
openMetadataServerConfig:
|
||||||
|
authProvider: openmetadata
|
||||||
|
hostPort: http://localhost:8585/api
|
||||||
|
securityConfig:
|
||||||
|
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
|
||||||
|
|
@ -12,74 +12,79 @@
|
|||||||
"""
|
"""
|
||||||
Test Datalake connector with CLI
|
Test Datalake connector with CLI
|
||||||
"""
|
"""
|
||||||
|
import urllib.parse
|
||||||
|
from pathlib import Path
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from metadata.ingestion.api.sink import SinkStatus
|
||||||
|
from metadata.ingestion.api.source import SourceStatus
|
||||||
|
from metadata.ingestion.api.workflow import Workflow
|
||||||
|
|
||||||
|
from .base.e2e_types import E2EType
|
||||||
|
from .base.test_cli import PATH_TO_RESOURCES
|
||||||
from .common.test_cli_db import CliCommonDB
|
from .common.test_cli_db import CliCommonDB
|
||||||
from .common_e2e_sqa_mixins import SQACommonMethods
|
from .common_e2e_sqa_mixins import SQACommonMethods
|
||||||
|
|
||||||
|
|
||||||
class MysqlCliTest(CliCommonDB.TestSuite, SQACommonMethods):
|
class DatalakeCliTest(CliCommonDB.TestSuite):
|
||||||
create_table_query: str = """
|
@classmethod
|
||||||
CREATE TABLE persons (
|
def setUpClass(cls) -> None:
|
||||||
person_id int,
|
connector = cls.get_connector_name()
|
||||||
full_name varchar(255)
|
workflow: Workflow = cls.get_workflow(
|
||||||
|
test_type=cls.get_test_type(), connector=connector
|
||||||
|
)
|
||||||
|
cls.openmetadata = workflow.source.metadata
|
||||||
|
cls.config_file_path = str(
|
||||||
|
Path(PATH_TO_RESOURCES + f"/database/{connector}/{connector}.yaml")
|
||||||
|
)
|
||||||
|
cls.test_file_path = str(
|
||||||
|
Path(PATH_TO_RESOURCES + f"/database/{connector}/test.yaml")
|
||||||
)
|
)
|
||||||
"""
|
|
||||||
|
|
||||||
create_view_query: str = """
|
def tearDown(self) -> None:
|
||||||
CREATE VIEW view_persons AS
|
pass
|
||||||
SELECT *
|
|
||||||
FROM openmetadata_db.persons;
|
|
||||||
"""
|
|
||||||
|
|
||||||
insert_data_queries: List[str] = [
|
def create_table_and_view(self):
|
||||||
"INSERT INTO persons (person_id, full_name) VALUES (1,'Peter Parker');",
|
pass
|
||||||
"INSERT INTO persons (person_id, full_name) VALUES (1, 'Clark Kent');",
|
|
||||||
]
|
|
||||||
|
|
||||||
drop_table_query: str = """
|
def delete_table_and_view(self):
|
||||||
DROP TABLE IF EXISTS openmetadata_db.persons;
|
pass
|
||||||
"""
|
|
||||||
|
|
||||||
drop_view_query: str = """
|
|
||||||
DROP VIEW IF EXISTS openmetadata_db.view_persons;
|
|
||||||
"""
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_connector_name() -> str:
|
def get_connector_name() -> str:
|
||||||
return "mysql"
|
return "datalake_s3"
|
||||||
|
|
||||||
def create_table_and_view(self) -> None:
|
|
||||||
SQACommonMethods.create_table_and_view(self)
|
|
||||||
|
|
||||||
def delete_table_and_view(self) -> None:
|
|
||||||
SQACommonMethods.delete_table_and_view(self)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def expected_tables() -> int:
|
def expected_tables() -> int:
|
||||||
return 49
|
return 7
|
||||||
|
|
||||||
def inserted_rows_count(self) -> int:
|
def inserted_rows_count(self) -> int:
|
||||||
return len(self.insert_data_queries)
|
return 100
|
||||||
|
|
||||||
def view_column_lineage_count(self) -> int:
|
def view_column_lineage_count(self) -> int:
|
||||||
return 2
|
pass
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def fqn_created_table() -> str:
|
def fqn_created_table() -> str:
|
||||||
return "local_mysql.default.openmetadata_db.persons"
|
return 'aws_datalake.default.aws-datalake-e2e."sales/sales.csv"'
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def fqn_deleted_table() -> None:
|
||||||
|
return None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_includes_schemas() -> List[str]:
|
def get_includes_schemas() -> List[str]:
|
||||||
return ["openmetadata_db.*"]
|
return ["aws-datalake-e2e"]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_includes_tables() -> List[str]:
|
def get_includes_tables() -> List[str]:
|
||||||
return ["entity_*"]
|
return [".*example.*"]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_excludes_tables() -> List[str]:
|
def get_excludes_tables() -> List[str]:
|
||||||
return [".*bot.*"]
|
return [".*test.*"]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def expected_filtered_schema_includes() -> int:
|
def expected_filtered_schema_includes() -> int:
|
||||||
@ -87,16 +92,42 @@ class MysqlCliTest(CliCommonDB.TestSuite, SQACommonMethods):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def expected_filtered_schema_excludes() -> int:
|
def expected_filtered_schema_excludes() -> int:
|
||||||
return 1
|
return 0
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def expected_filtered_table_includes() -> int:
|
def expected_filtered_table_includes() -> int:
|
||||||
return 48
|
return 7
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def expected_filtered_table_excludes() -> int:
|
def expected_filtered_table_excludes() -> int:
|
||||||
return 4
|
return 1
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def expected_filtered_mix() -> int:
|
def expected_filtered_mix() -> int:
|
||||||
return 48
|
return 7
|
||||||
|
|
||||||
|
def retrieve_lineage(self, entity_fqn: str) -> dict:
|
||||||
|
return self.openmetadata.client.get(
|
||||||
|
f"/lineage/table/name/{urllib.parse.quote_plus(entity_fqn)}?upstreamDepth=3&downstreamDepth=3"
|
||||||
|
)
|
||||||
|
|
||||||
|
@pytest.mark.order(2)
|
||||||
|
def test_create_table_with_profiler(self) -> None:
|
||||||
|
# delete table in case it exists
|
||||||
|
self.delete_table_and_view()
|
||||||
|
# create a table and a view
|
||||||
|
self.create_table_and_view()
|
||||||
|
# build config file for ingest
|
||||||
|
self.build_config_file()
|
||||||
|
# run ingest with new tables
|
||||||
|
self.run_command()
|
||||||
|
# build config file for profiler
|
||||||
|
self.build_config_file(
|
||||||
|
E2EType.PROFILER,
|
||||||
|
# Otherwise the sampling here takes too long
|
||||||
|
extra_args={"profileSample": 5, "includes": self.get_includes_schemas()},
|
||||||
|
)
|
||||||
|
# run profiler with new tables
|
||||||
|
result = self.run_command("profile")
|
||||||
|
sink_status, source_status = self.retrieve_statuses(result)
|
||||||
|
self.assert_for_table_with_profiler(source_status, sink_status)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user