diff --git a/metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/HdfsIn2HdfsOut1.py b/metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/HdfsIn2HdfsOut1.py
index 3ccffe192d..e4c50dbb4a 100644
--- a/metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/HdfsIn2HdfsOut1.py
+++ b/metadata-integration/java/spark-lineage/spark-smoke-test/python-spark-lineage-test/HdfsIn2HdfsOut1.py
@@ -19,9 +19,7 @@ df = spark.sql("select v1.c1 as a, v1.c2 as b, v2.c1 as c, v2.c2 as d from v1 jo
 
 #InsertIntoHadoopFsRelationCommand
 df.write.mode('overwrite').csv(DATA_DIR + "/" + TEST_CASE_NAME+"/out.csv")
 
-spark.sparkContext.stop()
-
-spark.stop()
+
 
diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
index 88b52ddca5..2132978ee9 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
@@ -12,7 +12,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -32,7 +31,6 @@ import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
 import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
 
 import com.google.common.base.Splitter;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.typesafe.config.Config;
 
 import datahub.spark.consumer.impl.McpEmitter;
@@ -52,7 +50,6 @@ import scala.runtime.AbstractPartialFunction;
 
 @Slf4j
 public class DatahubSparkListener extends SparkListener {
-  private static final int THREAD_CNT = 16;
   public static final String CONSUMER_TYPE_KEY = "spark.datahub.lineage.consumerTypes";
   public static final String DATAHUB_EMITTER = "mcpEmitter";
   public static final String DATABRICKS_CLUSTER_KEY = "databricks.cluster";
@@ -61,7 +58,6 @@ public class DatahubSparkListener extends SparkListener {
 
   private final Map<String, AppStartEvent> appDetails = new ConcurrentHashMap<>();
   private final Map<String, Map<Long, SQLQueryExecStartEvent>> appSqlDetails = new ConcurrentHashMap<>();
-  private final Map<String, ExecutorService> appPoolDetails = new ConcurrentHashMap<>();
   private final Map<String, McpEmitter> appEmitters = new ConcurrentHashMap<>();
   private final Map<String, Config> appConfig = new ConcurrentHashMap<>();
 
@@ -69,7 +65,7 @@ public class DatahubSparkListener extends SparkListener {
     log.info("DatahubSparkListener initialised.");
   }
 
-  private class SqlStartTask implements Runnable {
+  private class SqlStartTask {
 
     private final SparkListenerSQLExecutionStart sqlStart;
     private final SparkContext ctx;
@@ -81,7 +77,6 @@
       this.ctx = ctx;
     }
 
-    @Override
     public void run() {
       appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(),
          new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
@@ -166,7 +161,7 @@
 
       @Override
       public Void apply(SparkContext sc) {
-        getOrCreateApplicationSetup(sc);
+        checkOrCreateApplicationSetup(sc);
        return null;
      }
    });
@@ -190,7 +185,6 @@
      public Void apply(SparkContext sc) {
        log.info("Application ended : {} {}", sc.appName(), sc.applicationId());
        AppStartEvent start = appDetails.remove(sc.applicationId());
-        appPoolDetails.remove(sc.applicationId()).shutdown();
        appSqlDetails.remove(sc.applicationId());
        if (start == null) {
          log.error("Application end event received, but start event missing for appId " + sc.applicationId());
@@ -274,9 +268,8 @@
      }
    });
  }
-
-  private synchronized ExecutorService getOrCreateApplicationSetup(SparkContext ctx) {
-
+
+  private synchronized void checkOrCreateApplicationSetup(SparkContext ctx) {
    ExecutorService pool = null;
    String appId = ctx.applicationId();
    Config datahubConfig = appConfig.get(appId);
@@ -292,16 +285,7 @@
      consumers().forEach(c -> c.accept(evt));
      appDetails.put(appId, evt);
      appSqlDetails.put(appId, new ConcurrentHashMap<>());
-      pool = Executors.newFixedThreadPool(THREAD_CNT,
-          new ThreadFactoryBuilder().setNameFormat("datahub-emit-pool").build());
-      appPoolDetails.put(appId, pool);
-      log.debug("Execution thread pool initialised for {}", appId);
-    } else {
-      pool = appPoolDetails.get(appId);
-    }
-
-    return pool;
-
+    }
  }
 
  private String getPipelineName(SparkContext cx) {
@@ -329,8 +313,7 @@
    LogicalPlan plan = queryExec.optimizedPlan();
    SparkSession sess = queryExec.sparkSession();
    SparkContext ctx = sess.sparkContext();
-    ExecutorService pool = getOrCreateApplicationSetup(ctx);
-    pool.execute(new SqlStartTask(sqlStart, plan, ctx));
+    (new SqlStartTask(sqlStart, plan, ctx)).run();
  }
 
  private List<LineageConsumer> consumers() {
diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/consumer/impl/McpEmitter.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/consumer/impl/McpEmitter.java
index c89e4fb5cd..946ca5a0e4 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/consumer/impl/McpEmitter.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/consumer/impl/McpEmitter.java
@@ -1,30 +1,51 @@
 package datahub.spark.consumer.impl;
 
-import datahub.spark.model.LineageConsumer;
-import datahub.spark.model.LineageEvent;
-import datahub.client.Emitter;
-import datahub.client.rest.RestEmitter;
-import datahub.event.MetadataChangeProposalWrapper;
 import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
-import lombok.extern.slf4j.Slf4j;
 
 import com.typesafe.config.Config;
 
+import datahub.client.Emitter;
+import datahub.client.rest.RestEmitter;
+import datahub.client.rest.RestEmitterConfig;
+import datahub.event.MetadataChangeProposalWrapper;
+import datahub.spark.model.LineageConsumer;
+import datahub.spark.model.LineageEvent;
+import lombok.extern.slf4j.Slf4j;
+
 
 @Slf4j
 public class McpEmitter implements LineageConsumer {
 
-  private final Optional<Emitter> emitter;
+  private String emitterType;
+  private Optional<RestEmitterConfig> restEmitterConfig;
   private static final String TRANSPORT_KEY = "transport";
   private static final String GMS_URL_KEY = "rest.server";
   private static final String GMS_AUTH_TOKEN = "rest.token";
+
+  private Optional<Emitter> getEmitter() {
+    Optional<Emitter> emitter = Optional.empty();
+    switch (emitterType) {
+      case "rest":
+        if (restEmitterConfig.isPresent()) {
+          emitter = Optional.of(new RestEmitter(restEmitterConfig.get()));
+        }
+        break;
+
+      default:
+        log.error("DataHub Transport {} not recognized. DataHub Lineage emission will not work", emitterType);
+        break;
+
+    }
+    return emitter;
+  }
 
   private void emit(List<MetadataChangeProposalWrapper> mcpws) {
+    Optional<Emitter> emitter = getEmitter();
     if (emitter.isPresent()) {
       mcpws.stream().map(mcpw -> {
         try {
@@ -42,11 +63,16 @@ public class McpEmitter implements LineageConsumer {
           log.error("Failed to emit metadata to DataHub", e);
         }
       });
+      try {
+        emitter.get().close();
+      } catch (IOException e) {
+        log.error("Issue while closing emitter" + e);
+      }
     }
   }
 
   public McpEmitter(Config datahubConf) {
-    String emitterType = datahubConf.hasPath(TRANSPORT_KEY) ? datahubConf.getString(TRANSPORT_KEY) : "rest";
+    emitterType = datahubConf.hasPath(TRANSPORT_KEY) ? datahubConf.getString(TRANSPORT_KEY) : "rest";
     switch (emitterType) {
       case "rest":
         String gmsUrl = datahubConf.hasPath(GMS_URL_KEY) ? datahubConf.getString(GMS_URL_KEY)
@@ -57,10 +83,10 @@ public class McpEmitter implements LineageConsumer {
         if (token != null) {
           log.info("REST Emitter Configuration: Token {}", (token != null) ? "XXXXX" : "(empty)");
         }
-        emitter = Optional.of(RestEmitter.create($ -> $.server(gmsUrl).token(token)));
+        restEmitterConfig = Optional.of(RestEmitterConfig.builder().server(gmsUrl).token(token).build());
+        break;
       default:
-        emitter = Optional.empty();
         log.error("DataHub Transport {} not recognized. DataHub Lineage emission will not work", emitterType);
         break;
     }
 
@@ -73,8 +99,9 @@
 
   @Override
   public void close() throws IOException {
-    if (emitter.isPresent()) {
-      emitter.get().close();
-    }
+    // Nothing to close at this point
+
   }
-}
+
+
+}
\ No newline at end of file