fix(ingest/gc): Adding test and more checks to gc source (#12027)

Author: Tamas Nemeth
Date:   2024-12-05 09:49:44 +01:00 (committed by GitHub)
parent 8d15df0c11
commit 3c388a56a5
2 changed files with 154 additions and 21 deletions


@@ -208,23 +208,29 @@ class DataProcessCleanup:
        dpis = []
        start = 0
        while True:
+            try:
                job_query_result = self.ctx.graph.execute_graphql(
                    DATA_PROCESS_INSTANCES_QUERY,
                    {"dataJobUrn": job_urn, "start": start, "count": batch_size},
                )
                job_data = job_query_result.get("dataJob")
                if not job_data:
-                raise ValueError(f"Error getting job {job_urn}")
+                    logger.error(f"Error getting job {job_urn}")
+                    break
                runs_data = job_data.get("runs")
                if not runs_data:
-                raise ValueError(f"Error getting runs for {job_urn}")
+                    logger.error(f"Error getting runs for {job_urn}")
+                    break
                runs = runs_data.get("runs")
                dpis.extend(runs)
                start += batch_size
                if len(runs) < batch_size:
                    break
+            except Exception as e:
+                logger.error(f"Exception while fetching DPIs for job {job_urn}: {e}")
+                break
        return dpis

    def keep_last_n_dpi(
@@ -243,8 +249,12 @@ class DataProcessCleanup:
                futures[future] = dpi

            for future in as_completed(futures):
+                try:
+                    future.result()
                    deleted_count_last_n += 1
                    futures[future]["deleted"] = True
+                except Exception as e:
+                    logger.error(f"Exception while deleting DPI: {e}")

                if deleted_count_last_n % self.config.batch_size == 0:
                    logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}")
@@ -279,7 +289,7 @@ class DataProcessCleanup:
            dpis = self.fetch_dpis(job.urn, self.config.batch_size)
            dpis.sort(
                key=lambda x: x["created"]["time"]
-                if x["created"] and x["created"]["time"]
+                if "created" in x and "time" in x["created"]
                else 0,
                reverse=True,
            )
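The sort-key change swaps truthiness checks for membership checks: with the old x["created"] and x["created"]["time"], a DPI lacking the "created" field raised KeyError inside the key function, while the new form falls back to 0. A small illustration with made-up DPI dicts:

dpis = [
    {"urn": "a", "created": {"time": 200}},
    {"urn": "b"},  # no "created" at all; the old key function raised KeyError here
    {"urn": "c", "created": {"time": 100}},
]

dpis.sort(
    key=lambda x: x["created"]["time"]
    if "created" in x and "time" in x["created"]
    else 0,
    reverse=True,
)
print([d["urn"] for d in dpis])  # ['a', 'c', 'b']; missing timestamps sort last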
@@ -314,15 +324,23 @@ class DataProcessCleanup:
                if dpi.get("deleted"):
                    continue

-                if dpi["created"]["time"] < retention_time * 1000:
+                if (
+                    "created" not in dpi
+                    or "time" not in dpi["created"]
+                    or dpi["created"]["time"] < retention_time * 1000
+                ):
                    future = executor.submit(
                        self.delete_entity, dpi["urn"], "dataprocessInstance"
                    )
                    futures[future] = dpi

            for future in as_completed(futures):
+                try:
+                    future.result()
                    deleted_count_retention += 1
                    futures[future]["deleted"] = True
+                except Exception as e:
+                    logger.error(f"Exception while deleting DPI: {e}")

                if deleted_count_retention % self.config.batch_size == 0:
                    logger.info(
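The widened condition treats a DPI without a usable "created"/"time" as expired, so malformed instances get cleaned up instead of raising KeyError during the retention pass. A standalone sketch of the predicate; the cutoff value is arbitrary, and judging from the * 1000 the retention_time is in seconds while the DPI timestamp is in milliseconds:

RETENTION_TIME = 1_700_000_000  # illustrative cutoff, seconds since epoch


def is_expired(dpi: dict) -> bool:
    # Matches the new condition: a missing "created"/"time" counts as expired,
    # otherwise compare the millisecond timestamp against the cutoff.
    return (
        "created" not in dpi
        or "time" not in dpi["created"]
        or dpi["created"]["time"] < RETENTION_TIME * 1000
    )


print(is_expired({"urn": "x"}))                              # True (no created time)
print(is_expired({"created": {"time": 1_800_000_000_000}}))  # False (newer than cutoff)
print(is_expired({"created": {"time": 1_600_000_000_000}}))  # True (older than cutoff)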
@@ -378,8 +396,11 @@ class DataProcessCleanup:
            dataFlows[flow.urn] = flow

        scroll_id: Optional[str] = None
+        previous_scroll_id: Optional[str] = None
+
        dataJobs: Dict[str, List[DataJobEntity]] = defaultdict(list)
        deleted_jobs: int = 0
+
        while True:
            result = self.ctx.graph.execute_graphql(
                DATAJOB_QUERY,
@@ -426,9 +447,11 @@ class DataProcessCleanup:
                else:
                    dataJobs[datajob_entity.flow_urn].append(datajob_entity)

-            if not scroll_id:
+            if not scroll_id or previous_scroll_id == scroll_id:
                break

+            previous_scroll_id = scroll_id
+
        logger.info(f"Deleted {deleted_jobs} DataJobs")
        # Delete empty dataflows if needed
        if self.config.delete_empty_data_flows:
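The previous_scroll_id bookkeeping guards against a backend that keeps returning the same non-empty scrollId, which would make the old if not scroll_id: break loop forever. A tiny self-contained sketch, with a fake search function standing in for the real GraphQL scroll query:

from typing import Optional


def search(scroll_id: Optional[str]) -> dict:
    # Misbehaving backend for illustration: always returns the same scrollId.
    return {"scrollId": "abc", "entities": []}


scroll_id: Optional[str] = None
previous_scroll_id: Optional[str] = None
pages = 0

while True:
    result = search(scroll_id)
    scroll_id = result.get("scrollId")
    pages += 1
    # ... process result["entities"] here ...
    if not scroll_id or previous_scroll_id == scroll_id:
        break
    previous_scroll_id = scroll_id

print(pages)  # 2; without the previous_scroll_id check this would never terminate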
@@ -443,4 +466,5 @@ class DataProcessCleanup:
                        if deleted_jobs % self.config.batch_size == 0:
                            logger.info(f"Deleted {deleted_data_flows} DataFlows")
            logger.info(f"Deleted {deleted_data_flows} DataFlows")
+
        return []


@@ -0,0 +1,109 @@
import unittest
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.gc.dataprocess_cleanup import (
    DataJobEntity,
    DataProcessCleanup,
    DataProcessCleanupConfig,
    DataProcessCleanupReport,
)


class TestDataProcessCleanup(unittest.TestCase):
    def setUp(self):
        self.ctx = PipelineContext(run_id="test_run")
        self.ctx.graph = MagicMock()
        self.config = DataProcessCleanupConfig()
        self.report = DataProcessCleanupReport()
        self.cleanup = DataProcessCleanup(
            self.ctx, self.config, self.report, dry_run=True
        )

    @patch(
        "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
    )
    def test_delete_dpi_from_datajobs(self, mock_fetch_dpis):
        job = DataJobEntity(
            urn="urn:li:dataJob:1",
            flow_urn="urn:li:dataFlow:1",
            lastIngested=int(datetime.now(timezone.utc).timestamp()),
            jobId="job1",
            dataPlatformInstance="urn:li:dataPlatformInstance:1",
            total_runs=10,
        )
        mock_fetch_dpis.return_value = [
            {
                "urn": f"urn:li:dataprocessInstance:{i}",
                "created": {
                    "time": int(datetime.now(timezone.utc).timestamp() + i) * 1000
                },
            }
            for i in range(10)
        ]
        self.cleanup.delete_dpi_from_datajobs(job)
        self.assertEqual(5, self.report.num_aspects_removed)

    @patch(
        "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
    )
    def test_delete_dpi_from_datajobs_without_dpis(self, mock_fetch_dpis):
        job = DataJobEntity(
            urn="urn:li:dataJob:1",
            flow_urn="urn:li:dataFlow:1",
            lastIngested=int(datetime.now(timezone.utc).timestamp()),
            jobId="job1",
            dataPlatformInstance="urn:li:dataPlatformInstance:1",
            total_runs=10,
        )
        mock_fetch_dpis.return_value = []
        self.cleanup.delete_dpi_from_datajobs(job)
        self.assertEqual(0, self.report.num_aspects_removed)

    @patch(
        "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
    )
    def test_delete_dpi_from_datajobs_without_dpi_created_time(self, mock_fetch_dpis):
        job = DataJobEntity(
            urn="urn:li:dataJob:1",
            flow_urn="urn:li:dataFlow:1",
            lastIngested=int(datetime.now(timezone.utc).timestamp()),
            jobId="job1",
            dataPlatformInstance="urn:li:dataPlatformInstance:1",
            total_runs=10,
        )
        mock_fetch_dpis.return_value = [
            {"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10)
        ] + [
            {
                "urn": "urn:li:dataprocessInstance:11",
                "created": {"time": int(datetime.now(timezone.utc).timestamp() * 1000)},
            }
        ]
        self.cleanup.delete_dpi_from_datajobs(job)
        self.assertEqual(10, self.report.num_aspects_removed)

    def test_fetch_dpis(self):
        assert self.cleanup.ctx.graph
        self.cleanup.ctx.graph = MagicMock()
        self.cleanup.ctx.graph.execute_graphql.return_value = {
            "dataJob": {
                "runs": {
                    "runs": [
                        {
                            "urn": "urn:li:dataprocessInstance:1",
                            "created": {
                                "time": int(datetime.now(timezone.utc).timestamp())
                            },
                        }
                    ]
                }
            }
        }
        dpis = self.cleanup.fetch_dpis("urn:li:dataJob:1", 10)
        self.assertEqual(len(dpis), 1)


if __name__ == "__main__":
    unittest.main()