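"""
End-to-end lineage tests for Postgres against the dvdrental sample database,
covering the native postgres-lineage source, the query-log-lineage source, and
a query log containing an oversized cell.
"""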
import sys
import time
from os import path

import pytest

from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import (
    DatabaseLineageConfigType,
)
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.metadata import MetadataWorkflow

if not sys.version_info >= (3, 9):
    pytest.skip("requires python 3.9+", allow_module_level=True)


@pytest.fixture()
def native_lineage_config(db_service, workflow_config, sink_config):
    return {
        "source": {
            "type": "postgres-lineage",
            "serviceName": db_service.fullyQualifiedName.root,
            "sourceConfig": {
                "config": {"type": DatabaseLineageConfigType.DatabaseLineage.value}
            },
        },
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }


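# Run metadata ingestion with and without DDL, then the native lineage workflow,
# and check the lineage graph around dvdrental.public.film_actor.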
@pytest.mark.parametrize(
    "source_config,expected_nodes",
    [
        ({"includeDDL": False}, 3),
        ({"includeDDL": True}, 3),
    ],
    ids=lambda config: (
        "".join([f"{k}={str(v)}" for k, v in config.items()])
        if isinstance(config, dict)
        else ""
    ),
)
def test_native_lineage(
    patch_passwords_for_db_services,
    source_config,
    expected_nodes,
    run_workflow,
    ingestion_config,
    native_lineage_config,
    metadata,
    db_service,
):
    ingestion_config["source"]["sourceConfig"]["config"].update(source_config)
    run_workflow(MetadataWorkflow, ingestion_config)
    run_workflow(MetadataWorkflow, native_lineage_config)
    film_actor_edges = metadata.get_lineage_by_name(
        Table, f"{db_service.fullyQualifiedName.root}.dvdrental.public.film_actor"
    )
    assert len(film_actor_edges["nodes"]) == expected_nodes
    run_workflow(MetadataWorkflow, native_lineage_config)


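# Query-log-lineage config that reads bad_query_log.csv from this test directory.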
@pytest.fixture()
def log_lineage_config(db_service, metadata, workflow_config, sink_config):
    return {
        "source": {
            "type": "query-log-lineage",
            "serviceName": db_service.fullyQualifiedName.root,
            "sourceConfig": {
                "config": {
                    "type": "DatabaseLineage",
                    "queryLogFilePath": path.dirname(__file__) + "/bad_query_log.csv",
                }
            },
        },
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }


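# Ingest metadata, run the query-log lineage workflow, and verify that the
# customer -> actor and customer -> staff edges were created.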
def test_log_lineage(
    patch_passwords_for_db_services,
    run_workflow,
    ingestion_config,
    log_lineage_config,
    metadata,
    db_service,
):
    # The query cache is stored in Elasticsearch, so we reindex to avoid using a stale cache.
    # TODO: fix the server so that we don't need to run this
    reindex_search(metadata)
    search_cache.clear()
    run_workflow(MetadataWorkflow, ingestion_config)
    workflow = run_workflow(
        MetadataWorkflow, log_lineage_config, raise_from_status=False
    )
    assert len(workflow.source.status.failures) == 0
    customer_table: Table = metadata.get_by_name(
        Table,
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer",
        nullable=False,
    )
    actor_table: Table = metadata.get_by_name(
        Table,
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.actor",
        nullable=False,
    )
    staff_table: Table = metadata.get_by_name(
        Table,
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.staff",
        nullable=False,
    )
    edge = metadata.get_lineage_edge(
        str(customer_table.id.root), str(actor_table.id.root)
    )
    assert edge is not None
    edge = metadata.get_lineage_edge(
        str(customer_table.id.root), str(staff_table.id.root)
    )
    assert edge is not None


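# Trigger the SearchIndexingApplication and block until it reports success,
# waiting out any reindexing run that is already in progress.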
def reindex_search(metadata: OpenMetadata, timeout=60):
    start = time.time()
    # wait for any previous reindexing run to finish
    while True:
        response = metadata.client.get(
            "/apps/name/SearchIndexingApplication/status?offset=0&limit=1"
        )
        if len(response["data"]) == 0:
            break
        if response["data"][0]["status"] != "running":
            break
        if time.time() - start > timeout:
            raise TimeoutError("Timed out waiting for reindexing to start")
        time.sleep(1)
    # app interactivity is not immediate (probably because of async operations), so wait a bit
    time.sleep(0.5)
    metadata.client.post("/apps/trigger/SearchIndexingApplication")
    time.sleep(0.5)  # here too
    status = None
    while status != "success":
        response = metadata.client.get(
            "/apps/name/SearchIndexingApplication/status?offset=0&limit=1"
        )
        status = response["data"][0]["status"]
        if time.time() - start > timeout:
            raise TimeoutError("Timed out waiting for reindexing to complete")
        time.sleep(1)


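# Query log whose single entry contains a ~100,000-character cell, used to
# exercise lineage parsing of oversized query text.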
@pytest.fixture()
def long_cell_query_log(tmp_path_factory):
    log_file = tmp_path_factory.mktemp("data") / "large_query_log.csv"
    with open(log_file, "w") as f:
        f.write("query_text,database_name,schema_name\n")
        f.write(
            "insert into dvdrental.public.rental select {} from dvdrental.public.payment\n".format(
                "first_name || '" + "a" * 100_000 + "'"
            )
        )
    return log_file


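# Query-log-lineage config pointing at the generated large_query_log.csv.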
@pytest.fixture()
def long_cell_query_file(
    db_service, metadata, workflow_config, sink_config, long_cell_query_log
):
    return {
        "source": {
            "type": "query-log-lineage",
            "serviceName": db_service.fullyQualifiedName.root,
            "sourceConfig": {
                "config": {
                    "type": "DatabaseLineage",
                    "queryLogFilePath": str(long_cell_query_log),
                }
            },
        },
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }


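# The oversized query should still yield a payment -> rental lineage edge.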
def test_log_file_with_long_cell(
    patch_passwords_for_db_services,
    run_workflow,
    ingestion_config,
    long_cell_query_file,
    metadata,
    db_service,
):
    # The query cache is stored in Elasticsearch, so we reindex to avoid using a stale cache.
    # TODO: fix the server so that we don't need to run this
    reindex_search(metadata)
    search_cache.clear()
    run_workflow(MetadataWorkflow, ingestion_config)
    run_workflow(MetadataWorkflow, long_cell_query_file)
    rental_table: Table = metadata.get_by_name(
        Table,
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.rental",
        nullable=False,
    )
    payment_table: Table = metadata.get_by_name(
        Table,
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.payment",
        nullable=False,
    )
    edge = metadata.get_lineage_edge(
        str(payment_table.id.root), str(rental_table.id.root)
    )
    assert edge is not None