feat(spark-lineage): add ability to push data lineage from spark to d… (#3664)

Co-authored-by: Shirshanka Das <shirshanka@apache.org>
MugdhaHardikar-GSLab 2021-12-14 01:30:51 +05:30 committed by GitHub
parent 83207b37af
commit 5b68a4dff9
33 changed files with 1946 additions and 1 deletion

.gitignore vendored

@@ -39,6 +39,11 @@ MANIFEST
# Mac OS
**/.DS_Store
#spark-lineage
**/spark-lineage/metastore_db/
**/spark-lineage/**/derby.log
**/spark-lineage/**/hive/
**/spark-lineage/**/out.csv/
.vscode
# Metadata Ingestion Generated

build.gradle

@@ -3,7 +3,6 @@ buildscript {
ext.gmaVersion = '0.2.81'
ext.pegasusVersion = '28.3.7'
ext.mavenVersion = '3.6.3'
apply from: './repositories.gradle'
buildscript.repositories.addAll(project.repositories)
dependencies {
@@ -11,6 +10,7 @@ buildscript {
classpath 'com.github.node-gradle:gradle-node-plugin:2.2.4'
classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.8.1'
classpath 'org.springframework.boot:spring-boot-gradle-plugin:2.1.4.RELEASE'
classpath 'com.github.jengelman.gradle.plugins:shadow:5.2.0'
}
}
@@ -68,6 +68,8 @@ project.ext.externalDependency = [
'guava': 'com.google.guava:guava:27.0.1-jre',
'h2': 'com.h2database:h2:1.4.196',
'hadoopClient': 'org.apache.hadoop:hadoop-client:3.1.1',
'hadoopCommon':'org.apache.hadoop:hadoop-common:2.7.2',
'hadoopMapreduceClient':'org.apache.hadoop:hadoop-mapreduce-client-core:2.7.2',
'hibernateCore': 'org.hibernate:hibernate-core:5.2.16.Final',
'httpClient': 'org.apache.httpcomponents:httpclient:4.5.9',
'iStackCommons': 'com.sun.istack:istack-commons-runtime:4.0.1',
@@ -114,6 +116,8 @@ project.ext.externalDependency = [
'rythmEngine': 'org.rythmengine:rythm-engine:1.3.0',
'servletApi': 'javax.servlet:javax.servlet-api:3.1.0',
'shiroCore': 'org.apache.shiro:shiro-core:1.7.1',
'sparkSql' : 'org.apache.spark:spark-sql_2.11:2.4.8',
'sparkHive' : 'org.apache.spark:spark-hive_2.11:2.4.8',
'springBeans': 'org.springframework:spring-beans:5.2.3.RELEASE',
'springContext': 'org.springframework:spring-context:5.2.3.RELEASE',
'springCore': 'org.springframework:spring-core:5.2.3.RELEASE',
@@ -129,7 +133,9 @@ project.ext.externalDependency = [
'testng': 'org.testng:testng:7.3.0',
'testContainers': 'org.testcontainers:testcontainers:1.15.1',
'testContainersJunit': 'org.testcontainers:junit-jupiter:1.15.1',
'testContainersPostgresql':'org.testcontainers:postgresql:1.2.0',
'testContainersElasticsearch': 'org.testcontainers:elasticsearch:1.15.3',
'wiremock':'com.github.tomakehurst:wiremock:2.10.0',
'zookeeper': 'org.apache.zookeeper:zookeeper:3.4.14'
]

docs-website/sidebars.js

@@ -98,6 +98,7 @@ module.exports = {
"docs/lineage/airflow",
"docker/airflow/local_airflow",
"docs/lineage/sample_code",
"spark-lineage/README",
],
},
{

settings.gradle

@@ -39,3 +39,4 @@ include 'metadata-perf'
include 'docs-website'
include 'metadata-models-custom'
include 'entity-registry:custom-test-model'
include 'spark-lineage'

spark-lineage/README.md Normal file

@@ -0,0 +1,88 @@
# Spark lineage emitter
The Spark lineage emitter is a Java library that provides a Spark listener implementation, "DatahubLineageEmitter". The DatahubLineageEmitter listens to events such as application start/end and SQLExecution start/end, and creates pipelines (i.e. DataFlow) and tasks (i.e. DataJob) in DataHub, along with lineage.
## Configuring Spark emitter
The listener can be configured either through a Spark config file or while building the SparkSession.
### Config file for spark-submit
When running jobs using spark-submit, configure the listener in the Spark config file.
```
spark.master spark://spark-master:7077
#Configuring datahub spark listener jar
spark.jars.packages io.acryl:spark-lineage:0.0.1
spark.extraListeners com.linkedin.datahub.lineage.spark.interceptor.DatahubLineageEmitter
spark.datahub.lineage.mcpEmitter.gmsUrl http://localhost:8080
```
### Configuring with SparkSession Builder for notebooks
When running interactive jobs from a notebook, the listener can be configured while building the Spark Session.
```python
spark = SparkSession.builder \
.master("spark://spark-master:7077") \
.appName("test-application") \
.config("spark.jars.packages","io.acryl:spark-lineage:0.0.1") \
.config("spark.extraListeners","com.linkedin.datahub.lineage.interceptor.spark.DatahubLineageEmitter") \
.config("spark.datahub.lineage.mcpEmitter.gmsUrl", "http://localhost:8080") \
.enableHiveSupport() \
.getOrCreate()
```
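The same configuration can also be set from a Java application. Below is a minimal sketch: the configuration keys are the ones documented above, while the class name, master URL and application name are only illustrative.
```java
import org.apache.spark.sql.SparkSession;

public class LineageEnabledApp {
  public static void main(String[] args) {
    // Build a session with the DataHub lineage listener attached.
    SparkSession spark = SparkSession.builder()
        .master("spark://spark-master:7077")
        .appName("test-application")
        .config("spark.jars.packages", "io.acryl:spark-lineage:0.0.1")
        .config("spark.extraListeners",
            "com.linkedin.datahub.lineage.spark.interceptor.DatahubLineageEmitter")
        .config("spark.datahub.lineage.mcpEmitter.gmsUrl", "http://localhost:8080")
        .enableHiveSupport()
        .getOrCreate();

    // Run queries as usual; the listener emits lineage on SQL execution events.

    spark.stop();
  }
}
```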
## Model mapping
A pipeline is created per Spark <master, appName>.
A task is created per unique Spark query execution within an app.
### Custom properties & relating to Spark UI
The following custom properties in pipelines and tasks relate to the Spark UI:
- appName and appId in a pipeline can be used to determine the Spark application
- description and SQLQueryId in a task can be used to determine the query execution within the application on the SQL tab of the Spark UI
Other custom properties of pipelines and tasks capture details such as the start and end times of execution.
The query plan is captured in the *queryPlan* property of a task.
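For illustration, assuming a master of `spark://spark-master:7077`, an appName of `test-application` and a query execution id of 8, the resulting pipeline (DataFlow) and task (DataJob) would be identified roughly as follows; the master URL is sanitized before being used as the cluster component of the URN.
```
urn:li:dataFlow:(spark,test-application,spark_spark-master_7077)
urn:li:dataJob:(urn:li:dataFlow:(spark,test-application,spark_spark-master_7077),QueryExecId_8)
```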
## Release notes for v0.0.1
In this version, basic dataset-level lineage is captured using the model mapping as mentioned earlier.
### Spark versions supported
The primary version tested is Spark 2.4.8 with Scala 2.11.
We anticipate this to work well with other Spark 2.4.x versions and Scala 2.11.
Support for other Spark versions is planned in the very near future.
### Environments tested with
This initial release has been tested with the following environments:
- spark-submit of Python/Java applications to local and remote servers
- notebooks
Note that testing for other environments, such as Databricks and standalone applications, is planned in the near future.
### Spark commands supported
Below is a list of Spark commands that are parsed currently:
- InsertIntoHadoopFsRelationCommand
- SaveIntoDataSourceCommand (jdbc)
- CreateHiveTableAsSelectCommand
- InsertIntoHiveTable
Effectively, these support data sources/sinks corresponding to Hive, HDFS and JDBC.
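As a rough guide, the DataFrame operations below typically produce these commands. This is a sketch: the paths, database/table names, JDBC URL and credentials are illustrative, and the exact plan nodes depend on the table format and Spark version.
```java
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SupportedCommandsExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().enableHiveSupport().getOrCreate();
    spark.sql("create database if not exists sparktestdb");

    Dataset<Row> df = spark.read().option("header", "true").csv("/data/in1.csv");
    df.createOrReplaceTempView("v1");

    // InsertIntoHadoopFsRelationCommand: writing out to an HDFS/file path
    df.write().mode(SaveMode.Overwrite).csv("/data/out.csv");

    // SaveIntoDataSourceCommand (jdbc): writing through the JDBC data source
    Properties props = new Properties();
    props.setProperty("user", "postgres");
    props.setProperty("password", "postgres");
    df.write().mode(SaveMode.Overwrite)
        .jdbc("jdbc:postgresql://localhost:5432/sparktestdb", "foo1", props);

    // CreateHiveTableAsSelectCommand: Hive CTAS in a Hive-enabled session
    spark.sql("create table sparktestdb.foo2 as select * from v1");

    // InsertIntoHiveTable: inserting into an existing Hive table
    spark.sql("insert into table sparktestdb.foo2 select * from v1");

    spark.stop();
  }
}
```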
### Spark commands not yet supported
- View related commands
- Cache commands and implications on lineage
- RDD jobs
### Important notes on usage
- Set appName appropriately so that you can trace lineage from a pipeline back to your source code.
- If multiple apps with the same appName run concurrently, dataset lineage will be captured correctly, but custom properties such as appId and SQLQueryId may be unreliable. We expect this to be quite rare.
- If Spark execution fails, an empty pipeline may still be created, but it may not have any tasks.
- For HDFS sources, the folder (name) is regarded as the dataset (name), to align with the typical storage of parquet/csv formats (see the example after this list).
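For instance, if a job writes part files under an (illustrative) output folder `hdfs://namenode/data/out.csv/`, the folder itself is treated as the dataset, giving a URN along the lines of:
```
urn:li:dataset:(urn:li:dataPlatform:hdfs,hdfs://namenode/data/out.csv,PROD)
```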
## Known limitations
- Only Postgres is supported for JDBC sources in this initial release; support for other driver URL formats will be added in the future (see the example after this list).
- Behavior with cached datasets is not fully specified/defined in context of lineage.
- Very short-lived jobs that complete within a few milliseconds may not be captured by the listener. This should not be an issue for realistic Spark applications.
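As an illustration of the current JDBC naming, writing table `foo1` through an (illustrative) URL `jdbc:postgresql://localhost:5432/sparktestdb` would map to approximately:
```
urn:li:dataset:(urn:li:dataPlatform:postgres,sparktestdb.foo1,PROD)
```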

spark-lineage/bin/.gitignore vendored Normal file

@@ -0,0 +1,2 @@
/main/
/test/

spark-lineage/build.gradle Normal file

@@ -0,0 +1,138 @@
apply plugin: 'java'
apply plugin: 'com.github.johnrengelman.shadow'
apply plugin: 'maven'
apply plugin: 'signing'
dependencies {
//Needed for tie-breaking of the guava version needed by spark and wiremock
compile(externalDependency.hadoopMapreduceClient) {
force = true
}
compile(externalDependency.hadoopCommon) {
force = true
} // required for org.apache.hadoop.util.StopWatch
compile(externalDependency.commonsIo) {
force = true
} // required for org.apache.commons.io.Charsets that is used internally
compileOnly externalDependency.lombok
annotationProcessor externalDependency.lombok
implementation(project(':metadata-models')) {
exclude group: "org.antlr"
exclude group: "com.google.guava" // causes issues with Guava Stopwatch constructor
}
implementation(externalDependency.sparkSql){
exclude group: "org.apache.hadoop"
}
implementation(externalDependency.sparkHive){
exclude group: "org.apache.hadoop"
}
testImplementation(externalDependency.postgresql)
testImplementation externalDependency.mockito
testImplementation(externalDependency.wiremock){
exclude group: "com.fasterxml.jackson.core"
} // older version to allow older guava
testImplementation(externalDependency.testContainersPostgresql) // older version to allow older jackson
}
shadowJar {
zip64=true
classifier=''
dependencies {
exclude(dependency("org.apache.hadoop::"))
exclude(dependency("org.apache.spark::"))
exclude(dependency(externalDependency.commonsIo))
}
}
test {
useJUnit()
}
assemble {
dependsOn shadowJar
}
task sourceJar(type: Jar) {
classifier 'sources'
from sourceSets.main.allJava
}
task javadocJar(type: Jar, dependsOn: javadoc) {
classifier 'javadoc'
from javadoc.destinationDir
}
artifacts {
archives shadowJar
}
// uploadArchives {
// repositories {
// mavenDeployer {
// def ossrhUsername = System.getenv('RELEASE_USERNAME')
// def ossrhPassword = System.getenv('RELEASE_PASSWORD')
// beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) }
// repository(url: "https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/") {
// authentication(userName: ossrhUsername, password: ossrhPassword)
// }
// snapshotRepository(url: "https://s01.oss.sonatype.org/content/repositories/snapshots/") {
// authentication(userName: ossrhUsername, password: ossrhPassword)
// }
// pom.project {
// //No need to specify name here. Name is always picked up from project name
// //name 'spark-lineage'
// packaging 'jar'
// // optionally artifactId can be defined here
// description 'Library to push data lineage from spark to datahub'
// url 'https://datahubproject.io'
// scm {
// connection 'scm:git:git://github.com/linkedin/datahub.git'
// developerConnection 'scm:git:ssh://github.com:linkedin/datahub.git'
// url 'https://github.com/linkedin/datahub.git'
// }
// licenses {
// license {
// name 'The Apache License, Version 2.0'
// url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
// }
// }
// developers {
// developer {
// id 'datahub'
// name 'datahub'
//
// }
// }
// }
// }
// }
// }
// signing {
// def signingKey = findProperty("signingKey")
// def signingPassword = findProperty("signingPassword")
// useInMemoryPgpKeys(signingKey, signingPassword)
// sign configurations.archives
// }

spark-lineage/src/main/java/com/linkedin/datahub/lineage/consumer/impl/MCPEmitter.java Normal file

@@ -0,0 +1,67 @@
package com.linkedin.datahub.lineage.consumer.impl;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import com.linkedin.datahub.lineage.spark.model.LineageConsumer;
import com.linkedin.datahub.lineage.spark.model.LineageEvent;
import com.linkedin.mxe.MetadataChangeProposal;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MCPEmitter implements LineageConsumer {
private static final String GMS_URL_KEY = "spark.datahub.lineage.mcpEmitter.gmsUrl";
private static final String SENTINEL = "moot";
private ConcurrentHashMap<String, RESTEmitter> singleton = new ConcurrentHashMap<>();
private void emit(List<MetadataChangeProposal> mcps) {
RESTEmitter emitter = emitter();
if (emitter != null) {
mcps.forEach(mcp -> {
log.debug("Emitting \n" + mcp);
try {
emitter.emit(mcp);
} catch (IOException e) {
// log error, but don't impact thread
StringWriter s = new StringWriter();
PrintWriter p = new PrintWriter(s);
e.printStackTrace(p);
log.error(s.toString());
p.close();
}
});
}
}
// TODO ideally the impl here should not be tied to Spark; the LineageConsumer
// API needs tweaking to include configs
private RESTEmitter emitter() {
singleton.computeIfAbsent(SENTINEL, x -> {
SparkConf conf = SparkEnv.get().conf();
if (conf.contains(GMS_URL_KEY)) {
String gmsUrl = conf.get(GMS_URL_KEY);
log.debug("REST emitter configured with GMS url " + gmsUrl);
return RESTEmitter.create(gmsUrl);
}
log.error("GMS URL not configured.");
return null;
});
return singleton.get(SENTINEL);
}
@Override
public void accept(LineageEvent evt) {
emit(evt.toMcps());
}
}

spark-lineage/src/main/java/com/linkedin/datahub/lineage/consumer/impl/RESTEmitter.java Normal file

@@ -0,0 +1,85 @@
package com.linkedin.datahub.lineage.consumer.impl;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashMap;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.data.template.JacksonDataTemplateCodec;
import com.linkedin.mxe.MetadataChangeProposal;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
public class RESTEmitter {
private static final JacksonDataTemplateCodec DATA_TEMPLATE_CODEC = new JacksonDataTemplateCodec();
@Getter
private final String gmsUrl;
public void emit(MetadataChangeProposal mcp) throws IOException {
String payloadJson = DATA_TEMPLATE_CODEC.mapToString(mcp.data());
ObjectMapper om = new ObjectMapper();
TypeReference<HashMap<String, Object>> typeRef = new TypeReference<HashMap<String, Object>>() {
};
HashMap<String, Object> o = om.readValue(payloadJson, typeRef);
while (o.values().remove(null)) {
}
payloadJson = om.writeValueAsString(o);
payloadJson = "{" + " \"proposal\" :" + payloadJson + "}";
log.debug("Emitting payload: " + payloadJson + "\n to URL " + this.gmsUrl + "/aspects?action=ingestProposal");
RESTEmitter.makeRequest(this.gmsUrl + "/aspects?action=ingestProposal", "POST", payloadJson);
}
public static boolean makeRequest(String urlStr, String method, String payloadJson) throws IOException {
URL url = new URL(urlStr);
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setRequestMethod(method);
con.setRequestProperty("Content-Type", "application/json");
con.setRequestProperty("X-RestLi-Protocol-Version", "2.0.0");
// con.setRequestProperty("Accept", "application/json");
con.setDoOutput(true);
if (payloadJson != null) {
try (OutputStream os = con.getOutputStream()) {
byte[] input = payloadJson.getBytes("utf-8");
os.write(input, 0, input.length);
}
}
try (BufferedReader br = new BufferedReader(new InputStreamReader(con.getInputStream(), "utf-8"))) {
StringBuilder response = new StringBuilder();
String responseLine = null;
while ((responseLine = br.readLine()) != null) {
response.append(responseLine.trim());
}
log.debug("URL: " + urlStr + " Response: " + response.toString());
}
return true;
}
public boolean testConnection() {
try {
RESTEmitter.makeRequest(this.gmsUrl + "/config", "GET", null);
return true;
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
public static RESTEmitter create(String gmsUrl) {
return new RESTEmitter(gmsUrl);
}
}

spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/interceptor/DatahubLineageEmitter.java Normal file

@@ -0,0 +1,290 @@
package com.linkedin.datahub.lineage.spark.interceptor;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkEnv;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.plans.QueryPlan;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SQLExecution;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import com.google.common.base.Splitter;
import com.linkedin.datahub.lineage.spark.model.AppEndEvent;
import com.linkedin.datahub.lineage.spark.model.AppStartEvent;
import com.linkedin.datahub.lineage.spark.model.DatasetLineage;
import com.linkedin.datahub.lineage.spark.model.LineageConsumer;
import com.linkedin.datahub.lineage.spark.model.SQLQueryExecEndEvent;
import com.linkedin.datahub.lineage.spark.model.SQLQueryExecStartEvent;
import com.linkedin.datahub.lineage.spark.model.dataset.SparkDataset;
import lombok.extern.slf4j.Slf4j;
import scala.collection.JavaConversions;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractPartialFunction;
@Slf4j
public class DatahubLineageEmitter extends SparkListener {
private static final int THREAD_CNT = 10;
public static final String CONSUMER_TYPE_KEY = "spark.datahub.lineage.consumerTypes";
private final Map<String, AppStartEvent> appDetails = new ConcurrentHashMap<>();
private final Map<String, Map<Long, SQLQueryExecStartEvent>> appSqlDetails = new ConcurrentHashMap<>();
private final Map<String, ExecutorService> appPoolDetails = new ConcurrentHashMap<>();
// private static LineageConsumer loggingConsumer() {
// log.warn("Lineage consumer not specified. Defaulting to LoggingConsumer.");
// return LineageUtils.LOGGING_CONSUMER;
// }
private class SqlStartTask implements Runnable {
private SparkListenerSQLExecutionStart sqlStart;
private SparkContext ctx;
private LogicalPlan plan;
public SqlStartTask(SparkListenerSQLExecutionStart sqlStart, LogicalPlan plan, SparkContext ctx) {
this.sqlStart = sqlStart;
this.plan = plan;
this.ctx = ctx;
}
@Override
public void run() {
appSqlDetails.get(ctx.appName()).put(sqlStart.executionId(),
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), ctx.appName(), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), null));
log.debug("PLAN for execution id: " + ctx.appName() + ":" + sqlStart.executionId() + "\n");
log.debug(plan.toString());
DatasetExtractor extractor = new DatasetExtractor();
Optional<? extends SparkDataset> outputDS = extractor.asDataset(plan, ctx, true);
if (!outputDS.isPresent()) {
log.debug("Skipping execution as no output dataset present for execution id: " + ctx.appName() + ":"
+ sqlStart.executionId());
return;
}
DatasetLineage lineage = new DatasetLineage(sqlStart.description(), plan.toString(), outputDS.get());
Collection<QueryPlan<?>> allInners = new ArrayList<>();
plan.collect(new AbstractPartialFunction<LogicalPlan, Void>() {
@Override
public Void apply(LogicalPlan plan) {
log.debug("CHILD " + plan.getClass() + "\n" + plan + "\n-------------\n");
Optional<? extends SparkDataset> inputDS = extractor.asDataset(plan, ctx, false);
inputDS.ifPresent(x -> lineage.addSource(x));
allInners.addAll(JavaConversions.asJavaCollection(plan.innerChildren()));
return null;
}
@Override
public boolean isDefinedAt(LogicalPlan x) {
return true;
}
});
for (QueryPlan<?> qp : allInners) {
if (!(qp instanceof LogicalPlan)) {
continue;
}
LogicalPlan nestedPlan = (LogicalPlan) qp;
nestedPlan.collect(new AbstractPartialFunction<LogicalPlan, Void>() {
@Override
public Void apply(LogicalPlan plan) {
log.debug("INNER CHILD " + plan.getClass() + "\n" + plan + "\n-------------\n");
Optional<? extends SparkDataset> inputDS = extractor.asDataset(plan, ctx, false);
inputDS.ifPresent(
x -> log.debug("source added for " + ctx.appName() + "/" + sqlStart.executionId() + ": " + x));
inputDS.ifPresent(x -> lineage.addSource(x));
return null;
}
@Override
public boolean isDefinedAt(LogicalPlan x) {
return true;
}
});
}
SQLQueryExecStartEvent evt = new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), ctx.appName(),
ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), lineage);
appSqlDetails.get(ctx.appName()).put(sqlStart.executionId(), evt);
consumers().forEach(c -> c.accept(evt)); // TODO parallel stream here?
log.debug("LINEAGE \n" + lineage + "\n");
log.debug("Parsed execution id " + ctx.appName() + ":" + sqlStart.executionId());
return;
}
}
@Override
public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
try {
log.debug("App started: " + applicationStart);
LineageUtils.findSparkCtx().foreach(new AbstractFunction1<SparkContext, Void>() {
@Override
public Void apply(SparkContext sc) {
String appId = applicationStart.appId().isDefined() ? applicationStart.appId().get() : "";
AppStartEvent evt = new AppStartEvent(LineageUtils.getMaster(sc), applicationStart.appName(), appId,
applicationStart.time(), applicationStart.sparkUser());
consumers().forEach(x -> x.accept(evt));
// TODO keyed by appName; only latest will be considered. Potential
// inconsistencies not mapped.
appDetails.put(applicationStart.appName(), evt);
appSqlDetails.put(applicationStart.appName(), new ConcurrentHashMap<>());
ExecutorService pool = Executors.newFixedThreadPool(THREAD_CNT);
appPoolDetails.put(applicationStart.appName(), pool);
return null;
}
});
super.onApplicationStart(applicationStart);
} catch (Exception e) {
// log error, but don't impact thread
StringWriter s = new StringWriter();
PrintWriter p = new PrintWriter(s);
e.printStackTrace(p);
log.error(s.toString());
p.close();
}
}
@Override
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
try {
LineageUtils.findSparkCtx().foreach(new AbstractFunction1<SparkContext, Void>() {
@Override
public Void apply(SparkContext sc) {
log.debug("Application end event received for appId :" + sc.appName());
AppStartEvent start = appDetails.remove(sc.appName());
appPoolDetails.remove(sc.appName()).shutdown();
appSqlDetails.remove(sc.appName());
if (start == null) {
log.error(
"Application end event received, but start event missing for appId " + sc.applicationId());
} else {
AppEndEvent evt = new AppEndEvent(LineageUtils.getMaster(sc), sc.appName(), sc.applicationId(),
applicationEnd.time(), start);
consumers().forEach(x -> x.accept(evt));
}
return null;
}
});
super.onApplicationEnd(applicationEnd);
} catch (Exception e) {
// log error, but don't impact thread
StringWriter s = new StringWriter();
PrintWriter p = new PrintWriter(s);
e.printStackTrace(p);
log.error(s.toString());
p.close();
}
}
@Override
public void onOtherEvent(SparkListenerEvent event) {
try {
if (event instanceof SparkListenerSQLExecutionStart) {
SparkListenerSQLExecutionStart sqlEvt = (SparkListenerSQLExecutionStart) event;
log.debug("SQL Exec start event with id " + sqlEvt.executionId());
processExecution(sqlEvt);
} else if (event instanceof SparkListenerSQLExecutionEnd) {
SparkListenerSQLExecutionEnd sqlEvt = (SparkListenerSQLExecutionEnd) event;
log.debug("SQL Exec end event with id " + sqlEvt.executionId());
processExecutionEnd(sqlEvt);
}
} catch (Exception e) {
// log error, but don't impact thread
StringWriter s = new StringWriter();
PrintWriter p = new PrintWriter(s);
e.printStackTrace(p);
log.error(s.toString());
p.close();
}
}
public void processExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) {
LineageUtils.findSparkCtx().foreach(new AbstractFunction1<SparkContext, Void>() {
@Override
public Void apply(SparkContext sc) {
SQLQueryExecStartEvent start = appSqlDetails.get(sc.appName()).remove(sqlEnd.executionId());
if (start == null) {
log.error("Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId()
+ ":" + sqlEnd.executionId());
} else if (start.getDatasetLineage() != null) {
// JobStatus status = jobEnd.jobResult().equals(org.apache.spark.scheduler.JobSucceeded$.MODULE$)
// ? JobStatus.COMPLETED
// : JobStatus.FAILED;
SQLQueryExecEndEvent evt = new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(),
sc.applicationId(),
sqlEnd.time(), sqlEnd.executionId(), start);
consumers().forEach(x -> x.accept(evt));
}
return null;
}
});
}
// TODO sqlEvt.details() unused
private void processExecution(SparkListenerSQLExecutionStart sqlStart) {
QueryExecution queryExec = SQLExecution.getQueryExecution(sqlStart.executionId());
if (queryExec == null) {
log.error("Skipping processing for sql exec Id" + sqlStart.executionId() + " as Query execution context could not be read from current spark state");
return;
}
LogicalPlan plan = queryExec.optimizedPlan();
SparkSession sess = queryExec.sparkSession();
SparkContext ctx = sess.sparkContext();
ExecutorService pool = appPoolDetails.get(ctx.appName());
pool.execute(new SqlStartTask(sqlStart, plan, ctx));
}
private static List<LineageConsumer> consumers() {
SparkConf conf = SparkEnv.get().conf();
if (conf.contains(CONSUMER_TYPE_KEY)) {
String consumerTypes = conf.get(CONSUMER_TYPE_KEY);
return StreamSupport.stream(Splitter.on(",").trimResults().split(consumerTypes).spliterator(), false)
.map(x -> LineageUtils.getConsumer(x)).filter(x -> x != null).collect(Collectors.toList());
} else {
return Collections.singletonList(LineageUtils.getConsumer("mcpEmitter"));
}
}
}

spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/interceptor/DatasetExtractor.java Normal file

@@ -0,0 +1,156 @@
package com.linkedin.datahub.lineage.spark.interceptor;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand;
import org.apache.spark.sql.execution.datasources.HadoopFsRelation;
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand;
import org.apache.spark.sql.execution.datasources.LogicalRelation;
import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation;
import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand;
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable;
import org.apache.spark.sql.sources.BaseRelation;
import com.google.common.collect.ImmutableSet;
import com.linkedin.datahub.lineage.spark.model.dataset.CatalogTableDataset;
import com.linkedin.datahub.lineage.spark.model.dataset.HdfsPathDataset;
import com.linkedin.datahub.lineage.spark.model.dataset.JdbcDataset;
import com.linkedin.datahub.lineage.spark.model.dataset.SparkDataset;
import scala.Option;
import scala.collection.JavaConversions;
public class DatasetExtractor {
private static final Map<Class<? extends LogicalPlan>, PlanToDataset> PLAN_TO_DATASET = new HashMap<>();
private static final Map<Class<? extends BaseRelation>, RelationToDataset> REL_TO_DATASET = new HashMap<>();
private static final Set<Class<? extends LogicalPlan>> OUTPUT_CMD = ImmutableSet
.of(InsertIntoHadoopFsRelationCommand.class, SaveIntoDataSourceCommand.class,
CreateDataSourceTableAsSelectCommand.class, CreateHiveTableAsSelectCommand.class,
InsertIntoHiveTable.class);
// TODO InsertIntoHiveDirCommand, InsertIntoDataSourceDirCommand
private static interface PlanToDataset {
Optional<? extends SparkDataset> fromPlanNode(LogicalPlan plan, SparkContext ctx);
}
private static interface RelationToDataset {
Optional<? extends SparkDataset> fromRelation(BaseRelation rel, SparkContext ctx);
}
static {
PLAN_TO_DATASET.put(InsertIntoHadoopFsRelationCommand.class, (p, ctx) -> {
InsertIntoHadoopFsRelationCommand cmd = (InsertIntoHadoopFsRelationCommand) p;
if (cmd.catalogTable().isDefined()) {
return Optional.of(new CatalogTableDataset(cmd.catalogTable().get()));
}
return Optional.of(new HdfsPathDataset(cmd.outputPath()));
});
PLAN_TO_DATASET.put(LogicalRelation.class, (p, ctx) -> {
BaseRelation baseRel = ((LogicalRelation) p).relation();
if (!REL_TO_DATASET.containsKey(baseRel.getClass())) {
return Optional.empty();
}
return REL_TO_DATASET.get(baseRel.getClass()).fromRelation(baseRel, ctx);
});
PLAN_TO_DATASET.put(SaveIntoDataSourceCommand.class, (p, ctx) -> {
/*
* BaseRelation relation; if (((SaveIntoDataSourceCommand) p).dataSource()
* instanceof RelationProvider) { RelationProvider relProvider =
* (RelationProvider) ((SaveIntoDataSourceCommand) p).dataSource(); relation =
* relProvider.createRelation(ctx, ((SaveIntoDataSourceCommand) p).options()); }
* else { SchemaRelationProvider relProvider = (SchemaRelationProvider)
* ((SaveIntoDataSourceCommand) p).dataSource(); relation =
* p.createRelation(ctx, ((SaveIntoDataSourceCommand) p).options(), p.schema());
* }
*/
SaveIntoDataSourceCommand cmd = (SaveIntoDataSourceCommand) p;
Map<String, String> options = JavaConversions.mapAsJavaMap(cmd.options());
String url = options.get("url"); // e.g. jdbc:postgresql://localhost:5432/sparktestdb
if (!url.contains("jdbc")) {
return Optional.empty();
}
String tbl = options.get("dbtable");
return Optional.of(new JdbcDataset(url, tbl));
});
PLAN_TO_DATASET.put(CreateDataSourceTableAsSelectCommand.class, (p, ctx) -> {
CreateDataSourceTableAsSelectCommand cmd = (CreateDataSourceTableAsSelectCommand) p;
// TODO what of cmd.mode()
return Optional.of(new CatalogTableDataset(cmd.table()));
});
PLAN_TO_DATASET.put(CreateHiveTableAsSelectCommand.class, (p, ctx) -> {
CreateHiveTableAsSelectCommand cmd = (CreateHiveTableAsSelectCommand) p;
return Optional.of(new CatalogTableDataset(cmd.tableDesc()));
});
PLAN_TO_DATASET.put(InsertIntoHiveTable.class, (p, ctx) -> {
InsertIntoHiveTable cmd = (InsertIntoHiveTable) p;
return Optional.of(new CatalogTableDataset(cmd.table()));
});
PLAN_TO_DATASET.put(HiveTableRelation.class, (p, ctx) -> {
HiveTableRelation cmd = (HiveTableRelation) p;
return Optional.of(new CatalogTableDataset(cmd.tableMeta()));
});
REL_TO_DATASET.put(HadoopFsRelation.class, (r, ctx) -> {
List<Path> res = JavaConversions.asJavaCollection(((HadoopFsRelation) r).location().rootPaths()).stream()
.map(p -> getDirectoryPath(p, ctx.hadoopConfiguration()))
.distinct()
.collect(Collectors.toList());
// TODO mapping to URN TBD
return Optional.of(new HdfsPathDataset(res.get(0)));
});
REL_TO_DATASET.put(JDBCRelation.class, (r, ctx) -> {
JDBCRelation rel = (JDBCRelation) r;
Option<String> tbl = rel.jdbcOptions().parameters().get(JDBCOptions.JDBC_TABLE_NAME());
if (tbl.isEmpty()) {
return Optional.empty();
}
return Optional.of(new JdbcDataset(rel.jdbcOptions().url(), tbl.get()));
});
}
Optional<? extends SparkDataset> asDataset(LogicalPlan logicalPlan, SparkContext ctx, boolean outputNode) {
if (!outputNode && OUTPUT_CMD.contains(logicalPlan.getClass())) {
return Optional.empty();
}
if (!PLAN_TO_DATASET.containsKey(logicalPlan.getClass())) {
return Optional.empty();
}
return PLAN_TO_DATASET.get(logicalPlan.getClass()).fromPlanNode(logicalPlan, ctx);
}
private static Path getDirectoryPath(Path p, Configuration hadoopConf) {
try {
if (p.getFileSystem(hadoopConf).getFileStatus(p).isFile()) {
return p.getParent();
} else {
return p;
}
} catch (IOException e) {
// log.warn("Unable to get file system for path ", e);
return p;
}
}
}

spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/interceptor/LineageUtils.java Normal file

@@ -0,0 +1,125 @@
package com.linkedin.datahub.lineage.spark.interceptor;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.sql.SparkSession;
import com.linkedin.common.urn.DataFlowUrn;
import com.linkedin.data.ByteString;
import com.linkedin.data.template.JacksonDataTemplateCodec;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.datahub.lineage.consumer.impl.MCPEmitter;
import com.linkedin.datahub.lineage.spark.model.LineageConsumer;
import com.linkedin.mxe.GenericAspect;
import lombok.extern.slf4j.Slf4j;
import scala.Option;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
@Slf4j
public class LineageUtils {
private static final JacksonDataTemplateCodec DATA_TEMPLATE_CODEC = new JacksonDataTemplateCodec();
private static Map<String, LineageConsumer> consumers = new ConcurrentHashMap<>();
public static final LineageConsumer LOGGING_CONSUMER = (x -> log.info(x.toString()));
// hook for replacing paths during testing. Not the cleanest way, TODO improve.
/* This is for generating urn from a hash of the plan */
// private static Function<String, String> PATH_REPLACER = (x -> x);
static {
// system defined consumers
registerConsumer("mcpEmitter", new MCPEmitter());
}
private LineageUtils() {
}
// overwrites existing consumer entry of same type
public static void registerConsumer(String consumerType, LineageConsumer consumer) {
consumers.put(consumerType, consumer);
}
public static LineageConsumer getConsumer(String consumerType) {
return consumers.get(consumerType);
}
public static DataFlowUrn flowUrn(String master, String appName) {
return new DataFlowUrn("spark", appName, master.replaceAll(":", "_").replaceAll("/", "_").replaceAll("[_]+", "_"));
}
// Taken from GenericAspectUtils
public static GenericAspect serializeAspect(@Nonnull RecordTemplate aspect) {
GenericAspect genericAspect = new GenericAspect();
try {
String aspectStr = DATA_TEMPLATE_CODEC.mapToString(aspect.data());
genericAspect.setValue(
ByteString.unsafeWrap(aspectStr.getBytes(StandardCharsets.UTF_8)));
genericAspect.setContentType("application/json");
return genericAspect;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static Option<SparkContext> findSparkCtx() {
return SparkSession.getActiveSession()
.map(new AbstractFunction1<SparkSession, SparkContext>() {
@Override
public SparkContext apply(SparkSession sess) {
return sess.sparkContext();
}
})
.orElse(new AbstractFunction0<Option<SparkContext>>() {
@Override
public Option<SparkContext> apply() {
return SparkContext$.MODULE$.getActive();
}
});
}
public static String getMaster(SparkContext ctx) {
return ctx.conf().get("spark.master");
}
/* This is for generating urn from a hash of the plan */
/*
public static String scrubPlan(String plan) {
String s = plan.replaceAll("#[0-9]*", "");
s = s.replaceAll("JdbcRelationProvider@[0-9a-zA-Z]*,", "JdbcRelationProvider,");
s = s.replaceAll("InMemoryFileIndex@[0-9a-zA-Z]*,", "InMemoryFileIndex,");
s = s.replaceAll("Created Time:[^\n]+\n", "");
s = s.replaceAll("Last Access:[^\n]+\n", "");
s = s.replaceAll("Owner:[^\n]+\n", "");
s = s.replaceAll("Statistics:[^\n]+\n", "");
s = s.replaceAll("Table Properties:[^\n]+\n", "");
// System.out.println("CLEAN: " + s);
return s;
}
public static void setPathReplacer(Function<String, String> replacer) {
PATH_REPLACER = replacer;
}
public static String hash(String s) {
s = PATH_REPLACER.apply(s);
log.debug("PATH REPLACED " + s);
return Hashing.md5().hashString(s, Charset.forName("US-ASCII")).toString();
}
*/
}

spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/AppEndEvent.java Normal file

@@ -0,0 +1,46 @@
package com.linkedin.datahub.lineage.spark.model;
import java.util.Arrays;
import java.util.List;
import com.linkedin.common.urn.DataFlowUrn;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.lineage.spark.interceptor.LineageUtils;
import com.linkedin.datajob.DataFlowInfo;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.mxe.MetadataChangeProposal;
import lombok.Getter;
import lombok.ToString;
@ToString
@Getter
public class AppEndEvent extends LineageEvent {
private final AppStartEvent start;
public AppEndEvent(String master, String appName, String appId, long time, AppStartEvent start) {
super(master, appName, appId, time);
this.start = start;
}
@Override
public List<MetadataChangeProposal> toMcps() {
DataFlowUrn flowUrn = LineageUtils.flowUrn(getMaster(), getAppName());
StringMap customProps = start.customProps();
customProps.put("completedAt", timeStr());
DataFlowInfo flowInfo = new DataFlowInfo()
.setName(getAppName())
.setCustomProperties(customProps);
MetadataChangeProposal mcpFlowInfo = new MetadataChangeProposal();
mcpFlowInfo.setAspectName("dataFlowInfo");
mcpFlowInfo.setAspect(LineageUtils.serializeAspect(flowInfo));
mcpFlowInfo.setEntityUrn(flowUrn);
mcpFlowInfo.setEntityType("dataFlow");
mcpFlowInfo.setChangeType(ChangeType.UPSERT);
return Arrays.asList(mcpFlowInfo);
}
}

spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/AppStartEvent.java Normal file

@@ -0,0 +1,52 @@
package com.linkedin.datahub.lineage.spark.model;
import java.util.Arrays;
import java.util.List;
import com.linkedin.common.urn.DataFlowUrn;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.lineage.spark.interceptor.LineageUtils;
import com.linkedin.datajob.DataFlowInfo;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.mxe.MetadataChangeProposal;
import lombok.Getter;
import lombok.ToString;
@ToString
@Getter
public class AppStartEvent extends LineageEvent {
private final String sparkUser;
public AppStartEvent(String master, String appName, String appId, long time, String sparkUser) {
super(master, appName, appId, time);
this.sparkUser = sparkUser;
}
@Override
public List<MetadataChangeProposal> toMcps() {
DataFlowUrn flowUrn = LineageUtils.flowUrn(getMaster(), getAppName());
DataFlowInfo flowInfo = new DataFlowInfo()
.setName(getAppName())
.setCustomProperties(customProps());
MetadataChangeProposal mcpFlowInfo = new MetadataChangeProposal();
mcpFlowInfo.setAspectName("dataFlowInfo");
mcpFlowInfo.setAspect(LineageUtils.serializeAspect(flowInfo));
mcpFlowInfo.setEntityUrn(flowUrn);
mcpFlowInfo.setEntityType("dataFlow");
mcpFlowInfo.setChangeType(ChangeType.UPSERT);
return Arrays.asList(mcpFlowInfo);
}
StringMap customProps() {
StringMap customProps = new StringMap();
customProps.put("startedAt", timeStr());
customProps.put("appId", getAppId());
customProps.put("appName", getAppName());
customProps.put("sparkUser", sparkUser);
return customProps;
}
}

spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/DatasetLineage.java Normal file

@@ -0,0 +1,35 @@
package com.linkedin.datahub.lineage.spark.model;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import com.linkedin.datahub.lineage.spark.model.dataset.SparkDataset;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
@RequiredArgsConstructor
@ToString
public class DatasetLineage {
private final Set<SparkDataset> sources = new HashSet<>();
@Getter
private final String callSiteShort;
@Getter
private final String plan;
@Getter
private final SparkDataset sink;
public void addSource(SparkDataset source) {
sources.add(source);
}
public Set<SparkDataset> getSources() {
return Collections.unmodifiableSet(sources);
}
}

spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/LineageConsumer.java Normal file

@@ -0,0 +1,6 @@
package com.linkedin.datahub.lineage.spark.model;
import io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueue.Consumer;
public interface LineageConsumer extends Consumer<LineageEvent> {
}

spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/LineageEvent.java Normal file

@@ -0,0 +1,22 @@
package com.linkedin.datahub.lineage.spark.model;
import java.util.Date;
import java.util.List;
import com.linkedin.mxe.MetadataChangeProposal;
import lombok.Data;
@Data
public abstract class LineageEvent {
private final String master;
private final String appName;
private final String appId;
private final long time;
public abstract List<MetadataChangeProposal> toMcps();
protected String timeStr() {
return new Date(getTime()).toInstant().toString();
}
}

spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/SQLQueryExecEndEvent.java Normal file

@@ -0,0 +1,47 @@
package com.linkedin.datahub.lineage.spark.model;
import java.util.Arrays;
import java.util.List;
import com.linkedin.common.urn.DataJobUrn;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.lineage.spark.interceptor.LineageUtils;
import com.linkedin.datajob.DataJobInfo;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.mxe.MetadataChangeProposal;
import lombok.Getter;
import lombok.ToString;
@ToString
@Getter
public class SQLQueryExecEndEvent extends LineageEvent {
private final long sqlQueryExecId;
private final SQLQueryExecStartEvent start;
public SQLQueryExecEndEvent(String master, String appName, String appId, long time, long sqlQueryExecId, SQLQueryExecStartEvent start) {
super(master, appName, appId, time);
this.sqlQueryExecId = sqlQueryExecId;
this.start = start;
}
@Override
public List<MetadataChangeProposal> toMcps() {
DataJobUrn jobUrn = start.jobUrn();
StringMap customProps = start.customProps();
customProps.put("completedAt", timeStr());
DataJobInfo jobInfo = start.jobInfo()
.setCustomProperties(customProps);
MetadataChangeProposal mcpJobInfo = new MetadataChangeProposal();
mcpJobInfo.setAspectName("dataJobInfo");
mcpJobInfo.setAspect(LineageUtils.serializeAspect(jobInfo));
mcpJobInfo.setEntityUrn(jobUrn);
mcpJobInfo.setEntityType("dataJob");
mcpJobInfo.setChangeType(ChangeType.UPSERT);
return Arrays.asList(mcpJobInfo);
}
}

spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/SQLQueryExecStartEvent.java Normal file

@@ -0,0 +1,119 @@
package com.linkedin.datahub.lineage.spark.model;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import com.linkedin.common.DatasetUrnArray;
import com.linkedin.common.urn.DataFlowUrn;
import com.linkedin.common.urn.DataJobUrn;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.lineage.spark.interceptor.LineageUtils;
import com.linkedin.datahub.lineage.spark.model.dataset.SparkDataset;
import com.linkedin.datajob.DataJobInfo;
import com.linkedin.datajob.DataJobInputOutput;
import com.linkedin.datajob.JobStatus;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.mxe.MetadataChangeProposal;
import lombok.Getter;
import lombok.ToString;
@ToString
@Getter
public class SQLQueryExecStartEvent extends LineageEvent {
private final long sqlQueryExecId;
private final DatasetLineage datasetLineage;
public SQLQueryExecStartEvent(String master, String appName, String appId, long time, long sqlQueryExecId,
DatasetLineage datasetLineage) {
super(master, appName, appId, time);
this.sqlQueryExecId = sqlQueryExecId;
this.datasetLineage = datasetLineage;
}
@Override
public List<MetadataChangeProposal> toMcps() {
DataJobUrn jobUrn = jobUrn();
MetadataChangeProposal mcpJobIO = new MetadataChangeProposal();
mcpJobIO.setAspectName("dataJobInputOutput");
mcpJobIO.setAspect(LineageUtils.serializeAspect(jobIO()));
mcpJobIO.setEntityUrn(jobUrn);
mcpJobIO.setEntityType("dataJob");
mcpJobIO.setChangeType(ChangeType.UPSERT);
DataJobInfo jobInfo = jobInfo();
jobInfo.setCustomProperties(customProps());
jobInfo.setStatus(JobStatus.IN_PROGRESS);
MetadataChangeProposal mcpJobInfo = new MetadataChangeProposal();
mcpJobInfo.setAspectName("dataJobInfo");
mcpJobInfo.setAspect(LineageUtils.serializeAspect(jobInfo));
mcpJobInfo.setEntityUrn(jobUrn);
mcpJobInfo.setEntityType("dataJob");
mcpJobInfo.setChangeType(ChangeType.UPSERT);
return Arrays.asList(mcpJobIO, mcpJobInfo);
}
DataJobInfo jobInfo() {
return new DataJobInfo()
.setName(datasetLineage.getCallSiteShort())
.setType(DataJobInfo.Type.create("sparkJob"));
}
DataJobUrn jobUrn() {
/* This is for generating urn from a hash of the plan */
/*
* Set<String> sourceUrns = datasetLineage.getSources() .parallelStream() .map(x
* -> x.urn().toString()) .collect(Collectors.toSet()); sourceUrns = new
* TreeSet<>(sourceUrns); //sort for consistency
*
* String sinkUrn = datasetLineage.getSink().urn().toString(); String plan =
* LineageUtils.scrubPlan(datasetLineage.getPlan()); String id =
* Joiner.on(",").join(sinkUrn, sourceUrns, plan);
*
* return new DataJobUrn(flowUrn(), "planHash_" + LineageUtils.hash(id));
*/
return new DataJobUrn(flowUrn(), "QueryExecId_" + sqlQueryExecId);
}
DataFlowUrn flowUrn() {
return LineageUtils.flowUrn(getMaster(), getAppName());
}
StringMap customProps() {
StringMap customProps = new StringMap();
customProps.put("startedAt", timeStr());
customProps.put("description", datasetLineage.getCallSiteShort());
customProps.put("SQLQueryId", Long.toString(sqlQueryExecId));
customProps.put("appId", getAppId());
customProps.put("appName", getAppName());
customProps.put("queryPlan", datasetLineage.getPlan());
return customProps;
}
private DataJobInputOutput jobIO() {
DatasetUrnArray out = new DatasetUrnArray();
out.add(datasetLineage.getSink().urn());
DatasetUrnArray in = new DatasetUrnArray();
Set<SparkDataset> sources = new TreeSet<>(new Comparator<SparkDataset>() {
@Override
public int compare(SparkDataset x, SparkDataset y) {
return x.urn().toString().compareTo(y.urn().toString());
}
});
sources.addAll(datasetLineage.getSources()); // maintain ordering
for (SparkDataset source : sources) {
in.add(source.urn());
}
DataJobInputOutput io = new DataJobInputOutput().setInputDatasets(in).setOutputDatasets(out);
return io;
}
}

spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/CatalogTableDataset.java Normal file

@@ -0,0 +1,29 @@
package com.linkedin.datahub.lineage.spark.model.dataset;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@EqualsAndHashCode
@ToString
public class CatalogTableDataset implements SparkDataset {
private final DatasetUrn urn;
public CatalogTableDataset(CatalogTable table) {
this(table.qualifiedName());
}
public CatalogTableDataset(String dsName) {
this.urn = new DatasetUrn(new DataPlatformUrn("hive"), dsName, FabricType.PROD);
}
@Override
public DatasetUrn urn() {
return this.urn;
}
}

spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/HdfsPathDataset.java Normal file

@@ -0,0 +1,31 @@
package com.linkedin.datahub.lineage.spark.model.dataset;
import org.apache.hadoop.fs.Path;
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@EqualsAndHashCode
@ToString
public class HdfsPathDataset implements SparkDataset {
private final DatasetUrn urn;
public HdfsPathDataset(Path path) {
// TODO check static partitions?
this(path.toUri().toString());
}
public HdfsPathDataset(String pathUri) {
// TODO check static partitions?
this.urn = new DatasetUrn(new DataPlatformUrn("hdfs"), pathUri, FabricType.PROD);
}
@Override
public DatasetUrn urn() {
return this.urn;
}
}

spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/JdbcDataset.java Normal file

@@ -0,0 +1,39 @@
package com.linkedin.datahub.lineage.spark.model.dataset;
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@EqualsAndHashCode
@ToString
public class JdbcDataset implements SparkDataset {
private final DatasetUrn urn;
public JdbcDataset(String url, String tbl) {
this.urn = new DatasetUrn(new DataPlatformUrn(platformName(url)), dsName(url, tbl), FabricType.PROD);
}
@Override
public DatasetUrn urn() {
return this.urn;
}
private static String platformName(String url) {
if (url.contains("postgres")) {
return "postgres";
}
return "unknownJdbc";
}
private static String dsName(String url, String tbl) {
url = url.replaceFirst("jdbc:", "");
if (url.contains("postgres")) {
url = url.substring(url.lastIndexOf('/') + 1);
}
// TODO different DBs have different formats. TBD mapping to data source names
return url + "." + tbl;
}
}

spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/SparkDataset.java Normal file

@@ -0,0 +1,7 @@
package com.linkedin.datahub.lineage.spark.model.dataset;
import com.linkedin.common.urn.DatasetUrn;
public interface SparkDataset {
DatasetUrn urn();
}

spark-lineage/src/test/java/com/linkedin/datahub/lineage/TestSparkJobsLineage.java Normal file

@@ -0,0 +1,522 @@
package com.linkedin.datahub.lineage;
import static com.github.tomakehurst.wiremock.client.WireMock.ok;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.testcontainers.containers.PostgreSQLContainer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.MappingBuilder;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.core.Admin;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.extension.Parameters;
import com.github.tomakehurst.wiremock.extension.PostServeAction;
import com.github.tomakehurst.wiremock.matching.MatchResult;
import com.github.tomakehurst.wiremock.matching.StringValuePattern;
import com.github.tomakehurst.wiremock.stubbing.ServeEvent;
import com.linkedin.datahub.lineage.spark.interceptor.LineageUtils;
import com.linkedin.datahub.lineage.spark.model.DatasetLineage;
import com.linkedin.datahub.lineage.spark.model.LineageConsumer;
import com.linkedin.datahub.lineage.spark.model.LineageEvent;
import com.linkedin.datahub.lineage.spark.model.SQLQueryExecStartEvent;
import com.linkedin.datahub.lineage.spark.model.dataset.CatalogTableDataset;
import com.linkedin.datahub.lineage.spark.model.dataset.HdfsPathDataset;
import com.linkedin.datahub.lineage.spark.model.dataset.JdbcDataset;
import com.linkedin.datahub.lineage.spark.model.dataset.SparkDataset;
public class TestSparkJobsLineage {
private static final boolean MOCK_GMS = Boolean.valueOf("true"); // if false, MCPs get written to real GMS server (see GMS_PORT)
private static final boolean VERIFY_EXPECTED = MOCK_GMS && Boolean.valueOf("true"); // if false, "expected" JSONs are overwritten.
private static final String APP_NAME = "sparkTestApp";
private static final String RESOURCE_DIR = "src/test/resources";
private static final String DATA_DIR = RESOURCE_DIR + "/data";
private static final String WAREHOUSE_LOC = DATA_DIR + "/hive/warehouse";
private static final String TEST_DB = "sparktestdb";
private static final String MASTER = "local";
private static final int N = 3; // num of GMS requests per spark job
private static final int GMS_PORT = MOCK_GMS ? 8089 : 8080;
private static final String EXPECTED_JSON_ROOT = "src/test/resources/expected/";
private static SparkSession spark;
private static Properties jdbcConnnProperties;
private static DatasetLineageAccumulator acc;
@SuppressWarnings("rawtypes")
private static final class McpContentPattern extends StringValuePattern {
public McpContentPattern(String expectedValue) {
super(expectedValue);
}
// dataflow case, we do not match against expected string
public McpContentPattern() {
super("");
}
@Override
public MatchResult match(String actual) {
if (actual.contains("dataJobInputOutput")) {
return expectedValue.contains(relPaths(actual)) ? MatchResult.exactMatch() : MatchResult.noMatch();
}
try {
HashMap body = new ObjectMapper().readValue(actual, HashMap.class);
HashMap proposal = (HashMap) body.get("proposal");
String aspectName = (String) proposal.get("aspectName");
if (aspectName.equals("dataFlowInfo")) {
return checkFlowInfo(proposal);
}
if (actual.contains("dataJobInfo")) {
return checkJobInfo(proposal);
}
return MatchResult.noMatch();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private MatchResult checkJobInfo(HashMap proposal) {
// TODO check custom props etc.
return MatchResult.exactMatch();
}
private MatchResult checkFlowInfo(HashMap proposal) {
// TODO check custom props etc.
return MatchResult.exactMatch();
}
}
private static String relPaths(String s) {
return s.replaceAll("file:[0-9|a-z|A-Z|\\-|\\/|_|\\.]*" + RESOURCE_DIR, "file:/" + RESOURCE_DIR);
}
private static final class RequestFileWriter extends PostServeAction {
@Override
public String getName() {
return "writeReqJson";
}
@SuppressWarnings({ "rawtypes", "unused" })
public void doAction(ServeEvent serveEvent, Admin admin, Parameters parameters) {
String currentTestRes = parameters.getString("filename");
if (currentTestRes == null || VERIFY_EXPECTED) {
return;
}
Path p = Paths.get(EXPECTED_JSON_ROOT, currentTestRes);
System.out.println("Writing json to file " + p);
String json = serveEvent.getRequest().getBodyAsString();
try {
HashMap body = new ObjectMapper().readValue(json, HashMap.class);
HashMap proposal = (HashMap) body.get("proposal");
String aspectName = (String) proposal.get("aspectName");
// this effectively checks URNs and lineages; other jobInfo/flowInfo are checked
// in McpContentPattern
if (aspectName.equals("dataJobInputOutput")) {
json = relPaths(json);
Files.write(p, Collections.singletonList(json), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
private static class DatasetLineageAccumulator implements LineageConsumer {
private final List<DatasetLineage> lineages = new ArrayList<>();
public void flushJobs() {
lineages.clear();
}
public List<DatasetLineage> getLineages() {
return Collections.unmodifiableList(lineages);
}
@Override
public void accept(LineageEvent e) {
if (e instanceof SQLQueryExecStartEvent) {
lineages.add(((SQLQueryExecStartEvent) e).getDatasetLineage());
}
}
}
@Rule
public TestRule watcher = new TestWatcher() {
protected void starting(Description description) {
if (!MOCK_GMS) {
return;
}
String currentTestRes = description.getMethodName() + ".json";
MappingBuilder mapping = baseMapping();
try {
if (VERIFY_EXPECTED) {
// setup the request body that we expect
List<String> expected = Files.readAllLines(Paths.get(EXPECTED_JSON_ROOT, currentTestRes));
mapping.withRequestBody(new McpContentPattern(expected.toString()));
} else {
// overwrite "expected" json file with the MCP request bodies
mapping.withPostServeAction("writeReqJson", Parameters.one("filename", currentTestRes));
Files.deleteIfExists(Paths.get(EXPECTED_JSON_ROOT, currentTestRes));
}
} catch (IOException e) {
e.printStackTrace();
}
wireMock.stubFor(mapping);
}
@Override
protected void finished(Description description) {
if (!VERIFY_EXPECTED) {
return;
}
assertTrue(WireMock.findUnmatchedRequests().isEmpty());
wireMock.resetRequests();
wireMock.resetMappings();
super.finished(description);
}
};
public static WireMockServer wireMock = new WireMockServer(WireMockConfiguration
.options()
.port(8089)
.extensions(new RequestFileWriter()));
@ClassRule
public static PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:9.6.12")
.withDatabaseName("sparktestdb");
private static MappingBuilder baseMapping() {
return post("/aspects?action=ingestProposal")
.willReturn(ok()
.withBody("<response>SUCCESS</response>"));
}
@BeforeClass
public static void setup() {
acc = new DatasetLineageAccumulator();
LineageUtils.registerConsumer("accumulator", acc);
wireMock.start();
WireMock.configureFor("localhost", 8089);
MappingBuilder mapping = baseMapping();
if (VERIFY_EXPECTED) {
mapping.withRequestBody(new McpContentPattern());
}
wireMock.stubFor(mapping);
spark = SparkSession
.builder()
.appName(APP_NAME)
.config("spark.master", MASTER)
.config("spark.extraListeners",
"com.linkedin.datahub.lineage.spark.interceptor.DatahubLineageEmitter")
.config("spark.datahub.lineage.consumerTypes", "accumulator, mcpEmitter")
.config("spark.datahub.lineage.mcpEmitter.gmsUrl", "http://localhost:" + GMS_PORT)
.config("spark.sql.warehouse.dir", new File(WAREHOUSE_LOC).getAbsolutePath())
.enableHiveSupport()
.getOrCreate();
spark.sql("drop database if exists " + TEST_DB + " cascade");
spark.sql("create database " + TEST_DB);
jdbcConnnProperties = new Properties();
jdbcConnnProperties.put("user", db.getUsername());
jdbcConnnProperties.put("password", db.getPassword());
if (VERIFY_EXPECTED) {
verify(1, postRequestedFor(urlEqualTo("/aspects?action=ingestProposal")));
assertTrue(WireMock.findUnmatchedRequests().isEmpty());
}
wireMock.resetRequests();
wireMock.resetMappings();
}
@AfterClass
public static void tearDown() throws Exception {
wireMock.resetRequests();
wireMock.resetMappings();
MappingBuilder mapping = baseMapping();
if (VERIFY_EXPECTED) {
mapping.withRequestBody(new McpContentPattern());
}
wireMock.stubFor(mapping);
spark.stop();
if (VERIFY_EXPECTED) {
verify(1, postRequestedFor(urlEqualTo("/aspects?action=ingestProposal")));
assertTrue(WireMock.findUnmatchedRequests().isEmpty());
}
wireMock.stop();
}
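// Clear accumulated lineages before each test.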
@Before
public void before() {
acc.flushJobs();
}
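// CSV in, CSV out on the local filesystem: expects a single lineage with two HDFS sources and one HDFS sink.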
@Test
public void testHdfsInOut() throws Exception {
Dataset<Row> df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv");
Dataset<Row> df2 = spark.read().option("header", "true").csv(DATA_DIR + "/in2.csv");
df1.createOrReplaceTempView("v1");
df2.createOrReplaceTempView("v2");
Dataset<Row> df = spark
.sql("select v1.c1 as a, v1.c2 as b, v2.c1 as c, v2.c2 as d from v1 join v2 on v1.id = v2.id");
// InsertIntoHadoopFsRelationCommand
df.write().mode(SaveMode.Overwrite).csv(DATA_DIR + "/out.csv");
Thread.sleep(5000);
check(dsl(hdfsDs("out.csv"), hdfsDs("in1.csv"), hdfsDs("in2.csv")), acc.getLineages().get(0));
if (VERIFY_EXPECTED) {
verify(1 * N, postRequestedFor(urlEqualTo("/aspects?action=ingestProposal")));
}
}
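// CSV inputs joined and written to a Postgres table over JDBC.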
@Test
public void testHdfsInJdbcOut() throws Exception {
Dataset<Row> df1 = spark.read()
.option("header", "true").csv(DATA_DIR + "/in1.csv")
.withColumnRenamed("c1", "a").withColumnRenamed("c2", "b");
Dataset<Row> df2 = spark.read()
.option("header", "true").csv(DATA_DIR + "/in2.csv")
.withColumnRenamed("c1", "c").withColumnRenamed("c2", "d");
Dataset<Row> df = df1.join(df2, "id").drop("id");
// SaveIntoDataSourceCommand
// HadoopFsRelation input
df.write().mode(SaveMode.Overwrite).jdbc(
db.getJdbcUrl(),
"foo1", jdbcConnnProperties);
Thread.sleep(5000);
check(dsl(pgDs("foo1"), hdfsDs("in1.csv"), hdfsDs("in2.csv")), acc.getLineages().get(0));
if (VERIFY_EXPECTED) {
verify(1 * N, postRequestedFor(urlEqualTo("/aspects?action=ingestProposal")));
}
}
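// Mixed CSV and Postgres (JDBC) inputs, Postgres output.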
@Test
public void testHdfsJdbcInJdbcOut() throws Exception {
Connection c = db.createConnection("");
c.createStatement().execute("create table foo2 (a varchar(5), b int);");
c.createStatement().execute("insert into foo2 values('a', 4);");
c.close();
Dataset<Row> df1 = spark.read()
.option("header", "true").csv(DATA_DIR + "/in1.csv")
.withColumnRenamed("c1", "a").withColumnRenamed("c2", "b2");
Dataset<Row> df2 = spark.read()
.jdbc(db.getJdbcUrl(), "foo2", jdbcConnnProperties);
Dataset<Row> df = df1.join(df2, "a");
// SaveIntoDataSourceCommand
// JDBCRelation input
df.write().mode(SaveMode.Overwrite).jdbc(
db.getJdbcUrl(),
"foo3", jdbcConnnProperties);
Thread.sleep(5000);
check(dsl(pgDs("foo3"), hdfsDs("in1.csv"), pgDs("foo2")), acc.getLineages().get(0));
if (VERIFY_EXPECTED) {
verify(1 * N, postRequestedFor(urlEqualTo("/aspects?action=ingestProposal")));
}
}
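// CSV inputs written to a Hive table three times (overwrite, append, insert); expects three identical lineages.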
@Test
public void testHdfsInHiveOut() throws Exception {
Dataset<Row> df1 = spark.read()
.option("header", "true").csv(DATA_DIR + "/in1.csv")
.withColumnRenamed("c1", "a").withColumnRenamed("c2", "b");
Dataset<Row> df2 = spark.read()
.option("header", "true").csv(DATA_DIR + "/in2.csv")
.withColumnRenamed("c1", "c").withColumnRenamed("c2", "d");
Dataset<Row> df = df1.join(df2, "id").drop("id");
df.write().mode(SaveMode.Overwrite).saveAsTable(tbl("foo4")); // CreateDataSourceTableAsSelectCommand
df.write().mode(SaveMode.Append).saveAsTable(tbl("foo4")); // CreateDataSourceTableAsSelectCommand
df.write().insertInto(tbl("foo4")); // InsertIntoHadoopFsRelationCommand
Thread.sleep(5000);
// TODO same data accessed as Hive Table or Path URI ??
DatasetLineage exp = dsl(catTblDs("foo4"), hdfsDs("in1.csv"), hdfsDs("in2.csv"));
check(Collections.nCopies(3, exp), acc.getLineages());
if (VERIFY_EXPECTED) {
verify(3 * N, postRequestedFor(urlEqualTo("/aspects?action=ingestProposal")));
}
}
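// CTAS from CSV-backed temp views, then Hive-to-Hive CTAS and inserts; expects four lineages in total.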
@Test
public void testHiveInHiveOut() throws Exception {
Dataset<Row> df1 = spark.read()
.option("header", "true").csv(DATA_DIR + "/in1.csv")
.withColumnRenamed("c1", "a").withColumnRenamed("c2", "b");
Dataset<Row> df2 = spark.read()
.option("header", "true").csv(DATA_DIR + "/in2.csv")
.withColumnRenamed("c1", "c").withColumnRenamed("c2", "d");
df1.createOrReplaceTempView("v1");
df2.createOrReplaceTempView("v2");
// CreateHiveTableAsSelectCommand
spark.sql("create table " + tbl("foo5") + " as "
+ "(select v1.a, v1.b, v2.c, v2.d from v1 join v2 on v1.id = v2.id)");
check(dsl(catTblDs("foo5"), hdfsDs("in1.csv"), hdfsDs("in2.csv")), acc.getLineages().get(0));
// CreateHiveTableAsSelectCommand
spark.sql("create table " + tbl("hivetab") + " as "
+ "(select * from " + tbl("foo5") + ")");
check(dsl(catTblDs("hivetab"), catTblDs("foo5")), acc.getLineages().get(1));
// InsertIntoHiveTable
spark.sql("insert into " + tbl("hivetab") + " (select * from " + tbl("foo5") + ")");
check(dsl(catTblDs("hivetab"), catTblDs("foo5")), acc.getLineages().get(2));
Dataset<Row> df = spark.sql("select * from " + tbl("foo5"));
// InsertIntoHiveTable
df.write().insertInto(tbl("hivetab"));
Thread.sleep(5000);
check(dsl(catTblDs("hivetab"), catTblDs("foo5")), acc.getLineages().get(3));
if (VERIFY_EXPECTED) {
verify(4 * N, postRequestedFor(urlEqualTo("/aspects?action=ingestProposal")));
}
}
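// Two-level join mixing CSV and Postgres inputs, written to a Postgres table.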
@Test
public void testHdfsJdbcInJdbcOutTwoLevel() throws Exception {
Connection c = db.createConnection("");
c.createStatement().execute("create table foo6 (a varchar(5), b int);");
c.createStatement().execute("insert into foo6 values('a', 4);");
c.close();
Dataset<Row> df1 = spark.read()
.option("header", "true").csv(DATA_DIR + "/in1.csv")
.withColumnRenamed("c1", "a").withColumnRenamed("c2", "b2");
Dataset<Row> df2 = spark.read()
.jdbc(db.getJdbcUrl(), "foo6", jdbcConnnProperties);
Dataset<Row> df3 = spark.read()
.option("header", "true").csv(DATA_DIR + "/in2.csv")
.withColumnRenamed("c1", "a").withColumnRenamed("c2", "b3");
Dataset<Row> df = df1.join(df2, "a").drop("id").join(df3, "a");
// SaveIntoDataSourceCommand
// JDBCRelation input
df.write().mode(SaveMode.Overwrite).jdbc(
db.getJdbcUrl(),
"foo7", jdbcConnnProperties);
Thread.sleep(5000);
check(dsl(pgDs("foo7"), hdfsDs("in1.csv"), hdfsDs("in2.csv"), pgDs("foo6")), acc.getLineages().get(0));
if (VERIFY_EXPECTED) {
verify(1 * N, postRequestedFor(urlEqualTo("/aspects?action=ingestProposal")));
}
}
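// Assertion helpers: lineages are compared by the string form of their sink and source datasets.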
private static void check(List<DatasetLineage> expected, List<DatasetLineage> actual) {
assertEquals(expected.size(), actual.size());
for (int i = 0; i < expected.size(); i++) {
check(expected.get(i), actual.get(i));
}
}
private static void check(DatasetLineage expected, DatasetLineage actual) {
assertEquals(expected.getSink().toString(), actual.getSink().toString());
assertEquals(dsToStrings(expected.getSources()), dsToStrings(actual.getSources()));
assertTrue(actual.getCallSiteShort().contains("TestSparkJobsLineage"));
}
private static Set<String> dsToStrings(Set<SparkDataset> datasets) {
return datasets.stream().map(x -> x.toString()).collect(Collectors.toSet());
}
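// Fixture builders for expected lineage over HDFS paths, Postgres tables and Hive catalog tables.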
private static DatasetLineage dsl(SparkDataset sink, SparkDataset... source) {
return dsl(null, sink, source);
}
private static DatasetLineage dsl(String callSite, SparkDataset sink, SparkDataset... source) {
DatasetLineage lineage = new DatasetLineage(callSite, "unknownPlan", sink);
Arrays.asList(source).forEach(x -> lineage.addSource(x));
return lineage;
}
private static HdfsPathDataset hdfsDs(String fileName) {
return new HdfsPathDataset("file:" + abs(DATA_DIR + "/" + fileName));
}
private static JdbcDataset pgDs(String tbl) {
return new JdbcDataset(db.getJdbcUrl(), tbl);
}
private static CatalogTableDataset catTblDs(String tbl) {
return new CatalogTableDataset(tbl(tbl));
}
private static String tbl(String tbl) {
return TEST_DB + "." + tbl;
}
private static String abs(String relPath) {
return new File(relPath).getAbsolutePath();
}
}

View File

@ -0,0 +1,3 @@
id,c1,c2
1,a,4
2,a,5

View File

@ -0,0 +1,4 @@
id,c1,c2
1,a,4
2,b,5
3,b,6

View File

@ -0,0 +1,3 @@
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,sparkTestApp,local),QueryExecId_9)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/in1.csv,PROD)\",\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/in2.csv,PROD)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,sparktestdb.foo4,PROD)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,sparkTestApp,local),QueryExecId_10)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/in1.csv,PROD)\",\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/in2.csv,PROD)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,sparktestdb.foo4,PROD)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,sparkTestApp,local),QueryExecId_11)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/in1.csv,PROD)\",\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/in2.csv,PROD)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,sparktestdb.foo4,PROD)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}

View File

@ -0,0 +1 @@
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,sparkTestApp,local),QueryExecId_24)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/in1.csv,PROD)\",\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/in2.csv,PROD)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:postgres,sparktestdb.foo1,PROD)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}

View File

@ -0,0 +1 @@
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,sparkTestApp,local),QueryExecId_6)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/in1.csv,PROD)\",\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/in2.csv,PROD)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/out.csv,PROD)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}

View File

@ -0,0 +1 @@
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,sparkTestApp,local),QueryExecId_13)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/in1.csv,PROD)\",\"urn:li:dataset:(urn:li:dataPlatform:postgres,sparktestdb.foo2,PROD)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:postgres,sparktestdb.foo3,PROD)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}

View File

@ -0,0 +1 @@
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,sparkTestApp,local),QueryExecId_27)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/in1.csv,PROD)\",\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/in2.csv,PROD)\",\"urn:li:dataset:(urn:li:dataPlatform:postgres,sparktestdb.foo6,PROD)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:postgres,sparktestdb.foo7,PROD)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}

View File

@ -0,0 +1,4 @@
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,sparkTestApp,local),QueryExecId_18)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/in1.csv,PROD)\",\"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/src/test/resources/data/in2.csv,PROD)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,sparktestdb.foo5,PROD)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,sparkTestApp,local),QueryExecId_19)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,sparktestdb.foo5,PROD)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,sparktestdb.hivetab,PROD)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,sparkTestApp,local),QueryExecId_20)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,sparktestdb.foo5,PROD)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,sparktestdb.hivetab,PROD)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,sparkTestApp,local),QueryExecId_21)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,sparktestdb.foo5,PROD)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,sparktestdb.hivetab,PROD)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}

View File

@ -0,0 +1,8 @@
# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.logger.com.linkedin.datahub.lineage=DEBUG