feat(SDK) Add FormPatchBuilder in python sdk and provide sample CRUD files (#10821)

This commit is contained in:
Chris Collins 2024-07-01 17:48:09 -04:00 committed by GitHub
parent f9d53b48a6
commit 6745dfb45e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 391 additions and 1 deletions

View File

@ -8,6 +8,7 @@ import static com.linkedin.metadata.Constants.DATA_JOB_INFO_ASPECT_NAME;
import static com.linkedin.metadata.Constants.DATA_JOB_INPUT_OUTPUT_ASPECT_NAME;
import static com.linkedin.metadata.Constants.DATA_PRODUCT_PROPERTIES_ASPECT_NAME;
import static com.linkedin.metadata.Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME;
import static com.linkedin.metadata.Constants.FORM_INFO_ASPECT_NAME;
import static com.linkedin.metadata.Constants.GLOBAL_TAGS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.GLOSSARY_TERMS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.OWNERSHIP_ASPECT_NAME;
@ -46,7 +47,8 @@ public class AspectTemplateEngine {
DATA_JOB_INPUT_OUTPUT_ASPECT_NAME,
CHART_INFO_ASPECT_NAME,
DASHBOARD_INFO_ASPECT_NAME,
STRUCTURED_PROPERTIES_ASPECT_NAME)
STRUCTURED_PROPERTIES_ASPECT_NAME,
FORM_INFO_ASPECT_NAME)
.collect(Collectors.toSet());
private final Map<String, Template<? extends RecordTemplate>> _aspectTemplateMap;

View File

@ -0,0 +1,84 @@
package com.linkedin.metadata.aspect.patch.template.form;
import static com.fasterxml.jackson.databind.node.JsonNodeFactory.instance;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.form.FormInfo;
import com.linkedin.metadata.aspect.patch.template.CompoundKeyTemplate;
import java.util.Collections;
import javax.annotation.Nonnull;
public class FormInfoTemplate extends CompoundKeyTemplate<FormInfo> {
private static final String PROMPTS_FIELD_NAME = "prompts";
private static final String PROMPT_ID_FIELD_NAME = "id";
private static final String ACTORS_FIELD_NAME = "actors";
private static final String USERS_FIELD_NAME = "users";
private static final String GROUPS_FIELD_NAME = "groups";
@Override
public FormInfo getSubtype(RecordTemplate recordTemplate) throws ClassCastException {
if (recordTemplate instanceof FormInfo) {
return (FormInfo) recordTemplate;
}
throw new ClassCastException("Unable to cast RecordTemplate to FormInfo");
}
@Override
public Class<FormInfo> getTemplateType() {
return FormInfo.class;
}
@Nonnull
@Override
public FormInfo getDefault() {
FormInfo formInfo = new FormInfo();
formInfo.setName("");
return formInfo;
}
@Nonnull
@Override
public JsonNode transformFields(JsonNode baseNode) {
JsonNode transformedNode =
arrayFieldToMap(
baseNode, PROMPTS_FIELD_NAME, Collections.singletonList(PROMPT_ID_FIELD_NAME));
JsonNode actors = transformedNode.get(ACTORS_FIELD_NAME);
if (actors == null) {
actors = instance.objectNode();
}
JsonNode transformedActorsNode =
arrayFieldToMap(actors, USERS_FIELD_NAME, Collections.emptyList());
transformedActorsNode =
arrayFieldToMap(transformedActorsNode, GROUPS_FIELD_NAME, Collections.emptyList());
((ObjectNode) transformedNode).set(ACTORS_FIELD_NAME, transformedActorsNode);
return transformedNode;
}
@Nonnull
@Override
public JsonNode rebaseFields(JsonNode patched) {
JsonNode transformedNode =
transformedMapToArray(
patched, PROMPTS_FIELD_NAME, Collections.singletonList(PROMPT_ID_FIELD_NAME));
JsonNode actors = transformedNode.get(ACTORS_FIELD_NAME);
if (actors == null) {
actors = instance.objectNode();
}
JsonNode transformedActorsNode =
transformedMapToArray(actors, USERS_FIELD_NAME, Collections.emptyList());
transformedActorsNode =
transformedMapToArray(transformedActorsNode, GROUPS_FIELD_NAME, Collections.emptyList());
((ObjectNode) transformedNode).set(ACTORS_FIELD_NAME, transformedActorsNode);
return transformedNode;
}
}

View File

@ -20,6 +20,7 @@ import com.linkedin.metadata.aspect.patch.template.dataproduct.DataProductProper
import com.linkedin.metadata.aspect.patch.template.dataset.DatasetPropertiesTemplate;
import com.linkedin.metadata.aspect.patch.template.dataset.EditableSchemaMetadataTemplate;
import com.linkedin.metadata.aspect.patch.template.dataset.UpstreamLineageTemplate;
import com.linkedin.metadata.aspect.patch.template.form.FormInfoTemplate;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.DefaultEntitySpec;
import com.linkedin.metadata.models.EntitySpec;
@ -87,6 +88,7 @@ public class SnapshotEntityRegistry implements EntityRegistry {
aspectSpecTemplateMap.put(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME, new DataJobInputOutputTemplate());
aspectSpecTemplateMap.put(
STRUCTURED_PROPERTIES_ASPECT_NAME, new StructuredPropertiesTemplate());
aspectSpecTemplateMap.put(FORM_INFO_ASPECT_NAME, new FormInfoTemplate());
return new AspectTemplateEngine(aspectSpecTemplateMap);
}

View File

@ -0,0 +1,56 @@
import logging
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
# Imports for metadata model classes
from datahub.metadata.schema_classes import (
FormActorAssignmentClass,
FormInfoClass,
FormPromptClass,
FormPromptTypeClass,
FormTypeClass,
StructuredPropertyParamsClass,
)
from datahub.metadata.urns import FormUrn
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# define the prompts for our form
prompt_1 = FormPromptClass(
id="1", # ensure IDs are globally unique
title="First Prompt",
type=FormPromptTypeClass.STRUCTURED_PROPERTY, # structured property type prompt
structuredPropertyParams=StructuredPropertyParamsClass(
urn="urn:li:structuredProperty:property1"
), # reference existing structured property
required=True,
)
prompt_2 = FormPromptClass(
id="2", # ensure IDs are globally unique
title="Second Prompt",
type=FormPromptTypeClass.FIELDS_STRUCTURED_PROPERTY, # structured property prompt on dataset schema fields
structuredPropertyParams=StructuredPropertyParamsClass(
urn="urn:li:structuredProperty:property1"
),
required=False, # dataset schema fields prompts should not be required
)
form_urn = FormUrn("metadata_initiative_1")
form_info_aspect = FormInfoClass(
name="Metadata Initiative 2024",
description="Please respond to this form for metadata compliance purposes",
type=FormTypeClass.VERIFICATION,
actors=FormActorAssignmentClass(owners=True),
prompts=[prompt_1, prompt_2],
)
event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityUrn=str(form_urn),
aspect=form_info_aspect,
)
# Create rest emitter
rest_emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
rest_emitter.emit(event)

View File

@ -0,0 +1,22 @@
import logging
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.urns import FormUrn
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
graph = DataHubGraph(
config=DatahubClientConfig(
server="http://localhost:8080",
)
)
form_urn = FormUrn("metadata_initiative_1")
# Hard delete the form
graph.delete_entity(urn=str(form_urn), hard=True)
# Delete references to this form (must do)
graph.delete_references_to_urn(urn=str(form_urn), dry_run=False)
log.info(f"Deleted form {form_urn}")

View File

@ -0,0 +1,78 @@
import logging
from typing import Union
from datahub.configuration.kafka import KafkaProducerConnectionConfig
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.metadata.schema_classes import (
FormPromptClass,
FormPromptTypeClass,
FormTypeClass,
OwnerClass,
OwnershipTypeClass,
StructuredPropertyParamsClass,
)
from datahub.metadata.urns import FormUrn
from datahub.specific.form import FormPatchBuilder
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# Get an emitter, either REST or Kafka, this example shows you both
def get_emitter() -> Union[DataHubRestEmitter, DatahubKafkaEmitter]:
USE_REST_EMITTER = True
if USE_REST_EMITTER:
gms_endpoint = "http://localhost:8080"
return DataHubRestEmitter(gms_server=gms_endpoint)
else:
kafka_server = "localhost:9092"
schema_registry_url = "http://localhost:8081"
return DatahubKafkaEmitter(
config=KafkaEmitterConfig(
connection=KafkaProducerConnectionConfig(
bootstrap=kafka_server, schema_registry_url=schema_registry_url
)
)
)
# input your unique form ID
form_urn = FormUrn("metadata_initiative_1")
# example prompts to add, must reference an existing structured property
new_prompt = FormPromptClass(
id="abcd",
title="title",
type=FormPromptTypeClass.STRUCTURED_PROPERTY,
structuredPropertyParams=StructuredPropertyParamsClass(
"urn:li:structuredProperty:io.acryl.test"
),
required=True,
)
new_prompt2 = FormPromptClass(
id="1234",
title="title",
type=FormPromptTypeClass.FIELDS_STRUCTURED_PROPERTY,
structuredPropertyParams=StructuredPropertyParamsClass(
"urn:li:structuredProperty:io.acryl.test"
),
required=True,
)
with get_emitter() as emitter:
for patch_mcp in (
FormPatchBuilder(str(form_urn))
.add_owner(
OwnerClass(
owner="urn:li:corpuser:jdoe", type=OwnershipTypeClass.TECHNICAL_OWNER
)
)
.set_name("New Name")
.set_description("New description here")
.set_type(FormTypeClass.VERIFICATION)
.set_ownership_form(True)
.add_prompts([new_prompt, new_prompt2])
.build()
):
emitter.emit(patch_mcp)

View File

@ -0,0 +1,146 @@
from typing import List, Optional, Union
from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
FormInfoClass as FormInfo,
FormPromptClass,
KafkaAuditHeaderClass,
OwnerClass as Owner,
OwnershipTypeClass,
SystemMetadataClass,
)
from datahub.specific.ownership import OwnershipPatchHelper
from datahub.utilities.urns.urn import Urn
class FormPatchBuilder(MetadataPatchProposal):
def __init__(
self,
urn: str,
system_metadata: Optional[SystemMetadataClass] = None,
audit_header: Optional[KafkaAuditHeaderClass] = None,
) -> None:
super().__init__(
urn, system_metadata=system_metadata, audit_header=audit_header
)
self.ownership_patch_helper = OwnershipPatchHelper(self)
def add_owner(self, owner: Owner) -> "FormPatchBuilder":
self.ownership_patch_helper.add_owner(owner)
return self
def remove_owner(
self, owner: str, owner_type: Optional[OwnershipTypeClass] = None
) -> "FormPatchBuilder":
"""
param: owner_type is optional
"""
self.ownership_patch_helper.remove_owner(owner, owner_type)
return self
def set_owners(self, owners: List[Owner]) -> "FormPatchBuilder":
self.ownership_patch_helper.set_owners(owners)
return self
def set_name(self, name: Optional[str] = None) -> "FormPatchBuilder":
if name is not None:
self._add_patch(
FormInfo.ASPECT_NAME,
"add",
path="/name",
value=name,
)
return self
def set_description(self, description: Optional[str] = None) -> "FormPatchBuilder":
if description is not None:
self._add_patch(
FormInfo.ASPECT_NAME,
"add",
path="/description",
value=description,
)
return self
def set_type(self, type: Optional[str] = None) -> "FormPatchBuilder":
if type is not None:
self._add_patch(
FormInfo.ASPECT_NAME,
"add",
path="/type",
value=type,
)
return self
def add_prompt(self, prompt: FormPromptClass) -> "FormPatchBuilder":
self._add_patch(
FormInfo.ASPECT_NAME,
"add",
path=f"/prompts/{self.quote(prompt.id)}",
value=prompt,
)
return self
def add_prompts(self, prompts: List[FormPromptClass]) -> "FormPatchBuilder":
for prompt in prompts:
self.add_prompt(prompt)
return self
def remove_prompt(self, prompt_id: str) -> "FormPatchBuilder":
self._add_patch(
FormInfo.ASPECT_NAME,
"remove",
path=f"/prompts/{self.quote(prompt_id)}",
value=prompt_id,
)
return self
def remove_prompts(self, prompt_ids: List[str]) -> "FormPatchBuilder":
for prompt_id in prompt_ids:
self.remove_prompt(prompt_id)
return self
def set_ownership_form(self, is_ownership: bool) -> "FormPatchBuilder":
self._add_patch(
FormInfo.ASPECT_NAME,
"add",
path="/actors/owners",
value=is_ownership,
)
return self
def add_assigned_user(self, user_urn: Union[str, Urn]) -> "FormPatchBuilder":
self._add_patch(
FormInfo.ASPECT_NAME,
"add",
path=f"/actors/users/{self.quote(str(user_urn))}",
value=user_urn,
)
return self
def remove_assigned_user(self, user_urn: Union[str, Urn]) -> "FormPatchBuilder":
self._add_patch(
FormInfo.ASPECT_NAME,
"remove",
path=f"/actors/users/{self.quote(str(user_urn))}",
value=user_urn,
)
return self
def add_assigned_group(self, group_urn: Union[str, Urn]) -> "FormPatchBuilder":
self._add_patch(
FormInfo.ASPECT_NAME,
"add",
path=f"/actors/groups/{self.quote(str(group_urn))}",
value=group_urn,
)
return self
def remove_assigned_group(self, group_urn: Union[str, Urn]) -> "FormPatchBuilder":
self._add_patch(
FormInfo.ASPECT_NAME,
"remove",
path=f"/actors/groups/{self.quote(str(group_urn))}",
value=group_urn,
)
return self