feat(patch): refactor cll patch (#9922)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
RyanHolstien authored on 2024-02-27 12:00:44 -06:00 (committed by GitHub)
parent 336d0543c1
commit ddf0b7d2cd
8 changed files with 415 additions and 657 deletions
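
This refactor re-keys field-level fine-grained (column-level) lineage patches: instead of /fineGrainedLineages/TRANSFORMATION_OPERATION/(upstreamType || downstreamType)/TYPE/FIELD_URN with a bare confidence score as the value, a patch is now addressed as /fineGrainedLineages/TRANSFORMATION_OPERATION/DOWNSTREAM_FIELD_URN/QUERY_ID/UPSTREAM_FIELD_URN, and its value is an object node carrying the confidence score. The sketch below (editor's example, not part of this commit; URNs are hypothetical) spells out the new addressing:

// --- Illustrative sketch (editor's example, not part of this commit); URNs are hypothetical ---
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class FineGrainedPatchPathSketch {
  public static void main(String[] args) {
    String downstreamField =
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),foo)";
    String upstreamField =
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD),bar)";
    String queryId = "NONE"; // the literal "NONE" is used when no query urn is attached to the edge

    // New keying: /fineGrainedLineages/<transformOperation>/<downstreamFieldUrn>/<queryId>/<upstreamFieldUrn>
    String path =
        "/fineGrainedLineages/TRANSFORM/" + downstreamField + "/" + queryId + "/" + upstreamField;

    // New value: an object node holding the confidence score, rather than a bare number
    ObjectNode value = JsonNodeFactory.instance.objectNode();
    value.put("confidenceScore", 1.0f);

    System.out.println("op=add, path=" + path + ", value=" + value);
  }
}
// --- end sketch ---

Because the query id defaults to the literal "NONE" when no query urn is attached, the remove paths in the tests further down contain a /NONE/ segment.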

UpstreamLineagePatchBuilder.java

@ -9,12 +9,11 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.dataset.DatasetLineageType;
import com.linkedin.dataset.FineGrainedLineageDownstreamType;
import com.linkedin.dataset.FineGrainedLineageUpstreamType;
import com.linkedin.metadata.aspect.patch.PatchOperationType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutableTriple;
@ToString
@ -52,120 +51,52 @@ public class UpstreamLineagePatchBuilder
return this;
}
/**
* Method for adding an upstream FineGrained Dataset
*
* @param datasetUrn dataset to be set as upstream
* @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for
* full confidence
* @param transformationOperation string operation type that describes the transformation
* operation happening in the lineage edge
* @return this builder
*/
public UpstreamLineagePatchBuilder addFineGrainedUpstreamDataset(
@Nonnull DatasetUrn datasetUrn,
@Nullable Float confidenceScore,
@Nonnull String transformationOperation) {
Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore);
pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(),
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ "upstreamType"
+ "/"
+ "DATASET"
+ "/"
+ datasetUrn,
instance.numberNode(finalConfidenceScore)));
return this;
}
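
As a usage illustration (editor's sketch, not part of this commit; the URNs and import locations are assumptions), this method can be chained on the builder like so:

// --- Illustrative sketch (editor's example, not part of this commit); URNs and import locations are assumptions ---
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.mxe.MetadataChangeProposal;

class FineGrainedPatchSketches {
  // Marks a whole upstream dataset as a fine grained upstream; a null confidence score defaults to 1.0.
  static MetadataChangeProposal addDatasetUpstream() throws Exception {
    return new UpstreamLineagePatchBuilder()
        .urn(UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,downstream_table,PROD)"))
        .addFineGrainedUpstreamDataset(
            DatasetUrn.createFromString(
                "urn:li:dataset:(urn:li:dataPlatform:hive,upstream_table,PROD)"),
            null,
            "TRANSFORM")
        .build();
  }
}
// --- end sketch ---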
/**
* Adds a field as a fine grained upstream
*
* @param schemaFieldUrn a schema field to be marked as upstream, format:
* @param upstreamSchemaField a schema field to be marked as upstream, format:
* urn:li:schemaField(DATASET_URN, COLUMN NAME)
* @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for
* full confidence
* @param transformationOperation string operation type that describes the transformation
* operation happening in the lineage edge
* @param type the upstream lineage type, either Field or Field Set
* @param downstreamSchemaField the downstream schema field this upstream is derived from, format:
* urn:li:schemaField(DATASET_URN, COLUMN NAME)
* @param queryUrn query urn the relationship is derived from
* @return this builder
*/
public UpstreamLineagePatchBuilder addFineGrainedUpstreamField(
@Nonnull Urn schemaFieldUrn,
@Nonnull Urn upstreamSchemaField,
@Nullable Float confidenceScore,
@Nonnull String transformationOperation,
@Nullable FineGrainedLineageUpstreamType type) {
@Nonnull Urn downstreamSchemaField,
@Nullable Urn queryUrn) {
Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore);
String finalType;
if (type == null) {
// Default to set of fields if not explicitly a single field
finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString();
String finalQueryUrn;
if (queryUrn == null || StringUtils.isBlank(queryUrn.toString())) {
finalQueryUrn = "NONE";
} else {
finalType = type.toString();
finalQueryUrn = queryUrn.toString();
}
ObjectNode fineGrainedLineageNode = instance.objectNode();
fineGrainedLineageNode.put("confidenceScore", instance.numberNode(finalConfidenceScore));
pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(),
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ "upstreamType"
+ downstreamSchemaField
+ "/"
+ finalType
+ finalQueryUrn
+ "/"
+ schemaFieldUrn,
instance.numberNode(finalConfidenceScore)));
+ upstreamSchemaField,
fineGrainedLineageNode));
return this;
}
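
A matching editor's sketch for the field-level variant (it could sit next to addDatasetUpstream in the FineGrainedPatchSketches class sketched above, with the same assumed imports; URNs are hypothetical). The patch is keyed by the single downstream field, and a null query urn is recorded under the literal "NONE" key:

// --- Illustrative sketch (editor's example, not part of this commit); URNs are hypothetical ---
  static MetadataChangeProposal addFieldUpstream() throws Exception {
    return new UpstreamLineagePatchBuilder()
        .urn(UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,downstream_table,PROD)"))
        .addFineGrainedUpstreamField(
            UrnUtils.getUrn(
                "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,upstream_table,PROD),bar)"),
            0.9f, // explicit confidence score; null would default to 1.0
            "TRANSFORM",
            UrnUtils.getUrn(
                "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,downstream_table,PROD),foo)"),
            null) // no query urn: the edge is keyed under "NONE"
        .build();
  }
// --- end sketch ---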
/**
* Adds a field as a fine grained downstream
*
* @param schemaFieldUrn a schema field to be marked as downstream, format:
* urn:li:schemaField(DATASET_URN, COLUMN NAME)
* @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for
* full confidence
* @param transformationOperation string operation type that describes the transformation
* operation happening in the lineage edge
* @param type the downstream lineage type, either Field or Field Set
* @return this builder
*/
public UpstreamLineagePatchBuilder addFineGrainedDownstreamField(
@Nonnull Urn schemaFieldUrn,
@Nullable Float confidenceScore,
@Nonnull String transformationOperation,
@Nullable FineGrainedLineageDownstreamType type) {
Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore);
String finalType;
if (type == null) {
// Default to set of fields if not explicitly a single field
finalType = FineGrainedLineageDownstreamType.FIELD_SET.toString();
} else {
finalType = type.toString();
}
pathValues.add(
ImmutableTriple.of(
PatchOperationType.ADD.getValue(),
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ "downstreamType"
+ "/"
+ finalType
+ "/"
+ schemaFieldUrn,
instance.numberNode(finalConfidenceScore)));
return this;
}
private Float getConfidenceScoreOrDefault(@Nullable Float confidenceScore) {
float finalConfidenceScore;
if (confidenceScore != null && confidenceScore > 0 && confidenceScore <= 1.0f) {
@ -180,96 +111,43 @@ public class UpstreamLineagePatchBuilder
/**
* Removes a field as a fine grained upstream
*
* @param schemaFieldUrn a schema field to be marked as upstream, format:
* @param upstreamSchemaFieldUrn a schema field to be marked as upstream, format:
* urn:li:schemaField(DATASET_URN, COLUMN NAME)
* @param transformationOperation string operation type that describes the transformation
* operation happening in the lineage edge
* @param type the upstream lineage type, either Field or Field Set
* @param downstreamSchemaField the downstream schema field this upstream is derived from, format:
* urn:li:schemaField(DATASET_URN, COLUMN NAME)
* @param queryUrn query urn the relationship is derived from
* @return this builder
*/
public UpstreamLineagePatchBuilder removeFineGrainedUpstreamField(
@Nonnull Urn schemaFieldUrn,
@Nonnull Urn upstreamSchemaFieldUrn,
@Nonnull String transformationOperation,
@Nullable FineGrainedLineageUpstreamType type) {
String finalType;
if (type == null) {
// Default to set of fields if not explicitly a single field
finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString();
@Nonnull Urn downstreamSchemaField,
@Nullable Urn queryUrn) {
String finalQueryUrn;
if (queryUrn == null || StringUtils.isBlank(queryUrn.toString())) {
finalQueryUrn = "NONE";
} else {
finalType = type.toString();
finalQueryUrn = queryUrn.toString();
}
pathValues.add(
ImmutableTriple.of(
PatchOperationType.REMOVE.getValue(),
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ "upstreamType"
+ downstreamSchemaField
+ "/"
+ finalType
+ finalQueryUrn
+ "/"
+ schemaFieldUrn,
+ upstreamSchemaFieldUrn,
null));
return this;
}
public UpstreamLineagePatchBuilder removeFineGrainedUpstreamDataset(
@Nonnull DatasetUrn datasetUrn, @Nonnull String transformationOperation) {
pathValues.add(
ImmutableTriple.of(
PatchOperationType.REMOVE.getValue(),
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ "upstreamType"
+ "/"
+ "DATASET"
+ "/"
+ datasetUrn,
null));
return this;
}
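
The remove counterparts, sketched the same way (editor's example with hypothetical URNs and the same assumed imports); each call emits a REMOVE operation with a null value at the corresponding transform/downstream/query/upstream path:

// --- Illustrative sketch (editor's example, not part of this commit); URNs are hypothetical ---
  static MetadataChangeProposal removeFineGrainedEdges() throws Exception {
    return new UpstreamLineagePatchBuilder()
        .urn(UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,downstream_table,PROD)"))
        .removeFineGrainedUpstreamField(
            UrnUtils.getUrn(
                "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,upstream_table,PROD),bar)"),
            "TRANSFORM",
            UrnUtils.getUrn(
                "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,downstream_table,PROD),foo)"),
            null)
        .removeFineGrainedUpstreamDataset(
            DatasetUrn.createFromString(
                "urn:li:dataset:(urn:li:dataPlatform:hive,upstream_table,PROD)"),
            "TRANSFORM")
        .build();
  }
// --- end sketch ---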
/**
* Adds a field as a fine grained downstream
*
* @param schemaFieldUrn a schema field to be marked as downstream, format:
* urn:li:schemaField(DATASET_URN, COLUMN NAME)
* @param transformationOperation string operation type that describes the transformation
* operation happening in the lineage edge
* @param type the downstream lineage type, either Field or Field Set
* @return this builder
*/
public UpstreamLineagePatchBuilder removeFineGrainedDownstreamField(
@Nonnull Urn schemaFieldUrn,
@Nonnull String transformationOperation,
@Nullable FineGrainedLineageDownstreamType type) {
String finalType;
if (type == null) {
// Default to set of fields if not explicitly a single field
finalType = FineGrainedLineageDownstreamType.FIELD_SET.toString();
} else {
finalType = type.toString();
}
pathValues.add(
ImmutableTriple.of(
PatchOperationType.REMOVE.getValue(),
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ "downstreamType"
+ "/"
+ finalType
+ "/"
+ schemaFieldUrn,
null));
return this;
}
@Override
protected String getAspectName() {
return UPSTREAM_LINEAGE_ASPECT_NAME;

UpstreamLineageTemplate.java

@ -1,10 +1,7 @@
package com.linkedin.metadata.aspect.patch.template.dataset;
import static com.fasterxml.jackson.databind.node.JsonNodeFactory.instance;
import static com.linkedin.metadata.Constants.FINE_GRAINED_LINEAGE_DATASET_TYPE;
import static com.linkedin.metadata.Constants.FINE_GRAINED_LINEAGE_FIELD_SET_TYPE;
import static com.linkedin.metadata.Constants.FINE_GRAINED_LINEAGE_FIELD_TYPE;
import static com.linkedin.metadata.Constants.SCHEMA_FIELD_ENTITY_NAME;
import static com.linkedin.metadata.Constants.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
@ -22,6 +19,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.codehaus.plexus.util.StringUtils;
public class UpstreamLineageTemplate extends CompoundKeyTemplate<UpstreamLineage> {
@ -35,10 +33,12 @@ public class UpstreamLineageTemplate extends CompoundKeyTemplate<UpstreamLineage
private static final String FINE_GRAINED_DOWNSTREAMS = "downstreams";
private static final String FINE_GRAINED_TRANSFORMATION_OPERATION = "transformOperation";
private static final String FINE_GRAINED_CONFIDENCE_SCORE = "confidenceScore";
private static final String FINE_GRAINED_QUERY_ID = "query";
// Template support
private static final String NONE_TRANSFORMATION_TYPE = "NONE";
private static final Float DEFAULT_CONFIDENCE_SCORE = 1.0f;
private static final String DEFAULT_QUERY_ID = "NONE";
@Override
public UpstreamLineage getSubtype(RecordTemplate recordTemplate) throws ClassCastException {
@ -94,7 +94,7 @@ public class UpstreamLineageTemplate extends CompoundKeyTemplate<UpstreamLineage
/**
* Combines fine grained lineage array into a map using upstream and downstream types as keys,
* defaulting when not present. Due to this construction, patches will look like: path:
* /fineGrainedLineages/TRANSFORMATION_OPERATION/(upstreamType || downstreamType)/TYPE/FIELD_URN,
* /fineGrainedLineages/TRANSFORMATION_OPERATION/DOWNSTREAM_FIELD_URN/QUERY_ID/UPSTREAM_FIELD_URN,
* op: ADD/REMOVE, value: float (confidenceScore) Due to the way FineGrainedLineage was designed
* it doesn't necessarily have a consistent key we can reference, so this specialized method
* mimics the arrayFieldToMap of the super class with the specialization that it does not put the
@ -128,6 +128,18 @@ public class UpstreamLineageTemplate extends CompoundKeyTemplate<UpstreamLineage
ObjectNode transformationOperationNode =
(ObjectNode) mapNode.get(transformationOperation);
ArrayNode downstreams =
nodeClone.has(FINE_GRAINED_DOWNSTREAMS)
? (ArrayNode) nodeClone.get(FINE_GRAINED_DOWNSTREAMS)
: null;
if (downstreams == null || downstreams.size() != 1) {
throw new UnsupportedOperationException(
"Patching not supported on fine grained lineages with not"
+ " exactly one downstream. Current fine grained lineage implementation is downstream derived and "
+ "patches are keyed on the root of this derivation.");
}
Float confidenceScore =
nodeClone.has(FINE_GRAINED_CONFIDENCE_SCORE)
? nodeClone.get(FINE_GRAINED_CONFIDENCE_SCORE).floatValue()
@ -145,69 +157,73 @@ public class UpstreamLineageTemplate extends CompoundKeyTemplate<UpstreamLineage
nodeClone.has(FINE_GRAINED_UPSTREAMS)
? (ArrayNode) nodeClone.get(FINE_GRAINED_UPSTREAMS)
: null;
ArrayNode downstreams =
nodeClone.has(FINE_GRAINED_DOWNSTREAMS)
? (ArrayNode) nodeClone.get(FINE_GRAINED_DOWNSTREAMS)
: null;
// Handle upstreams
String queryId =
nodeClone.has(FINE_GRAINED_QUERY_ID)
? nodeClone.get(FINE_GRAINED_QUERY_ID).asText()
: DEFAULT_QUERY_ID;
if (upstreamType == null) {
// Determine default type
Urn upstreamUrn =
upstreams != null ? UrnUtils.getUrn(upstreams.get(0).asText()) : null;
if (upstreamUrn != null
&& SCHEMA_FIELD_ENTITY_NAME.equals(upstreamUrn.getEntityType())) {
upstreamType = FINE_GRAINED_LINEAGE_FIELD_SET_TYPE;
} else {
&& DATASET_ENTITY_NAME.equals(upstreamUrn.getEntityType())) {
upstreamType = FINE_GRAINED_LINEAGE_DATASET_TYPE;
} else {
upstreamType = FINE_GRAINED_LINEAGE_FIELD_SET_TYPE;
}
}
if (!transformationOperationNode.has(FINE_GRAINED_UPSTREAM_TYPE)) {
transformationOperationNode.set(FINE_GRAINED_UPSTREAM_TYPE, instance.objectNode());
}
ObjectNode upstreamTypeNode =
(ObjectNode) transformationOperationNode.get(FINE_GRAINED_UPSTREAM_TYPE);
if (!upstreamTypeNode.has(upstreamType)) {
upstreamTypeNode.set(upstreamType, instance.objectNode());
}
if (upstreams != null) {
addUrnsToSubType(upstreamTypeNode, upstreams, upstreamType, confidenceScore);
}
// Handle downstreams
if (downstreamType == null) {
// Determine default type
if (downstreams != null && downstreams.size() > 1) {
downstreamType = FINE_GRAINED_LINEAGE_FIELD_SET_TYPE;
} else {
downstreamType = FINE_GRAINED_LINEAGE_FIELD_TYPE;
}
// Always use FIELD type, only support patches for single field downstream
downstreamType = FINE_GRAINED_LINEAGE_FIELD_TYPE;
}
if (!transformationOperationNode.has(FINE_GRAINED_DOWNSTREAM_TYPE)) {
transformationOperationNode.set(
FINE_GRAINED_DOWNSTREAM_TYPE, instance.objectNode());
String downstreamRoot = downstreams.get(0).asText();
if (!transformationOperationNode.has(downstreamRoot)) {
transformationOperationNode.set(downstreamRoot, instance.objectNode());
}
ObjectNode downstreamTypeNode =
(ObjectNode) transformationOperationNode.get(FINE_GRAINED_DOWNSTREAM_TYPE);
if (!downstreamTypeNode.has(downstreamType)) {
downstreamTypeNode.set(downstreamType, instance.objectNode());
ObjectNode downstreamRootNode =
(ObjectNode) transformationOperationNode.get(downstreamRoot);
if (!downstreamRootNode.has(queryId)) {
downstreamRootNode.set(queryId, instance.objectNode());
}
if (downstreams != null) {
addUrnsToSubType(downstreamTypeNode, downstreams, downstreamType, confidenceScore);
ObjectNode queryNode = (ObjectNode) downstreamRootNode.get(queryId);
if (upstreams != null) {
addUrnsToParent(
queryNode, upstreams, confidenceScore, upstreamType, downstreamType);
}
});
return mapNode;
}
private void addUrnsToSubType(
JsonNode superType, ArrayNode urnsList, String subType, Float confidenceScore) {
ObjectNode upstreamSubTypeNode = (ObjectNode) superType.get(subType);
private void addUrnsToParent(
JsonNode parentNode,
ArrayNode urnsList,
Float confidenceScore,
String upstreamType,
String downstreamType) {
// Will overwrite repeat urns with different confidence scores with the most recently seen
upstreamSubTypeNode.setAll(
Streams.stream(urnsList.elements())
.map(JsonNode::asText)
.distinct()
.collect(Collectors.toMap(urn -> urn, urn -> instance.numberNode(confidenceScore))));
((ObjectNode) parentNode)
.setAll(
Streams.stream(urnsList.elements())
.map(JsonNode::asText)
.distinct()
.collect(
Collectors.toMap(
urn -> urn,
urn ->
mapToLineageValueNode(confidenceScore, upstreamType, downstreamType))));
}
private JsonNode mapToLineageValueNode(
Float confidenceScore, String upstreamType, String downstreamType) {
ObjectNode objectNode = instance.objectNode();
objectNode.set(FINE_GRAINED_CONFIDENCE_SCORE, instance.numberNode(confidenceScore));
objectNode.set(FINE_GRAINED_UPSTREAM_TYPE, instance.textNode(upstreamType));
objectNode.set(FINE_GRAINED_DOWNSTREAM_TYPE, instance.textNode(downstreamType));
return objectNode;
}
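
To make the intermediate representation concrete, here is an editor's sketch (hypothetical URNs, not part of this commit) of the map node this template builds before it is folded back into the fineGrainedLineages array: transformOperation -> downstream field urn -> query id -> upstream urn -> the value node produced by mapToLineageValueNode above.

// --- Illustrative sketch (editor's example, not part of this commit); URNs are hypothetical ---
import static com.fasterxml.jackson.databind.node.JsonNodeFactory.instance;

import com.fasterxml.jackson.databind.node.ObjectNode;

class FineGrainedMapShapeSketch {
  public static void main(String[] args) {
    // Leaf value node: confidence score plus the upstream/downstream types
    ObjectNode value = instance.objectNode();
    value.set("confidenceScore", instance.numberNode(1.0f));
    value.set("upstreamType", instance.textNode("FIELD_SET"));
    value.set("downstreamType", instance.textNode("FIELD"));

    ObjectNode byUpstream = instance.objectNode();
    byUpstream.set(
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,upstream_table,PROD),bar)",
        value);

    ObjectNode byQuery = instance.objectNode();
    byQuery.set("NONE", byUpstream); // query id; "NONE" when no query urn is present

    ObjectNode byDownstream = instance.objectNode();
    byDownstream.set(
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,downstream_table,PROD),foo)",
        byQuery);

    ObjectNode mapNode = instance.objectNode();
    mapNode.set("TRANSFORM", byDownstream); // keyed by transform operation

    System.out.println(mapNode.toPrettyString());
  }
}
// --- end sketch ---

Keying on the single downstream field (hence the exactly-one-downstream check earlier in this file) gives each fine grained edge a stable patch key, which the FineGrainedLineage record itself does not otherwise provide.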
/**
@ -225,7 +241,7 @@ public class UpstreamLineageTemplate extends CompoundKeyTemplate<UpstreamLineage
return (ArrayNode) transformedFineGrainedLineages;
}
ObjectNode mapNode = (ObjectNode) transformedFineGrainedLineages;
ArrayNode arrayNode = instance.arrayNode();
ArrayNode fineGrainedLineages = instance.arrayNode();
mapNode
.fieldNames()
@ -233,83 +249,95 @@ public class UpstreamLineageTemplate extends CompoundKeyTemplate<UpstreamLineage
transformationOperation -> {
final ObjectNode transformationOperationNode =
(ObjectNode) mapNode.get(transformationOperation);
final ObjectNode upstreamType =
transformationOperationNode.has(FINE_GRAINED_UPSTREAM_TYPE)
? (ObjectNode) transformationOperationNode.get(FINE_GRAINED_UPSTREAM_TYPE)
: instance.objectNode();
final ObjectNode downstreamType =
transformationOperationNode.has(FINE_GRAINED_DOWNSTREAM_TYPE)
? (ObjectNode) transformationOperationNode.get(FINE_GRAINED_DOWNSTREAM_TYPE)
: instance.objectNode();
// Handle upstreams
if (!upstreamType.isEmpty()) {
populateTypeNode(
upstreamType,
transformationOperation,
FINE_GRAINED_UPSTREAM_TYPE,
FINE_GRAINED_UPSTREAMS,
FINE_GRAINED_DOWNSTREAM_TYPE,
arrayNode);
}
// Handle downstreams
if (!downstreamType.isEmpty()) {
populateTypeNode(
downstreamType,
transformationOperation,
FINE_GRAINED_DOWNSTREAM_TYPE,
FINE_GRAINED_DOWNSTREAMS,
FINE_GRAINED_UPSTREAM_TYPE,
arrayNode);
}
transformationOperationNode
.fieldNames()
.forEachRemaining(
downstreamName -> {
final ObjectNode downstreamNode =
(ObjectNode) transformationOperationNode.get(downstreamName);
downstreamNode
.fieldNames()
.forEachRemaining(
queryId ->
buildFineGrainedLineage(
downstreamName,
downstreamNode,
queryId,
transformationOperation,
fineGrainedLineages));
});
});
return arrayNode;
return fineGrainedLineages;
}
private void populateTypeNode(
JsonNode typeNode,
String transformationOperation,
String typeName,
String arrayTypeName,
String defaultTypeName,
ArrayNode arrayNode) {
typeNode
private void buildFineGrainedLineage(
final String downstreamName,
final ObjectNode downstreamNode,
final String queryId,
final String transformationOperation,
final ArrayNode fineGrainedLineages) {
final ObjectNode fineGrainedLineage = instance.objectNode();
final ObjectNode queryNode = (ObjectNode) downstreamNode.get(queryId);
if (queryNode.isEmpty()) {
// Short circuit if no upstreams left
return;
}
ArrayNode downstream = instance.arrayNode();
downstream.add(instance.textNode(downstreamName));
// Set defaults, if found in sub nodes override, for confidenceScore take lowest
AtomicReference<Float> minimumConfidenceScore = new AtomicReference<>(DEFAULT_CONFIDENCE_SCORE);
AtomicReference<String> upstreamType =
new AtomicReference<>(FINE_GRAINED_LINEAGE_FIELD_SET_TYPE);
AtomicReference<String> downstreamType = new AtomicReference<>(FINE_GRAINED_LINEAGE_FIELD_TYPE);
ArrayNode upstreams = instance.arrayNode();
queryNode
.fieldNames()
.forEachRemaining(
subTypeName -> {
ObjectNode subType = (ObjectNode) typeNode.get(subTypeName);
if (!subType.isEmpty()) {
ObjectNode fineGrainedLineage = instance.objectNode();
AtomicReference<Float> minimumConfidenceScore = new AtomicReference<>(1.0f);
upstream ->
processUpstream(
queryNode,
upstream,
minimumConfidenceScore,
upstreamType,
downstreamType,
upstreams));
fineGrainedLineage.set(FINE_GRAINED_DOWNSTREAMS, downstream);
fineGrainedLineage.set(FINE_GRAINED_UPSTREAMS, upstreams);
if (StringUtils.isNotBlank(queryId) && !DEFAULT_QUERY_ID.equals(queryId)) {
fineGrainedLineage.set(FINE_GRAINED_QUERY_ID, instance.textNode(queryId));
}
fineGrainedLineage.set(FINE_GRAINED_UPSTREAM_TYPE, instance.textNode(upstreamType.get()));
fineGrainedLineage.set(FINE_GRAINED_DOWNSTREAM_TYPE, instance.textNode(downstreamType.get()));
fineGrainedLineage.set(
FINE_GRAINED_CONFIDENCE_SCORE, instance.numberNode(minimumConfidenceScore.get()));
fineGrainedLineage.set(
FINE_GRAINED_TRANSFORMATION_OPERATION, instance.textNode(transformationOperation));
fineGrainedLineages.add(fineGrainedLineage);
}
fineGrainedLineage.put(typeName, subTypeName);
fineGrainedLineage.put(
FINE_GRAINED_TRANSFORMATION_OPERATION, transformationOperation);
// Array to actually be filled out
fineGrainedLineage.set(arrayTypeName, instance.arrayNode());
// Added to pass model validation, because we have no way of appropriately pairing
// upstreams and downstreams
// within fine grained lineages consistently due to being able to have multiple
// downstream types paired with a single
// transform operation, we just set a default type because it's a required property
fineGrainedLineage.put(defaultTypeName, FINE_GRAINED_LINEAGE_FIELD_SET_TYPE);
subType
.fieldNames()
.forEachRemaining(
subTypeKey -> {
((ArrayNode) fineGrainedLineage.get(arrayTypeName)).add(subTypeKey);
Float scoreValue = subType.get(subTypeKey).floatValue();
if (scoreValue <= minimumConfidenceScore.get()) {
minimumConfidenceScore.set(scoreValue);
fineGrainedLineage.set(
FINE_GRAINED_CONFIDENCE_SCORE,
instance.numberNode(minimumConfidenceScore.get()));
}
});
arrayNode.add(fineGrainedLineage);
}
});
private void processUpstream(
final ObjectNode queryNode,
final String upstream,
final AtomicReference<Float> minimumConfidenceScore,
final AtomicReference<String> upstreamType,
final AtomicReference<String> downstreamType,
final ArrayNode upstreams) {
final ObjectNode upstreamNode = (ObjectNode) queryNode.get(upstream);
if (upstreamNode.has(FINE_GRAINED_CONFIDENCE_SCORE)) {
Float scoreValue = upstreamNode.get(FINE_GRAINED_CONFIDENCE_SCORE).floatValue();
if (scoreValue <= minimumConfidenceScore.get()) {
minimumConfidenceScore.set(scoreValue);
}
}
// Set types to last encountered, should never change, but this at least tries to support
// other types being specified.
if (upstreamNode.has(FINE_GRAINED_UPSTREAM_TYPE)) {
upstreamType.set(upstreamNode.get(FINE_GRAINED_UPSTREAM_TYPE).asText());
}
if (upstreamNode.has(FINE_GRAINED_DOWNSTREAM_TYPE)) {
downstreamType.set(upstreamNode.get(FINE_GRAINED_DOWNSTREAM_TYPE).asText());
}
upstreams.add(instance.textNode(upstream));
}
}

UpstreamLineageTemplateTest.java

@ -3,6 +3,7 @@ package com.linkedin.metadata.aspect.patch.template;
import static com.fasterxml.jackson.databind.node.JsonNodeFactory.*;
import com.fasterxml.jackson.databind.node.NumericNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.fge.jackson.jsonpointer.JsonPointer;
import com.github.fge.jsonpatch.AddOperation;
import com.github.fge.jsonpatch.JsonPatch;
@ -28,12 +29,14 @@ public class UpstreamLineageTemplateTest {
UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate();
UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault();
List<JsonPatchOperation> patchOperations = new ArrayList<>();
ObjectNode fineGrainedLineageNode = instance.objectNode();
NumericNode upstreamConfidenceScore = instance.numberNode(1.0f);
fineGrainedLineageNode.set("confidenceScore", upstreamConfidenceScore);
JsonPatchOperation operation =
new AddOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"),
upstreamConfidenceScore);
"/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)//urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c1)"),
fineGrainedLineageNode);
patchOperations.add(operation);
JsonPatch jsonPatch = new JsonPatch(patchOperations);
@ -48,24 +51,42 @@ public class UpstreamLineageTemplateTest {
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)");
urns.add(urn1);
fineGrainedLineage.setUpstreams(urns);
UrnArray upstreams = new UrnArray();
Urn upstreamUrn =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c1)");
upstreams.add(upstreamUrn);
fineGrainedLineage.setDownstreams(urns);
fineGrainedLineage.setUpstreams(upstreams);
fineGrainedLineage.setTransformOperation("CREATE");
fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD);
Assert.assertEquals(result.getFineGrainedLineages().get(0), fineGrainedLineage);
// Test non-overwrite upstreams and correct confidence score
// Test non-overwrite upstreams and correct confidence score and types w/ overwrite
ObjectNode finegrainedLineageNode2 = instance.objectNode();
finegrainedLineageNode2.set(
"upstreamType", instance.textNode(FineGrainedLineageUpstreamType.FIELD_SET.name()));
finegrainedLineageNode2.set("confidenceScore", upstreamConfidenceScore);
finegrainedLineageNode2.set(
"downstreamType", instance.textNode(FineGrainedLineageDownstreamType.FIELD.name()));
JsonPatchOperation operation2 =
new AddOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"),
upstreamConfidenceScore);
"/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:someQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"),
finegrainedLineageNode2);
NumericNode upstreamConfidenceScore2 = instance.numberNode(0.1f);
ObjectNode finegrainedLineageNode3 = instance.objectNode();
finegrainedLineageNode3.set(
"upstreamType", instance.textNode(FineGrainedLineageUpstreamType.DATASET.name()));
finegrainedLineageNode3.set("confidenceScore", upstreamConfidenceScore2);
finegrainedLineageNode3.set(
"downstreamType", instance.textNode(FineGrainedLineageDownstreamType.FIELD_SET.name()));
JsonPatchOperation operation3 =
new AddOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"),
upstreamConfidenceScore2);
"/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:someQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"),
finegrainedLineageNode3);
List<JsonPatchOperation> patchOperations2 = new ArrayList<>();
patchOperations2.add(operation2);
patchOperations2.add(operation3);
@ -79,20 +100,32 @@ public class UpstreamLineageTemplateTest {
Urn urn2 =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)");
urns2.add(urn1);
urns2.add(urn2);
Urn downstreamUrn2 =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)");
UrnArray downstreams2 = new UrnArray();
downstreams2.add(downstreamUrn2);
fineGrainedLineage2.setUpstreams(urns2);
fineGrainedLineage2.setDownstreams(downstreams2);
fineGrainedLineage2.setTransformOperation("CREATE");
fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.DATASET);
fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
Assert.assertEquals(result2.getFineGrainedLineages().get(0), fineGrainedLineage2);
fineGrainedLineage2.setQuery(UrnUtils.getUrn("urn:li:query:someQuery"));
Assert.assertEquals(result2.getFineGrainedLineages().get(1), fineGrainedLineage2);
// Check different upstream types
// Check different queries
ObjectNode finegrainedLineageNode4 = instance.objectNode();
finegrainedLineageNode4.set(
"upstreamType", instance.textNode(FineGrainedLineageUpstreamType.FIELD_SET.name()));
finegrainedLineageNode4.set("confidenceScore", upstreamConfidenceScore);
finegrainedLineageNode4.set(
"downstreamType", instance.textNode(FineGrainedLineageDownstreamType.FIELD.name()));
JsonPatchOperation operation4 =
new AddOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"),
upstreamConfidenceScore);
"/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:anotherQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"),
finegrainedLineageNode4);
List<JsonPatchOperation> patchOperations3 = new ArrayList<>();
patchOperations3.add(operation4);
JsonPatch jsonPatch3 = new JsonPatch(patchOperations3);
@ -103,163 +136,36 @@ public class UpstreamLineageTemplateTest {
FineGrainedLineage fineGrainedLineage3 = new FineGrainedLineage(dataMap3);
UrnArray urns3 = new UrnArray();
Urn urn3 =
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)");
urns3.add(urn3);
fineGrainedLineage3.setUpstreams(urns3);
fineGrainedLineage3.setTransformOperation("CREATE");
fineGrainedLineage3.setUpstreamType(FineGrainedLineageUpstreamType.DATASET);
fineGrainedLineage3.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
// Splits into two for different types
Assert.assertEquals(result3.getFineGrainedLineages().get(1), fineGrainedLineage3);
// Check different transform types
JsonPatchOperation operation5 =
new AddOperation(
new JsonPointer(
"/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"),
upstreamConfidenceScore);
List<JsonPatchOperation> patchOperations4 = new ArrayList<>();
patchOperations4.add(operation5);
JsonPatch jsonPatch4 = new JsonPatch(patchOperations4);
UpstreamLineage result4 = upstreamLineageTemplate.applyPatch(result3, jsonPatch4);
// Hack because Jackson parses values to doubles instead of floats
DataMap dataMap4 = new DataMap();
dataMap4.put("confidenceScore", 1.0);
FineGrainedLineage fineGrainedLineage4 = new FineGrainedLineage(dataMap4);
UrnArray urns4 = new UrnArray();
Urn urn4 =
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)");
urns4.add(urn4);
fineGrainedLineage4.setUpstreams(urns4);
fineGrainedLineage4.setTransformOperation("TRANSFORM");
fineGrainedLineage4.setUpstreamType(FineGrainedLineageUpstreamType.DATASET);
fineGrainedLineage4.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
// New entry in array because of new transformation type
Assert.assertEquals(result4.getFineGrainedLineages().get(2), fineGrainedLineage4);
// Remove
JsonPatchOperation removeOperation =
new RemoveOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"));
JsonPatchOperation removeOperation2 =
new RemoveOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"));
JsonPatchOperation removeOperation3 =
new RemoveOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"));
JsonPatchOperation removeOperation4 =
new RemoveOperation(
new JsonPointer(
"/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"));
List<JsonPatchOperation> removeOperations = new ArrayList<>();
removeOperations.add(removeOperation);
removeOperations.add(removeOperation2);
removeOperations.add(removeOperation3);
removeOperations.add(removeOperation4);
JsonPatch removePatch = new JsonPatch(removeOperations);
UpstreamLineage finalResult = upstreamLineageTemplate.applyPatch(result4, removePatch);
Assert.assertEquals(upstreamLineageTemplate.getDefault(), finalResult);
}
@Test
public void testPatchDownstream() throws Exception {
UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate();
UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault();
List<JsonPatchOperation> patchOperations = new ArrayList<>();
NumericNode downstreamConfidenceScore = instance.numberNode(1.0f);
JsonPatchOperation operation =
new AddOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"),
downstreamConfidenceScore);
patchOperations.add(operation);
JsonPatch jsonPatch = new JsonPatch(patchOperations);
// Initial population test
UpstreamLineage result = upstreamLineageTemplate.applyPatch(upstreamLineage, jsonPatch);
// Hack because Jackson parses values to doubles instead of floats
DataMap dataMap = new DataMap();
dataMap.put("confidenceScore", 1.0);
FineGrainedLineage fineGrainedLineage = new FineGrainedLineage(dataMap);
UrnArray urns = new UrnArray();
Urn urn1 =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)");
urns.add(urn1);
fineGrainedLineage.setDownstreams(urns);
fineGrainedLineage.setTransformOperation("CREATE");
fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
Assert.assertEquals(result.getFineGrainedLineages().get(0), fineGrainedLineage);
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)");
urns3.add(urn3);
// Test non-overwrite downstreams and correct confidence score
JsonPatchOperation operation2 =
new AddOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"),
downstreamConfidenceScore);
NumericNode downstreamConfidenceScore2 = instance.numberNode(0.1f);
JsonPatchOperation operation3 =
new AddOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"),
downstreamConfidenceScore2);
List<JsonPatchOperation> patchOperations2 = new ArrayList<>();
patchOperations2.add(operation2);
patchOperations2.add(operation3);
JsonPatch jsonPatch2 = new JsonPatch(patchOperations2);
UpstreamLineage result2 = upstreamLineageTemplate.applyPatch(result, jsonPatch2);
// Hack because Jackson parses values to doubles instead of floats
DataMap dataMap2 = new DataMap();
dataMap2.put("confidenceScore", 0.1);
FineGrainedLineage fineGrainedLineage2 = new FineGrainedLineage(dataMap2);
UrnArray urns2 = new UrnArray();
Urn urn2 =
Urn upstreamUrn3 =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)");
urns2.add(urn1);
urns2.add(urn2);
fineGrainedLineage2.setDownstreams(urns2);
fineGrainedLineage2.setTransformOperation("CREATE");
fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
Assert.assertEquals(result2.getFineGrainedLineages().get(0), fineGrainedLineage2);
// Check different downstream types
JsonPatchOperation operation4 =
new AddOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"),
downstreamConfidenceScore);
List<JsonPatchOperation> patchOperations3 = new ArrayList<>();
patchOperations3.add(operation4);
JsonPatch jsonPatch3 = new JsonPatch(patchOperations3);
UpstreamLineage result3 = upstreamLineageTemplate.applyPatch(result2, jsonPatch3);
// Hack because Jackson parses values to doubles instead of floats
DataMap dataMap3 = new DataMap();
dataMap3.put("confidenceScore", 1.0);
FineGrainedLineage fineGrainedLineage3 = new FineGrainedLineage(dataMap3);
UrnArray urns3 = new UrnArray();
Urn urn3 =
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)");
urns3.add(urn3);
UrnArray upstreamUrns3 = new UrnArray();
upstreamUrns3.add(upstreamUrn3);
fineGrainedLineage3.setDownstreams(urns3);
fineGrainedLineage3.setUpstreams(upstreamUrns3);
fineGrainedLineage3.setTransformOperation("CREATE");
fineGrainedLineage3.setDownstreamType(FineGrainedLineageDownstreamType.FIELD);
fineGrainedLineage3.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
fineGrainedLineage3.setDownstreamType(FineGrainedLineageDownstreamType.FIELD);
fineGrainedLineage3.setQuery(UrnUtils.getUrn("urn:li:query:anotherQuery"));
// Splits into two for different types
Assert.assertEquals(result3.getFineGrainedLineages().get(1), fineGrainedLineage3);
Assert.assertEquals(result3.getFineGrainedLineages().get(2), fineGrainedLineage3);
// Check different transform types
ObjectNode finegrainedLineageNode5 = instance.objectNode();
finegrainedLineageNode5.set(
"upstreamType", instance.textNode(FineGrainedLineageUpstreamType.FIELD_SET.name()));
finegrainedLineageNode5.set("confidenceScore", upstreamConfidenceScore);
finegrainedLineageNode5.set(
"downstreamType", instance.textNode(FineGrainedLineageDownstreamType.FIELD.name()));
JsonPatchOperation operation5 =
new AddOperation(
new JsonPointer(
"/fineGrainedLineages/TRANSFORM/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"),
downstreamConfidenceScore);
"/fineGrainedLineages/TRANSFORM/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:anotherQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"),
finegrainedLineageNode5);
List<JsonPatchOperation> patchOperations4 = new ArrayList<>();
patchOperations4.add(operation5);
JsonPatch jsonPatch4 = new JsonPatch(patchOperations4);
@ -268,34 +174,32 @@ public class UpstreamLineageTemplateTest {
DataMap dataMap4 = new DataMap();
dataMap4.put("confidenceScore", 1.0);
FineGrainedLineage fineGrainedLineage4 = new FineGrainedLineage(dataMap4);
UrnArray urns4 = new UrnArray();
Urn urn4 =
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)");
urns4.add(urn4);
fineGrainedLineage4.setDownstreams(urns4);
fineGrainedLineage4.setUpstreams(upstreamUrns3);
fineGrainedLineage4.setDownstreams(urns3);
fineGrainedLineage4.setTransformOperation("TRANSFORM");
fineGrainedLineage4.setDownstreamType(FineGrainedLineageDownstreamType.FIELD);
fineGrainedLineage4.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
fineGrainedLineage4.setDownstreamType(FineGrainedLineageDownstreamType.FIELD);
fineGrainedLineage4.setQuery(UrnUtils.getUrn("urn:li:query:anotherQuery"));
// New entry in array because of new transformation type
Assert.assertEquals(result4.getFineGrainedLineages().get(2), fineGrainedLineage4);
Assert.assertEquals(result4.getFineGrainedLineages().get(3), fineGrainedLineage4);
// Remove
JsonPatchOperation removeOperation =
new RemoveOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"));
"/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c1)"));
JsonPatchOperation removeOperation2 =
new RemoveOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"));
"/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:someQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"));
JsonPatchOperation removeOperation3 =
new RemoveOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"));
"/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:anotherQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"));
JsonPatchOperation removeOperation4 =
new RemoveOperation(
new JsonPointer(
"/fineGrainedLineages/TRANSFORM/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"));
"/fineGrainedLineages/TRANSFORM/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:anotherQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"));
List<JsonPatchOperation> removeOperations = new ArrayList<>();
removeOperations.add(removeOperation);
@ -304,56 +208,6 @@ public class UpstreamLineageTemplateTest {
removeOperations.add(removeOperation4);
JsonPatch removePatch = new JsonPatch(removeOperations);
UpstreamLineage finalResult = upstreamLineageTemplate.applyPatch(result4, removePatch);
Assert.assertEquals(upstreamLineageTemplate.getDefault(), finalResult);
}
@Test
public void testUpAndDown() throws Exception {
UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate();
UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault();
List<JsonPatchOperation> patchOperations = new ArrayList<>();
NumericNode downstreamConfidenceScore = instance.numberNode(1.0f);
JsonPatchOperation operation =
new AddOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"),
downstreamConfidenceScore);
patchOperations.add(operation);
NumericNode upstreamConfidenceScore = instance.numberNode(1.0f);
JsonPatchOperation operation2 =
new AddOperation(
new JsonPointer(
"/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"),
upstreamConfidenceScore);
patchOperations.add(operation2);
JsonPatch jsonPatch = new JsonPatch(patchOperations);
// Initial population test
UpstreamLineage result = upstreamLineageTemplate.applyPatch(upstreamLineage, jsonPatch);
// Hack because Jackson parses values to doubles instead of floats
DataMap dataMap = new DataMap();
dataMap.put("confidenceScore", 1.0);
FineGrainedLineage fineGrainedLineage = new FineGrainedLineage(dataMap);
UrnArray urns = new UrnArray();
Urn urn1 =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)");
urns.add(urn1);
fineGrainedLineage.setTransformOperation("CREATE");
fineGrainedLineage.setUpstreams(urns);
fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
fineGrainedLineage.setDownstreams(urns);
// Hack because Jackson parses values to doubles instead of floats
DataMap dataMap2 = new DataMap();
dataMap2.put("confidenceScore", 1.0);
FineGrainedLineage fineGrainedLineage2 = new FineGrainedLineage(dataMap2);
fineGrainedLineage2.setTransformOperation("CREATE");
fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET);
fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
fineGrainedLineage2.setDownstreams(urns);
Assert.assertEquals(result.getFineGrainedLineages().get(1), fineGrainedLineage2);
Assert.assertEquals(finalResult, upstreamLineageTemplate.getDefault());
}
}

MetadataPatchProposal (Python patch builder base)

@ -58,8 +58,8 @@ class MetadataPatchProposal:
self.patches = defaultdict(list)
# Json Patch quoting based on https://jsonpatch.com/#json-pointer
@staticmethod
def quote(value: str) -> str:
@classmethod
def quote(cls, value: str) -> str:
return value.replace("~", "~0").replace("/", "~1")
def _add_patch(self, aspect_name: str, op: str, path: str, value: Any) -> None:

datahub/specific/dataset.py (DatasetPatchBuilder)

@ -6,8 +6,6 @@ from datahub.metadata.schema_classes import (
EditableDatasetPropertiesClass as EditableDatasetProperties,
EditableSchemaMetadataClass as EditableSchemaMetadata,
FineGrainedLineageClass as FineGrainedLineage,
FineGrainedLineageDownstreamTypeClass as FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamTypeClass as FineGrainedLineageUpstreamType,
GlobalTagsClass as GlobalTags,
GlossaryTermAssociationClass as Term,
GlossaryTermsClass as GlossaryTerms,
@ -126,7 +124,7 @@ class DatasetPatchBuilder(MetadataPatchProposal):
self._add_patch(
UpstreamLineage.ASPECT_NAME,
"add",
path=f"/upstreams/{MetadataPatchProposal.quote(upstream.dataset)}",
path=f"/upstreams/{self.quote(upstream.dataset)}",
value=upstream,
)
return self
@ -153,26 +151,17 @@ class DatasetPatchBuilder(MetadataPatchProposal):
) -> "DatasetPatchBuilder":
(
transform_op,
upstream_type,
downstream_type,
downstream_urn,
query_id,
) = DatasetPatchBuilder.get_fine_grained_key(fine_grained_lineage)
for upstream_urn in fine_grained_lineage.upstreams or []:
self._add_patch(
UpstreamLineage.ASPECT_NAME,
"add",
path=DatasetPatchBuilder.quote_fine_grained_upstream_path(
transform_op, upstream_type, upstream_urn
path=DatasetPatchBuilder.quote_fine_grained_path(
transform_op, downstream_urn, query_id, upstream_urn
),
value=fine_grained_lineage.confidenceScore,
)
for downstream_urn in fine_grained_lineage.downstreams or []:
self._add_patch(
UpstreamLineage.ASPECT_NAME,
"add",
path=DatasetPatchBuilder.quote_fine_grained_downstream_path(
transform_op, downstream_type, downstream_urn
),
value=fine_grained_lineage.confidenceScore,
value={"confidenceScore": fine_grained_lineage.confidenceScore},
)
return self
@ -180,35 +169,21 @@ class DatasetPatchBuilder(MetadataPatchProposal):
def get_fine_grained_key(
fine_grained_lineage: FineGrainedLineage,
) -> Tuple[str, str, str]:
downstreams = fine_grained_lineage.downstreams or []
if len(downstreams) != 1:
raise TypeError("Cannot patch with more or less than one downstream.")
transform_op = fine_grained_lineage.transformOperation or "NONE"
upstream_type = (
fine_grained_lineage.upstreamType
if isinstance(fine_grained_lineage.upstreamType, str)
else FineGrainedLineageUpstreamType.FIELD_SET
)
downstream_type = (
fine_grained_lineage.downstreamType
if isinstance(fine_grained_lineage.downstreamType, str)
else FineGrainedLineageDownstreamType.FIELD_SET
)
return transform_op, upstream_type, downstream_type
downstream_urn = downstreams[0]
query_id = fine_grained_lineage.query or "NONE"
return transform_op, downstream_urn, query_id
@staticmethod
def quote_fine_grained_downstream_path(
transform_op: str, downstream_type: str, downstream_urn: str
@classmethod
def quote_fine_grained_path(
cls, transform_op: str, downstream_urn: str, query_id: str, upstream_urn: str
) -> str:
return (
f"/fineGrainedLineages/{MetadataPatchProposal.quote(transform_op)}/downstreamType/"
f"{MetadataPatchProposal.quote(downstream_type)}/{MetadataPatchProposal.quote(downstream_urn)}"
)
@staticmethod
def quote_fine_grained_upstream_path(
transform_op: str, upstream_type: str, upstream_urn: str
) -> str:
return (
f"/fineGrainedLineages/{MetadataPatchProposal.quote(transform_op)}/upstreamType/"
f"{MetadataPatchProposal.quote(upstream_type)}/{MetadataPatchProposal.quote(upstream_urn)}"
f"/fineGrainedLineages/{cls.quote(transform_op)}/"
f"{cls.quote(downstream_urn)}/{cls.quote(query_id)}/{cls.quote(upstream_urn)}"
)
def remove_fine_grained_upstream_lineage(
@ -216,24 +191,15 @@ class DatasetPatchBuilder(MetadataPatchProposal):
) -> "DatasetPatchBuilder":
(
transform_op,
upstream_type,
downstream_type,
downstream_urn,
query_id,
) = DatasetPatchBuilder.get_fine_grained_key(fine_grained_lineage)
for upstream_urn in fine_grained_lineage.upstreams or []:
self._add_patch(
UpstreamLineage.ASPECT_NAME,
"remove",
path=DatasetPatchBuilder.quote_fine_grained_upstream_path(
transform_op, upstream_type, upstream_urn
),
value={},
)
for downstream_urn in fine_grained_lineage.downstreams or []:
self._add_patch(
UpstreamLineage.ASPECT_NAME,
"remove",
path=DatasetPatchBuilder.quote_fine_grained_downstream_path(
transform_op, downstream_type, downstream_urn
path=DatasetPatchBuilder.quote_fine_grained_path(
transform_op, downstream_urn, query_id, upstream_urn
),
value={},
)

tests/unit/patch/complex_dataset_patch.json

@ -1,105 +1,109 @@
[
{
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "datasetProperties",
"aspect": {
"json": [
{
"op": "add",
"path": "/description",
"value": "test description"
},
{
"op": "add",
"path": "/customProperties/test_key_1",
"value": "test_value_1"
},
{
"op": "add",
"path": "/customProperties/test_key_2",
"value": "test_value_2"
}
]
"json": [
{
"op": "add",
"path": "/description",
"value": "test description"
},
{
"op": "add",
"path": "/customProperties/test_key_1",
"value": "test_value_1"
},
{
"op": "add",
"path": "/customProperties/test_key_2",
"value": "test_value_2"
}
]
}
},
{
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "globalTags",
"aspect": {
"json": [
{
"op": "add",
"path": "/tags/urn:li:tag:test_tag",
"value": {
"tag": "urn:li:tag:test_tag"
}
}
]
"json": [
{
"op": "add",
"path": "/tags/urn:li:tag:test_tag",
"value": {
"tag": "urn:li:tag:test_tag"
}
}
]
}
},
{
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "upstreamLineage",
"aspect": {
"json": [
{
"op": "add",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)",
"value": {
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
"json": [
{
"op": "add",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)",
"value": {
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)",
"type": "TRANSFORMED"
}
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)",
"type": "TRANSFORMED"
}
},
{
"op": "add",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket~1my-folder~1my-file.txt,PROD)",
"value": {
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
{
"op": "add",
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket~1my-folder~1my-file.txt,PROD)",
"value": {
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/my-folder/my-file.txt,PROD)",
"type": "TRANSFORMED"
}
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/my-folder/my-file.txt,PROD)",
"type": "TRANSFORMED"
}
},
{
"op": "add",
"path": "/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)",
"value": 1.0
},
{
"op": "add",
"path": "/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket~1my-folder~1my-file.txt,PROD)",
"value": 1.0
}
]
{
"op": "add",
"path": "/fineGrainedLineages/TRANSFORM/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),foo)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD),bar)",
"value": {
"confidenceScore": 1.0
}
},
{
"op": "add",
"path": "/fineGrainedLineages/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),foo)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket~1my-folder~1my-file.txt,PROD),foo)",
"value": {
"confidenceScore": 1.0
}
}
]
}
},
{
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "editableSchemaMetadata",
"aspect": {
"json": [
{
"op": "add",
"path": "/editableSchemaFieldInfo/field1/globalTags/tags/urn:li:tag:tag1",
"value": {
"tag": "urn:li:tag:tag1"
}
}
]
"json": [
{
"op": "add",
"path": "/editableSchemaFieldInfo/field1/globalTags/tags/urn:li:tag:tag1",
"value": {
"tag": "urn:li:tag:tag1"
}
}
]
}
}
}
]

Dataset patch builder unit tests (Python)

@ -1,4 +1,3 @@
import json
import pathlib
import pytest
@ -7,6 +6,7 @@ from datahub.emitter.mce_builder import (
make_chart_urn,
make_dashboard_urn,
make_dataset_urn,
make_schema_field_urn,
make_tag_urn,
)
from datahub.ingestion.sink.file import write_metadata_file
@ -23,6 +23,7 @@ from datahub.metadata.schema_classes import (
from datahub.specific.chart import ChartPatchBuilder
from datahub.specific.dashboard import DashboardPatchBuilder
from datahub.specific.dataset import DatasetPatchBuilder
from tests.test_helpers import mce_helpers
def test_basic_dataset_patch_builder():
@ -73,13 +74,28 @@ def test_complex_dataset_patch(
)
.add_fine_grained_upstream_lineage(
fine_grained_lineage=FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.DATASET,
upstreams=[
make_dataset_urn(
platform="hive", name="fct_users_created_upstream", env="PROD"
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
downstreams=[
make_schema_field_urn(
make_dataset_urn(
platform="hive",
name="fct_users_created",
env="PROD",
),
field_path="foo",
)
],
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD_SET,
upstreams=[
make_schema_field_urn(
make_dataset_urn(
platform="hive",
name="fct_users_created_upstream",
env="PROD",
),
field_path="bar",
)
],
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
transformOperation="TRANSFORM",
confidenceScore=1.0,
)
@ -88,15 +104,26 @@ def test_complex_dataset_patch(
fine_grained_lineage=FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.DATASET,
upstreams=[
make_dataset_urn(
platform="s3",
name="my-bucket/my-folder/my-file.txt",
env="PROD",
make_schema_field_urn(
make_dataset_urn(
platform="s3",
name="my-bucket/my-folder/my-file.txt",
env="PROD",
),
field_path="foo",
)
],
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD_SET,
transformOperation="TRANSFORM",
confidenceScore=1.0,
downstreams=[
make_schema_field_urn(
make_dataset_urn(
platform="hive",
name="fct_users_created",
env="PROD",
),
field_path="foo",
)
],
)
)
)
@ -105,10 +132,10 @@ def test_complex_dataset_patch(
out_path = tmp_path / "patch.json"
write_metadata_file(out_path, patcher.build())
assert json.loads(out_path.read_text()) == json.loads(
(
pytestconfig.rootpath / "tests/unit/patch/complex_dataset_patch.json"
).read_text()
mce_helpers.check_golden_file(
pytestconfig,
out_path,
pytestconfig.rootpath / "tests/unit/patch/complex_dataset_patch.json",
)

PatchTest.java

@ -56,18 +56,20 @@ public class PatchTest {
DatasetUrn upstreamUrn =
DatasetUrn.createFromString(
"urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)");
Urn schemaFieldUrn =
Urn upstreamSchemaFieldUrn =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), foo)");
Urn downstreamSchemaFieldUrn =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), bar)");
MetadataChangeProposal upstreamPatch =
new UpstreamLineagePatchBuilder()
.urn(
UrnUtils.getUrn(
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"))
.addUpstream(upstreamUrn, DatasetLineageType.TRANSFORMED)
.addFineGrainedUpstreamDataset(upstreamUrn, null, "TRANSFORM")
.addFineGrainedUpstreamField(schemaFieldUrn, null, "TRANSFORM", null)
.addFineGrainedDownstreamField(schemaFieldUrn, null, "TRANSFORM", null)
.addFineGrainedUpstreamField(
upstreamSchemaFieldUrn, null, "TRANSFORM", downstreamSchemaFieldUrn, null)
.build();
Future<MetadataWriteResponse> response = restEmitter.emit(upstreamPatch);
@ -83,12 +85,12 @@ public class PatchTest {
public void testLocalUpstreamRemove() {
RestEmitter restEmitter = new RestEmitter(RestEmitterConfig.builder().build());
try {
DatasetUrn upstreamUrn =
DatasetUrn.createFromString(
"urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)");
Urn schemaFieldUrn =
Urn upstreamSchemaFieldUrn =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), foo)");
Urn downstreamSchemaFieldUrn =
UrnUtils.getUrn(
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), bar)");
MetadataChangeProposal upstreamPatch =
new UpstreamLineagePatchBuilder()
.urn(
@ -97,9 +99,8 @@ public class PatchTest {
.removeUpstream(
DatasetUrn.createFromString(
"urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"))
.removeFineGrainedUpstreamDataset(upstreamUrn, "TRANSFORM")
.removeFineGrainedUpstreamField(schemaFieldUrn, "TRANSFORM", null)
.removeFineGrainedDownstreamField(schemaFieldUrn, "TRANSFORM", null)
.removeFineGrainedUpstreamField(
upstreamSchemaFieldUrn, "TRANSFORM", downstreamSchemaFieldUrn, null)
.build();
Future<MetadataWriteResponse> response = restEmitter.emit(upstreamPatch);