mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-10-31 02:37:05 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			165 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			165 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import json
 | |
| import time
 | |
| 
 | |
| import datahub.emitter.mce_builder as builder
 | |
| from datahub.emitter.mcp import MetadataChangeProposalWrapper
 | |
| from datahub.emitter.rest_emitter import DatahubRestEmitter
 | |
| from datahub.metadata.com.linkedin.pegasus2avro.assertion import (
 | |
|     AssertionInfo,
 | |
|     AssertionResult,
 | |
|     AssertionResultType,
 | |
|     AssertionRunEvent,
 | |
|     AssertionRunStatus,
 | |
|     AssertionStdAggregation,
 | |
|     AssertionStdOperator,
 | |
|     AssertionStdParameter,
 | |
|     AssertionStdParameters,
 | |
|     AssertionStdParameterType,
 | |
|     AssertionType,
 | |
|     DatasetAssertionInfo,
 | |
|     DatasetAssertionScope,
 | |
| )
 | |
| from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
 | |
| from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProperties
 | |
| from datahub.metadata.com.linkedin.pegasus2avro.timeseries import PartitionSpec
 | |
| 
 | |
| 
 | |
def datasetUrn(tbl: str) -> str:
    """Return the dataset URN for table ``tbl`` on the ``postgres`` platform."""
    return builder.make_dataset_urn(platform="postgres", name=tbl)
 | |
| 
 | |
| 
 | |
def fldUrn(tbl: str, fld: str) -> str:
    """Return the schemaField URN for column ``fld`` of table ``tbl``.

    The URN has the shape ``urn:li:schemaField:(<dataset urn>,<field path>)``.
    No whitespace is placed after the comma: anything between the comma and
    the closing parenthesis becomes part of the field path, so a stray space
    would name the field ``" col1"`` instead of ``"col1"``.
    """
    parent_urn = datasetUrn(tbl)
    return f"urn:li:schemaField:({parent_urn},{fld})"
 | |
| 
 | |
| 
 | |
def assertionUrn(info: AssertionInfo) -> str:
    """Return the assertion URN used by this example.

    ``info`` is accepted but never read: the same hard-coded URN is returned
    on every call.
    """
    assertion_id = "432475190cc846f2894b5b3aa4d55af2"
    return "urn:li:assertion:" + assertion_id
 | |
| 
 | |
| 
 | |
def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:
    """Emit one assertion run event through the module-level ``emitter``.

    The run event is wrapped in a metadata change proposal whose entity is
    the assertion named by ``assertionResult.assertionUrn``.
    """
    run_event_proposal = MetadataChangeProposalWrapper(
        aspect=assertionResult,
        entityUrn=assertionResult.assertionUrn,
    )
    # The batch assertion result is sent as a timeseries aspect.
    emitter.emit_mcp(run_event_proposal)
 | |
| 
 | |
| 
 | |
# REST emitter pointed at the GMS API; every emit in this script goes through it.
emitter = DatahubRestEmitter("http://localhost:8080")

# Properties aspect for the example dataset.
datasetProperties = DatasetProperties(name="bazTable")

# Wrap the aspect in a change proposal addressed to the dataset's URN.
dataset_mcp = MetadataChangeProposalWrapper(
    aspect=datasetProperties,
    entityUrn=datasetUrn("bazTable"),
)

# Emit the dataset properties (can be skipped if the dataset already exists).
emitter.emit_mcp(dataset_mcp)
 | |
| 
 | |
# Bounds for the check: the column maximum must lie between 89 and 99.
maxVal_bounds = AssertionStdParameters(
    minValue=AssertionStdParameter(type=AssertionStdParameterType.NUMBER, value="89"),
    maxValue=AssertionStdParameter(type=AssertionStdParameterType.NUMBER, value="99"),
)

# Column-scoped dataset assertion: MAX(col1) BETWEEN the bounds above.
maxVal_dataset_assertion = DatasetAssertionInfo(
    scope=DatasetAssertionScope.DATASET_COLUMN,
    operator=AssertionStdOperator.BETWEEN,
    nativeType="expect_column_max_to_be_between",
    aggregation=AssertionStdAggregation.MAX,
    fields=[fldUrn("bazTable", "col1")],
    dataset=datasetUrn("bazTable"),
    nativeParameters={"max_value": "99", "min_value": "89"},
    parameters=maxVal_bounds,
)

# The assertion info aspect itself.
assertion_maxVal = AssertionInfo(
    type=AssertionType.DATASET,
    datasetAssertion=maxVal_dataset_assertion,
    customProperties={"suite_name": "demo_suite"},
)

# Change proposal carrying the info aspect for the assertion entity.
assertion_maxVal_mcp = MetadataChangeProposalWrapper(
    aspect=assertion_maxVal,
    entityUrn=assertionUrn(assertion_maxVal),
)

# Emit the assertion info aspect.
emitter.emit_mcp(assertion_maxVal_mcp)
 | |
| 
 | |
# Platform aspect for the assertion: it is attributed to the
# "great-expectations" data platform.
assertion_dataPlatformInstance = DataPlatformInstance(
    platform=builder.make_data_platform_urn("great-expectations")
)

# Change proposal attaching the platform aspect to the assertion entity.
assertion_dataPlatformInstance_mcp = MetadataChangeProposalWrapper(
    entityUrn=assertionUrn(assertion_maxVal),
    aspect=assertion_dataPlatformInstance,
)

# Emit the platform aspect. emit_mcp is used here, as for every other
# change proposal in this script, instead of the generic emit().
emitter.emit_mcp(assertion_dataPlatformInstance_mcp)
 | |
| 
 | |
| 
 | |
# Run event for the partition 1 batch (country=IN): a SUCCESS with an
# observed aggregate value of 90.
partition1_spec = PartitionSpec(partition=json.dumps([{"country": "IN"}]))
partition1_result = AssertionResult(
    type=AssertionResultType.SUCCESS,
    externalUrl="http://example.com/uuid1",
    actualAggValue=90,
)
assertionResult_maxVal_batch_partition1 = AssertionRunEvent(
    timestampMillis=int(time.time() * 1000),
    assertionUrn=assertionUrn(assertion_maxVal),
    asserteeUrn=datasetUrn("bazTable"),
    partitionSpec=partition1_spec,
    runId="uuid1",
    status=AssertionRunStatus.COMPLETE,
    result=partition1_result,
)

emitAssertionResult(assertionResult_maxVal_batch_partition1)
 | |
| 
 | |
# Run event for the partition 2 batch (country=US): a FAILURE with an
# observed aggregate value of 101.
partition2_spec = PartitionSpec(partition=json.dumps([{"country": "US"}]))
partition2_result = AssertionResult(
    type=AssertionResultType.FAILURE,
    externalUrl="http://example.com/uuid1",
    actualAggValue=101,
)
assertionResult_maxVal_batch_partition2 = AssertionRunEvent(
    timestampMillis=int(time.time() * 1000),
    assertionUrn=assertionUrn(assertion_maxVal),
    asserteeUrn=datasetUrn("bazTable"),
    partitionSpec=partition2_spec,
    runId="uuid1",
    status=AssertionRunStatus.COMPLETE,
    result=partition2_result,
)

emitAssertionResult(assertionResult_maxVal_batch_partition2)
 | |
| 
 | |
# Run event for the full-table batch (no partition spec): a SUCCESS with an
# observed aggregate value of 93.
fulltable_result = AssertionResult(
    type=AssertionResultType.SUCCESS,
    externalUrl="http://example.com/uuid1",
    actualAggValue=93,
)
assertionResult_maxVal_batch_fulltable = AssertionRunEvent(
    timestampMillis=int(time.time() * 1000),
    assertionUrn=assertionUrn(assertion_maxVal),
    asserteeUrn=datasetUrn("bazTable"),
    runId="uuid1",
    status=AssertionRunStatus.COMPLETE,
    result=fulltable_result,
)

emitAssertionResult(assertionResult_maxVal_batch_fulltable)
 | 
