fix(spark-lineage): remove need for sparksession.stop call (#4940)

Co-authored-by: Shirshanka Das <shirshanka@apache.org>
MugdhaHardikar-GSLab 2022-05-29 21:37:26 +05:30 committed by GitHub
parent 96f923e007
commit 1479c35bfd
3 changed files with 48 additions and 40 deletions
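In practical terms: once the listener emits lineage synchronously (see the DatahubSparkListener changes below), a job only needs the listener configured and no longer has to end with an explicit spark.stop() for lineage to reach DataHub. A minimal sketch of such a job in Java, assuming the datahub-spark-lineage jar is on the classpath and the usual spark.datahub.* configuration keys with a GMS endpoint at http://localhost:8080:

import org.apache.spark.sql.SparkSession;

public class LineageExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("datahub-lineage-example")
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .config("spark.datahub.rest.server", "http://localhost:8080")
        .getOrCreate();

    spark.sql("select 1 as id").write().mode("overwrite").csv("/tmp/out.csv");
    // Lineage for the write above is emitted when the SQL execution starts;
    // no trailing spark.stop() is required for the events to be sent.
  }
}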

View File

@@ -19,9 +19,7 @@ df = spark.sql("select v1.c1 as a, v1.c2 as b, v2.c1 as c, v2.c2 as d from v1 jo
#InsertIntoHadoopFsRelationCommand
df.write.mode('overwrite').csv(DATA_DIR + "/" + TEST_CASE_NAME+"/out.csv")
-spark.sparkContext.stop()
-spark.stop()

View File

@@ -12,7 +12,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -32,7 +31,6 @@ import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import com.google.common.base.Splitter;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.typesafe.config.Config;
import datahub.spark.consumer.impl.McpEmitter;
@@ -52,7 +50,6 @@ import scala.runtime.AbstractPartialFunction;
@Slf4j
public class DatahubSparkListener extends SparkListener {
-private static final int THREAD_CNT = 16;
public static final String CONSUMER_TYPE_KEY = "spark.datahub.lineage.consumerTypes";
public static final String DATAHUB_EMITTER = "mcpEmitter";
public static final String DATABRICKS_CLUSTER_KEY = "databricks.cluster";
@@ -61,7 +58,6 @@ public class DatahubSparkListener extends SparkListener {
private final Map<String, AppStartEvent> appDetails = new ConcurrentHashMap<>();
private final Map<String, Map<Long, SQLQueryExecStartEvent>> appSqlDetails = new ConcurrentHashMap<>();
-private final Map<String, ExecutorService> appPoolDetails = new ConcurrentHashMap<>();
private final Map<String, McpEmitter> appEmitters = new ConcurrentHashMap<>();
private final Map<String, Config> appConfig = new ConcurrentHashMap<>();
@@ -69,7 +65,7 @@ public class DatahubSparkListener extends SparkListener {
log.info("DatahubSparkListener initialised.");
}
-private class SqlStartTask implements Runnable {
+private class SqlStartTask {
private final SparkListenerSQLExecutionStart sqlStart;
private final SparkContext ctx;
@@ -81,7 +77,6 @@ public class DatahubSparkListener extends SparkListener {
this.ctx = ctx;
}
-@Override
public void run() {
appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(),
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
@@ -166,7 +161,7 @@ public class DatahubSparkListener extends SparkListener {
@Override
public Void apply(SparkContext sc) {
-getOrCreateApplicationSetup(sc);
+checkOrCreateApplicationSetup(sc);
return null;
}
});
@@ -190,7 +185,6 @@ public class DatahubSparkListener extends SparkListener {
public Void apply(SparkContext sc) {
log.info("Application ended : {} {}", sc.appName(), sc.applicationId());
AppStartEvent start = appDetails.remove(sc.applicationId());
-appPoolDetails.remove(sc.applicationId()).shutdown();
appSqlDetails.remove(sc.applicationId());
if (start == null) {
log.error("Application end event received, but start event missing for appId " + sc.applicationId());
@@ -274,9 +268,8 @@ public class DatahubSparkListener extends SparkListener {
}
});
}
-private synchronized ExecutorService getOrCreateApplicationSetup(SparkContext ctx) {
+private synchronized void checkOrCreateApplicationSetup(SparkContext ctx) {
-ExecutorService pool = null;
String appId = ctx.applicationId();
Config datahubConfig = appConfig.get(appId);
@@ -292,16 +285,7 @@ public class DatahubSparkListener extends SparkListener {
consumers().forEach(c -> c.accept(evt));
appDetails.put(appId, evt);
appSqlDetails.put(appId, new ConcurrentHashMap<>());
-pool = Executors.newFixedThreadPool(THREAD_CNT,
-new ThreadFactoryBuilder().setNameFormat("datahub-emit-pool").build());
-appPoolDetails.put(appId, pool);
-log.debug("Execution thread pool initialised for {}", appId);
-} else {
-pool = appPoolDetails.get(appId);
}
-return pool;
}
}
private String getPipelineName(SparkContext cx) {
@@ -329,8 +313,7 @@ public class DatahubSparkListener extends SparkListener {
LogicalPlan plan = queryExec.optimizedPlan();
SparkSession sess = queryExec.sparkSession();
SparkContext ctx = sess.sparkContext();
-ExecutorService pool = getOrCreateApplicationSetup(ctx);
-pool.execute(new SqlStartTask(sqlStart, plan, ctx));
+(new SqlStartTask(sqlStart, plan, ctx)).run();
}
private List<LineageConsumer> consumers() {

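The listener change above boils down to dropping the per-application ExecutorService: SqlStartTask no longer implements Runnable and is invoked directly with run(), so lineage for a SQL execution is produced on the listener thread itself and there is no pool left behind that only gets shut down on application end. A minimal sketch of that pattern, with illustrative class and method names rather than the exact DataHub sources:

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;

// Illustrative listener: the SQL-start event is handled inline instead of being
// submitted to Executors.newFixedThreadPool(...), so no executor has to be drained
// or shut down when the application stops.
public class InlineLineageListener extends SparkListener {
  @Override
  public void onOtherEvent(SparkListenerEvent event) {
    if (event instanceof SparkListenerSQLExecutionStart) {
      handleSqlStart((SparkListenerSQLExecutionStart) event);  // synchronous, no pool
    }
  }

  private void handleSqlStart(SparkListenerSQLExecutionStart sqlStart) {
    // build the lineage event from the query plan and hand it to the consumers (elided)
  }
}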
View File

@@ -1,30 +1,51 @@
package datahub.spark.consumer.impl;
import datahub.spark.model.LineageConsumer;
import datahub.spark.model.LineageEvent;
import datahub.client.Emitter;
import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import com.typesafe.config.Config;
import datahub.client.Emitter;
import datahub.client.rest.RestEmitter;
import datahub.client.rest.RestEmitterConfig;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.spark.model.LineageConsumer;
import datahub.spark.model.LineageEvent;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class McpEmitter implements LineageConsumer {
private final Optional<Emitter> emitter;
private String emitterType;
private Optional<RestEmitterConfig> restEmitterConfig;
private static final String TRANSPORT_KEY = "transport";
private static final String GMS_URL_KEY = "rest.server";
private static final String GMS_AUTH_TOKEN = "rest.token";
private Optional<Emitter> getEmitter() {
Optional<Emitter> emitter = Optional.empty();
switch (emitterType) {
case "rest":
if (restEmitterConfig.isPresent()) {
emitter = Optional.of(new RestEmitter(restEmitterConfig.get()));
}
break;
default:
log.error("DataHub Transport {} not recognized. DataHub Lineage emission will not work", emitterType);
break;
}
return emitter;
}
private void emit(List<MetadataChangeProposalWrapper> mcpws) {
Optional<Emitter> emitter = getEmitter();
if (emitter.isPresent()) {
mcpws.stream().map(mcpw -> {
try {
@@ -42,11 +63,16 @@ public class McpEmitter implements LineageConsumer {
log.error("Failed to emit metadata to DataHub", e);
}
});
+try {
+emitter.get().close();
+} catch (IOException e) {
+log.error("Issue while closing emitter" + e);
+}
}
}
public McpEmitter(Config datahubConf) {
-String emitterType = datahubConf.hasPath(TRANSPORT_KEY) ? datahubConf.getString(TRANSPORT_KEY) : "rest";
+emitterType = datahubConf.hasPath(TRANSPORT_KEY) ? datahubConf.getString(TRANSPORT_KEY) : "rest";
switch (emitterType) {
case "rest":
String gmsUrl = datahubConf.hasPath(GMS_URL_KEY) ? datahubConf.getString(GMS_URL_KEY)
@@ -57,10 +83,10 @@ public class McpEmitter implements LineageConsumer {
if (token != null) {
log.info("REST Emitter Configuration: Token {}", (token != null) ? "XXXXX" : "(empty)");
}
-emitter = Optional.of(RestEmitter.create($ -> $.server(gmsUrl).token(token)));
+restEmitterConfig = Optional.of(RestEmitterConfig.builder().server(gmsUrl).token(token).build());
break;
default:
-emitter = Optional.empty();
log.error("DataHub Transport {} not recognized. DataHub Lineage emission will not work", emitterType);
break;
}
@@ -73,8 +99,9 @@ public class McpEmitter implements LineageConsumer {
@Override
public void close() throws IOException {
-if (emitter.isPresent()) {
-emitter.get().close();
-}
+// Nothing to close at this point
}
}
}
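On the emitter side, the same goal is reached by making the emitter short-lived: McpEmitter now keeps only the emitterType and RestEmitterConfig, and getEmitter() builds a fresh RestEmitter for each emit call, which is closed right after the batch is sent, so close() at application end has nothing left to do. A rough sketch of that lifecycle; the class name, the emitBatch helper, and the blocking get() are illustrative assumptions, not the exact McpEmitter code:

import java.io.IOException;
import java.util.List;

import datahub.client.rest.RestEmitter;
import datahub.client.rest.RestEmitterConfig;
import datahub.event.MetadataChangeProposalWrapper;

// Illustrative per-batch emitter lifecycle: the configuration is long-lived, the emitter is not.
class PerBatchRestEmit {
  private final RestEmitterConfig config;

  PerBatchRestEmit(String gmsUrl, String token) {
    this.config = RestEmitterConfig.builder().server(gmsUrl).token(token).build();
  }

  void emitBatch(List<MetadataChangeProposalWrapper> mcpws) {
    RestEmitter emitter = new RestEmitter(config);   // created per batch
    try {
      for (MetadataChangeProposalWrapper mcpw : mcpws) {
        emitter.emit(mcpw, null).get();              // wait so the batch is flushed
      }
    } catch (Exception e) {
      // lineage emission is best-effort; log and move on
    } finally {
      try {
        emitter.close();                             // nothing stays open afterwards
      } catch (IOException e) {
        // ignore close failures
      }
    }
  }
}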