From 0fee79b200618dd4311ade81e8de2940a600e571 Mon Sep 17 00:00:00 2001 From: Imri Paran Date: Wed, 17 Jul 2024 08:11:34 +0200 Subject: [PATCH] MINOR: fix sample data issue with Pydantic v2 and refactor python integration tests (#16943) * tests: refactor tests and consolidate common functionality in integrations.conftest. This enables writing tests more concisely; demonstrated with postgres and mssql, more will be migrated. * format * removed helpers * changed scope of fixtures * added profiler test for mssql * fixed import in data_quality test * json safe serialization * format * set MARS_Connection * use SerializableTableData instead of TableData * deleted file test_postgres.py * fixed tests * added more test cases * format * changed name test_models.py * removed the logic for serializing table data * wip * changed mapping in common type map * changed mapping in common type map * reverted TableData imports * reverted TableData imports * reverted TableData imports --- ingestion/pyproject.toml | 7 +- ingestion/setup.py | 2 + .../helpers/markers.py | 2 +- .../postgres/conftest.py | 2 +- .../source/database/column_type_parser.py | 11 +- .../src/metadata/profiler/adaptors/mongodb.py | 14 +- .../metadata/profiler/orm/converter/common.py | 7 +- .../src/metadata/profiler/orm/registry.py | 4 + .../orm/types/custom_hex_byte_string.py | 4 +- .../profiler/orm/types/undetermined_type.py | 38 ++ .../processor/sampler/nosql/sampler.py | 11 +- ingestion/tests/helpers/README.md | 2 - ingestion/tests/helpers/docker_utils.py | 22 - ingestion/tests/integration/conftest.py | 144 ++++++- .../integration/data_quality/conftest.py | 5 - .../tests/integration/postgres/conftest.py | 58 +-- .../integration/postgres/test_data_quality.py | 57 +-- .../integration/postgres/test_lineage.py | 121 ++++++ .../integration/postgres/test_metadata.py | 14 + .../integration/postgres/test_postgres.py | 392 ------------------ .../integration/postgres/test_profiler.py | 18 + .../tests/integration/postgres/test_usage.py | 64 +++ .../profiler/test_nosql_profiler.py | 2 +- .../tests/integration/sql_server/conftest.py | 158 +++---- .../integration/sql_server/test_lineage.py | 60 ++- ...metadata_ingestion.py => test_metadata.py} | 11 +- .../integration/sql_server/test_profiler.py | 18 + ingestion/tests/integration/trino/conftest.py | 91 +++- .../tests/integration/trino/test_metadata.py | 19 + .../tests/integration/trino/test_profiler.py | 15 + .../tests/integration/trino/test_trino.py | 76 ---- .../unit/metadata/profiler/api/test_models.py | 57 +++ .../json/schema/entity/data/table.json | 3 +- 33 files changed, 774 insertions(+), 735 deletions(-) rename ingestion/{tests => src/_openmetadata_testutils}/helpers/markers.py (84%) create mode 100644 ingestion/src/metadata/profiler/orm/types/undetermined_type.py delete mode 100644 ingestion/tests/helpers/README.md delete mode 100644 ingestion/tests/helpers/docker_utils.py create mode 100644 ingestion/tests/integration/postgres/test_lineage.py create mode 100644 ingestion/tests/integration/postgres/test_metadata.py delete mode 100644 ingestion/tests/integration/postgres/test_postgres.py create mode 100644 ingestion/tests/integration/postgres/test_profiler.py create mode 100644 ingestion/tests/integration/postgres/test_usage.py rename ingestion/tests/integration/sql_server/{test_metadata_ingestion.py => test_metadata.py} (64%) create mode 100644 ingestion/tests/integration/sql_server/test_profiler.py create mode 100644 
ingestion/tests/integration/trino/test_metadata.py create mode 100644 ingestion/tests/integration/trino/test_profiler.py delete mode 100644 ingestion/tests/integration/trino/test_trino.py create mode 100644 ingestion/tests/unit/metadata/profiler/api/test_models.py diff --git a/ingestion/pyproject.toml b/ingestion/pyproject.toml index 70a81746178..046e5ba3f8d 100644 --- a/ingestion/pyproject.toml +++ b/ingestion/pyproject.toml @@ -9,9 +9,9 @@ name = "openmetadata-ingestion" version = "1.5.0.0.dev0" dynamic = ["readme", "dependencies", "optional-dependencies"] authors = [ - {name = "OpenMetadata Committers"} + { name = "OpenMetadata Committers" } ] -license = {file = "LICENSE"} +license = { file = "LICENSE" } description = "Ingestion Framework for OpenMetadata" requires-python = ">=3.8" @@ -21,7 +21,7 @@ Documentation = "https://docs.open-metadata.org/" Source = "https://github.com/open-metadata/OpenMetadata" [tool.setuptools.dynamic] -readme = {file = ["README.md"]} +readme = { file = ["README.md"] } [tool.setuptools.packages.find] where = ["./src"] @@ -72,7 +72,6 @@ disallow_incomplete_defs = false markers = [ "slow: marks tests as slow (deselect with '-m \"not slow\"')" ] -norecursedirs = "tests/helpers" [tool.pylint.BASIC] # W1203: logging-fstring-interpolation - f-string brings better readability and unifies style diff --git a/ingestion/setup.py b/ingestion/setup.py index 88ee7e3e94b..37601b4d9f4 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -346,6 +346,7 @@ test = { "pytest-order", # install dbt dependency "dbt-artifacts-parser", + "freezegun", VERSIONS["sqlalchemy-databricks"], VERSIONS["databricks-sdk"], VERSIONS["scikit-learn"], @@ -382,6 +383,7 @@ test = { *plugins["mssql"], *plugins["dagster"], *plugins["oracle"], + *plugins["mssql"], } e2e_test = { diff --git a/ingestion/tests/helpers/markers.py b/ingestion/src/_openmetadata_testutils/helpers/markers.py similarity index 84% rename from ingestion/tests/helpers/markers.py rename to ingestion/src/_openmetadata_testutils/helpers/markers.py index ed279ee7228..5083051c4f2 100644 --- a/ingestion/tests/helpers/markers.py +++ b/ingestion/src/_openmetadata_testutils/helpers/markers.py @@ -2,4 +2,4 @@ import pytest def xfail_param(param, reason): - return pytest.param(param, marks=pytest.mark.xfail(reason=reason)) + return pytest.param(param, marks=pytest.mark.xfail(reason=reason, strict=True)) diff --git a/ingestion/src/_openmetadata_testutils/postgres/conftest.py b/ingestion/src/_openmetadata_testutils/postgres/conftest.py index 93d24b9075f..ba3d2cbd12d 100644 --- a/ingestion/src/_openmetadata_testutils/postgres/conftest.py +++ b/ingestion/src/_openmetadata_testutils/postgres/conftest.py @@ -30,7 +30,7 @@ def try_bind(container, container_port, host_port): yield container -@pytest.fixture(scope="session") +@pytest.fixture(scope="module") def postgres_container(tmp_path_factory): """Start a PostgreSQL container with the dvdrental database.""" data_dir = tmp_path_factory.mktemp("data") diff --git a/ingestion/src/metadata/ingestion/source/database/column_type_parser.py b/ingestion/src/metadata/ingestion/source/database/column_type_parser.py index c59b431d2e6..0ef94951bf6 100644 --- a/ingestion/src/metadata/ingestion/source/database/column_type_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/column_type_parser.py @@ -88,7 +88,7 @@ class ColumnTypeParser: "BIGNUMERIC": "NUMERIC", "BIGSERIAL": "BIGINT", "BINARY": "BINARY", - "BIT": "INT", + "BIT": "BIT", "BLOB": "BLOB", "BOOL": "BOOLEAN", "BOOLEAN": "BOOLEAN", @@ 
-298,15 +298,6 @@ class ColumnTypeParser: _FIXED_DECIMAL = re.compile(r"(decimal|numeric)(\(\s*(\d+)\s*,\s*(\d+)\s*\))?") - try: - # pylint: disable=import-outside-toplevel - from sqlalchemy.dialects.mssql import BIT - - _COLUMN_TYPE_MAPPING[BIT] = "BINARY" - _SOURCE_TYPE_TO_OM_TYPE["BIT"] = "BINARY" - except ImportError: - pass - try: # pylint: disable=import-outside-toplevel from teradatasqlalchemy import BYTE, VARBYTE diff --git a/ingestion/src/metadata/profiler/adaptors/mongodb.py b/ingestion/src/metadata/profiler/adaptors/mongodb.py index 3d2d721731b..b09cb257192 100644 --- a/ingestion/src/metadata/profiler/adaptors/mongodb.py +++ b/ingestion/src/metadata/profiler/adaptors/mongodb.py @@ -163,16 +163,4 @@ class MongoDB(NoSQLAdaptor): return AggregationFunction.MIN def execute(self, query: Executable) -> List[Dict[str, any]]: - records = list(query.to_executable(self.client)) - result = [] - for r in records: - result.append({c: self._json_safe(r.get(c)) for c in r}) - return result - - @staticmethod - def _json_safe(data: any): - try: - json.dumps(data) - return data - except Exception: # noqa - return str(data) + return list(query.to_executable(self.client)) diff --git a/ingestion/src/metadata/profiler/orm/converter/common.py b/ingestion/src/metadata/profiler/orm/converter/common.py index 64094fcce97..2d1e41e13dc 100644 --- a/ingestion/src/metadata/profiler/orm/converter/common.py +++ b/ingestion/src/metadata/profiler/orm/converter/common.py @@ -12,7 +12,6 @@ """ Common Class For Profiler Converter. """ - from typing import Dict, Set import sqlalchemy @@ -51,8 +50,8 @@ class CommonMapTypes: DataType.CHAR: sqlalchemy.CHAR, DataType.VARCHAR: sqlalchemy.VARCHAR, DataType.BOOLEAN: sqlalchemy.BOOLEAN, - DataType.BINARY: sqlalchemy.LargeBinary, - DataType.VARBINARY: sqlalchemy.VARBINARY, + DataType.BINARY: CustomTypes.BYTES.value, + DataType.VARBINARY: CustomTypes.BYTES.value, DataType.ARRAY: CustomTypes.ARRAY.value, DataType.BLOB: CustomTypes.BYTES.value, DataType.LONGBLOB: sqlalchemy.LargeBinary, @@ -81,7 +80,7 @@ class CommonMapTypes: return self.return_custom_type(col, table_service_type) def return_custom_type(self, col: Column, _): - return self._TYPE_MAP.get(col.dataType) + return self._TYPE_MAP.get(col.dataType, CustomTypes.UNDETERMINED.value) @staticmethod def map_sqa_to_om_types() -> Dict[TypeEngine, Set[DataType]]: diff --git a/ingestion/src/metadata/profiler/orm/registry.py b/ingestion/src/metadata/profiler/orm/registry.py index 8a6d2efdaa9..27d6bdf6551 100644 --- a/ingestion/src/metadata/profiler/orm/registry.py +++ b/ingestion/src/metadata/profiler/orm/registry.py @@ -26,6 +26,7 @@ from metadata.profiler.orm.types.custom_hex_byte_string import HexByteString from metadata.profiler.orm.types.custom_image import CustomImage from metadata.profiler.orm.types.custom_ip import CustomIP from metadata.profiler.orm.types.custom_timestamp import CustomTimestamp +from metadata.profiler.orm.types.undetermined_type import UndeterminedType from metadata.profiler.orm.types.uuid import UUIDString from metadata.profiler.registry import TypeRegistry @@ -39,6 +40,7 @@ class CustomTypes(TypeRegistry): IMAGE = CustomImage IP = CustomIP SQADATETIMERANGE = CustomDateTimeRange + UNDETERMINED = UndeterminedType class Dialects(Enum): @@ -97,6 +99,8 @@ NOT_COMPUTE = { DataType.JSON.value, CustomTypes.ARRAY.value.__name__, CustomTypes.SQADATETIMERANGE.value.__name__, + DataType.XML.value, + CustomTypes.UNDETERMINED.value.__name__, } FLOAT_SET = {sqlalchemy.types.DECIMAL, sqlalchemy.types.FLOAT} diff --git 
a/ingestion/src/metadata/profiler/orm/types/custom_hex_byte_string.py b/ingestion/src/metadata/profiler/orm/types/custom_hex_byte_string.py index 11868546a5c..1e27adb3ce3 100644 --- a/ingestion/src/metadata/profiler/orm/types/custom_hex_byte_string.py +++ b/ingestion/src/metadata/profiler/orm/types/custom_hex_byte_string.py @@ -42,7 +42,9 @@ class HexByteString(TypeDecorator): Make sure the data is of correct type """ if not isinstance(value, (bytes, bytearray)): - raise TypeError("HexByteString columns support only bytes values.") + raise TypeError( + f"HexByteString columns support only bytes values. Received {type(value).__name__}." + ) def process_result_value(self, value: str, dialect) -> Optional[str]: """This is executed during result retrieval diff --git a/ingestion/src/metadata/profiler/orm/types/undetermined_type.py b/ingestion/src/metadata/profiler/orm/types/undetermined_type.py new file mode 100644 index 00000000000..f7121347d2f --- /dev/null +++ b/ingestion/src/metadata/profiler/orm/types/undetermined_type.py @@ -0,0 +1,38 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=abstract-method + +""" +Undetermined type for cases where we don't have type mappings +""" +from sqlalchemy.sql.sqltypes import String, TypeDecorator + + +class UndeterminedType(TypeDecorator): + """A fallback type for undetermined types returned from the database""" + + impl = String + cache_ok = True + + @property + def python_type(self): + return str + + def process_result_value(self, value, _): + """ + We have no idea what this type is, 
so we just cast it to a string. + """ + return ( + f"OPENMETADATA_UNDETERMINED[{str(value)}]" + if value + else "OPENMETADATA_UNDETERMINED[]" + ) diff --git a/ingestion/src/metadata/profiler/processor/sampler/nosql/sampler.py b/ingestion/src/metadata/profiler/processor/sampler/nosql/sampler.py index 0d4a4d4b621..38157045008 100644 --- a/ingestion/src/metadata/profiler/processor/sampler/nosql/sampler.py +++ b/ingestion/src/metadata/profiler/processor/sampler/nosql/sampler.py @@ -30,7 +30,9 @@ class NoSQLSampler(SamplerInterface): for column in self.table.columns ] rows, cols = self.transpose_records(records, columns) - return TableData(rows=rows, columns=[c.name for c in cols]) + return TableData( + rows=[list(map(str, row)) for row in rows], columns=[c.name for c in cols] + ) def random_sample(self): pass @@ -40,14 +42,17 @@ class NoSQLSampler(SamplerInterface): return self._fetch_sample_data_from_user_query() return self._fetch_sample_data(columns) - def _fetch_sample_data(self, columns: List[SQALikeColumn]): + def _fetch_sample_data(self, columns: List[SQALikeColumn]) -> TableData: """ returns sampled ometa dataframes """ limit = self._get_limit() records = self.client.scan(self.table, self.table.columns, limit) rows, cols = self.transpose_records(records, columns) - return TableData(rows=rows, columns=[col.name for col in cols]) + return TableData( + rows=[list(map(str, row)) for row in rows], + columns=[col.name for col in cols], + ) def _get_limit(self) -> Optional[int]: num_rows = self.client.item_count(self.table) diff --git a/ingestion/tests/helpers/README.md b/ingestion/tests/helpers/README.md deleted file mode 100644 index f280b61f5cc..00000000000 --- a/ingestion/tests/helpers/README.md +++ /dev/null @@ -1,2 +0,0 @@ -When working on pycharm, add this directory to the sources root so that -it resolves the modules properly. \ No newline at end of file diff --git a/ingestion/tests/helpers/docker_utils.py b/ingestion/tests/helpers/docker_utils.py deleted file mode 100644 index 190c63f7d87..00000000000 --- a/ingestion/tests/helpers/docker_utils.py +++ /dev/null @@ -1,22 +0,0 @@ -import contextlib -import logging - -import docker - - -@contextlib.contextmanager -def try_bind(container, container_port, *host_ports): - """Try to bind a container locally on a specfic port and yield the container. If the port is already in use, - try another port. This is useful when running tests locally and we want to avoid having to reconfigure SQL clients - to connect to a different port each time we run the tests. Multiple ports can be passed so that the next available - port is tried if the previous one is already in use. 
- """ - for host_port in host_ports: - try: - with container.with_bind_ports(container_port, host_port) as container: - yield container - return - except docker.errors.APIError: - logging.warning("Port %s is already in use, trying another port", host_port) - with container.with_bind_ports(container_port, None) as container: - yield container diff --git a/ingestion/tests/integration/conftest.py b/ingestion/tests/integration/conftest.py index a95ea321b68..308989383a2 100644 --- a/ingestion/tests/integration/conftest.py +++ b/ingestion/tests/integration/conftest.py @@ -1,27 +1,25 @@ import logging -import os import sys import pytest from _openmetadata_testutils.ometa import int_admin_ometa +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.metadataIngestion.workflow import LogLevels +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.workflow.ingestion import IngestionWorkflow if not sys.version_info >= (3, 9): collect_ignore = ["trino"] -def pytest_configure(): - helpers_path = os.path.abspath(os.path.dirname(__file__) + "/../helpers") - sys.path.insert(0, helpers_path) - - @pytest.fixture(scope="session", autouse=True) def configure_logging(): logging.getLogger("sqlfluff").setLevel(logging.CRITICAL) logging.getLogger("pytds").setLevel(logging.CRITICAL) -@pytest.fixture(scope="module") +@pytest.fixture(scope="session") def metadata(): return int_admin_ometa() @@ -41,3 +39,135 @@ def config_testcontatiners(): from testcontainers.core.config import testcontainers_config testcontainers_config.max_tries = 10 + + +@pytest.fixture(scope="session") +def sink_config(metadata): + return { + "type": "metadata-rest", + "config": {}, + } + + +@pytest.fixture(scope="session") +def workflow_config(metadata): + return { + "loggerLevel": LogLevels.DEBUG.value, + "openMetadataServerConfig": metadata.config.model_dump(), + } + + +@pytest.fixture() +def profiler_config(db_service, workflow_config, sink_config): + return { + "source": { + "type": db_service.connection.config.type.value.lower(), + "serviceName": db_service.fullyQualifiedName.root, + "sourceConfig": { + "config": { + "type": "Profiler", + "generateSampleData": True, + "timeoutSeconds": 30, + } + }, + }, + "processor": { + "type": "orm-profiler", + "config": {}, + }, + "sink": sink_config, + "workflowConfig": workflow_config, + } + + +@pytest.fixture() +def run_workflow(): + def _run(workflow_type: type(IngestionWorkflow), config, raise_from_status=True): + workflow: IngestionWorkflow = workflow_type.create(config) + workflow.execute() + if raise_from_status: + workflow.raise_from_status() + return workflow + + return _run + + +@pytest.fixture(scope="module") +def db_service(metadata, create_service_request, unmask_password): + service_entity = metadata.create_or_update(data=create_service_request) + fqn = service_entity.fullyQualifiedName.root + yield unmask_password(service_entity) + service_entity = metadata.get_by_name(DatabaseService, fqn) + if service_entity: + metadata.delete( + DatabaseService, service_entity.id, recursive=True, hard_delete=True + ) + + +@pytest.fixture(scope="module") +def unmask_password(create_service_request): + """Unmask the db passwrod returned by the metadata service. + You can override this at the test_module level to implement custom password handling. 
+ + Example: + @pytest.fixture(scope="module") + def unmask_password(my_container1, my_container2): + def patch_password(service: DatabaseService): + if service.connection.config.authType.password == "my_password": + ... # do something else + return service + return patch_password + """ + + def patch_password(service: DatabaseService): + service.connection.config.authType.password = ( + create_service_request.connection.config.authType.password + ) + return service + + return patch_password + + +@pytest.fixture(scope="module") +def create_service_request(): + """ + Implement in the test module to create a service request. + Example: + @pytest.fixture(scope="module") + def create_service_request(): + return CreateDatabaseServiceRequest( + name="my_service", + serviceType=DatabaseServiceType.MyService, + connection=DatabaseConnection( + config=MyServiceConnection( + username="my_user", + password="my_password", + host="localhost", + port="5432", + ) + ), + ) + """ + raise NotImplementedError("Implement in the test module") + + +@pytest.fixture() +def patch_passwords_for_db_services(db_service, unmask_password, monkeypatch): + def override_password(getter): + def inner(*args, **kwargs): + result = getter(*args, **kwargs) + if isinstance(result, DatabaseService): + if result.fullyQualifiedName.root == db_service.fullyQualifiedName.root: + return unmask_password(result) + return result + + return inner + + monkeypatch.setattr( + "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_name", + override_password(OpenMetadata.get_by_name), + ) + + monkeypatch.setattr( + "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_id", + override_password(OpenMetadata.get_by_id), + ) diff --git a/ingestion/tests/integration/data_quality/conftest.py b/ingestion/tests/integration/data_quality/conftest.py index 52db5796ff4..7f04d86289e 100644 --- a/ingestion/tests/integration/data_quality/conftest.py +++ b/ingestion/tests/integration/data_quality/conftest.py @@ -8,12 +8,7 @@ from metadata.ingestion.models.custom_pydantic import CustomSecretStr from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.workflow.metadata import MetadataWorkflow -from ..postgres.conftest import db_service as postgres_service -from ..postgres.conftest import ingest_metadata as ingest_postgres - __all__ = [ - "ingest_postgres", - "postgres_service", "postgres_container", ] diff --git a/ingestion/tests/integration/postgres/conftest.py b/ingestion/tests/integration/postgres/conftest.py index 1caf6d45fad..4ad8dde4d80 100644 --- a/ingestion/tests/integration/postgres/conftest.py +++ b/ingestion/tests/integration/postgres/conftest.py @@ -12,26 +12,14 @@ from metadata.generated.schema.entity.services.connections.database.postgresConn ) from metadata.generated.schema.entity.services.databaseService import ( DatabaseConnection, - DatabaseService, DatabaseServiceType, ) -from metadata.generated.schema.metadataIngestion.workflow import ( - OpenMetadataWorkflowConfig, - Sink, - Source, - SourceConfig, - WorkflowConfig, -) -from metadata.ingestion.lineage.sql_lineage import search_cache -from metadata.ingestion.models.custom_pydantic import CustomSecretStr -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.workflow.metadata import MetadataWorkflow @pytest.fixture(scope="module") -def db_service(metadata, postgres_container): - service = CreateDatabaseServiceRequest( - name="docker_test_db", +def create_service_request(postgres_container, tmp_path_factory): + return CreateDatabaseServiceRequest( + name="docker_test_" + 
tmp_path_factory.mktemp("postgres").name, serviceType=DatabaseServiceType.Postgres, connection=DatabaseConnection( config=PostgresConnection( @@ -43,33 +31,19 @@ def db_service(metadata, postgres_container): ) ), ) - service_entity = metadata.create_or_update(data=service) - # Since we're using admin JWT (not ingestion-bot), the secret is not sent by the API - service_entity.connection.config.authType.password = CustomSecretStr( - postgres_container.password - ) - yield service_entity - metadata.delete( - DatabaseService, service_entity.id, recursive=True, hard_delete=True - ) @pytest.fixture(scope="module") -def ingest_metadata(db_service, metadata: OpenMetadata): - workflow_config = OpenMetadataWorkflowConfig( - source=Source( - type=db_service.connection.config.type.value.lower(), - serviceName=db_service.fullyQualifiedName.root, - serviceConnection=db_service.connection, - sourceConfig=SourceConfig(config={}), - ), - sink=Sink( - type="metadata-rest", - config={}, - ), - workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config), - ) - metadata_ingestion = MetadataWorkflow.create(workflow_config) - search_cache.clear() - metadata_ingestion.execute() - metadata_ingestion.raise_from_status() +def ingestion_config( + db_service, metadata, workflow_config, sink_config, postgres_container +): + return { + "source": { + "type": db_service.connection.config.type.value.lower(), + "serviceName": db_service.fullyQualifiedName.root, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, + "serviceConnection": db_service.connection.dict(), + }, + "sink": sink_config, + "workflowConfig": workflow_config, + } diff --git a/ingestion/tests/integration/postgres/test_data_quality.py b/ingestion/tests/integration/postgres/test_data_quality.py index d51b42c2663..0506fc06987 100644 --- a/ingestion/tests/integration/postgres/test_data_quality.py +++ b/ingestion/tests/integration/postgres/test_data_quality.py @@ -9,7 +9,6 @@ from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( TestSuitePipeline, ) from metadata.generated.schema.metadataIngestion.workflow import ( - LogLevels, OpenMetadataWorkflowConfig, Processor, Sink, @@ -24,16 +23,23 @@ from metadata.generated.schema.type.basic import ComponentConfig from metadata.ingestion.api.status import TruncatedStackTraceError from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.workflow.data_quality import TestSuiteWorkflow +from metadata.workflow.metadata import MetadataWorkflow if not sys.version_info >= (3, 9): pytest.skip("requires python 3.9+", allow_module_level=True) -@pytest.fixture(scope="module") +@pytest.fixture() def run_data_quality_workflow( - ingest_metadata, db_service: DatabaseService, metadata: OpenMetadata + run_workflow, + ingestion_config, + db_service: DatabaseService, + metadata: OpenMetadata, + sink_config, + workflow_config, ): - workflow_config = OpenMetadataWorkflowConfig( + run_workflow(MetadataWorkflow, ingestion_config) + test_suite_config = OpenMetadataWorkflowConfig( source=Source( type=TestSuiteConfigType.TestSuite.value, serviceName="MyTestSuite", @@ -80,15 +86,10 @@ def run_data_quality_workflow( } ), ), - sink=Sink( - type="metadata-rest", - config={}, - ), - workflowConfig=WorkflowConfig( - loggerLevel=LogLevels.DEBUG, openMetadataServerConfig=metadata.config - ), + sink=Sink.model_validate(sink_config), + workflowConfig=WorkflowConfig.model_validate(workflow_config), ) - test_suite_processor = TestSuiteWorkflow.create(workflow_config) + test_suite_processor = 
TestSuiteWorkflow.create(test_suite_config) test_suite_processor.execute() test_suite_processor.raise_from_status() yield @@ -120,8 +121,9 @@ def test_data_quality( assert test_case.testCaseResult.testCaseStatus == expected_status -def test_incompatible_column_type(ingest_metadata, metadata: OpenMetadata, db_service): - workflow_config = { +@pytest.fixture() +def incompatible_column_type_config(db_service, workflow_config, sink_config): + return { "source": { "type": "TestSuite", "serviceName": "MyTestSuite", @@ -131,7 +133,6 @@ def test_incompatible_column_type(ingest_metadata, metadata: OpenMetadata, db_se "entityFullyQualifiedName": f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer", } }, - "serviceConnection": db_service.connection.model_dump(), }, "processor": { "type": "orm-test-runner", @@ -158,17 +159,23 @@ def test_incompatible_column_type(ingest_metadata, metadata: OpenMetadata, db_se ] }, }, - "sink": { - "type": "metadata-rest", - "config": {}, - }, - "workflowConfig": { - "loggerLevel": "DEBUG", - "openMetadataServerConfig": metadata.config.model_dump(), - }, + "sink": sink_config, + "workflowConfig": workflow_config, } - test_suite_processor = TestSuiteWorkflow.create(workflow_config) - test_suite_processor.execute() + + +def test_incompatible_column_type( + patch_passwords_for_db_services, + run_workflow, + ingestion_config, + incompatible_column_type_config, + metadata: OpenMetadata, + db_service, +): + run_workflow(MetadataWorkflow, ingestion_config) + test_suite_processor = run_workflow( + TestSuiteWorkflow, incompatible_column_type_config, raise_from_status=False + ) assert test_suite_processor.steps[0].get_status().failures == [ TruncatedStackTraceError( name="Incompatible Column for Test Case", diff --git a/ingestion/tests/integration/postgres/test_lineage.py b/ingestion/tests/integration/postgres/test_lineage.py new file mode 100644 index 00000000000..9a5b9f34138 --- /dev/null +++ b/ingestion/tests/integration/postgres/test_lineage.py @@ -0,0 +1,121 @@ +import sys +import time +from os import path + +import pytest + +from metadata.generated.schema.entity.data.table import Table +from metadata.ingestion.lineage.sql_lineage import search_cache +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.workflow.metadata import MetadataWorkflow + +if not sys.version_info >= (3, 9): + pytest.skip("requires python 3.9+", allow_module_level=True) + + +@pytest.fixture() +def native_lineage_config(db_service, workflow_config, sink_config): + return { + "source": { + "type": "postgres-lineage", + "serviceName": db_service.fullyQualifiedName.root, + "sourceConfig": {"config": {}}, + }, + "sink": sink_config, + "workflowConfig": workflow_config, + } + + +def test_native_lineage( + run_workflow, ingestion_config, native_lineage_config, metadata, db_service +): + run_workflow(MetadataWorkflow, ingestion_config) + run_workflow(MetadataWorkflow, native_lineage_config) + + +@pytest.fixture() +def log_lineage_config(db_service, metadata, workflow_config, sink_config): + return { + "source": { + "type": "query-log-lineage", + "serviceName": db_service.fullyQualifiedName.root, + "sourceConfig": { + "config": { + "type": "DatabaseLineage", + "queryLogFilePath": path.dirname(__file__) + "/bad_query_log.csv", + } + }, + }, + "sink": sink_config, + "workflowConfig": workflow_config, + } + + +def test_log_lineage( + patch_passwords_for_db_services, + run_workflow, + ingestion_config, + log_lineage_config, + metadata, + db_service, +): + # since query cache 
is stored in ES, we need to reindex to avoid having a stale cache + # TODO fix the server so that we don't need to run this + reindex_search(metadata) + search_cache.clear() + run_workflow(MetadataWorkflow, ingestion_config) + workflow = run_workflow( + MetadataWorkflow, log_lineage_config, raise_from_status=False + ) + assert len(workflow.source.status.failures) == 2 + for failure in workflow.source.status.failures: + assert "Table entity not found" in failure.error + customer_table: Table = metadata.get_by_name( + Table, + f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer", + nullable=False, + ) + actor_table: Table = metadata.get_by_name( + Table, + f"{db_service.fullyQualifiedName.root}.dvdrental.public.actor", + nullable=False, + ) + staff_table: Table = metadata.get_by_name( + Table, + f"{db_service.fullyQualifiedName.root}.dvdrental.public.staff", + nullable=False, + ) + edge = metadata.get_lineage_edge( + str(customer_table.id.root), str(actor_table.id.root) + ) + assert edge is not None + edge = metadata.get_lineage_edge( + str(customer_table.id.root), str(staff_table.id.root) + ) + assert edge is not None + + +def reindex_search(metadata: OpenMetadata, timeout=60): + start = time.time() + status = None + while status is None or status == "running": + response = metadata.client.get( + "/apps/name/SearchIndexingApplication/status?offset=0&limit=1" + ) + status = response["data"][0]["status"] + if time.time() - start > timeout: + raise TimeoutError("Timed out waiting for reindexing to start") + time.sleep(1) + time.sleep( + 0.5 + ) # app interactivity is not immediate (probably because of async operations), so we wait a bit + metadata.client.post("/apps/trigger/SearchIndexingApplication") + time.sleep(0.5) # here too + while status != "success": + response = metadata.client.get( + "/apps/name/SearchIndexingApplication/status?offset=0&limit=1" + ) + status = response["data"][0]["status"] + if time.time() - start > timeout: + raise TimeoutError("Timed out waiting for reindexing to complete") + time.sleep(1) diff --git a/ingestion/tests/integration/postgres/test_metadata.py b/ingestion/tests/integration/postgres/test_metadata.py new file mode 100644 index 00000000000..dd491df33a5 --- /dev/null +++ b/ingestion/tests/integration/postgres/test_metadata.py @@ -0,0 +1,14 @@ +import sys + +import pytest + +from metadata.workflow.metadata import MetadataWorkflow + +if not sys.version_info >= (3, 9): + pytest.skip("requires python 3.9+", allow_module_level=True) + + +def test_ingest_metadata( + patch_passwords_for_db_services, run_workflow, ingestion_config +): + run_workflow(MetadataWorkflow, ingestion_config) diff --git a/ingestion/tests/integration/postgres/test_postgres.py b/ingestion/tests/integration/postgres/test_postgres.py deleted file mode 100644 index d35e4b6fa07..00000000000 --- a/ingestion/tests/integration/postgres/test_postgres.py +++ /dev/null @@ -1,392 +0,0 @@ -import sys -import time -from os import path - -import pytest - -from _openmetadata_testutils.postgres.conftest import postgres_container -from metadata.generated.schema.api.services.createDatabaseService import ( - CreateDatabaseServiceRequest, -) -from metadata.generated.schema.entity.data.table import Table -from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( - BasicAuth, -) -from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( - PostgresConnection, -) -from metadata.generated.schema.entity.services.databaseService import ( - 
DatabaseConnection, - DatabaseService, - DatabaseServiceType, -) -from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import ( - DatabaseServiceProfilerPipeline, -) -from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import ( - DatabaseServiceQueryLineagePipeline, -) -from metadata.generated.schema.metadataIngestion.databaseServiceQueryUsagePipeline import ( - DatabaseUsageConfigType, -) -from metadata.generated.schema.metadataIngestion.workflow import ( - LogLevels, - OpenMetadataWorkflowConfig, - Processor, - Sink, - Source, - SourceConfig, - WorkflowConfig, -) -from metadata.ingestion.lineage.sql_lineage import search_cache -from metadata.ingestion.models.custom_pydantic import CustomSecretStr -from metadata.ingestion.ometa.client import APIError -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.workflow.metadata import MetadataWorkflow -from metadata.workflow.profiler import ProfilerWorkflow -from metadata.workflow.usage import UsageWorkflow - -if not sys.version_info >= (3, 9): - pytest.skip("requires python 3.9+", allow_module_level=True) - - -@pytest.fixture(scope="module") -def db_service(metadata, postgres_container): - service = CreateDatabaseServiceRequest( - name="docker_test_db", - serviceType=DatabaseServiceType.Postgres, - connection=DatabaseConnection( - config=PostgresConnection( - username=postgres_container.username, - authType=BasicAuth(password=postgres_container.password), - hostPort="localhost:" - + postgres_container.get_exposed_port(postgres_container.port), - database="dvdrental", - ) - ), - ) - service_entity = metadata.create_or_update(data=service) - service_entity.connection.config.authType.password = CustomSecretStr( - postgres_container.password - ) - yield service_entity - try: - metadata.delete( - DatabaseService, service_entity.id, recursive=True, hard_delete=True - ) - except APIError as error: - if error.status_code == 404: - pass - else: - raise - - -@pytest.fixture(scope="module") -def ingest_metadata(db_service, metadata: OpenMetadata): - search_cache.clear() - workflow_config = OpenMetadataWorkflowConfig( - source=Source( - type=db_service.connection.config.type.value.lower(), - serviceName=db_service.fullyQualifiedName.root, - serviceConnection=db_service.connection, - sourceConfig=SourceConfig(config={}), - ), - sink=Sink( - type="metadata-rest", - config={}, - ), - workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config), - ) - metadata_ingestion = MetadataWorkflow.create(workflow_config) - metadata_ingestion.execute() - return - - -@pytest.fixture(scope="module") -def ingest_postgres_lineage(db_service, ingest_metadata, metadata: OpenMetadata): - search_cache.clear() - workflow_config = OpenMetadataWorkflowConfig( - source=Source( - type="postgres-lineage", - serviceName=db_service.fullyQualifiedName.root, - serviceConnection=db_service.connection, - sourceConfig=SourceConfig(config=DatabaseServiceQueryLineagePipeline()), - ), - sink=Sink( - type="metadata-rest", - config={}, - ), - workflowConfig=WorkflowConfig( - loggerLevel=LogLevels.DEBUG, openMetadataServerConfig=metadata.config - ), - ) - metadata_ingestion = MetadataWorkflow.create(workflow_config) - metadata_ingestion.execute() - metadata_ingestion.raise_from_status() - return - - -def test_ingest_query_log(db_service, ingest_metadata, metadata: OpenMetadata): - search_cache.clear() - reindex_search( - metadata - ) # since query cache is stored in ES, we need to reindex to avoid having 
a stale cache - workflow_config = { - "source": { - "type": "query-log-lineage", - "serviceName": db_service.fullyQualifiedName.root, - "sourceConfig": { - "config": { - "type": "DatabaseLineage", - "queryLogFilePath": path.dirname(__file__) + "/bad_query_log.csv", - } - }, - }, - "sink": {"type": "metadata-rest", "config": {}}, - "workflowConfig": { - "loggerLevel": "DEBUG", - "openMetadataServerConfig": metadata.config.model_dump(), - }, - } - metadata_ingestion = MetadataWorkflow.create(workflow_config) - metadata_ingestion.execute() - assert len(metadata_ingestion.source.status.failures) == 2 - for failure in metadata_ingestion.source.status.failures: - assert "Table entity not found" in failure.error - customer_table: Table = metadata.get_by_name( - Table, - f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer", - nullable=False, - ) - actor_table: Table = metadata.get_by_name( - Table, - f"{db_service.fullyQualifiedName.root}.dvdrental.public.actor", - nullable=False, - ) - staff_table: Table = metadata.get_by_name( - Table, - f"{db_service.fullyQualifiedName.root}.dvdrental.public.staff", - nullable=False, - ) - edge = metadata.get_lineage_edge( - str(customer_table.id.root), str(actor_table.id.root) - ) - assert edge is not None - edge = metadata.get_lineage_edge( - str(customer_table.id.root), str(staff_table.id.root) - ) - assert edge is not None - - -@pytest.fixture(scope="module") -def run_profiler_workflow(ingest_metadata, db_service, metadata): - workflow_config = OpenMetadataWorkflowConfig( - source=Source( - type=db_service.connection.config.type.value.lower(), - serviceName=db_service.fullyQualifiedName.root, - serviceConnection=db_service.connection, - sourceConfig=SourceConfig(config=DatabaseServiceProfilerPipeline()), - ), - processor=Processor( - type="orm-profiler", - config={}, - ), - sink=Sink( - type="metadata-rest", - config={}, - ), - workflowConfig=WorkflowConfig( - loggerLevel=LogLevels.DEBUG, openMetadataServerConfig=metadata.config - ), - ) - metadata_ingestion = ProfilerWorkflow.create(workflow_config.model_dump()) - search_cache.clear() - metadata_ingestion.execute() - return - - -@pytest.fixture(scope="module") -def ingest_query_usage(ingest_metadata, db_service, metadata): - search_cache.clear() - workflow_config = { - "source": { - "type": "postgres-usage", - "serviceName": db_service.fullyQualifiedName.root, - "serviceConnection": db_service.connection.model_dump(), - "sourceConfig": { - "config": {"type": DatabaseUsageConfigType.DatabaseUsage.value} - }, - }, - "processor": {"type": "query-parser", "config": {}}, - "stage": { - "type": "table-usage", - "config": { - "filename": "/tmp/postgres_usage", - }, - }, - "bulkSink": { - "type": "metadata-usage", - "config": { - "filename": "/tmp/postgres_usage", - }, - }, - "sink": {"type": "metadata-rest", "config": {}}, - "workflowConfig": { - "loggerLevel": "DEBUG", - "openMetadataServerConfig": metadata.config.model_dump(), - }, - } - workflow = UsageWorkflow.create(workflow_config) - search_cache.clear() - workflow.execute() - workflow.raise_from_status() - return - - -@pytest.fixture(scope="module") -def db_fqn(db_service: DatabaseService): - return ".".join( - [ - db_service.fullyQualifiedName.root, - db_service.connection.config.database, - ] - ) - - -def test_query_usage( - ingest_query_usage, - db_service, - metadata, - db_fqn, -): - table = metadata.get_by_name(Table, ".".join([db_fqn, "public", "actor"])) - queries = metadata.get_entity_queries(table.id) - # TODO this should be retruning 
2 queries but in CI sometimes it returns 1 *shrug* - assert 1 <= len(queries) <= 2 - - -def test_profiler(run_profiler_workflow): - pass - - -def test_db_lineage(ingest_postgres_lineage): - pass - - -def run_usage_workflow(db_service, metadata): - workflow_config = { - "source": { - "type": "postgres-usage", - "serviceName": db_service.fullyQualifiedName.root, - "serviceConnection": db_service.connection.model_dump(), - "sourceConfig": { - "config": {"type": DatabaseUsageConfigType.DatabaseUsage.value} - }, - }, - "processor": {"type": "query-parser", "config": {}}, - "stage": { - "type": "table-usage", - "config": { - "filename": "/tmp/postgres_usage", - }, - }, - "bulkSink": { - "type": "metadata-usage", - "config": { - "filename": "/tmp/postgres_usage", - }, - }, - "sink": {"type": "metadata-rest", "config": {}}, - "workflowConfig": { - "loggerLevel": "DEBUG", - "openMetadataServerConfig": metadata.config.model_dump(), - }, - } - workflow = UsageWorkflow.create(workflow_config) - search_cache.clear() - workflow.execute() - workflow.raise_from_status() - - -@pytest.mark.xfail( - reason="'metadata.ingestion.lineage.sql_lineage.search_cache' gets corrupted with invalid data." - " See issue https://github.com/open-metadata/OpenMetadata/issues/16408" -) -def test_usage_delete_usage(db_service, ingest_postgres_lineage, metadata): - workflow_config = { - "source": { - "type": "postgres-usage", - "serviceName": db_service.fullyQualifiedName.root, - "serviceConnection": db_service.connection.model_dump(), - "sourceConfig": { - "config": {"type": DatabaseUsageConfigType.DatabaseUsage.value} - }, - }, - "processor": {"type": "query-parser", "config": {}}, - "stage": { - "type": "table-usage", - "config": { - "filename": "/tmp/postgres_usage", - }, - }, - "bulkSink": { - "type": "metadata-usage", - "config": { - "filename": "/tmp/postgres_usage", - }, - }, - "sink": {"type": "metadata-rest", "config": {}}, - "workflowConfig": { - "loggerLevel": "DEBUG", - "openMetadataServerConfig": metadata.config.model_dump(), - }, - } - workflow = UsageWorkflow.create(workflow_config) - search_cache.clear() - workflow.execute() - workflow.raise_from_status() - run_usage_workflow(db_service, metadata) - metadata.delete(DatabaseService, db_service.id, hard_delete=True, recursive=True) - workflow_config = OpenMetadataWorkflowConfig( - source=Source( - type=db_service.connection.config.type.value.lower(), - serviceName=db_service.fullyQualifiedName.root, - serviceConnection=db_service.connection, - sourceConfig=SourceConfig(config={}), - ), - sink=Sink( - type="metadata-rest", - config={}, - ), - workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config), - ) - metadata_ingestion = MetadataWorkflow.create(workflow_config) - metadata_ingestion.execute() - metadata_ingestion.raise_from_status() - run_usage_workflow(db_service, metadata) - - -def reindex_search(metadata: OpenMetadata, timeout=60): - start = time.time() - status = None - while status is None or status == "running": - response = metadata.client.get( - "/apps/name/SearchIndexingApplication/status?offset=0&limit=1" - ) - status = response["data"][0]["status"] - if time.time() - start > timeout: - raise TimeoutError("Timed out waiting for reindexing to start") - time.sleep(1) - time.sleep( - 0.5 - ) # app interactivity is not immediate (probably bc async operations), so we wait a bit - metadata.client.post("/apps/trigger/SearchIndexingApplication") - time.sleep(0.5) # here too - while status != "success": - response = metadata.client.get( - 
"/apps/name/SearchIndexingApplication/status?offset=0&limit=1" - ) - status = response["data"][0]["status"] - if time.time() - start > timeout: - raise TimeoutError("Timed out waiting for reindexing to complete") - time.sleep(1) diff --git a/ingestion/tests/integration/postgres/test_profiler.py b/ingestion/tests/integration/postgres/test_profiler.py new file mode 100644 index 00000000000..5aa854b0601 --- /dev/null +++ b/ingestion/tests/integration/postgres/test_profiler.py @@ -0,0 +1,18 @@ +import sys + +import pytest + +from metadata.ingestion.lineage.sql_lineage import search_cache +from metadata.workflow.metadata import MetadataWorkflow +from metadata.workflow.profiler import ProfilerWorkflow + +if not sys.version_info >= (3, 9): + pytest.skip("requires python 3.9+", allow_module_level=True) + + +def test_profiler( + patch_passwords_for_db_services, run_workflow, ingestion_config, profiler_config +): + search_cache.clear() + run_workflow(MetadataWorkflow, ingestion_config) + run_workflow(ProfilerWorkflow, profiler_config) diff --git a/ingestion/tests/integration/postgres/test_usage.py b/ingestion/tests/integration/postgres/test_usage.py new file mode 100644 index 00000000000..dfe6d3e7d00 --- /dev/null +++ b/ingestion/tests/integration/postgres/test_usage.py @@ -0,0 +1,64 @@ +import sys + +import pytest + +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.metadataIngestion.databaseServiceQueryUsagePipeline import ( + DatabaseUsageConfigType, +) +from metadata.ingestion.lineage.sql_lineage import search_cache +from metadata.workflow.metadata import MetadataWorkflow +from metadata.workflow.usage import UsageWorkflow + +if not sys.version_info >= (3, 9): + pytest.skip("requires python 3.9+", allow_module_level=True) + + +@pytest.fixture() +def usage_config(sink_config, workflow_config, db_service): + return { + "source": { + "type": "postgres-usage", + "serviceName": db_service.fullyQualifiedName.root, + "sourceConfig": { + "config": {"type": DatabaseUsageConfigType.DatabaseUsage.value} + }, + }, + "processor": {"type": "query-parser", "config": {}}, + "stage": { + "type": "table-usage", + "config": { + "filename": "/tmp/postgres_usage", + }, + }, + "bulkSink": { + "type": "metadata-usage", + "config": { + "filename": "/tmp/postgres_usage", + }, + }, + "sink": sink_config, + "workflowConfig": workflow_config, + } + + +def test_usage(run_workflow, ingestion_config, usage_config, metadata, db_service): + search_cache.clear() + run_workflow(MetadataWorkflow, ingestion_config) + run_workflow(UsageWorkflow, usage_config) + + +@pytest.mark.xfail( + reason="'metadata.ingestion.lineage.sql_lineage.search_cache' gets corrupted with invalid data." 
+ " See issue https://github.com/open-metadata/OpenMetadata/issues/16408", + strict=True, +) +def test_usage_delete_usage( + run_workflow, ingestion_config, usage_config, metadata, db_service +): + search_cache.clear() + run_workflow(MetadataWorkflow, ingestion_config) + run_workflow(UsageWorkflow, usage_config) + metadata.delete(DatabaseService, db_service.id, hard_delete=True, recursive=True) + run_workflow(MetadataWorkflow, ingestion_config) + run_workflow(UsageWorkflow, usage_config) diff --git a/ingestion/tests/integration/profiler/test_nosql_profiler.py b/ingestion/tests/integration/profiler/test_nosql_profiler.py index f4567e8e225..7a428cbd08c 100644 --- a/ingestion/tests/integration/profiler/test_nosql_profiler.py +++ b/ingestion/tests/integration/profiler/test_nosql_profiler.py @@ -328,5 +328,5 @@ class NoSQLProfiler(TestCase): "age" ) assert all( - [r[age_column_index] == query_age for r in sample_data.sampleData.rows] + [r[age_column_index] == str(query_age) for r in sample_data.sampleData.rows] ) diff --git a/ingestion/tests/integration/sql_server/conftest.py b/ingestion/tests/integration/sql_server/conftest.py index a2831137b65..465eb6ef56a 100644 --- a/ingestion/tests/integration/sql_server/conftest.py +++ b/ingestion/tests/integration/sql_server/conftest.py @@ -1,4 +1,3 @@ -import logging import os import shutil @@ -6,24 +5,22 @@ import pytest from sqlalchemy import create_engine, text from testcontainers.mssql import SqlServerContainer +from _openmetadata_testutils.postgres.conftest import try_bind +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceRequest, +) from metadata.generated.schema.entity.services.connections.database.mssqlConnection import ( + MssqlConnection, MssqlScheme, ) -from metadata.generated.schema.entity.services.databaseService import DatabaseService -from metadata.ingestion.lineage.sql_lineage import search_cache -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.workflow.metadata import MetadataWorkflow - -from ...helpers.docker_utils import try_bind -from ...helpers.markers import xfail_param +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseConnection, + DatabaseService, + DatabaseServiceType, +) -@pytest.fixture(scope="session", autouse=True) -def config_logging(): - logging.getLogger("sqlfluff").setLevel(logging.CRITICAL) - - -@pytest.fixture(scope="session") +@pytest.fixture(scope="module") def mssql_container(tmp_path_factory): container = SqlServerContainer( "mcr.microsoft.com/mssql/server:2017-latest", dbname="AdventureWorks" @@ -80,27 +77,6 @@ GO yield container -@pytest.fixture( - scope="module", - params=[ - "english", - xfail_param( - "german", - "failes due to date format handling (https://github.com/open-metadata/OpenMetadata/issues/16434)", - ), - ], -) -def mssql_server_config(mssql_container, request): - language = request.param - engine = create_engine( - "mssql+pytds://" + mssql_container.get_connection_url().split("://")[1], - connect_args={"autocommit": True}, - ) - engine.execute( - f"ALTER LOGIN {mssql_container.username} WITH DEFAULT_LANGUAGE={language};" - ) - - @pytest.fixture( scope="module", params=[ @@ -108,24 +84,40 @@ def mssql_server_config(mssql_container, request): MssqlScheme.mssql_pyodbc, ], ) -def ingest_metadata(mssql_container, metadata: OpenMetadata, request): - workflow_config = { +def scheme(request): + return request.param + + +@pytest.fixture(scope="module") +def create_service_request(mssql_container, 
scheme, tmp_path_factory): + return CreateDatabaseServiceRequest( + name="docker_test_" + tmp_path_factory.mktemp("mssql").name + "_" + scheme.name, + serviceType=DatabaseServiceType.Mssql, + connection=DatabaseConnection( + config=MssqlConnection( + username=mssql_container.username, + password=mssql_container.password, + hostPort="localhost:" + + mssql_container.get_exposed_port(mssql_container.port), + database="AdventureWorks", + scheme=scheme, + ingestAllDatabases=True, + connectionOptions={ + "TrustServerCertificate": "yes", + "MARS_Connection": "yes", + }, + ) + ), + ) + + +@pytest.fixture(scope="module") +def ingestion_config(db_service, tmp_path_factory, workflow_config, sink_config): + return { "source": { "type": "mssql", - "serviceName": "integration_test_mssql_" + request.param.name, - "serviceConnection": { - "config": { - "type": "Mssql", - "scheme": request.param, - "username": mssql_container.username, - "password": mssql_container.password, - "hostPort": "localhost:" - + mssql_container.get_exposed_port(mssql_container.port), - "database": "AdventureWorks", - "ingestAllDatabases": True, - "connectionOptions": {"TrustServerCertificate": "yes"}, - } - }, + "serviceName": db_service.fullyQualifiedName.root, + "serviceConnection": db_service.connection.dict(), "sourceConfig": { "config": { "type": "DatabaseMetadata", @@ -133,61 +125,17 @@ def ingest_metadata(mssql_container, metadata: OpenMetadata, request): }, }, }, - "sink": {"type": "metadata-rest", "config": {}}, - "workflowConfig": { - "loggerLevel": "DEBUG", - "openMetadataServerConfig": metadata.config.model_dump(), - }, + "sink": sink_config, + "workflowConfig": workflow_config, } - metadata_ingestion = MetadataWorkflow.create(workflow_config) - metadata_ingestion.execute() - metadata_ingestion.raise_from_status() - metadata_ingestion.stop() - db_service = metadata.get_by_name( - DatabaseService, workflow_config["source"]["serviceName"] - ) - yield db_service - metadata.delete(DatabaseService, db_service.id, recursive=True, hard_delete=True) @pytest.fixture(scope="module") -def run_lineage_workflow( - mssql_server_config, - ingest_metadata: DatabaseService, - mssql_container, - metadata: OpenMetadata, -): - workflow_config = { - "source": { - "type": "mssql-lineage", - "serviceName": ingest_metadata.fullyQualifiedName.root, - "serviceConnection": { - "config": { - "type": "Mssql", - "scheme": ingest_metadata.connection.config.scheme, - "username": mssql_container.username, - "password": mssql_container.password, - "hostPort": "localhost:" - + mssql_container.get_exposed_port(mssql_container.port), - "database": "AdventureWorks", - "ingestAllDatabases": True, - } - }, - "sourceConfig": { - "config": { - "type": "DatabaseLineage", - "databaseFilterPattern": {"includes": ["TestDB", "AdventureWorks"]}, - }, - }, - }, - "sink": {"type": "metadata-rest", "config": {}}, - "workflowConfig": { - "loggerLevel": "INFO", - "openMetadataServerConfig": metadata.config.model_dump(), - }, - } - metadata_ingestion = MetadataWorkflow.create(workflow_config) - search_cache.clear() - metadata_ingestion.execute() - metadata_ingestion.raise_from_status() - metadata_ingestion.stop() +def unmask_password(create_service_request): + def inner(service: DatabaseService): + service.connection.config.password = ( + create_service_request.connection.config.password + ) + return service + + return inner diff --git a/ingestion/tests/integration/sql_server/test_lineage.py b/ingestion/tests/integration/sql_server/test_lineage.py index 
a1e0f6c0e31..04f98895598 100644 --- a/ingestion/tests/integration/sql_server/test_lineage.py +++ b/ingestion/tests/integration/sql_server/test_lineage.py @@ -1,22 +1,72 @@ import sys import pytest +from freezegun import freeze_time +from sqlalchemy import create_engine +from _openmetadata_testutils.helpers.markers import xfail_param from metadata.generated.schema.entity.data.table import Table +from metadata.ingestion.lineage.sql_lineage import search_cache +from metadata.workflow.metadata import MetadataWorkflow if not sys.version_info >= (3, 9): pytest.skip("requires python 3.9+", allow_module_level=True) -@pytest.mark.skip("fails for english even thoudh it should succeed") +@pytest.fixture( + params=[ + "english", + xfail_param( + "german", + "fails due to date format handling (https://github.com/open-metadata/OpenMetadata/issues/16434)", + ), + ], +) +def language_config(mssql_container, request): + language = request.param + engine = create_engine( + "mssql+pytds://" + mssql_container.get_connection_url().split("://")[1], + connect_args={"autocommit": True}, + ) + engine.execute( + f"ALTER LOGIN {mssql_container.username} WITH DEFAULT_LANGUAGE={language};" + ) + + +@pytest.fixture() +def lineage_config(language_config, db_service, workflow_config, sink_config): + return { + "source": { + "type": "mssql-lineage", + "serviceName": db_service.fullyQualifiedName.root, + "sourceConfig": { + "config": { + "type": "DatabaseLineage", + "databaseFilterPattern": {"includes": ["TestDB", "AdventureWorks"]}, + }, + }, + }, + "sink": sink_config, + "workflowConfig": workflow_config, + } + + +@freeze_time("2024-01-30") # to demonstrate the issue with the German language def test_lineage( - ingest_metadata, - run_lineage_workflow, + patch_passwords_for_db_services, + run_workflow, + ingestion_config, + lineage_config, + db_service, metadata, ): - service_fqn = ingest_metadata.fullyQualifiedName.root + search_cache.clear() + run_workflow(MetadataWorkflow, ingestion_config) + run_workflow(MetadataWorkflow, lineage_config) department_table = metadata.get_by_name( - Table, f"{service_fqn}.AdventureWorks.HumanResources.Department", nullable=False + Table, + f"{db_service.fullyQualifiedName.root}.AdventureWorks.HumanResources.Department", + nullable=False, ) lineage = metadata.get_lineage_by_id(Table, department_table.id.root) assert lineage is not None diff --git a/ingestion/tests/integration/sql_server/test_metadata_ingestion.py b/ingestion/tests/integration/sql_server/test_metadata.py similarity index 64% rename from ingestion/tests/integration/sql_server/test_metadata_ingestion.py rename to ingestion/tests/integration/sql_server/test_metadata.py index ec952ca08d0..945873f4f0b 100644 --- a/ingestion/tests/integration/sql_server/test_metadata_ingestion.py +++ b/ingestion/tests/integration/sql_server/test_metadata.py @@ -3,18 +3,23 @@ import sys import pytest from metadata.generated.schema.entity.data.table import Constraint, Table +from metadata.workflow.metadata import MetadataWorkflow if not sys.version_info >= (3, 9): pytest.skip("requires python 3.9+", allow_module_level=True) -def test_pass( - ingest_metadata, +def test_ingest_metadata( + patch_passwords_for_db_services, + run_workflow, + ingestion_config, + db_service, + metadata, ): + run_workflow(MetadataWorkflow, ingestion_config) table: Table = metadata.get_by_name( Table, - f"{ingest_metadata.fullyQualifiedName.root}.AdventureWorks.HumanResources.Department", + f"{db_service.fullyQualifiedName.root}.AdventureWorks.HumanResources.Department", ) 
diff --git a/ingestion/tests/integration/sql_server/test_metadata_ingestion.py b/ingestion/tests/integration/sql_server/test_metadata.py
similarity index 64%
rename from ingestion/tests/integration/sql_server/test_metadata_ingestion.py
rename to ingestion/tests/integration/sql_server/test_metadata.py
index ec952ca08d0..945873f4f0b 100644
--- a/ingestion/tests/integration/sql_server/test_metadata_ingestion.py
+++ b/ingestion/tests/integration/sql_server/test_metadata.py
@@ -3,18 +3,23 @@ import sys
 
 import pytest
 
 from metadata.generated.schema.entity.data.table import Constraint, Table
+from metadata.workflow.metadata import MetadataWorkflow
 
 if not sys.version_info >= (3, 9):
     pytest.skip("requires python 3.9+", allow_module_level=True)
 
 
-def test_pass(
-    ingest_metadata,
+def test_ingest_metadata(
+    patch_passwords_for_db_services,
+    run_workflow,
+    ingestion_config,
+    db_service,
     metadata,
 ):
+    run_workflow(MetadataWorkflow, ingestion_config)
     table: Table = metadata.get_by_name(
         Table,
-        f"{ingest_metadata.fullyQualifiedName.root}.AdventureWorks.HumanResources.Department",
+        f"{db_service.fullyQualifiedName.root}.AdventureWorks.HumanResources.Department",
     )
     assert table is not None
     assert table.columns[0].name.root == "DepartmentID"
diff --git a/ingestion/tests/integration/sql_server/test_profiler.py b/ingestion/tests/integration/sql_server/test_profiler.py
new file mode 100644
index 00000000000..5aa854b0601
--- /dev/null
+++ b/ingestion/tests/integration/sql_server/test_profiler.py
@@ -0,0 +1,18 @@
+import sys
+
+import pytest
+
+from metadata.ingestion.lineage.sql_lineage import search_cache
+from metadata.workflow.metadata import MetadataWorkflow
+from metadata.workflow.profiler import ProfilerWorkflow
+
+if not sys.version_info >= (3, 9):
+    pytest.skip("requires python 3.9+", allow_module_level=True)
+
+
+def test_profiler(
+    patch_passwords_for_db_services, run_workflow, ingestion_config, profiler_config
+):
+    search_cache.clear()
+    run_workflow(MetadataWorkflow, ingestion_config)
+    run_workflow(ProfilerWorkflow, profiler_config)
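test_profiler leans on a profiler_config fixture that is not part of this diff; like sink_config and workflow_config it presumably comes from the shared conftest. Something along these lines, where the source and processor settings are an assumed shape rather than the actual fixture:

# Assumed shape of the shared profiler_config fixture; details may differ.
import pytest


@pytest.fixture()
def profiler_config(db_service, workflow_config, sink_config):
    return {
        "source": {
            "type": db_service.connection.config.type.value.lower(),
            "serviceName": db_service.fullyQualifiedName.root,
            "sourceConfig": {"config": {"type": "Profiler"}},
        },
        "processor": {"type": "orm-profiler", "config": {}},
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }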
diff --git a/ingestion/tests/integration/trino/conftest.py b/ingestion/tests/integration/trino/conftest.py
index 450216bf0d0..2d0a1f93518 100644
--- a/ingestion/tests/integration/trino/conftest.py
+++ b/ingestion/tests/integration/trino/conftest.py
@@ -11,6 +11,19 @@ from testcontainers.core.generic import DbContainer
 from testcontainers.minio import MinioContainer
 from testcontainers.mysql import MySqlContainer
 
+from _openmetadata_testutils.postgres.conftest import try_bind
+from metadata.generated.schema.api.services.createDatabaseService import (
+    CreateDatabaseServiceRequest,
+)
+from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
+    TrinoConnection,
+)
+from metadata.generated.schema.entity.services.databaseService import (
+    DatabaseConnection,
+    DatabaseService,
+    DatabaseServiceType,
+)
+
 
 class TrinoContainer(DbContainer):
     def __init__(
@@ -98,13 +111,13 @@ class HiveMetaStoreContainer(DockerContainer):
     )
 
 
-@pytest.fixture(scope="module")
+@pytest.fixture(scope="session")
 def docker_network():
     with testcontainers.core.network.Network() as network:
         yield network
 
 
-@pytest.fixture(scope="module")
+@pytest.fixture(scope="session")
 def trino_container(hive_metastore_container, minio_container, docker_network):
     with TrinoContainer(image="trinodb/trino:418").with_network(
         docker_network
@@ -118,15 +131,20 @@ def trino_container(hive_metastore_container, minio_container, docker_network):
         yield trino
 
 
-@pytest.fixture(scope="module")
+@pytest.fixture(scope="session")
 def mysql_container(docker_network):
-    with MySqlContainer(
-        "mariadb:10.6.16", username="admin", password="admin", dbname="metastore_db"
-    ).with_network(docker_network).with_network_aliases("mariadb") as mysql:
+    container = (
+        MySqlContainer(
+            "mariadb:10.6.16", username="admin", password="admin", dbname="metastore_db"
+        )
+        .with_network(docker_network)
+        .with_network_aliases("mariadb")
+    )
+    with try_bind(container, container.port, container.port + 1) as mysql:
         yield mysql
 
 
-@pytest.fixture(scope="module")
+@pytest.fixture(scope="session")
 def hive_metastore_container(mysql_container, minio_container, docker_network):
     with HiveMetaStoreContainer("bitsondatadev/hive-metastore:latest").with_network(
         docker_network
@@ -144,17 +162,18 @@ def hive_metastore_container(mysql_container, minio_container, docker_network):
         yield hive
 
 
-@pytest.fixture(scope="module")
+@pytest.fixture(scope="session")
 def minio_container(docker_network):
-    with MinioContainer().with_network(docker_network).with_network_aliases(
-        "minio"
-    ) as minio:
+    container = (
+        MinioContainer().with_network(docker_network).with_network_aliases("minio")
+    )
+    with try_bind(container, container.port, container.port) as minio:
         client = minio.get_client()
         client.make_bucket("hive-warehouse")
         yield minio
 
 
-@pytest.fixture(scope="module")
+@pytest.fixture(scope="session")
 def create_test_data(trino_container):
     engine = create_engine(trino_container.get_connection_url())
     engine.execute(
@@ -163,3 +182,51 @@
     engine.execute("create table minio.my_schema.test_table (id int)")
     engine.execute("insert into minio.my_schema.test_table values (1), (2), (3)")
     return
+
+
+@pytest.fixture(scope="module")
+def create_service_request(trino_container, tmp_path_factory):
+    return CreateDatabaseServiceRequest(
+        name="docker_test_" + tmp_path_factory.mktemp("trino").name,
+        serviceType=DatabaseServiceType.Trino,
+        connection=DatabaseConnection(
+            config=TrinoConnection(
+                username=trino_container.user,
+                hostPort="localhost:"
+                + trino_container.get_exposed_port(trino_container.port),
+                catalog="minio",
+                connectionArguments={"http_scheme": "http"},
+            )
+        ),
+    )
+
+
+@pytest.fixture
+def ingestion_config(db_service, sink_config, workflow_config):
+    return {
+        "source": {
+            "type": db_service.connection.config.type.value.lower(),
+            "serviceName": db_service.fullyQualifiedName.root,
+            "serviceConnection": db_service.connection.dict(),
+            "sourceConfig": {
+                "config": {
+                    "type": "DatabaseMetadata",
+                    "schemaFilterPattern": {
+                        "excludes": [
+                            "^information_schema$",
+                        ],
+                    },
+                },
+            },
+        },
+        "sink": sink_config,
+        "workflowConfig": workflow_config,
+    }
+
+
+@pytest.fixture(scope="module")
+def unmask_password():
+    def patch_password(service: DatabaseService):
+        return service
+
+    return patch_password
diff --git a/ingestion/tests/integration/trino/test_metadata.py b/ingestion/tests/integration/trino/test_metadata.py
new file mode 100644
index 00000000000..aa489bd74f9
--- /dev/null
+++ b/ingestion/tests/integration/trino/test_metadata.py
@@ -0,0 +1,19 @@
+from metadata.generated.schema.entity.data.table import Table
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.workflow.metadata import MetadataWorkflow
+
+
+def test_metadata(
+    db_service, metadata: OpenMetadata, run_workflow, ingestion_config, create_test_data
+):
+    run_workflow(MetadataWorkflow, ingestion_config)
+    tables = metadata.list_entities(
+        Table,
+        params={
+            "databaseSchema": f"{db_service.fullyQualifiedName.root}.minio.my_schema"
+        },
+    )
+    assert (
+        next((t for t in tables.entities if t.name.root == "test_table"), None)
+        is not None
+    )
diff --git a/ingestion/tests/integration/trino/test_profiler.py b/ingestion/tests/integration/trino/test_profiler.py
new file mode 100644
index 00000000000..af8bab0512e
--- /dev/null
+++ b/ingestion/tests/integration/trino/test_profiler.py
@@ -0,0 +1,15 @@
+from metadata.ingestion.lineage.sql_lineage import search_cache
+from metadata.workflow.metadata import MetadataWorkflow
+from metadata.workflow.profiler import ProfilerWorkflow
+
+
+def test_profiler(
+    patch_passwords_for_db_services,
+    run_workflow,
+    ingestion_config,
+    profiler_config,
+    create_test_data,
+):
+    search_cache.clear()
+    run_workflow(MetadataWorkflow, ingestion_config)
+    run_workflow(ProfilerWorkflow, profiler_config)
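try_bind is what lets these session-scoped containers prefer a stable host port (mariadb on port + 1, minio on its own port) instead of a random one, so connection URLs stay reproducible across tests. Conceptually it behaves like the sketch below; the shipped helper in _openmetadata_testutils/postgres/conftest.py may differ in detail:

# Conceptual sketch of try_bind: try a fixed host port for reproducible
# connection URLs, and fall back to a Docker-assigned port if it is taken.
from contextlib import contextmanager

from docker.errors import DockerException


@contextmanager
def try_bind(container, container_port, host_port):
    try:
        with container.with_bind_ports(container_port, host_port) as bound:
            yield bound
    except DockerException:
        # Host port already in use: let Docker choose an ephemeral port.
        with container.with_bind_ports(container_port, None) as fallback:
            yield fallback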
diff --git a/ingestion/tests/integration/trino/test_trino.py b/ingestion/tests/integration/trino/test_trino.py
deleted file mode 100644
index 2d5746874bd..00000000000
--- a/ingestion/tests/integration/trino/test_trino.py
+++ /dev/null
@@ -1,76 +0,0 @@
-import pytest
-
-from metadata.generated.schema.api.services.createDatabaseService import (
-    CreateDatabaseServiceRequest,
-)
-from metadata.generated.schema.entity.data.table import Table
-from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
-    TrinoConnection,
-)
-from metadata.generated.schema.entity.services.databaseService import (
-    DatabaseConnection,
-    DatabaseService,
-    DatabaseServiceType,
-)
-from metadata.generated.schema.metadataIngestion.workflow import (
-    OpenMetadataWorkflowConfig,
-    Sink,
-    Source,
-    SourceConfig,
-    WorkflowConfig,
-)
-from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from metadata.workflow.metadata import MetadataWorkflow
-
-
-@pytest.fixture(scope="module")
-def db_service(metadata, trino_container):
-    service = CreateDatabaseServiceRequest(
-        name="docker_test_trino",
-        serviceType=DatabaseServiceType.Trino,
-        connection=DatabaseConnection(
-            config=TrinoConnection(
-                username=trino_container.user,
-                hostPort="localhost:"
-                + trino_container.get_exposed_port(trino_container.port),
-                catalog="minio",
-                connectionArguments={"http_scheme": "http"},
-            )
-        ),
-    )
-    service_entity = metadata.create_or_update(data=service)
-    yield service_entity
-    metadata.delete(
-        DatabaseService, service_entity.id, recursive=True, hard_delete=True
-    )
-
-
-@pytest.fixture(scope="module")
-def ingest_metadata(db_service, metadata: OpenMetadata, create_test_data):
-    workflow_config = OpenMetadataWorkflowConfig(
-        source=Source(
-            type=db_service.connection.config.type.value.lower(),
-            serviceName=db_service.fullyQualifiedName.root,
-            serviceConnection=db_service.connection,
-            sourceConfig=SourceConfig(config={}),
-        ),
-        sink=Sink(
-            type="metadata-rest",
-            config={},
-        ),
-        workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config),
-    )
-    metadata_ingestion = MetadataWorkflow.create(workflow_config)
-    metadata_ingestion.execute()
-    metadata_ingestion.raise_from_status()
-    return
-
-
-def test_ingest_metadata(ingest_metadata, db_service, metadata: OpenMetadata):
-    tables = metadata.list_entities(
-        Table, params={"databaseSchema": "docker_test_trino.minio.my_schema"}
-    )
-    assert (
-        next((t for t in tables.entities if t.name.root == "test_table"), None)
-        is not None
-    )
diff --git a/ingestion/tests/unit/metadata/profiler/api/test_models.py b/ingestion/tests/unit/metadata/profiler/api/test_models.py
new file mode 100644
index 00000000000..bd057d50397
--- /dev/null
+++ b/ingestion/tests/unit/metadata/profiler/api/test_models.py
@@ -0,0 +1,57 @@
+import pytest
+
+from _openmetadata_testutils.helpers.markers import xfail_param
+from metadata.generated.schema.entity.data.table import TableData
+
+
+@pytest.mark.parametrize(
+    "parameter",
+    [
+        TableData(
+            columns=[],
+            rows=[],
+        ),
+        TableData(
+            columns=[],
+            rows=[[1]],
+        ),
+        TableData(
+            columns=[],
+            rows=[["a"]],
+        ),
+        TableData(
+            columns=[],
+            rows=[
+                [b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]
+            ],
+        ),
+    ],
+)
+def test_table_data_serialization(parameter):
+    for row in parameter.rows:
+        for i, cell in enumerate(row):
+            if isinstance(cell, bytes):
+                # bytes are written as strings and deserialize as strings
+                row[i] = cell.decode("utf-8")
+    assert TableData.model_validate_json(parameter.model_dump_json()) == parameter
+
+
+@pytest.mark.parametrize(
+    "parameter",
+    [
+        xfail_param(
+            TableData(
+                columns=[],
+                rows=[
+                    [
+                        b"\xe6\x10\x00\x00\x01\x0c\xae\x8b\xfc(\xbc\xe4G@g\xa8\x91\x89\x89\x8a^\xc0"
+                    ]
+                ],
+            ),
+            reason="TODO: change TableData.rows to List[List[str]]",
+        ),
+    ],
+)
+def test_unserializable(parameter):
+    parameter = TableData.model_validate(parameter.model_dump())
+    assert TableData.model_validate_json(parameter.model_dump_json()) != parameter
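These tests pin down the Pydantic v2 behavior behind the original sample-data bug: bytes cells survive a Python-mode dump, but a JSON round trip hands them back as str, so equality only holds if the expectation is decoded first, which is exactly what test_table_data_serialization does. A standalone illustration, using a stand-in model rather than TableData:

# Standalone illustration of the JSON round-trip asymmetry for bytes.
from typing import Any, List

from pydantic import BaseModel


class Rows(BaseModel):
    rows: List[List[Any]]


original = Rows(rows=[[b"abc"]])
restored = Rows.model_validate_json(original.model_dump_json())
assert restored.rows == [["abc"]]  # bytes come back as a UTF-8 str
assert restored != original  # so the bytes-bearing model no longer compares equal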
+ reason="TODO: change TableData.rows to List[List[str]]", + ), + ], +) +def test_unserializble(parameter): + parameter = TableData.model_validate(parameter.model_dump()) + assert TableData.model_validate_json(parameter.model_dump_json()) != parameter diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json index cf0a469c8fc..a0b03225ec1 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json @@ -154,7 +154,8 @@ "QUANTILE_STATE", "AGG_STATE", "BITMAP", - "UINT" + "UINT", + "BIT" ] }, "constraint": {