From ddf0b7d2cddefbe3a64c835687295d79db3a94ed Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Tue, 27 Feb 2024 12:00:44 -0600 Subject: [PATCH] feat(patch): refactor cll patch (#9922) Co-authored-by: Harshal Sheth --- .../builder/UpstreamLineagePatchBuilder.java | 188 ++--------- .../dataset/UpstreamLineageTemplate.java | 268 +++++++++------- .../template/UpstreamLineageTemplateTest.java | 296 +++++------------- .../src/datahub/emitter/mcp_patch_builder.py | 4 +- .../src/datahub/specific/dataset.py | 76 ++--- .../unit/patch/complex_dataset_patch.json | 158 +++++----- .../tests/unit/patch/test_patch_builder.py | 59 +++- .../java/datahub/client/patch/PatchTest.java | 23 +- 8 files changed, 415 insertions(+), 657 deletions(-) diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java index bfb46d8fc5..a5fd3ac0ba 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java @@ -9,12 +9,11 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.linkedin.common.urn.DatasetUrn; import com.linkedin.common.urn.Urn; import com.linkedin.dataset.DatasetLineageType; -import com.linkedin.dataset.FineGrainedLineageDownstreamType; -import com.linkedin.dataset.FineGrainedLineageUpstreamType; import com.linkedin.metadata.aspect.patch.PatchOperationType; import javax.annotation.Nonnull; import javax.annotation.Nullable; import lombok.ToString; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutableTriple; @ToString @@ -52,120 +51,52 @@ public class UpstreamLineagePatchBuilder return this; } - /** - * Method for adding an upstream FineGrained Dataset - * - * @param datasetUrn dataset to be set as upstream - * @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for - * full confidence - * @param transformationOperation string operation type that describes the transformation - * operation happening in the lineage edge - * @return this builder - */ - public UpstreamLineagePatchBuilder addFineGrainedUpstreamDataset( - @Nonnull DatasetUrn datasetUrn, - @Nullable Float confidenceScore, - @Nonnull String transformationOperation) { - Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore); - - pathValues.add( - ImmutableTriple.of( - PatchOperationType.ADD.getValue(), - FINE_GRAINED_PATH_START - + transformationOperation - + "/" - + "upstreamType" - + "/" - + "DATASET" - + "/" - + datasetUrn, - instance.numberNode(finalConfidenceScore))); - return this; - } - /** * Adds a field as a fine grained upstream * - * @param schemaFieldUrn a schema field to be marked as upstream, format: + * @param upstreamSchemaField a schema field to be marked as upstream, format: * urn:li:schemaField(DATASET_URN, COLUMN NAME) * @param confidenceScore optional, confidence score for the lineage edge. 
Defaults to 1.0 for * full confidence * @param transformationOperation string operation type that describes the transformation * operation happening in the lineage edge - * @param type the upstream lineage type, either Field or Field Set + * @param downstreamSchemaField the downstream schema field this upstream is derived from, format: + * urn:li:schemaField(DATASET_URN, COLUMN NAME) + * @param queryUrn query urn the relationship is derived from * @return this builder */ public UpstreamLineagePatchBuilder addFineGrainedUpstreamField( - @Nonnull Urn schemaFieldUrn, + @Nonnull Urn upstreamSchemaField, @Nullable Float confidenceScore, @Nonnull String transformationOperation, - @Nullable FineGrainedLineageUpstreamType type) { + @Nonnull Urn downstreamSchemaField, + @Nullable Urn queryUrn) { Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore); - String finalType; - if (type == null) { - // Default to set of fields if not explicitly a single field - finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString(); + String finalQueryUrn; + if (queryUrn == null || StringUtils.isBlank(queryUrn.toString())) { + finalQueryUrn = "NONE"; } else { - finalType = type.toString(); + finalQueryUrn = queryUrn.toString(); } + ObjectNode fineGrainedLineageNode = instance.objectNode(); + fineGrainedLineageNode.put("confidenceScore", instance.numberNode(finalConfidenceScore)); pathValues.add( ImmutableTriple.of( PatchOperationType.ADD.getValue(), FINE_GRAINED_PATH_START + transformationOperation + "/" - + "upstreamType" + + downstreamSchemaField + "/" - + finalType + + finalQueryUrn + "/" - + schemaFieldUrn, - instance.numberNode(finalConfidenceScore))); + + upstreamSchemaField, + fineGrainedLineageNode)); return this; } - /** - * Adds a field as a fine grained downstream - * - * @param schemaFieldUrn a schema field to be marked as downstream, format: - * urn:li:schemaField(DATASET_URN, COLUMN NAME) - * @param confidenceScore optional, confidence score for the lineage edge. 
Defaults to 1.0 for - * full confidence - * @param transformationOperation string operation type that describes the transformation - * operation happening in the lineage edge - * @param type the downstream lineage type, either Field or Field Set - * @return this builder - */ - public UpstreamLineagePatchBuilder addFineGrainedDownstreamField( - @Nonnull Urn schemaFieldUrn, - @Nullable Float confidenceScore, - @Nonnull String transformationOperation, - @Nullable FineGrainedLineageDownstreamType type) { - Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore); - String finalType; - if (type == null) { - // Default to set of fields if not explicitly a single field - finalType = FineGrainedLineageDownstreamType.FIELD_SET.toString(); - } else { - finalType = type.toString(); - } - - pathValues.add( - ImmutableTriple.of( - PatchOperationType.ADD.getValue(), - FINE_GRAINED_PATH_START - + transformationOperation - + "/" - + "downstreamType" - + "/" - + finalType - + "/" - + schemaFieldUrn, - instance.numberNode(finalConfidenceScore))); - return this; - } - private Float getConfidenceScoreOrDefault(@Nullable Float confidenceScore) { float finalConfidenceScore; if (confidenceScore != null && confidenceScore > 0 && confidenceScore <= 1.0f) { @@ -180,96 +111,43 @@ public class UpstreamLineagePatchBuilder /** * Removes a field as a fine grained upstream * - * @param schemaFieldUrn a schema field to be marked as upstream, format: + * @param upstreamSchemaFieldUrn a schema field to be marked as upstream, format: * urn:li:schemaField(DATASET_URN, COLUMN NAME) * @param transformationOperation string operation type that describes the transformation * operation happening in the lineage edge - * @param type the upstream lineage type, either Field or Field Set + * @param downstreamSchemaField the downstream schema field this upstream is derived from, format: + * urn:li:schemaField(DATASET_URN, COLUMN NAME) + * @param queryUrn query urn the relationship is derived from * @return this builder */ public UpstreamLineagePatchBuilder removeFineGrainedUpstreamField( - @Nonnull Urn schemaFieldUrn, + @Nonnull Urn upstreamSchemaFieldUrn, @Nonnull String transformationOperation, - @Nullable FineGrainedLineageUpstreamType type) { - String finalType; - if (type == null) { - // Default to set of fields if not explicitly a single field - finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString(); + @Nonnull Urn downstreamSchemaField, + @Nullable Urn queryUrn) { + + String finalQueryUrn; + if (queryUrn == null || StringUtils.isBlank(queryUrn.toString())) { + finalQueryUrn = "NONE"; } else { - finalType = type.toString(); + finalQueryUrn = queryUrn.toString(); } - pathValues.add( ImmutableTriple.of( PatchOperationType.REMOVE.getValue(), FINE_GRAINED_PATH_START + transformationOperation + "/" - + "upstreamType" + + downstreamSchemaField + "/" - + finalType + + finalQueryUrn + "/" - + schemaFieldUrn, + + upstreamSchemaFieldUrn, null)); return this; } - public UpstreamLineagePatchBuilder removeFineGrainedUpstreamDataset( - @Nonnull DatasetUrn datasetUrn, @Nonnull String transformationOperation) { - - pathValues.add( - ImmutableTriple.of( - PatchOperationType.REMOVE.getValue(), - FINE_GRAINED_PATH_START - + transformationOperation - + "/" - + "upstreamType" - + "/" - + "DATASET" - + "/" - + datasetUrn, - null)); - return this; - } - - /** - * Adds a field as a fine grained downstream - * - * @param schemaFieldUrn a schema field to be marked as downstream, format: - * urn:li:schemaField(DATASET_URN, COLUMN 
NAME) - * @param transformationOperation string operation type that describes the transformation - * operation happening in the lineage edge - * @param type the downstream lineage type, either Field or Field Set - * @return this builder - */ - public UpstreamLineagePatchBuilder removeFineGrainedDownstreamField( - @Nonnull Urn schemaFieldUrn, - @Nonnull String transformationOperation, - @Nullable FineGrainedLineageDownstreamType type) { - String finalType; - if (type == null) { - // Default to set of fields if not explicitly a single field - finalType = FineGrainedLineageDownstreamType.FIELD_SET.toString(); - } else { - finalType = type.toString(); - } - - pathValues.add( - ImmutableTriple.of( - PatchOperationType.REMOVE.getValue(), - FINE_GRAINED_PATH_START - + transformationOperation - + "/" - + "downstreamType" - + "/" - + finalType - + "/" - + schemaFieldUrn, - null)); - return this; - } - @Override protected String getAspectName() { return UPSTREAM_LINEAGE_ASPECT_NAME; diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/dataset/UpstreamLineageTemplate.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/dataset/UpstreamLineageTemplate.java index 6907181b3f..e5da7551ac 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/dataset/UpstreamLineageTemplate.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/dataset/UpstreamLineageTemplate.java @@ -1,10 +1,7 @@ package com.linkedin.metadata.aspect.patch.template.dataset; import static com.fasterxml.jackson.databind.node.JsonNodeFactory.instance; -import static com.linkedin.metadata.Constants.FINE_GRAINED_LINEAGE_DATASET_TYPE; -import static com.linkedin.metadata.Constants.FINE_GRAINED_LINEAGE_FIELD_SET_TYPE; -import static com.linkedin.metadata.Constants.FINE_GRAINED_LINEAGE_FIELD_TYPE; -import static com.linkedin.metadata.Constants.SCHEMA_FIELD_ENTITY_NAME; +import static com.linkedin.metadata.Constants.*; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -22,6 +19,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.codehaus.plexus.util.StringUtils; public class UpstreamLineageTemplate extends CompoundKeyTemplate { @@ -35,10 +33,12 @@ public class UpstreamLineageTemplate extends CompoundKeyTemplate 1) { - downstreamType = FINE_GRAINED_LINEAGE_FIELD_SET_TYPE; - } else { - downstreamType = FINE_GRAINED_LINEAGE_FIELD_TYPE; - } + // Always use FIELD type, only support patches for single field downstream + downstreamType = FINE_GRAINED_LINEAGE_FIELD_TYPE; } - if (!transformationOperationNode.has(FINE_GRAINED_DOWNSTREAM_TYPE)) { - transformationOperationNode.set( - FINE_GRAINED_DOWNSTREAM_TYPE, instance.objectNode()); + + String downstreamRoot = downstreams.get(0).asText(); + if (!transformationOperationNode.has(downstreamRoot)) { + transformationOperationNode.set(downstreamRoot, instance.objectNode()); } - ObjectNode downstreamTypeNode = - (ObjectNode) transformationOperationNode.get(FINE_GRAINED_DOWNSTREAM_TYPE); - if (!downstreamTypeNode.has(downstreamType)) { - downstreamTypeNode.set(downstreamType, instance.objectNode()); + ObjectNode downstreamRootNode = + (ObjectNode) transformationOperationNode.get(downstreamRoot); + if (!downstreamRootNode.has(queryId)) { + downstreamRootNode.set(queryId, instance.objectNode()); } - if (downstreams != 
null) { - addUrnsToSubType(downstreamTypeNode, downstreams, downstreamType, confidenceScore); + ObjectNode queryNode = (ObjectNode) downstreamRootNode.get(queryId); + if (upstreams != null) { + addUrnsToParent( + queryNode, upstreams, confidenceScore, upstreamType, downstreamType); } }); return mapNode; } - private void addUrnsToSubType( - JsonNode superType, ArrayNode urnsList, String subType, Float confidenceScore) { - ObjectNode upstreamSubTypeNode = (ObjectNode) superType.get(subType); + private void addUrnsToParent( + JsonNode parentNode, + ArrayNode urnsList, + Float confidenceScore, + String upstreamType, + String downstreamType) { // Will overwrite repeat urns with different confidence scores with the most recently seen - upstreamSubTypeNode.setAll( - Streams.stream(urnsList.elements()) - .map(JsonNode::asText) - .distinct() - .collect(Collectors.toMap(urn -> urn, urn -> instance.numberNode(confidenceScore)))); + ((ObjectNode) parentNode) + .setAll( + Streams.stream(urnsList.elements()) + .map(JsonNode::asText) + .distinct() + .collect( + Collectors.toMap( + urn -> urn, + urn -> + mapToLineageValueNode(confidenceScore, upstreamType, downstreamType)))); + } + + private JsonNode mapToLineageValueNode( + Float confidenceScore, String upstreamType, String downstreamType) { + ObjectNode objectNode = instance.objectNode(); + objectNode.set(FINE_GRAINED_CONFIDENCE_SCORE, instance.numberNode(confidenceScore)); + objectNode.set(FINE_GRAINED_UPSTREAM_TYPE, instance.textNode(upstreamType)); + objectNode.set(FINE_GRAINED_DOWNSTREAM_TYPE, instance.textNode(downstreamType)); + return objectNode; } /** @@ -225,7 +241,7 @@ public class UpstreamLineageTemplate extends CompoundKeyTemplate { final ObjectNode transformationOperationNode = (ObjectNode) mapNode.get(transformationOperation); - final ObjectNode upstreamType = - transformationOperationNode.has(FINE_GRAINED_UPSTREAM_TYPE) - ? (ObjectNode) transformationOperationNode.get(FINE_GRAINED_UPSTREAM_TYPE) - : instance.objectNode(); - final ObjectNode downstreamType = - transformationOperationNode.has(FINE_GRAINED_DOWNSTREAM_TYPE) - ? 
(ObjectNode) transformationOperationNode.get(FINE_GRAINED_DOWNSTREAM_TYPE) - : instance.objectNode(); - - // Handle upstreams - if (!upstreamType.isEmpty()) { - populateTypeNode( - upstreamType, - transformationOperation, - FINE_GRAINED_UPSTREAM_TYPE, - FINE_GRAINED_UPSTREAMS, - FINE_GRAINED_DOWNSTREAM_TYPE, - arrayNode); - } - - // Handle downstreams - if (!downstreamType.isEmpty()) { - populateTypeNode( - downstreamType, - transformationOperation, - FINE_GRAINED_DOWNSTREAM_TYPE, - FINE_GRAINED_DOWNSTREAMS, - FINE_GRAINED_UPSTREAM_TYPE, - arrayNode); - } + transformationOperationNode + .fieldNames() + .forEachRemaining( + downstreamName -> { + final ObjectNode downstreamNode = + (ObjectNode) transformationOperationNode.get(downstreamName); + downstreamNode + .fieldNames() + .forEachRemaining( + queryId -> + buildFineGrainedLineage( + downstreamName, + downstreamNode, + queryId, + transformationOperation, + fineGrainedLineages)); + }); }); - return arrayNode; + return fineGrainedLineages; } - private void populateTypeNode( - JsonNode typeNode, - String transformationOperation, - String typeName, - String arrayTypeName, - String defaultTypeName, - ArrayNode arrayNode) { - typeNode + private void buildFineGrainedLineage( + final String downstreamName, + final ObjectNode downstreamNode, + final String queryId, + final String transformationOperation, + final ArrayNode fineGrainedLineages) { + final ObjectNode fineGrainedLineage = instance.objectNode(); + final ObjectNode queryNode = (ObjectNode) downstreamNode.get(queryId); + if (queryNode.isEmpty()) { + // Short circuit if no upstreams left + return; + } + ArrayNode downstream = instance.arrayNode(); + downstream.add(instance.textNode(downstreamName)); + // Set defaults, if found in sub nodes override, for confidenceScore take lowest + AtomicReference minimumConfidenceScore = new AtomicReference<>(DEFAULT_CONFIDENCE_SCORE); + AtomicReference upstreamType = + new AtomicReference<>(FINE_GRAINED_LINEAGE_FIELD_SET_TYPE); + AtomicReference downstreamType = new AtomicReference<>(FINE_GRAINED_LINEAGE_FIELD_TYPE); + ArrayNode upstreams = instance.arrayNode(); + queryNode .fieldNames() .forEachRemaining( - subTypeName -> { - ObjectNode subType = (ObjectNode) typeNode.get(subTypeName); - if (!subType.isEmpty()) { - ObjectNode fineGrainedLineage = instance.objectNode(); - AtomicReference minimumConfidenceScore = new AtomicReference<>(1.0f); + upstream -> + processUpstream( + queryNode, + upstream, + minimumConfidenceScore, + upstreamType, + downstreamType, + upstreams)); + fineGrainedLineage.set(FINE_GRAINED_DOWNSTREAMS, downstream); + fineGrainedLineage.set(FINE_GRAINED_UPSTREAMS, upstreams); + if (StringUtils.isNotBlank(queryId) && !DEFAULT_QUERY_ID.equals(queryId)) { + fineGrainedLineage.set(FINE_GRAINED_QUERY_ID, instance.textNode(queryId)); + } + fineGrainedLineage.set(FINE_GRAINED_UPSTREAM_TYPE, instance.textNode(upstreamType.get())); + fineGrainedLineage.set(FINE_GRAINED_DOWNSTREAM_TYPE, instance.textNode(downstreamType.get())); + fineGrainedLineage.set( + FINE_GRAINED_CONFIDENCE_SCORE, instance.numberNode(minimumConfidenceScore.get())); + fineGrainedLineage.set( + FINE_GRAINED_TRANSFORMATION_OPERATION, instance.textNode(transformationOperation)); + fineGrainedLineages.add(fineGrainedLineage); + } - fineGrainedLineage.put(typeName, subTypeName); - fineGrainedLineage.put( - FINE_GRAINED_TRANSFORMATION_OPERATION, transformationOperation); - // Array to actually be filled out - fineGrainedLineage.set(arrayTypeName, instance.arrayNode()); - // Added 
to pass model validation, because we have no way of appropriately pairing - // upstreams and downstreams - // within fine grained lineages consistently due to being able to have multiple - // downstream types paired with a single - // transform operation, we just set a default type because it's a required property - fineGrainedLineage.put(defaultTypeName, FINE_GRAINED_LINEAGE_FIELD_SET_TYPE); - subType - .fieldNames() - .forEachRemaining( - subTypeKey -> { - ((ArrayNode) fineGrainedLineage.get(arrayTypeName)).add(subTypeKey); - Float scoreValue = subType.get(subTypeKey).floatValue(); - if (scoreValue <= minimumConfidenceScore.get()) { - minimumConfidenceScore.set(scoreValue); - fineGrainedLineage.set( - FINE_GRAINED_CONFIDENCE_SCORE, - instance.numberNode(minimumConfidenceScore.get())); - } - }); - arrayNode.add(fineGrainedLineage); - } - }); + private void processUpstream( + final ObjectNode queryNode, + final String upstream, + final AtomicReference minimumConfidenceScore, + final AtomicReference upstreamType, + final AtomicReference downstreamType, + final ArrayNode upstreams) { + final ObjectNode upstreamNode = (ObjectNode) queryNode.get(upstream); + if (upstreamNode.has(FINE_GRAINED_CONFIDENCE_SCORE)) { + Float scoreValue = upstreamNode.get(FINE_GRAINED_CONFIDENCE_SCORE).floatValue(); + if (scoreValue <= minimumConfidenceScore.get()) { + minimumConfidenceScore.set(scoreValue); + } + } + // Set types to last encountered, should never change, but this at least tries to support + // other types being specified. + if (upstreamNode.has(FINE_GRAINED_UPSTREAM_TYPE)) { + upstreamType.set(upstreamNode.get(FINE_GRAINED_UPSTREAM_TYPE).asText()); + } + if (upstreamNode.has(FINE_GRAINED_DOWNSTREAM_TYPE)) { + downstreamType.set(upstreamNode.get(FINE_GRAINED_DOWNSTREAM_TYPE).asText()); + } + upstreams.add(instance.textNode(upstream)); } } diff --git a/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/template/UpstreamLineageTemplateTest.java b/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/template/UpstreamLineageTemplateTest.java index 7d59664513..4bad6a8e3d 100644 --- a/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/template/UpstreamLineageTemplateTest.java +++ b/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/template/UpstreamLineageTemplateTest.java @@ -3,6 +3,7 @@ package com.linkedin.metadata.aspect.patch.template; import static com.fasterxml.jackson.databind.node.JsonNodeFactory.*; import com.fasterxml.jackson.databind.node.NumericNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.github.fge.jackson.jsonpointer.JsonPointer; import com.github.fge.jsonpatch.AddOperation; import com.github.fge.jsonpatch.JsonPatch; @@ -28,12 +29,14 @@ public class UpstreamLineageTemplateTest { UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate(); UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault(); List patchOperations = new ArrayList<>(); + ObjectNode fineGrainedLineageNode = instance.objectNode(); NumericNode upstreamConfidenceScore = instance.numberNode(1.0f); + fineGrainedLineageNode.set("confidenceScore", upstreamConfidenceScore); JsonPatchOperation operation = new AddOperation( new JsonPointer( - "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"), - upstreamConfidenceScore); + 
"/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)//urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c1)"), + fineGrainedLineageNode); patchOperations.add(operation); JsonPatch jsonPatch = new JsonPatch(patchOperations); @@ -48,24 +51,42 @@ public class UpstreamLineageTemplateTest { UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"); urns.add(urn1); - fineGrainedLineage.setUpstreams(urns); + UrnArray upstreams = new UrnArray(); + Urn upstreamUrn = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c1)"); + upstreams.add(upstreamUrn); + fineGrainedLineage.setDownstreams(urns); + fineGrainedLineage.setUpstreams(upstreams); fineGrainedLineage.setTransformOperation("CREATE"); fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); - fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD); Assert.assertEquals(result.getFineGrainedLineages().get(0), fineGrainedLineage); - // Test non-overwrite upstreams and correct confidence score + // Test non-overwrite upstreams and correct confidence score and types w/ overwrite + ObjectNode finegrainedLineageNode2 = instance.objectNode(); + finegrainedLineageNode2.set( + "upstreamType", instance.textNode(FineGrainedLineageUpstreamType.FIELD_SET.name())); + finegrainedLineageNode2.set("confidenceScore", upstreamConfidenceScore); + finegrainedLineageNode2.set( + "downstreamType", instance.textNode(FineGrainedLineageDownstreamType.FIELD.name())); JsonPatchOperation operation2 = new AddOperation( new JsonPointer( - "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), - upstreamConfidenceScore); + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:someQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), + finegrainedLineageNode2); NumericNode upstreamConfidenceScore2 = instance.numberNode(0.1f); + ObjectNode finegrainedLineageNode3 = instance.objectNode(); + finegrainedLineageNode3.set( + "upstreamType", instance.textNode(FineGrainedLineageUpstreamType.DATASET.name())); + finegrainedLineageNode3.set("confidenceScore", upstreamConfidenceScore2); + finegrainedLineageNode3.set( + "downstreamType", instance.textNode(FineGrainedLineageDownstreamType.FIELD_SET.name())); JsonPatchOperation operation3 = new AddOperation( new JsonPointer( - "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), - upstreamConfidenceScore2); + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:someQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), + finegrainedLineageNode3); List patchOperations2 = new ArrayList<>(); patchOperations2.add(operation2); patchOperations2.add(operation3); @@ -79,20 +100,32 @@ public class UpstreamLineageTemplateTest { Urn urn2 = UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"); - urns2.add(urn1); 
urns2.add(urn2); + Urn downstreamUrn2 = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)"); + UrnArray downstreams2 = new UrnArray(); + downstreams2.add(downstreamUrn2); fineGrainedLineage2.setUpstreams(urns2); + fineGrainedLineage2.setDownstreams(downstreams2); fineGrainedLineage2.setTransformOperation("CREATE"); - fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.DATASET); fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); - Assert.assertEquals(result2.getFineGrainedLineages().get(0), fineGrainedLineage2); + fineGrainedLineage2.setQuery(UrnUtils.getUrn("urn:li:query:someQuery")); + Assert.assertEquals(result2.getFineGrainedLineages().get(1), fineGrainedLineage2); - // Check different upstream types + // Check different queries + ObjectNode finegrainedLineageNode4 = instance.objectNode(); + finegrainedLineageNode4.set( + "upstreamType", instance.textNode(FineGrainedLineageUpstreamType.FIELD_SET.name())); + finegrainedLineageNode4.set("confidenceScore", upstreamConfidenceScore); + finegrainedLineageNode4.set( + "downstreamType", instance.textNode(FineGrainedLineageDownstreamType.FIELD.name())); JsonPatchOperation operation4 = new AddOperation( new JsonPointer( - "/fineGrainedLineages/CREATE/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"), - upstreamConfidenceScore); + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:anotherQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), + finegrainedLineageNode4); List patchOperations3 = new ArrayList<>(); patchOperations3.add(operation4); JsonPatch jsonPatch3 = new JsonPatch(patchOperations3); @@ -103,163 +136,36 @@ public class UpstreamLineageTemplateTest { FineGrainedLineage fineGrainedLineage3 = new FineGrainedLineage(dataMap3); UrnArray urns3 = new UrnArray(); Urn urn3 = - UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"); - urns3.add(urn3); - fineGrainedLineage3.setUpstreams(urns3); - fineGrainedLineage3.setTransformOperation("CREATE"); - fineGrainedLineage3.setUpstreamType(FineGrainedLineageUpstreamType.DATASET); - fineGrainedLineage3.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); - // Splits into two for different types - Assert.assertEquals(result3.getFineGrainedLineages().get(1), fineGrainedLineage3); - - // Check different transform types - JsonPatchOperation operation5 = - new AddOperation( - new JsonPointer( - "/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"), - upstreamConfidenceScore); - List patchOperations4 = new ArrayList<>(); - patchOperations4.add(operation5); - JsonPatch jsonPatch4 = new JsonPatch(patchOperations4); - UpstreamLineage result4 = upstreamLineageTemplate.applyPatch(result3, jsonPatch4); - // Hack because Jackson parses values to doubles instead of floats - DataMap dataMap4 = new DataMap(); - dataMap4.put("confidenceScore", 1.0); - FineGrainedLineage fineGrainedLineage4 = new FineGrainedLineage(dataMap4); - UrnArray urns4 = new UrnArray(); - Urn urn4 = - UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"); - urns4.add(urn4); - fineGrainedLineage4.setUpstreams(urns4); - 
fineGrainedLineage4.setTransformOperation("TRANSFORM"); - fineGrainedLineage4.setUpstreamType(FineGrainedLineageUpstreamType.DATASET); - fineGrainedLineage4.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); - // New entry in array because of new transformation type - Assert.assertEquals(result4.getFineGrainedLineages().get(2), fineGrainedLineage4); - - // Remove - JsonPatchOperation removeOperation = - new RemoveOperation( - new JsonPointer( - "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)")); - JsonPatchOperation removeOperation2 = - new RemoveOperation( - new JsonPointer( - "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)")); - JsonPatchOperation removeOperation3 = - new RemoveOperation( - new JsonPointer( - "/fineGrainedLineages/CREATE/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)")); - JsonPatchOperation removeOperation4 = - new RemoveOperation( - new JsonPointer( - "/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)")); - - List removeOperations = new ArrayList<>(); - removeOperations.add(removeOperation); - removeOperations.add(removeOperation2); - removeOperations.add(removeOperation3); - removeOperations.add(removeOperation4); - JsonPatch removePatch = new JsonPatch(removeOperations); - UpstreamLineage finalResult = upstreamLineageTemplate.applyPatch(result4, removePatch); - Assert.assertEquals(upstreamLineageTemplate.getDefault(), finalResult); - } - - @Test - public void testPatchDownstream() throws Exception { - UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate(); - UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault(); - List patchOperations = new ArrayList<>(); - NumericNode downstreamConfidenceScore = instance.numberNode(1.0f); - JsonPatchOperation operation = - new AddOperation( - new JsonPointer( - "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"), - downstreamConfidenceScore); - patchOperations.add(operation); - JsonPatch jsonPatch = new JsonPatch(patchOperations); - - // Initial population test - UpstreamLineage result = upstreamLineageTemplate.applyPatch(upstreamLineage, jsonPatch); - // Hack because Jackson parses values to doubles instead of floats - DataMap dataMap = new DataMap(); - dataMap.put("confidenceScore", 1.0); - FineGrainedLineage fineGrainedLineage = new FineGrainedLineage(dataMap); - UrnArray urns = new UrnArray(); - Urn urn1 = UrnUtils.getUrn( - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"); - urns.add(urn1); - fineGrainedLineage.setDownstreams(urns); - fineGrainedLineage.setTransformOperation("CREATE"); - fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); - fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); - Assert.assertEquals(result.getFineGrainedLineages().get(0), fineGrainedLineage); + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)"); + urns3.add(urn3); - // Test non-overwrite downstreams and correct confidence score - JsonPatchOperation operation2 = - new AddOperation( - new JsonPointer( - 
"/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), - downstreamConfidenceScore); - NumericNode downstreamConfidenceScore2 = instance.numberNode(0.1f); - JsonPatchOperation operation3 = - new AddOperation( - new JsonPointer( - "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), - downstreamConfidenceScore2); - List patchOperations2 = new ArrayList<>(); - patchOperations2.add(operation2); - patchOperations2.add(operation3); - JsonPatch jsonPatch2 = new JsonPatch(patchOperations2); - UpstreamLineage result2 = upstreamLineageTemplate.applyPatch(result, jsonPatch2); - // Hack because Jackson parses values to doubles instead of floats - DataMap dataMap2 = new DataMap(); - dataMap2.put("confidenceScore", 0.1); - FineGrainedLineage fineGrainedLineage2 = new FineGrainedLineage(dataMap2); - UrnArray urns2 = new UrnArray(); - Urn urn2 = + Urn upstreamUrn3 = UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"); - urns2.add(urn1); - urns2.add(urn2); - fineGrainedLineage2.setDownstreams(urns2); - fineGrainedLineage2.setTransformOperation("CREATE"); - fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); - fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); - Assert.assertEquals(result2.getFineGrainedLineages().get(0), fineGrainedLineage2); - - // Check different downstream types - JsonPatchOperation operation4 = - new AddOperation( - new JsonPointer( - "/fineGrainedLineages/CREATE/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"), - downstreamConfidenceScore); - List patchOperations3 = new ArrayList<>(); - patchOperations3.add(operation4); - JsonPatch jsonPatch3 = new JsonPatch(patchOperations3); - UpstreamLineage result3 = upstreamLineageTemplate.applyPatch(result2, jsonPatch3); - // Hack because Jackson parses values to doubles instead of floats - DataMap dataMap3 = new DataMap(); - dataMap3.put("confidenceScore", 1.0); - FineGrainedLineage fineGrainedLineage3 = new FineGrainedLineage(dataMap3); - UrnArray urns3 = new UrnArray(); - Urn urn3 = - UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"); - urns3.add(urn3); + UrnArray upstreamUrns3 = new UrnArray(); + upstreamUrns3.add(upstreamUrn3); fineGrainedLineage3.setDownstreams(urns3); + fineGrainedLineage3.setUpstreams(upstreamUrns3); fineGrainedLineage3.setTransformOperation("CREATE"); - fineGrainedLineage3.setDownstreamType(FineGrainedLineageDownstreamType.FIELD); fineGrainedLineage3.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + fineGrainedLineage3.setDownstreamType(FineGrainedLineageDownstreamType.FIELD); + fineGrainedLineage3.setQuery(UrnUtils.getUrn("urn:li:query:anotherQuery")); // Splits into two for different types - Assert.assertEquals(result3.getFineGrainedLineages().get(1), fineGrainedLineage3); + Assert.assertEquals(result3.getFineGrainedLineages().get(2), fineGrainedLineage3); // Check different transform types + ObjectNode finegrainedLineageNode5 = instance.objectNode(); + finegrainedLineageNode5.set( + "upstreamType", instance.textNode(FineGrainedLineageUpstreamType.FIELD_SET.name())); + finegrainedLineageNode5.set("confidenceScore", upstreamConfidenceScore); + finegrainedLineageNode5.set( + "downstreamType", 
instance.textNode(FineGrainedLineageDownstreamType.FIELD.name())); JsonPatchOperation operation5 = new AddOperation( new JsonPointer( - "/fineGrainedLineages/TRANSFORM/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"), - downstreamConfidenceScore); + "/fineGrainedLineages/TRANSFORM/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:anotherQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), + finegrainedLineageNode5); List patchOperations4 = new ArrayList<>(); patchOperations4.add(operation5); JsonPatch jsonPatch4 = new JsonPatch(patchOperations4); @@ -268,34 +174,32 @@ public class UpstreamLineageTemplateTest { DataMap dataMap4 = new DataMap(); dataMap4.put("confidenceScore", 1.0); FineGrainedLineage fineGrainedLineage4 = new FineGrainedLineage(dataMap4); - UrnArray urns4 = new UrnArray(); - Urn urn4 = - UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"); - urns4.add(urn4); - fineGrainedLineage4.setDownstreams(urns4); + fineGrainedLineage4.setUpstreams(upstreamUrns3); + fineGrainedLineage4.setDownstreams(urns3); fineGrainedLineage4.setTransformOperation("TRANSFORM"); - fineGrainedLineage4.setDownstreamType(FineGrainedLineageDownstreamType.FIELD); fineGrainedLineage4.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + fineGrainedLineage4.setDownstreamType(FineGrainedLineageDownstreamType.FIELD); + fineGrainedLineage4.setQuery(UrnUtils.getUrn("urn:li:query:anotherQuery")); // New entry in array because of new transformation type - Assert.assertEquals(result4.getFineGrainedLineages().get(2), fineGrainedLineage4); + Assert.assertEquals(result4.getFineGrainedLineages().get(3), fineGrainedLineage4); // Remove JsonPatchOperation removeOperation = new RemoveOperation( new JsonPointer( - "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)")); + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c1)")); JsonPatchOperation removeOperation2 = new RemoveOperation( new JsonPointer( - "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)")); + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:someQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)")); JsonPatchOperation removeOperation3 = new RemoveOperation( new JsonPointer( - "/fineGrainedLineages/CREATE/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)")); + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:anotherQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)")); JsonPatchOperation removeOperation4 = new RemoveOperation( new JsonPointer( - "/fineGrainedLineages/TRANSFORM/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)")); + 
"/fineGrainedLineages/TRANSFORM/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:anotherQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)")); List removeOperations = new ArrayList<>(); removeOperations.add(removeOperation); @@ -304,56 +208,6 @@ public class UpstreamLineageTemplateTest { removeOperations.add(removeOperation4); JsonPatch removePatch = new JsonPatch(removeOperations); UpstreamLineage finalResult = upstreamLineageTemplate.applyPatch(result4, removePatch); - Assert.assertEquals(upstreamLineageTemplate.getDefault(), finalResult); - } - - @Test - public void testUpAndDown() throws Exception { - UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate(); - UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault(); - List patchOperations = new ArrayList<>(); - NumericNode downstreamConfidenceScore = instance.numberNode(1.0f); - JsonPatchOperation operation = - new AddOperation( - new JsonPointer( - "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"), - downstreamConfidenceScore); - patchOperations.add(operation); - NumericNode upstreamConfidenceScore = instance.numberNode(1.0f); - JsonPatchOperation operation2 = - new AddOperation( - new JsonPointer( - "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"), - upstreamConfidenceScore); - patchOperations.add(operation2); - JsonPatch jsonPatch = new JsonPatch(patchOperations); - - // Initial population test - UpstreamLineage result = upstreamLineageTemplate.applyPatch(upstreamLineage, jsonPatch); - // Hack because Jackson parses values to doubles instead of floats - DataMap dataMap = new DataMap(); - dataMap.put("confidenceScore", 1.0); - FineGrainedLineage fineGrainedLineage = new FineGrainedLineage(dataMap); - UrnArray urns = new UrnArray(); - Urn urn1 = - UrnUtils.getUrn( - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"); - urns.add(urn1); - fineGrainedLineage.setTransformOperation("CREATE"); - fineGrainedLineage.setUpstreams(urns); - fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); - fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); - fineGrainedLineage.setDownstreams(urns); - - // Hack because Jackson parses values to doubles instead of floats - DataMap dataMap2 = new DataMap(); - dataMap2.put("confidenceScore", 1.0); - FineGrainedLineage fineGrainedLineage2 = new FineGrainedLineage(dataMap2); - fineGrainedLineage2.setTransformOperation("CREATE"); - fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); - fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); - fineGrainedLineage2.setDownstreams(urns); - - Assert.assertEquals(result.getFineGrainedLineages().get(1), fineGrainedLineage2); + Assert.assertEquals(finalResult, upstreamLineageTemplate.getDefault()); } } diff --git a/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py index f528a51052..f5ef276d5f 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py @@ -58,8 +58,8 @@ class MetadataPatchProposal: self.patches = defaultdict(list) # Json Patch 
quoting based on https://jsonpatch.com/#json-pointer - @staticmethod - def quote(value: str) -> str: + @classmethod + def quote(cls, value: str) -> str: return value.replace("~", "~0").replace("/", "~1") def _add_patch(self, aspect_name: str, op: str, path: str, value: Any) -> None: diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index 658490b015..c5439e4d28 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -6,8 +6,6 @@ from datahub.metadata.schema_classes import ( EditableDatasetPropertiesClass as EditableDatasetProperties, EditableSchemaMetadataClass as EditableSchemaMetadata, FineGrainedLineageClass as FineGrainedLineage, - FineGrainedLineageDownstreamTypeClass as FineGrainedLineageDownstreamType, - FineGrainedLineageUpstreamTypeClass as FineGrainedLineageUpstreamType, GlobalTagsClass as GlobalTags, GlossaryTermAssociationClass as Term, GlossaryTermsClass as GlossaryTerms, @@ -126,7 +124,7 @@ class DatasetPatchBuilder(MetadataPatchProposal): self._add_patch( UpstreamLineage.ASPECT_NAME, "add", - path=f"/upstreams/{MetadataPatchProposal.quote(upstream.dataset)}", + path=f"/upstreams/{self.quote(upstream.dataset)}", value=upstream, ) return self @@ -153,26 +151,17 @@ class DatasetPatchBuilder(MetadataPatchProposal): ) -> "DatasetPatchBuilder": ( transform_op, - upstream_type, - downstream_type, + downstream_urn, + query_id, ) = DatasetPatchBuilder.get_fine_grained_key(fine_grained_lineage) for upstream_urn in fine_grained_lineage.upstreams or []: self._add_patch( UpstreamLineage.ASPECT_NAME, "add", - path=DatasetPatchBuilder.quote_fine_grained_upstream_path( - transform_op, upstream_type, upstream_urn + path=DatasetPatchBuilder.quote_fine_grained_path( + transform_op, downstream_urn, query_id, upstream_urn ), - value=fine_grained_lineage.confidenceScore, - ) - for downstream_urn in fine_grained_lineage.downstreams or []: - self._add_patch( - UpstreamLineage.ASPECT_NAME, - "add", - path=DatasetPatchBuilder.quote_fine_grained_downstream_path( - transform_op, downstream_type, downstream_urn - ), - value=fine_grained_lineage.confidenceScore, + value={"confidenceScore": fine_grained_lineage.confidenceScore}, ) return self @@ -180,35 +169,21 @@ class DatasetPatchBuilder(MetadataPatchProposal): def get_fine_grained_key( fine_grained_lineage: FineGrainedLineage, ) -> Tuple[str, str, str]: + downstreams = fine_grained_lineage.downstreams or [] + if len(downstreams) != 1: + raise TypeError("Cannot patch with more or less than one downstream.") transform_op = fine_grained_lineage.transformOperation or "NONE" - upstream_type = ( - fine_grained_lineage.upstreamType - if isinstance(fine_grained_lineage.upstreamType, str) - else FineGrainedLineageUpstreamType.FIELD_SET - ) - downstream_type = ( - fine_grained_lineage.downstreamType - if isinstance(fine_grained_lineage.downstreamType, str) - else FineGrainedLineageDownstreamType.FIELD_SET - ) - return transform_op, upstream_type, downstream_type + downstream_urn = downstreams[0] + query_id = fine_grained_lineage.query or "NONE" + return transform_op, downstream_urn, query_id - @staticmethod - def quote_fine_grained_downstream_path( - transform_op: str, downstream_type: str, downstream_urn: str + @classmethod + def quote_fine_grained_path( + cls, transform_op: str, downstream_urn: str, query_id: str, upstream_urn: str ) -> str: return ( - 
f"/fineGrainedLineages/{MetadataPatchProposal.quote(transform_op)}/downstreamType/" - f"{MetadataPatchProposal.quote(downstream_type)}/{MetadataPatchProposal.quote(downstream_urn)}" - ) - - @staticmethod - def quote_fine_grained_upstream_path( - transform_op: str, upstream_type: str, upstream_urn: str - ) -> str: - return ( - f"/fineGrainedLineages/{MetadataPatchProposal.quote(transform_op)}/upstreamType/" - f"{MetadataPatchProposal.quote(upstream_type)}/{MetadataPatchProposal.quote(upstream_urn)}" + f"/fineGrainedLineages/{cls.quote(transform_op)}/" + f"{cls.quote(downstream_urn)}/{cls.quote(query_id)}/{cls.quote(upstream_urn)}" ) def remove_fine_grained_upstream_lineage( @@ -216,24 +191,15 @@ class DatasetPatchBuilder(MetadataPatchProposal): ) -> "DatasetPatchBuilder": ( transform_op, - upstream_type, - downstream_type, + downstream_urn, + query_id, ) = DatasetPatchBuilder.get_fine_grained_key(fine_grained_lineage) for upstream_urn in fine_grained_lineage.upstreams or []: self._add_patch( UpstreamLineage.ASPECT_NAME, "remove", - path=DatasetPatchBuilder.quote_fine_grained_upstream_path( - transform_op, upstream_type, upstream_urn - ), - value={}, - ) - for downstream_urn in fine_grained_lineage.downstreams or []: - self._add_patch( - UpstreamLineage.ASPECT_NAME, - "remove", - path=DatasetPatchBuilder.quote_fine_grained_downstream_path( - transform_op, downstream_type, downstream_urn + path=DatasetPatchBuilder.quote_fine_grained_path( + transform_op, downstream_urn, query_id, upstream_urn ), value={}, ) diff --git a/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json b/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json index 3de21e62e7..26159be17e 100644 --- a/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json +++ b/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json @@ -1,105 +1,109 @@ [ - { +{ "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", "changeType": "PATCH", "aspectName": "datasetProperties", "aspect": { - "json": [ - { - "op": "add", - "path": "/description", - "value": "test description" - }, - { - "op": "add", - "path": "/customProperties/test_key_1", - "value": "test_value_1" - }, - { - "op": "add", - "path": "/customProperties/test_key_2", - "value": "test_value_2" - } - ] + "json": [ + { + "op": "add", + "path": "/description", + "value": "test description" + }, + { + "op": "add", + "path": "/customProperties/test_key_1", + "value": "test_value_1" + }, + { + "op": "add", + "path": "/customProperties/test_key_2", + "value": "test_value_2" + } + ] } - }, - { +}, +{ "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", "changeType": "PATCH", "aspectName": "globalTags", "aspect": { - "json": [ - { - "op": "add", - "path": "/tags/urn:li:tag:test_tag", - "value": { - "tag": "urn:li:tag:test_tag" - } - } - ] + "json": [ + { + "op": "add", + "path": "/tags/urn:li:tag:test_tag", + "value": { + "tag": "urn:li:tag:test_tag" + } + } + ] } - }, - { +}, +{ "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", "changeType": "PATCH", "aspectName": "upstreamLineage", "aspect": { - "json": [ - { - "op": "add", - "path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)", - "value": { - "auditStamp": { - "time": 0, - "actor": "urn:li:corpuser:unknown" + "json": [ + { + "op": "add", + "path": 
"/upstreams/urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)", + "value": { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)", + "type": "TRANSFORMED" + } }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)", - "type": "TRANSFORMED" - } - }, - { - "op": "add", - "path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket~1my-folder~1my-file.txt,PROD)", - "value": { - "auditStamp": { - "time": 0, - "actor": "urn:li:corpuser:unknown" + { + "op": "add", + "path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket~1my-folder~1my-file.txt,PROD)", + "value": { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/my-folder/my-file.txt,PROD)", + "type": "TRANSFORMED" + } }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/my-folder/my-file.txt,PROD)", - "type": "TRANSFORMED" - } - }, - { - "op": "add", - "path": "/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)", - "value": 1.0 - }, - { - "op": "add", - "path": "/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket~1my-folder~1my-file.txt,PROD)", - "value": 1.0 - } - ] + { + "op": "add", + "path": "/fineGrainedLineages/TRANSFORM/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),foo)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD),bar)", + "value": { + "confidenceScore": 1.0 + } + }, + { + "op": "add", + "path": "/fineGrainedLineages/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),foo)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket~1my-folder~1my-file.txt,PROD),foo)", + "value": { + "confidenceScore": 1.0 + } + } + ] } - }, - { +}, +{ "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", "changeType": "PATCH", "aspectName": "editableSchemaMetadata", "aspect": { - "json": [ - { - "op": "add", - "path": "/editableSchemaFieldInfo/field1/globalTags/tags/urn:li:tag:tag1", - "value": { - "tag": "urn:li:tag:tag1" - } - } - ] + "json": [ + { + "op": "add", + "path": "/editableSchemaFieldInfo/field1/globalTags/tags/urn:li:tag:tag1", + "value": { + "tag": "urn:li:tag:tag1" + } + } + ] } - } +} ] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/patch/test_patch_builder.py b/metadata-ingestion/tests/unit/patch/test_patch_builder.py index 43da2fb7a7..8c2a4b2c4a 100644 --- a/metadata-ingestion/tests/unit/patch/test_patch_builder.py +++ b/metadata-ingestion/tests/unit/patch/test_patch_builder.py @@ -1,4 +1,3 @@ -import json import pathlib import pytest @@ -7,6 +6,7 @@ from datahub.emitter.mce_builder import ( make_chart_urn, make_dashboard_urn, make_dataset_urn, + make_schema_field_urn, make_tag_urn, ) from datahub.ingestion.sink.file import write_metadata_file @@ -23,6 +23,7 @@ from datahub.metadata.schema_classes import ( from datahub.specific.chart import ChartPatchBuilder from datahub.specific.dashboard import DashboardPatchBuilder from datahub.specific.dataset import DatasetPatchBuilder +from tests.test_helpers import mce_helpers def test_basic_dataset_patch_builder(): @@ -73,13 +74,28 @@ def test_complex_dataset_patch( ) 
.add_fine_grained_upstream_lineage( fine_grained_lineage=FineGrainedLineageClass( - upstreamType=FineGrainedLineageUpstreamTypeClass.DATASET, - upstreams=[ - make_dataset_urn( - platform="hive", name="fct_users_created_upstream", env="PROD" + upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET, + downstreams=[ + make_schema_field_urn( + make_dataset_urn( + platform="hive", + name="fct_users_created", + env="PROD", + ), + field_path="foo", ) ], - downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD_SET, + upstreams=[ + make_schema_field_urn( + make_dataset_urn( + platform="hive", + name="fct_users_created_upstream", + env="PROD", + ), + field_path="bar", + ) + ], + downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD, transformOperation="TRANSFORM", confidenceScore=1.0, ) @@ -88,15 +104,26 @@ def test_complex_dataset_patch( fine_grained_lineage=FineGrainedLineageClass( upstreamType=FineGrainedLineageUpstreamTypeClass.DATASET, upstreams=[ - make_dataset_urn( - platform="s3", - name="my-bucket/my-folder/my-file.txt", - env="PROD", + make_schema_field_urn( + make_dataset_urn( + platform="s3", + name="my-bucket/my-folder/my-file.txt", + env="PROD", + ), + field_path="foo", ) ], downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD_SET, - transformOperation="TRANSFORM", - confidenceScore=1.0, + downstreams=[ + make_schema_field_urn( + make_dataset_urn( + platform="hive", + name="fct_users_created", + env="PROD", + ), + field_path="foo", + ) + ], ) ) ) @@ -105,10 +132,10 @@ def test_complex_dataset_patch( out_path = tmp_path / "patch.json" write_metadata_file(out_path, patcher.build()) - assert json.loads(out_path.read_text()) == json.loads( - ( - pytestconfig.rootpath / "tests/unit/patch/complex_dataset_patch.json" - ).read_text() + mce_helpers.check_golden_file( + pytestconfig, + out_path, + pytestconfig.rootpath / "tests/unit/patch/complex_dataset_patch.json", ) diff --git a/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java b/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java index 1107f55201..46d1481836 100644 --- a/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java +++ b/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java @@ -56,18 +56,20 @@ public class PatchTest { DatasetUrn upstreamUrn = DatasetUrn.createFromString( "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"); - Urn schemaFieldUrn = + Urn upstreamSchemaFieldUrn = UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), foo)"); + Urn downstreamSchemaFieldUrn = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), bar)"); MetadataChangeProposal upstreamPatch = new UpstreamLineagePatchBuilder() .urn( UrnUtils.getUrn( "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)")) .addUpstream(upstreamUrn, DatasetLineageType.TRANSFORMED) - .addFineGrainedUpstreamDataset(upstreamUrn, null, "TRANSFORM") - .addFineGrainedUpstreamField(schemaFieldUrn, null, "TRANSFORM", null) - .addFineGrainedDownstreamField(schemaFieldUrn, null, "TRANSFORM", null) + .addFineGrainedUpstreamField( + upstreamSchemaFieldUrn, null, "TRANSFORM", downstreamSchemaFieldUrn, null) .build(); Future response = restEmitter.emit(upstreamPatch); @@ -83,12 +85,12 @@ public class PatchTest { public void testLocalUpstreamRemove() { RestEmitter restEmitter = new 
RestEmitter(RestEmitterConfig.builder().build()); try { - DatasetUrn upstreamUrn = - DatasetUrn.createFromString( - "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"); - Urn schemaFieldUrn = + Urn upstreamSchemaFieldUrn = UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), foo)"); + Urn downstreamSchemaFieldUrn = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), bar)"); MetadataChangeProposal upstreamPatch = new UpstreamLineagePatchBuilder() .urn( @@ -97,9 +99,8 @@ public class PatchTest { .removeUpstream( DatasetUrn.createFromString( "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)")) - .removeFineGrainedUpstreamDataset(upstreamUrn, "TRANSFORM") - .removeFineGrainedUpstreamField(schemaFieldUrn, "TRANSFORM", null) - .removeFineGrainedDownstreamField(schemaFieldUrn, "TRANSFORM", null) + .removeFineGrainedUpstreamField( + upstreamSchemaFieldUrn, "TRANSFORM", downstreamSchemaFieldUrn, null) .build(); Future response = restEmitter.emit(upstreamPatch);
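Usage note: the hunks above replace the old upstreamType/downstreamType path segments with a "/fineGrainedLineages/<transformOperation>/<downstreamFieldUrn>/<queryUrn or NONE>/<upstreamFieldUrn>" layout whose value object carries the confidenceScore. The following is a minimal sketch of the refactored Java builder, assembled from the PatchTest changes in this patch; the import locations and the default local RestEmitter endpoint are assumptions and are not part of the diff itself.

    import com.linkedin.common.urn.DatasetUrn;
    import com.linkedin.common.urn.Urn;
    import com.linkedin.common.urn.UrnUtils;
    import com.linkedin.dataset.DatasetLineageType;
    import com.linkedin.metadata.aspect.patch.builder.UpstreamLineagePatchBuilder;
    import com.linkedin.mxe.MetadataChangeProposal;
    import datahub.client.MetadataWriteResponse;
    import datahub.client.rest.RestEmitter;
    import datahub.client.rest.RestEmitterConfig;
    import java.util.concurrent.Future;

    public class FineGrainedLineagePatchExample {
      public static void main(String[] args) throws Exception {
        // Assumes a DataHub GMS reachable at the RestEmitter defaults (e.g. http://localhost:8080).
        RestEmitter restEmitter = new RestEmitter(RestEmitterConfig.builder().build());

        DatasetUrn upstreamUrn =
            DatasetUrn.createFromString(
                "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)");
        Urn upstreamSchemaFieldUrn =
            UrnUtils.getUrn(
                "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), foo)");
        Urn downstreamSchemaFieldUrn =
            UrnUtils.getUrn(
                "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), bar)");

        // One upstream column, one downstream column, no query urn (null maps to the NONE segment).
        // The builder emits an ADD operation at
        //   /fineGrainedLineages/TRANSFORM/<downstreamSchemaFieldUrn>/NONE/<upstreamSchemaFieldUrn>
        // with value {"confidenceScore": 1.0} (the default when confidenceScore is null).
        MetadataChangeProposal upstreamPatch =
            new UpstreamLineagePatchBuilder()
                .urn(
                    UrnUtils.getUrn(
                        "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"))
                .addUpstream(upstreamUrn, DatasetLineageType.TRANSFORMED)
                .addFineGrainedUpstreamField(
                    upstreamSchemaFieldUrn, null, "TRANSFORM", downstreamSchemaFieldUrn, null)
                .build();

        Future<MetadataWriteResponse> response = restEmitter.emit(upstreamPatch);
        System.out.println(response.get());
      }
    }

Removal mirrors the add: removeFineGrainedUpstreamField(upstreamSchemaFieldUrn, "TRANSFORM", downstreamSchemaFieldUrn, null) issues a REMOVE operation against the same path, as shown in testLocalUpstreamRemove above.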