import sys
import time
from os import path

import pytest

from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.metadata import MetadataWorkflow

if not sys.version_info >= (3, 9):
    pytest.skip("requires python 3.9+", allow_module_level=True)


@pytest.fixture()
def native_lineage_config(db_service, workflow_config, sink_config):
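    """Workflow config for the postgres-lineage source pointing at the test db_service."""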
    return {
        "source": {
            "type": "postgres-lineage",
            "serviceName": db_service.fullyQualifiedName.root,
            "sourceConfig": {"config": {}},
        },
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }


@pytest.mark.parametrize(
    "source_config,expected_nodes",
    [
        ({"includeDDL": False}, 0),
        ({"includeDDL": True}, 3),
    ],
    ids=lambda config: (
        "".join([f"{k}={str(v)}" for k, v in config.items()])
        if isinstance(config, dict)
        else ""
    ),
)
def test_native_lineage(
    source_config,
    expected_nodes,
    run_workflow,
    ingestion_config,
    native_lineage_config,
    metadata,
    db_service,
):
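    """Run metadata ingestion with and without DDL, check the number of lineage nodes
    created for the film_actor table, then run the native postgres-lineage workflow."""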
    ingestion_config["source"]["sourceConfig"]["config"].update(source_config)
    run_workflow(MetadataWorkflow, ingestion_config)
    film_actor_edges = metadata.get_lineage_by_name(
        Table, f"{db_service.fullyQualifiedName.root}.dvdrental.public.film_actor"
    )
    assert len(film_actor_edges["nodes"]) == expected_nodes
    run_workflow(MetadataWorkflow, native_lineage_config)


@pytest.fixture()
def log_lineage_config(db_service, metadata, workflow_config, sink_config):
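    """Workflow config for the query-log-lineage source reading bad_query_log.csv."""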
    return {
        "source": {
            "type": "query-log-lineage",
            "serviceName": db_service.fullyQualifiedName.root,
            "sourceConfig": {
                "config": {
                    "type": "DatabaseLineage",
                    "queryLogFilePath": path.dirname(__file__) + "/bad_query_log.csv",
                }
            },
        },
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }


def test_log_lineage(
    patch_passwords_for_db_services,
    run_workflow,
    ingestion_config,
    log_lineage_config,
    metadata,
    db_service,
):
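    """Ingest metadata, then run lineage from a query log containing bad entries:
    expect two "Table entity not found" failures, plus lineage edges from the
    customer table to the actor and staff tables."""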
    # since the query cache is stored in ES, we need to reindex to avoid having a stale cache
    # TODO: fix the server so that we don't need to run this
    reindex_search(metadata)
    search_cache.clear()
    run_workflow(MetadataWorkflow, ingestion_config)
    workflow = run_workflow(
        MetadataWorkflow, log_lineage_config, raise_from_status=False
    )
    assert len(workflow.source.status.failures) == 2
    for failure in workflow.source.status.failures:
        assert "Table entity not found" in failure.error
    customer_table: Table = metadata.get_by_name(
        Table,
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer",
        nullable=False,
    )
    actor_table: Table = metadata.get_by_name(
        Table,
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.actor",
        nullable=False,
    )
    staff_table: Table = metadata.get_by_name(
        Table,
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.staff",
        nullable=False,
    )
    edge = metadata.get_lineage_edge(
        str(customer_table.id.root), str(actor_table.id.root)
    )
    assert edge is not None
    edge = metadata.get_lineage_edge(
        str(customer_table.id.root), str(staff_table.id.root)
    )
    assert edge is not None


def reindex_search(metadata: OpenMetadata, timeout=60):
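    """Trigger the SearchIndexingApplication and block until it reports success,
    waiting for any previous run to finish first. Raises TimeoutError after
    `timeout` seconds."""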
    start = time.time()
    # wait for previous reindexing to finish (if any)
    while True:
        response = metadata.client.get(
            "/apps/name/SearchIndexingApplication/status?offset=0&limit=1"
        )
        if len(response["data"]) == 0:
            break
        if response["data"][0]["status"] != "running":
            break
        if time.time() - start > timeout:
            raise TimeoutError("Timed out waiting for reindexing to start")
        time.sleep(1)
    time.sleep(
        0.5
    )  # app interactivity is not immediate (probably because of async operations), so we wait a bit
    metadata.client.post("/apps/trigger/SearchIndexingApplication")
    time.sleep(0.5)  # here too
    status = None
    while status != "success":
        response = metadata.client.get(
            "/apps/name/SearchIndexingApplication/status?offset=0&limit=1"
        )
        status = response["data"][0]["status"]
        if time.time() - start > timeout:
            raise TimeoutError("Timed out waiting for reindexing to complete")
        time.sleep(1)


@pytest.fixture()
def long_cell_query_log(tmp_path_factory):
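    """Write a query log CSV whose single query_text cell is ~100k characters long."""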
    log_file = tmp_path_factory.mktemp("data") / "large_query_log.csv"
    with open(log_file, "w") as f:
        f.write("query_text,database_name,schema_name\n")
        f.write(
            "insert into dvdrental.public.rental select {} from dvdrental.public.payment\n".format(
                "first_name || '" + "a" * 100_000 + "'"
            )
        )
    return log_file


@pytest.fixture()
def long_cell_query_file(
    db_service, metadata, workflow_config, sink_config, long_cell_query_log
):
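    """Workflow config for the query-log-lineage source reading the long-cell query log."""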
    return {
        "source": {
            "type": "query-log-lineage",
            "serviceName": db_service.fullyQualifiedName.root,
            "sourceConfig": {
                "config": {
                    "type": "DatabaseLineage",
                    "queryLogFilePath": str(long_cell_query_log),
                }
            },
        },
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }


def test_log_file_with_long_cell(
    patch_passwords_for_db_services,
    run_workflow,
    ingestion_config,
    long_cell_query_file,
    metadata,
    db_service,
):
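    """Lineage from a query log with an oversized cell should still create the
    payment -> rental edge."""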
    # since the query cache is stored in ES, we need to reindex to avoid having a stale cache
    # TODO: fix the server so that we don't need to run this
    reindex_search(metadata)
    search_cache.clear()
    run_workflow(MetadataWorkflow, ingestion_config)
    run_workflow(MetadataWorkflow, long_cell_query_file)
    rental_table: Table = metadata.get_by_name(
        Table,
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.rental",
        nullable=False,
    )
    payment_table: Table = metadata.get_by_name(
        Table,
        f"{db_service.fullyQualifiedName.root}.dvdrental.public.payment",
        nullable=False,
    )
    edge = metadata.get_lineage_edge(
        str(payment_table.id.root), str(rental_table.id.root)
    )
    assert edge is not None