docs(sdk): extended and organized assertion & subscriptions docs (#14418)

Jay 2025-08-12 17:08:45 -04:00 committed by GitHub
parent 1fd0578813
commit e93de201b7
5 changed files with 433 additions and 35 deletions

View File

@ -914,6 +914,7 @@ module.exports = {
},
"docs/api/tutorials/owners",
"docs/api/tutorials/structured-properties",
"docs/api/tutorials/subscriptions",
"docs/api/tutorials/tags",
"docs/api/tutorials/terms",
{

View File

@ -29,11 +29,11 @@ The actor making API calls must have the `Edit Assertions` and `Edit Monitors` p
You can create new dataset Assertions to DataHub using the following APIs.
### Freshness Assertion
<Tabs>
<TabItem value="graphql" label="GraphQL" default>
### Freshness Assertion
To create a new freshness assertion, use the `upsertDatasetFreshnessAssertionMonitor` GraphQL Mutation.
```graphql
@ -71,10 +71,108 @@ This API will return a unique identifier (URN) for the new assertion if you were
}
```
</TabItem>
<TabItem value="python" label="Python">
```python
from datahub.sdk import DataHubClient
from datahub.metadata.urns import DatasetUrn
# Initialize the client
client = DataHubClient(server="<your_server>", token="<your_token>")
# Create smart freshness assertion (AI-powered anomaly detection)
dataset_urn = DatasetUrn.from_string("urn:li:dataset:(urn:li:dataPlatform:snowflake,database.schema.table,PROD)")
smart_freshness_assertion = client.assertions.sync_smart_freshness_assertion(
dataset_urn=dataset_urn,
display_name="Smart Freshness Anomaly Monitor",
# Detection mechanism - information_schema is recommended
detection_mechanism="information_schema",
# Smart sensitivity setting
sensitivity="medium", # options: "low", "medium", "high"
# Tags for grouping
tags=["automated", "freshness", "data_quality"],
# Enable the assertion
enabled=True
)
print(f"Created smart freshness assertion: {smart_freshness_assertion.urn}")
# Create traditional freshness assertion (fixed interval)
freshness_assertion = client.assertions.sync_freshness_assertion(
dataset_urn=dataset_urn,
display_name="Fixed Interval Freshness Check",
# Fixed interval check - table should be updated within lookback window
freshness_schedule_check_type="fixed_interval",
# Lookback window - table should be updated within 8 hours
lookback_window={"unit": "HOUR", "multiple": 8},
# Detection mechanism
detection_mechanism="information_schema",
# Evaluation schedule - how often to check
schedule="0 */2 * * *", # Check every 2 hours
# Tags
tags=["automated", "freshness", "fixed_interval"],
enabled=True
)
print(f"Created freshness assertion: {freshness_assertion.urn}")
# Create since-last-check freshness assertion
since_last_check_assertion = client.assertions.sync_freshness_assertion(
dataset_urn=dataset_urn,
display_name="Since Last Check Freshness",
# Since last check - table should be updated since the last evaluation
freshness_schedule_check_type="since_the_last_check",
# Detection mechanism with last modified column
detection_mechanism={
"type": "last_modified_column",
"column_name": "updated_at",
"additional_filter": "status = 'active'"
},
# Evaluation schedule - how often to check
schedule="0 */6 * * *", # Check every 6 hours
# Tags
tags=["automated", "freshness", "since_last_check"],
enabled=True
)
print(f"Created since last check assertion: {since_last_check_assertion.urn}")
# Create freshness assertion with high watermark column
watermark_freshness_assertion = client.assertions.sync_freshness_assertion(
dataset_urn=dataset_urn,
display_name="High Watermark Freshness Check",
# Fixed interval check with specific lookback window
freshness_schedule_check_type="fixed_interval",
# Lookback window - check for updates in the last 24 hours
lookback_window={"unit": "DAY", "multiple": 1},
# Detection mechanism using high watermark column (e.g., auto-incrementing ID)
detection_mechanism={
"type": "high_watermark_column",
"column_name": "id",
"additional_filter": "status != 'deleted'"
},
# Evaluation schedule
schedule="0 8 * * *", # Check daily at 8 AM
# Tags
tags=["automated", "freshness", "high_watermark"],
enabled=True
)
print(f"Created watermark freshness assertion: {watermark_freshness_assertion.urn}")
```
</TabItem>
</Tabs>
For more details, see the [Freshness Assertions](/docs/managed-datahub/observe/freshness-assertions.md) guide.
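To make the `fixed_interval` check type more concrete, here is a minimal sketch of what a lookback window means, written in plain Python without the SDK. The helper names `lookback_to_timedelta` and `is_fresh` are hypothetical and only illustrate the idea that the table should have been updated within the window; this is not how DataHub evaluates the assertion internally.

```python
from datetime import datetime, timedelta, timezone

# Assumption: only the two units used in the examples above are mapped here.
_UNIT_TO_KWARG = {"HOUR": "hours", "DAY": "days"}


def lookback_to_timedelta(lookback_window: dict) -> timedelta:
    """Convert {"unit": "HOUR", "multiple": 8} into timedelta(hours=8)."""
    kwarg = _UNIT_TO_KWARG[lookback_window["unit"]]
    return timedelta(**{kwarg: lookback_window["multiple"]})


def is_fresh(last_updated: datetime, evaluated_at: datetime, lookback_window: dict) -> bool:
    """Return True if the last update falls inside the lookback window."""
    return evaluated_at - last_updated <= lookback_to_timedelta(lookback_window)


evaluated_at = datetime(2025, 8, 12, 12, 0, tzinfo=timezone.utc)
window = {"unit": "HOUR", "multiple": 8}

# Updated 3 hours before the check: inside the 8-hour window.
print(is_fresh(evaluated_at - timedelta(hours=3), evaluated_at, window))
# Updated 9 hours before the check: outside the 8-hour window.
print(is_fresh(evaluated_at - timedelta(hours=9), evaluated_at, window))
```

With an 8-hour window and a schedule of `0 */2 * * *`, a check like this would run every 2 hours and would start failing once the most recent update is more than 8 hours old.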
### Volume Assertions
<Tabs>
<TabItem value="graphql" label="GraphQL" default>
To create a new volume assertion, use the `upsertDatasetVolumeAssertionMonitor` GraphQL Mutation.
```graphql
@ -116,10 +214,94 @@ This API will return a unique identifier (URN) for the new assertion if you were
}
```
</TabItem>
<!-- TODO: move the python examples to metadata-ingestion/examples/library -->
<TabItem value="python" label="Python">
```python
from datahub.sdk import DataHubClient
from datahub.metadata.urns import DatasetUrn
# Initialize the client
client = DataHubClient(server="<your_server>", token="<your_token>")
# Create smart volume assertion (AI-powered anomaly detection)
dataset_urn = DatasetUrn.from_string("urn:li:dataset:(urn:li:dataPlatform:snowflake,database.schema.table,PROD)")
smart_volume_assertion = client.assertions.sync_smart_volume_assertion(
dataset_urn=dataset_urn,
display_name="Smart Volume Check",
# Detection mechanism options
detection_mechanism="information_schema",
# Smart sensitivity setting
sensitivity="medium", # options: "low", "medium", "high"
# Tags for grouping
tags=["automated", "volume", "data_quality"],
# Schedule (optional - defaults to hourly)
schedule="0 */6 * * *", # Every 6 hours
# Enable the assertion
enabled=True
)
print(f"Created smart volume assertion: {smart_volume_assertion.urn}")
# Create traditional volume assertion (fixed threshold range)
volume_assertion = client.assertions.sync_volume_assertion(
dataset_urn=dataset_urn,
display_name="Row Count Range Check",
criteria_condition="ROW_COUNT_IS_WITHIN_A_RANGE",
criteria_parameters=(1000, 10000), # Between 1000 and 10000 rows
# Detection mechanism
detection_mechanism="information_schema",
# Evaluation schedule
schedule="0 */4 * * *", # Every 4 hours
# Tags
tags=["automated", "volume", "threshold_check"],
enabled=True
)
print(f"Created volume assertion: {volume_assertion.urn}")
# Example with single threshold
min_volume_assertion = client.assertions.sync_volume_assertion(
dataset_urn=dataset_urn,
display_name="Minimum Row Count Check",
criteria_condition="ROW_COUNT_IS_GREATER_THAN_OR_EQUAL_TO",
criteria_parameters=500, # At least 500 rows
detection_mechanism="information_schema",
schedule="0 */2 * * *", # Every 2 hours
tags=["automated", "volume", "minimum_check"],
enabled=True
)
print(f"Created minimum volume assertion: {min_volume_assertion.urn}")
# Example with growth-based assertion
growth_volume_assertion = client.assertions.sync_volume_assertion(
dataset_urn=dataset_urn,
display_name="Daily Growth Check",
criteria_condition="ROW_COUNT_GROWS_BY_AT_MOST_ABSOLUTE",
criteria_parameters=1000, # Grows by at most 1000 rows between checks
detection_mechanism="information_schema",
schedule="0 6 * * *", # Daily at 6 AM
tags=["automated", "volume", "growth_check"],
enabled=True
)
print(f"Created growth volume assertion: {growth_volume_assertion.urn}")
```
</TabItem>
</Tabs>
For more details, see the [Volume Assertions](/docs/managed-datahub/observe/volume-assertions.md) guide.
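The three `criteria_condition` values used in the Python example above take differently shaped `criteria_parameters`: a `(low, high)` tuple for the range check and a single number for the others. The sketch below shows one plausible reading of those conditions in plain Python. The function `evaluate_volume` is hypothetical, and treating the range bounds as inclusive is an assumption, not something the source confirms.

```python
from typing import Optional, Tuple, Union

Params = Union[int, Tuple[int, int]]


def evaluate_volume(
    condition: str,
    params: Params,
    row_count: int,
    previous_row_count: Optional[int] = None,
) -> bool:
    """Return True when the observed row count satisfies the condition."""
    if condition == "ROW_COUNT_IS_WITHIN_A_RANGE":
        low, high = params  # expects a (low, high) tuple
        return low <= row_count <= high  # assumption: both bounds inclusive
    if condition == "ROW_COUNT_IS_GREATER_THAN_OR_EQUAL_TO":
        return row_count >= params
    if condition == "ROW_COUNT_GROWS_BY_AT_MOST_ABSOLUTE":
        if previous_row_count is None:
            raise ValueError("growth conditions need the previous row count")
        return row_count - previous_row_count <= params
    raise ValueError(f"unsupported condition: {condition}")


print(evaluate_volume("ROW_COUNT_IS_WITHIN_A_RANGE", (1000, 10000), 4200))
print(evaluate_volume("ROW_COUNT_IS_GREATER_THAN_OR_EQUAL_TO", 500, 120))
print(evaluate_volume("ROW_COUNT_GROWS_BY_AT_MOST_ABSOLUTE", 1000, 5600, previous_row_count=5000))
```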
### Column Assertions
<Tabs>
<TabItem value="graphql" label="GraphQL" default>
To create a new column assertion, use the `upsertDatasetFieldAssertionMonitor` GraphQL Mutation.
```graphql
@ -165,11 +347,64 @@ This API will return a unique identifier (URN) for the new assertion if you were
}
```
</TabItem>
<TabItem value="python" label="Python">
```python
from datahub.sdk import DataHubClient
from datahub.metadata.urns import DatasetUrn
# Initialize the client
client = DataHubClient(server="<your_server>", token="<your_token>")
# Create smart column metric assertion (AI-powered anomaly detection)
dataset_urn = DatasetUrn.from_string("urn:li:dataset:(urn:li:dataPlatform:snowflake,database.schema.table,PROD)")
smart_column_assertion = client.assertions.sync_smart_column_metric_assertion(
dataset_urn=dataset_urn,
column_name="user_id",
metric_type="null_count",
display_name="Smart Null Count Check - user_id",
# Detection mechanism for column metrics
detection_mechanism="all_rows_query_datahub_dataset_profile",
# Smart sensitivity setting
sensitivity="medium", # options: "low", "medium", "high"
# Tags
tags=["automated", "column_quality", "null_checks"],
enabled=True
)
print(f"Created smart column assertion: {smart_column_assertion.urn}")
# Create regular column metric assertion (fixed threshold)
column_assertion = client.assertions.sync_column_metric_assertion(
dataset_urn=dataset_urn,
column_name="price",
metric_type="min",
operator="greater_than_or_equal_to",
criteria_parameters=0,
display_name="Price Minimum Check",
# Evaluation schedule
schedule="0 */4 * * *", # Every 4 hours
# Tags
tags=["automated", "column_quality", "price_validation"],
enabled=True
)
print(f"Created column assertion: {column_assertion.urn}")
```
</TabItem>
</Tabs>
For more details, see the [Column Assertions](/docs/managed-datahub/observe/column-assertions.md) guide.
### Custom SQL Assertions
To create a new column assertion, use the `upsertDatasetSqlAssertionMonitor` GraphQL Mutation.
<Tabs>
<TabItem value="graphql" label="GraphQL" default>
To create a new custom SQL assertion, use the `upsertDatasetSqlAssertionMonitor` GraphQL Mutation.
```graphql
mutation upsertDatasetSqlAssertionMonitor {
@ -207,10 +442,59 @@ This API will return a unique identifier (URN) for the new assertion if you were
}
```
</TabItem>
<TabItem value="python" label="Python">
```python
from datahub.sdk import DataHubClient
from datahub.metadata.urns import DatasetUrn
# Initialize the client
client = DataHubClient(server="<your_server>", token="<your_token>")
# Create custom SQL assertion
dataset_urn = DatasetUrn.from_string("urn:li:dataset:(urn:li:dataPlatform:snowflake,database.schema.table,PROD)")
sql_assertion = client.assertions.sync_sql_assertion(
dataset_urn=dataset_urn,
display_name="Revenue Quality Check",
statement="SELECT SUM(revenue) FROM database.schema.table WHERE date >= CURRENT_DATE - INTERVAL '1 day'",
criteria_condition="IS_GREATER_THAN_OR_EQUAL_TO",
criteria_parameters=1000,
# Evaluation schedule
schedule="0 6 * * *", # Daily at 6 AM
# Tags
tags=["automated", "revenue", "data_quality"],
enabled=True
)
print(f"Created SQL assertion: {sql_assertion.urn}")
# Example with range check
range_sql_assertion = client.assertions.sync_sql_assertion(
dataset_urn=dataset_urn,
display_name="Daily Order Count Range Check",
statement="SELECT COUNT(*) FROM database.schema.orders WHERE DATE(created_at) = CURRENT_DATE",
criteria_condition="IS_WITHIN_A_RANGE",
criteria_parameters=(50, 500), # Between 50 and 500 orders per day
schedule="0 */6 * * *", # Every 6 hours
tags=["automated", "orders", "volume_check"],
enabled=True
)
print(f"Created range SQL assertion: {range_sql_assertion.urn}")
```
</TabItem>
</Tabs>
For more details, see the [Custom SQL Assertions](/docs/managed-datahub/observe/custom-sql-assertions.md) guide.
### Schema Assertions
<Tabs>
<TabItem value="graphql" label="GraphQL" default>
To create a new schema assertion, use the `upsertDatasetSchemaAssertionMonitor` GraphQL Mutation.
```graphql
@ -248,11 +532,11 @@ This API will return a unique identifier (URN) for the new assertion if you were
}
```
For more details, see the [Schema Assertions](/docs/managed-datahub/observe/schema-assertions.md) guide.
</TabItem>
</Tabs>
For more details, see the [Schema Assertions](/docs/managed-datahub/observe/schema-assertions.md) guide.
## Run Assertions
You can use the following APIs to trigger the assertions you've created to run on-demand. This is
@ -1163,30 +1447,6 @@ urn:li:assertion:<unique-assertion-id>
3. Generate the [**AssertionRunEvent**](/docs/generated/metamodel/entities/assertion.md#assertionrunevent-timeseries) timeseries aspect using the Python SDK. This aspect should contain the result of the assertion
run at a given timestamp and will be shown on the results graph in DataHub's UI.
## Create Subscription
## Create and Remove Subscriptions
You can create subscriptions to receive notifications when assertions change state (pass, fail, or error) or when other entity changes occur. Subscriptions can be created at the dataset level (affecting all assertions on the dataset) or at the assertion level (affecting only specific assertions).
<Tabs>
<TabItem value="python" label="Python">
```python
{{ inline /metadata-ingestion/examples/library/create_subscription.py show_path_as_comment }}
```
</TabItem>
</Tabs>
## Remove Subscription
You can remove existing subscriptions to stop receiving notifications. The unsubscribe method supports selective removal of specific change types or complete removal of subscriptions.
<Tabs>
<TabItem value="python" label="Python">
```python
{{ inline /metadata-ingestion/examples/library/remove_subscription.py show_path_as_comment }}
```
</TabItem>
</Tabs>
Reference the [Subscriptions SDK](/docs/api/tutorials/subscriptions.md) for more information on how to create and remove subscriptions on Datasets or Assertions.

View File

@ -5,7 +5,7 @@ import TabItem from '@theme/TabItem';
<FeatureAvailability saasOnly />
This guide specifically covers how to use the DataHub Cloud Python SDK for **bulk creating smart assertions**, including:
This guide specifically covers how to use the [DataHub Cloud Python SDK](https://pypi.org/project/acryl-datahub-cloud/) for **bulk creating smart assertions**, including:
- Smart Freshness Assertions
- Smart Volume Assertions
@ -33,7 +33,7 @@ The actor making API calls must have the `Edit Assertions` and `Edit Monitors` p
:::note
Before creating assertions, you need to ensure the target datasets are already present in your DataHub instance.
If you attempt to create assertions for entities that do not exist, your operation will fail.
If you attempt to create assertions for entities that do not exist, GMS will continuously report errors to the logs.
:::
### Goal Of This Guide
@ -73,6 +73,12 @@ from datahub.sdk import DataHubClient
client = DataHubClient.from_env()
```
## Important Considerations for Parallel Processing
- Always run bulk assertion creation for a given dataset in a single thread to avoid race conditions.
- Always call subscription APIs for a given dataset in a single thread to avoid race conditions.
- If you subscribe to individual assertions rather than to the dataset, keep all of those subscription calls for a given dataset on a single thread as well.
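One way to follow these rules while still processing many datasets in parallel is to group the work by dataset and hand each group to a single worker. The sketch below uses only the standard library. `run_grouped_by_dataset`, `process_dataset`, and `fake_handler` are hypothetical names, and the handler stands in for whatever assertion or subscription call you would actually make.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def process_dataset(dataset_urn, tasks, handler):
    """Run every task for one dataset sequentially, inside a single worker."""
    return [handler(dataset_urn, task) for task in tasks]


def run_grouped_by_dataset(work_items, handler, max_workers=4):
    """Process (dataset_urn, task) pairs with at most one thread per dataset."""
    grouped = defaultdict(list)
    for dataset_urn, task in work_items:
        grouped[dataset_urn].append(task)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # One submitted job per dataset: different datasets may run in
        # parallel, but tasks for the same dataset never overlap.
        futures = {
            urn: pool.submit(process_dataset, urn, tasks, handler)
            for urn, tasks in grouped.items()
        }
        return {urn: future.result() for urn, future in futures.items()}


def fake_handler(dataset_urn, task):
    # Placeholder for a real SDK call (hypothetical).
    return f"{task} on {dataset_urn}"


results = run_grouped_by_dataset(
    [("urn:a", "freshness"), ("urn:b", "volume"), ("urn:a", "volume")],
    fake_handler,
)
print(results)
```

Grouping first keeps the concurrency decision in one place: the pool size controls how many datasets are in flight, while ordering within a dataset stays deterministic.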
## Step 1: Discover Tables
### Option A: Get Specific Tables
@ -339,7 +345,15 @@ def should_apply_rule(column_name, column_type, rule_config):
create_column_assertions(datasets, dataset_columns, client, assertion_registry)
```
## Step 5: Store Assertion URNs
## Step 5: Create Subscription
Reference the [Subscriptions SDK](/docs/api/tutorials/subscriptions.md) for more information on how to create subscriptions on Datasets or Assertions.
:::note
When creating subscriptions in bulk, you must perform the operation in a single thread to avoid race conditions. Additionally, we recommend creating subscriptions at the dataset level rather than for individual assertions, as this makes ongoing management much easier.
:::
## Step 6: Store Assertion URNs
### Save to File
@ -389,7 +403,7 @@ def load_assertion_registry(filename):
# assertion_registry = load_assertion_registry("assertion_registry_20240101_120000.json")
```
## Step 6: Update Existing Assertions
## Step 7: Update Existing Assertions
```python
def update_existing_assertions(registry, client):

View File

@ -0,0 +1,119 @@
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
# Subscriptions
<FeatureAvailability saasOnly />
## Why Would You Use Subscriptions on Datasets?
Subscriptions let you receive notifications when an entity changes (for example, deprecations, schema changes, or ownership changes) or when assertions change state (pass, fail, or error). Subscriptions can be created at the dataset level (covering any changes on the dataset, as well as all assertions on the dataset) or at the assertion level (covering only specific assertions).
### Goal Of This Guide
This guide specifically covers how to use the [DataHub Cloud Python SDK](https://pypi.org/project/acryl-datahub-cloud/) for managing Subscriptions:
- Create: create a subscription to a dataset or assertion.
- Remove: remove a subscription.
## Prerequisites
- DataHub Cloud Python SDK installed (`pip install acryl-datahub-cloud`)
- The actor making API calls must have the `Manage User Subscriptions` privilege for the datasets at hand.
- If subscribing to a group, the actor should also be a member of the group.
:::note
Before creating subscriptions, you need to ensure the target datasets and groups are already present in your DataHub instance.
If you attempt to create subscriptions for entities that do not exist, GMS will continuously report errors to the logs.
:::
## Create Subscription
You can create subscriptions to receive notifications when assertions change state (pass, fail, or error) or when other entity changes occur. Subscriptions can be created at the dataset level (affecting any changes on the dataset, as well as all assertions on the dataset) or at the assertion level (affecting only specific assertions).
<Tabs>
<TabItem value="python" label="Python">
```python
{{ inline /metadata-ingestion/examples/library/create_subscription.py show_path_as_comment }}
```
</TabItem>
</Tabs>
## Remove Subscription
You can remove existing subscriptions to stop receiving notifications. The unsubscribe method supports selective removal of specific change types or complete removal of subscriptions.
<Tabs>
<TabItem value="python" label="Python">
```python
{{ inline /metadata-ingestion/examples/library/remove_subscription.py show_path_as_comment }}
```
</TabItem>
</Tabs>
## Available Change Types
The following change types are available for subscriptions:
#### Schema Changes
- `OPERATION_COLUMN_ADDED` - When a new column is added to a dataset
- `OPERATION_COLUMN_REMOVED` - When a column is removed from a dataset
- `OPERATION_COLUMN_MODIFIED` - When an existing column is modified
#### Operational Metadata Changes
- `OPERATION_ROWS_INSERTED` - When rows are inserted into a dataset
- `OPERATION_ROWS_UPDATED` - When rows are updated in a dataset
- `OPERATION_ROWS_REMOVED` - When rows are removed from a dataset
#### Assertion Events
- `ASSERTION_PASSED` - When an assertion run passes
- `ASSERTION_FAILED` - When an assertion run fails
- `ASSERTION_ERROR` - When an assertion run encounters an error
#### Incident Status Changes
- `INCIDENT_RAISED` - When a new incident is raised
- `INCIDENT_RESOLVED` - When an incident is resolved
#### Test Status Changes
- `TEST_PASSED` - When a test passes
- `TEST_FAILED` - When a test fails
#### Deprecation Status Changes
- `DEPRECATED` - When an entity is marked as deprecated
- `UNDEPRECATED` - When an entity's deprecation status is removed
#### Ingestion Status Changes
- `INGESTION_SUCCEEDED` - When ingestion completes successfully
- `INGESTION_FAILED` - When ingestion fails
#### Documentation Changes
- `DOCUMENTATION_CHANGE` - When documentation is modified
#### Ownership Changes
- `OWNER_ADDED` - When an owner is added to an entity
- `OWNER_REMOVED` - When an owner is removed from an entity
#### Glossary Term Changes
- `GLOSSARY_TERM_ADDED` - When a glossary term is added to an entity
- `GLOSSARY_TERM_REMOVED` - When a glossary term is removed from an entity
- `GLOSSARY_TERM_PROPOSED` - When a glossary term is proposed for an entity
#### Tag Changes
- `TAG_ADDED` - When a tag is added to an entity
- `TAG_REMOVED` - When a tag is removed from an entity
- `TAG_PROPOSED` - When a tag is proposed for an entity
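When a subscription should cover only a few of these categories, it can help to keep the names grouped in one place and build the list from the groups you care about. The sketch below is plain Python with no SDK calls. The group keys and the helper `change_types_for` are hypothetical, and how the resulting list is passed to the SDK is not shown here (see the inlined `create_subscription.py` example above for the actual call).

```python
# Change type names copied from the list above; the group keys are illustrative.
CHANGE_TYPE_GROUPS = {
    "schema": ["OPERATION_COLUMN_ADDED", "OPERATION_COLUMN_REMOVED", "OPERATION_COLUMN_MODIFIED"],
    "operational": ["OPERATION_ROWS_INSERTED", "OPERATION_ROWS_UPDATED", "OPERATION_ROWS_REMOVED"],
    "assertion": ["ASSERTION_PASSED", "ASSERTION_FAILED", "ASSERTION_ERROR"],
    "incident": ["INCIDENT_RAISED", "INCIDENT_RESOLVED"],
    "test": ["TEST_PASSED", "TEST_FAILED"],
    "deprecation": ["DEPRECATED", "UNDEPRECATED"],
    "ingestion": ["INGESTION_SUCCEEDED", "INGESTION_FAILED"],
    "documentation": ["DOCUMENTATION_CHANGE"],
    "ownership": ["OWNER_ADDED", "OWNER_REMOVED"],
    "glossary_term": ["GLOSSARY_TERM_ADDED", "GLOSSARY_TERM_REMOVED", "GLOSSARY_TERM_PROPOSED"],
    "tag": ["TAG_ADDED", "TAG_REMOVED", "TAG_PROPOSED"],
}


def change_types_for(*groups):
    """Return the change type names for the requested groups, in order."""
    selected = []
    for group in groups:
        selected.extend(CHANGE_TYPE_GROUPS[group])
    return selected


# Example: only assertion results and incident status changes.
print(change_types_for("assertion", "incident"))
```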

View File

@ -157,6 +157,10 @@ Then select individual assertions you'd like to subscribe to:
<img width="70%" alt="7" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/saas/subscription-and-notification/s_n-assertion-sub-resub-1.jpg" />
</p>
## Programmatically Managing Subscriptions
You can create and remove subscriptions programmatically using the [GraphQL APIs](/docs/api/graphql/overview.md) or the [Python Subscriptions SDK](/docs/api/tutorials/subscriptions.md).
## FAQ
<details>