feat(sdk): enable parametrized assertion run in python sdk (#15447)

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Jay <159848059+jayacryl@users.noreply.github.com>
Co-authored-by: Sergio Gómez Villamor <sgomezvillamor@gmail.com>
This commit is contained in:
Peter Wang 2025-12-19 15:07:52 -05:00 committed by GitHub
parent 7d041cc073
commit a461da02b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 309 additions and 30 deletions

View File

@ -569,7 +569,11 @@ Where **type** will contain the Result of the assertion run, either `SUCCESS`, `
The `saveResult` argument determines whether the result of the assertion will be saved to DataHub's backend,
and available to view through the DataHub UI. If this is set to false, the result will NOT be stored in DataHub's
backend. The value defaults to `true`.
backend. **Default: `true`** (results are saved when not specified).
The `async` argument controls whether the assertion runs asynchronously. When set to `true`, the API will kick off
the assertion run and return immediately. When set to `false` or omitted, the assertion runs synchronously with a
30-second timeout. **Default: `false`** (synchronous execution when not specified).
If the assertion is external (not natively executed by DataHub), this API will return an error.
@ -624,7 +628,11 @@ Where **type** will contain the Result of the assertion run, either `SUCCESS`, `
The `saveResults` argument determines whether the result of the assertion will be saved to DataHub's backend,
and available to view through the DataHub UI. If this is set to false, the result will NOT be stored in DataHub's
backend. The value defaults to `true`.
backend. **Default: `true`** (results are saved when not specified).
The `async` argument controls whether the assertions run asynchronously. When set to `true`, the API will kick off
the assertion runs and return immediately. When set to `false` or omitted, the assertions run synchronously with a
30-second timeout per assertion. **Default: `false`** (synchronous execution when not specified).
If any of the assertion are external (not natively executed by DataHub), they will simply be omitted from the result set.
@ -702,7 +710,11 @@ Where `type` will contain the Result of the assertion run, either `SUCCESS`, `FA
The `saveResults` argument determines whether the result of the assertion will be saved to DataHub's backend,
and available to view through the DataHub UI. If this is set to false, the result will NOT be stored in DataHub's
backend. The value defaults to `true`.
backend. **Default: `true`** (results are saved when not specified).
The `async` argument controls whether the assertions run asynchronously. When set to `true`, the API will kick off
the assertion runs and return immediately. When set to `false` or omitted, the assertions run synchronously with a
30-second timeout per assertion. **Default: `false`** (synchronous execution when not specified).
If any of the assertion are external (not natively executed by DataHub), they will simply be omitted from the result
set.
@ -827,7 +839,7 @@ mutation runAssertionsForAsset {
</Tabs>
<!-- ### Experimental: Providing Dynamic Parameters to Assertions
### Providing Dynamic Parameters to Assertions
You can provide **dynamic parameters** to your assertions to customize their behavior. This is particularly useful for
assertions that require dynamic parameters, such as a threshold value that changes based on the time of day.
@ -859,7 +871,7 @@ mutation runAssertion {
```
At runtime, the `${parameterName}` placeholder in the SQL fragment will be replaced with the provided `parameterValue` before the query
is sent to the database for execution. -->
is sent to the database for execution.
## Get Assertion Details

View File

@ -1711,29 +1711,45 @@ class DataHubGraph(DatahubRestEmitter, EntityVersioningAPI):
def run_assertion(
self,
urn: str,
save_result: bool = True,
save_result: Optional[bool] = None,
parameters: Optional[Dict[str, str]] = None,
async_flag: bool = False,
async_flag: Optional[bool] = None,
) -> Dict:
if parameters is None:
parameters = {}
"""
Run a DataHub assertion on-demand.
Args:
urn: The URN of the assertion to run.
save_result: Whether to save the result to DataHub's backend. If not specified,
the backend default is True (results will be saved and visible in the UI).
parameters: Dynamic parameters to inject into the assertion's SQL fragment.
async_flag: Whether to run the assertion asynchronously. If not specified,
the backend default is False (synchronous execution with 30-second timeout).
Returns:
Dict containing the assertion result with type (SUCCESS/FAILURE/ERROR) and details.
"""
params = self._run_assertion_build_params(parameters)
graph_query: str = """
%s
mutation runAssertion($assertionUrn: String!, $saveResult: Boolean, $parameters: [StringMapEntryInput!], $async: Boolean!) {
mutation runAssertion($assertionUrn: String!, $saveResult: Boolean, $parameters: [StringMapEntryInput!], $async: Boolean) {
runAssertion(urn: $assertionUrn, saveResult: $saveResult, parameters: $parameters, async: $async) {
... assertionResult
}
}
""" % (self._assertion_result_shared())
variables = {
variables: Dict[str, Any] = {
"assertionUrn": urn,
"saveResult": save_result,
"parameters": params,
"async": async_flag,
}
if save_result is not None:
variables["saveResult"] = save_result
if async_flag is not None:
variables["async"] = async_flag
res = self.execute_graphql(
query=graph_query,
variables=variables,
@ -1744,17 +1760,29 @@ class DataHubGraph(DatahubRestEmitter, EntityVersioningAPI):
def run_assertions(
self,
urns: List[str],
save_result: bool = True,
save_result: Optional[bool] = None,
parameters: Optional[Dict[str, str]] = None,
async_flag: bool = False,
async_flag: Optional[bool] = None,
) -> Dict:
if parameters is None:
parameters = {}
"""
Run multiple DataHub assertions on-demand.
Args:
urns: List of assertion URNs to run.
save_result: Whether to save the results to DataHub's backend. If not specified,
the backend default is True (results will be saved and visible in the UI).
parameters: Dynamic parameters to inject into the assertions' SQL fragments.
async_flag: Whether to run the assertions asynchronously. If not specified,
the backend default is False (synchronous execution with 30-second timeout).
Returns:
Dict containing pass/fail/error counts and individual assertion results.
"""
params = self._run_assertion_build_params(parameters)
graph_query: str = """
%s
%s
mutation runAssertions($assertionUrns: [String!]!, $saveResult: Boolean, $parameters: [StringMapEntryInput!], $async: Boolean!) {
mutation runAssertions($assertionUrns: [String!]!, $saveResult: Boolean, $parameters: [StringMapEntryInput!], $async: Boolean) {
runAssertions(urns: $assertionUrns, saveResults: $saveResult, parameters: $parameters, async: $async) {
passingCount
failingCount
@ -1769,13 +1797,17 @@ class DataHubGraph(DatahubRestEmitter, EntityVersioningAPI):
self._run_assertion_result_shared(),
)
variables = {
variables: Dict[str, Any] = {
"assertionUrns": urns,
"saveResult": save_result,
"parameters": params,
"async": async_flag,
}
if save_result is not None:
variables["saveResult"] = save_result
if async_flag is not None:
variables["async"] = async_flag
res = self.execute_graphql(
query=graph_query,
variables=variables,
@ -1788,17 +1820,31 @@ class DataHubGraph(DatahubRestEmitter, EntityVersioningAPI):
urn: str,
tag_urns: Optional[List[str]] = None,
parameters: Optional[Dict[str, str]] = None,
async_flag: bool = False,
async_flag: Optional[bool] = None,
) -> Dict:
if tag_urns is None:
tag_urns = []
if parameters is None:
parameters = {}
"""
Run all assertions (or tagged subset) for a data asset on-demand.
Args:
urn: The URN of the data asset (e.g., dataset) whose assertions to run.
tag_urns: Optional list of tag URNs to filter which assertions to run.
If not specified, all assertions for the asset will be run.
parameters: Dynamic parameters to inject into the assertions' SQL fragments.
async_flag: Whether to run the assertions asynchronously. If not specified,
the backend default is False (synchronous execution with 30-second timeout).
Returns:
Dict containing pass/fail/error counts and individual assertion results.
Note:
This method does not support the save_result parameter. Results are always
saved to DataHub's backend (equivalent to save_result=True).
"""
params = self._run_assertion_build_params(parameters)
graph_query: str = """
%s
%s
mutation runAssertionsForAsset($assetUrn: String!, $tagUrns: [String!], $parameters: [StringMapEntryInput!], $async: Boolean!) {
mutation runAssertionsForAsset($assetUrn: String!, $tagUrns: [String!], $parameters: [StringMapEntryInput!], $async: Boolean) {
runAssertionsForAsset(urn: $assetUrn, tagUrns: $tagUrns, parameters: $parameters, async: $async) {
passingCount
failingCount
@ -1813,13 +1859,17 @@ class DataHubGraph(DatahubRestEmitter, EntityVersioningAPI):
self._run_assertion_result_shared(),
)
variables = {
variables: Dict[str, Any] = {
"assetUrn": urn,
"tagUrns": tag_urns,
"parameters": params,
"async": async_flag,
}
if tag_urns is not None:
variables["tagUrns"] = tag_urns
if async_flag is not None:
variables["async"] = async_flag
res = self.execute_graphql(
query=graph_query,
variables=variables,

View File

@ -0,0 +1,217 @@
"""Tests for assertion running methods in DataHubGraph."""
from unittest.mock import patch
import pytest
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
@pytest.fixture
def mock_graph():
"""Create a mocked DataHubGraph instance."""
with patch("datahub.emitter.rest_emitter.DataHubRestEmitter.test_connection"):
graph = DataHubGraph(DatahubClientConfig(server="http://fake-domain.local"))
return graph
class TestRunAssertion:
"""Tests for the run_assertion method."""
def test_run_assertion_minimal(self, mock_graph):
"""Test run_assertion with only required parameters."""
with patch.object(mock_graph, "execute_graphql") as mock_execute:
mock_execute.return_value = {
"runAssertion": {
"type": "SUCCESS",
"nativeResults": [],
}
}
result = mock_graph.run_assertion(urn="urn:li:assertion:test123")
assert result is not None
mock_execute.assert_called_once()
# Verify variables - should only contain assertionUrn and parameters
variables = mock_execute.call_args.kwargs["variables"]
assert variables["assertionUrn"] == "urn:li:assertion:test123"
assert variables["parameters"] == []
assert "saveResult" not in variables
assert "async" not in variables
def test_run_assertion_with_save_result(self, mock_graph):
"""Test run_assertion with save_result parameter."""
with patch.object(mock_graph, "execute_graphql") as mock_execute:
mock_execute.return_value = {"runAssertion": {"type": "SUCCESS"}}
mock_graph.run_assertion(urn="urn:li:assertion:test123", save_result=True)
variables = mock_execute.call_args.kwargs["variables"]
assert variables["saveResult"] is True
def test_run_assertion_with_async_flag(self, mock_graph):
"""Test run_assertion with async_flag parameter."""
with patch.object(mock_graph, "execute_graphql") as mock_execute:
mock_execute.return_value = {"runAssertion": {"type": "SUCCESS"}}
mock_graph.run_assertion(urn="urn:li:assertion:test123", async_flag=True)
variables = mock_execute.call_args.kwargs["variables"]
assert variables["async"] is True
def test_run_assertion_with_parameters(self, mock_graph):
"""Test run_assertion with dynamic parameters."""
with patch.object(mock_graph, "execute_graphql") as mock_execute:
mock_execute.return_value = {"runAssertion": {"type": "SUCCESS"}}
mock_graph.run_assertion(
urn="urn:li:assertion:test123",
parameters={"threshold": "100", "date": "2024-01-01"},
)
variables = mock_execute.call_args.kwargs["variables"]
assert len(variables["parameters"]) == 2
param_dict = {p["key"]: p["value"] for p in variables["parameters"]}
assert param_dict["threshold"] == "100"
assert param_dict["date"] == "2024-01-01"
class TestRunAssertions:
"""Tests for the run_assertions method."""
def test_run_assertions_minimal(self, mock_graph):
"""Test run_assertions with only required parameters."""
with patch.object(mock_graph, "execute_graphql") as mock_execute:
mock_execute.return_value = {
"runAssertions": {
"passingCount": 2,
"failingCount": 0,
}
}
result = mock_graph.run_assertions(
urns=["urn:li:assertion:test1", "urn:li:assertion:test2"]
)
assert result is not None
mock_execute.assert_called_once()
# Verify variables
variables = mock_execute.call_args.kwargs["variables"]
assert variables["assertionUrns"] == [
"urn:li:assertion:test1",
"urn:li:assertion:test2",
]
assert variables["parameters"] == []
assert "saveResult" not in variables
assert "async" not in variables
def test_run_assertions_with_save_result(self, mock_graph):
"""Test run_assertions with save_result parameter."""
with patch.object(mock_graph, "execute_graphql") as mock_execute:
mock_execute.return_value = {"runAssertions": {"passingCount": 1}}
mock_graph.run_assertions(
urns=["urn:li:assertion:test1"], save_result=False
)
variables = mock_execute.call_args.kwargs["variables"]
assert variables["saveResult"] is False
def test_run_assertions_with_async_flag(self, mock_graph):
"""Test run_assertions with async_flag parameter."""
with patch.object(mock_graph, "execute_graphql") as mock_execute:
mock_execute.return_value = {"runAssertions": {"passingCount": 1}}
mock_graph.run_assertions(urns=["urn:li:assertion:test1"], async_flag=True)
variables = mock_execute.call_args.kwargs["variables"]
assert variables["async"] is True
def test_run_assertions_with_parameters(self, mock_graph):
"""Test run_assertions with dynamic parameters."""
with patch.object(mock_graph, "execute_graphql") as mock_execute:
mock_execute.return_value = {"runAssertions": {"passingCount": 1}}
mock_graph.run_assertions(
urns=["urn:li:assertion:test1"], parameters={"env": "prod"}
)
variables = mock_execute.call_args.kwargs["variables"]
assert len(variables["parameters"]) == 1
assert variables["parameters"][0]["key"] == "env"
assert variables["parameters"][0]["value"] == "prod"
class TestRunAssertionsForAsset:
"""Tests for the run_assertions_for_asset method."""
def test_run_assertions_for_asset_minimal(self, mock_graph):
"""Test run_assertions_for_asset with only required parameters."""
with patch.object(mock_graph, "execute_graphql") as mock_execute:
mock_execute.return_value = {
"runAssertionsForAsset": {
"passingCount": 3,
"failingCount": 1,
}
}
result = mock_graph.run_assertions_for_asset(
urn="urn:li:dataset:(urn:li:dataPlatform:mysql,db.table,PROD)"
)
assert result is not None
mock_execute.assert_called_once()
# Verify variables
variables = mock_execute.call_args.kwargs["variables"]
assert (
variables["assetUrn"]
== "urn:li:dataset:(urn:li:dataPlatform:mysql,db.table,PROD)"
)
assert variables["parameters"] == []
assert "tagUrns" not in variables
assert "async" not in variables
def test_run_assertions_for_asset_with_tag_urns(self, mock_graph):
"""Test run_assertions_for_asset with tag_urns parameter."""
with patch.object(mock_graph, "execute_graphql") as mock_execute:
mock_execute.return_value = {"runAssertionsForAsset": {"passingCount": 2}}
mock_graph.run_assertions_for_asset(
urn="urn:li:dataset:(urn:li:dataPlatform:mysql,db.table,PROD)",
tag_urns=["urn:li:tag:critical"],
)
variables = mock_execute.call_args.kwargs["variables"]
assert variables["tagUrns"] == ["urn:li:tag:critical"]
def test_run_assertions_for_asset_with_async_flag(self, mock_graph):
"""Test run_assertions_for_asset with async_flag parameter."""
with patch.object(mock_graph, "execute_graphql") as mock_execute:
mock_execute.return_value = {"runAssertionsForAsset": {"passingCount": 1}}
mock_graph.run_assertions_for_asset(
urn="urn:li:dataset:(urn:li:dataPlatform:mysql,db.table,PROD)",
async_flag=True,
)
variables = mock_execute.call_args.kwargs["variables"]
assert variables["async"] is True
def test_run_assertions_for_asset_with_parameters(self, mock_graph):
"""Test run_assertions_for_asset with dynamic parameters."""
with patch.object(mock_graph, "execute_graphql") as mock_execute:
mock_execute.return_value = {"runAssertionsForAsset": {"passingCount": 1}}
mock_graph.run_assertions_for_asset(
urn="urn:li:dataset:(urn:li:dataPlatform:mysql,db.table,PROD)",
parameters={"threshold": "50", "window": "24h"},
)
variables = mock_execute.call_args.kwargs["variables"]
assert len(variables["parameters"]) == 2
param_dict = {p["key"]: p["value"] for p in variables["parameters"]}
assert param_dict["threshold"] == "50"
assert param_dict["window"] == "24h"