MINOR: fix sample data issue with Pydantic v2 and refactor python integration tests (#16943)

* tests: refactor

refactor tests and consolidate common functionality in integrations.conftest

this enables writing tests more concisely,
as demonstrated with postgres and mssql.
more suites will be migrated later (see the sketch after this commit list).

* format

* removed helpers

* changed scope of fixtures

* changed scope of fixtures

* added profiler test for mssql

* fixed import in data_quality test

* json safe serialization

* format

* set MARS_Connection

* use SerializableTableData instead of TableData

* deleted file test_postgres.py

* fixed tests

* added more test cases

* format

* changed name test_models.py

* removed the logic for serializing table data

* wip

* changed mapping in common type map

* changed mapping in common type map

* reverted TableData imports

* reverted TableData imports

* reverted TableData imports
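
As a rough, hedged sketch of the pattern this refactor enables (fixture and container names below are illustrative, not code from this PR): a backend-specific conftest supplies create_service_request and an ingestion_config, while run_workflow, db_service, sink_config, workflow_config and patch_passwords_for_db_services come from the shared integrations conftest.

import pytest
from metadata.generated.schema.api.services.createDatabaseService import (
    CreateDatabaseServiceRequest,
)
from metadata.workflow.metadata import MetadataWorkflow

@pytest.fixture(scope="module")
def create_service_request(some_db_container):
    # hypothetical: build a CreateDatabaseServiceRequest from the container's
    # credentials, as the postgres/mssql/trino conftests below do for real
    ...

def test_ingest_metadata(patch_passwords_for_db_services, run_workflow, ingestion_config):
    # the shared run_workflow fixture executes the ingestion end to end
    run_workflow(MetadataWorkflow, ingestion_config)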
Imri Paran 2024-07-17 08:11:34 +02:00 committed by GitHub
parent e67ba6b14c
commit 0fee79b200
33 changed files with 774 additions and 735 deletions

View File

@ -9,9 +9,9 @@ name = "openmetadata-ingestion"
version = "1.5.0.0.dev0"
dynamic = ["readme", "dependencies", "optional-dependencies"]
authors = [
{name = "OpenMetadata Committers"}
{ name = "OpenMetadata Committers" }
]
license = {file = "LICENSE"}
license = { file = "LICENSE" }
description = "Ingestion Framework for OpenMetadata"
requires-python = ">=3.8"
@ -21,7 +21,7 @@ Documentation = "https://docs.open-metadata.org/"
Source = "https://github.com/open-metadata/OpenMetadata"
[tool.setuptools.dynamic]
readme = {file = ["README.md"]}
readme = { file = ["README.md"] }
[tool.setuptools.packages.find]
where = ["./src"]
@ -72,7 +72,6 @@ disallow_incomplete_defs = false
markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')"
]
norecursedirs = "tests/helpers"
[tool.pylint.BASIC]
# W1203: logging-fstring-interpolation - f-string brings better readability and unifies style

View File

@ -346,6 +346,7 @@ test = {
"pytest-order",
# install dbt dependency
"dbt-artifacts-parser",
"freezegun",
VERSIONS["sqlalchemy-databricks"],
VERSIONS["databricks-sdk"],
VERSIONS["scikit-learn"],
@ -382,6 +383,7 @@ test = {
*plugins["mssql"],
*plugins["dagster"],
*plugins["oracle"],
*plugins["mssql"],
}
e2e_test = {

View File

@ -2,4 +2,4 @@ import pytest
def xfail_param(param, reason):
return pytest.param(param, marks=pytest.mark.xfail(reason=reason))
return pytest.param(param, marks=pytest.mark.xfail(reason=reason, strict=True))
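
For reference, a hedged usage sketch of this helper in a parametrized fixture (mirroring how the mssql lineage test below parametrizes on language); with strict=True, an unexpected pass of the xfail case is reported as a failure. The reason string here is illustrative.

import pytest
from _openmetadata_testutils.helpers.markers import xfail_param

@pytest.fixture(params=["english", xfail_param("german", "known date-format issue")])
def language(request):
    # tests using this fixture run once per language; the "german" case is expected to fail
    return request.param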

View File

@ -30,7 +30,7 @@ def try_bind(container, container_port, host_port):
yield container
@pytest.fixture(scope="session")
@pytest.fixture(scope="module")
def postgres_container(tmp_path_factory):
"""Start a PostgreSQL container with the dvdrental database."""
data_dir = tmp_path_factory.mktemp("data")

View File

@ -88,7 +88,7 @@ class ColumnTypeParser:
"BIGNUMERIC": "NUMERIC",
"BIGSERIAL": "BIGINT",
"BINARY": "BINARY",
"BIT": "INT",
"BIT": "BIT",
"BLOB": "BLOB",
"BOOL": "BOOLEAN",
"BOOLEAN": "BOOLEAN",
@ -298,15 +298,6 @@ class ColumnTypeParser:
_FIXED_DECIMAL = re.compile(r"(decimal|numeric)(\(\s*(\d+)\s*,\s*(\d+)\s*\))?")
try:
# pylint: disable=import-outside-toplevel
from sqlalchemy.dialects.mssql import BIT
_COLUMN_TYPE_MAPPING[BIT] = "BINARY"
_SOURCE_TYPE_TO_OM_TYPE["BIT"] = "BINARY"
except ImportError:
pass
try:
# pylint: disable=import-outside-toplevel
from teradatasqlalchemy import BYTE, VARBYTE

View File

@ -163,16 +163,4 @@ class MongoDB(NoSQLAdaptor):
return AggregationFunction.MIN
def execute(self, query: Executable) -> List[Dict[str, any]]:
records = list(query.to_executable(self.client))
result = []
for r in records:
result.append({c: self._json_safe(r.get(c)) for c in r})
return result
@staticmethod
def _json_safe(data: any):
try:
json.dumps(data)
return data
except Exception: # noqa
return str(data)
return list(query.to_executable(self.client))

View File

@ -12,7 +12,6 @@
"""
Common Class For Profiler Converter.
"""
from typing import Dict, Set
import sqlalchemy
@ -51,8 +50,8 @@ class CommonMapTypes:
DataType.CHAR: sqlalchemy.CHAR,
DataType.VARCHAR: sqlalchemy.VARCHAR,
DataType.BOOLEAN: sqlalchemy.BOOLEAN,
DataType.BINARY: sqlalchemy.LargeBinary,
DataType.VARBINARY: sqlalchemy.VARBINARY,
DataType.BINARY: CustomTypes.BYTES.value,
DataType.VARBINARY: CustomTypes.BYTES.value,
DataType.ARRAY: CustomTypes.ARRAY.value,
DataType.BLOB: CustomTypes.BYTES.value,
DataType.LONGBLOB: sqlalchemy.LargeBinary,
@ -81,7 +80,7 @@ class CommonMapTypes:
return self.return_custom_type(col, table_service_type)
def return_custom_type(self, col: Column, _):
return self._TYPE_MAP.get(col.dataType)
return self._TYPE_MAP.get(col.dataType, CustomTypes.UNDETERMINED.value)
@staticmethod
def map_sqa_to_om_types() -> Dict[TypeEngine, Set[DataType]]:

View File

@ -26,6 +26,7 @@ from metadata.profiler.orm.types.custom_hex_byte_string import HexByteString
from metadata.profiler.orm.types.custom_image import CustomImage
from metadata.profiler.orm.types.custom_ip import CustomIP
from metadata.profiler.orm.types.custom_timestamp import CustomTimestamp
from metadata.profiler.orm.types.undetermined_type import UndeterminedType
from metadata.profiler.orm.types.uuid import UUIDString
from metadata.profiler.registry import TypeRegistry
@ -39,6 +40,7 @@ class CustomTypes(TypeRegistry):
IMAGE = CustomImage
IP = CustomIP
SQADATETIMERANGE = CustomDateTimeRange
UNDETERMINED = UndeterminedType
class Dialects(Enum):
@ -97,6 +99,8 @@ NOT_COMPUTE = {
DataType.JSON.value,
CustomTypes.ARRAY.value.__name__,
CustomTypes.SQADATETIMERANGE.value.__name__,
DataType.XML.value,
CustomTypes.UNDETERMINED.value.__name__,
}
FLOAT_SET = {sqlalchemy.types.DECIMAL, sqlalchemy.types.FLOAT}

View File

@ -42,7 +42,9 @@ class HexByteString(TypeDecorator):
Make sure the data is of correct type
"""
if not isinstance(value, (bytes, bytearray)):
raise TypeError("HexByteString columns support only bytes values.")
raise TypeError(
f"HexByteString columns support only bytes values. Received {type(value).__name__}."
)
def process_result_value(self, value: str, dialect) -> Optional[str]:
"""This is executed during result retrieval

View File

@ -0,0 +1,38 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=abstract-method
"""
Undetermined types for cases where we don't have type mappings
"""
from sqlalchemy.sql.sqltypes import String, TypeDecorator
class UndeterminedType(TypeDecorator):
"""A fallback type for undetermined types returned from the database"""
impl = String
cache_ok = True
@property
def python_type(self):
return str
def process_result_value(self, value, _):
"""
We have no idea what this type is, so we just cast it to a string.
"""
return (
f"OPENMETADATA_UNDETERMIND[{str(value)}]"
if value
else "OPENMETADATA_UNDETERMIND[]"
)
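
A small sanity sketch of the fallback behavior defined above (the input values are illustrative):

from metadata.profiler.orm.types.undetermined_type import UndeterminedType

# any value of an unmapped column type is wrapped rather than failing the profiler
t = UndeterminedType()
assert t.process_result_value("POINT(1 2)", None) == "OPENMETADATA_UNDETERMIND[POINT(1 2)]"
assert t.process_result_value(None, None) == "OPENMETADATA_UNDETERMIND[]"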

View File

@ -30,7 +30,9 @@ class NoSQLSampler(SamplerInterface):
for column in self.table.columns
]
rows, cols = self.transpose_records(records, columns)
return TableData(rows=rows, columns=[c.name for c in cols])
return TableData(
rows=[list(map(str, row)) for row in rows], columns=[c.name for c in cols]
)
def random_sample(self):
pass
@ -40,14 +42,17 @@ class NoSQLSampler(SamplerInterface):
return self._fetch_sample_data_from_user_query()
return self._fetch_sample_data(columns)
def _fetch_sample_data(self, columns: List[SQALikeColumn]):
def _fetch_sample_data(self, columns: List[SQALikeColumn]) -> TableData:
"""
returns sampled ometa dataframes
"""
limit = self._get_limit()
records = self.client.scan(self.table, self.table.columns, limit)
rows, cols = self.transpose_records(records, columns)
return TableData(rows=rows, columns=[col.name for col in cols])
return TableData(
rows=[list(map(str, row)) for row in rows],
columns=[col.name for col in cols],
)
def _get_limit(self) -> Optional[int]:
num_rows = self.client.item_count(self.table)
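
The str() casts above are the sample-data fix referenced in the PR title: raw NoSQL driver values (bytes, ObjectId, and the like) are not guaranteed to survive Pydantic v2 JSON serialization, while plain strings always do. A minimal, illustrative sketch of the idea:

from metadata.generated.schema.entity.data.table import TableData

row = [b"\x00\x01", 42]  # raw driver values; bytes are not JSON-friendly
data = TableData(columns=[], rows=[[str(v) for v in row]])
data.model_dump_json()  # serializes cleanly because every cell is a plain str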

View File

@ -1,2 +0,0 @@
When working on pycharm, add this directory to the sources root so that
it resolves the modules properly.

View File

@ -1,22 +0,0 @@
import contextlib
import logging
import docker
@contextlib.contextmanager
def try_bind(container, container_port, *host_ports):
"""Try to bind a container locally on a specfic port and yield the container. If the port is already in use,
try another port. This is useful when running tests locally and we want to avoid having to reconfigure SQL clients
to connect to a different port each time we run the tests. Multiple ports can be passed so that the next available
port is tried if the previous one is already in use.
"""
for host_port in host_ports:
try:
with container.with_bind_ports(container_port, host_port) as container:
yield container
return
except docker.errors.APIError:
logging.warning("Port %s is already in use, trying another port", host_port)
with container.with_bind_ports(container_port, None) as container:
yield container

View File

@ -1,27 +1,25 @@
import logging
import os
import sys
import pytest
from _openmetadata_testutils.ometa import int_admin_ometa
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.ingestion import IngestionWorkflow
if not sys.version_info >= (3, 9):
collect_ignore = ["trino"]
def pytest_configure():
helpers_path = os.path.abspath(os.path.dirname(__file__) + "/../helpers")
sys.path.insert(0, helpers_path)
@pytest.fixture(scope="session", autouse=True)
def configure_logging():
logging.getLogger("sqlfluff").setLevel(logging.CRITICAL)
logging.getLogger("pytds").setLevel(logging.CRITICAL)
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def metadata():
return int_admin_ometa()
@ -41,3 +39,135 @@ def config_testcontatiners():
from testcontainers.core.config import testcontainers_config
testcontainers_config.max_tries = 10
@pytest.fixture(scope="session")
def sink_config(metadata):
return {
"type": "metadata-rest",
"config": {},
}
@pytest.fixture(scope="session")
def workflow_config(metadata):
return {
"loggerLevel": LogLevels.DEBUG.value,
"openMetadataServerConfig": metadata.config.model_dump(),
}
@pytest.fixture()
def profiler_config(db_service, workflow_config, sink_config):
return {
"source": {
"type": db_service.connection.config.type.value.lower(),
"serviceName": db_service.fullyQualifiedName.root,
"sourceConfig": {
"config": {
"type": "Profiler",
"generateSampleData": True,
"timeoutSeconds": 30,
}
},
},
"processor": {
"type": "orm-profiler",
"config": {},
},
"sink": sink_config,
"workflowConfig": workflow_config,
}
@pytest.fixture()
def run_workflow():
def _run(workflow_type: type(IngestionWorkflow), config, raise_from_status=True):
workflow: IngestionWorkflow = workflow_type.create(config)
workflow.execute()
if raise_from_status:
workflow.raise_from_status()
return workflow
return _run
@pytest.fixture(scope="module")
def db_service(metadata, create_service_request, unmask_password):
service_entity = metadata.create_or_update(data=create_service_request)
fqn = service_entity.fullyQualifiedName.root
yield unmask_password(service_entity)
service_entity = metadata.get_by_name(DatabaseService, fqn)
if service_entity:
metadata.delete(
DatabaseService, service_entity.id, recursive=True, hard_delete=True
)
@pytest.fixture(scope="module")
def unmask_password(create_service_request):
"""Unmask the db passwrod returned by the metadata service.
You can override this at the test_module level to implement custom password handling.
Example:
@pytest.fixture(scope="module")
def unmask_password(my_container1, my_container2):
def patch_password(service: DatabaseService):
if service.connection.config.authType.password == "my_password":
... # do something else
return service
return patch_password
"""
def patch_password(service: DatabaseService):
service.connection.config.authType.password = (
create_service_request.connection.config.authType.password
)
return service
return patch_password
@pytest.fixture(scope="module")
def create_service_request():
"""
Implement in the test module to create a service request
Example:
def create_service_request(scope="module"):
return CreateDatabaseServiceRequest(
name="my_service",
serviceType=DatabaseServiceType.MyService,
connection=DatabaseConnection(
config=MyServiceConnection(
username="my_user",
password="my_password",
host="localhost",
port="5432",
)
),
)
"""
raise NotImplementedError("Implement in the test module")
@pytest.fixture()
def patch_passwords_for_db_services(db_service, unmask_password, monkeypatch):
def override_password(getter):
def inner(*args, **kwargs):
result = getter(*args, **kwargs)
if isinstance(result, DatabaseService):
if result.fullyQualifiedName.root == db_service.fullyQualifiedName.root:
return unmask_password(result)
return result
return inner
monkeypatch.setattr(
"metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_name",
override_password(OpenMetadata.get_by_name),
)
monkeypatch.setattr(
"metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_id",
override_password(OpenMetadata.get_by_id),
)

View File

@ -8,12 +8,7 @@ from metadata.ingestion.models.custom_pydantic import CustomSecretStr
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.metadata import MetadataWorkflow
from ..postgres.conftest import db_service as postgres_service
from ..postgres.conftest import ingest_metadata as ingest_postgres
__all__ = [
"ingest_postgres",
"postgres_service",
"postgres_container",
]

View File

@ -12,26 +12,14 @@ from metadata.generated.schema.entity.services.connections.database.postgresConn
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
Sink,
Source,
SourceConfig,
WorkflowConfig,
)
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.ingestion.models.custom_pydantic import CustomSecretStr
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.metadata import MetadataWorkflow
@pytest.fixture(scope="module")
def db_service(metadata, postgres_container):
service = CreateDatabaseServiceRequest(
name="docker_test_db",
def create_service_request(postgres_container, tmp_path_factory):
return CreateDatabaseServiceRequest(
name="docker_test_" + tmp_path_factory.mktemp("postgres").name,
serviceType=DatabaseServiceType.Postgres,
connection=DatabaseConnection(
config=PostgresConnection(
@ -43,33 +31,19 @@ def db_service(metadata, postgres_container):
)
),
)
service_entity = metadata.create_or_update(data=service)
# Since we're using admin JWT (not ingestion-bot), the secret is not sent by the API
service_entity.connection.config.authType.password = CustomSecretStr(
postgres_container.password
)
yield service_entity
metadata.delete(
DatabaseService, service_entity.id, recursive=True, hard_delete=True
)
@pytest.fixture(scope="module")
def ingest_metadata(db_service, metadata: OpenMetadata):
workflow_config = OpenMetadataWorkflowConfig(
source=Source(
type=db_service.connection.config.type.value.lower(),
serviceName=db_service.fullyQualifiedName.root,
serviceConnection=db_service.connection,
sourceConfig=SourceConfig(config={}),
),
sink=Sink(
type="metadata-rest",
config={},
),
workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config),
)
metadata_ingestion = MetadataWorkflow.create(workflow_config)
search_cache.clear()
metadata_ingestion.execute()
metadata_ingestion.raise_from_status()
def ingestion_config(
db_service, metadata, workflow_config, sink_config, postgres_container
):
return {
"source": {
"type": db_service.connection.config.type.value.lower(),
"serviceName": db_service.fullyQualifiedName.root,
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
"serviceConnection": db_service.connection.dict(),
},
"sink": sink_config,
"workflowConfig": workflow_config,
}

View File

@ -9,7 +9,6 @@ from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuitePipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
LogLevels,
OpenMetadataWorkflowConfig,
Processor,
Sink,
@ -24,16 +23,23 @@ from metadata.generated.schema.type.basic import ComponentConfig
from metadata.ingestion.api.status import TruncatedStackTraceError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.metadata import MetadataWorkflow
if not sys.version_info >= (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)
@pytest.fixture(scope="module")
@pytest.fixture()
def run_data_quality_workflow(
ingest_metadata, db_service: DatabaseService, metadata: OpenMetadata
run_workflow,
ingestion_config,
db_service: DatabaseService,
metadata: OpenMetadata,
sink_config,
workflow_config,
):
workflow_config = OpenMetadataWorkflowConfig(
run_workflow(MetadataWorkflow, ingestion_config)
test_suite_config = OpenMetadataWorkflowConfig(
source=Source(
type=TestSuiteConfigType.TestSuite.value,
serviceName="MyTestSuite",
@ -80,15 +86,10 @@ def run_data_quality_workflow(
}
),
),
sink=Sink(
type="metadata-rest",
config={},
),
workflowConfig=WorkflowConfig(
loggerLevel=LogLevels.DEBUG, openMetadataServerConfig=metadata.config
),
sink=Sink.model_validate(sink_config),
workflowConfig=WorkflowConfig.model_validate(workflow_config),
)
test_suite_processor = TestSuiteWorkflow.create(workflow_config)
test_suite_processor = TestSuiteWorkflow.create(test_suite_config)
test_suite_processor.execute()
test_suite_processor.raise_from_status()
yield
@ -120,8 +121,9 @@ def test_data_quality(
assert test_case.testCaseResult.testCaseStatus == expected_status
def test_incompatible_column_type(ingest_metadata, metadata: OpenMetadata, db_service):
workflow_config = {
@pytest.fixture()
def incompatible_column_type_config(db_service, workflow_config, sink_config):
return {
"source": {
"type": "TestSuite",
"serviceName": "MyTestSuite",
@ -131,7 +133,6 @@ def test_incompatible_column_type(ingest_metadata, metadata: OpenMetadata, db_se
"entityFullyQualifiedName": f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer",
}
},
"serviceConnection": db_service.connection.model_dump(),
},
"processor": {
"type": "orm-test-runner",
@ -158,17 +159,23 @@ def test_incompatible_column_type(ingest_metadata, metadata: OpenMetadata, db_se
]
},
},
"sink": {
"type": "metadata-rest",
"config": {},
},
"workflowConfig": {
"loggerLevel": "DEBUG",
"openMetadataServerConfig": metadata.config.model_dump(),
},
"sink": sink_config,
"workflowConfig": workflow_config,
}
test_suite_processor = TestSuiteWorkflow.create(workflow_config)
test_suite_processor.execute()
def test_incompatible_column_type(
patch_passwords_for_db_services,
run_workflow,
ingestion_config,
incompatible_column_type_config,
metadata: OpenMetadata,
db_service,
):
run_workflow(MetadataWorkflow, ingestion_config)
test_suite_processor = run_workflow(
TestSuiteWorkflow, incompatible_column_type_config, raise_from_status=False
)
assert test_suite_processor.steps[0].get_status().failures == [
TruncatedStackTraceError(
name="Incompatible Column for Test Case",

View File

@ -0,0 +1,121 @@
import sys
import time
from os import path
import pytest
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.metadata import MetadataWorkflow
if not sys.version_info >= (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)
@pytest.fixture()
def native_lineage_config(db_service, workflow_config, sink_config):
return {
"source": {
"type": "postgres-lineage",
"serviceName": db_service.fullyQualifiedName.root,
"sourceConfig": {"config": {}},
},
"sink": sink_config,
"workflowConfig": workflow_config,
}
def test_native_lineage(
run_workflow, ingestion_config, native_lineage_config, metadata, db_service
):
run_workflow(MetadataWorkflow, ingestion_config)
run_workflow(MetadataWorkflow, native_lineage_config)
@pytest.fixture()
def log_lineage_config(db_service, metadata, workflow_config, sink_config):
return {
"source": {
"type": "query-log-lineage",
"serviceName": db_service.fullyQualifiedName.root,
"sourceConfig": {
"config": {
"type": "DatabaseLineage",
"queryLogFilePath": path.dirname(__file__) + "/bad_query_log.csv",
}
},
},
"sink": sink_config,
"workflowConfig": workflow_config,
}
def test_log_lineage(
patch_passwords_for_db_services,
run_workflow,
ingestion_config,
log_lineage_config,
metadata,
db_service,
):
# since query cache is stored in ES, we need to reindex to avoid having a stale cache
# TODO fix the server so that we don't need to run this
reindex_search(metadata)
search_cache.clear()
run_workflow(MetadataWorkflow, ingestion_config)
workflow = run_workflow(
MetadataWorkflow, log_lineage_config, raise_from_status=False
)
assert len(workflow.source.status.failures) == 2
for failure in workflow.source.status.failures:
assert "Table entity not found" in failure.error
customer_table: Table = metadata.get_by_name(
Table,
f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer",
nullable=False,
)
actor_table: Table = metadata.get_by_name(
Table,
f"{db_service.fullyQualifiedName.root}.dvdrental.public.actor",
nullable=False,
)
staff_table: Table = metadata.get_by_name(
Table,
f"{db_service.fullyQualifiedName.root}.dvdrental.public.staff",
nullable=False,
)
edge = metadata.get_lineage_edge(
str(customer_table.id.root), str(actor_table.id.root)
)
assert edge is not None
edge = metadata.get_lineage_edge(
str(customer_table.id.root), str(staff_table.id.root)
)
assert edge is not None
def reindex_search(metadata: OpenMetadata, timeout=60):
start = time.time()
status = None
while status is None or status == "running":
response = metadata.client.get(
"/apps/name/SearchIndexingApplication/status?offset=0&limit=1"
)
status = response["data"][0]["status"]
if time.time() - start > timeout:
raise TimeoutError("Timed out waiting for reindexing to start")
time.sleep(1)
time.sleep(
0.5
) # app interactivity is not immediate (probably because of async operations), so we wait a bit
metadata.client.post("/apps/trigger/SearchIndexingApplication")
time.sleep(0.5) # here too
while status != "success":
response = metadata.client.get(
"/apps/name/SearchIndexingApplication/status?offset=0&limit=1"
)
status = response["data"][0]["status"]
if time.time() - start > timeout:
raise TimeoutError("Timed out waiting for reindexing to complete")
time.sleep(1)

View File

@ -0,0 +1,14 @@
import sys
import pytest
from metadata.workflow.metadata import MetadataWorkflow
if not sys.version_info >= (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)
def test_ingest_metadata(
patch_passwords_for_db_services, run_workflow, ingestion_config
):
run_workflow(MetadataWorkflow, ingestion_config)

View File

@ -1,392 +0,0 @@
import sys
import time
from os import path
import pytest
from _openmetadata_testutils.postgres.conftest import postgres_container
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
BasicAuth,
)
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
PostgresConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import (
DatabaseServiceQueryLineagePipeline,
)
from metadata.generated.schema.metadataIngestion.databaseServiceQueryUsagePipeline import (
DatabaseUsageConfigType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
LogLevels,
OpenMetadataWorkflowConfig,
Processor,
Sink,
Source,
SourceConfig,
WorkflowConfig,
)
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.ingestion.models.custom_pydantic import CustomSecretStr
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.usage import UsageWorkflow
if not sys.version_info >= (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)
@pytest.fixture(scope="module")
def db_service(metadata, postgres_container):
service = CreateDatabaseServiceRequest(
name="docker_test_db",
serviceType=DatabaseServiceType.Postgres,
connection=DatabaseConnection(
config=PostgresConnection(
username=postgres_container.username,
authType=BasicAuth(password=postgres_container.password),
hostPort="localhost:"
+ postgres_container.get_exposed_port(postgres_container.port),
database="dvdrental",
)
),
)
service_entity = metadata.create_or_update(data=service)
service_entity.connection.config.authType.password = CustomSecretStr(
postgres_container.password
)
yield service_entity
try:
metadata.delete(
DatabaseService, service_entity.id, recursive=True, hard_delete=True
)
except APIError as error:
if error.status_code == 404:
pass
else:
raise
@pytest.fixture(scope="module")
def ingest_metadata(db_service, metadata: OpenMetadata):
search_cache.clear()
workflow_config = OpenMetadataWorkflowConfig(
source=Source(
type=db_service.connection.config.type.value.lower(),
serviceName=db_service.fullyQualifiedName.root,
serviceConnection=db_service.connection,
sourceConfig=SourceConfig(config={}),
),
sink=Sink(
type="metadata-rest",
config={},
),
workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config),
)
metadata_ingestion = MetadataWorkflow.create(workflow_config)
metadata_ingestion.execute()
return
@pytest.fixture(scope="module")
def ingest_postgres_lineage(db_service, ingest_metadata, metadata: OpenMetadata):
search_cache.clear()
workflow_config = OpenMetadataWorkflowConfig(
source=Source(
type="postgres-lineage",
serviceName=db_service.fullyQualifiedName.root,
serviceConnection=db_service.connection,
sourceConfig=SourceConfig(config=DatabaseServiceQueryLineagePipeline()),
),
sink=Sink(
type="metadata-rest",
config={},
),
workflowConfig=WorkflowConfig(
loggerLevel=LogLevels.DEBUG, openMetadataServerConfig=metadata.config
),
)
metadata_ingestion = MetadataWorkflow.create(workflow_config)
metadata_ingestion.execute()
metadata_ingestion.raise_from_status()
return
def test_ingest_query_log(db_service, ingest_metadata, metadata: OpenMetadata):
search_cache.clear()
reindex_search(
metadata
) # since query cache is stored in ES, we need to reindex to avoid having a stale cache
workflow_config = {
"source": {
"type": "query-log-lineage",
"serviceName": db_service.fullyQualifiedName.root,
"sourceConfig": {
"config": {
"type": "DatabaseLineage",
"queryLogFilePath": path.dirname(__file__) + "/bad_query_log.csv",
}
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"loggerLevel": "DEBUG",
"openMetadataServerConfig": metadata.config.model_dump(),
},
}
metadata_ingestion = MetadataWorkflow.create(workflow_config)
metadata_ingestion.execute()
assert len(metadata_ingestion.source.status.failures) == 2
for failure in metadata_ingestion.source.status.failures:
assert "Table entity not found" in failure.error
customer_table: Table = metadata.get_by_name(
Table,
f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer",
nullable=False,
)
actor_table: Table = metadata.get_by_name(
Table,
f"{db_service.fullyQualifiedName.root}.dvdrental.public.actor",
nullable=False,
)
staff_table: Table = metadata.get_by_name(
Table,
f"{db_service.fullyQualifiedName.root}.dvdrental.public.staff",
nullable=False,
)
edge = metadata.get_lineage_edge(
str(customer_table.id.root), str(actor_table.id.root)
)
assert edge is not None
edge = metadata.get_lineage_edge(
str(customer_table.id.root), str(staff_table.id.root)
)
assert edge is not None
@pytest.fixture(scope="module")
def run_profiler_workflow(ingest_metadata, db_service, metadata):
workflow_config = OpenMetadataWorkflowConfig(
source=Source(
type=db_service.connection.config.type.value.lower(),
serviceName=db_service.fullyQualifiedName.root,
serviceConnection=db_service.connection,
sourceConfig=SourceConfig(config=DatabaseServiceProfilerPipeline()),
),
processor=Processor(
type="orm-profiler",
config={},
),
sink=Sink(
type="metadata-rest",
config={},
),
workflowConfig=WorkflowConfig(
loggerLevel=LogLevels.DEBUG, openMetadataServerConfig=metadata.config
),
)
metadata_ingestion = ProfilerWorkflow.create(workflow_config.model_dump())
search_cache.clear()
metadata_ingestion.execute()
return
@pytest.fixture(scope="module")
def ingest_query_usage(ingest_metadata, db_service, metadata):
search_cache.clear()
workflow_config = {
"source": {
"type": "postgres-usage",
"serviceName": db_service.fullyQualifiedName.root,
"serviceConnection": db_service.connection.model_dump(),
"sourceConfig": {
"config": {"type": DatabaseUsageConfigType.DatabaseUsage.value}
},
},
"processor": {"type": "query-parser", "config": {}},
"stage": {
"type": "table-usage",
"config": {
"filename": "/tmp/postgres_usage",
},
},
"bulkSink": {
"type": "metadata-usage",
"config": {
"filename": "/tmp/postgres_usage",
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"loggerLevel": "DEBUG",
"openMetadataServerConfig": metadata.config.model_dump(),
},
}
workflow = UsageWorkflow.create(workflow_config)
search_cache.clear()
workflow.execute()
workflow.raise_from_status()
return
@pytest.fixture(scope="module")
def db_fqn(db_service: DatabaseService):
return ".".join(
[
db_service.fullyQualifiedName.root,
db_service.connection.config.database,
]
)
def test_query_usage(
ingest_query_usage,
db_service,
metadata,
db_fqn,
):
table = metadata.get_by_name(Table, ".".join([db_fqn, "public", "actor"]))
queries = metadata.get_entity_queries(table.id)
# TODO this should be retruning 2 queries but in CI sometimes it returns 1 *shrug*
assert 1 <= len(queries) <= 2
def test_profiler(run_profiler_workflow):
pass
def test_db_lineage(ingest_postgres_lineage):
pass
def run_usage_workflow(db_service, metadata):
workflow_config = {
"source": {
"type": "postgres-usage",
"serviceName": db_service.fullyQualifiedName.root,
"serviceConnection": db_service.connection.model_dump(),
"sourceConfig": {
"config": {"type": DatabaseUsageConfigType.DatabaseUsage.value}
},
},
"processor": {"type": "query-parser", "config": {}},
"stage": {
"type": "table-usage",
"config": {
"filename": "/tmp/postgres_usage",
},
},
"bulkSink": {
"type": "metadata-usage",
"config": {
"filename": "/tmp/postgres_usage",
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"loggerLevel": "DEBUG",
"openMetadataServerConfig": metadata.config.model_dump(),
},
}
workflow = UsageWorkflow.create(workflow_config)
search_cache.clear()
workflow.execute()
workflow.raise_from_status()
@pytest.mark.xfail(
reason="'metadata.ingestion.lineage.sql_lineage.search_cache' gets corrupted with invalid data."
" See issue https://github.com/open-metadata/OpenMetadata/issues/16408"
)
def test_usage_delete_usage(db_service, ingest_postgres_lineage, metadata):
workflow_config = {
"source": {
"type": "postgres-usage",
"serviceName": db_service.fullyQualifiedName.root,
"serviceConnection": db_service.connection.model_dump(),
"sourceConfig": {
"config": {"type": DatabaseUsageConfigType.DatabaseUsage.value}
},
},
"processor": {"type": "query-parser", "config": {}},
"stage": {
"type": "table-usage",
"config": {
"filename": "/tmp/postgres_usage",
},
},
"bulkSink": {
"type": "metadata-usage",
"config": {
"filename": "/tmp/postgres_usage",
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"loggerLevel": "DEBUG",
"openMetadataServerConfig": metadata.config.model_dump(),
},
}
workflow = UsageWorkflow.create(workflow_config)
search_cache.clear()
workflow.execute()
workflow.raise_from_status()
run_usage_workflow(db_service, metadata)
metadata.delete(DatabaseService, db_service.id, hard_delete=True, recursive=True)
workflow_config = OpenMetadataWorkflowConfig(
source=Source(
type=db_service.connection.config.type.value.lower(),
serviceName=db_service.fullyQualifiedName.root,
serviceConnection=db_service.connection,
sourceConfig=SourceConfig(config={}),
),
sink=Sink(
type="metadata-rest",
config={},
),
workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config),
)
metadata_ingestion = MetadataWorkflow.create(workflow_config)
metadata_ingestion.execute()
metadata_ingestion.raise_from_status()
run_usage_workflow(db_service, metadata)
def reindex_search(metadata: OpenMetadata, timeout=60):
start = time.time()
status = None
while status is None or status == "running":
response = metadata.client.get(
"/apps/name/SearchIndexingApplication/status?offset=0&limit=1"
)
status = response["data"][0]["status"]
if time.time() - start > timeout:
raise TimeoutError("Timed out waiting for reindexing to start")
time.sleep(1)
time.sleep(
0.5
) # app interactivity is not immediate (probably bc async operations), so we wait a bit
metadata.client.post("/apps/trigger/SearchIndexingApplication")
time.sleep(0.5) # here too
while status != "success":
response = metadata.client.get(
"/apps/name/SearchIndexingApplication/status?offset=0&limit=1"
)
status = response["data"][0]["status"]
if time.time() - start > timeout:
raise TimeoutError("Timed out waiting for reindexing to complete")
time.sleep(1)

View File

@ -0,0 +1,18 @@
import sys
import pytest
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
if not sys.version_info >= (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)
def test_profiler(
patch_passwords_for_db_services, run_workflow, ingestion_config, profiler_config
):
search_cache.clear()
run_workflow(MetadataWorkflow, ingestion_config)
run_workflow(ProfilerWorkflow, profiler_config)

View File

@ -0,0 +1,64 @@
import sys
import pytest
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.databaseServiceQueryUsagePipeline import (
DatabaseUsageConfigType,
)
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.usage import UsageWorkflow
if not sys.version_info >= (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)
@pytest.fixture()
def usage_config(sink_config, workflow_config, db_service):
return {
"source": {
"type": "postgres-usage",
"serviceName": db_service.fullyQualifiedName.root,
"sourceConfig": {
"config": {"type": DatabaseUsageConfigType.DatabaseUsage.value}
},
},
"processor": {"type": "query-parser", "config": {}},
"stage": {
"type": "table-usage",
"config": {
"filename": "/tmp/postgres_usage",
},
},
"bulkSink": {
"type": "metadata-usage",
"config": {
"filename": "/tmp/postgres_usage",
},
},
"sink": sink_config,
"workflowConfig": workflow_config,
}
def test_usage(run_workflow, ingestion_config, usage_config, metadata, db_service):
search_cache.clear()
run_workflow(MetadataWorkflow, ingestion_config)
run_workflow(UsageWorkflow, usage_config)
@pytest.mark.xfail(
reason="'metadata.ingestion.lineage.sql_lineage.search_cache' gets corrupted with invalid data."
" See issue https://github.com/open-metadata/OpenMetadata/issues/16408",
strict=True,
)
def test_usage_delete_usage(
run_workflow, ingestion_config, usage_config, metadata, db_service
):
search_cache.clear()
run_workflow(MetadataWorkflow, ingestion_config)
run_workflow(UsageWorkflow, usage_config)
metadata.delete(DatabaseService, db_service.id, hard_delete=True, recursive=True)
run_workflow(MetadataWorkflow, ingestion_config)
run_workflow(UsageWorkflow, usage_config)

View File

@ -328,5 +328,5 @@ class NoSQLProfiler(TestCase):
"age"
)
assert all(
[r[age_column_index] == query_age for r in sample_data.sampleData.rows]
[r[age_column_index] == str(query_age) for r in sample_data.sampleData.rows]
)

View File

@ -1,4 +1,3 @@
import logging
import os
import shutil
@ -6,24 +5,22 @@ import pytest
from sqlalchemy import create_engine, text
from testcontainers.mssql import SqlServerContainer
from _openmetadata_testutils.postgres.conftest import try_bind
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.services.connections.database.mssqlConnection import (
MssqlConnection,
MssqlScheme,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.metadata import MetadataWorkflow
from ...helpers.docker_utils import try_bind
from ...helpers.markers import xfail_param
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
@pytest.fixture(scope="session", autouse=True)
def config_logging():
logging.getLogger("sqlfluff").setLevel(logging.CRITICAL)
@pytest.fixture(scope="session")
@pytest.fixture(scope="module")
def mssql_container(tmp_path_factory):
container = SqlServerContainer(
"mcr.microsoft.com/mssql/server:2017-latest", dbname="AdventureWorks"
@ -80,27 +77,6 @@ GO
yield container
@pytest.fixture(
scope="module",
params=[
"english",
xfail_param(
"german",
"failes due to date format handling (https://github.com/open-metadata/OpenMetadata/issues/16434)",
),
],
)
def mssql_server_config(mssql_container, request):
language = request.param
engine = create_engine(
"mssql+pytds://" + mssql_container.get_connection_url().split("://")[1],
connect_args={"autocommit": True},
)
engine.execute(
f"ALTER LOGIN {mssql_container.username} WITH DEFAULT_LANGUAGE={language};"
)
@pytest.fixture(
scope="module",
params=[
@ -108,24 +84,40 @@ def mssql_server_config(mssql_container, request):
MssqlScheme.mssql_pyodbc,
],
)
def ingest_metadata(mssql_container, metadata: OpenMetadata, request):
workflow_config = {
def scheme(request):
return request.param
@pytest.fixture(scope="module")
def create_service_request(mssql_container, scheme, tmp_path_factory):
return CreateDatabaseServiceRequest(
name="docker_test_" + tmp_path_factory.mktemp("mssql").name + "_" + scheme.name,
serviceType=DatabaseServiceType.Mssql,
connection=DatabaseConnection(
config=MssqlConnection(
username=mssql_container.username,
password=mssql_container.password,
hostPort="localhost:"
+ mssql_container.get_exposed_port(mssql_container.port),
database="AdventureWorks",
scheme=scheme,
ingestAllDatabases=True,
connectionOptions={
"TrustServerCertificate": "yes",
"MARS_Connection": "yes",
},
)
),
)
@pytest.fixture(scope="module")
def ingestion_config(db_service, tmp_path_factory, workflow_config, sink_config):
return {
"source": {
"type": "mssql",
"serviceName": "integration_test_mssql_" + request.param.name,
"serviceConnection": {
"config": {
"type": "Mssql",
"scheme": request.param,
"username": mssql_container.username,
"password": mssql_container.password,
"hostPort": "localhost:"
+ mssql_container.get_exposed_port(mssql_container.port),
"database": "AdventureWorks",
"ingestAllDatabases": True,
"connectionOptions": {"TrustServerCertificate": "yes"},
}
},
"serviceName": db_service.fullyQualifiedName.root,
"serviceConnection": db_service.connection.dict(),
"sourceConfig": {
"config": {
"type": "DatabaseMetadata",
@ -133,61 +125,17 @@ def ingest_metadata(mssql_container, metadata: OpenMetadata, request):
},
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"loggerLevel": "DEBUG",
"openMetadataServerConfig": metadata.config.model_dump(),
},
"sink": sink_config,
"workflowConfig": workflow_config,
}
metadata_ingestion = MetadataWorkflow.create(workflow_config)
metadata_ingestion.execute()
metadata_ingestion.raise_from_status()
metadata_ingestion.stop()
db_service = metadata.get_by_name(
DatabaseService, workflow_config["source"]["serviceName"]
)
yield db_service
metadata.delete(DatabaseService, db_service.id, recursive=True, hard_delete=True)
@pytest.fixture(scope="module")
def run_lineage_workflow(
mssql_server_config,
ingest_metadata: DatabaseService,
mssql_container,
metadata: OpenMetadata,
):
workflow_config = {
"source": {
"type": "mssql-lineage",
"serviceName": ingest_metadata.fullyQualifiedName.root,
"serviceConnection": {
"config": {
"type": "Mssql",
"scheme": ingest_metadata.connection.config.scheme,
"username": mssql_container.username,
"password": mssql_container.password,
"hostPort": "localhost:"
+ mssql_container.get_exposed_port(mssql_container.port),
"database": "AdventureWorks",
"ingestAllDatabases": True,
}
},
"sourceConfig": {
"config": {
"type": "DatabaseLineage",
"databaseFilterPattern": {"includes": ["TestDB", "AdventureWorks"]},
},
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"loggerLevel": "INFO",
"openMetadataServerConfig": metadata.config.model_dump(),
},
}
metadata_ingestion = MetadataWorkflow.create(workflow_config)
search_cache.clear()
metadata_ingestion.execute()
metadata_ingestion.raise_from_status()
metadata_ingestion.stop()
def unmask_password(create_service_request):
def inner(service: DatabaseService):
service.connection.config.password = (
create_service_request.connection.config.password
)
return service
return inner

View File

@ -1,22 +1,72 @@
import sys
import pytest
from freezegun import freeze_time
from sqlalchemy import create_engine
from _openmetadata_testutils.helpers.markers import xfail_param
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.workflow.metadata import MetadataWorkflow
if not sys.version_info >= (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)
@pytest.mark.skip("fails for english even thoudh it should succeed")
@pytest.fixture(
params=[
"english",
xfail_param(
"german",
"failes due to date format handling (https://github.com/open-metadata/OpenMetadata/issues/16434)",
),
],
)
def language_config(mssql_container, request):
language = request.param
engine = create_engine(
"mssql+pytds://" + mssql_container.get_connection_url().split("://")[1],
connect_args={"autocommit": True},
)
engine.execute(
f"ALTER LOGIN {mssql_container.username} WITH DEFAULT_LANGUAGE={language};"
)
@pytest.fixture()
def lineage_config(language_config, db_service, workflow_config, sink_config):
return {
"source": {
"type": "mssql-lineage",
"serviceName": db_service.fullyQualifiedName.root,
"sourceConfig": {
"config": {
"type": "DatabaseLineage",
"databaseFilterPattern": {"includes": ["TestDB", "AdventureWorks"]},
},
},
},
"sink": sink_config,
"workflowConfig": workflow_config,
}
@freeze_time("2024-01-30") # to demonstrate the issue with german language
def test_lineage(
ingest_metadata,
run_lineage_workflow,
patch_passwords_for_db_services,
run_workflow,
ingestion_config,
lineage_config,
db_service,
metadata,
):
service_fqn = ingest_metadata.fullyQualifiedName.root
search_cache.clear()
run_workflow(MetadataWorkflow, ingestion_config)
run_workflow(MetadataWorkflow, lineage_config)
department_table = metadata.get_by_name(
Table, f"{service_fqn}.AdventureWorks.HumanResources.Department", nullable=False
Table,
f"{db_service.fullyQualifiedName.root}.AdventureWorks.HumanResources.Department",
nullable=False,
)
lineage = metadata.get_lineage_by_id(Table, department_table.id.root)
assert lineage is not None

View File

@ -3,18 +3,23 @@ import sys
import pytest
from metadata.generated.schema.entity.data.table import Constraint, Table
from metadata.workflow.metadata import MetadataWorkflow
if not sys.version_info >= (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)
def test_pass(
ingest_metadata,
def test_ingest_metadata(
patch_passwords_for_db_services,
run_workflow,
ingestion_config,
db_service,
metadata,
):
run_workflow(MetadataWorkflow, ingestion_config)
table: Table = metadata.get_by_name(
Table,
f"{ingest_metadata.fullyQualifiedName.root}.AdventureWorks.HumanResources.Department",
f"{db_service.fullyQualifiedName.root}.AdventureWorks.HumanResources.Department",
)
assert table is not None
assert table.columns[0].name.root == "DepartmentID"

View File

@ -0,0 +1,18 @@
import sys
import pytest
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
if not sys.version_info >= (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)
def test_profiler(
patch_passwords_for_db_services, run_workflow, ingestion_config, profiler_config
):
search_cache.clear()
run_workflow(MetadataWorkflow, ingestion_config)
run_workflow(ProfilerWorkflow, profiler_config)

View File

@ -11,6 +11,19 @@ from testcontainers.core.generic import DbContainer
from testcontainers.minio import MinioContainer
from testcontainers.mysql import MySqlContainer
from _openmetadata_testutils.postgres.conftest import try_bind
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
TrinoConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
class TrinoContainer(DbContainer):
def __init__(
@ -98,13 +111,13 @@ class HiveMetaStoreContainer(DockerContainer):
)
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def docker_network():
with testcontainers.core.network.Network() as network:
yield network
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def trino_container(hive_metastore_container, minio_container, docker_network):
with TrinoContainer(image="trinodb/trino:418").with_network(
docker_network
@ -118,15 +131,20 @@ def trino_container(hive_metastore_container, minio_container, docker_network):
yield trino
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def mysql_container(docker_network):
with MySqlContainer(
"mariadb:10.6.16", username="admin", password="admin", dbname="metastore_db"
).with_network(docker_network).with_network_aliases("mariadb") as mysql:
container = (
MySqlContainer(
"mariadb:10.6.16", username="admin", password="admin", dbname="metastore_db"
)
.with_network(docker_network)
.with_network_aliases("mariadb")
)
with try_bind(container, container.port, container.port + 1) as mysql:
yield mysql
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def hive_metastore_container(mysql_container, minio_container, docker_network):
with HiveMetaStoreContainer("bitsondatadev/hive-metastore:latest").with_network(
docker_network
@ -144,17 +162,18 @@ def hive_metastore_container(mysql_container, minio_container, docker_network):
yield hive
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def minio_container(docker_network):
with MinioContainer().with_network(docker_network).with_network_aliases(
"minio"
) as minio:
container = (
MinioContainer().with_network(docker_network).with_network_aliases("minio")
)
with try_bind(container, container.port, container.port) as minio:
client = minio.get_client()
client.make_bucket("hive-warehouse")
yield minio
@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def create_test_data(trino_container):
engine = create_engine(trino_container.get_connection_url())
engine.execute(
@ -163,3 +182,51 @@ def create_test_data(trino_container):
engine.execute("create table minio.my_schema.test_table (id int)")
engine.execute("insert into minio.my_schema.test_table values (1), (2), (3)")
return
@pytest.fixture(scope="module")
def create_service_request(trino_container, tmp_path_factory):
return CreateDatabaseServiceRequest(
name="docker_test_" + tmp_path_factory.mktemp("trino").name,
serviceType=DatabaseServiceType.Trino,
connection=DatabaseConnection(
config=TrinoConnection(
username=trino_container.user,
hostPort="localhost:"
+ trino_container.get_exposed_port(trino_container.port),
catalog="minio",
connectionArguments={"http_scheme": "http"},
)
),
)
@pytest.fixture
def ingestion_config(db_service, sink_config, workflow_config):
return {
"source": {
"type": db_service.connection.config.type.value.lower(),
"serviceName": db_service.fullyQualifiedName.root,
"serviceConnection": db_service.connection.dict(),
"sourceConfig": {
"config": {
"type": "DatabaseMetadata",
"schemaFilterPattern": {
"excludes": [
"^information_schema$",
],
},
},
},
},
"sink": sink_config,
"workflowConfig": workflow_config,
}
@pytest.fixture(scope="module")
def unmask_password():
def patch_password(service: DatabaseService):
return service
return patch_password

View File

@ -0,0 +1,19 @@
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.metadata import MetadataWorkflow
def test_metadata(
db_service, metadata: OpenMetadata, run_workflow, ingestion_config, create_test_data
):
run_workflow(MetadataWorkflow, ingestion_config)
tables = metadata.list_entities(
Table,
params={
"databaseSchema": f"{db_service.fullyQualifiedName.root}.minio.my_schema"
},
)
assert (
next((t for t in tables.entities if t.name.root == "test_table"), None)
is not None
)

View File

@ -0,0 +1,15 @@
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
def test_profiler(
patch_passwords_for_db_services,
run_workflow,
ingestion_config,
profiler_config,
create_test_data,
):
search_cache.clear()
run_workflow(MetadataWorkflow, ingestion_config)
run_workflow(ProfilerWorkflow, profiler_config)

View File

@ -1,76 +0,0 @@
import pytest
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
TrinoConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
Sink,
Source,
SourceConfig,
WorkflowConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.metadata import MetadataWorkflow
@pytest.fixture(scope="module")
def db_service(metadata, trino_container):
service = CreateDatabaseServiceRequest(
name="docker_test_trino",
serviceType=DatabaseServiceType.Trino,
connection=DatabaseConnection(
config=TrinoConnection(
username=trino_container.user,
hostPort="localhost:"
+ trino_container.get_exposed_port(trino_container.port),
catalog="minio",
connectionArguments={"http_scheme": "http"},
)
),
)
service_entity = metadata.create_or_update(data=service)
yield service_entity
metadata.delete(
DatabaseService, service_entity.id, recursive=True, hard_delete=True
)
@pytest.fixture(scope="module")
def ingest_metadata(db_service, metadata: OpenMetadata, create_test_data):
workflow_config = OpenMetadataWorkflowConfig(
source=Source(
type=db_service.connection.config.type.value.lower(),
serviceName=db_service.fullyQualifiedName.root,
serviceConnection=db_service.connection,
sourceConfig=SourceConfig(config={}),
),
sink=Sink(
type="metadata-rest",
config={},
),
workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config),
)
metadata_ingestion = MetadataWorkflow.create(workflow_config)
metadata_ingestion.execute()
metadata_ingestion.raise_from_status()
return
def test_ingest_metadata(ingest_metadata, db_service, metadata: OpenMetadata):
tables = metadata.list_entities(
Table, params={"databaseSchema": "docker_test_trino.minio.my_schema"}
)
assert (
next((t for t in tables.entities if t.name.root == "test_table"), None)
is not None
)

View File

@ -0,0 +1,57 @@
import pytest
from _openmetadata_testutils.helpers.markers import xfail_param
from metadata.generated.schema.entity.data.table import TableData
@pytest.mark.parametrize(
"parameter",
[
TableData(
columns=[],
rows=[],
),
TableData(
columns=[],
rows=[[1]],
),
TableData(
columns=[],
rows=[["a"]],
),
TableData(
columns=[],
rows=[
[b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]
],
),
],
)
def test_table_data_serialization(parameter):
for row in parameter.rows:
for i, cell in enumerate(row):
if isinstance(cell, bytes):
# bytes are written as strings and deserialize as strings
row[i] = cell.decode("utf-8")
assert TableData.model_validate_json(parameter.model_dump_json()) == parameter
@pytest.mark.parametrize(
"parameter",
[
xfail_param(
TableData(
columns=[],
rows=[
[
b"\xe6\x10\x00\x00\x01\x0c\xae\x8b\xfc(\xbc\xe4G@g\xa8\x91\x89\x89\x8a^\xc0"
]
],
),
reason="TODO: change TableData.rows to List[List[str]]",
),
],
)
def test_unserializable(parameter):
parameter = TableData.model_validate(parameter.model_dump())
assert TableData.model_validate_json(parameter.model_dump_json()) != parameter

View File

@ -154,7 +154,8 @@
"QUANTILE_STATE",
"AGG_STATE",
"BITMAP",
"UINT"
"UINT",
"BIT"
]
},
"constraint": {