import time import unittest from concurrent.futures import Future from datetime import datetime, timedelta, timezone from typing import Dict, List from unittest.mock import MagicMock, call, patch from freezegun import freeze_time from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.graph.client import DataHubGraph from datahub.ingestion.graph.filters import RemovedStatusFilter, SearchFilterRule from datahub.ingestion.source.gc.dataprocess_cleanup import ( DataJobEntity, DataProcessCleanup, DataProcessCleanupConfig, DataProcessCleanupReport, ) from datahub.ingestion.source.gc.soft_deleted_entity_cleanup import ( SoftDeletedEntitiesCleanup, SoftDeletedEntitiesCleanupConfig, SoftDeletedEntitiesReport, ) from datahub.utilities.urns._urn_base import Urn FROZEN_TIME = "2021-12-07 07:00:00" class TestSoftDeletedEntitiesCleanup(unittest.TestCase): def setUp(self): self.ctx = PipelineContext(run_id="test_run") self.ctx.graph = MagicMock() self.config = SoftDeletedEntitiesCleanupConfig() self.report = SoftDeletedEntitiesReport() self.cleanup = SoftDeletedEntitiesCleanup( self.ctx, self.config, self.report, dry_run=True ) def test_update_report(self): self.cleanup._update_report( urn="urn:li:dataset:1", entity_type="dataset", ) self.assertEqual(1, self.report.num_hard_deleted) self.assertEqual(1, self.report.num_hard_deleted_by_type["dataset"]) def test_increment_retained_count(self): self.cleanup._increment_retained_count() class TestDataProcessCleanup(unittest.TestCase): def setUp(self): self.ctx = PipelineContext(run_id="test_run") self.ctx.graph = MagicMock() self.config = DataProcessCleanupConfig() self.report = DataProcessCleanupReport() self.cleanup = DataProcessCleanup( self.ctx, self.config, self.report, dry_run=True ) @patch( "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis" ) def test_delete_dpi_from_datajobs(self, mock_fetch_dpis): job = DataJobEntity( urn="urn:li:dataJob:1", flow_urn="urn:li:dataFlow:1", lastIngested=int(datetime.now(timezone.utc).timestamp()), jobId="job1", dataPlatformInstance="urn:li:dataPlatformInstance:1", total_runs=10, ) mock_fetch_dpis.return_value = [ { "urn": f"urn:li:dataprocessInstance:{i}", "created": { "time": int(datetime.now(timezone.utc).timestamp() + i) * 1000 }, } for i in range(10) ] self.cleanup.delete_dpi_from_datajobs(job) self.assertEqual(5, self.report.num_aspects_removed) @patch( "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis" ) def test_delete_dpi_from_datajobs_without_dpis(self, mock_fetch_dpis): job = DataJobEntity( urn="urn:li:dataJob:1", flow_urn="urn:li:dataFlow:1", lastIngested=int(datetime.now(timezone.utc).timestamp()), jobId="job1", dataPlatformInstance="urn:li:dataPlatformInstance:1", total_runs=10, ) mock_fetch_dpis.return_value = [] self.cleanup.delete_dpi_from_datajobs(job) self.assertEqual(0, self.report.num_aspects_removed) @patch( "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis" ) def test_delete_dpi_from_datajobs_without_dpi_created_time(self, mock_fetch_dpis): job = DataJobEntity( urn="urn:li:dataJob:1", flow_urn="urn:li:dataFlow:1", lastIngested=int(datetime.now(timezone.utc).timestamp()), jobId="job1", dataPlatformInstance="urn:li:dataPlatformInstance:1", total_runs=10, ) mock_fetch_dpis.return_value = [ {"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10) ] + [ { "urn": "urn:li:dataprocessInstance:11", "created": {"time": int(datetime.now(timezone.utc).timestamp() * 1000)}, } ] self.cleanup.delete_dpi_from_datajobs(job) self.assertEqual(10, self.report.num_aspects_removed) @patch( "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis" ) def test_delete_dpi_from_datajobs_without_dpi_null_created_time( self, mock_fetch_dpis ): job = DataJobEntity( urn="urn:li:dataJob:1", flow_urn="urn:li:dataFlow:1", lastIngested=int(datetime.now(timezone.utc).timestamp()), jobId="job1", dataPlatformInstance="urn:li:dataPlatformInstance:1", total_runs=10, ) mock_fetch_dpis.return_value = [ {"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10) ] + [ { "urn": "urn:li:dataprocessInstance:11", "created": {"time": None}, } ] self.cleanup.delete_dpi_from_datajobs(job) self.assertEqual(11, self.report.num_aspects_removed) @patch( "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis" ) def test_delete_dpi_from_datajobs_without_dpi_without_time(self, mock_fetch_dpis): job = DataJobEntity( urn="urn:li:dataJob:1", flow_urn="urn:li:dataFlow:1", lastIngested=int(datetime.now(timezone.utc).timestamp()), jobId="job1", dataPlatformInstance="urn:li:dataPlatformInstance:1", total_runs=10, ) mock_fetch_dpis.return_value = [ {"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10) ] + [ { "urn": "urn:li:dataprocessInstance:11", "created": None, } ] self.cleanup.delete_dpi_from_datajobs(job) self.assertEqual(11, self.report.num_aspects_removed) def test_fetch_dpis(self): assert self.cleanup.ctx.graph self.cleanup.ctx.graph = MagicMock() self.cleanup.ctx.graph.execute_graphql.return_value = { "dataJob": { "runs": { "runs": [ { "urn": "urn:li:dataprocessInstance:1", "created": { "time": int(datetime.now(timezone.utc).timestamp()) }, } ] } } } dpis = self.cleanup.fetch_dpis("urn:li:dataJob:1", 10) self.assertEqual(len(dpis), 1) class TestSoftDeletedEntitiesCleanup2(unittest.TestCase): def setUp(self): # Create mocks for dependencies self.mock_graph = MagicMock(spec=DataHubGraph) self.mock_ctx = MagicMock(spec=PipelineContext) self.mock_ctx.graph = self.mock_graph # Create a default config self.config = SoftDeletedEntitiesCleanupConfig( enabled=True, retention_days=10, batch_size=100, delay=0.1, max_workers=5, entity_types=["DATASET", "DASHBOARD"], limit_entities_delete=1000, futures_max_at_time=100, runtime_limit_seconds=3600, ) # Create a report self.report = SoftDeletedEntitiesReport() # Create the test instance self.cleanup = SoftDeletedEntitiesCleanup( ctx=self.mock_ctx, config=self.config, report=self.report, dry_run=False, ) # Create a sample URN self.sample_urn = Urn.from_string( "urn:li:dataset:(urn:li:dataPlatform:example,example,PROD)" ) def test_init_requires_graph(self): """Test that initialization fails if graph is not provided.""" self.mock_ctx.graph = None with self.assertRaises(ValueError): SoftDeletedEntitiesCleanup( ctx=self.mock_ctx, config=self.config, report=self.report, ) def test_delete_entity_dry_run(self): """Test that delete_entity doesn't actually delete in dry run mode.""" # Set dry run self.cleanup.dry_run = True # Call the method self.cleanup.delete_entity(self.sample_urn) # Verify no deletion happened self.mock_graph.delete_entity.assert_not_called() self.mock_graph.delete_references_to_urn.assert_not_called() # No report update self.assertEqual(self.report.num_hard_deleted, 0) def test_delete_entity(self): """Test that delete_entity properly deletes and updates reports.""" # Call the method self.cleanup.delete_entity(self.sample_urn) # Verify deletion happened self.mock_graph.delete_entity.assert_called_once_with( urn=self.sample_urn.urn(), hard=True ) self.mock_graph.delete_references_to_urn.assert_called_once_with( urn=self.sample_urn.urn(), dry_run=False, ) # Report update self.assertEqual(self.report.num_hard_deleted, 1) self.assertEqual(self.report.num_hard_deleted_by_type.get("dataset"), 1) self.assertEqual(self.report.num_soft_deleted_entity_removal_started, 1) def test_delete_entity_respects_deletion_limit(self): """Test that delete_entity respects the deletion limit.""" # Set a report value to hit the limit self.config.limit_entities_delete = 1500 self.report.num_hard_deleted = self.config.limit_entities_delete + 1 # Call the method self.cleanup.delete_entity(self.sample_urn) # Verify no deletion happened due to limit self.mock_graph.delete_entity.assert_not_called() self.mock_graph.delete_references_to_urn.assert_not_called() self.assertTrue(self.report.deletion_limit_reached) def test_delete_entity_respects_time_limit(self): """Test that delete_entity respects the runtime limit.""" # Set time to exceed runtime limit self.cleanup.start_time = time.time() - self.config.runtime_limit_seconds - 1 # Call the method self.cleanup.delete_entity(self.sample_urn) # Verify no deletion happened due to time limit self.mock_graph.delete_entity.assert_not_called() self.mock_graph.delete_references_to_urn.assert_not_called() self.assertTrue(self.report.runtime_limit_reached) def test_delete_soft_deleted_entity_old_enough(self): """Test that entities are deleted when they are old enough.""" # Calculate a timestamp older than retention days old_timestamp = int( ( datetime.now(timezone.utc) - timedelta(days=self.config.retention_days + 1) ).timestamp() * 1000 ) # Mock the aspect return self.mock_graph.get_entity_raw.return_value = { "aspects": { "status": { "value": {"removed": True}, "created": {"time": old_timestamp}, } } } # Call the method self.cleanup.delete_soft_deleted_entity(self.sample_urn) # Verify deletion was attempted self.mock_graph.delete_entity.assert_called_once() self.assertEqual(self.report.num_soft_deleted_retained_due_to_age, 0) self.assertIsNone( self.report.num_soft_deleted_retained_due_to_age_by_type.get("dataset") ) def test_delete_soft_deleted_entity_too_recent(self): """Test that entities are not deleted when they are too recent.""" # Calculate a timestamp newer than retention days recent_timestamp = int( ( datetime.now(timezone.utc) - timedelta(days=self.config.retention_days - 1) ).timestamp() * 1000 ) # Mock the aspect return self.mock_graph.get_entity_raw.return_value = { "aspects": { "status": { "value": {"removed": True}, "created": {"time": recent_timestamp}, } } } # Call the method self.cleanup.delete_soft_deleted_entity(self.sample_urn) # Verify no deletion was attempted self.mock_graph.delete_entity.assert_not_called() self.assertEqual(self.report.num_soft_deleted_retained_due_to_age, 1) self.assertEqual( self.report.num_soft_deleted_retained_due_to_age_by_type.get("dataset"), 1 ) @freeze_time(FROZEN_TIME) def test_get_urns(self): """Test that _get_urns calls get_urns_by_filter with correct parameters.""" # Setup mock for get_urns_by_filter self.mock_graph.get_urns_by_filter.return_value = ["urn1", "urn2", "urn3"] # Get all urns urns = list(self.cleanup._get_urns()) # Verify get_urns_by_filter was called correctly self.mock_graph.get_urns_by_filter.assert_called_once_with( entity_types=self.config.entity_types, platform=self.config.platform, env=self.config.env, query=self.config.query, status=RemovedStatusFilter.ONLY_SOFT_DELETED, batch_size=self.config.batch_size, ) # Check the returned urns self.assertEqual(urns, ["urn1", "urn2", "urn3"]) @freeze_time(FROZEN_TIME) def test_get_urns_with_dpi(self): """Test that _get_urns calls get_urns_by_filter with correct parameters.""" # Setup mock for get_urns_by_filter self.mock_graph.get_urns_by_filter.side_effect = [ ["urn1", "urn2", "urn3"], ["dpi_urn1", "dpi_urn2", "dpi_urn3"], ] # Get all urns assert self.config.entity_types self.config.entity_types.append("dataProcessInstance") urns = list(self.cleanup._get_urns()) self.config.entity_types.remove("dataProcessInstance") # Verify get_urns_by_filter was called correctly self.mock_graph.get_urns_by_filter.has_calls( [ call( entity_types=self.config.entity_types, platform=self.config.platform, env=self.config.env, query=self.config.query, status=RemovedStatusFilter.ONLY_SOFT_DELETED, batch_size=self.config.batch_size, extraFilters=[ SearchFilterRule( field="created", condition="LESS_THAN", values=[ str( int( time.time() - self.config.retention_days * 24 * 60 * 60 ) * 1000 ) ], ).to_raw() ], ), call( entity_types=["dataProcessInstance"], platform=self.config.platform, env=self.config.env, query=self.config.query, status=RemovedStatusFilter.ONLY_SOFT_DELETED, batch_size=self.config.batch_size, ), ] ) # Check the returned urns self.assertEqual( urns, ["urn1", "urn2", "urn3", "dpi_urn1", "dpi_urn2", "dpi_urn3"] ) def test_process_futures(self): """Test the _process_futures method properly handles futures.""" # Create sample futures mock_future1 = MagicMock(spec=Future) mock_future1.exception.return_value = None mock_future2 = MagicMock(spec=Future) mock_future2.exception.return_value = Exception("Test exception") # Mock the wait function to return the first future as done with patch( "datahub.ingestion.source.gc.soft_deleted_entity_cleanup.wait", return_value=({mock_future1}, {mock_future2}), ): futures = {mock_future1: self.sample_urn, mock_future2: self.sample_urn} # Process the futures result = self.cleanup._process_futures(futures) # type: ignore # Check result contains only the not_done future self.assertEqual(len(result), 1) self.assertIn(mock_future2, result) # Check report was updated self.assertEqual(self.report.num_soft_deleted_entity_processed, 1) def test_cleanup_disabled(self): """Test that cleanup doesn't run when disabled.""" # Disable cleanup self.config.enabled = False # Mock methods to check they're not called with patch.object(self.cleanup, "_get_urns") as mock_get_urns: self.cleanup.cleanup_soft_deleted_entities() mock_get_urns.assert_not_called() @patch("datahub.ingestion.source.gc.soft_deleted_entity_cleanup.ThreadPoolExecutor") def test_cleanup_soft_deleted_entities(self, mock_executor_class): """Test the main cleanup method submits tasks correctly.""" # Setup mock for executor mock_executor = MagicMock() mock_executor_class.return_value.__enter__.return_value = mock_executor # Mock futures mock_future = MagicMock(spec=Future) mock_executor.submit.return_value = mock_future urns_to_delete = [ "urn:li:dataset:(urn:li:dataPlatform:kafka,PageViewEvent,PROD)", "urn:li:dataset:(urn:li:dataPlatform:kafka,PageViewEvent2,PROD)", ] # Mock Urn.from_string to return our sample URN # and mock _process_futures to simulate completion with patch.object( self.cleanup, "_get_urns", return_value=urns_to_delete ), patch.object(self.cleanup, "_process_futures", return_value={}): # Run cleanup self.cleanup.cleanup_soft_deleted_entities() # Verify executor was created with correct workers mock_executor_class.assert_called_once_with( max_workers=self.config.max_workers ) # Verify submit was called for each urn self.assertEqual(mock_executor.submit.call_count, 2) # Both calls should be to delete_soft_deleted_entity with the sample_urn for idx, call_args in enumerate(mock_executor.submit.call_args_list): self.assertEqual( call_args[0][0], self.cleanup.delete_soft_deleted_entity ) self.assertEqual(call_args[0][1], Urn.from_string(urns_to_delete[idx])) def test_times_up(self): """Test the time limit checker.""" # Test time limit not reached self.cleanup.start_time = time.time() self.assertFalse(self.cleanup._times_up()) # Test time limit reached self.cleanup.start_time = time.time() - self.config.runtime_limit_seconds - 1 self.assertTrue(self.cleanup._times_up()) self.assertTrue(self.report.runtime_limit_reached) def test_deletion_limit_reached(self): """Test the deletion limit checker.""" # Test deletion limit not reached self.config.limit_entities_delete = 1500 self.report.num_hard_deleted = self.config.limit_entities_delete - 1 self.assertFalse(self.cleanup._deletion_limit_reached()) # Test deletion limit reached self.report.num_hard_deleted = self.config.limit_entities_delete + 1 self.assertTrue(self.cleanup._deletion_limit_reached()) self.assertTrue(self.report.deletion_limit_reached) def test_handle_urn_parsing_error(self): """Test handling of URN parsing errors.""" # Mock _get_urns to return an invalid URN # and mock Urn.from_string to raise an exception # and mock logger to capture log messages with patch.object( self.cleanup, "_get_urns", return_value=["invalid:urn"] ), self.assertLogs(level="ERROR") as log_context, patch.object( self.cleanup, "_process_futures", return_value={} ): # Run cleanup self.cleanup.cleanup_soft_deleted_entities() # Verify error was logged self.assertTrue( any("Failed to parse urn" in msg for msg in log_context.output) ) def test_increment_retained_by_type(self): """Test the _increment_retained_by_type method.""" entity_type = "dataset" # Call the method self.cleanup._increment_retained_by_type(entity_type) # Check report was updated self.assertEqual( self.report.num_soft_deleted_retained_due_to_age_by_type.get(entity_type), 1 ) # Call again self.cleanup._increment_retained_by_type(entity_type) # Check report was updated again self.assertEqual( self.report.num_soft_deleted_retained_due_to_age_by_type.get(entity_type), 2 ) def test_entity_with_missing_status_aspect(self): """Test handling of entities without a status aspect.""" # Mock the aspect return with no status self.mock_graph.get_entity_raw.return_value = {"aspects": {}} # Call the method self.cleanup.delete_soft_deleted_entity(self.sample_urn) # Verify no deletion was attempted self.mock_graph.delete_entity.assert_not_called() self.assertEqual( self.report.num_soft_deleted_retained_due_to_age, 0 ) # No increment def test_entity_not_removed(self): """Test handling of entities that have status but are not removed.""" # Mock the aspect return with removed=False recent_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000) self.mock_graph.get_entity_raw.return_value = { "aspects": { "status": { "value": {"removed": False}, "created": {"time": recent_timestamp}, } } } # Call the method self.cleanup.delete_soft_deleted_entity(self.sample_urn) # Verify no deletion was attempted self.mock_graph.delete_entity.assert_not_called() self.assertEqual( self.report.num_soft_deleted_retained_due_to_age, 1 ) # Should increment class TestCleanupSoftDeletedEntities(unittest.TestCase): """Tests for the cleanup_soft_deleted_entities method.""" def setUp(self) -> None: # Create mocks for dependencies self.mock_graph: MagicMock = MagicMock(spec=DataHubGraph) self.mock_ctx: MagicMock = MagicMock(spec=PipelineContext) self.mock_ctx.graph = self.mock_graph # Create a default config self.config: SoftDeletedEntitiesCleanupConfig = ( SoftDeletedEntitiesCleanupConfig( enabled=True, retention_days=10, batch_size=100, max_workers=5, futures_max_at_time=10, ) ) # Create a report self.report: SoftDeletedEntitiesReport = SoftDeletedEntitiesReport() # Create the test instance self.cleanup: SoftDeletedEntitiesCleanup = SoftDeletedEntitiesCleanup( ctx=self.mock_ctx, config=self.config, report=self.report, ) # Sample URNs for testing self.sample_urns: List[str] = [ "urn:li:dataset:(urn:li:dataPlatform:kafka,PageViewEvent,PROD)", "urn:li:dataset:(urn:li:dataPlatform:kafka,PageViewEvent2,PROD)", "urn:li:dashboard:(looker,dashboard1)", ] # Parsed URN objects self.parsed_urns: List[Urn] = [Urn.from_string(urn) for urn in self.sample_urns] @patch("datahub.ingestion.source.gc.soft_deleted_entity_cleanup.ThreadPoolExecutor") def test_cleanup_disabled(self, mock_executor_class: MagicMock) -> None: """Test that cleanup doesn't run when disabled.""" # Disable cleanup self.config.enabled = False # Mock methods to check they're not called with patch.object(self.cleanup, "_get_urns") as mock_get_urns: self.cleanup.cleanup_soft_deleted_entities() # Verify that nothing happens when disabled mock_get_urns.assert_not_called() mock_executor_class.assert_not_called() @patch("datahub.ingestion.source.gc.soft_deleted_entity_cleanup.ThreadPoolExecutor") def test_cleanup_with_valid_urns(self, mock_executor_class: MagicMock) -> None: """Test the main cleanup method with valid URNs.""" # Setup mock for executor mock_executor: MagicMock = MagicMock() mock_executor_class.return_value.__enter__.return_value = mock_executor # Mock futures mock_futures: List[MagicMock] = [ MagicMock(spec=Future) for _ in range(len(self.sample_urns)) ] mock_executor.submit.side_effect = mock_futures # Set up _get_urns to return our sample URNs # and mock _process_futures to simulate completion # and mock _print_report to avoid timing issues with patch.object( self.cleanup, "_get_urns", return_value=self.sample_urns ), patch.object( self.cleanup, "_process_futures", return_value={} ), patch.object(self.cleanup, "_print_report"): # Run cleanup self.cleanup.cleanup_soft_deleted_entities() # Verify executor was created with correct workers mock_executor_class.assert_called_once_with( max_workers=self.config.max_workers ) # Verify submit was called for each urn self.assertEqual(mock_executor.submit.call_count, len(self.sample_urns)) # Check that the correct method and parameters were used expected_calls: List = [ call( self.cleanup.delete_soft_deleted_entity, Urn.from_string(urn), ) for urn in self.sample_urns ] mock_executor.submit.assert_has_calls(expected_calls) @patch("datahub.ingestion.source.gc.soft_deleted_entity_cleanup.ThreadPoolExecutor") def test_cleanup_with_invalid_urns(self, mock_executor_class: MagicMock) -> None: """Test how the cleanup handles invalid URNs.""" # Setup mock for executor mock_executor: MagicMock = MagicMock() mock_executor_class.return_value.__enter__.return_value = mock_executor # Valid and invalid URNs mixed_urns: List[str] = self.sample_urns + ["invalid:urn:format"] # Set up _get_urns to return mixed URNs # and mock _process_futures to simulate completion # and mock _print_report to avoid timing issues # and mock logger to capture log messages with patch.object( self.cleanup, "_get_urns", return_value=mixed_urns ), patch.object( self.cleanup, "_process_futures", return_value={} ), patch.object(self.cleanup, "_print_report"), self.assertLogs( level="ERROR" ) as log_context: # Run cleanup self.cleanup.cleanup_soft_deleted_entities() # Verify error was logged for invalid URN self.assertTrue( any("Failed to parse urn" in msg for msg in log_context.output) ) # Verify submit was called only for valid URNs self.assertEqual(mock_executor.submit.call_count, len(self.sample_urns)) @patch("datahub.ingestion.source.gc.soft_deleted_entity_cleanup.ThreadPoolExecutor") def test_cleanup_with_max_futures_limit( self, mock_executor_class: MagicMock ) -> None: """Test that the cleanup respects the max futures limit.""" # Setup mock for executor mock_executor: MagicMock = MagicMock() mock_executor_class.return_value.__enter__.return_value = mock_executor # Set max futures to 1 to force processing after each submission self.config.futures_max_at_time = 1 # Keep track of how many times _process_futures is called process_futures_call_count = 0 # Define a side effect function that simulates the behavior of _process_futures # It needs to clear the futures dictionary once per call to prevent infinite loops def process_futures_side_effect( futures: Dict[Future, Urn], ) -> Dict[Future, Urn]: nonlocal process_futures_call_count process_futures_call_count += 1 # Return an empty dict to simulate that all futures are processed return {} # Set up _get_urns to return sample URNs # and mock _process_futures with our side effect function # and mock _print_report to avoid timing issues with patch.object( self.cleanup, "_get_urns", return_value=self.sample_urns ), patch.object( self.cleanup, "_process_futures", side_effect=process_futures_side_effect, ), patch.object(self.cleanup, "_print_report"): # Run cleanup self.cleanup.cleanup_soft_deleted_entities() # Verify _process_futures was called for each URN (since max_futures=1) self.assertEqual(process_futures_call_count, len(self.sample_urns)) @patch("datahub.ingestion.source.gc.soft_deleted_entity_cleanup.ThreadPoolExecutor") def test_cleanup_respects_deletion_limit( self, mock_executor_class: MagicMock ) -> None: """Test that cleanup stops when deletion limit is reached.""" # Setup mock for executor mock_executor: MagicMock = MagicMock() mock_executor_class.return_value.__enter__.return_value = mock_executor # Set up to hit deletion limit after first URN with patch.object(self.cleanup, "_deletion_limit_reached") as mock_limit: # Return False for first URN, True for others mock_limit.side_effect = [False, True, True] # Set up _get_urns to return sample URNs # and mock _process_futures to simulate completion # and mock _print_report to avoid timing issues with patch.object( self.cleanup, "_get_urns", return_value=self.sample_urns ), patch.object( self.cleanup, "_process_futures", return_value={} ), patch.object(self.cleanup, "_print_report"): # Run cleanup self.cleanup.cleanup_soft_deleted_entities() # Should only process the first URN before hitting limit self.assertEqual(mock_executor.submit.call_count, 1) @patch("datahub.ingestion.source.gc.soft_deleted_entity_cleanup.ThreadPoolExecutor") def test_cleanup_respects_time_limit(self, mock_executor_class: MagicMock) -> None: """Test that cleanup stops when time limit is reached.""" # Setup mock for executor mock_executor: MagicMock = MagicMock() mock_executor_class.return_value.__enter__.return_value = mock_executor # Set up to hit time limit after first URN with patch.object(self.cleanup, "_times_up") as mock_times_up: # Return False for first URN, True for others mock_times_up.side_effect = [False, True, True] # Set up _get_urns to return sample URNs # and mock _process_futures to simulate completion # and mock _print_report to avoid timing issues with patch.object( self.cleanup, "_get_urns", return_value=self.sample_urns ), patch.object( self.cleanup, "_process_futures", return_value={} ), patch.object(self.cleanup, "_print_report"): # Run cleanup self.cleanup.cleanup_soft_deleted_entities() # Should only process the first URN before hitting time limit self.assertEqual(mock_executor.submit.call_count, 1) @patch("datahub.ingestion.source.gc.soft_deleted_entity_cleanup.ThreadPoolExecutor") def test_cleanup_handles_empty_urn_list( self, mock_executor_class: MagicMock ) -> None: """Test cleanup when no URNs are returned.""" # Setup mock for executor mock_executor: MagicMock = MagicMock() mock_executor_class.return_value.__enter__.return_value = mock_executor # Set up _get_urns to return empty list # and mock _process_futures to simulate completion # and mock _print_report to avoid timing issues with patch.object(self.cleanup, "_get_urns", return_value=[]), patch.object( self.cleanup, "_process_futures" ), patch.object(self.cleanup, "_print_report"): # Run cleanup self.cleanup.cleanup_soft_deleted_entities() # Verify submit was not called mock_executor.submit.assert_not_called() if __name__ == "__main__": unittest.main()