import logging
import time

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper

# read-modify-write requires access to the DataHubGraph (RestEmitter is not enough)
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

# Imports for metadata model classes
from datahub.metadata.schema_classes import (
    AuditStampClass,
    EditableSchemaFieldInfoClass,
    EditableSchemaMetadataClass,
    InstitutionalMemoryClass,
)

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
    """A helper function to extract simple . path notation from the v2 field path.

    v2 paths look like ``[version=2.0].[type=struct].[type=string].user_name``;
    everything wrapped in square brackets is a type/version annotation and is
    dropped, leaving only the dotted field-name segments.
    """
    if not field_path.startswith("[version=2.0]"):
        # not a v2 path — assume it is already in simple dotted notation
        return field_path

    # v2 path: keep only the segments that are not bracketed annotations.
    # Note that "[version=2.0]" itself contains a dot, so splitting produces
    # fragments like "[version=2" and "0]" — both are filtered out by the
    # startswith/endswith checks below.
    simple_tokens = []
    for token in field_path.split("."):
        if token.startswith("[") or token.endswith("]"):
            continue
        simple_tokens.append(token)
    return ".".join(simple_tokens)
# Inputs -> owner, ownership_type, dataset
documentation_to_add = (
    "Name of the user who was deleted. This description is updated via PythonSDK."
)
dataset_urn = make_dataset_urn(platform="hive", name="fct_users_deleted", env="PROD")
column = "user_name"

# The field-level documentation we want the target column to end up with.
field_info_to_set = EditableSchemaFieldInfoClass(
    fieldPath=column,
    description=documentation_to_add,
)

# Some helpful variables to fill out objects later
now = int(time.time() * 1000)  # milliseconds since epoch
current_timestamp = AuditStampClass(time=now, actor="urn:li:corpuser:ingestion")
# Connect to the DataHub GMS and fetch the current editable schema metadata —
# this is a read-modify-write flow, so we need the existing aspect (if any)
# before changing it.
gms_endpoint = "http://localhost:8080"
graph = DataHubGraph(config=DatahubClientConfig(server=gms_endpoint))

current_editable_schema_metadata = graph.get_aspect(
    entity_urn=dataset_urn, aspect_type=EditableSchemaMetadataClass
)
# Decide whether the aspect needs to be written back:
# - aspect exists and the column already carries this description -> no-op
# - aspect exists but the column's description differs               -> update
# - aspect exists but has no entry for this column                   -> append
# - aspect missing entirely                                          -> create
need_write = False

if current_editable_schema_metadata:
    field_match = False
    for fieldInfo in current_editable_schema_metadata.editableSchemaFieldInfo:
        if get_simple_field_path_from_v2_field_path(fieldInfo.fieldPath) == column:
            # we have some editable schema metadata for this field
            field_match = True
            if documentation_to_add != fieldInfo.description:
                fieldInfo.description = documentation_to_add
                need_write = True
    if not field_match:
        # Fix: the aspect exists but has no entry for this column; previously
        # this case silently did nothing and the documentation was never set.
        current_editable_schema_metadata.editableSchemaFieldInfo.append(
            field_info_to_set
        )
        need_write = True
else:
    # create a brand new editable schema metadata aspect
    current_editable_schema_metadata = EditableSchemaMetadataClass(
        editableSchemaFieldInfo=[field_info_to_set],
        created=current_timestamp,
    )
    need_write = True
if need_write:
    # Wrap the modified aspect in an MCP and emit it back to DataHub.
    event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
        entityUrn=dataset_urn, aspect=current_editable_schema_metadata
    )
    graph.emit(event)
    log.info(f"Documentation added to dataset {dataset_urn}")
else:
    # Nothing changed — skip the write so the script stays idempotent.
    log.info("Documentation already exists and is identical, omitting write")
# Next section: fetch the dataset's institutional memory (links) so the same
# read-modify-write pattern can be applied to it.
current_institutional_memory = graph.get_aspect(
    entity_urn=dataset_urn,
    aspect_type=InstitutionalMemoryClass,
)

# reset the dirty flag for the next aspect
need_write = False