mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-10-24 15:25:10 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			217 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			217 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import sys
 | |
| import time
 | |
| from os import path
 | |
| 
 | |
| import pytest
 | |
| 
 | |
| from metadata.generated.schema.entity.data.table import Table
 | |
| from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import (
 | |
|     DatabaseLineageConfigType,
 | |
| )
 | |
| from metadata.ingestion.lineage.sql_lineage import search_cache
 | |
| from metadata.ingestion.ometa.ometa_api import OpenMetadata
 | |
| from metadata.workflow.metadata import MetadataWorkflow
 | |
| 
 | |
| if not sys.version_info >= (3, 9):
 | |
|     pytest.skip("requires python 3.9+", allow_module_level=True)
 | |
| 
 | |
| 
 | |
| @pytest.fixture()
 | |
| def native_lineage_config(db_service, workflow_config, sink_config):
 | |
|     return {
 | |
|         "source": {
 | |
|             "type": "postgres-lineage",
 | |
|             "serviceName": db_service.fullyQualifiedName.root,
 | |
|             "sourceConfig": {
 | |
|                 "config": {"type": DatabaseLineageConfigType.DatabaseLineage.value}
 | |
|             },
 | |
|         },
 | |
|         "sink": sink_config,
 | |
|         "workflowConfig": workflow_config,
 | |
|     }
 | |
| 
 | |
| 
 | |
| @pytest.mark.parametrize(
 | |
|     "source_config,expected_nodes",
 | |
|     [
 | |
|         ({"includeDDL": False}, 3),
 | |
|         ({"includeDDL": True}, 3),
 | |
|     ],
 | |
|     ids=lambda config: (
 | |
|         "".join([f"{k}={str(v)}" for k, v in config.items()])
 | |
|         if isinstance(config, dict)
 | |
|         else ""
 | |
|     ),
 | |
| )
 | |
| def test_native_lineage(
 | |
|     patch_passwords_for_db_services,
 | |
|     source_config,
 | |
|     expected_nodes,
 | |
|     run_workflow,
 | |
|     ingestion_config,
 | |
|     native_lineage_config,
 | |
|     metadata,
 | |
|     db_service,
 | |
| ):
 | |
|     ingestion_config["source"]["sourceConfig"]["config"].update(source_config)
 | |
|     run_workflow(MetadataWorkflow, ingestion_config)
 | |
|     run_workflow(MetadataWorkflow, native_lineage_config)
 | |
|     film_actor_edges = metadata.get_lineage_by_name(
 | |
|         Table, f"{db_service.fullyQualifiedName.root}.dvdrental.public.film_actor"
 | |
|     )
 | |
|     assert len(film_actor_edges["nodes"]) == expected_nodes
 | |
|     run_workflow(MetadataWorkflow, native_lineage_config)
 | |
| 
 | |
| 
 | |
| @pytest.fixture()
 | |
| def log_lineage_config(db_service, metadata, workflow_config, sink_config):
 | |
|     return {
 | |
|         "source": {
 | |
|             "type": "query-log-lineage",
 | |
|             "serviceName": db_service.fullyQualifiedName.root,
 | |
|             "sourceConfig": {
 | |
|                 "config": {
 | |
|                     "type": "DatabaseLineage",
 | |
|                     "queryLogFilePath": path.dirname(__file__) + "/bad_query_log.csv",
 | |
|                 }
 | |
|             },
 | |
|         },
 | |
|         "sink": sink_config,
 | |
|         "workflowConfig": workflow_config,
 | |
|     }
 | |
| 
 | |
| 
 | |
| def test_log_lineage(
 | |
|     patch_passwords_for_db_services,
 | |
|     run_workflow,
 | |
|     ingestion_config,
 | |
|     log_lineage_config,
 | |
|     metadata,
 | |
|     db_service,
 | |
| ):
 | |
|     # since query cache is stored in ES, we need to reindex to avoid having a stale cache
 | |
|     # TODO fix the server so that we dont need to run this
 | |
|     reindex_search(metadata)
 | |
|     search_cache.clear()
 | |
|     run_workflow(MetadataWorkflow, ingestion_config)
 | |
|     workflow = run_workflow(
 | |
|         MetadataWorkflow, log_lineage_config, raise_from_status=False
 | |
|     )
 | |
|     assert len(workflow.source.status.failures) == 0
 | |
|     customer_table: Table = metadata.get_by_name(
 | |
|         Table,
 | |
|         f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer",
 | |
|         nullable=False,
 | |
|     )
 | |
|     actor_table: Table = metadata.get_by_name(
 | |
|         Table,
 | |
|         f"{db_service.fullyQualifiedName.root}.dvdrental.public.actor",
 | |
|         nullable=False,
 | |
|     )
 | |
|     staff_table: Table = metadata.get_by_name(
 | |
|         Table,
 | |
|         f"{db_service.fullyQualifiedName.root}.dvdrental.public.staff",
 | |
|         nullable=False,
 | |
|     )
 | |
|     edge = metadata.get_lineage_edge(
 | |
|         str(customer_table.id.root), str(actor_table.id.root)
 | |
|     )
 | |
|     assert edge is not None
 | |
|     edge = metadata.get_lineage_edge(
 | |
|         str(customer_table.id.root), str(staff_table.id.root)
 | |
|     )
 | |
|     assert edge is not None
 | |
| 
 | |
| 
 | |
| def reindex_search(metadata: OpenMetadata, timeout=60):
 | |
|     start = time.time()
 | |
|     # wait for previous reindexing to finish (if any)
 | |
|     while True:
 | |
|         response = metadata.client.get(
 | |
|             "/apps/name/SearchIndexingApplication/status?offset=0&limit=1"
 | |
|         )
 | |
|         if len(response["data"]) == 0:
 | |
|             break
 | |
|         if response["data"][0]["status"] != "running":
 | |
|             break
 | |
|         if time.time() - start > timeout:
 | |
|             raise TimeoutError("Timed out waiting for reindexing to start")
 | |
|         time.sleep(1)
 | |
|     time.sleep(
 | |
|         0.5
 | |
|     )  # app interactivity is not immediate (probably bc async operations), so we wait a bit
 | |
|     metadata.client.post("/apps/trigger/SearchIndexingApplication")
 | |
|     time.sleep(0.5)  # here too
 | |
|     status = None
 | |
|     while status != "success":
 | |
|         response = metadata.client.get(
 | |
|             "/apps/name/SearchIndexingApplication/status?offset=0&limit=1"
 | |
|         )
 | |
|         status = response["data"][0]["status"]
 | |
|         if time.time() - start > timeout:
 | |
|             raise TimeoutError("Timed out waiting for reindexing to complete")
 | |
|         time.sleep(1)
 | |
| 
 | |
| 
 | |
| @pytest.fixture()
 | |
| def long_cell_query_log(tmp_path_factory):
 | |
|     log_file = tmp_path_factory.mktemp("data") / "large_query_log.csv"
 | |
|     with open(log_file, "w") as f:
 | |
|         f.write("query_text,database_name,schema_name\n")
 | |
|         f.write(
 | |
|             "insert into dvdrental.public.rental select {} from dvdrental.public.payment\n".format(
 | |
|                 "first_name || '" + "a" * 100_000 + "'"
 | |
|             )
 | |
|         )
 | |
|     return log_file
 | |
| 
 | |
| 
 | |
| @pytest.fixture()
 | |
| def long_cell_query_file(
 | |
|     db_service, metadata, workflow_config, sink_config, long_cell_query_log
 | |
| ):
 | |
|     return {
 | |
|         "source": {
 | |
|             "type": "query-log-lineage",
 | |
|             "serviceName": db_service.fullyQualifiedName.root,
 | |
|             "sourceConfig": {
 | |
|                 "config": {
 | |
|                     "type": "DatabaseLineage",
 | |
|                     "queryLogFilePath": str(long_cell_query_log),
 | |
|                 }
 | |
|             },
 | |
|         },
 | |
|         "sink": sink_config,
 | |
|         "workflowConfig": workflow_config,
 | |
|     }
 | |
| 
 | |
| 
 | |
| def test_log_file_with_long_cell(
 | |
|     patch_passwords_for_db_services,
 | |
|     run_workflow,
 | |
|     ingestion_config,
 | |
|     long_cell_query_file,
 | |
|     metadata,
 | |
|     db_service,
 | |
| ):
 | |
|     # since query cache is stored in ES, we need to reindex to avoid having a stale cache
 | |
|     # TODO fix the server so that we dont need to run this
 | |
|     reindex_search(metadata)
 | |
|     search_cache.clear()
 | |
|     run_workflow(MetadataWorkflow, ingestion_config)
 | |
|     run_workflow(MetadataWorkflow, long_cell_query_file)
 | |
|     rental_table: Table = metadata.get_by_name(
 | |
|         Table,
 | |
|         f"{db_service.fullyQualifiedName.root}.dvdrental.public.rental",
 | |
|         nullable=False,
 | |
|     )
 | |
|     payment_table: Table = metadata.get_by_name(
 | |
|         Table,
 | |
|         f"{db_service.fullyQualifiedName.root}.dvdrental.public.payment",
 | |
|         nullable=False,
 | |
|     )
 | |
|     edge = metadata.get_lineage_edge(
 | |
|         str(payment_table.id.root), str(rental_table.id.root)
 | |
|     )
 | |
|     assert edge is not None
 | 
