fix(ingestion/spark): Platform instance and column level lineage fix (#10843)

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Tamas Nemeth 2024-07-09 13:54:06 +02:00 committed by GitHub
parent b6c7fe8267
commit d204d5654a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 557 additions and 26 deletions

View File

@ -11,7 +11,7 @@ The alternative way to integrate is via the Hive connector. The [Hive starter re
## Databricks Spark
To complete the picture, we recommend adding push-based ingestion from your Spark jobs to see real-time activity and lineage between your Databricks tables and your Spark jobs. Use the Spark agent to push metadata to DataHub using the instructions [here](../../../../metadata-integration/java/spark-lineage/README.md#configuration-instructions-databricks).
To complete the picture, we recommend adding push-based ingestion from your Spark jobs to see real-time activity and lineage between your Databricks tables and your Spark jobs. Use the Spark agent to push metadata to DataHub using the instructions [here](../../../../metadata-integration/java/spark-lineage-beta/README.md#configuration-instructions-databricks).
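For orientation, a minimal sketch of the cluster-level Spark configuration this typically boils down to once the agent jar is installed per the linked README (the token line is only needed when authentication is enabled; the host value is a placeholder):

```text
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server https://your_datahub_host/gms
spark.datahub.rest.token <your-datahub-token>
```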
## Watch the DataHub Talk at the Data and AI Summit 2022

View File

@ -33,6 +33,7 @@ public class DatahubOpenlineageConfig {
@Builder.Default private String hivePlatformAlias = "hive";
@Builder.Default private Map<String, String> urnAliases = new HashMap<>();
@Builder.Default private final boolean disableSymlinkResolution = false;
@Builder.Default private final boolean lowerCaseDatasetUrns = false;
public List<PathSpec> getPathSpecsForPlatform(String platform) {
if ((pathSpecs == null) || (pathSpecs.isEmpty())) {

View File

@ -4,6 +4,7 @@ import com.linkedin.common.AuditStamp;
import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.Edge;
import com.linkedin.common.EdgeArray;
import com.linkedin.common.FabricType;
import com.linkedin.common.GlobalTags;
import com.linkedin.common.Owner;
import com.linkedin.common.OwnerArray;
@ -57,6 +58,8 @@ import io.datahubproject.openlineage.dataset.DatahubDataset;
import io.datahubproject.openlineage.dataset.DatahubJob;
import io.datahubproject.openlineage.dataset.HdfsPathDataset;
import io.datahubproject.openlineage.dataset.HdfsPlatform;
import io.datahubproject.openlineage.dataset.PathSpec;
import io.datahubproject.openlineage.utils.DatahubUtils;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import java.io.IOException;
@ -151,6 +154,11 @@ public class OpenLineageToDataHub {
private static Optional<DatasetUrn> getDatasetUrnFromOlDataset(
String namespace, String datasetName, DatahubOpenlineageConfig mappingConfig) {
String platform;
if (mappingConfig.isLowerCaseDatasetUrns()) {
namespace = namespace.toLowerCase();
datasetName = datasetName.toLowerCase();
}
if (namespace.contains(SCHEME_SEPARATOR)) {
try {
URI datasetUri;
@ -183,12 +191,45 @@ public class OpenLineageToDataHub {
platform = namespace;
}
if (mappingConfig.getCommonDatasetPlatformInstance() != null) {
datasetName = mappingConfig.getCommonDatasetPlatformInstance() + "." + datasetName;
}
String platformInstance = getPlatformInstance(mappingConfig, platform);
FabricType env = getEnv(mappingConfig, platform);
return Optional.of(DatahubUtils.createDatasetUrn(platform, platformInstance, datasetName, env));
}
return Optional.of(
new DatasetUrn(new DataPlatformUrn(platform), datasetName, mappingConfig.getFabricType()));
private static FabricType getEnv(DatahubOpenlineageConfig mappingConfig, String platform) {
FabricType fabricType = mappingConfig.getFabricType();
if (mappingConfig.getPathSpecs() != null
&& mappingConfig.getPathSpecs().containsKey(platform)) {
List<PathSpec> path_specs = mappingConfig.getPathSpecs().get(platform);
for (PathSpec pathSpec : path_specs) {
if (pathSpec.getEnv().isPresent()) {
try {
fabricType = FabricType.valueOf(pathSpec.getEnv().get());
return fabricType;
} catch (IllegalArgumentException e) {
log.warn("Invalid environment value: {}", pathSpec.getEnv());
}
}
}
}
return fabricType;
}
private static String getPlatformInstance(
DatahubOpenlineageConfig mappingConfig, String platform) {
// Use the platform instance from the path spec if it is present otherwise use the one from the
// commonDatasetPlatformInstance
String platformInstance = mappingConfig.getCommonDatasetPlatformInstance();
if (mappingConfig.getPathSpecs() != null
&& mappingConfig.getPathSpecs().containsKey(platform)) {
List<PathSpec> path_specs = mappingConfig.getPathSpecs().get(platform);
for (PathSpec pathSpec : path_specs) {
if (pathSpec.getPlatformInstance().isPresent()) {
return pathSpec.getPlatformInstance().get();
}
}
}
return platformInstance;
}
public static GlobalTags generateTags(List<String> tags) {

View File

@ -280,11 +280,11 @@ public class DatahubJob {
for (Urn downstream :
Objects.requireNonNull(fineGrainedLineage.getDownstreams())) {
upstreamLineagePatchBuilder.addFineGrainedUpstreamField(
downstream,
upstream,
fineGrainedLineage.getConfidenceScore(),
StringUtils.defaultIfEmpty(
fineGrainedLineage.getTransformOperation(), "TRANSFORM"),
upstream,
downstream,
null);
}
}

View File

@ -14,7 +14,7 @@ import lombok.ToString;
public class PathSpec {
final String alias;
final String platform;
@Builder.Default final String env = "PROD";
@Builder.Default final Optional<String> env = Optional.empty();
final List<String> pathSpecList;
@Builder.Default final Optional<String> platformInstance = Optional.empty();
}
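Since `env` (like `platformInstance`) is now an `Optional`, a path spec only overrides the global settings when a value is explicitly present. A minimal sketch of building one via the Lombok builder, mirroring the test cases later in this diff (the class name `PathSpecExample` is illustrative only):

```java
import io.datahubproject.openlineage.dataset.PathSpec;
import java.util.Optional;

class PathSpecExample {
  // env and platformInstance are Optionals; leaving them empty falls back to the
  // global fabricType / commonDatasetPlatformInstance on DatahubOpenlineageConfig.
  static final PathSpec REDSHIFT_SPEC =
      PathSpec.builder()
          .platform("redshift")
          .env(Optional.of("PROD"))
          .platformInstance(Optional.of("my-platform-instance"))
          .build();
}
```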

View File

@ -24,7 +24,7 @@ When running jobs using spark-submit, the agent needs to be configured in the co
```text
#Configuring DataHub spark agent jar
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.11
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.13
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
```
@ -32,7 +32,7 @@ spark.datahub.rest.server http://localhost:8080
## spark-submit command line
```sh
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.11 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.13 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
```
### Configuration Instructions: Amazon EMR
@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it
stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html)
```text
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.11
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.13
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server https://your_datahub_host/gms
#If you have authentication set up then you also need to specify the Datahub access token
@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh
spark = SparkSession.builder
.master("spark://spark-master:7077")
.appName("test-application")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.11")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.13")
.config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
.config("spark.datahub.rest.server", "http://localhost:8080")
.enableHiveSupport()
@ -79,7 +79,7 @@ appName("test-application")
config("spark.master","spark://spark-master:7077")
.
config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.11")
config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.13")
.
config("spark.extraListeners","datahub.spark.DatahubSparkListener")
@ -159,7 +159,7 @@ information like tokens.
| Field | Required | Default | Description |
|---------------------------------------------------------------------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:datahub-spark-lineage:0.8.23 |
| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:acryl-spark-lineage:0.2.13 |
| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener |
| spark.datahub.rest.server | ✅ | | Datahub server url eg:<http://localhost:8080> |
| spark.datahub.rest.token | | | Authentication token. |
@ -181,9 +181,10 @@ information like tokens.
| spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` |
| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow |
| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow |
| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricks or on Glue. You should enable this on Databricks if you want coalesced run . |
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled. |
|
| spark.datahub.stage_metadata_coalescing | | | Normally it coalesces and sends metadata at the onApplicationEnd event which is never called on Databricks or on Glue. You should enable this on Databricks if you want coalesced run. |
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default, it is disabled. |
| spark.datahub.metadata.dataset.lowerCaseUrns | | false | Set this to true to lowercase dataset urns. By default, it is disabled. |
| spark.datahub.disableSymlinkResolution | | false | Set this to true if you prefer using the s3 location instead of the Hive table. By default, it is disabled. |
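To illustrate how the flags above combine, a sketch of a spark-defaults snippet using the new options (values are placeholders, not a recommended configuration):

```text
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
spark.datahub.patch.enabled true
spark.datahub.metadata.dataset.lowerCaseUrns true
spark.datahub.disableSymlinkResolution true
```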
## What to Expect: The Metadata Model
@ -343,3 +344,15 @@ Use Java 8 to build the project. The project uses Gradle as the build tool. To b
```
## Known limitations
## Changelog
### Version 0.2.12
- Silencing some chatty warnings in RddPathUtils
### Version 0.2.13
- Add option to lowercase dataset URNs
- Add option to set platform instance and/or env per platform with the `spark.datahub.platform.<platform_name>.env` and `spark.datahub.platform.<platform_name>.platform_instance` config parameters
- Fixing platform instance setting for datasets when `spark.datahub.metadata.dataset.platformInstance` is set
- Fixing column level lineage support when patch is enabled
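As a sketch of the per-platform override pattern named above, with `<platform_name>` replaced by the platform key (e.g. `redshift`, matching the path-spec tests in this commit; values are placeholders):

```text
# Route all redshift datasets to a dedicated environment and platform instance
spark.datahub.platform.redshift.env PROD
spark.datahub.platform.redshift.platform_instance my-platform-instance
```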

View File

@ -42,6 +42,8 @@ public class SparkConfigParser {
public static final String DATAHUB_FLOW_NAME = "flow_name";
public static final String DATASET_ENV_KEY = "metadata.dataset.env";
public static final String DATASET_HIVE_PLATFORM_ALIAS = "metadata.dataset.hivePlatformAlias";
public static final String DATASET_LOWERCASE_URNS = "metadata.dataset.lowerCaseUrns";
public static final String DATASET_MATERIALIZE_KEY = "metadata.dataset.materialize";
public static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance";
public static final String DATASET_INCLUDE_SCHEMA_METADATA =
@ -152,6 +154,7 @@ public class SparkConfigParser {
builder.hivePlatformAlias(SparkConfigParser.getHivePlatformAlias(sparkConfig));
builder.usePatch(SparkConfigParser.isPatchEnabled(sparkConfig));
builder.disableSymlinkResolution(SparkConfigParser.isDisableSymlinkResolution(sparkConfig));
builder.lowerCaseDatasetUrns(SparkConfigParser.isLowerCaseDatasetUrns(sparkConfig));
try {
String parentJob = SparkConfigParser.getParentJobKey(sparkConfig);
if (parentJob != null) {
@ -246,15 +249,18 @@ public class SparkConfigParser {
pathSpecBuilder.alias(pathSpecKey);
pathSpecBuilder.platform(key);
if (datahubConfig.hasPath(aliasKey + ".env")) {
pathSpecBuilder.env(datahubConfig.getString(aliasKey + ".env"));
pathSpecBuilder.env(Optional.ofNullable(datahubConfig.getString(aliasKey + ".env")));
}
if (datahubConfig.hasPath(aliasKey + ".platformInstance")) {
if (datahubConfig.hasPath(aliasKey + "." + PLATFORM_INSTANCE_KEY)) {
pathSpecBuilder.platformInstance(
Optional.ofNullable(datahubConfig.getString(aliasKey + ".platformInstance")));
Optional.ofNullable(
datahubConfig.getString(aliasKey + "." + PLATFORM_INSTANCE_KEY)));
}
if (datahubConfig.hasPath(aliasKey + "." + PATH_SPEC_LIST_KEY)) {
pathSpecBuilder.pathSpecList(
Arrays.asList(
datahubConfig.getString(aliasKey + "." + PATH_SPEC_LIST_KEY).split(",")));
}
pathSpecBuilder.pathSpecList(
Arrays.asList(datahubConfig.getString(aliasKey + "." + pathSpecKey).split(",")));
platformSpecs.add(pathSpecBuilder.build());
}
pathSpecMap.put(key, platformSpecs);
@ -264,8 +270,8 @@ public class SparkConfigParser {
}
public static String getPlatformInstance(Config pathSpecConfig) {
return pathSpecConfig.hasPath(PLATFORM_INSTANCE_KEY)
? pathSpecConfig.getString(PLATFORM_INSTANCE_KEY)
return pathSpecConfig.hasPath(PIPELINE_PLATFORM_INSTANCE_KEY)
? pathSpecConfig.getString(PIPELINE_PLATFORM_INSTANCE_KEY)
: null;
}
@ -341,4 +347,9 @@ public class SparkConfigParser {
return datahubConfig.hasPath(STAGE_METADATA_COALESCING)
&& datahubConfig.getBoolean(STAGE_METADATA_COALESCING);
}
public static boolean isLowerCaseDatasetUrns(Config datahubConfig) {
return datahubConfig.hasPath(DATASET_LOWERCASE_URNS)
&& datahubConfig.getBoolean(DATASET_LOWERCASE_URNS);
}
}

View File

@ -0,0 +1,162 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/
package io.openlineage.spark.agent.util;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.spark.package$;
import org.apache.spark.rdd.HadoopRDD;
import org.apache.spark.rdd.MapPartitionsRDD;
import org.apache.spark.rdd.ParallelCollectionRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.execution.datasources.FileScanRDD;
import scala.Tuple2;
import scala.collection.immutable.Seq;
/** Utility class to extract paths from RDD nodes. */
@Slf4j
public class RddPathUtils {
public static Stream<Path> findRDDPaths(RDD rdd) {
return Stream.<RddPathExtractor>of(
new HadoopRDDExtractor(),
new FileScanRDDExtractor(),
new MapPartitionsRDDExtractor(),
new ParallelCollectionRDDExtractor())
.filter(e -> e.isDefinedAt(rdd))
.findFirst()
.orElse(new UnknownRDDExtractor())
.extract(rdd)
.filter(p -> p != null);
}
static class UnknownRDDExtractor implements RddPathExtractor<RDD> {
@Override
public boolean isDefinedAt(Object rdd) {
return true;
}
@Override
public Stream<Path> extract(RDD rdd) {
// Change to debug to silence error
log.debug("Unknown RDD class {}", rdd);
return Stream.empty();
}
}
static class HadoopRDDExtractor implements RddPathExtractor<HadoopRDD> {
@Override
public boolean isDefinedAt(Object rdd) {
return rdd instanceof HadoopRDD;
}
@Override
public Stream<Path> extract(HadoopRDD rdd) {
org.apache.hadoop.fs.Path[] inputPaths = FileInputFormat.getInputPaths(rdd.getJobConf());
Configuration hadoopConf = rdd.getConf();
return Arrays.stream(inputPaths).map(p -> PlanUtils.getDirectoryPath(p, hadoopConf));
}
}
static class MapPartitionsRDDExtractor implements RddPathExtractor<MapPartitionsRDD> {
@Override
public boolean isDefinedAt(Object rdd) {
return rdd instanceof MapPartitionsRDD;
}
@Override
public Stream<Path> extract(MapPartitionsRDD rdd) {
return findRDDPaths(rdd.prev());
}
}
static class FileScanRDDExtractor implements RddPathExtractor<FileScanRDD> {
@Override
public boolean isDefinedAt(Object rdd) {
return rdd instanceof FileScanRDD;
}
@Override
@SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
public Stream<Path> extract(FileScanRDD rdd) {
return ScalaConversionUtils.fromSeq(rdd.filePartitions()).stream()
.flatMap(fp -> Arrays.stream(fp.files()))
.map(
f -> {
if ("3.4".compareTo(package$.MODULE$.SPARK_VERSION()) <= 0) {
// filePath returns SparkPath for Spark 3.4
return ReflectionUtils.tryExecuteMethod(f, "filePath")
.map(o -> ReflectionUtils.tryExecuteMethod(o, "toPath"))
.map(o -> (Path) o.get())
.get()
.getParent();
} else {
return parentOf(f.filePath());
}
});
}
}
static class ParallelCollectionRDDExtractor implements RddPathExtractor<ParallelCollectionRDD> {
@Override
public boolean isDefinedAt(Object rdd) {
return rdd instanceof ParallelCollectionRDD;
}
@Override
public Stream<Path> extract(ParallelCollectionRDD rdd) {
try {
Object data = FieldUtils.readField(rdd, "data", true);
log.debug("ParallelCollectionRDD data: {}", data);
if (data instanceof Seq) {
return ScalaConversionUtils.fromSeq((Seq) data).stream()
.map(
el -> {
Path path = null;
if (el instanceof Tuple2) {
// we're able to extract path
path = parentOf(((Tuple2) el)._1.toString());
log.debug("Found input {}", path);
} else {
// Change to debug to silence error
log.debug("unable to extract Path from {}", el.getClass().getCanonicalName());
}
return path;
})
.filter(Objects::nonNull);
} else {
// Changed to debug to silence error
log.debug("Cannot extract path from ParallelCollectionRDD {}", data);
}
} catch (IllegalAccessException | IllegalArgumentException e) {
// Changed to debug to silence error
log.debug("Cannot read data field from ParallelCollectionRDD {}", rdd);
}
return Stream.empty();
}
}
private static Path parentOf(String path) {
try {
return new Path(path).getParent();
} catch (Exception e) {
return null;
}
}
interface RddPathExtractor<T extends RDD> {
boolean isDefinedAt(Object rdd);
Stream<Path> extract(T rdd);
}
}

View File

@ -13,12 +13,14 @@ import io.datahubproject.openlineage.config.DatahubOpenlineageConfig;
import io.datahubproject.openlineage.converter.OpenLineageToDataHub;
import io.datahubproject.openlineage.dataset.DatahubDataset;
import io.datahubproject.openlineage.dataset.DatahubJob;
import io.datahubproject.openlineage.dataset.PathSpec;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import junit.framework.TestCase;
@ -598,4 +600,158 @@ public class OpenLineageEventToDatahubTest extends TestCase {
dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift");
}
}
public void testProcessRedshiftOutputWithPlatformInstance()
throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.fabricType(FabricType.DEV);
builder.hivePlatformAlias("glue");
builder.materializeDataset(true);
builder.includeSchemaMetadata(true);
builder.commonDatasetPlatformInstance("my-platform-instance");
String olEvent =
IOUtils.toString(
this.getClass().getResourceAsStream("/ol_events/redshift_lineage_spark.json"),
StandardCharsets.UTF_8);
OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent);
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build());
assertNotNull(datahubJob);
for (DatahubDataset dataset : datahubJob.getInSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:mysql,my-platform-instance.datahub.metadata_aspect_v2,DEV)",
dataset.getUrn().toString());
}
for (DatahubDataset dataset : datahubJob.getOutSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:redshift,my-platform-instance.dev.public.spark_redshift_load_test,DEV)",
dataset.getUrn().toString());
assertEquals(
dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift");
}
}
public void testProcessRedshiftOutputWithPlatformSpecificPlatformInstance()
throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.fabricType(FabricType.DEV);
builder.hivePlatformAlias("glue");
builder.materializeDataset(true);
builder.includeSchemaMetadata(true);
builder.pathSpecs(
new HashMap<String, List<PathSpec>>() {
{
put(
"redshift",
List.of(
PathSpec.builder()
.platform("redshift")
.platformInstance(Optional.of("my-platform-instance"))
.build()));
}
});
String olEvent =
IOUtils.toString(
this.getClass().getResourceAsStream("/ol_events/redshift_lineage_spark.json"),
StandardCharsets.UTF_8);
OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent);
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build());
assertNotNull(datahubJob);
for (DatahubDataset dataset : datahubJob.getInSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,DEV)",
dataset.getUrn().toString());
}
for (DatahubDataset dataset : datahubJob.getOutSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:redshift,my-platform-instance.dev.public.spark_redshift_load_test,DEV)",
dataset.getUrn().toString());
assertEquals(
dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift");
}
}
public void testProcessRedshiftOutputWithPlatformSpecificEnv()
throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.fabricType(FabricType.DEV);
builder.hivePlatformAlias("glue");
builder.materializeDataset(true);
builder.includeSchemaMetadata(true);
builder.pathSpecs(
new HashMap<String, List<PathSpec>>() {
{
put(
"redshift",
List.of(PathSpec.builder().platform("redshift").env(Optional.of("PROD")).build()));
}
});
String olEvent =
IOUtils.toString(
this.getClass().getResourceAsStream("/ol_events/redshift_lineage_spark.json"),
StandardCharsets.UTF_8);
OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent);
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build());
assertNotNull(datahubJob);
for (DatahubDataset dataset : datahubJob.getInSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,DEV)",
dataset.getUrn().toString());
}
for (DatahubDataset dataset : datahubJob.getOutSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.spark_redshift_load_test,PROD)",
dataset.getUrn().toString());
assertEquals(
dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift");
}
}
public void testProcessRedshiftOutputLowercasedUrns() throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.fabricType(FabricType.DEV);
builder.hivePlatformAlias("glue");
builder.materializeDataset(true);
builder.includeSchemaMetadata(true);
builder.lowerCaseDatasetUrns(true);
String olEvent =
IOUtils.toString(
this.getClass()
.getResourceAsStream("/ol_events/redshift_mixed_case_lineage_spark.json"),
StandardCharsets.UTF_8);
OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent);
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build());
assertNotNull(datahubJob);
for (DatahubDataset dataset : datahubJob.getInSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,DEV)",
dataset.getUrn().toString());
}
for (DatahubDataset dataset : datahubJob.getOutSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.spark_redshift_load_test,DEV)",
dataset.getUrn().toString());
assertEquals(
dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift");
}
}
}

View File

@ -0,0 +1,147 @@
{
"eventTime": "2024-06-18T06:52:21.64Z",
"producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
"eventType": "COMPLETE",
"run": {
"runId": "01902a1e-371a-7dbf-8098-2337d441e8dc",
"facets": {
"parent": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json#/$defs/ParentRunFacet",
"run": {
"runId": "01902a1e-0b05-750e-b38d-439998f7a853"
},
"job": {
"namespace": "default",
"name": "jdbc_test_demo"
}
},
"processing_engine": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet",
"version": "3.3.4",
"name": "spark"
},
"environment-properties": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
"environment-properties": {}
},
"spark_properties": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
"properties": {
"spark.master": "local[*]",
"spark.app.name": "JdbcTest-Demo"
}
}
}
},
"job": {
"namespace": "default",
"name": "jdbc_test_demo.execute_save_into_data_source_command.spark_redshift_load_test",
"facets": {
"jobType": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet",
"processingType": "BATCH",
"integration": "SPARK",
"jobType": "SQL_JOB"
}
}
},
"inputs": [
{
"namespace": "mysql://localhost:3306",
"name": "DataHub.Metadata_Aspect_V2",
"facets": {
"dataSource": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet",
"name": "mysql://localhost:3306",
"uri": "mysql://localhost:3306"
},
"schema": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet",
"fields": [
{
"name": "urn",
"type": "string"
},
{
"name": "aspect",
"type": "string"
},
{
"name": "version",
"type": "long"
},
{
"name": "metadata",
"type": "string"
},
{
"name": "systemmetadata",
"type": "string"
},
{
"name": "createdon",
"type": "timestamp"
},
{
"name": "createdby",
"type": "string"
},
{
"name": "createdfor",
"type": "string"
}
]
}
},
"inputFacets": {}
}
],
"outputs": [
{
"namespace": "redshift://my-redshift-cluster.us-west-2.redshift.amazonaws.com:5439",
"name": "DEV.PuBliC.SparK_RedshifT_Load_Test",
"facets": {
"dataSource": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet",
"name": "redshift://my-redshift-cluster.us-west-2.redshift.amazonaws.com:5439",
"uri": "redshift://my-redshift-cluster.us-west-2.redshift.amazonaws.com:5439"
},
"schema": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet",
"fields": [
{
"name": "urn",
"type": "string"
}
]
},
"columnLineage": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-2/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet",
"fields": {
"urn": {
"inputFields": [
{
"namespace": "mysql://localhost:3306",
"name": "datahub.metadata_aspect_v2",
"field": "urn"
}
]
}
}
}
},
"outputFacets": {}
}
]
}