fix(ingest/gc): misc fixes in gc source (#12226)

Aseem Bansal 2024-12-26 18:10:09 +05:30 committed by GitHub
parent 756b199506
commit 16698da509
3 changed files with 68 additions and 17 deletions


@@ -34,6 +34,7 @@ from datahub.ingestion.source.gc.soft_deleted_entity_cleanup import (
     SoftDeletedEntitiesCleanupConfig,
     SoftDeletedEntitiesReport,
 )
+from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport

 logger = logging.getLogger(__name__)
@@ -86,6 +87,7 @@ class DataHubGcSourceReport(
     DataProcessCleanupReport,
     SoftDeletedEntitiesReport,
     DatahubExecutionRequestCleanupReport,
+    IngestionStageReport,
 ):
     expired_tokens_revoked: int = 0
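
DataHubGcSourceReport now also mixes in IngestionStageReport, so stage names and timings accumulate on the same object as the per-task counters. A minimal sketch of that mixin pattern, using simplified stand-in classes rather than the actual DataHub report types:

from dataclasses import dataclass


@dataclass
class StageReportMixin:
    # Stand-in for IngestionStageReport: tracks the current stage label.
    ingestion_stage: str = ""

    def report_ingestion_stage_start(self, stage: str) -> None:
        self.ingestion_stage = stage


@dataclass
class TokenCleanupReport:
    expired_tokens_revoked: int = 0


@dataclass
class GcReport(TokenCleanupReport, StageReportMixin):
    # Multiple inheritance merges counters and stage tracking into one
    # report object, mirroring how DataHubGcSourceReport is composed.
    pass


report = GcReport()
report.report_ingestion_stage_start("Expired Token Cleanup")
report.expired_tokens_revoked += 1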
@@ -139,31 +141,40 @@ class DataHubGcSource(Source):
     ) -> Iterable[MetadataWorkUnit]:
         if self.config.cleanup_expired_tokens:
             try:
+                self.report.report_ingestion_stage_start("Expired Token Cleanup")
                 self.revoke_expired_tokens()
             except Exception as e:
                 self.report.failure("While trying to cleanup expired token ", exc=e)
         if self.config.truncate_indices:
             try:
+                self.report.report_ingestion_stage_start("Truncate Indices")
                 self.truncate_indices()
             except Exception as e:
                 self.report.failure("While trying to truncate indices ", exc=e)
         if self.config.soft_deleted_entities_cleanup.enabled:
             try:
+                self.report.report_ingestion_stage_start(
+                    "Soft Deleted Entities Cleanup"
+                )
                 self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
             except Exception as e:
                 self.report.failure(
                     "While trying to cleanup soft deleted entities ", exc=e
                 )
-        if self.config.execution_request_cleanup.enabled:
-            try:
-                self.execution_request_cleanup.run()
-            except Exception as e:
-                self.report.failure("While trying to cleanup execution request ", exc=e)
         if self.config.dataprocess_cleanup.enabled:
             try:
+                self.report.report_ingestion_stage_start("Data Process Cleanup")
                 yield from self.dataprocess_cleanup.get_workunits_internal()
             except Exception as e:
                 self.report.failure("While trying to cleanup data process ", exc=e)
+        if self.config.execution_request_cleanup.enabled:
+            try:
+                self.report.report_ingestion_stage_start("Execution request Cleanup")
+                self.execution_request_cleanup.run()
+            except Exception as e:
+                self.report.failure("While trying to cleanup execution request ", exc=e)
+        # Otherwise last stage's duration does not get calculated.
+        self.report.report_ingestion_stage_start("End")
         yield from []

     def truncate_indices(self) -> None:
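
The trailing "End" stage looks odd but matters: report_ingestion_stage_start both starts a new stage and closes the previous one, so without a final synthetic stage the last real stage would never record a duration. A minimal sketch of that start-closes-previous pattern (my own illustration, not the actual IngestionStageReport code):

import time
from typing import Dict, Optional


class StageTimer:
    def __init__(self) -> None:
        self.durations: Dict[str, float] = {}
        self._current: Optional[str] = None
        self._started: float = 0.0

    def start_stage(self, stage: str) -> None:
        if self._current is not None:
            # Starting the next stage is what records the previous one,
            # hence the synthetic "End" stage after the last real stage.
            self.durations[self._current] = time.perf_counter() - self._started
        self._current = stage
        self._started = time.perf_counter()


timer = StageTimer()
timer.start_stage("Data Process Cleanup")
timer.start_stage("Execution request Cleanup")
timer.start_stage("End")  # closes "Execution request Cleanup"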
@@ -281,6 +292,8 @@ class DataHubGcSource(Source):
             list_access_tokens = expired_tokens_res.get("listAccessTokens", {})
             tokens = list_access_tokens.get("tokens", [])
             total = list_access_tokens.get("total", 0)
+            if tokens == []:
+                break
             for token in tokens:
                 self.report.expired_tokens_revoked += 1
                 token_id = token["id"]
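
The new empty-batch guard makes the revocation loop terminate on the first empty page instead of relying solely on the reported total. The loop shape, sketched with a hypothetical fetch_token_page helper standing in for the real listAccessTokens GraphQL call:

from typing import List, Tuple


def fetch_token_page(start: int, count: int) -> Tuple[List[dict], int]:
    # Hypothetical stand-in for the listAccessTokens GraphQL query;
    # returns (tokens, total). Empty here so the example terminates.
    return [], 0


def revoke_expired_tokens(batch_size: int = 10) -> int:
    revoked = 0
    while True:
        tokens, total = fetch_token_page(start=0, count=batch_size)
        if tokens == []:
            # Guard added by this commit: an empty page means we are done,
            # even if `total` is stale or still nonzero.
            break
        for token in tokens:
            revoked += 1  # the real code revokes token["id"] here
        if revoked >= total:
            break
    return revoked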


@@ -1,3 +1,4 @@
+import datetime
 import logging
 import time
 from typing import Any, Dict, Iterator, Optional
@@ -42,16 +43,28 @@ class DatahubExecutionRequestCleanupConfig(ConfigModel):
         description="Global switch for this cleanup task",
     )

+    runtime_limit_seconds: int = Field(
+        default=3600,
+        description="Maximum runtime in seconds for the cleanup task",
+    )
+
+    max_read_errors: int = Field(
+        default=10,
+        description="Maximum number of read errors before aborting",
+    )
+
     def keep_history_max_milliseconds(self):
         return self.keep_history_max_days * 24 * 3600 * 1000


 class DatahubExecutionRequestCleanupReport(SourceReport):
-    execution_request_cleanup_records_read: int = 0
-    execution_request_cleanup_records_preserved: int = 0
-    execution_request_cleanup_records_deleted: int = 0
-    execution_request_cleanup_read_errors: int = 0
-    execution_request_cleanup_delete_errors: int = 0
+    ergc_records_read: int = 0
+    ergc_records_preserved: int = 0
+    ergc_records_deleted: int = 0
+    ergc_read_errors: int = 0
+    ergc_delete_errors: int = 0
+    ergc_start_time: Optional[datetime.datetime] = None
+    ergc_end_time: Optional[datetime.datetime] = None


 class CleanupRecord(BaseModel):
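
The two new knobs bound the task in wall-clock time and in tolerated read failures. A standalone pydantic sketch of how they parse from a recipe; CleanupBudget is a hypothetical name, not the DataHub config class:

from pydantic import BaseModel, Field


class CleanupBudget(BaseModel):
    # Mirrors the two fields added to DatahubExecutionRequestCleanupConfig.
    runtime_limit_seconds: int = Field(
        default=3600,
        description="Maximum runtime in seconds for the cleanup task",
    )
    max_read_errors: int = Field(
        default=10,
        description="Maximum number of read errors before aborting",
    )


# Values come from the ingestion recipe; unset fields keep their defaults.
budget = CleanupBudget.parse_obj({"runtime_limit_seconds": 600})
assert budget.runtime_limit_seconds == 600
assert budget.max_read_errors == 10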
@@ -124,6 +137,13 @@ class DatahubExecutionRequestCleanup:
         params.update(overrides)

         while True:
+            if self._reached_runtime_limit():
+                break
+            if self.report.ergc_read_errors >= self.config.max_read_errors:
+                self.report.failure(
+                    f"ergc({self.instance_id}): too many read errors, aborting."
+                )
+                break
             try:
                 url = f"{self.graph.config.server}/openapi/v2/entity/{DATAHUB_EXECUTION_REQUEST_ENTITY_NAME}"
                 response = self.graph._session.get(url, headers=headers, params=params)
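
With these guards, the scroll loop checks both budgets before every fetch: it stops quietly when the runtime limit is hit and fails loudly once the read-error budget is exhausted, rather than retrying a broken endpoint forever. The error-budget shape in isolation, with hypothetical names:

from typing import Callable, Iterator, List


def scroll_with_error_budget(
    fetch_batch: Callable[[], List[dict]],
    max_read_errors: int = 10,
) -> Iterator[dict]:
    read_errors = 0
    while True:
        if read_errors >= max_read_errors:
            # Abort the whole task instead of spinning on a failing endpoint.
            raise RuntimeError("too many read errors, aborting")
        try:
            batch = fetch_batch()
        except Exception:
            read_errors += 1
            continue
        if not batch:
            return
        yield from batch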
@@ -141,7 +161,7 @@ class DatahubExecutionRequestCleanup:
                 logger.error(
                     f"ergc({self.instance_id}): failed to fetch next batch of execution requests: {e}"
                 )
-                self.report.execution_request_cleanup_read_errors += 1
+                self.report.ergc_read_errors += 1

     def _scroll_garbage_records(self):
         state: Dict[str, Dict] = {}
@@ -150,7 +170,7 @@ class DatahubExecutionRequestCleanup:
         running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000

         for entry in self._scroll_execution_requests():
-            self.report.execution_request_cleanup_records_read += 1
+            self.report.ergc_records_read += 1
             key = entry.ingestion_source

             # Always delete corrupted records
@@ -171,7 +191,7 @@ class DatahubExecutionRequestCleanup:
             # Do not delete if number of requests is below minimum
             if state[key]["count"] < self.config.keep_history_min_count:
-                self.report.execution_request_cleanup_records_preserved += 1
+                self.report.ergc_records_preserved += 1
                 continue

             # Do not delete if the number of requests does not exceed the allowed
             # maximum and the record is newer than the cutoff timestamp.
@@ -179,7 +199,7 @@ class DatahubExecutionRequestCleanup:
             if (state[key]["count"] < self.config.keep_history_max_count) and (
                 entry.requested_at > state[key]["cutoffTimestamp"]
             ):
-                self.report.execution_request_cleanup_records_preserved += 1
+                self.report.ergc_records_preserved += 1
                 continue

             # Do not delete if status is RUNNING or PENDING and created within last
             # month. If the record is >month old and it did not complete, delete it anyway.
@@ -188,7 +208,7 @@ class DatahubExecutionRequestCleanup:
                 "RUNNING",
                 "PENDING",
             ]:
-                self.report.execution_request_cleanup_records_preserved += 1
+                self.report.ergc_records_preserved += 1
                 continue

             # Otherwise delete current record
@@ -200,7 +220,7 @@ class DatahubExecutionRequestCleanup:
                     f"record timestamp: {entry.requested_at}."
                 )
             )
-            self.report.execution_request_cleanup_records_deleted += 1
+            self.report.ergc_records_deleted += 1
            yield entry

     def _delete_entry(self, entry: CleanupRecord) -> None:
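
Taken together, the preserve rules in this method keep a record when its source still has fewer than keep_history_min_count entries, when it is inside both the count and age windows, or when it is a RUNNING/PENDING request less than a month old; everything else is yielded for deletion. Condensed into a single predicate, as a sketch with a hypothetical signature (timestamps in epoch milliseconds):

def should_delete(
    count_for_source: int,
    requested_at: int,
    status: str,
    min_count: int,
    max_count: int,
    cutoff_timestamp: int,
    running_guard_timeout: int,
) -> bool:
    if count_for_source < min_count:
        return False  # always keep a minimum history per ingestion source
    if count_for_source < max_count and requested_at > cutoff_timestamp:
        return False  # inside both the count window and the age window
    if status in ("RUNNING", "PENDING") and requested_at > running_guard_timeout:
        return False  # may still be executing; only reap after ~a month
    return True  # everything else gets deleted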
@@ -210,17 +230,31 @@ class DatahubExecutionRequestCleanup:
             )
             self.graph.delete_entity(entry.urn, True)
         except Exception as e:
-            self.report.execution_request_cleanup_delete_errors += 1
+            self.report.ergc_delete_errors += 1
             logger.error(
                 f"ergc({self.instance_id}): failed to delete ExecutionRequest {entry.request_id}: {e}"
             )

+    def _reached_runtime_limit(self) -> bool:
+        if (
+            self.config.runtime_limit_seconds
+            and self.report.ergc_start_time
+            and (
+                datetime.datetime.now() - self.report.ergc_start_time
+                >= datetime.timedelta(seconds=self.config.runtime_limit_seconds)
+            )
+        ):
+            logger.info(f"ergc({self.instance_id}): max runtime reached.")
+            return True
+        return False
+
     def run(self) -> None:
         if not self.config.enabled:
             logger.info(
                 f"ergc({self.instance_id}): ExecutionRequest cleaner is disabled."
             )
             return
+        self.report.ergc_start_time = datetime.datetime.now()

         logger.info(
             (
@@ -232,8 +266,11 @@ class DatahubExecutionRequestCleanup:
         )

         for entry in self._scroll_garbage_records():
+            if self._reached_runtime_limit():
+                break
             self._delete_entry(entry)

+        self.report.ergc_end_time = datetime.datetime.now()
         logger.info(
             f"ergc({self.instance_id}): Finished cleanup of ExecutionRequest records."
         )
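
_reached_runtime_limit measures from ergc_start_time, which run() sets before scrolling, so one wall-clock budget covers both reading and deleting. A compressed sketch of that control flow in a hypothetical class, not the DataHub one:

import datetime
from typing import Iterable, Optional


class BudgetedCleanup:
    def __init__(self, runtime_limit_seconds: int = 3600) -> None:
        self.runtime_limit = datetime.timedelta(seconds=runtime_limit_seconds)
        self.start_time: Optional[datetime.datetime] = None

    def _reached_runtime_limit(self) -> bool:
        return (
            self.start_time is not None
            and datetime.datetime.now() - self.start_time >= self.runtime_limit
        )

    def run(self, records: Iterable[object]) -> int:
        self.start_time = datetime.datetime.now()
        deleted = 0
        for record in records:
            if self._reached_runtime_limit():
                break  # stop deleting once the budget is spent
            deleted += 1  # the real code deletes the entity here
        return deleted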


@@ -42,4 +42,5 @@ class IngestionStageReport:
         self._timer = PerfTimer()

         self.ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}"
+        logger.info(f"Stage started: {self.ingestion_stage}")
         self._timer.start()
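
The only change here is the added logger.info, which makes every stage transition visible in the ingestion log as it happens. A simplified sketch of the resulting behavior, with the PerfTimer handling omitted:

import logging
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class StageReport:
    # Simplified: the real IngestionStageReport also restarts a PerfTimer.
    ingestion_stage: str = ""

    def report_ingestion_stage_start(self, stage: str) -> None:
        self.ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}"
        logger.info(f"Stage started: {self.ingestion_stage}")


report = StageReport()
report.report_ingestion_stage_start("Expired Token Cleanup")
# Logs: Stage started: Expired Token Cleanup at 2024-12-26 ...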