feat(ingest/datahub-gc): gc source to cleanup things (#10085)

This commit is contained in:
Aseem Bansal 2024-03-21 15:21:17 +05:30 committed by GitHub
parent e6e5c091ed
commit 9659d60867
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 119 additions and 0 deletions

View File

@ -609,6 +609,7 @@ entry_points = {
"ldap = datahub.ingestion.source.ldap:LDAPSource",
"looker = datahub.ingestion.source.looker.looker_source:LookerDashboardSource",
"lookml = datahub.ingestion.source.looker.lookml_source:LookMLSource",
"datahub-gc = datahub.ingestion.source.gc.datahub_gc:DataHubGcSource",
"datahub-lineage-file = datahub.ingestion.source.metadata.lineage:LineageFileSource",
"datahub-business-glossary = datahub.ingestion.source.metadata.business_glossary:BusinessGlossaryFileSource",
"mlflow = datahub.ingestion.source.mlflow:MLflowSource",

View File

@ -0,0 +1,118 @@
import time
from dataclasses import dataclass
from typing import Iterable
from pydantic import Field
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
class DataHubGcSourceConfig(ConfigModel):
cleanup_expired_tokens: bool = Field(
default=True,
description="Whether to clean up expired tokens or not",
)
@dataclass
class DataHubGcSourceReport(SourceReport):
expired_tokens_revoked: int = 0
@platform_name("DataHubGc")
@config_class(DataHubGcSourceConfig)
@support_status(SupportStatus.TESTING)
class DataHubGcSource(Source):
def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):
self.ctx = ctx
self.config = config
self.report = DataHubGcSourceReport()
self.graph = ctx.graph
assert (
self.graph is not None
), "DataHubGc source requires a graph. Please either use datahub-rest sink or set datahub_api"
@classmethod
def create(cls, config_dict, ctx):
config = DataHubGcSourceConfig.parse_obj(config_dict)
return cls(ctx, config)
def get_workunits_internal(
self,
) -> Iterable[MetadataWorkUnit]:
if self.config.cleanup_expired_tokens:
self.revoke_expired_tokens()
yield from []
def revoke_expired_tokens(self) -> None:
total = 1
while total > 0:
expired_tokens_res = self._get_expired_tokens()
list_access_tokens = expired_tokens_res.get("listAccessTokens", {})
tokens = list_access_tokens.get("tokens", [])
total = list_access_tokens.get("total", 0)
for token in tokens:
self.report.expired_tokens_revoked += 1
token_id = token["id"]
self._revoke_token(token_id)
def _revoke_token(self, token_id: str) -> None:
assert self.graph is not None
self.graph.execute_graphql(
query="""mutation revokeAccessToken($tokenId: String!) {
revokeAccessToken(tokenId: $tokenId)
}
""",
variables={"tokenId": token_id},
)
def _get_expired_tokens(self) -> dict:
assert self.graph is not None
return self.graph.execute_graphql(
query="""query listAccessTokens($input: ListAccessTokenInput!) {
listAccessTokens(input: $input) {
start
count
total
tokens {
urn
type
id
name
description
actorUrn
ownerUrn
createdAt
expiresAt
__typename
}
__typename
}
}
""",
variables={
"input": {
"start": 0,
"count": 10,
"filters": [
{
"field": "expiresAt",
"values": [str(int(time.time() * 1000))],
"condition": "LESS_THAN",
}
],
}
},
)
def get_report(self) -> SourceReport:
return self.report