Add support for runAssertion, runAssertions, and runAssertionsForAsset APIs (#10605)

This commit is contained in:
noggi 2024-05-31 14:23:34 -07:00 committed by GitHub
parent dfa9bd2779
commit b542143901
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1281,6 +1281,170 @@ class DataHubGraph(DatahubRestEmitter):
# return urn
return res["createTag"]
def _assertion_result_shared(self) -> str:
fragment: str = """
fragment assertionResult on AssertionResult {
type
rowCount
missingCount
unexpectedCount
actualAggValue
externalUrl
nativeResults {
value
}
error {
type
properties {
value
}
}
}
"""
return fragment
def _run_assertion_result_shared(self) -> str:
fragment: str = """
fragment runAssertionResult on RunAssertionResult {
assertion {
urn
}
result {
... assertionResult
}
}
"""
return fragment
def _run_assertion_build_params(
self, params: Optional[Dict[str, str]] = {}
) -> List[Any]:
if params is None:
return []
results = []
for key, value in params.items():
result = {
"key": key,
"value": value,
}
results.append(result)
return results
def run_assertion(
self,
urn: str,
save_result: bool = True,
parameters: Optional[Dict[str, str]] = {},
async_flag: bool = False,
) -> Dict:
params = self._run_assertion_build_params(parameters)
graph_query: str = """
%s
mutation runAssertion($assertionUrn: String!, $saveResult: Boolean, $parameters: [StringMapEntryInput!], $async: Boolean!) {
runAssertion(urn: $assertionUrn, saveResult: $saveResult, parameters: $parameters, async: $async) {
... assertionResult
}
}
""" % (
self._assertion_result_shared()
)
variables = {
"assertionUrn": urn,
"saveResult": save_result,
"parameters": params,
"async": async_flag,
}
res = self.execute_graphql(
query=graph_query,
variables=variables,
)
return res["runAssertion"]
def run_assertions(
self,
urns: List[str],
save_result: bool = True,
parameters: Optional[Dict[str, str]] = {},
async_flag: bool = False,
) -> Dict:
params = self._run_assertion_build_params(parameters)
graph_query: str = """
%s
%s
mutation runAssertions($assertionUrns: [String!]!, $saveResult: Boolean, $parameters: [StringMapEntryInput!], $async: Boolean!) {
runAssertions(urns: $assertionUrns, saveResults: $saveResult, parameters: $parameters, async: $async) {
passingCount
failingCount
errorCount
results {
... runAssertionResult
}
}
}
""" % (
self._assertion_result_shared(),
self._run_assertion_result_shared(),
)
variables = {
"assertionUrns": urns,
"saveResult": save_result,
"parameters": params,
"async": async_flag,
}
res = self.execute_graphql(
query=graph_query,
variables=variables,
)
return res["runAssertions"]
def run_assertions_for_asset(
self,
urn: str,
tag_urns: Optional[List[str]] = [],
parameters: Optional[Dict[str, str]] = {},
async_flag: bool = False,
) -> Dict:
params = self._run_assertion_build_params(parameters)
graph_query: str = """
%s
%s
mutation runAssertionsForAsset($assetUrn: String!, $tagUrns: [String!], $parameters: [StringMapEntryInput!], $async: Boolean!) {
runAssertionsForAsset(urn: $assetUrn, tagUrns: $tagUrns, parameters: $parameters, async: $async) {
passingCount
failingCount
errorCount
results {
... runAssertionResult
}
}
}
""" % (
self._assertion_result_shared(),
self._run_assertion_result_shared(),
)
variables = {
"assetUrn": urn,
"tagUrns": tag_urns,
"parameters": params,
"async": async_flag,
}
res = self.execute_graphql(
query=graph_query,
variables=variables,
)
return res["runAssertionsForAsset"]
def close(self) -> None:
self._make_schema_resolver.cache_clear()
super().close()