From d23a5bb7254bf52e56b24fbd79d2b74ec53fb8ca Mon Sep 17 00:00:00 2001 From: Swaroop Jagadish <67564030+swaroopjagadish@users.noreply.github.com> Date: Thu, 20 Jan 2022 00:48:09 -0800 Subject: [PATCH] feat(spark-lineage): simplified jars, config, auto publish to maven (#3924) --- .github/workflows/publish-datahub-jars.yml | 73 +++++++ build.gradle | 2 + gradle.properties | 4 + .../java/datahub-client/build.gradle | 94 +++++++++ .../java/datahub-client/scripts/check_jar.sh | 14 +- .../java/datahub/client/rest/RestEmitter.java | 12 +- .../client/rest/RestEmitterConfig.java | 20 +- .../src/main/resources/client.properties | 1 + .../datahub/client/rest/RestEmitterTest.java | 17 ++ .../java/spark-lineage/README.md | 66 +++--- .../java/spark-lineage/build.gradle | 189 ++++++++++++------ .../java/spark-lineage/scripts/check_jar.sh | 24 +++ .../lineage/consumer/impl/McpEmitter.java | 70 ------- .../lineage/spark/model/LineageConsumer.java | 6 - .../spark/DatahubSparkListener.java} | 131 +++++++----- .../spark}/DatasetExtractor.java | 10 +- .../spark/consumer/impl/McpEmitter.java | 85 ++++++++ .../spark/model/AppEndEvent.java | 5 +- .../spark/model/AppStartEvent.java | 5 +- .../spark/model/DatasetLineage.java | 4 +- .../datahub/spark/model/LineageConsumer.java | 7 + .../spark/model/LineageEvent.java | 4 +- .../spark/model}/LineageUtils.java | 30 +-- .../spark/model/SQLQueryExecEndEvent.java | 4 +- .../spark/model/SQLQueryExecStartEvent.java | 7 +- .../model/dataset/CatalogTableDataset.java | 2 +- .../spark/model/dataset/HdfsPathDataset.java | 2 +- .../spark/model/dataset/JdbcDataset.java | 2 +- .../spark/model/dataset/SparkDataset.java | 2 +- .../spark}/TestSparkJobsLineage.java | 33 +-- 30 files changed, 638 insertions(+), 287 deletions(-) create mode 100644 .github/workflows/publish-datahub-jars.yml create mode 100644 metadata-integration/java/datahub-client/src/main/resources/client.properties create mode 100755 metadata-integration/java/spark-lineage/scripts/check_jar.sh delete mode 100644 metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/consumer/impl/McpEmitter.java delete mode 100644 metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/LineageConsumer.java rename metadata-integration/java/spark-lineage/src/main/java/{com/linkedin/datahub/lineage/spark/interceptor/DatahubLineageEmitter.java => datahub/spark/DatahubSparkListener.java} (71%) rename metadata-integration/java/spark-lineage/src/main/java/{com/linkedin/datahub/lineage/spark/interceptor => datahub/spark}/DatasetExtractor.java (94%) create mode 100644 metadata-integration/java/spark-lineage/src/main/java/datahub/spark/consumer/impl/McpEmitter.java rename metadata-integration/java/spark-lineage/src/main/java/{com/linkedin/datahub/lineage => datahub}/spark/model/AppEndEvent.java (85%) rename metadata-integration/java/spark-lineage/src/main/java/{com/linkedin/datahub/lineage => datahub}/spark/model/AppStartEvent.java (87%) rename metadata-integration/java/spark-lineage/src/main/java/{com/linkedin/datahub/lineage => datahub}/spark/model/DatasetLineage.java (83%) create mode 100644 metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageConsumer.java rename metadata-integration/java/spark-lineage/src/main/java/{com/linkedin/datahub/lineage => datahub}/spark/model/LineageEvent.java (76%) rename metadata-integration/java/spark-lineage/src/main/java/{com/linkedin/datahub/lineage/spark/interceptor => datahub/spark/model}/LineageUtils.java (88%) 
rename metadata-integration/java/spark-lineage/src/main/java/{com/linkedin/datahub/lineage => datahub}/spark/model/SQLQueryExecEndEvent.java (91%) rename metadata-integration/java/spark-lineage/src/main/java/{com/linkedin/datahub/lineage => datahub}/spark/model/SQLQueryExecStartEvent.java (93%) rename metadata-integration/java/spark-lineage/src/main/java/{com/linkedin/datahub/lineage => datahub}/spark/model/dataset/CatalogTableDataset.java (91%) rename metadata-integration/java/spark-lineage/src/main/java/{com/linkedin/datahub/lineage => datahub}/spark/model/dataset/HdfsPathDataset.java (92%) rename metadata-integration/java/spark-lineage/src/main/java/{com/linkedin/datahub/lineage => datahub}/spark/model/dataset/JdbcDataset.java (94%) rename metadata-integration/java/spark-lineage/src/main/java/{com/linkedin/datahub/lineage => datahub}/spark/model/dataset/SparkDataset.java (63%) rename metadata-integration/java/spark-lineage/src/test/java/{com/linkedin/datahub/lineage => datahub/spark}/TestSparkJobsLineage.java (94%) diff --git a/.github/workflows/publish-datahub-jars.yml b/.github/workflows/publish-datahub-jars.yml new file mode 100644 index 0000000000..a3922f29d1 --- /dev/null +++ b/.github/workflows/publish-datahub-jars.yml @@ -0,0 +1,73 @@ +name: Publish Datahub Client + +on: + workflow_run: + workflows: ["build & test"] + types: + - completed + + release: + types: [published, edited] + + # Allows you to run this workflow manually from the Actions tab + workflow_dispatch: + +jobs: + + check-secret: + runs-on: ubuntu-latest + if: ${{ github.event.workflow_run.conclusion == 'success' }} + outputs: + publish-enabled: ${{ steps.publish-enabled.outputs.defined }} + steps: + - id: publish-enabled + if: "${{ env.SIGNING_KEY != '' }}" + run: echo "::set-output name=defined::true" + env: + SIGNING_KEY: ${{ secrets.SIGNING_KEY }} + + publish: + runs-on: ubuntu-latest + needs: [check-secret] + if: needs.check-secret.outputs.publish-enabled == 'true' + steps: + - uses: actions/checkout@v2 + with: + fetch-depth: 0 + - name: Set up JDK 1.8 + uses: actions/setup-java@v1 + with: + java-version: 1.8 + - uses: actions/setup-python@v2 + with: + python-version: "3.6" + - name: checkout upstream repo + run: | + git remote add upstream https://github.com/linkedin/datahub.git + git fetch upstream --tags + - name: publish datahub-client jar + env: + RELEASE_USERNAME: ${{ secrets.RELEASE_USERNAME }} + RELEASE_PASSWORD: ${{ secrets.RELEASE_PASSWORD }} + SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }} + SIGNING_KEY: ${{ secrets.SIGNING_KEY }} + NEXUS_USERNAME: ${{ secrets.NEXUS_USERNAME }} + NEXUS_PASSWORD: ${{ secrets.NEXUS_PASSWORD }} + run: | + echo signingKey=$SIGNING_KEY >> gradle.properties + ./gradlew :metadata-integration:java:datahub-client:printVersion + ./gradlew :metadata-integration:java:datahub-client:publishToMavenLocal + #./gradlew :metadata-integration:java:datahub-client:closeAndReleaseRepository --info + - name: publish datahub-spark jar + env: + RELEASE_USERNAME: ${{ secrets.RELEASE_USERNAME }} + RELEASE_PASSWORD: ${{ secrets.RELEASE_PASSWORD }} + SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }} + SIGNING_KEY: ${{ secrets.SIGNING_KEY }} + NEXUS_USERNAME: ${{ secrets.NEXUS_USERNAME }} + NEXUS_PASSWORD: ${{ secrets.NEXUS_PASSWORD }} + run: | + echo signingKey=$SIGNING_KEY >> gradle.properties + ./gradlew :metadata-integration:java:spark-lineage:printVersion + ./gradlew :metadata-integration:java:spark-lineage:publishToMavenLocal + #./gradlew 
:metadata-integration:java:datahub-client:closeAndReleaseRepository --info diff --git a/build.gradle b/build.gradle index 27dfaefc52..b87eb1f2e9 100644 --- a/build.gradle +++ b/build.gradle @@ -10,6 +10,8 @@ buildscript { classpath 'com.commercehub.gradle.plugin:gradle-avro-plugin:0.8.1' classpath 'org.springframework.boot:spring-boot-gradle-plugin:2.1.4.RELEASE' classpath 'com.github.jengelman.gradle.plugins:shadow:5.2.0' + classpath "io.codearte.gradle.nexus:gradle-nexus-staging-plugin:0.30.0" + classpath "com.palantir.gradle.gitversion:gradle-git-version:0.12.3" } } diff --git a/gradle.properties b/gradle.properties index e956dc85b5..66edd9e705 100644 --- a/gradle.properties +++ b/gradle.properties @@ -7,3 +7,7 @@ org.gradle.caching=false org.gradle.internal.repository.max.retries=5 org.gradle.internal.repository.max.tentatives=5 org.gradle.internal.repository.initial.backoff=1000 + +# Needed to publish to Nexus from a sub-module +gnsp.disableApplyOnlyOnRootProjectEnforcement=true + diff --git a/metadata-integration/java/datahub-client/build.gradle b/metadata-integration/java/datahub-client/build.gradle index 754402f836..e3f26437e1 100644 --- a/metadata-integration/java/datahub-client/build.gradle +++ b/metadata-integration/java/datahub-client/build.gradle @@ -1,6 +1,11 @@ apply plugin: 'java' apply plugin: 'com.github.johnrengelman.shadow' apply plugin: 'jacoco' +apply plugin: 'signing' +apply plugin: 'io.codearte.nexus-staging' +apply plugin: 'maven-publish' +apply plugin: 'com.palantir.git-version' +import org.apache.tools.ant.filters.ReplaceTokens jar.enabled = false // Since we only want to build shadow jars, disabling the regular jar creation @@ -21,6 +26,19 @@ jacocoTestReport { dependsOn test // tests are required to run before generating the report } + +def details = versionDetails() +version = details.lastTag +version = version.startsWith("v")? version.substring(1): version +// trim version if it is of size 4 to size 3 +def versionParts = version.tokenize(".") +def lastPart = details.isCleanTag? versionParts[2]: (versionParts[2].toInteger()+1).toString() + "-SNAPSHOT" +version = versionParts[0] + "." + versionParts[1] + "." 
+ lastPart + +processResources { + filter(ReplaceTokens, tokens:[fullVersion: gitVersion()]) +} + test { useJUnit() finalizedBy jacocoTestReport @@ -64,3 +82,79 @@ checkShadowJar { assemble { dependsOn shadowJar } + +task sourcesJar(type: Jar) { + archiveClassifier = 'sources' + from sourceSets.main.allSource +} + +task javadocJar(type: Jar) { + archiveClassifier = 'javadoc' + from javadoc +} + + +publishing { + publications { + shadow(MavenPublication) { + publication -> project.shadow.component(publication) + pom { + name = 'Datahub Client' + group = 'io.acryl' + artifactId = 'datahub-client' + description = 'DataHub Java client for metadata integration' + url = 'https://datahubproject.io' + artifacts = [ shadowJar, javadocJar, sourcesJar ] + + scm { + connection = 'scm:git:git://github.com/linkedin/datahub.git' + developerConnection = 'scm:git:ssh://github.com:linkedin/datahub.git' + url = 'https://github.com/linkedin/datahub.git' + } + + licenses { + license { + name = 'The Apache License, Version 2.0' + url = 'http://www.apache.org/licenses/LICENSE-2.0.txt' + } + } + + developers { + developer { + id = 'datahub' + name = 'Datahub' + email = 'datahub@acryl.io' + } + } + } + } + } + + repositories { + maven { + def releasesRepoUrl = "https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/" + def snapshotsRepoUrl = "https://s01.oss.sonatype.org/content/repositories/snapshots/" + def ossrhUsername = System.getenv('RELEASE_USERNAME') + def ossrhPassword = System.getenv('RELEASE_PASSWORD') + credentials { + username ossrhUsername + password ossrhPassword + } + url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl + } + } +} + + +signing { + def signingKey = findProperty("signingKey") + def signingPassword = System.getenv("SIGNING_PASSWORD") + useInMemoryPgpKeys(signingKey, signingPassword) + sign publishing.publications.shadow +} + +nexusStaging { + serverUrl = "https://s01.oss.sonatype.org/service/local/" //required only for projects registered in Sonatype after 2021-02-24 + username = System.getenv("NEXUS_USERNAME") + password = System.getenv("NEXUS_PASSWORD") +} diff --git a/metadata-integration/java/datahub-client/scripts/check_jar.sh b/metadata-integration/java/datahub-client/scripts/check_jar.sh index 78e1ef4d70..69a2c25805 100755 --- a/metadata-integration/java/datahub-client/scripts/check_jar.sh +++ b/metadata-integration/java/datahub-client/scripts/check_jar.sh @@ -1,5 +1,7 @@ # This script checks the shadow jar to ensure that we only have allowed classes being exposed through the jar -jar -tvf build/libs/datahub-client.jar |\ +jarFiles=$(find build/libs -name "datahub-client*.jar" | grep -v sources | grep -v javadoc) +for jarFile in ${jarFiles}; do +jar -tvf $jarFile |\ grep -v "datahub/shaded" |\ grep -v "META-INF" |\ grep -v "com/linkedin" |\ @@ -9,12 +11,14 @@ jar -tvf build/libs/datahub-client.jar |\ grep -v "pegasus/" |\ grep -v "legacyPegasusSchemas/" |\ grep -v " com/$" |\ - grep -v "git.properties" + grep -v "git.properties" |\ + grep -v "client.properties" if [ $? -ne 0 ]; then - echo "No other packages found. 
Great" - exit 0 + echo "✅ No unexpected class paths found in ${jarFile}" else - echo "Found other packages than what we were expecting" + echo "💥 Found unexpected class paths in ${jarFile}" exit 1 fi +done +exit 0 diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java index dbf8f58b46..c5ea552c54 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java @@ -41,17 +41,17 @@ import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; * * Constructing a REST Emitter follows a lambda-based fluent builder pattern using the `create` method. * e.g. - * RestEmitter emitter = RestEmitter.create(b -> b + * RestEmitter emitter = RestEmitter.create(b :: b * .server("http://localhost:8080") * .extraHeaders(Collections.singletonMap("Custom-Header", "custom-val") * ); * You can also customize the underlying * http client by calling the `customizeHttpAsyncClient` method on the builder. * e.g. - * RestEmitter emitter = RestEmitter.create(b -> b + * RestEmitter emitter = RestEmitter.create(b :: b * .server("http://localhost:8080") * .extraHeaders(Collections.singletonMap("Custom-Header", "custom-val") - * .customizeHttpAsyncClient(c -> c.setConnectionTimeToLive(30, TimeUnit.SECONDS)) + * .customizeHttpAsyncClient(c :: c.setConnectionTimeToLive(30, TimeUnit.SECONDS)) * ); */ public class RestEmitter implements Emitter { @@ -117,16 +117,16 @@ public class RestEmitter implements Emitter { /** * Constructing a REST Emitter follows a lambda-based fluent builder pattern using the `create` method. * e.g. - * RestEmitter emitter = RestEmitter.create(b -> b + * RestEmitter emitter = RestEmitter.create(b :: b * .server("http://localhost:8080") // coordinates of gms server * .extraHeaders(Collections.singletonMap("Custom-Header", "custom-val") * ); * You can also customize the underlying http client by calling the `customizeHttpAsyncClient` method on the builder. * e.g. - * RestEmitter emitter = RestEmitter.create(b -> b + * RestEmitter emitter = RestEmitter.create(b :: b * .server("http://localhost:8080") * .extraHeaders(Collections.singletonMap("Custom-Header", "custom-val") - * .customizeHttpAsyncClient(c -> c.setConnectionTimeToLive(30, TimeUnit.SECONDS)) + * .customizeHttpAsyncClient(c :: c.setConnectionTimeToLive(30, TimeUnit.SECONDS)) * ); * @param builderSupplier * @return a constructed RestEmitter. 
Call #testConnection to make sure this emitter has a valid connection to the server diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java index b6a7e9b558..3a83647ae3 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java @@ -1,23 +1,28 @@ package datahub.client.rest; import datahub.event.EventFormatter; +import java.io.InputStream; import java.util.Collections; import java.util.Map; +import java.util.Properties; import java.util.function.Consumer; import lombok.Builder; import lombok.NonNull; import lombok.Value; +import lombok.extern.slf4j.Slf4j; import org.apache.http.client.config.RequestConfig; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; @Value @Builder +@Slf4j public class RestEmitterConfig { public static final int DEFAULT_CONNECT_TIMEOUT_SEC = 10; public static final int DEFAULT_READ_TIMEOUT_SEC = 10; public static final String DEFAULT_AUTH_TOKEN = null; + public static final String CLIENT_VERSION_PROPERTY = "clientVersion"; @Builder.Default private final String server = "http://localhost:8080"; @@ -38,12 +43,25 @@ public class RestEmitterConfig { public static class RestEmitterConfigBuilder { + private String getVersion() { + try ( + InputStream foo = this.getClass().getClassLoader().getResourceAsStream("client.properties")) { + Properties properties = new Properties(); + properties.load(foo); + return properties.getProperty(CLIENT_VERSION_PROPERTY, "unknown"); + } catch (Exception e) { + log.warn("Unable to find a version for datahub-client. 
Will set to unknown", e); + return "unknown"; + } + } + private HttpAsyncClientBuilder asyncHttpClientBuilder = HttpAsyncClientBuilder .create() .setDefaultRequestConfig(RequestConfig.custom() .setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_SEC * 1000) .setSocketTimeout(DEFAULT_READ_TIMEOUT_SEC * 1000) - .build()); + .build()) + .setUserAgent("DataHub-RestClient/" + getVersion()); public RestEmitterConfigBuilder with(Consumer builderFunction) { builderFunction.accept(this); diff --git a/metadata-integration/java/datahub-client/src/main/resources/client.properties b/metadata-integration/java/datahub-client/src/main/resources/client.properties new file mode 100644 index 0000000000..880e869935 --- /dev/null +++ b/metadata-integration/java/datahub-client/src/main/resources/client.properties @@ -0,0 +1 @@ +clientVersion=@fullVersion@ diff --git a/metadata-integration/java/datahub-client/src/test/java/datahub/client/rest/RestEmitterTest.java b/metadata-integration/java/datahub-client/src/test/java/datahub/client/rest/RestEmitterTest.java index a9cacdc2a0..aab1634e45 100644 --- a/metadata-integration/java/datahub-client/src/test/java/datahub/client/rest/RestEmitterTest.java +++ b/metadata-integration/java/datahub-client/src/test/java/datahub/client/rest/RestEmitterTest.java @@ -16,6 +16,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -363,4 +364,20 @@ public class RestEmitterTest { Assert.assertTrue(ioe instanceof TimeoutException); } } + + @Test + public void testUserAgentHeader() throws IOException, ExecutionException, InterruptedException { + TestDataHubServer testDataHubServer = new TestDataHubServer(); + Integer port = testDataHubServer.getMockServer().getPort(); + RestEmitter emitter = RestEmitter.create(b -> b.server("http://localhost:" + port)); + testDataHubServer.getMockServer().reset(); + emitter.testConnection(); + Properties properties = new Properties(); + properties.load(emitter.getClass().getClassLoader().getResourceAsStream("client.properties")); + Assert.assertNotNull(properties.getProperty("clientVersion")); + String version = properties.getProperty("clientVersion"); + testDataHubServer.getMockServer().verify( + request("/config") + .withHeader("User-Agent", "DataHub-RestClient/" + version)); + } } \ No newline at end of file diff --git a/metadata-integration/java/spark-lineage/README.md b/metadata-integration/java/spark-lineage/README.md index e679bf1e6c..67ca552fce 100644 --- a/metadata-integration/java/spark-lineage/README.md +++ b/metadata-integration/java/spark-lineage/README.md @@ -1,38 +1,59 @@ -# Spark lineage emitter -The Spark lineage emitter is a java library that provides a Spark listener implementation "DatahubLineageEmitter". The DatahubLineageEmitter listens to events such application start/end, and SQLExecution start/end to create pipelines (i.e. DataJob) and tasks (i.e. DataFlow) in Datahub along with lineage. +# Spark Integration +To integrate Spark with DataHub, we provide a lightweight Java agent that listens for Spark application and job events and pushes metadata out to DataHub in real-time. The agent listens to events such as application start/end, and SQLExecution start/end to create pipelines (i.e. DataFlow) and tasks (i.e. DataJob) in Datahub along with lineage to datasets that are being read from and written to.
Read on to learn how to configure this for different Spark scenarios. -## Configuring Spark emitter -Listener configuration can be done using a config file or while creating a spark Session. +## Configuring Spark agent +The Spark agent can be configured using a config file or while creating a Spark session. -### Config file for spark-submit -When running jobs using spark-submit, the listener is to be configured in the config file. +## Before you begin: Versions and Release Notes +Versioning of the jar artifact will follow the semantic versioning of the main [DataHub repo](https://github.com/linkedin/datahub) and release notes will be available [here](https://github.com/linkedin/datahub/releases). +Always check [the Maven central repository](https://search.maven.org/search?q=a:datahub-spark-lineage) for the latest released version. + +### Configuration Instructions: spark-submit +When running jobs using spark-submit, the agent needs to be configured in the config file. ``` spark.master spark://spark-master:7077 -#Configuring datahub spark listener jar -spark.jars.packages io.acryl:datahub-spark-lineage:0.0.3 -spark.extraListeners com.linkedin.datahub.lineage.spark.interceptor.DatahubLineageEmitter +#Configuring datahub spark agent jar +spark.jars.packages io.acryl:datahub-spark-lineage:0.8.23 +spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server http://localhost:8080 ``` -### Configuring with SparkSession Builder for notebooks +### Configuration Instructions: Notebooks When running interactive jobs from a notebook, the listener can be configured while building the Spark Session. ```python spark = SparkSession.builder \ .master("spark://spark-master:7077") \ .appName("test-application") \ - .config("spark.jars.packages","io.acryl:datahub-spark-lineage:0.0.3") \ - .config("spark.extraListeners","com.linkedin.datahub.lineage.interceptor.spark.DatahubLineageEmitter") \ + .config("spark.jars.packages","io.acryl:datahub-spark-lineage:0.8.23") \ + .config("spark.extraListeners","datahub.spark.DatahubSparkListener") \ .config("spark.datahub.rest.server", "http://localhost:8080") \ .enableHiveSupport() \ .getOrCreate() ``` -## Model mapping -A pipeline is created per Spark . -A task is created per unique Spark query execution within an app. +### Configuration Instructions: Standalone Java Applications +The configuration for standalone Java apps is very similar. + +```java +SparkSession spark = SparkSession.builder() + .appName("test-application") + .config("spark.master", "spark://spark-master:7077") + .config("spark.jars.packages","io.acryl:datahub-spark-lineage:0.8.23") + .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") + .config("spark.datahub.rest.server", "http://localhost:8080") + .enableHiveSupport() + .getOrCreate(); + ``` + +## What to Expect: The Metadata Model + +As of this writing, the Spark agent produces metadata related to the Spark job, tasks and lineage edges to datasets. + +- A pipeline is created per Spark `<master, appName>` combination (see the sketch below). +- A task is created per unique Spark query execution within an app. ### Custom properties & relating to Spark UI The following custom properties in pipelines and tasks relate to the Spark UI: @@ -42,21 +63,21 @@ The following custom properties in pipelines and tasks relate to the Spark UI: Other custom properties of pipelines and tasks capture the start and end times of execution etc. The query plan is captured in the *queryPlan* property of a task.
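To make the pipeline naming above concrete, here is a minimal, hypothetical sketch (not part of this patch; the `master` and `appName` values are placeholders) that uses the `LineageUtils.flowUrn` helper added later in this patch to show the URN a pipeline is keyed on:

```java
import com.linkedin.common.urn.DataFlowUrn;
import datahub.spark.model.LineageUtils;

public class PipelineUrnSketch {
  public static void main(String[] args) {
    // Placeholder values for illustration only.
    String master = "spark://spark-master:7077";
    String appName = "test-application";

    // The agent keys a pipeline (DataFlow) on the Spark master and appName;
    // flowUrn sanitizes the master URL (":" and "/" become "_") before building the URN.
    DataFlowUrn pipelineUrn = LineageUtils.flowUrn(master, appName);

    // Prints roughly: urn:li:dataFlow:(spark,test-application,spark_spark-master_7077)
    System.out.println(pipelineUrn);
  }
}
```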
-## Release notes for v0.0.3 -In this version, basic dataset-level lineage is captured using the model mapping as mentioned earlier. + ### Spark versions supported The primary version tested is Spark/Scala version 2.4.8/2_11. -We anticipate this to work well with other Spark 2.4.x versions and Scala 2_11. - +This library has also been tested to work with Spark versions (2.2.0 - 2.4.8) and Scala versions (2.10 - 2.12). +For the Spark 3.x series, this has been tested to work with Spark 3.1.2 and 3.2.0 with Scala 2.12. Other combinations are not guaranteed to work currently. Support for other Spark versions is planned in the very near future. ### Environments tested with This initial release has been tested with the following environments: - spark-submit of Python/Java applications to local and remote servers -- notebooks +- Jupyter notebooks with pyspark code +- Standalone Java applications -Note that testing for other environments such as Databricks and standalone applications is planned in near future. +Note that testing for other environments such as Databricks is planned in the near future. ### Spark commands supported Below is a list of Spark commands that are parsed currently: @@ -75,11 +96,8 @@ Effectively, these support data sources/sinks corresponding to Hive, HDFS and JDBC. ### Important notes on usage - It is advisable to ensure appName is used appropriately to ensure you can trace lineage from a pipeline back to your source code. - - If multiple apps with the same appName run concurrently, dataset-lineage will be captured correctly but the custom-properties e.g. app-id, SQLQueryId would be unreliable. We expect this to be quite rare. - - If spark execution fails, then an empty pipeline would still get created, but it may not have any tasks. - - For HDFS sources, the folder (name) is regarded as the dataset (name) to align with typical storage of parquet/csv formats. ## Known limitations diff --git a/metadata-integration/java/spark-lineage/build.gradle b/metadata-integration/java/spark-lineage/build.gradle index 14a7b84948..2c0a0e045d 100644 --- a/metadata-integration/java/spark-lineage/build.gradle +++ b/metadata-integration/java/spark-lineage/build.gradle @@ -1,21 +1,49 @@ apply plugin: 'java' apply plugin: 'com.github.johnrengelman.shadow' -apply plugin: 'maven' apply plugin: 'signing' +apply plugin: 'io.codearte.nexus-staging' +apply plugin: 'maven-publish' apply plugin: 'jacoco' +apply plugin: 'com.palantir.git-version' +import org.apache.tools.ant.filters.ReplaceTokens + +jar.enabled = false // Since we only want to build shadow jars, disabling the regular jar creation + +//to rename artifacts for publish +project.archivesBaseName = 'datahub-'+project.name + +//mark implementation dependencies which need to be excluded along with transitive dependencies from the shadow jar +//functionality is exactly the same as "implementation" +configurations { + provided + implementation.extendsFrom provided +} + +def details = versionDetails() +version = details.lastTag +version = version.startsWith("v")? version.substring(1): version +// trim version if it is of size 4 to size 3 +def versionParts = version.tokenize(".") +def lastPart = details.isCleanTag? versionParts[2]: (versionParts[2].toInteger()+1).toString() + "-SNAPSHOT" +version = versionParts[0] + "." + versionParts[1] + "."
+ lastPart + +processResources { + filter(ReplaceTokens, tokens:[fullVersion: gitVersion()]) +} + dependencies { //Needed for tie breaking of guava version need for spark and wiremock - compile(externalDependency.hadoopMapreduceClient) { + provided(externalDependency.hadoopMapreduceClient) { force = true } - compile(externalDependency.hadoopCommon) { + provided(externalDependency.hadoopCommon) { force = true } // required for org.apache.hadoop.util.StopWatch - compile(externalDependency.commonsIo) { + provided(externalDependency.commonsIo) { force = true } // required for org.apache.commons.io.Charsets that is used internally @@ -25,8 +53,8 @@ dependencies { implementation project(path: ':metadata-integration:java:datahub-client', configuration: 'shadow') - implementation(externalDependency.sparkSql) - implementation(externalDependency.sparkHive) + provided(externalDependency.sparkSql) + provided(externalDependency.sparkHive) // Tests need a concrete log4j available. Providing it here testImplementation 'org.apache.logging.log4j:log4j-api:2.17.1' @@ -49,18 +77,43 @@ dependencies { testImplementation(externalDependency.testContainersPostgresql) } - +task checkShadowJar(type: Exec) { + commandLine 'sh', '-c', 'scripts/check_jar.sh' +} shadowJar { zip64=true classifier='' + mergeServiceFiles() + + def exclude_modules = project + .configurations + .provided + .resolvedConfiguration + .getLenientConfiguration() + .getAllModuleDependencies() + .collect { + it.name + } dependencies { - exclude(dependency("org.apache.hadoop::")) - exclude(dependency("org.apache.spark::")) - exclude(dependency(externalDependency.commonsIo)) + + exclude(dependency { + exclude_modules.contains(it.name) + }) + } + + relocate 'org.apache.http','datahub.spark2.shaded.http' + relocate 'org.apache.commons.codec', 'datahub.spark2.shaded.o.a.c.codec' + relocate 'mozilla', 'datahub.spark2.shaded.mozilla' + finalizedBy checkShadowJar } +checkShadowJar { + dependsOn shadowJar +} + + jacocoTestReport { dependsOn test // tests are required to run before generating the report } @@ -74,7 +127,7 @@ assemble { dependsOn shadowJar } -task sourceJar(type: Jar) { +task sourcesJar(type: Jar) { classifier 'sources' from sourceSets.main.allJava } @@ -84,63 +137,71 @@ task javadocJar(type: Jar, dependsOn: javadoc) { from javadoc.destinationDir } -artifacts { - archives shadowJar +publishing { + publications { + shadow(MavenPublication) { + publication -> project.shadow.component(publication) + pom { + name = 'Datahub Spark Lineage' + group = 'io.acryl' + artifactId = 'datahub-spark-lineage' + description = 'Library to push data lineage from spark to datahub' + url = 'https://datahubproject.io' + artifacts = [ shadowJar, javadocJar, sourcesJar ] + + scm { + connection = 'scm:git:git://github.com/linkedin/datahub.git' + developerConnection = 'scm:git:ssh://github.com:linkedin/datahub.git' + url = 'https://github.com/linkedin/datahub.git' + } + + licenses { + license { + name = 'The Apache License, Version 2.0' + url = 'http://www.apache.org/licenses/LICENSE-2.0.txt' + } + } + + developers { + developer { + id = 'datahub' + name = 'Datahub' + email = 'datahub@acryl.io' + } + } + } + } + } + + repositories { + maven { + def releasesRepoUrl = "https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/" + def snapshotsRepoUrl = "https://s01.oss.sonatype.org/content/repositories/snapshots/" + def ossrhUsername = System.getenv('RELEASE_USERNAME') + def ossrhPassword = System.getenv('RELEASE_PASSWORD') + credentials { + username 
ossrhUsername + password ossrhPassword + } + url = version.endsWith('SNAPSHOT') ? snapshotsRepoUrl : releasesRepoUrl + } + } } -// uploadArchives { -// repositories { -// mavenDeployer { -// def ossrhUsername = System.getenv('RELEASE_USERNAME') -// def ossrhPassword = System.getenv('RELEASE_PASSWORD') -// beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) } -// repository(url: "https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/") { -// authentication(userName: ossrhUsername, password: ossrhPassword) -// } +signing { + def signingKey = findProperty("signingKey") + def signingPassword = System.getenv("SIGNING_PASSWORD") + useInMemoryPgpKeys(signingKey, signingPassword) + sign publishing.publications.shadow +} + +nexusStaging { + serverUrl = "https://s01.oss.sonatype.org/service/local/" //required only for projects registered in Sonatype after 2021-02-24 + username = System.getenv("NEXUS_USERNAME") + password = System.getenv("NEXUS_PASSWORD") +} -// snapshotRepository(url: "https://s01.oss.sonatype.org/content/repositories/snapshots/") { -// authentication(userName: ossrhUsername, password: ossrhPassword) -// } - -// pom.project { -// //No need to specify name here. Name is always picked up from project name -// //name 'spark-lineage' -// packaging 'jar' -// // optionally artifactId can be defined here -// description 'Library to push data lineage from spark to datahub' -// url 'https://datahubproject.io' - -// scm { -// connection 'scm:git:git://github.com/linkedin/datahub.git' -// developerConnection 'scm:git:ssh://github.com:linkedin/datahub.git' -// url 'https://github.com/linkedin/datahub.git' -// } - -// licenses { -// license { -// name 'The Apache License, Version 2.0' -// url 'http://www.apache.org/licenses/LICENSE-2.0.txt' -// } -// } - -// developers { -// developer { -// id 'datahub' -// name 'datahub' -// -// } -// } -// } -// } -// } -// } -// signing { -// def signingKey = findProperty("signingKey") -// def signingPassword = findProperty("signingPassword") -// useInMemoryPgpKeys(signingKey, signingPassword) -// sign configurations.archives -// } diff --git a/metadata-integration/java/spark-lineage/scripts/check_jar.sh b/metadata-integration/java/spark-lineage/scripts/check_jar.sh new file mode 100755 index 0000000000..73f4bdd0f3 --- /dev/null +++ b/metadata-integration/java/spark-lineage/scripts/check_jar.sh @@ -0,0 +1,24 @@ +# This script checks the shadow jar to ensure that we only have allowed classes being exposed through the jar +jarFiles=$(find build/libs -name "datahub-spark-lineage*.jar" | grep -v sources | grep -v javadoc) +for jarFile in ${jarFiles}; do +jar -tvf $jarFile |\ + grep -v "datahub/shaded" |\ + grep -v "META-INF" |\ + grep -v "com/linkedin" |\ + grep -v "com/datahub" |\ + grep -v "datahub" |\ + grep -v "entity-registry" |\ + grep -v "pegasus/" |\ + grep -v "legacyPegasusSchemas/" |\ + grep -v " com/$" |\ + grep -v "git.properties" |\ + grep -v "client.properties" + +if [ $? 
-ne 0 ]; then + echo "✅ No unexpected class paths found in ${jarFile}" +else + echo "💥 Found unexpected class paths in ${jarFile}" + exit 1 +fi +done +exit 0 diff --git a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/consumer/impl/McpEmitter.java b/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/consumer/impl/McpEmitter.java deleted file mode 100644 index a0ba55d32a..0000000000 --- a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/consumer/impl/McpEmitter.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.linkedin.datahub.lineage.consumer.impl; - -import com.linkedin.datahub.lineage.spark.model.LineageConsumer; -import com.linkedin.datahub.lineage.spark.model.LineageEvent; -import datahub.client.Emitter; -import datahub.client.rest.RestEmitter; -import datahub.event.MetadataChangeProposalWrapper; -import java.io.IOException; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; -import lombok.extern.slf4j.Slf4j; -import org.apache.spark.SparkConf; -import org.apache.spark.SparkEnv; - - -@Slf4j -public class McpEmitter implements LineageConsumer { - - private static final String GMS_URL_KEY = "spark.datahub.rest.server"; - private static final String SENTINEL = "moot"; - - private ConcurrentHashMap singleton = new ConcurrentHashMap<>(); - - private void emit(List mcpws) { - Emitter emitter = emitter(); - if (emitter != null) { - mcpws.stream().map(mcpw -> { - try { - return emitter.emit(mcpw); - } catch (IOException ioException) { - log.error("Failed to emit metadata to DataHub", ioException); - return null; - } - }).filter(Objects::nonNull).collect(Collectors.toList()).forEach(future -> { - try { - future.get(); - } catch (InterruptedException | ExecutionException e) { - // log error, but don't impact thread - log.error("Failed to emit metadata to DataHub", e); - } - }); - } - } - - // TODO ideally the impl here should not be tied to Spark; the LineageConsumer - // API needs tweaking to include configs - private Emitter emitter() { - singleton.computeIfAbsent(SENTINEL, x -> { - SparkConf conf = SparkEnv.get().conf(); - if (conf.contains(GMS_URL_KEY)) { - String gmsUrl = conf.get(GMS_URL_KEY); - log.debug("REST emitter configured with GMS url " + gmsUrl); - return RestEmitter.create($ -> $.server(gmsUrl)); - } - - log.error("GMS URL not configured."); - return null; - }); - - return singleton.get(SENTINEL); - } - - @Override - public void accept(LineageEvent evt) { - emit(evt.toMcps()); - } -} diff --git a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/LineageConsumer.java b/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/LineageConsumer.java deleted file mode 100644 index 1c9341116b..0000000000 --- a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/LineageConsumer.java +++ /dev/null @@ -1,6 +0,0 @@ -package com.linkedin.datahub.lineage.spark.model; - -import io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueue.Consumer; - -public interface LineageConsumer extends Consumer { -} diff --git a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/interceptor/DatahubLineageEmitter.java 
b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java similarity index 71% rename from metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/interceptor/DatahubLineageEmitter.java rename to metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java index 72d231f991..3a9682adf1 100644 --- a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/interceptor/DatahubLineageEmitter.java +++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java @@ -1,5 +1,16 @@ -package com.linkedin.datahub.lineage.spark.interceptor; +package datahub.spark; +import com.google.common.base.Splitter; +import datahub.spark.model.LineageUtils; +import datahub.spark.model.AppEndEvent; +import datahub.spark.model.AppStartEvent; +import datahub.spark.model.DatasetLineage; +import datahub.spark.model.LineageConsumer; +import datahub.spark.model.SQLQueryExecEndEvent; +import datahub.spark.model.SQLQueryExecStartEvent; +import datahub.spark.model.dataset.SparkDataset; +import datahub.spark.consumer.impl.McpEmitter; +import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.util.ArrayList; @@ -7,13 +18,14 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; import java.util.stream.StreamSupport; - +import lombok.extern.slf4j.Slf4j; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.SparkEnv; @@ -28,41 +40,28 @@ import org.apache.spark.sql.execution.QueryExecution; import org.apache.spark.sql.execution.SQLExecution; import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd; import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart; - -import com.google.common.base.Splitter; -import com.linkedin.datahub.lineage.spark.model.AppEndEvent; -import com.linkedin.datahub.lineage.spark.model.AppStartEvent; -import com.linkedin.datahub.lineage.spark.model.DatasetLineage; -import com.linkedin.datahub.lineage.spark.model.LineageConsumer; -import com.linkedin.datahub.lineage.spark.model.SQLQueryExecEndEvent; -import com.linkedin.datahub.lineage.spark.model.SQLQueryExecStartEvent; -import com.linkedin.datahub.lineage.spark.model.dataset.SparkDataset; - -import lombok.extern.slf4j.Slf4j; import scala.collection.JavaConversions; import scala.runtime.AbstractFunction1; import scala.runtime.AbstractPartialFunction; + @Slf4j -public class DatahubLineageEmitter extends SparkListener { +public class DatahubSparkListener extends SparkListener { private static final int THREAD_CNT = 16; public static final String CONSUMER_TYPE_KEY = "spark.datahub.lineage.consumerTypes"; + public static final String DATAHUB_EMITTER = "mcpEmitter"; private final Map appDetails = new ConcurrentHashMap<>(); private final Map> appSqlDetails = new ConcurrentHashMap<>(); private final Map appPoolDetails = new ConcurrentHashMap<>(); - -// private static LineageConsumer loggingConsumer() { -// log.warn("Lineage consumer not specified. 
Defaulting to LoggingConsumer."); -// return LineageUtils.LOGGING_CONSUMER; -// } + private final Map appEmitters = new ConcurrentHashMap<>(); private class SqlStartTask implements Runnable { - private SparkListenerSQLExecutionStart sqlStart; - private SparkContext ctx; - private LogicalPlan plan; + private final SparkListenerSQLExecutionStart sqlStart; + private final SparkContext ctx; + private final LogicalPlan plan; public SqlStartTask(SparkListenerSQLExecutionStart sqlStart, LogicalPlan plan, SparkContext ctx) { this.sqlStart = sqlStart; @@ -72,9 +71,10 @@ public class DatahubLineageEmitter extends SparkListener { @Override public void run() { - appSqlDetails.get(ctx.appName()).put(sqlStart.executionId(), - new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), ctx.appName(), ctx.applicationId(), - sqlStart.time(), sqlStart.executionId(), null)); + appSqlDetails.get(ctx.appName()) + .put(sqlStart.executionId(), + new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), ctx.appName(), ctx.applicationId(), + sqlStart.time(), sqlStart.executionId(), null)); log.debug("PLAN for execution id: " + ctx.appName() + ":" + sqlStart.executionId() + "\n"); log.debug(plan.toString()); @@ -131,20 +131,21 @@ public class DatahubLineageEmitter extends SparkListener { }); } - SQLQueryExecStartEvent evt = new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), ctx.appName(), - ctx.applicationId(), - sqlStart.time(), sqlStart.executionId(), lineage); + SQLQueryExecStartEvent evt = + new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), ctx.appName(), ctx.applicationId(), + sqlStart.time(), sqlStart.executionId(), lineage); appSqlDetails.get(ctx.appName()).put(sqlStart.executionId(), evt); - consumers().forEach(c -> c.accept(evt)); // TODO parallel stream here? + McpEmitter emitter = appEmitters.get(ctx.appName()); + if (emitter != null) { + emitter.accept(evt); + } + consumers().forEach(c -> c.accept(evt)); - log.debug("LINEAGE \n" + lineage + "\n"); - log.debug("Parsed execution id " + ctx.appName() + ":" + sqlStart.executionId()); - - return; + log.debug("LINEAGE \n{}\n", lineage); + log.debug("Parsed execution id {}:{}", ctx.appName(), sqlStart.executionId()); } - } @Override @@ -156,12 +157,13 @@ public class DatahubLineageEmitter extends SparkListener { @Override public Void apply(SparkContext sc) { String appId = applicationStart.appId().isDefined() ? applicationStart.appId().get() : ""; - AppStartEvent evt = new AppStartEvent(LineageUtils.getMaster(sc), applicationStart.appName(), appId, - applicationStart.time(), applicationStart.sparkUser()); + AppStartEvent evt = + new AppStartEvent(LineageUtils.getMaster(sc), applicationStart.appName(), appId, applicationStart.time(), + applicationStart.sparkUser()); + + appEmitters.computeIfAbsent(applicationStart.appName(), s -> new McpEmitter()).accept(evt); + consumers().forEach(c -> c.accept(evt)); - consumers().forEach(x -> x.accept(evt)); - // TODO keyed by appName; only latest will be considered. Potential - // inconsistencies not mapped. 
appDetails.put(applicationStart.appName(), evt); appSqlDetails.put(applicationStart.appName(), new ConcurrentHashMap<>()); ExecutorService pool = Executors.newFixedThreadPool(THREAD_CNT); @@ -192,13 +194,29 @@ public class DatahubLineageEmitter extends SparkListener { appPoolDetails.remove(sc.appName()).shutdown(); appSqlDetails.remove(sc.appName()); if (start == null) { - log.error( - "Application end event received, but start event missing for appId " + sc.applicationId()); + log.error("Application end event received, but start event missing for appId " + sc.applicationId()); } else { AppEndEvent evt = new AppEndEvent(LineageUtils.getMaster(sc), sc.appName(), sc.applicationId(), applicationEnd.time(), start); - consumers().forEach(x -> x.accept(evt)); + McpEmitter emitter = appEmitters.get(sc.appName()); + if (emitter != null) { + emitter.accept(evt); + try { + emitter.close(); + appEmitters.remove(sc.appName()); + } catch (Exception e) { + log.warn("Failed to close underlying emitter due to {}", e.getMessage()); + } + } + consumers().forEach(x -> { + x.accept(evt); + try { + x.close(); + } catch (IOException e) { + log.warn("Failed to close lineage consumer", e); + } + }); } return null; } @@ -243,17 +261,20 @@ public class DatahubLineageEmitter extends SparkListener { public Void apply(SparkContext sc) { SQLQueryExecStartEvent start = appSqlDetails.get(sc.appName()).remove(sqlEnd.executionId()); if (start == null) { - log.error("Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId() - + ":" + sqlEnd.executionId()); + log.error( + "Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId() + ":" + + sqlEnd.executionId()); } else if (start.getDatasetLineage() != null) { // JobStatus status = jobEnd.jobResult().equals(org.apache.spark.scheduler.JobSucceeded$.MODULE$) // ? 
JobStatus.COMPLETED // : JobStatus.FAILED; - SQLQueryExecEndEvent evt = new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(), - sc.applicationId(), - sqlEnd.time(), sqlEnd.executionId(), start); - - consumers().forEach(x -> x.accept(evt)); + SQLQueryExecEndEvent evt = + new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(), sc.applicationId(), sqlEnd.time(), + sqlEnd.executionId(), start); + McpEmitter emitter = appEmitters.get(sc.appName()); + if (emitter != null) { + emitter.accept(evt); + } } return null; } @@ -264,7 +285,8 @@ public class DatahubLineageEmitter extends SparkListener { private void processExecution(SparkListenerSQLExecutionStart sqlStart) { QueryExecution queryExec = SQLExecution.getQueryExecution(sqlStart.executionId()); if (queryExec == null) { - log.error("Skipping processing for sql exec Id" + sqlStart.executionId() + " as Query execution context could not be read from current spark state"); + log.error("Skipping processing for sql exec Id" + sqlStart.executionId() + + " as Query execution context could not be read from current spark state"); return; } LogicalPlan plan = queryExec.optimizedPlan(); @@ -274,17 +296,16 @@ public class DatahubLineageEmitter extends SparkListener { pool.execute(new SqlStartTask(sqlStart, plan, ctx)); } - private static List consumers() { + private List consumers() { SparkConf conf = SparkEnv.get().conf(); if (conf.contains(CONSUMER_TYPE_KEY)) { String consumerTypes = conf.get(CONSUMER_TYPE_KEY); - return StreamSupport.stream(Splitter.on(",").trimResults().split(consumerTypes).spliterator(), false) - .map(x -> LineageUtils.getConsumer(x)).filter(x -> x != null).collect(Collectors.toList()); + .map(x -> LineageUtils.getConsumer(x)).filter(Objects::nonNull).collect(Collectors.toList()); } else { - return Collections.singletonList(LineageUtils.getConsumer("mcpEmitter")); + return Collections.emptyList(); + //singletonList(LineageUtils.getConsumer(DATAHUB_EMITTER)); } } - } diff --git a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/interceptor/DatasetExtractor.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java similarity index 94% rename from metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/interceptor/DatasetExtractor.java rename to metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java index 5b4578588f..0cf8014219 100644 --- a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/interceptor/DatasetExtractor.java +++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java @@ -1,5 +1,9 @@ -package com.linkedin.datahub.lineage.spark.interceptor; +package datahub.spark; +import datahub.spark.model.dataset.CatalogTableDataset; +import datahub.spark.model.dataset.HdfsPathDataset; +import datahub.spark.model.dataset.JdbcDataset; +import datahub.spark.model.dataset.SparkDataset; import java.io.IOException; import java.util.HashMap; import java.util.List; @@ -25,10 +29,6 @@ import org.apache.spark.sql.hive.execution.InsertIntoHiveTable; import org.apache.spark.sql.sources.BaseRelation; import com.google.common.collect.ImmutableSet; -import com.linkedin.datahub.lineage.spark.model.dataset.CatalogTableDataset; -import com.linkedin.datahub.lineage.spark.model.dataset.HdfsPathDataset; -import com.linkedin.datahub.lineage.spark.model.dataset.JdbcDataset; -import 
com.linkedin.datahub.lineage.spark.model.dataset.SparkDataset; import scala.Option; import scala.collection.JavaConversions; diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/consumer/impl/McpEmitter.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/consumer/impl/McpEmitter.java new file mode 100644 index 0000000000..a245faa8d2 --- /dev/null +++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/consumer/impl/McpEmitter.java @@ -0,0 +1,85 @@ +package datahub.spark.consumer.impl; + +import datahub.spark.model.LineageConsumer; +import datahub.spark.model.LineageEvent; +import datahub.client.Emitter; +import datahub.client.rest.RestEmitter; +import datahub.event.MetadataChangeProposalWrapper; +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkEnv; + + +@Slf4j +public class McpEmitter implements LineageConsumer, Closeable { + + private final Optional emitter; + private static final String CONF_PREFIX = "spark.datahub."; + private static final String TRANSPORT_KEY = "transport"; + private static final String GMS_URL_KEY = "rest.server"; + private static final String GMS_AUTH_TOKEN = "rest.token"; + + private void emit(List mcpws) { + if (emitter.isPresent()) { + mcpws.stream().map(mcpw -> { + try { + return emitter.get().emit(mcpw); + } catch (IOException ioException) { + log.error("Failed to emit metadata to DataHub", ioException); + return null; + } + }).filter(Objects::nonNull).collect(Collectors.toList()).forEach(future -> { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + // log error, but don't impact thread + log.error("Failed to emit metadata to DataHub", e); + } + }); + } + } + + // TODO ideally the impl here should not be tied to Spark; the LineageConsumer + // API needs tweaking to include configs + public McpEmitter() { + SparkConf sparkConf = SparkEnv.get().conf(); + Map conf = + Arrays.stream(sparkConf.getAllWithPrefix("spark.datahub.")).collect(Collectors.toMap(x -> x._1, x -> x._2)); + + String emitterType = conf.getOrDefault(TRANSPORT_KEY, "rest"); + if (emitterType.toLowerCase(Locale.ROOT).equals("rest")) { + String gmsUrl = conf.getOrDefault(GMS_URL_KEY, "http://localhost:8080"); + String token = conf.getOrDefault(GMS_AUTH_TOKEN, null); + log.info("REST Emitter Configuration: GMS url {}{}", gmsUrl, (conf.containsKey(GMS_URL_KEY) ? "" : "(default)")); + if (token != null) { + log.info("REST Emitter Configuration: Token {}", (token != null) ? "XXXXX" : "(empty)"); + } + emitter = Optional.of(RestEmitter.create($ -> $.server(gmsUrl).token(token))); + } else { + emitter = Optional.empty(); + log.error("DataHub Transport {} not recognized. 
DataHub Lineage emission will not work", emitterType);
+    }
+  }
+
+  @Override
+  public void accept(LineageEvent evt) {
+    emit(evt.asMetadataEvents());
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (emitter.isPresent()) {
+      emitter.get().close();
+    }
+  }
+}
diff --git a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/AppEndEvent.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/AppEndEvent.java
similarity index 85%
rename from metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/AppEndEvent.java
rename to metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/AppEndEvent.java
index 6e055fabf5..64aef77ddc 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/AppEndEvent.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/AppEndEvent.java
@@ -1,8 +1,7 @@
-package com.linkedin.datahub.lineage.spark.model;
+package datahub.spark.model;
 
 import com.linkedin.common.urn.DataFlowUrn;
 import com.linkedin.data.template.StringMap;
-import com.linkedin.datahub.lineage.spark.interceptor.LineageUtils;
 import com.linkedin.datajob.DataFlowInfo;
 import datahub.event.MetadataChangeProposalWrapper;
 import java.util.Collections;
@@ -23,7 +22,7 @@ public class AppEndEvent extends LineageEvent {
   }
 
   @Override
-  public List<MetadataChangeProposalWrapper> toMcps() {
+  public List<MetadataChangeProposalWrapper> asMetadataEvents() {
     DataFlowUrn flowUrn = LineageUtils.flowUrn(getMaster(), getAppName());
 
     StringMap customProps = start.customProps();
diff --git a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/AppStartEvent.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/AppStartEvent.java
similarity index 87%
rename from metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/AppStartEvent.java
rename to metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/AppStartEvent.java
index 27acd54fe6..8e6967e381 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/AppStartEvent.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/AppStartEvent.java
@@ -1,8 +1,7 @@
-package com.linkedin.datahub.lineage.spark.model;
+package datahub.spark.model;
 
 import com.linkedin.common.urn.DataFlowUrn;
 import com.linkedin.data.template.StringMap;
-import com.linkedin.datahub.lineage.spark.interceptor.LineageUtils;
 import com.linkedin.datajob.DataFlowInfo;
 import datahub.event.MetadataChangeProposalWrapper;
 import java.util.Collections;
@@ -23,7 +22,7 @@ public class AppStartEvent extends LineageEvent {
   }
 
   @Override
-  public List<MetadataChangeProposalWrapper> toMcps() {
+  public List<MetadataChangeProposalWrapper> asMetadataEvents() {
     DataFlowUrn flowUrn = LineageUtils.flowUrn(getMaster(), getAppName());
     DataFlowInfo flowInfo = new DataFlowInfo().setName(getAppName()).setCustomProperties(customProps());
 
diff --git a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/DatasetLineage.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/DatasetLineage.java
similarity index 83%
rename from metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/DatasetLineage.java
rename to metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/DatasetLineage.java
index 82446cee80..9583ab69a2 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/DatasetLineage.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/DatasetLineage.java
@@ -1,10 +1,10 @@
-package com.linkedin.datahub.lineage.spark.model;
+package datahub.spark.model;
 
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
-import com.linkedin.datahub.lineage.spark.model.dataset.SparkDataset;
+import datahub.spark.model.dataset.SparkDataset;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageConsumer.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageConsumer.java
new file mode 100644
index 0000000000..890ed6329c
--- /dev/null
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageConsumer.java
@@ -0,0 +1,7 @@
+package datahub.spark.model;
+
+import java.io.Closeable;
+import java.util.function.Consumer;
+
+public interface LineageConsumer extends Consumer<LineageEvent>, Closeable {
+}
diff --git a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/LineageEvent.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageEvent.java
similarity index 76%
rename from metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/LineageEvent.java
rename to metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageEvent.java
index bfac81e76e..238bf61e71 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/LineageEvent.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageEvent.java
@@ -1,4 +1,4 @@
-package com.linkedin.datahub.lineage.spark.model;
+package datahub.spark.model;
 
 import datahub.event.MetadataChangeProposalWrapper;
 import java.util.Date;
@@ -13,7 +13,7 @@ public abstract class LineageEvent {
   private final String appId;
   private final long time;
 
-  public abstract List<MetadataChangeProposalWrapper> toMcps();
+  public abstract List<MetadataChangeProposalWrapper> asMetadataEvents();
 
   protected String timeStr() {
     return new Date(getTime()).toInstant().toString();
diff --git a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/interceptor/LineageUtils.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageUtils.java
similarity index 88%
rename from metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/interceptor/LineageUtils.java
rename to metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageUtils.java
index a61a997b47..3f5d3da4d3 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/interceptor/LineageUtils.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/LineageUtils.java
@@ -1,8 +1,6 @@
-package com.linkedin.datahub.lineage.spark.interceptor;
+package datahub.spark.model;
 
 import com.linkedin.common.urn.DataFlowUrn;
-import com.linkedin.datahub.lineage.consumer.impl.McpEmitter;
-import com.linkedin.datahub.lineage.spark.model.LineageConsumer;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import lombok.extern.slf4j.Slf4j;
@@ -19,30 +17,14 @@ public class LineageUtils {
 
   private static Map<String, LineageConsumer> consumers = new ConcurrentHashMap<>();
 
-  public static final LineageConsumer LOGGING_CONSUMER = (x -> log.info(x.toString()));
-
   // hook for replacing paths during testing. Not the cleanest way, TODO improve.
   /* This is for generating urn from a hash of the plan */
   // private static Function<String, String> PATH_REPLACER = (x -> x);
 
-  static {
-    // system defined consumers
-    registerConsumer("mcpEmitter", new McpEmitter());
-  }
-
   private LineageUtils() {
 
   }
 
-  // overwrites existing consumer entry of same type
-  public static void registerConsumer(String consumerType, LineageConsumer consumer) {
-    consumers.put(consumerType, consumer);
-  }
-
-  public static LineageConsumer getConsumer(String consumerType) {
-    return consumers.get(consumerType);
-  }
-
   public static DataFlowUrn flowUrn(String master, String appName) {
     return new DataFlowUrn("spark", appName, master.replaceAll(":", "_").replaceAll("/", "_").replaceAll("[_]+", "_"));
   }
@@ -67,6 +49,16 @@ public class LineageUtils {
 
     return ctx.conf().get("spark.master");
   }
 
+  // overwrites existing consumer entry of same type
+  public static void registerConsumer(String consumerType, LineageConsumer consumer) {
+    consumers.put(consumerType, consumer);
+  }
+
+  public static LineageConsumer getConsumer(String consumerType) {
+    return consumers.get(consumerType);
+  }
+
+
   /* This is for generating urn from a hash of the plan */
   /*
diff --git a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/SQLQueryExecEndEvent.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/SQLQueryExecEndEvent.java
similarity index 91%
rename from metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/SQLQueryExecEndEvent.java
rename to metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/SQLQueryExecEndEvent.java
index 072fe90d9e..6505cd586b 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/SQLQueryExecEndEvent.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/SQLQueryExecEndEvent.java
@@ -1,4 +1,4 @@
-package com.linkedin.datahub.lineage.spark.model;
+package datahub.spark.model;
 
 import com.linkedin.common.urn.DataJobUrn;
 import com.linkedin.data.template.StringMap;
@@ -25,7 +25,7 @@ public class SQLQueryExecEndEvent extends LineageEvent {
   }
 
   @Override
-  public List<MetadataChangeProposalWrapper> toMcps() {
+  public List<MetadataChangeProposalWrapper> asMetadataEvents() {
     DataJobUrn jobUrn = start.jobUrn();
     StringMap customProps = start.customProps();
     customProps.put("completedAt", timeStr());
diff --git a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/SQLQueryExecStartEvent.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/SQLQueryExecStartEvent.java
similarity index 93%
rename from metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/SQLQueryExecStartEvent.java
rename to metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/SQLQueryExecStartEvent.java
index 125243d806..263b4d8a83 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/SQLQueryExecStartEvent.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/SQLQueryExecStartEvent.java
@@ -1,11 +1,10 @@
-package com.linkedin.datahub.lineage.spark.model;
+package datahub.spark.model;
 
 import com.linkedin.common.DatasetUrnArray;
 import com.linkedin.common.urn.DataFlowUrn;
 import com.linkedin.common.urn.DataJobUrn;
 import com.linkedin.data.template.StringMap;
-import com.linkedin.datahub.lineage.spark.interceptor.LineageUtils;
-import com.linkedin.datahub.lineage.spark.model.dataset.SparkDataset;
+import datahub.spark.model.dataset.SparkDataset;
 import com.linkedin.datajob.DataJobInfo;
 import com.linkedin.datajob.DataJobInputOutput;
 import com.linkedin.datajob.JobStatus;
@@ -33,7 +32,7 @@ public class SQLQueryExecStartEvent extends LineageEvent {
   }
 
   @Override
-  public List<MetadataChangeProposalWrapper> toMcps() {
+  public List<MetadataChangeProposalWrapper> asMetadataEvents() {
     DataJobUrn jobUrn = jobUrn();
     MetadataChangeProposalWrapper mcpJobIO =
         MetadataChangeProposalWrapper.create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(jobIO()));
diff --git a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/CatalogTableDataset.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/CatalogTableDataset.java
similarity index 91%
rename from metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/CatalogTableDataset.java
rename to metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/CatalogTableDataset.java
index 1903d54135..0722fecd80 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/CatalogTableDataset.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/CatalogTableDataset.java
@@ -1,4 +1,4 @@
-package com.linkedin.datahub.lineage.spark.model.dataset;
+package datahub.spark.model.dataset;
 
 import org.apache.spark.sql.catalyst.catalog.CatalogTable;
 
diff --git a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/HdfsPathDataset.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/HdfsPathDataset.java
similarity index 92%
rename from metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/HdfsPathDataset.java
rename to metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/HdfsPathDataset.java
index 1dec3423eb..775bbd7738 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/HdfsPathDataset.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/HdfsPathDataset.java
@@ -1,4 +1,4 @@
-package com.linkedin.datahub.lineage.spark.model.dataset;
+package datahub.spark.model.dataset;
 
 import org.apache.hadoop.fs.Path;
 
diff --git a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/JdbcDataset.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/JdbcDataset.java
similarity index 94%
rename from metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/JdbcDataset.java
rename to metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/JdbcDataset.java
index 9582b0a4f2..20d8df4952 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/JdbcDataset.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/JdbcDataset.java
@@ -1,4 +1,4 @@
-package com.linkedin.datahub.lineage.spark.model.dataset;
+package datahub.spark.model.dataset;
 
 import com.linkedin.common.FabricType;
 import com.linkedin.common.urn.DataPlatformUrn;
diff --git a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/SparkDataset.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/SparkDataset.java
similarity index 63%
rename from metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/SparkDataset.java
rename to metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/SparkDataset.java
index 5da4dc2deb..cdc30d1d54 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/com/linkedin/datahub/lineage/spark/model/dataset/SparkDataset.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/SparkDataset.java
@@ -1,4 +1,4 @@
-package com.linkedin.datahub.lineage.spark.model.dataset;
+package datahub.spark.model.dataset;
 
 import com.linkedin.common.urn.DatasetUrn;
 
diff --git a/metadata-integration/java/spark-lineage/src/test/java/com/linkedin/datahub/lineage/TestSparkJobsLineage.java b/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestSparkJobsLineage.java
similarity index 94%
rename from metadata-integration/java/spark-lineage/src/test/java/com/linkedin/datahub/lineage/TestSparkJobsLineage.java
rename to metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestSparkJobsLineage.java
index d66ebb38ac..bec7abd1f7 100644
--- a/metadata-integration/java/spark-lineage/src/test/java/com/linkedin/datahub/lineage/TestSparkJobsLineage.java
+++ b/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestSparkJobsLineage.java
@@ -1,14 +1,14 @@
-package com.linkedin.datahub.lineage;
+package datahub.spark;
 
-import com.linkedin.datahub.lineage.spark.interceptor.LineageUtils;
-import com.linkedin.datahub.lineage.spark.model.DatasetLineage;
-import com.linkedin.datahub.lineage.spark.model.LineageConsumer;
-import com.linkedin.datahub.lineage.spark.model.LineageEvent;
-import com.linkedin.datahub.lineage.spark.model.SQLQueryExecStartEvent;
-import com.linkedin.datahub.lineage.spark.model.dataset.CatalogTableDataset;
-import com.linkedin.datahub.lineage.spark.model.dataset.HdfsPathDataset;
-import com.linkedin.datahub.lineage.spark.model.dataset.JdbcDataset;
-import com.linkedin.datahub.lineage.spark.model.dataset.SparkDataset;
+import datahub.spark.model.LineageUtils;
+import datahub.spark.model.DatasetLineage;
+import datahub.spark.model.LineageConsumer;
+import datahub.spark.model.LineageEvent;
+import datahub.spark.model.SQLQueryExecStartEvent;
+import datahub.spark.model.dataset.CatalogTableDataset;
+import datahub.spark.model.dataset.HdfsPathDataset;
+import datahub.spark.model.dataset.JdbcDataset;
+import datahub.spark.model.dataset.SparkDataset;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -140,8 +140,8 @@ public class TestSparkJobsLineage {
     spark = SparkSession.builder()
         .appName(APP_NAME)
         .config("spark.master", MASTER)
-        .config("spark.extraListeners", "com.linkedin.datahub.lineage.spark.interceptor.DatahubLineageEmitter")
-        .config("spark.datahub.lineage.consumerTypes", "accumulator, mcpEmitter")
+        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
+        .config("spark.datahub.lineage.consumerTypes", "accumulator")
        .config("spark.datahub.rest.server", "http://localhost:" + mockServer.getPort())
         .config("spark.sql.warehouse.dir", new File(WAREHOUSE_LOC).getAbsolutePath())
         .enableHiveSupport()
@@ -407,6 +407,7 @@ public class TestSparkJobsLineage {
   }
 
   private static class DatasetLineageAccumulator implements LineageConsumer {
+    boolean closed = false;
 
     private final List<DatasetLineage> lineages = new ArrayList<>();
 
@@ -420,9 +421,17 @@ public class TestSparkJobsLineage {
 
     @Override
     public void accept(LineageEvent e) {
+      if (closed) {
+        throw new RuntimeException("Called after close");
+      }
       if (e instanceof SQLQueryExecStartEvent) {
         lineages.add(((SQLQueryExecStartEvent) e).getDatasetLineage());
       }
     }
+
+    @Override
+    public void close() throws IOException {
+      closed = true;
+    }
   }
 }
\ No newline at end of file
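
Note (not part of the patch): the hunks above rename the listener to datahub.spark.DatahubSparkListener, relocate registerConsumer/getConsumer on LineageUtils, and make LineageConsumer a Closeable Consumer of LineageEvent whose events expose asMetadataEvents(). The sketch below shows how an application might wire these pieces together, mirroring the test configuration; the package name, app name, master URL, consumer key "logConsumer", and REST endpoint are illustrative placeholders, not values from this patch. Whether the built-in McpEmitter must still be listed under consumerTypes is not shown in these hunks (the test now lists only "accumulator"), so only the custom consumer is named here.

package example;  // hypothetical package, not part of the patch

import java.io.IOException;

import org.apache.spark.sql.SparkSession;

import datahub.spark.model.LineageConsumer;
import datahub.spark.model.LineageEvent;
import datahub.spark.model.LineageUtils;

public class LineageWiringSketch {

  // LineageConsumer now extends Consumer<LineageEvent> and Closeable,
  // so an implementation provides both accept() and close().
  static class LoggingConsumer implements LineageConsumer {
    @Override
    public void accept(LineageEvent event) {
      // asMetadataEvents() replaces the old toMcps() name.
      System.out.println("Lineage event: " + event.asMetadataEvents());
    }

    @Override
    public void close() throws IOException {
      // nothing to release in this sketch
    }
  }

  public static void main(String[] args) {
    // Register the consumer under a type name that Spark config can reference,
    // in the same way the test wires up its "accumulator" consumer type.
    LineageUtils.registerConsumer("logConsumer", new LoggingConsumer());

    SparkSession spark = SparkSession.builder()
        .appName("lineage-wiring-sketch")                               // placeholder
        .config("spark.master", "local[1]")                             // placeholder
        // Listener class moved to datahub.spark in this patch.
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .config("spark.datahub.lineage.consumerTypes", "logConsumer")
        .config("spark.datahub.rest.server", "http://localhost:8080")   // placeholder GMS address
        .getOrCreate();

    // Any SQL execution now produces lineage events for the registered consumers.
    spark.sql("SELECT 1 AS id").collect();
    spark.stop();
  }
}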