datahub/assets/js/6fc80b22.8ff2a59b.js

1 line
36 KiB
JavaScript

"use strict";(self.webpackChunkdocs_website=self.webpackChunkdocs_website||[]).push([[75668],{15680:(e,t,n)=>{n.d(t,{xA:()=>c,yg:()=>g});var a=n(96540);function s(e,t,n){return t in e?Object.defineProperty(e,t,{value:n,enumerable:!0,configurable:!0,writable:!0}):e[t]=n,e}function r(e,t){var n=Object.keys(e);if(Object.getOwnPropertySymbols){var a=Object.getOwnPropertySymbols(e);t&&(a=a.filter((function(t){return Object.getOwnPropertyDescriptor(e,t).enumerable}))),n.push.apply(n,a)}return n}function i(e){for(var t=1;t<arguments.length;t++){var n=null!=arguments[t]?arguments[t]:{};t%2?r(Object(n),!0).forEach((function(t){s(e,t,n[t])})):Object.getOwnPropertyDescriptors?Object.defineProperties(e,Object.getOwnPropertyDescriptors(n)):r(Object(n)).forEach((function(t){Object.defineProperty(e,t,Object.getOwnPropertyDescriptor(n,t))}))}return e}function o(e,t){if(null==e)return{};var n,a,s=function(e,t){if(null==e)return{};var n,a,s={},r=Object.keys(e);for(a=0;a<r.length;a++)n=r[a],t.indexOf(n)>=0||(s[n]=e[n]);return s}(e,t);if(Object.getOwnPropertySymbols){var r=Object.getOwnPropertySymbols(e);for(a=0;a<r.length;a++)n=r[a],t.indexOf(n)>=0||Object.prototype.propertyIsEnumerable.call(e,n)&&(s[n]=e[n])}return s}var l=a.createContext({}),u=function(e){var t=a.useContext(l),n=t;return e&&(n="function"==typeof e?e(t):i(i({},t),e)),n},c=function(e){var t=u(e.components);return a.createElement(l.Provider,{value:t},e.children)},d="mdxType",m={inlineCode:"code",wrapper:function(e){var t=e.children;return a.createElement(a.Fragment,{},t)}},p=a.forwardRef((function(e,t){var n=e.components,s=e.mdxType,r=e.originalType,l=e.parentName,c=o(e,["components","mdxType","originalType","parentName"]),d=u(n),p=s,g=d["".concat(l,".").concat(p)]||d[p]||m[p]||r;return n?a.createElement(g,i(i({ref:t},c),{},{components:n})):a.createElement(g,i({ref:t},c))}));function g(e,t){var n=arguments,s=t&&t.mdxType;if("string"==typeof e||s){var r=n.length,i=new Array(r);i[0]=p;var o={};for(var l in t)hasOwnProperty.call(t,l)&&(o[l]=t[l]);o.originalType=e,o[d]="string"==typeof e?e:s,i[1]=o;for(var u=2;u<r;u++)i[u]=n[u];return a.createElement.apply(null,i)}return a.createElement.apply(null,n)}p.displayName="MDXCreateElement"},25077:(e,t,n)=>{n.r(t),n.d(t,{assets:()=>c,contentTitle:()=>l,default:()=>f,frontMatter:()=>o,metadata:()=>u,toc:()=>d});n(96540);var a=n(15680);function s(e,t,n){return t in e?Object.defineProperty(e,t,{value:n,enumerable:!0,configurable:!0,writable:!0}):e[t]=n,e}function r(e,t){return t=null!=t?t:{},Object.getOwnPropertyDescriptors?Object.defineProperties(e,Object.getOwnPropertyDescriptors(t)):function(e,t){var n=Object.keys(e);if(Object.getOwnPropertySymbols){var a=Object.getOwnPropertySymbols(e);t&&(a=a.filter((function(t){return Object.getOwnPropertyDescriptor(e,t).enumerable}))),n.push.apply(n,a)}return n}(Object(t)).forEach((function(n){Object.defineProperty(e,n,Object.getOwnPropertyDescriptor(t,n))})),e}function i(e,t){if(null==e)return{};var n,a,s=function(e,t){if(null==e)return{};var n,a,s={},r=Object.keys(e);for(a=0;a<r.length;a++)n=r[a],t.indexOf(n)>=0||(s[n]=e[n]);return s}(e,t);if(Object.getOwnPropertySymbols){var r=Object.getOwnPropertySymbols(e);for(a=0;a<r.length;a++)n=r[a],t.indexOf(n)>=0||Object.prototype.propertyIsEnumerable.call(e,n)&&(s[n]=e[n])}return s}const o={title:"Bulk Creating Smart Assertions with Python SDK",slug:"/api/tutorials/sdk/bulk-assertions-sdk",custom_edit_url:"https://github.com/datahub-project/datahub/blob/master/docs/api/tutorials/sdk/bulk-assertions-sdk.md"},l="Bulk Creating Smart Assertions with Python SDK",u={unversionedId:"docs/api/tutorials/sdk/bulk-assertions-sdk",id:"docs/api/tutorials/sdk/bulk-assertions-sdk",title:"Bulk Creating Smart Assertions with Python SDK",description:"This guide specifically covers how to use the DataHub Cloud Python SDK for bulk creating smart assertions, including:",source:"@site/genDocs/docs/api/tutorials/sdk/bulk-assertions-sdk.md",sourceDirName:"docs/api/tutorials/sdk",slug:"/api/tutorials/sdk/bulk-assertions-sdk",permalink:"/docs/api/tutorials/sdk/bulk-assertions-sdk",draft:!1,editUrl:"https://github.com/datahub-project/datahub/blob/master/docs/api/tutorials/sdk/bulk-assertions-sdk.md",tags:[],version:"current",frontMatter:{title:"Bulk Creating Smart Assertions with Python SDK",slug:"/api/tutorials/sdk/bulk-assertions-sdk",custom_edit_url:"https://github.com/datahub-project/datahub/blob/master/docs/api/tutorials/sdk/bulk-assertions-sdk.md"},sidebar:"overviewSidebar",previous:{title:"Custom Assertions",permalink:"/docs/api/tutorials/custom-assertions"},next:{title:"Incidents",permalink:"/docs/api/tutorials/incidents"}},c={},d=[{value:"Why Would You Use Bulk Assertion Creation?",id:"why-would-you-use-bulk-assertion-creation",level:2},{value:"Prerequisites",id:"prerequisites",level:2},{value:"Goal Of This Guide",id:"goal-of-this-guide",level:3},{value:"Overview",id:"overview",level:2},{value:"Setup",id:"setup",level:2},{value:"Step 1: Discover Tables",id:"step-1-discover-tables",level:2},{value:"Option A: Get Specific Tables",id:"option-a-get-specific-tables",level:3},{value:"Option B: Search for Tables by Pattern",id:"option-b-search-for-tables-by-pattern",level:3},{value:"Option C: Get Tables by Tag or Domain",id:"option-c-get-tables-by-tag-or-domain",level:3},{value:"Step 2: Create Table-Level Assertions",id:"step-2-create-table-level-assertions",level:2},{value:"Smart Freshness Assertions",id:"smart-freshness-assertions",level:3},{value:"Smart Volume Assertions",id:"smart-volume-assertions",level:3},{value:"Step 3: Get Column Information",id:"step-3-get-column-information",level:2},{value:"Step 4: Create Column-Level Assertions",id:"step-4-create-column-level-assertions",level:2},{value:"Smart Column Metric Assertions",id:"smart-column-metric-assertions",level:3},{value:"Step 5: Store Assertion URNs",id:"step-5-store-assertion-urns",level:2},{value:"Save to File",id:"save-to-file",level:3},{value:"Load from File (for updates)",id:"load-from-file-for-updates",level:3},{value:"Step 6: Update Existing Assertions",id:"step-6-update-existing-assertions",level:2},{value:"Advanced Patterns",id:"advanced-patterns",level:2},{value:"Conditional Assertion Creation",id:"conditional-assertion-creation",level:3},{value:"Batch Processing with Error Handling",id:"batch-processing-with-error-handling",level:3},{value:"Best Practices",id:"best-practices",level:2},{value:"1. <strong>Tag Strategy</strong>",id:"1-tag-strategy",level:3},{value:"2. <strong>Error Handling</strong>",id:"2-error-handling",level:3},{value:"3. <strong>URN Management</strong>",id:"3-urn-management",level:3},{value:"4. <strong>Performance Considerations</strong>",id:"4-performance-considerations",level:3},{value:"5. <strong>Testing Strategy</strong>",id:"5-testing-strategy",level:3},{value:"Complete Example Script",id:"complete-example-script",level:2}],m=(p="FeatureAvailability",function(e){return console.warn("Component "+p+" was not imported, exported, or provided by MDXProvider as global scope"),(0,a.yg)("div",e)});var p;const g={toc:d},y="wrapper";function f(e){var{components:t}=e,n=i(e,["components"]);return(0,a.yg)(y,r(function(e){for(var t=1;t<arguments.length;t++){var n=null!=arguments[t]?arguments[t]:{},a=Object.keys(n);"function"==typeof Object.getOwnPropertySymbols&&(a=a.concat(Object.getOwnPropertySymbols(n).filter((function(e){return Object.getOwnPropertyDescriptor(n,e).enumerable})))),a.forEach((function(t){s(e,t,n[t])}))}return e}({},g,n),{components:t,mdxType:"MDXLayout"}),(0,a.yg)("h1",{id:"bulk-creating-smart-assertions-with-python-sdk"},"Bulk Creating Smart Assertions with Python SDK"),(0,a.yg)(m,{saasOnly:!0,mdxType:"FeatureAvailability"}),(0,a.yg)("p",null,"This guide specifically covers how to use the DataHub Cloud Python SDK for ",(0,a.yg)("strong",{parentName:"p"},"bulk creating smart assertions"),", including:"),(0,a.yg)("ul",null,(0,a.yg)("li",{parentName:"ul"},"Smart Freshness Assertions"),(0,a.yg)("li",{parentName:"ul"},"Smart Volume Assertions"),(0,a.yg)("li",{parentName:"ul"},"Smart Column Metric Assertions")),(0,a.yg)("p",null,"This is particularly useful for applying data quality checks across many tables and columns at scale."),(0,a.yg)("h2",{id:"why-would-you-use-bulk-assertion-creation"},"Why Would You Use Bulk Assertion Creation?"),(0,a.yg)("p",null,"Bulk creating assertions with the Python SDK allows you to:"),(0,a.yg)("ul",null,(0,a.yg)("li",{parentName:"ul"},(0,a.yg)("strong",{parentName:"li"},"Scale data quality"),": Apply consistent assertions across hundreds or thousands of tables"),(0,a.yg)("li",{parentName:"ul"},(0,a.yg)("strong",{parentName:"li"},"Automate assertion management"),": Programmatically create and update assertions based on metadata patterns"),(0,a.yg)("li",{parentName:"ul"},(0,a.yg)("strong",{parentName:"li"},"Implement governance policies"),": Ensure all critical tables have appropriate data quality checks"),(0,a.yg)("li",{parentName:"ul"},(0,a.yg)("strong",{parentName:"li"},"Save time"),": Avoid manually creating assertions one by one through the UI")),(0,a.yg)("h2",{id:"prerequisites"},"Prerequisites"),(0,a.yg)("p",null,"You need:"),(0,a.yg)("ul",null,(0,a.yg)("li",{parentName:"ul"},"DataHub Cloud Python SDK installed (",(0,a.yg)("inlineCode",{parentName:"li"},"pip install acryl-datahub-cloud"),")"),(0,a.yg)("li",{parentName:"ul"},"Valid DataHub Cloud credentials configured (server URL and access token with appropriate permissions)")),(0,a.yg)("p",null,"The actor making API calls must have the ",(0,a.yg)("inlineCode",{parentName:"p"},"Edit Assertions")," and ",(0,a.yg)("inlineCode",{parentName:"p"},"Edit Monitors")," privileges for the datasets at hand."),(0,a.yg)("admonition",{type:"note"},(0,a.yg)("p",{parentName:"admonition"},"Before creating assertions, you need to ensure the target datasets are already present in your DataHub instance.\nIf you attempt to create assertions for entities that do not exist, your operation will fail.")),(0,a.yg)("h3",{id:"goal-of-this-guide"},"Goal Of This Guide"),(0,a.yg)("p",null,"This guide will show you how to programmatically create large numbers of smart assertions using the DataHub Cloud Python SDK."),(0,a.yg)("h2",{id:"overview"},"Overview"),(0,a.yg)("p",null,"The bulk assertion creation process follows these steps:"),(0,a.yg)("ol",null,(0,a.yg)("li",{parentName:"ol"},(0,a.yg)("strong",{parentName:"li"},"Discover tables"),": Use search or direct table queries to find datasets"),(0,a.yg)("li",{parentName:"ol"},(0,a.yg)("strong",{parentName:"li"},"Create table-level assertions"),": Add freshness and volume assertions for each table"),(0,a.yg)("li",{parentName:"ol"},(0,a.yg)("strong",{parentName:"li"},"Get column information"),": Retrieve schema details for each table"),(0,a.yg)("li",{parentName:"ol"},(0,a.yg)("strong",{parentName:"li"},"Create column-level assertions"),": Add column metric assertions for each relevant column"),(0,a.yg)("li",{parentName:"ol"},(0,a.yg)("strong",{parentName:"li"},"Store assertion URNs"),": Save assertion identifiers for future updates")),(0,a.yg)("h2",{id:"setup"},"Setup"),(0,a.yg)("p",null,"Connect to your DataHub instance:"),(0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},'from datahub.sdk import DataHubClient\n\nclient = DataHubClient(server="<your_server>", token="<your_token>")\n')),(0,a.yg)("ul",null,(0,a.yg)("li",{parentName:"ul"},(0,a.yg)("strong",{parentName:"li"},"server"),": The URL of your DataHub GMS server",(0,a.yg)("ul",{parentName:"li"},(0,a.yg)("li",{parentName:"ul"},"local: ",(0,a.yg)("inlineCode",{parentName:"li"},"http://localhost:8080")),(0,a.yg)("li",{parentName:"ul"},"hosted: ",(0,a.yg)("inlineCode",{parentName:"li"},"https://<your_datahub_url>/gms")))),(0,a.yg)("li",{parentName:"ul"},(0,a.yg)("strong",{parentName:"li"},"token"),": You'll need to ",(0,a.yg)("a",{parentName:"li",href:"/docs/authentication/personal-access-tokens"},"generate a Personal Access Token")," from your DataHub instance.")),(0,a.yg)("p",null,"Alternatively, initialize via using the ",(0,a.yg)("inlineCode",{parentName:"p"},"from_env()")," method after setting the ",(0,a.yg)("inlineCode",{parentName:"p"},"DATAHUB_GMS_URL")," and ",(0,a.yg)("inlineCode",{parentName:"p"},"DATAHUB_GMS_TOKEN")," env vars or by creating a ",(0,a.yg)("inlineCode",{parentName:"p"},"~/.datahubenv")," file via running ",(0,a.yg)("inlineCode",{parentName:"p"},"datahub init"),"."),(0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},"from datahub.sdk import DataHubClient\n\nclient = DataHubClient.from_env()\n")),(0,a.yg)("h2",{id:"step-1-discover-tables"},"Step 1: Discover Tables"),(0,a.yg)("h3",{id:"option-a-get-specific-tables"},"Option A: Get Specific Tables"),(0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},'from datahub.metadata.urns import DatasetUrn\n\n# Define specific tables you want to add assertions to\ntable_urns = [\n "urn:li:dataset:(urn:li:dataPlatform:snowflake,database.schema.users,PROD)",\n "urn:li:dataset:(urn:li:dataPlatform:snowflake,database.schema.orders,PROD)",\n "urn:li:dataset:(urn:li:dataPlatform:snowflake,database.schema.products,PROD)",\n]\n\n# Convert to DatasetUrn objects\ndatasets = [DatasetUrn.from_string(urn) for urn in table_urns]\n')),(0,a.yg)("h3",{id:"option-b-search-for-tables-by-pattern"},"Option B: Search for Tables by Pattern"),(0,a.yg)("p",null,"For comprehensive search capabilities and filter options, see the ",(0,a.yg)("a",{parentName:"p",href:"/docs/api/tutorials/sdk/search_client"},"Search API documentation"),"."),(0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},'from datahub.sdk.search_filters import FilterDsl\nfrom datahub.metadata.urns import DatasetUrn\n\n# Search for tables matching criteria\ndef find_tables_by_pattern(client, platform="snowflake", name_pattern="production_*"):\n """Find tables matching a specific pattern."""\n # Create filters for datasets on a specific platform with name pattern\n filters = FilterDsl.and_(\n FilterDsl.entity_type("dataset"),\n FilterDsl.platform(platform),\n FilterDsl.custom_filter("name", "EQUAL", [name_pattern])\n )\n\n # Use the search client to find matching datasets\n urns = list(client.search.get_urns(filter=filters))\n return [DatasetUrn.from_string(str(urn)) for urn in urns]\n\n# Use the search function\ndatasets = find_tables_by_pattern(client, platform="snowflake", name_pattern="production_*")\n')),(0,a.yg)("h3",{id:"option-c-get-tables-by-tag-or-domain"},"Option C: Get Tables by Tag or Domain"),(0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},'def find_tables_by_tag(client, tag_name="critical"):\n """Find tables with a specific tag."""\n # Create filters for datasets with a specific tag\n filters = FilterDsl.and_(\n FilterDsl.entity_type("dataset"),\n FilterDsl.custom_filter("tags", "EQUAL", [f"urn:li:tag:{tag_name}"])\n )\n\n # Use the search client to find matching datasets\n urns = list(client.search.get_urns(filter=filters))\n return [DatasetUrn.from_string(str(urn)) for urn in urns]\n\n# Find all tables tagged as "critical"\ncritical_datasets = find_tables_by_tag(client, "critical")\n')),(0,a.yg)("h2",{id:"step-2-create-table-level-assertions"},"Step 2: Create Table-Level Assertions"),(0,a.yg)("h3",{id:"smart-freshness-assertions"},"Smart Freshness Assertions"),(0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},'# Storage for assertion URNs (for later updates)\nassertion_registry = {\n "freshness": {},\n "volume": {},\n "column_metrics": {}\n}\n\ndef create_freshness_assertions(datasets, client, registry):\n """Create smart freshness assertions for multiple datasets."""\n\n for dataset_urn in datasets:\n try:\n # Create smart freshness assertion\n freshness_assertion = client.assertions.sync_smart_freshness_assertion(\n dataset_urn=dataset_urn,\n display_name=f"Freshness Anomaly Monitor",\n # Detection mechanism - information_schema is recommended\n detection_mechanism="information_schema",\n # Smart sensitivity setting\n sensitivity="medium", # options: "low", "medium", "high"\n # Tags for grouping (supports urns or plain tag names!)\n tags=["automated", "freshness", "data_quality"],\n # Enable the assertion\n enabled=True\n )\n\n # Store the assertion URN for future reference\n registry["freshness"][str(dataset_urn)] = str(freshness_assertion.urn)\n\n print(f"\u2705 Created freshness assertion for {dataset_urn.name}: {freshness_assertion.urn}")\n\n except Exception as e:\n print(f"\u274c Failed to create freshness assertion for {dataset_urn.name}: {e}")\n\n# Create freshness assertions for all datasets\ncreate_freshness_assertions(datasets, client, assertion_registry)\n')),(0,a.yg)("h3",{id:"smart-volume-assertions"},"Smart Volume Assertions"),(0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},'def create_volume_assertions(datasets, client, registry):\n """Create smart volume assertions for multiple datasets."""\n\n for dataset_urn in datasets:\n try:\n # Create smart volume assertion\n volume_assertion = client.assertions.sync_smart_volume_assertion(\n dataset_urn=dataset_urn,\n display_name=f"Smart Volume Check",\n # Detection mechanism options\n detection_mechanism="information_schema",\n # Smart sensitivity setting\n sensitivity="medium",\n # Tags for grouping\n tags=["automated", "volume", "data_quality"],\n # Schedule (optional - defaults to hourly)\n schedule="0 */6 * * *", # Every 6 hours\n # Enable the assertion\n enabled=True\n )\n\n # Store the assertion URN\n registry["volume"][str(dataset_urn)] = str(volume_assertion.urn)\n\n print(f"\u2705 Created volume assertion for {dataset_urn.name}: {volume_assertion.urn}")\n\n except Exception as e:\n print(f"\u274c Failed to create volume assertion for {dataset_urn.name}: {e}")\n\n# Create volume assertions for all datasets\ncreate_volume_assertions(datasets, client, assertion_registry)\n')),(0,a.yg)("h2",{id:"step-3-get-column-information"},"Step 3: Get Column Information"),(0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},'def get_dataset_columns(client, dataset_urn):\n """Get column information for a dataset."""\n try:\n # Get dataset using the entities client\n dataset = client.entities.get(dataset_urn)\n if dataset and hasattr(dataset, \'schema\') and dataset.schema:\n return [\n {\n "name": field.field_path,\n "type": field.native_data_type,\n "nullable": field.nullable if hasattr(field, \'nullable\') else True\n }\n for field in dataset.schema.fields\n ]\n return []\n except Exception as e:\n print(f"\u274c Failed to get columns for {dataset_urn}: {e}")\n return []\n\n# Get columns for each dataset\ndataset_columns = {}\nfor dataset_urn in datasets:\n columns = get_dataset_columns(client, dataset_urn)\n dataset_columns[str(dataset_urn)] = columns\n print(f"\ud83d\udcca Found {len(columns)} columns in {dataset_urn.name}")\n')),(0,a.yg)("h2",{id:"step-4-create-column-level-assertions"},"Step 4: Create Column-Level Assertions"),(0,a.yg)("h3",{id:"smart-column-metric-assertions"},"Smart Column Metric Assertions"),(0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},'def create_column_assertions(datasets, columns_dict, client, registry):\n """Create smart column metric assertions for multiple datasets and columns."""\n\n # Define rules for which columns should get which assertions\n assertion_rules = {\n # Null count checks for critical columns\n "null_checks": {\n "column_patterns": ["id", "*_id", "user_id", "email"],\n "metric_type": "null_count",\n "operator": "equal_to",\n "value": 0\n },\n # Unique count checks for ID columns\n "unique_checks": {\n "column_patterns": ["*_id", "email", "username"],\n "metric_type": "unique_percentage",\n "operator": "greater_than_or_equal_to",\n "value": 0.95\n },\n # Range checks for numeric columns\n "range_checks": {\n "column_patterns": ["amount", "price", "quantity", "score"],\n "metric_type": "min",\n "operator": "greater_than_or_equal_to",\n "value": 0\n }\n }\n\n for dataset_urn in datasets:\n dataset_key = str(dataset_urn)\n columns = columns_dict.get(dataset_key, [])\n\n if not columns:\n print(f"\u26a0\ufe0f No columns found for {dataset_urn.name}")\n continue\n\n registry["column_metrics"][dataset_key] = {}\n\n for column in columns:\n column_name = column["name"]\n column_type = column["type"].upper()\n\n # Apply assertion rules based on column name and type\n for rule_name, rule_config in assertion_rules.items():\n if should_apply_rule(column_name, column_type, rule_config):\n try:\n assertion = client.assertions.sync_smart_column_metric_assertion(\n dataset_urn=dataset_urn,\n column_name=column_name,\n metric_type=rule_config["metric_type"],\n operator=rule_config["operator"],\n criteria_parameters=rule_config["value"],\n display_name=f"{rule_name.replace(\'_\', \' \').title()} - {column_name}",\n # Detection mechanism for column metrics\n detection_mechanism="all_rows_query_datahub_dataset_profile",\n # Tags (plain names automatically converted to URNs)\n tags=["automated", "column_quality", rule_name],\n enabled=True\n )\n\n # Store assertion URN\n if column_name not in registry["column_metrics"][dataset_key]:\n registry["column_metrics"][dataset_key][column_name] = {}\n registry["column_metrics"][dataset_key][column_name][rule_name] = str(assertion.urn)\n\n print(f"\u2705 Created {rule_name} assertion for {dataset_urn.name}.{column_name}")\n\n except Exception as e:\n print(f"\u274c Failed to create {rule_name} assertion for {dataset_urn.name}.{column_name}: {e}")\n\ndef should_apply_rule(column_name, column_type, rule_config):\n """Determine if a rule should be applied to a column."""\n import fnmatch\n\n # Check column name patterns\n for pattern in rule_config["column_patterns"]:\n if fnmatch.fnmatch(column_name.lower(), pattern.lower()):\n return True\n\n # Add type-based rules if needed\n if rule_config.get("column_types"):\n return any(col_type in column_type for col_type in rule_config["column_types"])\n\n return False\n\n# Create column assertions\ncreate_column_assertions(datasets, dataset_columns, client, assertion_registry)\n')),(0,a.yg)("h2",{id:"step-5-store-assertion-urns"},"Step 5: Store Assertion URNs"),(0,a.yg)("h3",{id:"save-to-file"},"Save to File"),(0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},'import json\nfrom datetime import datetime\n\ndef save_assertion_registry(registry, filename=None):\n """Save assertion URNs to a file for future reference."""\n if filename is None:\n timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")\n filename = f"assertion_registry_{timestamp}.json"\n\n # Add metadata\n registry_with_metadata = {\n "created_at": datetime.now().isoformat(),\n "total_assertions": {\n "freshness": len(registry["freshness"]),\n "volume": len(registry["volume"]),\n "column_metrics": sum(\n len(cols) for cols in registry["column_metrics"].values()\n )\n },\n "assertions": registry\n }\n\n with open(filename, \'w\') as f:\n json.dump(registry_with_metadata, f, indent=2)\n\n print(f"\ud83d\udcbe Saved assertion registry to {filename}")\n return filename\n\n# Save the registry\nregistry_file = save_assertion_registry(assertion_registry)\n')),(0,a.yg)("h3",{id:"load-from-file-for-updates"},"Load from File (for updates)"),(0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},'def load_assertion_registry(filename):\n """Load assertion URNs from a previously saved file."""\n with open(filename, \'r\') as f:\n data = json.load(f)\n return data["assertions"]\n\n# Later, load for updates\n# assertion_registry = load_assertion_registry("assertion_registry_20240101_120000.json")\n')),(0,a.yg)("h2",{id:"step-6-update-existing-assertions"},"Step 6: Update Existing Assertions"),(0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},'def update_existing_assertions(registry, client):\n """Update existing assertions using stored URNs."""\n\n # Update freshness assertions\n for dataset_urn, assertion_urn in registry["freshness"].items():\n try:\n updated_assertion = client.assertions.sync_smart_freshness_assertion(\n dataset_urn=dataset_urn,\n urn=assertion_urn, # Provide existing URN for updates\n # Update any parameters as needed\n sensitivity="high", # Change sensitivity\n tags=["automated", "freshness", "data_quality", "updated"],\n enabled=True\n )\n print(f"\ud83d\udd04 Updated freshness assertion {assertion_urn}")\n except Exception as e:\n print(f"\u274c Failed to update freshness assertion {assertion_urn}: {e}")\n\n# Update assertions when needed\n# update_existing_assertions(assertion_registry, client)\n')),(0,a.yg)("h2",{id:"advanced-patterns"},"Advanced Patterns"),(0,a.yg)("h3",{id:"conditional-assertion-creation"},"Conditional Assertion Creation"),(0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},'def create_conditional_assertions(datasets, client):\n """Create assertions based on dataset metadata conditions."""\n\n for dataset_urn in datasets:\n try:\n # Get dataset metadata\n dataset = client.entities.get(dataset_urn)\n\n # Check if dataset has specific tags\n if dataset.tags and any("critical" in str(tag.tag) for tag in dataset.tags):\n # Create more stringent assertions for critical datasets\n client.assertions.sync_smart_freshness_assertion(\n dataset_urn=dataset_urn,\n sensitivity="high",\n detection_mechanism="information_schema",\n tags=["critical", "automated", "freshness"]\n )\n\n # Check dataset size and apply appropriate volume checks\n if dataset.dataset_properties:\n # Create different volume assertions based on table characteristics\n pass\n\n except Exception as e:\n print(f"\u274c Error processing {dataset_urn}: {e}")\n')),(0,a.yg)("h3",{id:"batch-processing-with-error-handling"},"Batch Processing with Error Handling"),(0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},'import time\nfrom typing import List, Dict, Any\n\ndef batch_create_assertions(\n datasets: List[DatasetUrn],\n client: DataHubClient,\n batch_size: int = 10,\n delay_seconds: float = 1.0\n) -> Dict[str, Any]:\n """Create assertions in batches with error handling and rate limiting."""\n\n results = {\n "successful": [],\n "failed": [],\n "total_processed": 0\n }\n\n for i in range(0, len(datasets), batch_size):\n batch = datasets[i:i + batch_size]\n print(f"Processing batch {i//batch_size + 1}: {len(batch)} datasets")\n\n for dataset_urn in batch:\n try:\n # Create assertion\n assertion = client.assertions.sync_smart_freshness_assertion(\n dataset_urn=dataset_urn,\n tags=["batch_created", "automated"],\n enabled=True\n )\n results["successful"].append({\n "dataset_urn": str(dataset_urn),\n "assertion_urn": str(assertion.urn)\n })\n\n except Exception as e:\n results["failed"].append({\n "dataset_urn": str(dataset_urn),\n "error": str(e)\n })\n\n results["total_processed"] += 1\n\n # Rate limiting between batches\n if i + batch_size < len(datasets):\n time.sleep(delay_seconds)\n\n return results\n\n# Use batch processing\nbatch_results = batch_create_assertions(datasets, client, batch_size=5)\nprint(f"Batch results: {batch_results[\'total_processed\']} processed, "\n f"{len(batch_results[\'successful\'])} successful, "\n f"{len(batch_results[\'failed\'])} failed")\n')),(0,a.yg)("h2",{id:"best-practices"},"Best Practices"),(0,a.yg)("h3",{id:"1-tag-strategy"},"1. ",(0,a.yg)("strong",{parentName:"h3"},"Tag Strategy")),(0,a.yg)("ul",null,(0,a.yg)("li",{parentName:"ul"},"Use consistent tag names for grouping assertions: ",(0,a.yg)("inlineCode",{parentName:"li"},'["automated", "freshness", "critical"]')),(0,a.yg)("li",{parentName:"ul"},"Plain tag names are automatically converted to URNs: ",(0,a.yg)("inlineCode",{parentName:"li"},'"my_tag"')," \u2192 ",(0,a.yg)("inlineCode",{parentName:"li"},'"urn:li:tag:my_tag"')),(0,a.yg)("li",{parentName:"ul"},"Create a tag hierarchy for different assertion types and priorities")),(0,a.yg)("h3",{id:"2-error-handling"},"2. ",(0,a.yg)("strong",{parentName:"h3"},"Error Handling")),(0,a.yg)("ul",null,(0,a.yg)("li",{parentName:"ul"},"Always wrap assertion creation in try-catch blocks"),(0,a.yg)("li",{parentName:"ul"},"Log failures for later investigation"),(0,a.yg)("li",{parentName:"ul"},"Implement retry logic for transient failures")),(0,a.yg)("h3",{id:"3-urn-management"},"3. ",(0,a.yg)("strong",{parentName:"h3"},"URN Management")),(0,a.yg)("ul",null,(0,a.yg)("li",{parentName:"ul"},"Store assertion URNs in a persistent location (file, database, etc.)"),(0,a.yg)("li",{parentName:"ul"},"Use meaningful file naming with timestamps"),(0,a.yg)("li",{parentName:"ul"},"Include metadata about when and why assertions were created")),(0,a.yg)("h3",{id:"4-performance-considerations"},"4. ",(0,a.yg)("strong",{parentName:"h3"},"Performance Considerations")),(0,a.yg)("p",null,"Our backend is designed to handle large scale operations. However, since writes are submitted asynchronously onto a Kafka queue, you may experience significant delays in the operations being applied. If you run into any issues, here are some tips that may help:"),(0,a.yg)("ul",null,(0,a.yg)("li",{parentName:"ul"},(0,a.yg)("strong",{parentName:"li"},"Consider running off peak")," to prevent causing spikes in Kafka lag for large bulk operations"),(0,a.yg)("li",{parentName:"ul"},(0,a.yg)("strong",{parentName:"li"},"Before you re-run sync")," (i.e. to update), wait for GMS to complete processing the previous run to prevent inconsistencies and duplicating: i.e., check if last ingested item has reflected in GMS"),(0,a.yg)("li",{parentName:"ul"},(0,a.yg)("strong",{parentName:"li"},"Monitor processing status")," through the DataHub UI or API to ensure operations complete successfully"),(0,a.yg)("li",{parentName:"ul"},(0,a.yg)("strong",{parentName:"li"},"Process datasets in batches")," to avoid overwhelming the API"),(0,a.yg)("li",{parentName:"ul"},(0,a.yg)("strong",{parentName:"li"},"Add delays")," between batch processing if needed")),(0,a.yg)("h3",{id:"5-testing-strategy"},"5. ",(0,a.yg)("strong",{parentName:"h3"},"Testing Strategy")),(0,a.yg)("ul",null,(0,a.yg)("li",{parentName:"ul"},"Start with a small subset of datasets for testing"),(0,a.yg)("li",{parentName:"ul"},"Validate assertion creation before bulk processing"),(0,a.yg)("li",{parentName:"ul"},"Test update scenarios with existing assertions")),(0,a.yg)("h2",{id:"complete-example-script"},"Complete Example Script"),(0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},'#!/usr/bin/env python3\n"""\nComplete example script for bulk creating smart assertions.\n"""\n\nimport json\nimport time\nfrom datetime import datetime\nfrom typing import List, Dict, Any\n\nfrom datahub.sdk import DataHubClient\nfrom datahub.ingestion.graph.client import DataHubGraph\nfrom datahub.metadata.urns import DatasetUrn\n\ndef main():\n # Initialize the DataHub client\n client = DataHubClient(\n server="https://your-datahub-instance.com",\n token="your-access-token",\n )\n\n # The client provides both search and entity access\n\n # Define target datasets\n table_urns = [\n "urn:li:dataset:(urn:li:dataPlatform:snowflake,prod.analytics.users,PROD)",\n "urn:li:dataset:(urn:li:dataPlatform:snowflake,prod.analytics.orders,PROD)",\n "urn:li:dataset:(urn:li:dataPlatform:snowflake,prod.analytics.products,PROD)",\n ]\n\n datasets = [DatasetUrn.from_string(urn) for urn in table_urns]\n\n # Registry to store assertion URNs\n assertion_registry = {\n "freshness": {},\n "volume": {},\n "column_metrics": {}\n }\n\n print(f"\ud83d\ude80 Starting bulk assertion creation for {len(datasets)} datasets")\n\n # Step 1: Create table-level assertions\n print("\\n\ud83d\udccb Creating freshness assertions...")\n create_freshness_assertions(datasets, client, assertion_registry)\n\n print("\\n\ud83d\udcca Creating volume assertions...")\n create_volume_assertions(datasets, client, assertion_registry)\n\n # Step 2: Get column information and create column assertions\n print("\\n\ud83d\udd0d Analyzing columns and creating column assertions...")\n dataset_columns = {}\n for dataset_urn in datasets:\n columns = get_dataset_columns(client, dataset_urn)\n dataset_columns[str(dataset_urn)] = columns\n\n create_column_assertions(datasets, dataset_columns, client, assertion_registry)\n\n # Step 3: Save results\n print("\\n\ud83d\udcbe Saving assertion registry...")\n registry_file = save_assertion_registry(assertion_registry)\n\n # Summary\n total_assertions = (\n len(assertion_registry["freshness"]) +\n len(assertion_registry["volume"]) +\n sum(len(cols) for cols in assertion_registry["column_metrics"].values())\n )\n\n print(f"\\n\u2705 Bulk assertion creation complete!")\n print(f" \ud83d\udcc8 Total assertions created: {total_assertions}")\n print(f" \ud83d\udd50 Freshness assertions: {len(assertion_registry[\'freshness\'])}")\n print(f" \ud83d\udcca Volume assertions: {len(assertion_registry[\'volume\'])}")\n print(f" \ud83c\udfaf Column assertions: {sum(len(cols) for cols in assertion_registry[\'column_metrics\'].values())}")\n print(f" \ud83d\udcbe Registry saved to: {registry_file}")\n\nif __name__ == "__main__":\n main()\n')),(0,a.yg)("p",null,"This guide provides a comprehensive approach to bulk creating smart assertions using the DataHub Cloud Python SDK. The new tag name auto-conversion feature makes it easier to organize and manage your assertions with simple, readable tag names that are automatically converted to proper URN format."))}f.isMDXComponent=!0}}]);