OpenLineage 1.31.0 upgrade - Tamas' changes (#13658)

Co-authored-by: treff7es <treff7es@gmail.com>
Jonny Dixon 2025-05-29 20:50:11 +01:00 committed by GitHub
parent 2cee79fd4d
commit cfc05747aa
10 changed files with 125 additions and 25 deletions

View File

@@ -61,7 +61,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '15.5.2'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.25.0'
ext.openLineageVersion = '1.31.0'
ext.logbackClassicJava8 = '1.2.12'
ext.awsSdk2Version = '2.30.33'

View File

@@ -24,7 +24,7 @@ When running jobs using spark-submit, the agent needs to be configured in the co
```text
#Configuring DataHub spark agent jar
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.17
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.18
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
```
@@ -32,7 +32,7 @@ spark.datahub.rest.server http://localhost:8080
## spark-submit command line
```sh
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.17 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.18 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
```
### Configuration Instructions: Amazon EMR
@@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as
stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html)
```text
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.17
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.18
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server https://your_datahub_host/gms
#If you have authentication set up then you also need to specify the Datahub access token
@@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh
spark = SparkSession.builder
.master("spark://spark-master:7077")
.appName("test-application")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.17")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.18")
.config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
.config("spark.datahub.rest.server", "http://localhost:8080")
.enableHiveSupport()
@@ -79,7 +79,7 @@ appName("test-application")
config("spark.master","spark://spark-master:7077")
.
config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.17")
config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.18")
.
config("spark.extraListeners","datahub.spark.DatahubSparkListener")
@@ -199,6 +199,7 @@ information like tokens.
| spark.datahub.s3.filename | | | The name of the file to which metadata will be written. If it is not set, a random filename is used when the S3 emitter is enabled. |
| spark.datahub.log.mcps | | true | Set this to true to log MCPs (Metadata Change Proposals) to the log. By default, it is enabled. |
| spark.datahub.legacyLineageCleanup.enabled | | false | Set this to true to remove legacy lineage from older Spark Plugin runs. This removes from the Datasets the lineage that the plugin now adds to the DataJob. By default, it is disabled. |
| spark.datahub.capture_spark_plan | | false | Set this to true to capture the Spark plan. By default, it is disabled. |
| spark.datahub.metadata.dataset.enableEnhancedMergeIntoExtraction | | false | Set this to true to enable enhanced table name extraction for Delta Lake MERGE INTO commands. This improves lineage tracking by including the target table name in the job name. By default, it is disabled. |
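Taken together with the connection settings shown earlier, a hedged spark-defaults sketch combining these flags might look like the following (the server URL is the same placeholder used above; enable only the options you need):

```text
#Illustrative spark-defaults sketch; adjust values for your environment
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.18
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
spark.datahub.capture_spark_plan true
spark.datahub.metadata.dataset.enableEnhancedMergeIntoExtraction true
```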
## What to Expect: The Metadata Model
@@ -386,6 +387,12 @@ Use Java 8 to build the project. The project uses Gradle as the build tool. To b
## Changelog
### Version 0.2.18
- _Changes_:
  - OpenLineage 1.31.0 upgrade
  - Add `spark.datahub.capture_spark_plan` option to capture the Spark plan. By default, it is disabled.
### Version 0.2.17
- _Major changes_:

View File

@@ -36,6 +36,7 @@ import java.net.URISyntaxException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import org.apache.spark.SparkConf;
@@ -67,6 +68,7 @@ public class DatahubSparkListener extends SparkListener {
private static ContextFactory contextFactory;
private static CircuitBreaker circuitBreaker = new NoOpCircuitBreaker();
private static final String sparkVersion = package$.MODULE$.SPARK_VERSION();
private final SparkConf conf;
private final Function0<Option<SparkContext>> activeSparkContext =
ScalaConversionUtils.toScalaFn(SparkContext$.MODULE$::getActive);
@@ -74,8 +76,10 @@ public class DatahubSparkListener extends SparkListener {
private static MeterRegistry meterRegistry;
private boolean isDisabled;
public DatahubSparkListener() throws URISyntaxException {
listener = new OpenLineageSparkListener();
public DatahubSparkListener(SparkConf conf) throws URISyntaxException {
this.conf = ((SparkConf) Objects.requireNonNull(conf)).clone();
listener = new OpenLineageSparkListener(conf);
}
private static SparkAppContext getSparkAppContext(
@@ -255,7 +259,10 @@ public class DatahubSparkListener extends SparkListener {
SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
if (sparkEnv != null) {
log.info("sparkEnv: {}", sparkEnv.conf().toDebugString());
sparkEnv.conf().set("spark.openlineage.facets.disabled", "[spark_unknown;spark.logicalPlan]");
if (datahubConf.hasPath("capture_spark_plan")
&& datahubConf.getBoolean("capture_spark_plan")) {
sparkEnv.conf().set("spark.openlineage.facets.spark.logicalPlan.disabled", "false");
}
}
if (properties != null) {
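As shown above, the listener now takes a `SparkConf` in its constructor and forwards it to `OpenLineageSparkListener`. A minimal construction sketch, assuming direct instantiation (for example from a test); in a normal job Spark creates the listener itself from `spark.extraListeners` and supplies the active `SparkConf`:

```java
import datahub.spark.DatahubSparkListener;
import java.net.URISyntaxException;
import org.apache.spark.SparkConf;

class ListenerConstructionSketch {
  // Hypothetical direct construction; the REST server value is illustrative.
  static DatahubSparkListener build() throws URISyntaxException {
    SparkConf conf =
        new SparkConf().set("spark.datahub.rest.server", "http://localhost:8080");
    return new DatahubSparkListener(conf);
  }
}
```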

View File

@@ -1,5 +1,5 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* Copyright 2018-2025 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/
@@ -173,20 +173,34 @@ public class PlanUtils {
* and namespace.
*
* @param parentRunId
* @param parentJob
* @param parentJobName
* @param parentJobNamespace
* @return
*/
public static OpenLineage.ParentRunFacet parentRunFacet(
UUID parentRunId, String parentJob, String parentJobNamespace) {
UUID parentRunId,
String parentJobName,
String parentJobNamespace,
UUID rootParentRunId,
String rootParentJobName,
String rootParentJobNamespace) {
return new OpenLineage(Versions.OPEN_LINEAGE_PRODUCER_URI)
.newParentRunFacetBuilder()
.run(new OpenLineage.ParentRunFacetRunBuilder().runId(parentRunId).build())
.job(
new OpenLineage.ParentRunFacetJobBuilder()
.name(NameNormalizer.normalize(parentJob))
.name(NameNormalizer.normalize(parentJobName))
.namespace(parentJobNamespace)
.build())
.root(
new OpenLineage.ParentRunFacetRootBuilder()
.run(new OpenLineage.RootRunBuilder().runId(rootParentRunId).build())
.job(
new OpenLineage.RootJobBuilder()
.namespace(rootParentJobNamespace)
.name(rootParentJobName)
.build())
.build())
.build();
}
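A call sketch for the widened `parentRunFacet` signature; the package path for `PlanUtils` and all job names and namespaces below are assumptions for illustration only:

```java
import io.openlineage.client.OpenLineage;
import io.openlineage.spark.agent.util.PlanUtils; // assumed package for this class
import java.util.UUID;

class ParentRunFacetSketch {
  // Builds a parent-run facet carrying both the immediate parent job and the
  // root job of the run hierarchy, matching the six-argument signature above.
  static OpenLineage.ParentRunFacet example() {
    UUID parentRunId = UUID.randomUUID();
    UUID rootRunId = UUID.randomUUID();
    return PlanUtils.parentRunFacet(
        parentRunId, "child_spark_job", "spark_namespace", // immediate parent
        rootRunId, "orchestrating_dag", "airflow");        // root of the hierarchy
  }
}
```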

View File

@@ -1,5 +1,5 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* Copyright 2018-2025 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/
@@ -23,6 +23,7 @@ import org.apache.spark.sql.execution.datasources.FilePartition;
import org.apache.spark.sql.execution.datasources.FileScanRDD;
import scala.Tuple2;
import scala.collection.immutable.Seq;
import scala.collection.mutable.ArrayBuffer;
/** Utility class to extract paths from RDD nodes. */
@Slf4j
@@ -65,6 +66,11 @@ public class RddPathUtils {
public Stream<Path> extract(HadoopRDD rdd) {
org.apache.hadoop.fs.Path[] inputPaths = FileInputFormat.getInputPaths(rdd.getJobConf());
Configuration hadoopConf = rdd.getConf();
if (log.isDebugEnabled()) {
log.debug("Hadoop RDD class {}", rdd.getClass());
log.debug("Hadoop RDD input paths {}", Arrays.toString(inputPaths));
log.debug("Hadoop RDD job conf {}", rdd.getJobConf());
}
return Arrays.stream(inputPaths).map(p -> PlanUtils.getDirectoryPath(p, hadoopConf));
}
}
@@ -78,6 +84,9 @@ public class RddPathUtils {
@Override
public Stream<Path> extract(MapPartitionsRDD rdd) {
if (log.isDebugEnabled()) {
log.debug("Parent RDD: {}", rdd.prev());
}
return findRDDPaths(rdd.prev());
}
}
@@ -122,7 +131,9 @@
try {
Object data = FieldUtils.readField(rdd, "data", true);
log.debug("ParallelCollectionRDD data: {}", data);
if ((data instanceof Seq) && ((Seq) data).head() instanceof Tuple2) {
if ((data instanceof Seq)
&& (!((Seq<?>) data).isEmpty())
&& ((Seq) data).head() instanceof Tuple2) {
// exit if the first element is invalid
Seq data_slice = (Seq) ((Seq) data).slice(0, SEQ_LIMIT);
return ScalaConversionUtils.fromSeq(data_slice).stream()
@@ -140,6 +151,11 @@
return path;
})
.filter(Objects::nonNull);
} else if ((data instanceof ArrayBuffer) && !((ArrayBuffer<?>) data).isEmpty()) {
ArrayBuffer<?> dataBuffer = (ArrayBuffer<?>) data;
return ScalaConversionUtils.fromSeq(dataBuffer.toSeq()).stream()
.map(o -> parentOf(o.toString()))
.filter(Objects::nonNull);
} else {
// Changed to debug to silence error
log.debug("Cannot extract path from ParallelCollectionRDD {}", data);
@@ -156,6 +172,9 @@
try {
return new Path(path).getParent();
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("Cannot get parent of path {}", path, e);
}
return null;
}
}

View File

@@ -1,5 +1,5 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* Copyright 2018-2025 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

View File

@@ -1,5 +1,5 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* Copyright 2018-2025 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/
@@ -21,6 +21,8 @@ public interface Vendors {
Arrays.asList(
// Add vendor classes here
"io.openlineage.spark.agent.vendor.snowflake.SnowflakeVendor",
"io.openlineage.spark.agent.vendor.iceberg.IcebergVendor",
"io.openlineage.spark.agent.vendor.gcp.GcpVendor",
// This is the only chance we have to add the RedshiftVendor to the list of vendors
"io.openlineage.spark.agent.vendor.redshift.RedshiftVendor");
@@ -56,7 +58,7 @@
// and the app
// https://github.com/OpenLineage/OpenLineage/issues/1860
// ServiceLoader<Vendor> serviceLoader = ServiceLoader.load(Vendor.class);
return new VendorsImpl(vendors);
return new VendorsImpl(vendors, new VendorsContext());
}
static Vendors empty() {
@@ -71,10 +73,17 @@
public Collection<OpenLineageEventHandlerFactory> getEventHandlerFactories() {
return Collections.emptyList();
}
@Override
public VendorsContext getVendorsContext() {
return new VendorsContext();
}
};
}
Collection<VisitorFactory> getVisitorFactories();
Collection<OpenLineageEventHandlerFactory> getEventHandlerFactories();
VendorsContext getVendorsContext();
}

View File

@@ -0,0 +1,28 @@
/*
/* Copyright 2018-2025 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/
package io.openlineage.spark.api;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/** Class to store all the vendors related context information. */
public class VendorsContext {
private final Map<String, Object> contextMap = new HashMap<>();
public void register(String key, Object value) {
contextMap.put(key, value);
}
public Optional<Object> fromVendorsContext(String key) {
return Optional.ofNullable(contextMap.get(key));
}
public boolean contains(String key) {
return contextMap.containsKey(key);
}
;
}
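A small usage sketch for this context holder; the key and value are purely illustrative:

```java
import io.openlineage.spark.api.VendorsContext;
import java.util.Optional;

class VendorsContextSketch {
  static void example() {
    VendorsContext context = new VendorsContext();
    // Register an arbitrary value under an illustrative key.
    context.register("iceberg.catalogName", "demo_catalog");
    if (context.contains("iceberg.catalogName")) {
      Optional<Object> value = context.fromVendorsContext("iceberg.catalogName");
      System.out.println(value.orElse("<missing>"));
    }
  }
}
```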

View File

@@ -1,29 +1,27 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* Copyright 2018-2025 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/
package io.openlineage.spark.api;
import io.openlineage.spark.agent.lifecycle.VisitorFactory;
import io.openlineage.spark.agent.vendor.redshift.RedshiftVendor;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class VendorsImpl implements Vendors {
private final List<Vendor> vendors;
private final VendorsContext vendorsContext;
public VendorsImpl(List<Vendor> vendors) {
public VendorsImpl(List<Vendor> vendors, VendorsContext vendorsContext) {
this.vendors = vendors;
this.vendorsContext = vendorsContext;
}
@Override
public Collection<VisitorFactory> getVisitorFactories() {
vendors.add(new RedshiftVendor());
return vendors.stream()
.map(Vendor::getVisitorFactory)
.filter(Optional::isPresent)
@@ -39,4 +37,9 @@ public class VendorsImpl implements Vendors {
.map(Optional::get)
.collect(Collectors.toList());
}
@Override
public VendorsContext getVendorsContext() {
return vendorsContext;
}
}

View File

@@ -67,6 +67,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
@@ -80,6 +81,7 @@ import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
@Slf4j
public class OpenLineageToDataHub {
@@ -605,12 +607,23 @@ public class OpenLineageToDataHub {
forEachValue(airflowProperties, customProperties);
}
break;
case "spark.logicalPlan":
{
if (flowProperties) {
JSONObject jsonObject = new JSONObject(entry.getValue().getAdditionalProperties());
customProperties.put("spark.logicalPlan", jsonObject.toString());
}
}
break;
case "unknownSourceAttribute":
{
if (!flowProperties) {
List<Map<String, Object>> unknownItems =
(List<Map<String, Object>>)
entry.getValue().getAdditionalProperties().get("unknownItems");
entry
.getValue()
.getAdditionalProperties()
.getOrDefault("unknownItems", Collections.emptyList());
for (Map<String, Object> item : unknownItems) {
forEachValue(item, customProperties);
}