feat(field-level-lineage): Add models for field level lineage (#1936)

* feat(field-level-lineage): adding models for field level lineage

adding models for field level lineage. Introduce DatasetFieldUrn as a unique identifier for dataset field
This commit is contained in:
Nagarjuna Kanamarlapudi 2020-11-09 14:08:48 -08:00 committed by GitHub
parent 89c78551cc
commit 7d574d1094
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 378 additions and 4 deletions

View File

@ -30,6 +30,40 @@
"doc" : "The entity (e.g. a service URN) which performs the change on behalf of the Actor and must be authorized to act as the Actor.",
"optional" : true
} ]
}, {
"type" : "record",
"name" : "BaseFieldMapping",
"namespace" : "com.linkedin.common",
"doc" : "Base model representing field mappings",
"fields" : [ {
"name" : "created",
"type" : "AuditStamp",
"doc" : "Audit stamp containing who reported the field mapping and when"
}, {
"name" : "transformation",
"type" : [ {
"type" : "enum",
"name" : "TransformationType",
"namespace" : "com.linkedin.common.fieldtransformer",
"doc" : "Type of the transformation involved in generating destination fields from source fields.",
"symbols" : [ "BLACKBOX", "IDENTITY" ],
"symbolDocs" : {
"BLACKBOX" : "Field transformation expressed as unknown black box function.",
"IDENTITY" : "Field transformation expressed as Identity function."
}
}, {
"type" : "record",
"name" : "UDFTransformer",
"namespace" : "com.linkedin.common.fieldtransformer",
"doc" : "Field transformation expressed in UDF",
"fields" : [ {
"name" : "udf",
"type" : "string",
"doc" : "A UDF mentioning how the source fields got transformed to destination field. This is the FQCN(Fully Qualified Class Name) of the udf."
} ]
} ],
"doc" : "Transfomration function between the fields involved"
} ]
}, {
"type" : "record",
"name" : "ChangeAuditStamps",
@ -77,6 +111,38 @@
"owningTeam" : "urn:li:internalTeam:wherehows"
}
}
}, {
"type" : "typeref",
"name" : "DatasetFieldUrn",
"namespace" : "com.linkedin.common",
"doc" : "Standardized dataset field information identifier.",
"ref" : "string",
"java" : {
"class" : "com.linkedin.common.urn.DatasetFieldUrn"
},
"validate" : {
"com.linkedin.common.validator.TypedUrnValidator" : {
"accessible" : true,
"constructable" : true,
"doc" : "Standardized dataset field information identifier",
"entityType" : "datasetField",
"fields" : [ {
"doc" : "Dataset that this dataset field belongs to.",
"name" : "dataset",
"type" : "com.linkedin.common.urn.DatasetUrn"
}, {
"doc" : "Dataset field path",
"maxLength" : 500,
"name" : "fieldPath",
"type" : "string"
} ],
"maxLength" : 807,
"name" : "DatasetField",
"namespace" : "li",
"owners" : [ "urn:li:corpuser:fbar", "urn:li:corpuser:bfoo" ],
"owningTeam" : "urn:li:internalTeam:datahub"
}
}
}, {
"type" : "typeref",
"name" : "DatasetUrn",
@ -269,7 +335,7 @@
"type" : "string",
"optional" : true
} ]
}, {
}, "com.linkedin.common.fieldtransformer.TransformationType", "com.linkedin.common.fieldtransformer.UDFTransformer", {
"type" : "record",
"name" : "Dataset",
"namespace" : "com.linkedin.dataset",
@ -787,7 +853,30 @@
"doc" : "Upstream lineage metadata of the dataset",
"optional" : true
} ]
}, "com.linkedin.dataset.DatasetDeprecation", "com.linkedin.dataset.DatasetKey", "com.linkedin.dataset.DatasetLineageType", {
}, "com.linkedin.dataset.DatasetDeprecation", {
"type" : "record",
"name" : "DatasetFieldMapping",
"namespace" : "com.linkedin.dataset",
"doc" : "Representation of mapping between fields in source dataset to the field in destination dataset",
"include" : [ "com.linkedin.common.BaseFieldMapping" ],
"fields" : [ {
"name" : "sourceFields",
"type" : {
"type" : "array",
"items" : {
"type" : "typeref",
"name" : "DatasetFieldUpstream",
"doc" : "Upstreams of a dataset field.",
"ref" : [ "com.linkedin.common.DatasetFieldUrn" ]
}
},
"doc" : "Source fields from which the fine grained lineage is derived"
}, {
"name" : "destinationField",
"type" : "com.linkedin.common.DatasetFieldUrn",
"doc" : "Destination field which is derived from source fields"
} ]
}, "com.linkedin.dataset.DatasetFieldUpstream", "com.linkedin.dataset.DatasetKey", "com.linkedin.dataset.DatasetLineageType", {
"type" : "record",
"name" : "DatasetProperties",
"namespace" : "com.linkedin.dataset",
@ -819,6 +908,19 @@
"doc" : "A key-value map to capture any other non-standardized properties for the dataset",
"default" : { }
} ]
}, {
"type" : "record",
"name" : "DatasetUpstreamLineage",
"namespace" : "com.linkedin.dataset",
"doc" : "Fine Grained upstream lineage for fields in a dataset",
"fields" : [ {
"name" : "fieldMappings",
"type" : {
"type" : "array",
"items" : "DatasetFieldMapping"
},
"doc" : "Upstream to downstream field level lineage mappings"
} ]
}, {
"type" : "record",
"name" : "Downstream",
@ -868,7 +970,7 @@
"name" : "DatasetAspect",
"namespace" : "com.linkedin.metadata.aspect",
"doc" : "A union of all supported metadata aspects for a Dataset",
"ref" : [ "com.linkedin.dataset.DatasetProperties", "com.linkedin.dataset.DatasetDeprecation", "com.linkedin.dataset.UpstreamLineage", "com.linkedin.common.InstitutionalMemory", "com.linkedin.common.Ownership", "com.linkedin.common.Status", "com.linkedin.schema.SchemaMetadata" ]
"ref" : [ "com.linkedin.dataset.DatasetProperties", "com.linkedin.dataset.DatasetDeprecation", "com.linkedin.dataset.DatasetUpstreamLineage", "com.linkedin.dataset.UpstreamLineage", "com.linkedin.common.InstitutionalMemory", "com.linkedin.common.Ownership", "com.linkedin.common.Status", "com.linkedin.schema.SchemaMetadata" ]
}, {
"type" : "record",
"name" : "AggregationMetadata",

View File

@ -2,8 +2,11 @@ apply plugin: 'java'
apply plugin: 'pegasus'
dependencies {
dataModel externalDependency.gmaCoreModels
compile spec.product.pegasus.data
dataModel externalDependency.gmaCoreModels
testCompile externalDependency.assertJ
}
idea {

View File

@ -0,0 +1,101 @@
package com.linkedin.common.urn;
import com.linkedin.common.FabricType;
import com.linkedin.data.template.Custom;
import com.linkedin.data.template.DirectCoercer;
import com.linkedin.data.template.TemplateOutputCastException;
import java.net.URISyntaxException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Standardized dataset field information identifier
*/
public class DatasetFieldUrn extends Urn {
// uniquely identifies urn's key type
public static final String ENTITY_TYPE = "datasetField";
// urn pattern
private static final Pattern DATASET_FIELD_URN_PATTERN = Pattern.compile(
"urn:li:datasetField:\\(urn:li:dataset:\\(urn:li:dataPlatform:(?<dataPlatform>.+),(?<datasetName>.+),(?<fabric>.+)\\),(?<fieldPath>.+)\\)");
/**
* Dataset urn of the datasetFieldUrn
*/
private final DatasetUrn _dataset;
/**
* Field of datasetFieldUrn
*/
private final String _fieldPath;
static {
Custom.initializeCustomClass(DatasetUrn.class);
Custom.registerCoercer(new DirectCoercer<DatasetFieldUrn>() {
@Override
public String coerceInput(DatasetFieldUrn object) throws ClassCastException {
return object.toString();
}
@Override
public DatasetFieldUrn coerceOutput(Object object) throws TemplateOutputCastException {
if (object instanceof String) {
try {
return DatasetFieldUrn.deserialize(((String) object));
} catch (URISyntaxException e) {
throw new TemplateOutputCastException((("Deserializing output '" + object) + "' failed"), e);
}
}
throw new TemplateOutputCastException((("Output '" + object) + ("' is not a String, and cannot be coerced to "
+ DatasetFieldUrn.class.getName())));
}
}, DatasetFieldUrn.class);
}
/**
* Creates a new instance of a {@link DatasetFieldUrn }.
*
* @param dataset Dataset that this dataset field belongs to.
* @param fieldPath Dataset field path or column name
*/
public DatasetFieldUrn(DatasetUrn dataset, String fieldPath) {
this(dataset.getPlatformEntity().getPlatformNameEntity(), dataset.getDatasetNameEntity(), dataset.getOriginEntity(),
fieldPath);
}
public DatasetFieldUrn(String dataPlatform, String datasetName, FabricType fabricType, String fieldPath) {
super(ENTITY_TYPE, String.format("(urn:li:dataset:(urn:li:dataPlatform:%s,%s,%s),%s)", dataPlatform, datasetName,
fabricType.name(), fieldPath));
this._dataset = new DatasetUrn(new DataPlatformUrn(dataPlatform), datasetName, fabricType);
this._fieldPath = fieldPath;
}
public DatasetUrn getDatasetEntity() {
return _dataset;
}
public String getFieldPathEntity() {
return _fieldPath;
}
/**
* Creates an instance of a DatasetFieldUrn from a raw urn string.
* @param rawUrn The raw urn input to convert to a full DatasetFieldUrn instance.
* @return {@link DatasetFieldUrn} dataset Field Urn
*/
public static DatasetFieldUrn deserialize(String rawUrn) throws URISyntaxException {
final Matcher matcher = DATASET_FIELD_URN_PATTERN.matcher(rawUrn);
if (matcher.matches()) {
final String dataPlatform = matcher.group("dataPlatform");
final String datasetName = matcher.group("datasetName");
final String fabric = matcher.group("fabric");
final String fieldName = matcher.group("fieldPath");
return new DatasetFieldUrn(dataPlatform, datasetName, FabricType.valueOf(fabric), fieldName);
}
throw new URISyntaxException(rawUrn,
String.format("urn does match dataset field urn pattern %s", DATASET_FIELD_URN_PATTERN.toString()));
}
}

View File

@ -0,0 +1,28 @@
namespace com.linkedin.common
/**
* Standardized dataset field information identifier.
*/
@java.class = "com.linkedin.common.urn.DatasetFieldUrn"
@validate.`com.linkedin.common.validator.TypedUrnValidator` = {
"accessible" : true,
"owningTeam" : "urn:li:internalTeam:datahub",
"entityType" : "datasetField",
"constructable" : true,
"namespace" : "li",
"name" : "DatasetField",
"doc" : "Standardized dataset field information identifier",
"owners" : [ "urn:li:corpuser:fbar", "urn:li:corpuser:bfoo" ],
"fields" : [ {
"type" : "com.linkedin.common.urn.DatasetUrn",
"name" : "dataset",
"doc" : "Dataset that this dataset field belongs to."
}, {
"name" : "fieldPath",
"doc" : "Dataset field path",
"type" : "string",
"maxLength" : 500
} ],
"maxLength" : 807
}
typeref DatasetFieldUrn = string

View File

@ -0,0 +1,54 @@
package com.linkedin.common.urn;
import com.linkedin.common.FabricType;
import java.net.URISyntaxException;
import org.assertj.core.api.Assertions;
import org.testng.annotations.Test;
public class DatasetFieldUrnTest {
private static final String PLATFORM = "fooPlatform";
private static final String DATASET_NAME = "fooName";
private static final String FIELD_NAME = "fooField";
private static final FabricType FABRIC_TYPE = FabricType.PROD;
@Test
public void testSerialization() throws URISyntaxException {
final String datasetFieldString =
String.format("urn:li:datasetField:(urn:li:dataset:(urn:li:dataPlatform:%s,%s,%s),%s)", PLATFORM, DATASET_NAME,
FABRIC_TYPE, FIELD_NAME);
final DatasetFieldUrn datasetFieldUrn = DatasetFieldUrn.deserialize(datasetFieldString);
final DatasetUrn datasetUrn = datasetFieldUrn.getDatasetEntity();
Assertions.assertThat(datasetFieldUrn.getFieldPathEntity()).isEqualTo(FIELD_NAME);
Assertions.assertThat(datasetUrn.getDatasetNameEntity()).isEqualTo(DATASET_NAME);
Assertions.assertThat(datasetUrn.getPlatformEntity().getPlatformNameEntity()).isEqualTo(PLATFORM);
Assertions.assertThat(datasetUrn.getOriginEntity()).isEqualTo(FabricType.PROD);
Assertions.assertThat(datasetFieldUrn.toString())
.isEqualTo(datasetFieldString)
.describedAs("serialization followed by deserialization should produce the same urn string");
}
@Test
public void testCreateUrn() {
final DatasetFieldUrn datasetFieldUrn = new DatasetFieldUrn(PLATFORM, DATASET_NAME, FABRIC_TYPE, FIELD_NAME);
final DatasetUrn datasetUrn = datasetFieldUrn.getDatasetEntity();
Assertions.assertThat(datasetFieldUrn.getFieldPathEntity()).isEqualTo(FIELD_NAME);
Assertions.assertThat(datasetUrn.getDatasetNameEntity()).isEqualTo(DATASET_NAME);
Assertions.assertThat(datasetUrn.getPlatformEntity().getPlatformNameEntity()).isEqualTo(PLATFORM);
Assertions.assertThat(datasetUrn.getOriginEntity()).isEqualTo(FabricType.PROD);
}
@Test
public void testUrnConstructors() {
final DatasetFieldUrn datasetFieldUrn1 = new DatasetFieldUrn(PLATFORM, DATASET_NAME, FABRIC_TYPE, FIELD_NAME);
final DatasetUrn datasetUrn = datasetFieldUrn1.getDatasetEntity();
final DatasetFieldUrn datasetFieldUrn2 = new DatasetFieldUrn(datasetUrn, FIELD_NAME);
Assertions.assertThat(datasetFieldUrn1).isEqualTo(datasetFieldUrn2);
}
}

View File

@ -0,0 +1,19 @@
namespace com.linkedin.common
import com.linkedin.common.fieldtransformer.TransformationType
import com.linkedin.common.fieldtransformer.UDFTransformer
/**
* Base model representing field mappings
*/
record BaseFieldMapping {
/**
* Audit stamp containing who reported the field mapping and when
*/
created: AuditStamp
/**
* Transfomration function between the fields involved
*/
transformation: union [TransformationType, UDFTransformer]
}

View File

@ -0,0 +1,16 @@
namespace com.linkedin.common.fieldtransformer
/**
* Type of the transformation involved in generating destination fields from source fields.
*/
enum TransformationType {
/**
* Field transformation expressed as unknown black box function.
*/
BLACKBOX,
/**
* Field transformation expressed as Identity function.
*/
IDENTITY
}

View File

@ -0,0 +1,11 @@
namespace com.linkedin.common.fieldtransformer
/**
* Field transformation expressed in UDF
*/
record UDFTransformer {
/**
* A UDF mentioning how the source fields got transformed to destination field. This is the FQCN(Fully Qualified Class Name) of the udf.
*/
udf: string
}

View File

@ -0,0 +1,19 @@
namespace com.linkedin.dataset
import com.linkedin.common.BaseFieldMapping
import com.linkedin.common.DatasetFieldUrn
/**
* Representation of mapping between fields in source dataset to the field in destination dataset
*/
record DatasetFieldMapping includes BaseFieldMapping {
/**
* Source fields from which the fine grained lineage is derived
*/
sourceFields: array[DatasetFieldUpstream]
/**
* Destination field which is derived from source fields
*/
destinationField: DatasetFieldUrn
}

View File

@ -0,0 +1,8 @@
namespace com.linkedin.dataset
import com.linkedin.common.DatasetFieldUrn
/**
* Upstreams of a dataset field.
*/
typeref DatasetFieldUpstream = union[DatasetFieldUrn]

View File

@ -0,0 +1,11 @@
namespace com.linkedin.dataset
/**
* Fine Grained upstream lineage for fields in a dataset
*/
record DatasetUpstreamLineage {
/**
* Upstream to downstream field level lineage mappings
*/
fieldMappings: array[DatasetFieldMapping]
}

View File

@ -5,6 +5,7 @@ import com.linkedin.common.Ownership
import com.linkedin.common.Status
import com.linkedin.dataset.DatasetDeprecation
import com.linkedin.dataset.DatasetProperties
import com.linkedin.dataset.DatasetUpstreamLineage
import com.linkedin.dataset.UpstreamLineage
import com.linkedin.schema.SchemaMetadata
@ -14,6 +15,7 @@ import com.linkedin.schema.SchemaMetadata
typeref DatasetAspect = union[
DatasetProperties,
DatasetDeprecation,
DatasetUpstreamLineage,
UpstreamLineage,
InstitutionalMemory,
Ownership,