| 
									
										
										
										
											2022-02-02 13:19:15 -08:00
										 |  |  | from typing import Any, Dict, Optional, cast | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from datahub.ingestion.api.committable import StatefulCommittable | 
					
						
							|  |  |  | from datahub.ingestion.run.pipeline import Pipeline | 
					
						
							|  |  |  | from datahub.ingestion.source.sql.mysql import MySQLConfig, MySQLSource | 
					
						
							|  |  |  | from datahub.ingestion.source.state.checkpoint import Checkpoint | 
					
						
							| 
									
										
										
										
											2024-01-31 14:42:40 +05:30
										 |  |  | from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState | 
					
						
							|  |  |  | from datahub.ingestion.source.state.stale_entity_removal_handler import ( | 
					
						
							|  |  |  |     StaleEntityRemovalHandler, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2023-05-24 01:27:57 +05:30
										 |  |  | from sqlalchemy import create_engine | 
					
						
							|  |  |  | from sqlalchemy.sql import text | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-09-27 11:31:25 -05:00
										 |  |  | from tests.utils import get_mysql_password, get_mysql_url, get_mysql_username | 
					
						
							| 
									
										
										
										
											2022-06-30 16:00:50 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-02-02 13:19:15 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-09-27 11:31:25 -05:00
										 |  |  | def test_stateful_ingestion(auth_session): | 
					
						
							| 
									
										
										
										
											2022-02-02 13:19:15 -08:00
										 |  |  |     def create_mysql_engine(mysql_source_config_dict: Dict[str, Any]) -> Any: | 
					
						
							|  |  |  |         mysql_config = MySQLConfig.parse_obj(mysql_source_config_dict) | 
					
						
							|  |  |  |         url = mysql_config.get_sql_alchemy_url() | 
					
						
							|  |  |  |         return create_engine(url) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def create_table(engine: Any, name: str, defn: str) -> None: | 
					
						
							|  |  |  |         create_table_query = text(f"CREATE TABLE IF NOT EXISTS {name}{defn};") | 
					
						
							|  |  |  |         engine.execute(create_table_query) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def drop_table(engine: Any, table_name: str) -> None: | 
					
						
							|  |  |  |         drop_table_query = text(f"DROP TABLE {table_name};") | 
					
						
							|  |  |  |         engine.execute(drop_table_query) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def run_and_get_pipeline(pipeline_config_dict: Dict[str, Any]) -> Pipeline: | 
					
						
							|  |  |  |         pipeline = Pipeline.create(pipeline_config_dict) | 
					
						
							|  |  |  |         pipeline.run() | 
					
						
							|  |  |  |         pipeline.raise_from_status() | 
					
						
							|  |  |  |         return pipeline | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def validate_all_providers_have_committed_successfully(pipeline: Pipeline) -> None: | 
					
						
							|  |  |  |         provider_count: int = 0 | 
					
						
							|  |  |  |         for name, provider in pipeline.ctx.get_committables(): | 
					
						
							|  |  |  |             provider_count += 1 | 
					
						
							|  |  |  |             assert isinstance(provider, StatefulCommittable) | 
					
						
							|  |  |  |             stateful_committable = cast(StatefulCommittable, provider) | 
					
						
							|  |  |  |             assert stateful_committable.has_successfully_committed() | 
					
						
							|  |  |  |             assert stateful_committable.state_to_commit | 
					
						
							| 
									
										
										
										
											2022-08-19 09:08:17 -07:00
										 |  |  |         assert provider_count == 1 | 
					
						
							| 
									
										
										
										
											2022-02-02 13:19:15 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def get_current_checkpoint_from_pipeline( | 
					
						
							| 
									
										
										
										
											2024-09-27 11:31:25 -05:00
										 |  |  |         auth_session, | 
					
						
							| 
									
										
										
										
											2022-02-02 13:19:15 -08:00
										 |  |  |         pipeline: Pipeline, | 
					
						
							| 
									
										
										
										
											2022-12-16 13:04:48 -05:00
										 |  |  |     ) -> Optional[Checkpoint[GenericCheckpointState]]: | 
					
						
							| 
									
										
										
										
											2023-05-24 01:27:57 +05:30
										 |  |  |         # TODO: Refactor to use the helper method in the metadata-ingestion tests, instead of copying it here. | 
					
						
							| 
									
										
										
										
											2022-02-02 13:19:15 -08:00
										 |  |  |         mysql_source = cast(MySQLSource, pipeline.source) | 
					
						
							| 
									
										
										
										
											2023-05-24 01:27:57 +05:30
										 |  |  |         return mysql_source.state_provider.get_current_checkpoint( | 
					
						
							| 
									
										
										
										
											2023-05-25 14:39:43 -04:00
										 |  |  |             StaleEntityRemovalHandler.compute_job_id( | 
					
						
							|  |  |  |                 getattr(mysql_source, "platform", "default") | 
					
						
							|  |  |  |             ) | 
					
						
							| 
									
										
										
										
											2022-02-02 13:19:15 -08:00
										 |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     source_config_dict: Dict[str, Any] = { | 
					
						
							| 
									
										
										
										
											2022-08-19 09:08:17 -07:00
										 |  |  |         "host_port": get_mysql_url(), | 
					
						
							| 
									
										
										
										
											2022-07-14 22:04:06 +05:30
										 |  |  |         "username": get_mysql_username(), | 
					
						
							|  |  |  |         "password": get_mysql_password(), | 
					
						
							| 
									
										
										
										
											2022-02-02 13:19:15 -08:00
										 |  |  |         "database": "datahub", | 
					
						
							|  |  |  |         "stateful_ingestion": { | 
					
						
							|  |  |  |             "enabled": True, | 
					
						
							|  |  |  |             "remove_stale_metadata": True, | 
					
						
							| 
									
										
										
										
											2022-09-22 16:09:22 -07:00
										 |  |  |             "fail_safe_threshold": 100.0, | 
					
						
							| 
									
										
										
										
											2022-02-02 13:19:15 -08:00
										 |  |  |         }, | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     pipeline_config_dict: Dict[str, Any] = { | 
					
						
							|  |  |  |         "source": { | 
					
						
							|  |  |  |             "type": "mysql", | 
					
						
							|  |  |  |             "config": source_config_dict, | 
					
						
							|  |  |  |         }, | 
					
						
							|  |  |  |         "sink": { | 
					
						
							|  |  |  |             "type": "datahub-rest", | 
					
						
							| 
									
										
										
										
											2024-09-27 11:31:25 -05:00
										 |  |  |             "config": { | 
					
						
							|  |  |  |                 "server": auth_session.gms_url(), | 
					
						
							|  |  |  |                 "token": auth_session.gms_token(), | 
					
						
							|  |  |  |             }, | 
					
						
							| 
									
										
										
										
											2022-02-02 13:19:15 -08:00
										 |  |  |         }, | 
					
						
							|  |  |  |         "pipeline_name": "mysql_stateful_ingestion_smoke_test_pipeline", | 
					
						
							|  |  |  |         "reporting": [ | 
					
						
							|  |  |  |             { | 
					
						
							|  |  |  |                 "type": "datahub", | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         ], | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 1. Setup the SQL engine | 
					
						
							|  |  |  |     mysql_engine = create_mysql_engine(source_config_dict) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 2. Create test tables for first run of the  pipeline. | 
					
						
							|  |  |  |     table_prefix = "stateful_ingestion_test" | 
					
						
							|  |  |  |     table_defs = { | 
					
						
							|  |  |  |         f"{table_prefix}_t1": "(id INT, name VARCHAR(10))", | 
					
						
							|  |  |  |         f"{table_prefix}_t2": "(id INT)", | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  |     table_names = sorted(table_defs.keys()) | 
					
						
							|  |  |  |     for table_name, defn in table_defs.items(): | 
					
						
							|  |  |  |         create_table(mysql_engine, table_name, defn) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 3. Do the first run of the pipeline and get the default job's checkpoint. | 
					
						
							|  |  |  |     pipeline_run1 = run_and_get_pipeline(pipeline_config_dict) | 
					
						
							| 
									
										
										
										
											2024-09-27 11:31:25 -05:00
										 |  |  |     checkpoint1 = get_current_checkpoint_from_pipeline(auth_session, pipeline_run1) | 
					
						
							| 
									
										
										
										
											2022-02-02 13:19:15 -08:00
										 |  |  |     assert checkpoint1 | 
					
						
							|  |  |  |     assert checkpoint1.state | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 4. Drop table t1 created during step 2 + rerun the pipeline and get the checkpoint state. | 
					
						
							|  |  |  |     drop_table(mysql_engine, table_names[0]) | 
					
						
							|  |  |  |     pipeline_run2 = run_and_get_pipeline(pipeline_config_dict) | 
					
						
							| 
									
										
										
										
											2024-09-27 11:31:25 -05:00
										 |  |  |     checkpoint2 = get_current_checkpoint_from_pipeline(auth_session, pipeline_run2) | 
					
						
							| 
									
										
										
										
											2022-02-02 13:19:15 -08:00
										 |  |  |     assert checkpoint2 | 
					
						
							|  |  |  |     assert checkpoint2.state | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 5. Perform all assertions on the states | 
					
						
							| 
									
										
										
										
											2022-12-16 13:04:48 -05:00
										 |  |  |     state1 = checkpoint1.state | 
					
						
							|  |  |  |     state2 = checkpoint2.state | 
					
						
							| 
									
										
										
										
											2022-09-14 09:30:42 -07:00
										 |  |  |     difference_urns = list( | 
					
						
							| 
									
										
										
										
											2022-12-13 22:35:37 -05:00
										 |  |  |         state1.get_urns_not_in(type="*", other_checkpoint_state=state2) | 
					
						
							| 
									
										
										
										
											2022-09-14 09:30:42 -07:00
										 |  |  |     ) | 
					
						
							| 
									
										
										
										
											2022-02-02 13:19:15 -08:00
										 |  |  |     assert len(difference_urns) == 1 | 
					
						
							|  |  |  |     assert ( | 
					
						
							|  |  |  |         difference_urns[0] | 
					
						
							|  |  |  |         == "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.stateful_ingestion_test_t1,PROD)" | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-16 13:04:48 -05:00
										 |  |  |     # 6. Cleanup table t2 as well to prevent other tests that rely on data in the smoke-test world. | 
					
						
							| 
									
										
										
										
											2022-02-02 13:19:15 -08:00
										 |  |  |     drop_table(mysql_engine, table_names[1]) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-16 13:04:48 -05:00
										 |  |  |     # 7. Validate that all providers have committed successfully. | 
					
						
							| 
									
										
										
										
											2022-02-02 13:19:15 -08:00
										 |  |  |     # NOTE: The following validation asserts for presence of state as well | 
					
						
							|  |  |  |     # and validates reporting. | 
					
						
							|  |  |  |     validate_all_providers_have_committed_successfully(pipeline_run1) | 
					
						
							|  |  |  |     validate_all_providers_have_committed_successfully(pipeline_run2) |