diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
index 6f72f76699..90410332c3 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java
@@ -44,10 +44,13 @@ import datahub.spark.model.SQLQueryExecEndEvent;
 import datahub.spark.model.SQLQueryExecStartEvent;
 import datahub.spark.model.dataset.SparkDataset;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.util.JsonProtocol;
+import org.json4s.jackson.JsonMethods$;
 import scala.collection.JavaConversions;
 import scala.runtime.AbstractFunction1;
 import scala.runtime.AbstractPartialFunction;
+
 
 @Slf4j
 public class DatahubSparkListener extends SparkListener {
 
@@ -58,7 +61,7 @@ public class DatahubSparkListener extends SparkListener {
   public static final String PIPELINE_PLATFORM_INSTANCE_KEY = PIPELINE_KEY + ".platformInstance";
   public static final String COALESCE_KEY = "coalesce_jobs";
 
-  
+
   private final Map<String, AppStartEvent> appDetails = new ConcurrentHashMap<>();
   private final Map<String, Map<Long, SQLQueryExecStartEvent>> appSqlDetails = new ConcurrentHashMap<>();
   private final Map<String, McpEmitter> appEmitters = new ConcurrentHashMap<>();
@@ -78,12 +81,33 @@ public class DatahubSparkListener extends SparkListener {
       this.sqlStart = sqlStart;
       this.plan = plan;
       this.ctx = ctx;
+
+      String jsonPlan = (plan != null) ? plan.toJSON() : null;
+      String sqlStartJson =
+          (sqlStart != null) ? JsonMethods$.MODULE$.compact(JsonProtocol.sparkEventToJson(sqlStart)) : null;
+      log.debug("SqlStartTask with parameters: sqlStart: {}, plan: {}, ctx: {}", sqlStartJson, jsonPlan, ctx);
     }
 
     public void run() {
-      appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(),
-          new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
-              sqlStart.time(), sqlStart.executionId(), null));
+      if (ctx == null) {
+        log.error("Context is null, skipping run");
+        return;
+      }
+
+      if (ctx.conf() == null) {
+        log.error("Context does not have config, skipping run");
Skipping run"); + return; + } + + if (sqlStart == null) { + log.error("sqlStart is null skipping run"); + return; + } + + appSqlDetails.get(ctx.applicationId()) + .put(sqlStart.executionId(), + new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(), + sqlStart.time(), sqlStart.executionId(), null)); log.debug("PLAN for execution id: " + getPipelineName(ctx) + ":" + sqlStart.executionId() + "\n"); log.debug(plan.toString()); @@ -94,8 +118,8 @@ public class DatahubSparkListener extends SparkListener { return; } // Here assumption is that there will be only single target for single sql query - DatasetLineage lineage = new DatasetLineage(sqlStart.description(), plan.toString(), - outputDS.get().iterator().next()); + DatasetLineage lineage = + new DatasetLineage(sqlStart.description(), plan.toString(), outputDS.get().iterator().next()); Collection> allInners = new ArrayList<>(); plan.collect(new AbstractPartialFunction() { @@ -140,8 +164,9 @@ public class DatahubSparkListener extends SparkListener { }); } - SQLQueryExecStartEvent evt = new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), - ctx.applicationId(), sqlStart.time(), sqlStart.executionId(), lineage); + SQLQueryExecStartEvent evt = + new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(), + sqlStart.time(), sqlStart.executionId(), lineage); appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(), evt); @@ -257,11 +282,13 @@ public class DatahubSparkListener extends SparkListener { public Void apply(SparkContext sc) { SQLQueryExecStartEvent start = appSqlDetails.get(sc.applicationId()).remove(sqlEnd.executionId()); if (start == null) { - log.error("Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId() - + ":" + sqlEnd.executionId()); + log.error( + "Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId() + ":" + + sqlEnd.executionId()); } else if (start.getDatasetLineage() != null) { - SQLQueryExecEndEvent evt = new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(), - sc.applicationId(), sqlEnd.time(), sqlEnd.executionId(), start); + SQLQueryExecEndEvent evt = + new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(), sc.applicationId(), sqlEnd.time(), + sqlEnd.executionId(), start); McpEmitter emitter = appEmitters.get(sc.applicationId()); if (emitter != null) { emitter.accept(evt); @@ -271,7 +298,7 @@ public class DatahubSparkListener extends SparkListener { } }); } - + private synchronized void checkOrCreateApplicationSetup(SparkContext ctx) { ExecutorService pool = null; String appId = ctx.applicationId(); @@ -281,18 +308,17 @@ public class DatahubSparkListener extends SparkListener { appConfig.put(appId, datahubConf); Config pipelineConfig = datahubConf.hasPath(PIPELINE_KEY) ? datahubConf.getConfig(PIPELINE_KEY) : com.typesafe.config.ConfigFactory.empty(); - AppStartEvent evt = new AppStartEvent(LineageUtils.getMaster(ctx), getPipelineName(ctx), appId, ctx.startTime(), - ctx.sparkUser(), pipelineConfig); + AppStartEvent evt = + new AppStartEvent(LineageUtils.getMaster(ctx), getPipelineName(ctx), appId, ctx.startTime(), ctx.sparkUser(), + pipelineConfig); appEmitters.computeIfAbsent(appId, - s -> datahubConf.hasPath(COALESCE_KEY) && datahubConf.getBoolean(COALESCE_KEY) - ? 
-              new CoalesceJobsEmitter(datahubConf)
-              : new McpEmitter(datahubConf))
-          .accept(evt);
+          s -> datahubConf.hasPath(COALESCE_KEY) && datahubConf.getBoolean(COALESCE_KEY) ? new CoalesceJobsEmitter(
+              datahubConf) : new McpEmitter(datahubConf)).accept(evt);
       consumers().forEach(c -> c.accept(evt));
       appDetails.put(appId, evt);
       appSqlDetails.put(appId, new ConcurrentHashMap<>());
-    } 
+    }
   }
 
   private String getPipelineName(SparkContext cx) {
@@ -329,10 +355,11 @@
     if (conf.contains(CONSUMER_TYPE_KEY)) {
       String consumerTypes = conf.get(CONSUMER_TYPE_KEY);
       return StreamSupport.stream(Splitter.on(",").trimResults().split(consumerTypes).spliterator(), false)
-          .map(x -> LineageUtils.getConsumer(x)).filter(Objects::nonNull).collect(Collectors.toList());
+          .map(x -> LineageUtils.getConsumer(x))
+          .filter(Objects::nonNull)
+          .collect(Collectors.toList());
     } else {
       return Collections.emptyList();
     }
-
   }
 }
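
Reviewer note: the constructor logging added in the first hunk serializes the SQL-execution start event through Spark's `JsonProtocol` and renders it with json4s, both of which are already on a Spark job's classpath. Below is a minimal sketch of that same serialization path in isolation; the class `EventJsonDemo` and its `toJson` helper are illustrative names, not part of the patch.

```java
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.util.JsonProtocol;
import org.json4s.jackson.JsonMethods$;

public class EventJsonDemo {
  // Mirrors the patched constructor: JsonProtocol.sparkEventToJson(...) produces a
  // json4s JValue, and JsonMethods.compact(...) renders it as a single-line JSON
  // string. Returning null for a null event matches the patch's null handling.
  static String toJson(SparkListenerEvent event) {
    return (event != null) ? JsonMethods$.MODULE$.compact(JsonProtocol.sparkEventToJson(event)) : null;
  }
}
```

One design observation: because `jsonPlan` and `sqlStartJson` are built eagerly before the `log.debug` call, the serialization cost is paid even when DEBUG is disabled; wrapping the block in `log.isDebugEnabled()` would avoid that.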
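Reviewer note: the new debug lines, and the `COALESCE_KEY` branch in `checkOrCreateApplicationSetup`, only take effect when the listener is registered and the `datahub.spark` logger runs at DEBUG. A sketch of the wiring, assuming Spark's standard `spark.extraListeners` mechanism and the `spark.datahub.*` configuration prefix this integration parses its config from; the app name, endpoint URL, and `ListenerWiringDemo` class are placeholders.

```java
import org.apache.spark.SparkConf;

public class ListenerWiringDemo {
  public static SparkConf demoConf() {
    return new SparkConf()
        .setAppName("lineage-demo")
        // Register DatahubSparkListener for this application (standard Spark mechanism).
        .set("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        // Emitter endpoint; placeholder URL, key name per DataHub's spark-lineage docs.
        .set("spark.datahub.rest.server", "http://localhost:8080")
        // COALESCE_KEY ("coalesce_jobs") opts in to CoalesceJobsEmitter instead of
        // McpEmitter, assuming the listener reads it from the spark.datahub.* namespace.
        .set("spark.datahub.coalesce_jobs", "true");
  }
}
```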