# Additions to ingestion/tests/integration/postgres/test_lineage.py, placed
# directly after reindex_search(). Names used below that are not defined here
# (pytest, Table, MetadataWorkflow, search_cache) are expected to come from
# that module's imports, which are not visible in this excerpt.


@pytest.fixture()
def long_cell_query_log(tmp_path_factory):
    """Create a query-log CSV containing one very long query cell.

    The file has a three-column header (``query_text``, ``database_name``,
    ``schema_name``) followed by a single row that holds only an
    ``insert ... select`` statement embedding a 100,000-character string
    literal.

    Returns:
        The path of the generated ``large_query_log.csv`` inside a fresh
        temporary directory.
    """
    header_row = "query_text,database_name,schema_name\n"
    # The oversized literal is what makes the CSV cell "long".
    long_expr = "first_name || '" + "a" * 100_000 + "'"
    query_row = f"insert into dvdrental.public.rental select {long_expr} from dvdrental.public.payment\n"

    log_file = tmp_path_factory.mktemp("data") / "large_query_log.csv"
    log_file.write_text(header_row + query_row)
    return log_file


@pytest.fixture()
def long_cell_query_file(
    db_service, metadata, workflow_config, sink_config, long_cell_query_log
):
    """Build a ``query-log-lineage`` workflow config pointing at the long-cell log.

    Returns:
        A dict with ``source``, ``sink`` and ``workflowConfig`` entries, where
        the source reads its queries from the file produced by
        ``long_cell_query_log``.
    """
    service_name = db_service.fullyQualifiedName.root
    log_path = str(long_cell_query_log)

    lineage_config = {
        "type": "DatabaseLineage",
        "queryLogFilePath": log_path,
    }
    source = {
        "type": "query-log-lineage",
        "serviceName": service_name,
        "sourceConfig": {"config": lineage_config},
    }
    return {
        "source": source,
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }


def test_log_file_with_long_cell(
    patch_passwords_for_db_services,
    run_workflow,
    ingestion_config,
    long_cell_query_file,
    metadata,
    db_service,
):
    """Check that a lineage edge exists after processing the long-cell query log.

    Runs the workflow for ``ingestion_config`` and then for the query-log
    lineage config, and asserts that a lineage edge can be fetched for the
    ``payment`` and ``rental`` tables.
    """
    # The query cache is stored in ES, so reindex first to avoid a stale cache.
    # TODO: fix the server so that we don't need to run this.
    reindex_search(metadata)
    search_cache.clear()

    # Order matters: ingestion_config runs before the query-log lineage config.
    for config in (ingestion_config, long_cell_query_file):
        run_workflow(MetadataWorkflow, config)

    service_fqn = db_service.fullyQualifiedName.root
    rental_fqn = f"{service_fqn}.dvdrental.public.rental"
    payment_fqn = f"{service_fqn}.dvdrental.public.payment"

    # nullable=False is passed so a missing table is not returned as None
    # (presumably it raises instead; confirm against the OpenMetadata client).
    rental_table: Table = metadata.get_by_name(Table, rental_fqn, nullable=False)
    payment_table: Table = metadata.get_by_name(Table, payment_fqn, nullable=False)

    # payment id is passed first, rental id second.
    from_id = str(payment_table.id.root)
    to_id = str(rental_table.id.root)
    assert metadata.get_lineage_edge(from_id, to_id) is not None