mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-04 14:43:11 +00:00
MINOR: postgres integration test (#15929)
* implemented postgres-integration-tests * format * format * - disable ryuk - disabled verbose sqlfluff logging * query usage assertion
This commit is contained in:
parent
eeaab62e63
commit
d8781bbef2
2
.github/workflows/py-tests.yml
vendored
2
.github/workflows/py-tests.yml
vendored
@ -84,6 +84,8 @@ jobs:
|
|||||||
run: |
|
run: |
|
||||||
source env/bin/activate
|
source env/bin/activate
|
||||||
make run_python_tests
|
make run_python_tests
|
||||||
|
environment:
|
||||||
|
TESTCONTAINERS_RYUK_DISABLED: true
|
||||||
|
|
||||||
- name: Run Python Tests & record coverage
|
- name: Run Python Tests & record coverage
|
||||||
if: ${{ matrix.py-version == '3.9' }}
|
if: ${{ matrix.py-version == '3.9' }}
|
||||||
|
0
ingestion/tests/integration/postgres/__init__.py
Normal file
0
ingestion/tests/integration/postgres/__init__.py
Normal file
53
ingestion/tests/integration/postgres/conftest.py
Normal file
53
ingestion/tests/integration/postgres/conftest.py
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
import os
|
||||||
|
import zipfile
|
||||||
|
from subprocess import CalledProcessError
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from testcontainers.postgres import PostgresContainer
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
|
||||||
|
def postgres_container(tmp_path_factory):
|
||||||
|
data_dir = tmp_path_factory.mktemp("data")
|
||||||
|
dvd_rental_zip = os.path.join(os.path.dirname(__file__), "data", "dvdrental.zip")
|
||||||
|
zipfile.ZipFile(dvd_rental_zip, "r").extractall(str(data_dir))
|
||||||
|
|
||||||
|
container = PostgresContainer("postgres:15", dbname="dvdrental")
|
||||||
|
container.volumes = {str(data_dir): {"bind": "/data"}}
|
||||||
|
container._command = [
|
||||||
|
"-c",
|
||||||
|
"shared_preload_libraries=pg_stat_statements",
|
||||||
|
"-c",
|
||||||
|
"pg_stat_statements.max=10000",
|
||||||
|
"-c",
|
||||||
|
"pg_stat_statements.track=all",
|
||||||
|
]
|
||||||
|
|
||||||
|
with container as container:
|
||||||
|
docker_container = container.get_wrapped_container()
|
||||||
|
for query in (
|
||||||
|
"CREATE USER postgres SUPERUSER;",
|
||||||
|
"CREATE EXTENSION pg_stat_statements;",
|
||||||
|
):
|
||||||
|
res = docker_container.exec_run(
|
||||||
|
["psql", "-U", container.username, "-d", container.dbname, "-c", query]
|
||||||
|
)
|
||||||
|
if res[0] != 0:
|
||||||
|
raise CalledProcessError(
|
||||||
|
returncode=res[0], cmd=res, output=res[1].decode("utf-8")
|
||||||
|
)
|
||||||
|
res = docker_container.exec_run(
|
||||||
|
[
|
||||||
|
"pg_restore",
|
||||||
|
"-U",
|
||||||
|
container.username,
|
||||||
|
"-d",
|
||||||
|
container.dbname,
|
||||||
|
"/data/dvdrental.tar",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
if res[0] != 0:
|
||||||
|
raise CalledProcessError(
|
||||||
|
returncode=res[0], cmd=res, output=res[1].decode("utf-8")
|
||||||
|
)
|
||||||
|
yield container
|
BIN
ingestion/tests/integration/postgres/data/dvdrental.zip
Normal file
BIN
ingestion/tests/integration/postgres/data/dvdrental.zip
Normal file
Binary file not shown.
213
ingestion/tests/integration/postgres/test_postgres.py
Normal file
213
ingestion/tests/integration/postgres/test_postgres.py
Normal file
@ -0,0 +1,213 @@
|
|||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from metadata.generated.schema.api.services.createDatabaseService import (
|
||||||
|
CreateDatabaseServiceRequest,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.entity.data.table import Table
|
||||||
|
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
|
||||||
|
BasicAuth,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
|
||||||
|
PostgresConnection,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.entity.services.databaseService import (
|
||||||
|
DatabaseConnection,
|
||||||
|
DatabaseService,
|
||||||
|
DatabaseServiceType,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
|
||||||
|
DatabaseServiceProfilerPipeline,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import (
|
||||||
|
DatabaseServiceQueryLineagePipeline,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.databaseServiceQueryUsagePipeline import (
|
||||||
|
DatabaseUsageConfigType,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||||
|
LogLevels,
|
||||||
|
OpenMetadataWorkflowConfig,
|
||||||
|
Processor,
|
||||||
|
Sink,
|
||||||
|
Source,
|
||||||
|
SourceConfig,
|
||||||
|
WorkflowConfig,
|
||||||
|
)
|
||||||
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||||
|
from metadata.profiler.api.models import ProfilerProcessorConfig
|
||||||
|
from metadata.workflow.metadata import MetadataWorkflow
|
||||||
|
from metadata.workflow.profiler import ProfilerWorkflow
|
||||||
|
from metadata.workflow.usage import UsageWorkflow
|
||||||
|
|
||||||
|
from ..integration_base import int_admin_ometa
|
||||||
|
|
||||||
|
if not sys.version_info >= (3, 9):
|
||||||
|
pytest.skip("requires python 3.9+", allow_module_level=True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def config_logging():
|
||||||
|
logging.getLogger("sqlfluff").setLevel(logging.CRITICAL)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def metadata():
|
||||||
|
return int_admin_ometa()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def db_service(metadata, postgres_container):
|
||||||
|
service = CreateDatabaseServiceRequest(
|
||||||
|
name="docker_test_db",
|
||||||
|
serviceType=DatabaseServiceType.Postgres,
|
||||||
|
connection=DatabaseConnection(
|
||||||
|
config=PostgresConnection(
|
||||||
|
username=postgres_container.username,
|
||||||
|
authType=BasicAuth(password=postgres_container.password),
|
||||||
|
hostPort="localhost:"
|
||||||
|
+ postgres_container.get_exposed_port(postgres_container.port),
|
||||||
|
database="dvdrental",
|
||||||
|
)
|
||||||
|
),
|
||||||
|
)
|
||||||
|
service_entity = metadata.create_or_update(data=service)
|
||||||
|
service_entity.connection.config.authType.password = postgres_container.password
|
||||||
|
yield service_entity
|
||||||
|
metadata.delete(
|
||||||
|
DatabaseService, service_entity.id, recursive=True, hard_delete=True
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def ingest_metadata(db_service, metadata: OpenMetadata):
|
||||||
|
workflow_config = OpenMetadataWorkflowConfig(
|
||||||
|
source=Source(
|
||||||
|
type=db_service.connection.config.type.value.lower(),
|
||||||
|
serviceName=db_service.fullyQualifiedName.__root__,
|
||||||
|
serviceConnection=db_service.connection,
|
||||||
|
sourceConfig=SourceConfig(config={}),
|
||||||
|
),
|
||||||
|
sink=Sink(
|
||||||
|
type="metadata-rest",
|
||||||
|
config={},
|
||||||
|
),
|
||||||
|
workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config),
|
||||||
|
)
|
||||||
|
metadata_ingestion = MetadataWorkflow.create(workflow_config)
|
||||||
|
metadata_ingestion.execute()
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def ingest_lineage(db_service, ingest_metadata, metadata: OpenMetadata):
|
||||||
|
workflow_config = OpenMetadataWorkflowConfig(
|
||||||
|
source=Source(
|
||||||
|
type="postgres-lineage",
|
||||||
|
serviceName=db_service.fullyQualifiedName.__root__,
|
||||||
|
serviceConnection=db_service.connection,
|
||||||
|
sourceConfig=SourceConfig(config=DatabaseServiceQueryLineagePipeline()),
|
||||||
|
),
|
||||||
|
sink=Sink(
|
||||||
|
type="metadata-rest",
|
||||||
|
config={},
|
||||||
|
),
|
||||||
|
workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config),
|
||||||
|
)
|
||||||
|
metadata_ingestion = MetadataWorkflow.create(workflow_config)
|
||||||
|
metadata_ingestion.execute()
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def run_profiler_workflow(ingest_metadata, db_service, metadata):
|
||||||
|
workflow_config = OpenMetadataWorkflowConfig(
|
||||||
|
source=Source(
|
||||||
|
type=db_service.connection.config.type.value.lower(),
|
||||||
|
serviceName=db_service.fullyQualifiedName.__root__,
|
||||||
|
serviceConnection=db_service.connection,
|
||||||
|
sourceConfig=SourceConfig(config=DatabaseServiceProfilerPipeline()),
|
||||||
|
),
|
||||||
|
processor=Processor(
|
||||||
|
type="orm-profiler",
|
||||||
|
config=ProfilerProcessorConfig(),
|
||||||
|
),
|
||||||
|
sink=Sink(
|
||||||
|
type="metadata-rest",
|
||||||
|
config={},
|
||||||
|
),
|
||||||
|
workflowConfig=WorkflowConfig(
|
||||||
|
loggerLevel=LogLevels.DEBUG, openMetadataServerConfig=metadata.config
|
||||||
|
),
|
||||||
|
)
|
||||||
|
metadata_ingestion = ProfilerWorkflow.create(workflow_config.dict())
|
||||||
|
metadata_ingestion.execute()
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def ingest_query_usage(ingest_metadata, db_service, metadata):
|
||||||
|
workflow_config = {
|
||||||
|
"source": {
|
||||||
|
"type": "postgres-usage",
|
||||||
|
"serviceName": db_service.fullyQualifiedName.__root__,
|
||||||
|
"serviceConnection": db_service.connection.dict(),
|
||||||
|
"sourceConfig": {
|
||||||
|
"config": {"type": DatabaseUsageConfigType.DatabaseUsage.value}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"processor": {"type": "query-parser", "config": {}},
|
||||||
|
"stage": {
|
||||||
|
"type": "table-usage",
|
||||||
|
"config": {
|
||||||
|
"filename": "/tmp/postgres_usage",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"bulkSink": {
|
||||||
|
"type": "metadata-usage",
|
||||||
|
"config": {
|
||||||
|
"filename": "/tmp/postgres_usage",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"sink": {"type": "metadata-rest", "config": {}},
|
||||||
|
"workflowConfig": {
|
||||||
|
"loggerLevel": "DEBUG",
|
||||||
|
"openMetadataServerConfig": metadata.config.dict(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
workflow = UsageWorkflow.create(workflow_config)
|
||||||
|
workflow.execute()
|
||||||
|
workflow.raise_from_status()
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def db_fqn(db_service: DatabaseService):
|
||||||
|
return ".".join(
|
||||||
|
[
|
||||||
|
db_service.fullyQualifiedName.__root__,
|
||||||
|
db_service.connection.config.database,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_query_usage(
|
||||||
|
ingest_query_usage,
|
||||||
|
db_service,
|
||||||
|
metadata,
|
||||||
|
db_fqn,
|
||||||
|
):
|
||||||
|
table = metadata.get_by_name(Table, ".".join([db_fqn, "public", "actor"]))
|
||||||
|
queries = metadata.get_entity_queries(table.id)
|
||||||
|
# TODO this should be retruning 2 queries but in CI sometimes it returns 1 *shrug*
|
||||||
|
assert 1 <= len(queries) <= 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_profiler(run_profiler_workflow):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def test_lineage(ingest_lineage):
|
||||||
|
pass
|
Loading…
x
Reference in New Issue
Block a user