MINOR: classification + test workflow for BQ multiproject (#20779)

* fix: classification + test workflow for BQ multiproject

* fix: deleted e2e test as handled from the UI

* fix: failing test case
Teddy authored 2025-04-15 10:37:29 +02:00 (committed by GitHub)
parent 2197c00984
commit 1edeb0baf8
27 changed files with 60 additions and 1057 deletions

View File

@@ -16,6 +16,8 @@ import traceback
 from copy import deepcopy
 from typing import List, Optional
 
+from pydantic import RootModel
+
 from metadata.data_quality.api.models import (
     TableAndTests,
     TestCaseDefinition,
@@ -27,7 +29,6 @@ from metadata.data_quality.runner.base_test_suite_source import BaseTestSuiteRun
 from metadata.data_quality.runner.core import DataTestsRunner
 from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
 from metadata.generated.schema.entity.data.table import Table
-from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
 from metadata.generated.schema.entity.services.ingestionPipelines.status import (
     StackTraceError,
 )
@@ -94,9 +95,8 @@ class TestCaseRunner(Processor):
                 record.table, openmetadata_test_cases
             )
 
-            test_suite_runner = self.get_test_suite_runner(
-                record.table, record.service_connection
-            )
+            self.config.source.serviceConnection = RootModel(record.service_connection)
+            test_suite_runner = self.get_test_suite_runner(record.table)
 
             logger.debug(
                 f"Found {len(openmetadata_test_cases)} test cases for table {record.table.fullyQualifiedName.root}"
@@ -354,9 +354,7 @@ class TestCaseRunner(Processor):
                 result.append(tc)
         return result
 
-    def get_test_suite_runner(
-        self, table: Table, service_connection: DatabaseConnection
-    ):
+    def get_test_suite_runner(self, table: Table):
         return BaseTestSuiteRunner(
-            self.config, self.metadata, table, service_connection
+            self.config, self.metadata, table
         ).get_data_quality_runner()

View File

@@ -19,6 +19,9 @@ from metadata.data_quality.builders.validator_builder import ValidatorBuilder
 from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
 from metadata.data_quality.runner.core import DataTestsRunner
 from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
+    BigQueryConnection,
+)
 from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
 from metadata.generated.schema.entity.services.serviceType import ServiceType
 from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
@@ -31,6 +34,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.sampler.models import SampleConfig
 from metadata.sampler.sampler_interface import SamplerInterface
+from metadata.utils.bigquery_utils import copy_service_config
 from metadata.utils.profiler_utils import get_context_entities
 from metadata.utils.service_spec.service_spec import (
     import_sampler_class,
@@ -46,12 +50,11 @@ class BaseTestSuiteRunner:
         config: OpenMetadataWorkflowConfig,
         ometa_client: OpenMetadata,
         entity: Table,
-        service_connection: DatabaseConnection,
     ):
         self.validator_builder_class = ValidatorBuilder
         self._interface = None
         self.entity = entity
-        self.service_conn_config = self._copy_service_config(service_connection, self.entity.database)  # type: ignore
+        self.service_conn_config = self._copy_service_config(config, self.entity.database)  # type: ignore
         self._interface_type: str = self.service_conn_config.type.value.lower()
 
         self.source_config = TestSuitePipeline.model_validate(
@@ -68,7 +71,7 @@ class BaseTestSuiteRunner:
         self._interface = interface
 
     def _copy_service_config(
-        self, service_connection: DatabaseConnection, database: EntityReference
+        self, config: OpenMetadataWorkflowConfig, database: EntityReference
     ) -> DatabaseConnection:
         """Make a copy of the service config and update the database name
 
@@ -78,7 +81,10 @@ class BaseTestSuiteRunner:
         Returns:
             DatabaseService.__config__
         """
-        config_copy = deepcopy(service_connection.config)  # type: ignore
+        if isinstance(config.source.serviceConnection.root.config, BigQueryConnection):
+            return copy_service_config(config, database.name)
+        config_copy = deepcopy(config.source.serviceConnection.root.config)  # type: ignore
         if hasattr(
             config_copy,  # type: ignore
             "supportsDatabase",

View File

@@ -13,8 +13,6 @@
 Bigquery Profiler source
 """
-from copy import deepcopy
-
 from metadata.generated.schema.entity.data.database import Database
 from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
     BigQueryConnection,
@@ -22,12 +20,8 @@ from metadata.generated.schema.entity.services.connections.database.bigQueryConn
 from metadata.generated.schema.metadataIngestion.workflow import (
     OpenMetadataWorkflowConfig,
 )
-from metadata.generated.schema.security.credentials.gcpValues import (
-    GcpCredentialsValues,
-    MultipleProjectId,
-    SingleProjectId,
-)
 from metadata.profiler.source.database.base.profiler_source import ProfilerSource
+from metadata.utils.bigquery_utils import copy_service_config
 
 
 class BigQueryProfilerSource(ProfilerSource):
@@ -46,16 +40,4 @@ class BigQueryProfilerSource(ProfilerSource):
         Returns:
             DatabaseConnection
         """
-        config_copy: BigQueryConnection = deepcopy(
-            config.source.serviceConnection.root.config  # type: ignore
-        )
-        if isinstance(config_copy.credentials.gcpConfig, GcpCredentialsValues):
-            if isinstance(
-                config_copy.credentials.gcpConfig.projectId, MultipleProjectId
-            ):
-                config_copy.credentials.gcpConfig.projectId = SingleProjectId(
-                    database.name.root
-                )
-        return config_copy
+        return copy_service_config(config, database.name.root)

View File

@@ -17,6 +17,9 @@ from typing import Optional, cast
 from metadata.generated.schema.entity.data.database import Database
 from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
+    BigQueryConnection,
+)
 from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
 from metadata.generated.schema.entity.services.ingestionPipelines.status import (
     StackTraceError,
@@ -38,6 +41,7 @@ from metadata.profiler.source.metadata import ProfilerSourceAndEntity
 from metadata.sampler.config import get_config_for_table
 from metadata.sampler.models import SampleConfig, SampleData, SamplerResponse
 from metadata.sampler.sampler_interface import SamplerInterface
+from metadata.utils.bigquery_utils import copy_service_config
 from metadata.utils.profiler_utils import get_context_entities
 from metadata.utils.service_spec.service_spec import import_sampler_class
@@ -133,6 +137,9 @@ class SamplerProcessor(Processor):
         Returns:
             DatabaseService.__config__
         """
+        if isinstance(config.source.serviceConnection.root.config, BigQueryConnection):
+            return copy_service_config(config, database.name.root)
+
         config_copy = deepcopy(
             config.source.serviceConnection.root.config  # type: ignore
         )

View File

@@ -13,10 +13,22 @@
 Utils module of BigQuery
 """
+from copy import deepcopy
 from typing import List, Optional
 
 from google.cloud import bigquery
 
+from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
+    BigQueryConnection,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
+from metadata.generated.schema.security.credentials.gcpValues import (
+    GcpCredentialsValues,
+    MultipleProjectId,
+    SingleProjectId,
+)
 from metadata.utils.credentials import (
     get_gcp_default_credentials,
     get_gcp_impersonate_credentials,
@@ -56,3 +68,26 @@ def get_bigquery_client(
     return bigquery.Client(
         credentials=credentials, project=project_id, location=location
     )
+
+
+def copy_service_config(
+    config: OpenMetadataWorkflowConfig, database_name: str
+) -> BigQueryConnection:
+    """Handles multiple project ids in the service config and replaces them with the database name
+
+    Args:
+        config (OpenMetadataWorkflowConfig): OpenMetadata workflow config
+        database_name (str): database name
+
+    Returns:
+        BigQueryConnection: BigQuery connection
+    """
+    config_copy: BigQueryConnection = deepcopy(
+        config.source.serviceConnection.root.config  # type: ignore
+    )
+    if isinstance(config_copy.credentials.gcpConfig, GcpCredentialsValues):
+        if isinstance(config_copy.credentials.gcpConfig.projectId, MultipleProjectId):
+            config_copy.credentials.gcpConfig.projectId = SingleProjectId(database_name)
+    return config_copy
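
For reference, a minimal usage sketch of the new helper. This is not part of the commit; `workflow_config` is a hypothetical `OpenMetadataWorkflowConfig` whose service connection wraps a `BigQueryConnection` carrying a `MultipleProjectId` credential:

```python
from metadata.generated.schema.security.credentials.gcpValues import SingleProjectId
from metadata.utils.bigquery_utils import copy_service_config

# Pin the copied connection to the project of the database being processed.
# `workflow_config` is assumed to carry MultipleProjectId(["proj-a", "proj-b"]).
connection = copy_service_config(workflow_config, "proj-a")

# The original config is left untouched (deepcopy); the copy holds a single project.
assert isinstance(connection.credentials.gcpConfig.projectId, SingleProjectId)
```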

View File

@@ -1,77 +0,0 @@
# Playwright end-to-end tests
https://playwright.dev/python/docs/intro
## Structure
In the `e2e` folder you will find 2 folders and 1 file:
- `conftest.py`: defines some module-scoped fixtures (module here is the `e2e` folder). All tests will use `init_with_redshift` by default -- ingesting metadata from a redshift service. The ingestion only happens on the first test execution. The `create_data_consumer_user` fixture allows tests to log in as a Data Consumer and perform some actions
- `configs`: holds all the shared configuration. So far we have 2 main classes families (User and Connector) and common functions
- `entity`: holds entity related tests. It contains a subfolder per asset category. In each asset category folder you will find `common_assertions.py`. This file contains all the common assertions to be run for that specific asset.
## Install Dependencies and Run Tests
Run `make install_e2e_tests` to install the dependencies, then `make run_e2e_tests` to run the tests. You can also pass arguments such as `make run_e2e_tests ARGS="--browser webkit"` to run the tests against the webkit browser, or `make run_e2e_tests ARGS="--headed --slowmo 100"` to run the tests headed and in slow motion.
## Adding a new test
The first step is to define the connector config for your source. This happens in the `configs/connectors/<asset category>` folder. For a database connector, you must ensure your class inherits from `DataBaseConnectorInterface`. You will then need to implement `get_service()` and `set_connection()`: `get_service` specifies which service to choose from the `<assetCategory>/add-service` page of the website, and `set_connection` sets the different elements to configure on the connector connection config page. If you are unsure how an element can be accessed on the page you can run `playwright codegen http://localhost:8585/` -- more info [here](https://playwright.dev/python/docs/codegen). By default `DataBaseConnectorInterface` sets `self.supports_profiler_ingestion = True`, which causes the profiler ingestion to run when the test class is executed. You can set `self.supports_profiler_ingestion = False` in your specific connector to override this behavior.
e.g.
```python
class DruidConnector(DataBaseConnectorInterface):
    """druid connector"""

    def __init__(self, config):
        super().__init__(config)
        self.supports_profiler_ingestion = False

    def set_connection(self, page):
        ...

    def get_service(self, page):
        ...
```
Once your connector config has been created you will need to add a new test. Simply create a new file in the asset category of your choice (e.g. `entity/database/test_druid.py`). In this file create a new test class and mark it with `@pytest.mark.usefixtures("setUpClass")` and `@pytest.mark.parametrize("setUpClass", ...)`. The first mark makes sure the `setUpClass` fixture runs before your tests (it manages the ingestion of metadata and profiler data as of Oct-25 2023) and `@pytest.mark.parametrize` passes the right connector class to the `setUpClass` fixture. The second argument of `@pytest.mark.parametrize` should be as below (a complete class skeleton follows the snippet):
```python
[
    {
        "connector_obj": <connectorClassConfig>(
            ConnectorTestConfig(...)
        )
    }
]
```
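Putting both marks together, a minimal class skeleton (the connector name here is illustrative) would look like:
```python
@pytest.mark.parametrize(
    "setUpClass",
    [{"connector_obj": DruidConnector(ConnectorTestConfig())}],
    indirect=True,
)
@pytest.mark.usefixtures("setUpClass")
class TestDruidConnector:
    """Druid connector test case"""
```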
`ConnectorTestConfig` defines the configuration to use for the test. It has 2 arguments (a complete example follows this list):
- `ingestion`: allows you to define the different filters to apply when performing the ingestion. It expects a `ConnectorIngestionTestConfig`, which takes 2 arguments:
  - `metadata`: allows you to define metadata ingestion filters. It takes an `IngestionTestConfig`, which takes 3 arguments:
    - `database`: expects an `IngestionFilterConfig` class, which takes 2 arguments:
      - `includes`: a list of str
      - `excludes`: a list of str
    - `schema_`: see `database`
    - `table`: see `database`
  - `profiler`: see `metadata`
- `validation`: this config can be used when we need to validate expectations against specific entities. As of Oct-25 2023 it is only used in the `assert_profile_data`, `assert_sample_data_ingestion` and `assert_pii_column_auto_tagging` test functions of the profiler.
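For example, a fully populated config (all names here are placeholders) could look like:
```python
ConnectorTestConfig(
    ingestion=ConnectorIngestionTestConfig(
        metadata=IngestionTestConfig(
            schema_=IngestionFilterConfig(includes=["my_schema"]),
        ),
        profiler=IngestionTestConfig(
            table=IngestionFilterConfig(includes=["my_table"]),
        ),
    ),
    validation=ConnectorValidationTestConfig(
        profiler=ValidationTestConfig(
            database="my_db", schema_="my_schema", table="my_table"
        )
    ),
)
```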
Once you have set up your class you can create your tests. There are currently (as of Oct-25 2023) 5 assertions that can be performed:
- assert pipeline statuses are `success`. You can refer to the implementation in the existing tests
- `assert_change_database_owner`: assert the owner of a data asset can be changed
- `assert_profile_data`: assert table profile data summaries are visible
- `assert_sample_data_ingestion`: assert sample data are ingested and visible
- `assert_pii_column_auto_tagging`: assert auto PII tagging from the profiler has been performed
Note that in every test method you define, the following class attributes are accessible (the status assertion below shows them in use):
- `connector_obj` (`<connectorClassConfig>`): the connector class passed to `setUpClass` in `@pytest.mark.parametrize`
- `service_name` (`str`): the name of the service that was created for the test
- `metadata_ingestion_status` (`PipelineState`): the ingestion status of the metadata pipeline
- `profiler_ingestion_status` (`PipelineState`): the ingestion status of the profiler pipeline.
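A minimal sketch of such an assertion, matching the connector tests in this repository:
```python
def test_pipelines_statuses(self):
    """check ingestion pipelines ran successfully"""
    assert self.metadata_ingestion_status == PipelineState.success
    # connectors that do not support profiler ingestion report None as status
    assert self.profiler_ingestion_status in {PipelineState.success, None}
```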
## Test Coverage
| **tests** | redshift | druid | hive |
|-----------------------------|:--------:|:-----:|:----:|
| metadata ingestion | ✅ | ✅ | ✅ |
| profiler ingestion | ✅ | ✅ | ✅ |
| change DB owner | ✅ | ✅ | ✅ |
| Table Profiler Summary Data | ✅ | ✅ | ✅ |
| Sample data visible | ✅ | ✅ | ✅ |
| Profiler PII auto Tag | ✅ | ✅ | ❌ |

View File

@@ -1,60 +0,0 @@
"""common navigation functions"""
import random
import string
from playwright.sync_api import Page, expect
from .users.user import User
BASE_URL = "http://localhost:8585"
def go_to_service(service_type: str, page: Page, service_name: str):
"""navigate to the given service page
Args:
service_type (str): service type
page (Page): playwright page
service_name (str): service name
"""
page.get_by_test_id("app-bar-item-settings").click()
page.get_by_text(service_type).click()
page.get_by_test_id(f"service-name-{service_name}").click()
def create_user(page: Page, email: str, display_name: str, role: str) -> User:
"""create a user
Args:
page (Page): playwright page
email (str): user email
display_name (str): user display name
role (str): user role
Returns:
User: the created user
"""
page.get_by_test_id("app-bar-item-settings").click()
page.get_by_test_id("global-setting-left-panel").get_by_text("Users").click()
page.get_by_test_id("add-user").click()
page.get_by_test_id("email").click()
page.get_by_test_id("email").fill(email)
expect(page.get_by_test_id("email")).to_have_value(email)
page.get_by_test_id("displayName").fill(display_name)
expect(page.get_by_test_id("displayName")).to_have_value(display_name)
password = "".join(random.choice(string.ascii_uppercase) for _ in range(3))
password += "".join(random.choice(string.digits) for _ in range(3))
password += "".join(random.choice(string.ascii_lowercase) for _ in range(3))
password += "".join(random.choice("%^&*#@$!)(?") for _ in range(3))
page.get_by_label("Create Password").check()
page.get_by_placeholder("Enter Password").fill(password)
expect(page.get_by_placeholder("Enter Password")).to_have_value(password)
page.get_by_placeholder("Confirm Password").fill(password)
expect(page.get_by_placeholder("Confirm Password")).to_have_value(password)
page.get_by_test_id("roles-dropdown").locator("div").nth(1).click()
page.get_by_text(role).click()
page.get_by_test_id("save-user").click()
return User(email, password, display_name)

View File

@@ -1,37 +0,0 @@
"""Db2 connector for e2e tests"""
import os
from playwright.sync_api import Page, expect
from .interface import DataBaseConnectorInterface
class Db2Connector(DataBaseConnectorInterface):
"""db2 connector"""
def get_service(self, page: Page):
"""get service from the service page"""
page.get_by_test_id("Db2").click()
def set_connection(self, page):
"""Set connection for redshift service"""
page.get_by_label("Username*").fill(os.environ["E2E_DB2_USERNAME"])
expect(page.get_by_label("Username*")).to_have_value(
os.environ["E2E_DB2_USERNAME"]
)
page.get_by_label("Password").fill(os.environ["E2E_DB2_PASSWORD"])
expect(page.get_by_label("Password")).to_have_value(
os.environ["E2E_DB2_PASSWORD"]
)
page.get_by_label("Host and Port*").fill(os.environ["E2E_DB2_HOST_PORT"])
expect(page.get_by_label("Host and Port*")).to_have_value(
os.environ["E2E_DB2_HOST_PORT"]
)
page.get_by_label("database*").fill(os.environ["E2E_DB2_DATABASE"])
expect(page.get_by_label("database*")).to_have_value(
os.environ["E2E_DB2_DATABASE"]
)

View File

@@ -1,22 +0,0 @@
"""Druid connector for e2e tests"""
import os
from playwright.sync_api import Page, expect
from .interface import DataBaseConnectorInterface
class DruidConnector(DataBaseConnectorInterface):
"""druid connector"""
def get_service(self, page: Page):
"""get service from the service page"""
page.get_by_test_id("Druid").click()
def set_connection(self, page):
"""Set connection for redshift service"""
page.get_by_label("Host and Port*").fill(os.environ["E2E_DRUID_HOST_PORT"])
expect(page.get_by_label("Host and Port*")).to_have_value(
os.environ["E2E_DRUID_HOST_PORT"]
)

View File

@@ -1,24 +0,0 @@
"""Hive connector for e2e tests"""
import os
from playwright.sync_api import Page, expect
from .interface import DataBaseConnectorInterface
class HiveConnector(DataBaseConnectorInterface):
def get_service(self, page: Page):
"""get service from the service page"""
page.get_by_test_id("Hive").click()
def set_connection(self, page):
"""Set connection for redshift service"""
page.locator('[id="root\\/hostPort"]').fill(os.environ["E2E_HIVE_HOST_PORT"])
expect(page.locator('[id="root\\/hostPort"]')).to_have_value(
os.environ["E2E_HIVE_HOST_PORT"]
)
page.locator('[id="root\\/metastoreConnection__oneof_select"]').select_option(
"2"
)

View File

@@ -1,217 +0,0 @@
"""connectors interface"""
import random
import string
import time
from abc import ABC, abstractmethod
from time import sleep
from playwright.sync_api import Page, TimeoutError, expect
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineState,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.time_utils import (
get_beginning_of_day_timestamp_mill,
get_end_of_day_timestamp_mill,
)
from ...connectors.model import ConnectorTestConfig, IngestionFilterConfig
BASE_URL = "http://localhost:8585"
class DataBaseConnectorInterface(ABC):
"""Interface for connectors class for e2e tests"""
def __init__(self, config: ConnectorTestConfig):
"""Initialize the connector"""
self.supports_profiler_ingestion = True
self.profiler_summary_card_count = 4
self.ingestion_config = config.ingestion
self.validation_config = config.validation
self.service_type = "Databases"
self.service_name = None
self.metadata_ingestion_pipeline_fqn = None
self.profiler_ingestion_pipeline_fqn = None
self.ometa = OpenMetadata(
OpenMetadataConnection(
hostPort=f"{BASE_URL}/api",
authProvider="openmetadata",
securityConfig=OpenMetadataJWTClientConfig(
jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
),
)
)
def _check_and_handle_workflow(self, page: Page, ingestion_pipeline_fqn: str):
pipeline_status = None
try_ = 0
sleep(1)
# we'll iterate until we get a pipeline status
while not pipeline_status:
pipeline_status = self.ometa.get_pipeline_status_between_ts(
f"{self.service_name}.{ingestion_pipeline_fqn}",
get_beginning_of_day_timestamp_mill(),
get_end_of_day_timestamp_mill(),
)
if not pipeline_status and try_ > 10:
# if we don't get a pipeline status after trying 10 times
# we need to deploy the workflow
try:
page.get_by_role(
"row", name=f"{ingestion_pipeline_fqn}"
).get_by_test_id("re-deploy").click()
except TimeoutError:
page.get_by_role(
"row", name=f"{ingestion_pipeline_fqn}"
).get_by_test_id("deploy").click()
if try_ > 20:
# if we've tried 20 times, we'll raise an exception
raise TimeoutError("Pipeline status not found")
try_ += 1
@abstractmethod
def get_service(self, page: Page):
"""get service from the service page"""
raise NotImplementedError
@abstractmethod
def set_connection(self, page: Page):
"""Set connection for redshift service"""
raise NotImplementedError
@staticmethod
def generate_service_name():
"""Generate a random service name"""
chinese_char = "".join([chr(random.randint(0x4E00, 0x9FBF)) for _ in range(3)])
cyrillic_char = "".join([chr(random.randint(1072, 1104)) for _ in range(3)])
return (
"".join(random.choices(string.ascii_lowercase, k=10))
+ chinese_char
+ cyrillic_char
+ "_-1"
)
def _set_ingestion_filter(self, type_: str, page: Page):
"""Set schema filter for redshift service"""
filter_config: IngestionFilterConfig = getattr(self.ingestion_config, type_)
if not filter_config:
return
for container_type, value in filter_config:
if not value:
continue
if container_type == "schema_":
container_type = "schema"
for filter_type, filter_elements in value:
if not filter_elements:
continue
for element in filter_elements:
page.locator(
f'xpath=//*[@id="root/{container_type}FilterPattern/{filter_type}"]'
).fill(element)
def get_sorted_ingestion_pipeline_statues(
self, ingestion_pipeline_fqn: str, desc=True
):
statuses = self.ometa.get_pipeline_status_between_ts(
ingestion_pipeline_fqn,
get_beginning_of_day_timestamp_mill(),
get_end_of_day_timestamp_mill(),
)
return sorted(
statuses,
key=lambda x: x.startDate.root,
reverse=True if desc else False,
)
def get_pipeline_status(self, ingestion_pipeline_fqn: str):
# Not best practice. Should use `expect`, though playwright does not have a `wait_until` function
# we'll make a call to the API to get the pipeline status and check if it's success
status = None
timeout = time.time() + 60 * 5 # 5 minutes from now
while not status or status == PipelineState.running:
if time.time() > timeout:
raise TimeoutError(
"Pipeline with status {status} has been running for more than 5 minutes"
)
statuses = self.get_sorted_ingestion_pipeline_statues(
ingestion_pipeline_fqn,
)
# we'll get the state of the most recent pipeline run
status = statuses[0].pipelineState
if status != PipelineState.running:
break
return status
def create_service_ingest_metadata(self, page: Page):
"""Ingest redshift service data
Args:
page (Page): playwright page. Should be logged in and pointing to the home page
e.g. page.goto(f"{BASE_URL}/")
"""
page.get_by_test_id("app-bar-item-settings").click()
page.get_by_text(self.service_type).click()
page.get_by_test_id("add-service-button").click()
self.get_service(page)
page.get_by_test_id("next-button").click()
self.service_name = self.generate_service_name()
page.get_by_test_id("service-name").fill(self.service_name)
expect(page.get_by_test_id("service-name")).to_have_value(self.service_name)
page.get_by_test_id("next-button").click()
self.set_connection(page)
page.get_by_test_id("submit-btn").click()
page.get_by_test_id("add-ingestion-button").click()
self._set_ingestion_filter("metadata", page)
self.metadata_ingestion_pipeline_fqn = page.get_by_label("name*").input_value()
page.get_by_test_id("submit-btn").click()
page.get_by_test_id("deploy-button").click()
page.get_by_test_id("view-service-button").click()
page.get_by_test_id("ingestions").click()
self._check_and_handle_workflow(page, self.metadata_ingestion_pipeline_fqn)
return self.service_name
def create_profiler_workflow(self, page: Page):
"""create profiler workflow"""
page.get_by_test_id("app-bar-item-settings").click()
page.get_by_text("Databases").click()
page.get_by_test_id(f"service-name-{self.service_name}").click()
page.get_by_text("Ingestions").click()
page.get_by_test_id("add-new-ingestion-button").click()
page.get_by_text("Add Profiler Ingestion").click()
page.locator(
"div:nth-child(5) > div > div:nth-child(2) > .form-group > .ant-row > div:nth-child(2) > .ant-select > .ant-select-selector > .ant-select-selection-overflow"
).click()
self._set_ingestion_filter("profiler", page)
page.locator('[id="root\\/processPiiSensitive"]').click()
self.profiler_ingestion_pipeline_fqn = page.get_by_label("name*").input_value()
page.get_by_test_id("submit-btn").click()
page.get_by_test_id("deploy-button").click()
page.get_by_test_id("view-service-button").click()
page.get_by_test_id("ingestions").click()
self._check_and_handle_workflow(page, self.profiler_ingestion_pipeline_fqn)
def delete_service(self, page: Page):
"""Delete service"""
page.goto(f"{BASE_URL}/")
page.get_by_test_id("app-bar-item-settings").click()
page.get_by_text("Databases").click()
page.get_by_test_id(f"service-name-{self.service_name}").click()
page.get_by_test_id("manage-button").click()
page.get_by_test_id("delete-button-title").click()
page.get_by_test_id("confirmation-text-input").fill("DELETE")
expect(page.get_by_test_id("confirmation-text-input")).to_have_value("DELETE")
page.get_by_test_id("confirm-button").click()

View File

@@ -1,32 +0,0 @@
"""Redshift connector for e2e tests"""
import os
from playwright.sync_api import Page, expect
from .interface import DataBaseConnectorInterface
class RedshiftConnector(DataBaseConnectorInterface):
def get_service(self, page: Page):
"""get service from the service page"""
page.get_by_test_id("Redshift").click()
def set_connection(self, page):
"""Set connection for redshift service"""
page.get_by_label("Username").fill(os.environ["E2E_REDSHIFT_USERNAME"])
expect(page.get_by_label("Username")).to_have_value(
os.environ["E2E_REDSHIFT_USERNAME"]
)
page.get_by_label("Password").fill(os.environ["E2E_REDSHIFT_PASSWORD"])
expect(page.get_by_label("Password")).to_have_value(
os.environ["E2E_REDSHIFT_PASSWORD"]
)
page.get_by_label("Host and Port").fill(os.environ["E2E_REDSHIFT_HOST_PORT"])
expect(page.get_by_label("Host and Port")).to_have_value(
os.environ["E2E_REDSHIFT_HOST_PORT"]
)
page.get_by_label("Database*").fill(os.environ["E2E_REDSHIFT_DB"])
expect(page.get_by_label("Database*")).to_have_value(
os.environ["E2E_REDSHIFT_DB"]
)

View File

@@ -1,38 +0,0 @@
"""Connector model config for testing."""
from typing import Optional
from pydantic import BaseModel
class IngestionFilterConfig(BaseModel):
includes: Optional[list[str]] = []
excludes: Optional[list[str]] = []
class IngestionTestConfig(BaseModel):
database: Optional[IngestionFilterConfig]
schema_: Optional[IngestionFilterConfig]
table: Optional[IngestionFilterConfig]
class ConnectorIngestionTestConfig(BaseModel):
metadata: Optional[IngestionTestConfig]
profiler: Optional[IngestionTestConfig]
class ValidationTestConfig(BaseModel):
service: Optional[str]
database: Optional[str]
schema_: Optional[str]
table: Optional[str]
class ConnectorValidationTestConfig(BaseModel):
metadata: Optional[ValidationTestConfig]
profiler: Optional[ValidationTestConfig]
class ConnectorTestConfig(BaseModel):
ingestion: Optional[ConnectorIngestionTestConfig]
validation: Optional[ConnectorValidationTestConfig]

View File

@@ -1,9 +0,0 @@
"""Admin user configuration for e2e tests."""
from ...configs.users.user import User
class Admin(User):
def __init__(self, username="admin", password="admin"):
"""Initialize the admin user."""
super().__init__(username, password)

View File

@@ -1,36 +0,0 @@
"""User configuration for e2e tests."""
import time
from typing import Optional
from playwright.sync_api import Page, expect
class User:
def __init__(
self, username: str, password: str, display_name: Optional[str] = None
):
"""Initialize the admin user."""
self.username = username
self.password = password
self.display_name = display_name
def login(self, page: Page):
"""Login as the user."""
page.get_by_label("Username or Email").fill(self.username)
page.get_by_label("Password").fill(self.password)
page.get_by_role("button", name="Login").click()
time.sleep(0.5)
page.reload()
expect(page.get_by_test_id("app-bar-item-explore")).to_be_visible()
def delete(self, page: Page):
"""Delete the user."""
page.get_by_test_id("app-bar-item-settings").click()
page.get_by_test_id("global-setting-left-panel").get_by_text("Users").click()
page.get_by_test_id("searchbar").fill(self.display_name) # type: ignore
page.get_by_test_id("searchbar").press("Enter")
page.get_by_role("row", name=self.display_name).get_by_role("button").click()
page.get_by_test_id("hard-delete").check()
page.get_by_test_id("confirmation-text-input").fill("DELETE")
page.get_by_test_id("confirm-button").click()

View File

@@ -1,76 +0,0 @@
"""Module fixture for data quality e2e tests"""
import pytest
from playwright.sync_api import Browser, Page, expect
from .configs.common import go_to_service
from .configs.users.admin import Admin
TIMEOUT = 60000
BASE_URL = "http://localhost:8585"
expect.set_options(timeout=TIMEOUT)
@pytest.fixture(scope="function")
def context(context):
"""Set default timeout for playwright context"""
context.set_default_timeout(TIMEOUT)
yield context
context.close()
@pytest.fixture(scope="session")
def browser_context_args(browser_context_args):
"""override default browser context args"""
return {
**browser_context_args,
"base_url": BASE_URL,
"java_script_enabled": True,
}
@pytest.fixture(scope="function")
def admin_page_context(page: Page):
page.goto("/")
Admin().login(page)
yield page
page.close()
@pytest.fixture(scope="class")
def setUpClass(browser: Browser, request): # pylint: disable=invalid-name
"""set up class for ingestion pipelines"""
context_ = browser.new_context(base_url=BASE_URL)
page = context_.new_page()
page.goto(f"{BASE_URL}/")
Admin().login(page)
connector_obj = request.param["connector_obj"]
request.cls.connector_obj = connector_obj
# create service and ingest metadata
connector_obj.create_service_ingest_metadata(page)
request.cls.service_name = connector_obj.service_name
page.get_by_text("Ingestions").click()
# Not best practice. Should use `expect`, though playwright does not have a `wait_until` function
# we'll make a call to the API to get the pipeline status and check if it's success
request.cls.metadata_ingestion_status = connector_obj.get_pipeline_status(
f"{connector_obj.service_name}.{connector_obj.metadata_ingestion_pipeline_fqn}"
)
if connector_obj.supports_profiler_ingestion:
connector_obj.create_profiler_workflow(page)
go_to_service("Databases", page, connector_obj.service_name)
page.get_by_text("Ingestions").click()
# Not best practice. Should use `expect`, though playwright does not have a `wait_until` function
# we'll make a call to the API to get the pipeline status and check if it's success
request.cls.profiler_ingestion_status = connector_obj.get_pipeline_status(
f"{connector_obj.service_name}.{connector_obj.profiler_ingestion_pipeline_fqn}"
)
else:
request.cls.profiler_ingestion_status = None
yield
connector_obj.delete_service(page)
context_.close()

View File

@@ -1,80 +0,0 @@
"""common database assertions"""
import time
from playwright.sync_api import Page, expect
from ...configs.common import go_to_service
def assert_change_database_owner(page_context: Page, service_name: str):
"""assert database owner can be changed as expected"""
go_to_service("Databases", page_context, service_name)
page_context.get_by_test_id("edit-owner").click()
page_context.get_by_test_id("owner-select-users-search-bar").click()
page_context.get_by_test_id("owner-select-users-search-bar").fill("Aaron Johnson")
page_context.get_by_text("Aaron Johnson").click()
expect(
page_context.get_by_test_id("owner-label").get_by_test_id("owner-link")
).to_have_text("Aaron Johnson")
def assert_profile_data(
page_context: Page,
service_name: str,
database: str,
schema: str,
table: str,
connector_obj,
):
"""Assert profile data have been computed correctly"""
go_to_service("Databases", page_context, service_name)
page_context.get_by_role("link", name=database).click()
page_context.get_by_role("link", name=schema).click()
page_context.get_by_role("link", name=table, exact=True).click()
page_context.get_by_text("Profiler & Data Quality").click()
time.sleep(0.05)
for card in range(connector_obj.profiler_summary_card_count):
summary_card = page_context.get_by_test_id("summary-card-container").nth(card)
description = summary_card.get_by_test_id(
"summary-card-description"
).inner_text()
assert description not in {"0"}
def assert_sample_data_ingestion(
page_context: Page,
service_name: str,
database: str,
schema: str,
table: str,
):
"""assert sample data are ingested as expected"""
go_to_service("Databases", page_context, service_name)
page_context.get_by_role("link", name=database).click()
page_context.get_by_role("link", name=schema).click()
page_context.get_by_role("link", name=table, exact=True).click()
page_context.get_by_text("Sample Data").click()
expect(page_context.get_by_test_id("sample-data")).to_be_visible()
def assert_pii_column_auto_tagging(
page_context: Page,
service_name: str,
database: str,
schema: str,
table: str,
column: str,
):
"""assert pii column auto tagging tagged as expected"""
go_to_service("Databases", page_context, service_name)
page_context.get_by_role("link", name=database).click()
page_context.get_by_role("link", name=schema).click()
page_context.get_by_role("link", name=table, exact=True).click()
time.sleep(0.05)
table_row = page_context.locator(f'tr:has-text("{column}")')
tag = table_row.locator("td:nth-child(4)")
expect(tag).to_be_visible()
assert tag.text_content() in {"Sensitive", "NonSensitive"}

View File

@@ -1,40 +0,0 @@
"""Test Db2 database ingestion."""
import pytest
from ...configs.connectors.database.db2 import Db2Connector
from ...configs.connectors.model import (
ConnectorIngestionTestConfig,
ConnectorTestConfig,
ConnectorValidationTestConfig,
IngestionFilterConfig,
IngestionTestConfig,
ValidationTestConfig,
)
@pytest.mark.parametrize(
"setUpClass",
[
{
"connector_obj": Db2Connector(
ConnectorTestConfig(
ingestion=ConnectorIngestionTestConfig(
metadata=IngestionTestConfig(
database=IngestionFilterConfig(includes=["testdb"]),
), # type: ignore
),
validation=ConnectorValidationTestConfig(
profiler=ValidationTestConfig(
database="testdb", schema_="sampledata", table="customer"
) # type: ignore
),
)
)
}
],
indirect=True,
)
@pytest.mark.usefixtures("setUpClass")
class TestDb2Connector:
"""We need to validate dependency can be installed in the test env."""

View File

@@ -1,97 +0,0 @@
"""Test default database ingestion (Druid)."""
import pytest
from playwright.sync_api import Page
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineState,
)
from ...configs.connectors.database.druid import DruidConnector
from ...configs.connectors.model import (
ConnectorIngestionTestConfig,
ConnectorTestConfig,
ConnectorValidationTestConfig,
IngestionFilterConfig,
IngestionTestConfig,
ValidationTestConfig,
)
from ...entity.database.common_assertions import (
assert_change_database_owner,
assert_pii_column_auto_tagging,
assert_profile_data,
assert_sample_data_ingestion,
)
@pytest.mark.parametrize(
"setUpClass",
[
{
"connector_obj": DruidConnector(
ConnectorTestConfig(
ingestion=ConnectorIngestionTestConfig(
metadata=IngestionTestConfig(
schema_=IngestionFilterConfig(includes=["druid"]),
), # type: ignore
profiler=IngestionTestConfig(
schema_=IngestionFilterConfig(includes=["druid"]),
), # type: ignore
),
validation=ConnectorValidationTestConfig(
profiler=ValidationTestConfig(
database="default", schema_="druid", table="inline_data"
) # type: ignore
),
)
)
}
],
indirect=True,
)
@pytest.mark.usefixtures("setUpClass")
class TestDruidConnector:
"""Druid connector test case"""
def test_pipelines_statuses(self):
"""check ingestion pipelines ran successfully"""
assert self.metadata_ingestion_status == PipelineState.success
# if the connector does not support profiler ingestion, the status is None
assert self.profiler_ingestion_status in {PipelineState.success, None}
def test_change_database_owner(self, admin_page_context: Page):
"""test change database owner"""
assert_change_database_owner(admin_page_context, self.service_name)
def test_check_profile_data(self, admin_page_context: Page):
"""check profile data are visible"""
assert_profile_data(
admin_page_context,
self.service_name,
self.connector_obj.validation_config.profiler.database,
self.connector_obj.validation_config.profiler.schema_,
self.connector_obj.validation_config.profiler.table,
self.connector_obj,
)
def test_sample_data_ingestion(self, admin_page_context: Page):
"""test sample dta is ingested as expected for the table"""
assert_sample_data_ingestion(
admin_page_context,
self.service_name,
self.connector_obj.validation_config.profiler.database,
self.connector_obj.validation_config.profiler.schema_,
self.connector_obj.validation_config.profiler.table,
)
def test_pii_colum_auto_tagging(self, admin_page_context: Page):
"""check pii column auto tagging tagged as expected"""
assert_pii_column_auto_tagging(
admin_page_context,
self.service_name,
self.connector_obj.validation_config.profiler.database,
self.connector_obj.validation_config.profiler.schema_,
self.connector_obj.validation_config.profiler.table,
"cityName",
)

View File

@@ -1,81 +0,0 @@
"""Test Hive database ingestion."""
import pytest
from playwright.sync_api import Page
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineState,
)
from ...configs.connectors.database.hive import HiveConnector
from ...configs.connectors.model import (
ConnectorIngestionTestConfig,
ConnectorTestConfig,
ConnectorValidationTestConfig,
IngestionFilterConfig,
IngestionTestConfig,
ValidationTestConfig,
)
from ...entity.database.common_assertions import (
assert_change_database_owner,
assert_profile_data,
assert_sample_data_ingestion,
)
@pytest.mark.parametrize(
"setUpClass",
[
{
"connector_obj": HiveConnector(
ConnectorTestConfig(
ingestion=ConnectorIngestionTestConfig(
metadata=IngestionTestConfig(
database=IngestionFilterConfig(includes=["default"]),
), # type: ignore
),
validation=ConnectorValidationTestConfig(
profiler=ValidationTestConfig(
database="default", schema_="default", table="t1"
) # type: ignore
),
)
)
}
],
indirect=True,
)
@pytest.mark.usefixtures("setUpClass")
class TestHiveConnector:
"""Hive connector test case"""
def test_pipelines_statuses(self):
"""check ingestion pipelines ran successfully"""
assert self.metadata_ingestion_status == PipelineState.success
# if the connector does not support profiler ingestion, the status is None
assert self.profiler_ingestion_status in {PipelineState.success, None}
def test_change_database_owner(self, admin_page_context: Page):
"""test change database owner"""
assert_change_database_owner(admin_page_context, self.service_name)
def test_check_profile_data(self, admin_page_context: Page):
"""check profile data are visible"""
assert_profile_data(
admin_page_context,
self.service_name,
self.connector_obj.validation_config.profiler.database,
self.connector_obj.validation_config.profiler.schema_,
self.connector_obj.validation_config.profiler.table,
self.connector_obj,
)
def test_sample_data_ingestion(self, admin_page_context: Page):
"""test sample dta is ingested as expected for the table"""
assert_sample_data_ingestion(
admin_page_context,
self.service_name,
self.connector_obj.validation_config.profiler.database,
self.connector_obj.validation_config.profiler.schema_,
self.connector_obj.validation_config.profiler.table,
)

View File

@@ -1,99 +0,0 @@
"""Test default database ingestion (Redshift)."""
import pytest
from playwright.sync_api import Page
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
PipelineState,
)
from ...configs.connectors.database.redshift import RedshiftConnector
from ...configs.connectors.model import (
ConnectorIngestionTestConfig,
ConnectorTestConfig,
ConnectorValidationTestConfig,
IngestionFilterConfig,
IngestionTestConfig,
ValidationTestConfig,
)
from ...entity.database.common_assertions import (
assert_change_database_owner,
assert_pii_column_auto_tagging,
assert_profile_data,
assert_sample_data_ingestion,
)
@pytest.mark.parametrize(
"setUpClass",
[
{
"connector_obj": RedshiftConnector(
ConnectorTestConfig(
ingestion=ConnectorIngestionTestConfig(
metadata=IngestionTestConfig(
schema_=IngestionFilterConfig(includes=["dbt_jaffle"]),
), # type: ignore
profiler=IngestionTestConfig(
table=IngestionFilterConfig(includes=["customer"]),
), # type: ignore
),
validation=ConnectorValidationTestConfig(
profiler=ValidationTestConfig(
database="e2e_cli_tests",
schema_="dbt_jaffle",
table="customer",
) # type: ignore
),
)
)
}
],
indirect=True,
)
@pytest.mark.usefixtures("setUpClass")
class TestRedshiftConnector:
"""Redshift connector test case"""
def test_pipelines_statuses(self):
"""check ingestion pipelines ran successfully"""
assert self.metadata_ingestion_status == PipelineState.success
# if the connector does not support profiler ingestion, the status is None
assert self.profiler_ingestion_status in {PipelineState.success, None}
def test_change_database_owner(self, admin_page_context: Page):
"""test change database owner"""
assert_change_database_owner(admin_page_context, self.service_name)
def test_check_profile_data(self, admin_page_context: Page):
"""check profile data are visible"""
assert_profile_data(
admin_page_context,
self.service_name,
self.connector_obj.validation_config.profiler.database,
self.connector_obj.validation_config.profiler.schema_,
self.connector_obj.validation_config.profiler.table,
self.connector_obj,
)
def test_sample_data_ingestion(self, admin_page_context: Page):
"""test sample dta is ingested as expected for the table"""
assert_sample_data_ingestion(
admin_page_context,
self.service_name,
self.connector_obj.validation_config.profiler.database,
self.connector_obj.validation_config.profiler.schema_,
self.connector_obj.validation_config.profiler.table,
)
def test_pii_colum_auto_tagging(self, admin_page_context: Page):
"""check pii column auto tagging tagged as expected"""
assert_pii_column_auto_tagging(
admin_page_context,
self.service_name,
self.connector_obj.validation_config.profiler.database,
self.connector_obj.validation_config.profiler.schema_,
self.connector_obj.validation_config.profiler.table,
"c_name",
)