| 
									
										
										
										
											2024-06-17 08:56:28 +02:00
										 |  |  | import sys | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import pytest | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  | from freezegun import freeze_time | 
					
						
							|  |  |  | from sqlalchemy import create_engine | 
					
						
							| 
									
										
										
										
											2024-06-17 08:56:28 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | from metadata.generated.schema.entity.data.table import Table | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  | from metadata.ingestion.lineage.sql_lineage import search_cache | 
					
						
							|  |  |  | from metadata.workflow.metadata import MetadataWorkflow | 
					
						
							| 
									
										
										
										
											2024-06-17 08:56:28 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | if not sys.version_info >= (3, 9): | 
					
						
							|  |  |  |     pytest.skip("requires python 3.9+", allow_module_level=True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  | @pytest.fixture( | 
					
						
							| 
									
										
										
										
											2024-08-01 14:18:45 +05:30
										 |  |  |     params=["german", "english"],  # test for both languages | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  | ) | 
					
						
							|  |  |  | def language_config(mssql_container, request): | 
					
						
							|  |  |  |     language = request.param | 
					
						
							|  |  |  |     engine = create_engine( | 
					
						
							|  |  |  |         "mssql+pytds://" + mssql_container.get_connection_url().split("://")[1], | 
					
						
							|  |  |  |         connect_args={"autocommit": True}, | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     engine.execute( | 
					
						
							|  |  |  |         f"ALTER LOGIN {mssql_container.username} WITH DEFAULT_LANGUAGE={language};" | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @pytest.fixture() | 
					
						
							| 
									
										
										
										
											2024-09-20 08:52:40 +02:00
										 |  |  | def lineage_config(language_config, db_service, workflow_config, sink_config, db_name): | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  |     return { | 
					
						
							|  |  |  |         "source": { | 
					
						
							|  |  |  |             "type": "mssql-lineage", | 
					
						
							|  |  |  |             "serviceName": db_service.fullyQualifiedName.root, | 
					
						
							|  |  |  |             "sourceConfig": { | 
					
						
							|  |  |  |                 "config": { | 
					
						
							|  |  |  |                     "type": "DatabaseLineage", | 
					
						
							| 
									
										
										
										
											2024-09-20 08:52:40 +02:00
										 |  |  |                     "databaseFilterPattern": {"includes": ["TestDB", db_name]}, | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  |                 }, | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |         }, | 
					
						
							|  |  |  |         "sink": sink_config, | 
					
						
							|  |  |  |         "workflowConfig": workflow_config, | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @freeze_time("2024-01-30")  # to demonstrate the issue with german language | 
					
						
							| 
									
										
										
										
											2024-06-17 08:56:28 +02:00
										 |  |  | def test_lineage( | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  |     patch_passwords_for_db_services, | 
					
						
							|  |  |  |     run_workflow, | 
					
						
							|  |  |  |     ingestion_config, | 
					
						
							|  |  |  |     lineage_config, | 
					
						
							|  |  |  |     db_service, | 
					
						
							| 
									
										
										
										
											2024-06-17 08:56:28 +02:00
										 |  |  |     metadata, | 
					
						
							| 
									
										
										
										
											2024-09-20 08:52:40 +02:00
										 |  |  |     db_name, | 
					
						
							| 
									
										
										
										
											2024-06-17 08:56:28 +02:00
										 |  |  | ): | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  |     search_cache.clear() | 
					
						
							|  |  |  |     run_workflow(MetadataWorkflow, ingestion_config) | 
					
						
							|  |  |  |     run_workflow(MetadataWorkflow, lineage_config) | 
					
						
							| 
									
										
										
										
											2024-06-17 08:56:28 +02:00
										 |  |  |     department_table = metadata.get_by_name( | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  |         Table, | 
					
						
							| 
									
										
										
										
											2024-09-20 08:52:40 +02:00
										 |  |  |         f"{db_service.fullyQualifiedName.root}.{db_name}.SalesLT.Customer", | 
					
						
							| 
									
										
										
										
											2024-07-17 08:11:34 +02:00
										 |  |  |         nullable=False, | 
					
						
							| 
									
										
										
										
											2024-06-17 08:56:28 +02:00
										 |  |  |     ) | 
					
						
							|  |  |  |     lineage = metadata.get_lineage_by_id(Table, department_table.id.root) | 
					
						
							|  |  |  |     assert lineage is not None |