feat(spark-lineage): simplified jars, config, auto publish to maven (#3924)

Swaroop Jagadish 2022-01-20 00:48:09 -08:00 committed by GitHub
parent 4aa14214d9
commit d23a5bb725
30 changed files with 638 additions and 287 deletions

View File

@ -0,0 +1,73 @@
name: Publish Datahub Client
on:
workflow_run:
workflows: ["build & test"]
types:
- completed
release:
types: [published, edited]
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
jobs:
check-secret:
runs-on: ubuntu-latest
if: ${{ github.event.workflow_run.conclusion == 'success' }}
outputs:
publish-enabled: ${{ steps.publish-enabled.outputs.defined }}
steps:
- id: publish-enabled
if: "${{ env.SIGNING_KEY != '' }}"
run: echo "::set-output name=defined::true"
env:
SIGNING_KEY: ${{ secrets.SIGNING_KEY }}
publish:
runs-on: ubuntu-latest
needs: [check-secret]
if: needs.check-secret.outputs.publish-enabled == 'true'
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8
- uses: actions/setup-python@v2
with:
python-version: "3.6"
- name: checkout upstream repo
run: |
git remote add upstream https://github.com/linkedin/datahub.git
git fetch upstream --tags
- name: publish datahub-client jar
env:
RELEASE_USERNAME: ${{ secrets.RELEASE_USERNAME }}
RELEASE_PASSWORD: ${{ secrets.RELEASE_PASSWORD }}
SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }}
SIGNING_KEY: ${{ secrets.SIGNING_KEY }}
NEXUS_USERNAME: ${{ secrets.NEXUS_USERNAME }}
NEXUS_PASSWORD: ${{ secrets.NEXUS_PASSWORD }}
run: |
echo signingKey=$SIGNING_KEY >> gradle.properties
./gradlew :metadata-integration:java:datahub-client:printVersion
./gradlew :metadata-integration:java:datahub-client:publishToMavenLocal
#./gradlew :metadata-integration:java:datahub-client:closeAndReleaseRepository --info
- name: publish datahub-spark jar
env:
RELEASE_USERNAME: ${{ secrets.RELEASE_USERNAME }}
RELEASE_PASSWORD: ${{ secrets.RELEASE_PASSWORD }}
SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }}
SIGNING_KEY: ${{ secrets.SIGNING_KEY }}
NEXUS_USERNAME: ${{ secrets.NEXUS_USERNAME }}
NEXUS_PASSWORD: ${{ secrets.NEXUS_PASSWORD }}
run: |
echo signingKey=$SIGNING_KEY >> gradle.properties
./gradlew :metadata-integration:java:spark-lineage:printVersion
./gradlew :metadata-integration:java:spark-lineage:publishToMavenLocal
#./gradlew :metadata-integration:java:datahub-client:closeAndReleaseRepository --info

View File

@ -10,6 +10,8 @@ buildscript {
classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.8.1'
classpath 'org.springframework.boot:spring-boot-gradle-plugin:2.1.4.RELEASE'
classpath 'com.github.jengelman.gradle.plugins:shadow:5.2.0'
classpath "io.codearte.gradle.nexus:gradle-nexus-staging-plugin:0.30.0"
classpath "com.palantir.gradle.gitversion:gradle-git-version:0.12.3"
}
}

View File

@ -7,3 +7,7 @@ org.gradle.caching=false
org.gradle.internal.repository.max.retries=5
org.gradle.internal.repository.max.tentatives=5
org.gradle.internal.repository.initial.backoff=1000
# Needed to publish to Nexus from a sub-module
gnsp.disableApplyOnlyOnRootProjectEnforcement=true

View File

@ -1,6 +1,11 @@
apply plugin: 'java'
apply plugin: 'com.github.johnrengelman.shadow'
apply plugin: 'jacoco'
apply plugin: 'signing'
apply plugin: 'io.codearte.nexus-staging'
apply plugin: 'maven-publish'
apply plugin: 'com.palantir.git-version'
import org.apache.tools.ant.filters.ReplaceTokens
jar.enabled = false // Since we only want to build shadow jars, disabling the regular jar creation
@ -21,6 +26,19 @@ jacocoTestReport {
dependsOn test // tests are required to run before generating the report
}
def details = versionDetails()
version = details.lastTag
version = version.startsWith("v")? version.substring(1): version
// normalize the version to three parts (major.minor.patch); if not on a clean tag, bump the patch and append -SNAPSHOT
def versionParts = version.tokenize(".")
def lastPart = details.isCleanTag? versionParts[2]: (versionParts[2].toInteger()+1).toString() + "-SNAPSHOT"
version = versionParts[0] + "." + versionParts[1] + "." + lastPart
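// Example (illustrative tag values): a clean tag "v0.8.23" publishes as "0.8.23";
// an untagged/dirty build on top of "v0.8.23" publishes as "0.8.24-SNAPSHOT";
// a four-part tag such as "v0.8.23.1" is trimmed to "0.8.23".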
processResources {
filter(ReplaceTokens, tokens:[fullVersion: gitVersion()])
}
test {
useJUnit()
finalizedBy jacocoTestReport
@ -64,3 +82,79 @@ checkShadowJar {
assemble {
dependsOn shadowJar
}
task sourcesJar(type: Jar) {
archiveClassifier = 'sources'
from sourceSets.main.allSource
}
task javadocJar(type: Jar) {
archiveClassifier = 'javadoc'
from javadoc
}
publishing {
publications {
shadow(MavenPublication) {
publication -> project.shadow.component(publication)
pom {
name = 'Datahub Client'
group = 'io.acryl'
artifactId = 'datahub-client'
description = 'DataHub Java client for metadata integration'
url = 'https://datahubproject.io'
artifacts = [ shadowJar, javadocJar, sourcesJar ]
scm {
connection = 'scm:git:git://github.com/linkedin/datahub.git'
developerConnection = 'scm:git:ssh://github.com:linkedin/datahub.git'
url = 'https://github.com/linkedin/datahub.git'
}
licenses {
license {
name = 'The Apache License, Version 2.0'
url = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
}
}
developers {
developer {
id = 'datahub'
name = 'Datahub'
email = 'datahub@acryl.io'
}
}
}
}
}
repositories {
maven {
def releasesRepoUrl = "https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/"
def snapshotsRepoUrl = "https://s01.oss.sonatype.org/content/repositories/snapshots/"
def ossrhUsername = System.getenv('RELEASE_USERNAME')
def ossrhPassword = System.getenv('RELEASE_PASSWORD')
credentials {
username ossrhUsername
password ossrhPassword
}
url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl
}
}
}
signing {
def signingKey = findProperty("signingKey")
def signingPassword = System.getenv("SIGNING_PASSWORD")
useInMemoryPgpKeys(signingKey, signingPassword)
sign publishing.publications.shadow
}
nexusStaging {
serverUrl = "https://s01.oss.sonatype.org/service/local/" //required only for projects registered in Sonatype after 2021-02-24
username = System.getenv("NEXUS_USERNAME")
password = System.getenv("NEXUS_PASSWORD")
}

View File

@ -1,5 +1,7 @@
# This script checks the shadow jar to ensure that we only have allowed classes being exposed through the jar
jar -tvf build/libs/datahub-client.jar |\
jarFiles=$(find build/libs -name "datahub-client*.jar" | grep -v sources | grep -v javadoc)
for jarFile in ${jarFiles}; do
jar -tvf $jarFile |\
grep -v "datahub/shaded" |\
grep -v "META-INF" |\
grep -v "com/linkedin" |\
@ -9,12 +11,14 @@ jar -tvf build/libs/datahub-client.jar |\
grep -v "pegasus/" |\
grep -v "legacyPegasusSchemas/" |\
grep -v " com/$" |\
grep -v "git.properties"
grep -v "git.properties" |\
grep -v "client.properties"
if [ $? -ne 0 ]; then
echo "No other packages found. Great"
exit 0
echo "✅ No unexpected class paths found in ${jarFile}"
else
echo "Found other packages than what we were expecting"
echo "💥 Found unexpected class paths in ${jarFile}"
exit 1
fi
done
exit 0

View File

@ -41,17 +41,17 @@ import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
*
* Constructing a REST Emitter follows a lambda-based fluent builder pattern using the `create` method.
* e.g.
* RestEmitter emitter = RestEmitter.create(b -> b
* RestEmitter emitter = RestEmitter.create(b :: b
* .server("http://localhost:8080")
* .extraHeaders(Collections.singletonMap("Custom-Header", "custom-val")
* );
* You can also customize the underlying
* http client by calling the `customizeHttpAsyncClient` method on the builder.
* e.g.
* RestEmitter emitter = RestEmitter.create(b -> b
* RestEmitter emitter = RestEmitter.create(b :: b
* .server("http://localhost:8080")
* .extraHeaders(Collections.singletonMap("Custom-Header", "custom-val")
* .customizeHttpAsyncClient(c -> c.setConnectionTimeToLive(30, TimeUnit.SECONDS))
* .customizeHttpAsyncClient(c :: c.setConnectionTimeToLive(30, TimeUnit.SECONDS))
* );
*/
public class RestEmitter implements Emitter {
@ -117,16 +117,16 @@ public class RestEmitter implements Emitter {
/**
* Constructing a REST Emitter follows a lambda-based fluent builder pattern using the `create` method.
* e.g.
* RestEmitter emitter = RestEmitter.create(b -> b
* RestEmitter emitter = RestEmitter.create(b :: b
* .server("http://localhost:8080") // coordinates of gms server
* .extraHeaders(Collections.singletonMap("Custom-Header", "custom-val")
* );
* You can also customize the underlying http client by calling the `customizeHttpAsyncClient` method on the builder.
* e.g.
* RestEmitter emitter = RestEmitter.create(b -> b
* RestEmitter emitter = RestEmitter.create(b :: b
* .server("http://localhost:8080")
* .extraHeaders(Collections.singletonMap("Custom-Header", "custom-val")
* .customizeHttpAsyncClient(c -> c.setConnectionTimeToLive(30, TimeUnit.SECONDS))
* .customizeHttpAsyncClient(c :: c.setConnectionTimeToLive(30, TimeUnit.SECONDS))
* );
* @param builderSupplier
* @return a constructed RestEmitter. Call #testConnection to make sure this emitter has a valid connection to the server

View File

@ -1,23 +1,28 @@
package datahub.client.rest;
import datahub.event.EventFormatter;
import java.io.InputStream;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
@Value
@Builder
@Slf4j
public class RestEmitterConfig {
public static final int DEFAULT_CONNECT_TIMEOUT_SEC = 10;
public static final int DEFAULT_READ_TIMEOUT_SEC = 10;
public static final String DEFAULT_AUTH_TOKEN = null;
public static final String CLIENT_VERSION_PROPERTY = "clientVersion";
@Builder.Default
private final String server = "http://localhost:8080";
@ -38,12 +43,25 @@ public class RestEmitterConfig {
public static class RestEmitterConfigBuilder {
private String getVersion() {
try (
InputStream foo = this.getClass().getClassLoader().getResourceAsStream("client.properties")) {
Properties properties = new Properties();
properties.load(foo);
return properties.getProperty(CLIENT_VERSION_PROPERTY, "unknown");
} catch (Exception e) {
log.warn("Unable to find a version for datahub-client. Will set to unknown", e);
return "unknown";
}
}
private HttpAsyncClientBuilder asyncHttpClientBuilder = HttpAsyncClientBuilder
.create()
.setDefaultRequestConfig(RequestConfig.custom()
.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_SEC * 1000)
.setSocketTimeout(DEFAULT_READ_TIMEOUT_SEC * 1000)
.build());
.build())
.setUserAgent("DataHub-RestClient/" + getVersion());
public RestEmitterConfigBuilder with(Consumer<RestEmitterConfigBuilder> builderFunction) {
builderFunction.accept(this);

View File

@ -0,0 +1 @@
clientVersion=@fullVersion@

View File

@ -16,6 +16,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -363,4 +364,20 @@ public class RestEmitterTest {
Assert.assertTrue(ioe instanceof TimeoutException);
}
}
@Test
public void testUserAgentHeader() throws IOException, ExecutionException, InterruptedException {
TestDataHubServer testDataHubServer = new TestDataHubServer();
Integer port = testDataHubServer.getMockServer().getPort();
RestEmitter emitter = RestEmitter.create(b -> b.server("http://localhost:" + port));
testDataHubServer.getMockServer().reset();
emitter.testConnection();
Properties properties = new Properties();
properties.load(emitter.getClass().getClassLoader().getResourceAsStream("client.properties"));
Assert.assertNotNull(properties.getProperty("clientVersion"));
String version = properties.getProperty("clientVersion");
testDataHubServer.getMockServer().verify(
request("/config")
.withHeader("User-Agent", "DataHub-RestClient/" + version));
}
}

View File

@ -1,38 +1,59 @@
# Spark lineage emitter
The Spark lineage emitter is a Java library that provides a Spark listener implementation "DatahubLineageEmitter". The DatahubLineageEmitter listens to events such as application start/end and SQLExecution start/end to create pipelines (i.e. DataFlow) and tasks (i.e. DataJob) in DataHub along with lineage.
# Spark Integration
To integrate Spark with DataHub, we provide a lightweight Java agent that listens for Spark application and job events and pushes metadata out to DataHub in real time. The agent listens to events such as application start/end and SQLExecution start/end to create pipelines (i.e. DataFlow) and tasks (i.e. DataJob) in DataHub, along with lineage to the datasets being read from and written to. Read on to learn how to configure this for different Spark scenarios.
## Configuring Spark emitter
Listener configuration can be done using a config file or while creating a SparkSession.
## Configuring Spark agent
The Spark agent can be configured using a config file or while creating a SparkSession.
### Config file for spark-submit
When running jobs using spark-submit, the listener is to be configured in the config file.
## Before you begin: Versions and Release Notes
Versioning of the jar artifact will follow the semantic versioning of the main [DataHub repo](https://github.com/linkedin/datahub) and release notes will be available [here](https://github.com/linkedin/datahub/releases).
Always check [the Maven central repository](https://search.maven.org/search?q=a:datahub-spark-lineage) for the latest released version.
### Configuration Instructions: spark-submit
When running jobs using spark-submit, the agent needs to be configured in the config file.
```
spark.master spark://spark-master:7077
#Configuring datahub spark listener jar
spark.jars.packages io.acryl:datahub-spark-lineage:0.0.3
spark.extraListeners com.linkedin.datahub.lineage.spark.interceptor.DatahubLineageEmitter
#Configuring datahub spark agent jar
spark.jars.packages io.acryl:datahub-spark-lineage:0.8.23
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
```
### Configuring with SparkSession Builder for notebooks
### Configuration Instructions: Notebooks
When running interactive jobs from a notebook, the agent can be configured while building the SparkSession.
```python
spark = SparkSession.builder \
.master("spark://spark-master:7077") \
.appName("test-application") \
.config("spark.jars.packages","io.acryl:datahub-spark-lineage:0.0.3") \
.config("spark.extraListeners","com.linkedin.datahub.lineage.interceptor.spark.DatahubLineageEmitter") \
.config("spark.jars.packages","io.acryl:datahub-spark-lineage:0.8.23") \
.config("spark.extraListeners","datahub.spark.DatahubSparkListener") \
.config("spark.datahub.rest.server", "http://localhost:8080") \
.enableHiveSupport() \
.getOrCreate()
```
## Model mapping
A pipeline is created per Spark <master, appName>.
A task is created per unique Spark query execution within an app.
### Configuration Instructions: Standalone Java Applications
The configuration for standalone Java apps is very similar.
```java
SparkSession spark = SparkSession.builder()
.appName("test-application")
.config("spark.master", "spark://spark-master:7077")
.config("spark.jars.packages","io.acryl:datahub-spark-lineage:0.8.23")
.config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
.config("spark.datahub.rest.server", "http://localhost:8080")
.enableHiveSupport()
.getOrCreate();
```
## What to Expect: The Metadata Model
As of this writing, the Spark agent produces metadata about the Spark job and its tasks, along with lineage edges to datasets.
- A pipeline is created per Spark <master, appName> (see the URN sketch below).
- A task is created per unique Spark query execution within an app.
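For illustration, here is a minimal sketch of how the pipeline URN is derived from the `<master, appName>` pair, mirroring the `LineageUtils.flowUrn` logic included in this change (the master and appName values below are hypothetical):
```java
import com.linkedin.common.urn.DataFlowUrn;

public class FlowUrnExample {
  // Mirrors LineageUtils.flowUrn: orchestrator "spark", flow id = appName,
  // cluster = master with ':' and '/' collapsed into single underscores.
  static DataFlowUrn flowUrn(String master, String appName) {
    return new DataFlowUrn("spark", appName,
        master.replaceAll(":", "_").replaceAll("/", "_").replaceAll("[_]+", "_"));
  }

  public static void main(String[] args) {
    // prints something like: urn:li:dataFlow:(spark,test-application,spark_spark-master_7077)
    System.out.println(flowUrn("spark://spark-master:7077", "test-application"));
  }
}
```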
### Custom properties & their relation to the Spark UI
The following custom properties in pipelines and tasks relate to the Spark UI:
@ -42,21 +63,21 @@ The following custom properties in pipelines and tasks relate to the Spark UI:
Other custom properties of pipelines and tasks capture the start and end times of execution etc.
The query plan is captured in the *queryPlan* property of a task.
## Release notes for v0.0.3
In this version, basic dataset-level lineage is captured using the model mapping as mentioned earlier.
### Spark versions supported
The primary version tested is Spark 2.4.8 with Scala 2.11.
We anticipate this to work well with other Spark 2.4.x versions and Scala 2.11.
This library has also been tested to work with Spark versions 2.2.0 - 2.4.8 and Scala versions 2.10 - 2.12.
For the Spark 3.x series, this has been tested to work with Spark 3.1.2 and 3.2.0 with Scala 2.12. Other combinations are not guaranteed to work currently.
Support for other Spark versions is planned in the very near future.
### Environments tested with
This initial release has been tested with the following environments:
- spark-submit of Python/Java applications to local and remote servers
- notebooks
- Jupyter notebooks with pyspark code
- Standalone Java applications
Note that testing for other environments such as Databricks and standalone applications is planned in the near future.
Note that testing for other environments such as Databricks is planned in the near future.
### Spark commands supported
Below is a list of Spark commands that are parsed currently:
@ -75,11 +96,8 @@ Effectively, these support data sources/sinks corresponding to Hive, HDFS and JD
### Important notes on usage
- It is advisable to set appName to something meaningful so that you can trace lineage from a pipeline back to your source code.
- If multiple apps with the same appName run concurrently, dataset lineage will be captured correctly, but custom properties such as app-id and SQLQueryId may be unreliable. We expect this to be quite rare.
- If the Spark execution fails, an empty pipeline is still created, but it may not have any tasks.
- For HDFS sources, the folder (name) is regarded as the dataset (name), to align with the typical storage layout of parquet/csv formats (see the sketch after this list).
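To make the appName and HDFS notes concrete, here is a minimal sketch (the application name and HDFS path are hypothetical; the listener configuration matches the examples above) where the lineage target is the `/data/sales_daily` folder rather than its individual part files:
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class HdfsSinkExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("sales-ingest") // a descriptive appName makes the pipeline easy to trace back to this job
        .config("spark.master", "spark://spark-master:7077")
        .config("spark.jars.packages", "io.acryl:datahub-spark-lineage:0.8.23")
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .config("spark.datahub.rest.server", "http://localhost:8080")
        .getOrCreate();

    Dataset<Row> df = spark.sql("SELECT 1 AS id, 'a' AS value");

    // The folder path below (not the individual parquet part files inside it)
    // is what appears as the dataset in DataHub.
    df.write().mode(SaveMode.Overwrite).parquet("hdfs://namenode:8020/data/sales_daily");

    spark.stop();
  }
}
```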
## Known limitations

View File

@ -1,21 +1,49 @@
apply plugin: 'java'
apply plugin: 'com.github.johnrengelman.shadow'
apply plugin: 'maven'
apply plugin: 'signing'
apply plugin: 'io.codearte.nexus-staging'
apply plugin: 'maven-publish'
apply plugin: 'jacoco'
apply plugin: 'com.palantir.git-version'
import org.apache.tools.ant.filters.ReplaceTokens
jar.enabled = false // Since we only want to build shadow jars, disabling the regular jar creation
// rename artifacts for publishing
project.archivesBaseName = 'datahub-'+project.name
// mark implementation dependencies which need to be excluded, along with their transitive dependencies, from the shadow jar
// functionality is exactly the same as "implementation"
configurations {
provided
implementation.extendsFrom provided
}
def details = versionDetails()
version = details.lastTag
version = version.startsWith("v")? version.substring(1): version
// normalize the version to three parts (major.minor.patch); if not on a clean tag, bump the patch and append -SNAPSHOT
def versionParts = version.tokenize(".")
def lastPart = details.isCleanTag? versionParts[2]: (versionParts[2].toInteger()+1).toString() + "-SNAPSHOT"
version = versionParts[0] + "." + versionParts[1] + "." + lastPart
processResources {
filter(ReplaceTokens, tokens:[fullVersion: gitVersion()])
}
dependencies {
// Needed for tie-breaking of the guava version needed for spark and wiremock
compile(externalDependency.hadoopMapreduceClient) {
provided(externalDependency.hadoopMapreduceClient) {
force = true
}
compile(externalDependency.hadoopCommon) {
provided(externalDependency.hadoopCommon) {
force = true
} // required for org.apache.hadoop.util.StopWatch
compile(externalDependency.commonsIo) {
provided(externalDependency.commonsIo) {
force = true
} // required for org.apache.commons.io.Charsets that is used internally
@ -25,8 +53,8 @@ dependencies {
implementation project(path: ':metadata-integration:java:datahub-client', configuration: 'shadow')
implementation(externalDependency.sparkSql)
implementation(externalDependency.sparkHive)
provided(externalDependency.sparkSql)
provided(externalDependency.sparkHive)
// Tests need a concrete log4j available. Providing it here
testImplementation 'org.apache.logging.log4j:log4j-api:2.17.1'
@ -49,18 +77,43 @@ dependencies {
testImplementation(externalDependency.testContainersPostgresql)
}
task checkShadowJar(type: Exec) {
commandLine 'sh', '-c', 'scripts/check_jar.sh'
}
shadowJar {
zip64=true
classifier=''
mergeServiceFiles()
def exclude_modules = project
.configurations
.provided
.resolvedConfiguration
.getLenientConfiguration()
.getAllModuleDependencies()
.collect {
it.name
}
dependencies {
exclude(dependency("org.apache.hadoop::"))
exclude(dependency("org.apache.spark::"))
exclude(dependency(externalDependency.commonsIo))
exclude(dependency {
exclude_modules.contains(it.name)
})
}
relocate 'org.apache.http','datahub.spark2.shaded.http'
relocate 'org.apache.commons.codec', 'datahub.spark2.shaded.o.a.c.codec'
relocate 'mozilla', 'datahub.spark2.shaded.mozilla'
finalizedBy checkShadowJar
}
checkShadowJar {
dependsOn shadowJar
}
jacocoTestReport {
dependsOn test // tests are required to run before generating the report
}
@ -74,7 +127,7 @@ assemble {
dependsOn shadowJar
}
task sourceJar(type: Jar) {
task sourcesJar(type: Jar) {
classifier 'sources'
from sourceSets.main.allJava
}
@ -84,63 +137,71 @@ task javadocJar(type: Jar, dependsOn: javadoc) {
from javadoc.destinationDir
}
artifacts {
archives shadowJar
publishing {
publications {
shadow(MavenPublication) {
publication -> project.shadow.component(publication)
pom {
name = 'Datahub Spark Lineage'
group = 'io.acryl'
artifactId = 'datahub-spark-lineage'
description = 'Library to push data lineage from spark to datahub'
url = 'https://datahubproject.io'
artifacts = [ shadowJar, javadocJar, sourcesJar ]
scm {
connection = 'scm:git:git://github.com/linkedin/datahub.git'
developerConnection = 'scm:git:ssh://github.com:linkedin/datahub.git'
url = 'https://github.com/linkedin/datahub.git'
}
licenses {
license {
name = 'The Apache License, Version 2.0'
url = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
}
}
developers {
developer {
id = 'datahub'
name = 'Datahub'
email = 'datahub@acryl.io'
}
}
}
}
}
repositories {
maven {
def releasesRepoUrl = "https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/"
def snapshotsRepoUrl = "https://s01.oss.sonatype.org/content/repositories/snapshots/"
def ossrhUsername = System.getenv('RELEASE_USERNAME')
def ossrhPassword = System.getenv('RELEASE_PASSWORD')
credentials {
username ossrhUsername
password ossrhPassword
}
url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl
}
}
}
// uploadArchives {
// repositories {
// mavenDeployer {
// def ossrhUsername = System.getenv('RELEASE_USERNAME')
// def ossrhPassword = System.getenv('RELEASE_PASSWORD')
// beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) }
// repository(url: "https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/") {
// authentication(userName: ossrhUsername, password: ossrhPassword)
// }
signing {
def signingKey = findProperty("signingKey")
def signingPassword = System.getenv("SIGNING_PASSWORD")
useInMemoryPgpKeys(signingKey, signingPassword)
sign publishing.publications.shadow
}
nexusStaging {
serverUrl = "https://s01.oss.sonatype.org/service/local/" //required only for projects registered in Sonatype after 2021-02-24
username = System.getenv("NEXUS_USERNAME")
password = System.getenv("NEXUS_PASSWORD")
}
// snapshotRepository(url: "https://s01.oss.sonatype.org/content/repositories/snapshots/") {
// authentication(userName: ossrhUsername, password: ossrhPassword)
// }
// pom.project {
// //No need to specify name here. Name is always picked up from project name
// //name 'spark-lineage'
// packaging 'jar'
// // optionally artifactId can be defined here
// description 'Library to push data lineage from spark to datahub'
// url 'https://datahubproject.io'
// scm {
// connection 'scm:git:git://github.com/linkedin/datahub.git'
// developerConnection 'scm:git:ssh://github.com:linkedin/datahub.git'
// url 'https://github.com/linkedin/datahub.git'
// }
// licenses {
// license {
// name 'The Apache License, Version 2.0'
// url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
// }
// }
// developers {
// developer {
// id 'datahub'
// name 'datahub'
//
// }
// }
// }
// }
// }
// }
// signing {
// def signingKey = findProperty("signingKey")
// def signingPassword = findProperty("signingPassword")
// useInMemoryPgpKeys(signingKey, signingPassword)
// sign configurations.archives
// }

View File

@ -0,0 +1,24 @@
# This script checks the shadow jar to ensure that we only have allowed classes being exposed through the jar
jarFiles=$(find build/libs -name "datahub-spark-lineage*.jar" | grep -v sources | grep -v javadoc)
for jarFile in ${jarFiles}; do
jar -tvf $jarFile |\
grep -v "datahub/shaded" |\
grep -v "META-INF" |\
grep -v "com/linkedin" |\
grep -v "com/datahub" |\
grep -v "datahub" |\
grep -v "entity-registry" |\
grep -v "pegasus/" |\
grep -v "legacyPegasusSchemas/" |\
grep -v " com/$" |\
grep -v "git.properties" |\
grep -v "client.properties"
if [ $? -ne 0 ]; then
echo "✅ No unexpected class paths found in ${jarFile}"
else
echo "💥 Found unexpected class paths in ${jarFile}"
exit 1
fi
done
exit 0

View File

@ -1,70 +0,0 @@
package com.linkedin.datahub.lineage.consumer.impl;
import com.linkedin.datahub.lineage.spark.model.LineageConsumer;
import com.linkedin.datahub.lineage.spark.model.LineageEvent;
import datahub.client.Emitter;
import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
@Slf4j
public class McpEmitter implements LineageConsumer {
private static final String GMS_URL_KEY = "spark.datahub.rest.server";
private static final String SENTINEL = "moot";
private ConcurrentHashMap<String, RestEmitter> singleton = new ConcurrentHashMap<>();
private void emit(List<MetadataChangeProposalWrapper> mcpws) {
Emitter emitter = emitter();
if (emitter != null) {
mcpws.stream().map(mcpw -> {
try {
return emitter.emit(mcpw);
} catch (IOException ioException) {
log.error("Failed to emit metadata to DataHub", ioException);
return null;
}
}).filter(Objects::nonNull).collect(Collectors.toList()).forEach(future -> {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
// log error, but don't impact thread
log.error("Failed to emit metadata to DataHub", e);
}
});
}
}
// TODO ideally the impl here should not be tied to Spark; the LineageConsumer
// API needs tweaking to include configs
private Emitter emitter() {
singleton.computeIfAbsent(SENTINEL, x -> {
SparkConf conf = SparkEnv.get().conf();
if (conf.contains(GMS_URL_KEY)) {
String gmsUrl = conf.get(GMS_URL_KEY);
log.debug("REST emitter configured with GMS url " + gmsUrl);
return RestEmitter.create($ -> $.server(gmsUrl));
}
log.error("GMS URL not configured.");
return null;
});
return singleton.get(SENTINEL);
}
@Override
public void accept(LineageEvent evt) {
emit(evt.toMcps());
}
}

View File

@ -1,6 +0,0 @@
package com.linkedin.datahub.lineage.spark.model;
import io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueue.Consumer;
public interface LineageConsumer extends Consumer<LineageEvent> {
}

View File

@ -1,5 +1,16 @@
package com.linkedin.datahub.lineage.spark.interceptor;
package datahub.spark;
import com.google.common.base.Splitter;
import datahub.spark.model.LineageUtils;
import datahub.spark.model.AppEndEvent;
import datahub.spark.model.AppStartEvent;
import datahub.spark.model.DatasetLineage;
import datahub.spark.model.LineageConsumer;
import datahub.spark.model.SQLQueryExecEndEvent;
import datahub.spark.model.SQLQueryExecStartEvent;
import datahub.spark.model.dataset.SparkDataset;
import datahub.spark.consumer.impl.McpEmitter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
@ -7,13 +18,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkEnv;
@ -28,41 +40,28 @@ import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SQLExecution;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import com.google.common.base.Splitter;
import com.linkedin.datahub.lineage.spark.model.AppEndEvent;
import com.linkedin.datahub.lineage.spark.model.AppStartEvent;
import com.linkedin.datahub.lineage.spark.model.DatasetLineage;
import com.linkedin.datahub.lineage.spark.model.LineageConsumer;
import com.linkedin.datahub.lineage.spark.model.SQLQueryExecEndEvent;
import com.linkedin.datahub.lineage.spark.model.SQLQueryExecStartEvent;
import com.linkedin.datahub.lineage.spark.model.dataset.SparkDataset;
import lombok.extern.slf4j.Slf4j;
import scala.collection.JavaConversions;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractPartialFunction;
@Slf4j
public class DatahubLineageEmitter extends SparkListener {
public class DatahubSparkListener extends SparkListener {
private static final int THREAD_CNT = 16;
public static final String CONSUMER_TYPE_KEY = "spark.datahub.lineage.consumerTypes";
public static final String DATAHUB_EMITTER = "mcpEmitter";
private final Map<String, AppStartEvent> appDetails = new ConcurrentHashMap<>();
private final Map<String, Map<Long, SQLQueryExecStartEvent>> appSqlDetails = new ConcurrentHashMap<>();
private final Map<String, ExecutorService> appPoolDetails = new ConcurrentHashMap<>();
// private static LineageConsumer loggingConsumer() {
// log.warn("Lineage consumer not specified. Defaulting to LoggingConsumer.");
// return LineageUtils.LOGGING_CONSUMER;
// }
private final Map<String, McpEmitter> appEmitters = new ConcurrentHashMap<>();
private class SqlStartTask implements Runnable {
private SparkListenerSQLExecutionStart sqlStart;
private SparkContext ctx;
private LogicalPlan plan;
private final SparkListenerSQLExecutionStart sqlStart;
private final SparkContext ctx;
private final LogicalPlan plan;
public SqlStartTask(SparkListenerSQLExecutionStart sqlStart, LogicalPlan plan, SparkContext ctx) {
this.sqlStart = sqlStart;
@ -72,9 +71,10 @@ public class DatahubLineageEmitter extends SparkListener {
@Override
public void run() {
appSqlDetails.get(ctx.appName()).put(sqlStart.executionId(),
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), ctx.appName(), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), null));
appSqlDetails.get(ctx.appName())
.put(sqlStart.executionId(),
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), ctx.appName(), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), null));
log.debug("PLAN for execution id: " + ctx.appName() + ":" + sqlStart.executionId() + "\n");
log.debug(plan.toString());
@ -131,20 +131,21 @@ public class DatahubLineageEmitter extends SparkListener {
});
}
SQLQueryExecStartEvent evt = new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), ctx.appName(),
ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), lineage);
SQLQueryExecStartEvent evt =
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), ctx.appName(), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), lineage);
appSqlDetails.get(ctx.appName()).put(sqlStart.executionId(), evt);
consumers().forEach(c -> c.accept(evt)); // TODO parallel stream here?
McpEmitter emitter = appEmitters.get(ctx.appName());
if (emitter != null) {
emitter.accept(evt);
}
consumers().forEach(c -> c.accept(evt));
log.debug("LINEAGE \n" + lineage + "\n");
log.debug("Parsed execution id " + ctx.appName() + ":" + sqlStart.executionId());
return;
log.debug("LINEAGE \n{}\n", lineage);
log.debug("Parsed execution id {}:{}", ctx.appName(), sqlStart.executionId());
}
}
@Override
@ -156,12 +157,13 @@ public class DatahubLineageEmitter extends SparkListener {
@Override
public Void apply(SparkContext sc) {
String appId = applicationStart.appId().isDefined() ? applicationStart.appId().get() : "";
AppStartEvent evt = new AppStartEvent(LineageUtils.getMaster(sc), applicationStart.appName(), appId,
applicationStart.time(), applicationStart.sparkUser());
AppStartEvent evt =
new AppStartEvent(LineageUtils.getMaster(sc), applicationStart.appName(), appId, applicationStart.time(),
applicationStart.sparkUser());
appEmitters.computeIfAbsent(applicationStart.appName(), s -> new McpEmitter()).accept(evt);
consumers().forEach(c -> c.accept(evt));
consumers().forEach(x -> x.accept(evt));
// TODO keyed by appName; only latest will be considered. Potential
// inconsistencies not mapped.
appDetails.put(applicationStart.appName(), evt);
appSqlDetails.put(applicationStart.appName(), new ConcurrentHashMap<>());
ExecutorService pool = Executors.newFixedThreadPool(THREAD_CNT);
@ -192,13 +194,29 @@ public class DatahubLineageEmitter extends SparkListener {
appPoolDetails.remove(sc.appName()).shutdown();
appSqlDetails.remove(sc.appName());
if (start == null) {
log.error(
"Application end event received, but start event missing for appId " + sc.applicationId());
log.error("Application end event received, but start event missing for appId " + sc.applicationId());
} else {
AppEndEvent evt = new AppEndEvent(LineageUtils.getMaster(sc), sc.appName(), sc.applicationId(),
applicationEnd.time(), start);
consumers().forEach(x -> x.accept(evt));
McpEmitter emitter = appEmitters.get(sc.appName());
if (emitter != null) {
emitter.accept(evt);
try {
emitter.close();
appEmitters.remove(sc.appName());
} catch (Exception e) {
log.warn("Failed to close underlying emitter due to {}", e.getMessage());
}
}
consumers().forEach(x -> {
x.accept(evt);
try {
x.close();
} catch (IOException e) {
log.warn("Failed to close lineage consumer", e);
}
});
}
return null;
}
@ -243,17 +261,20 @@ public class DatahubLineageEmitter extends SparkListener {
public Void apply(SparkContext sc) {
SQLQueryExecStartEvent start = appSqlDetails.get(sc.appName()).remove(sqlEnd.executionId());
if (start == null) {
log.error("Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId()
+ ":" + sqlEnd.executionId());
log.error(
"Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId() + ":"
+ sqlEnd.executionId());
} else if (start.getDatasetLineage() != null) {
// JobStatus status = jobEnd.jobResult().equals(org.apache.spark.scheduler.JobSucceeded$.MODULE$)
// ? JobStatus.COMPLETED
// : JobStatus.FAILED;
SQLQueryExecEndEvent evt = new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(),
sc.applicationId(),
sqlEnd.time(), sqlEnd.executionId(), start);
consumers().forEach(x -> x.accept(evt));
SQLQueryExecEndEvent evt =
new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(), sc.applicationId(), sqlEnd.time(),
sqlEnd.executionId(), start);
McpEmitter emitter = appEmitters.get(sc.appName());
if (emitter != null) {
emitter.accept(evt);
}
}
return null;
}
@ -264,7 +285,8 @@ public class DatahubLineageEmitter extends SparkListener {
private void processExecution(SparkListenerSQLExecutionStart sqlStart) {
QueryExecution queryExec = SQLExecution.getQueryExecution(sqlStart.executionId());
if (queryExec == null) {
log.error("Skipping processing for sql exec Id" + sqlStart.executionId() + " as Query execution context could not be read from current spark state");
log.error("Skipping processing for sql exec Id" + sqlStart.executionId()
+ " as Query execution context could not be read from current spark state");
return;
}
LogicalPlan plan = queryExec.optimizedPlan();
@ -274,17 +296,16 @@ public class DatahubLineageEmitter extends SparkListener {
pool.execute(new SqlStartTask(sqlStart, plan, ctx));
}
private static List<LineageConsumer> consumers() {
private List<LineageConsumer> consumers() {
SparkConf conf = SparkEnv.get().conf();
if (conf.contains(CONSUMER_TYPE_KEY)) {
String consumerTypes = conf.get(CONSUMER_TYPE_KEY);
return StreamSupport.stream(Splitter.on(",").trimResults().split(consumerTypes).spliterator(), false)
.map(x -> LineageUtils.getConsumer(x)).filter(x -> x != null).collect(Collectors.toList());
.map(x -> LineageUtils.getConsumer(x)).filter(Objects::nonNull).collect(Collectors.toList());
} else {
return Collections.singletonList(LineageUtils.getConsumer("mcpEmitter"));
return Collections.emptyList();
//singletonList(LineageUtils.getConsumer(DATAHUB_EMITTER));
}
}
}

View File

@ -1,5 +1,9 @@
package com.linkedin.datahub.lineage.spark.interceptor;
package datahub.spark;
import datahub.spark.model.dataset.CatalogTableDataset;
import datahub.spark.model.dataset.HdfsPathDataset;
import datahub.spark.model.dataset.JdbcDataset;
import datahub.spark.model.dataset.SparkDataset;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
@ -25,10 +29,6 @@ import org.apache.spark.sql.hive.execution.InsertIntoHiveTable;
import org.apache.spark.sql.sources.BaseRelation;
import com.google.common.collect.ImmutableSet;
import com.linkedin.datahub.lineage.spark.model.dataset.CatalogTableDataset;
import com.linkedin.datahub.lineage.spark.model.dataset.HdfsPathDataset;
import com.linkedin.datahub.lineage.spark.model.dataset.JdbcDataset;
import com.linkedin.datahub.lineage.spark.model.dataset.SparkDataset;
import scala.Option;
import scala.collection.JavaConversions;

View File

@ -0,0 +1,85 @@
package datahub.spark.consumer.impl;
import datahub.spark.model.LineageConsumer;
import datahub.spark.model.LineageEvent;
import datahub.client.Emitter;
import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
@Slf4j
public class McpEmitter implements LineageConsumer, Closeable {
private final Optional<Emitter> emitter;
private static final String CONF_PREFIX = "spark.datahub.";
private static final String TRANSPORT_KEY = "transport";
private static final String GMS_URL_KEY = "rest.server";
private static final String GMS_AUTH_TOKEN = "rest.token";
private void emit(List<MetadataChangeProposalWrapper> mcpws) {
if (emitter.isPresent()) {
mcpws.stream().map(mcpw -> {
try {
return emitter.get().emit(mcpw);
} catch (IOException ioException) {
log.error("Failed to emit metadata to DataHub", ioException);
return null;
}
}).filter(Objects::nonNull).collect(Collectors.toList()).forEach(future -> {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
// log error, but don't impact thread
log.error("Failed to emit metadata to DataHub", e);
}
});
}
}
// TODO ideally the impl here should not be tied to Spark; the LineageConsumer
// API needs tweaking to include configs
public McpEmitter() {
SparkConf sparkConf = SparkEnv.get().conf();
Map<String, String> conf =
Arrays.stream(sparkConf.getAllWithPrefix("spark.datahub.")).collect(Collectors.toMap(x -> x._1, x -> x._2));
String emitterType = conf.getOrDefault(TRANSPORT_KEY, "rest");
if (emitterType.toLowerCase(Locale.ROOT).equals("rest")) {
String gmsUrl = conf.getOrDefault(GMS_URL_KEY, "http://localhost:8080");
String token = conf.getOrDefault(GMS_AUTH_TOKEN, null);
log.info("REST Emitter Configuration: GMS url {}{}", gmsUrl, (conf.containsKey(GMS_URL_KEY) ? "" : "(default)"));
if (token != null) {
log.info("REST Emitter Configuration: Token {}", (token != null) ? "XXXXX" : "(empty)");
}
emitter = Optional.of(RestEmitter.create($ -> $.server(gmsUrl).token(token)));
} else {
emitter = Optional.empty();
log.error("DataHub Transport {} not recognized. DataHub Lineage emission will not work", emitterType);
}
}
@Override
public void accept(LineageEvent evt) {
emit(evt.asMetadataEvents());
}
@Override
public void close() throws IOException {
if (emitter.isPresent()) {
emitter.get().close();
}
}
}

View File

@ -1,8 +1,7 @@
package com.linkedin.datahub.lineage.spark.model;
package datahub.spark.model;
import com.linkedin.common.urn.DataFlowUrn;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.lineage.spark.interceptor.LineageUtils;
import com.linkedin.datajob.DataFlowInfo;
import datahub.event.MetadataChangeProposalWrapper;
import java.util.Collections;
@ -23,7 +22,7 @@ public class AppEndEvent extends LineageEvent {
}
@Override
public List<MetadataChangeProposalWrapper> toMcps() {
public List<MetadataChangeProposalWrapper> asMetadataEvents() {
DataFlowUrn flowUrn = LineageUtils.flowUrn(getMaster(), getAppName());
StringMap customProps = start.customProps();

View File

@ -1,8 +1,7 @@
package com.linkedin.datahub.lineage.spark.model;
package datahub.spark.model;
import com.linkedin.common.urn.DataFlowUrn;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.lineage.spark.interceptor.LineageUtils;
import com.linkedin.datajob.DataFlowInfo;
import datahub.event.MetadataChangeProposalWrapper;
import java.util.Collections;
@ -23,7 +22,7 @@ public class AppStartEvent extends LineageEvent {
}
@Override
public List<MetadataChangeProposalWrapper> toMcps() {
public List<MetadataChangeProposalWrapper> asMetadataEvents() {
DataFlowUrn flowUrn = LineageUtils.flowUrn(getMaster(), getAppName());
DataFlowInfo flowInfo = new DataFlowInfo().setName(getAppName()).setCustomProperties(customProps());

View File

@ -1,10 +1,10 @@
package com.linkedin.datahub.lineage.spark.model;
package datahub.spark.model;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import com.linkedin.datahub.lineage.spark.model.dataset.SparkDataset;
import datahub.spark.model.dataset.SparkDataset;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

View File

@ -0,0 +1,7 @@
package datahub.spark.model;
import java.io.Closeable;
import java.util.function.Consumer;
public interface LineageConsumer extends Consumer<LineageEvent>, Closeable {
}

View File

@ -1,4 +1,4 @@
package com.linkedin.datahub.lineage.spark.model;
package datahub.spark.model;
import datahub.event.MetadataChangeProposalWrapper;
import java.util.Date;
@ -13,7 +13,7 @@ public abstract class LineageEvent {
private final String appId;
private final long time;
public abstract List<MetadataChangeProposalWrapper> toMcps();
public abstract List<MetadataChangeProposalWrapper> asMetadataEvents();
protected String timeStr() {
return new Date(getTime()).toInstant().toString();

View File

@ -1,8 +1,6 @@
package com.linkedin.datahub.lineage.spark.interceptor;
package datahub.spark.model;
import com.linkedin.common.urn.DataFlowUrn;
import com.linkedin.datahub.lineage.consumer.impl.McpEmitter;
import com.linkedin.datahub.lineage.spark.model.LineageConsumer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
@ -19,30 +17,14 @@ public class LineageUtils {
private static Map<String, LineageConsumer> consumers = new ConcurrentHashMap<>();
public static final LineageConsumer LOGGING_CONSUMER = (x -> log.info(x.toString()));
// hook for replacing paths during testing. Not the cleanest way, TODO improve.
/* This is for generating urn from a hash of the plan */
// private static Function<String, String> PATH_REPLACER = (x -> x);
static {
// system defined consumers
registerConsumer("mcpEmitter", new McpEmitter());
}
private LineageUtils() {
}
// overwrites existing consumer entry of same type
public static void registerConsumer(String consumerType, LineageConsumer consumer) {
consumers.put(consumerType, consumer);
}
public static LineageConsumer getConsumer(String consumerType) {
return consumers.get(consumerType);
}
public static DataFlowUrn flowUrn(String master, String appName) {
return new DataFlowUrn("spark", appName, master.replaceAll(":", "_").replaceAll("/", "_").replaceAll("[_]+", "_"));
}
@ -67,6 +49,16 @@ public class LineageUtils {
return ctx.conf().get("spark.master");
}
// overwrites existing consumer entry of same type
public static void registerConsumer(String consumerType, LineageConsumer consumer) {
consumers.put(consumerType, consumer);
}
public static LineageConsumer getConsumer(String consumerType) {
return consumers.get(consumerType);
}
/* This is for generating urn from a hash of the plan */
/*

View File

@ -1,4 +1,4 @@
package com.linkedin.datahub.lineage.spark.model;
package datahub.spark.model;
import com.linkedin.common.urn.DataJobUrn;
import com.linkedin.data.template.StringMap;
@ -25,7 +25,7 @@ public class SQLQueryExecEndEvent extends LineageEvent {
}
@Override
public List<MetadataChangeProposalWrapper> toMcps() {
public List<MetadataChangeProposalWrapper> asMetadataEvents() {
DataJobUrn jobUrn = start.jobUrn();
StringMap customProps = start.customProps();
customProps.put("completedAt", timeStr());

View File

@ -1,11 +1,10 @@
package com.linkedin.datahub.lineage.spark.model;
package datahub.spark.model;
import com.linkedin.common.DatasetUrnArray;
import com.linkedin.common.urn.DataFlowUrn;
import com.linkedin.common.urn.DataJobUrn;
import com.linkedin.data.template.StringMap;
import com.linkedin.datahub.lineage.spark.interceptor.LineageUtils;
import com.linkedin.datahub.lineage.spark.model.dataset.SparkDataset;
import datahub.spark.model.dataset.SparkDataset;
import com.linkedin.datajob.DataJobInfo;
import com.linkedin.datajob.DataJobInputOutput;
import com.linkedin.datajob.JobStatus;
@ -33,7 +32,7 @@ public class SQLQueryExecStartEvent extends LineageEvent {
}
@Override
public List<MetadataChangeProposalWrapper> toMcps() {
public List<MetadataChangeProposalWrapper> asMetadataEvents() {
DataJobUrn jobUrn = jobUrn();
MetadataChangeProposalWrapper mcpJobIO =
MetadataChangeProposalWrapper.create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(jobIO()));

View File

@ -1,4 +1,4 @@
package com.linkedin.datahub.lineage.spark.model.dataset;
package datahub.spark.model.dataset;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;

View File

@ -1,4 +1,4 @@
package com.linkedin.datahub.lineage.spark.model.dataset;
package datahub.spark.model.dataset;
import org.apache.hadoop.fs.Path;

View File

@ -1,4 +1,4 @@
package com.linkedin.datahub.lineage.spark.model.dataset;
package datahub.spark.model.dataset;
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DataPlatformUrn;

View File

@ -1,4 +1,4 @@
package com.linkedin.datahub.lineage.spark.model.dataset;
package datahub.spark.model.dataset;
import com.linkedin.common.urn.DatasetUrn;

View File

@ -1,14 +1,14 @@
package com.linkedin.datahub.lineage;
package datahub.spark;
import com.linkedin.datahub.lineage.spark.interceptor.LineageUtils;
import com.linkedin.datahub.lineage.spark.model.DatasetLineage;
import com.linkedin.datahub.lineage.spark.model.LineageConsumer;
import com.linkedin.datahub.lineage.spark.model.LineageEvent;
import com.linkedin.datahub.lineage.spark.model.SQLQueryExecStartEvent;
import com.linkedin.datahub.lineage.spark.model.dataset.CatalogTableDataset;
import com.linkedin.datahub.lineage.spark.model.dataset.HdfsPathDataset;
import com.linkedin.datahub.lineage.spark.model.dataset.JdbcDataset;
import com.linkedin.datahub.lineage.spark.model.dataset.SparkDataset;
import datahub.spark.model.LineageUtils;
import datahub.spark.model.DatasetLineage;
import datahub.spark.model.LineageConsumer;
import datahub.spark.model.LineageEvent;
import datahub.spark.model.SQLQueryExecStartEvent;
import datahub.spark.model.dataset.CatalogTableDataset;
import datahub.spark.model.dataset.HdfsPathDataset;
import datahub.spark.model.dataset.JdbcDataset;
import datahub.spark.model.dataset.SparkDataset;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
@ -140,8 +140,8 @@ public class TestSparkJobsLineage {
spark = SparkSession.builder()
.appName(APP_NAME)
.config("spark.master", MASTER)
.config("spark.extraListeners", "com.linkedin.datahub.lineage.spark.interceptor.DatahubLineageEmitter")
.config("spark.datahub.lineage.consumerTypes", "accumulator, mcpEmitter")
.config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
.config("spark.datahub.lineage.consumerTypes", "accumulator")
.config("spark.datahub.rest.server", "http://localhost:" + mockServer.getPort())
.config("spark.sql.warehouse.dir", new File(WAREHOUSE_LOC).getAbsolutePath())
.enableHiveSupport()
@ -407,6 +407,7 @@ public class TestSparkJobsLineage {
}
private static class DatasetLineageAccumulator implements LineageConsumer {
boolean closed = false;
private final List<DatasetLineage> lineages = new ArrayList<>();
@ -420,9 +421,17 @@ public class TestSparkJobsLineage {
@Override
public void accept(LineageEvent e) {
if (closed) {
throw new RuntimeException("Called after close");
}
if (e instanceof SQLQueryExecStartEvent) {
lineages.add(((SQLQueryExecStartEvent) e).getDatasetLineage());
}
}
@Override
public void close() throws IOException {
closed = true;
}
}
}