feat(spark-lineage): support for persist API (#4980)

MugdhaHardikar-GSLab 2022-05-24 00:23:57 +05:30 committed by GitHub
parent a9ff203363
commit 96ef7e55ae
16 changed files with 304 additions and 155 deletions

.gitignore (vendored)

@ -46,7 +46,7 @@ MANIFEST
**/spark-lineage/metastore_db/
**/spark-lineage/**/derby.log
**/spark-lineage/**/hive/
**/spark-lineage/**/out.csv/
**/spark-lineage/**/out*.csv/
.vscode
#spark smoke test
@ -67,4 +67,4 @@ tmp*
temp*
# frontend assets
datahub-frontend/public/**
datahub-frontend/public/**

spark-lineage/README.md

@ -93,6 +93,7 @@ This initial release has been tested with the following environments:
Note that testing for other environments, such as Databricks, is planned in the near future.
### Spark commands supported
Below is a list of the Spark commands that are currently parsed:
- InsertIntoHadoopFsRelationCommand
- SaveIntoDataSourceCommand (jdbc)
@ -101,6 +102,12 @@ Below is a list of Spark commands that are parsed currently:
Effectively, these support data sources/sinks corresponding to Hive, HDFS and JDBC.
The DataFrame.persist command is supported for the following LeafExecNodes (see the example after the list):
- FileSourceScanExec
- HiveTableScanExec
- RowDataSourceScanExec
- InMemoryTableScanExec
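As an illustration, below is a minimal Java job in the spirit of the tests added in this commit; the file paths, the join column and the DataHub REST address are placeholders, and the spark-lineage jar is assumed to be on the application classpath. Because the listener resolves the leaves of the cached plan back to their source datasets, the persisted inputs still appear as upstream datasets of the write.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;

public class PersistLineageExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("persistLineageExample")
        .master("local[1]")
        // register the lineage listener and point it at a DataHub GMS instance
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .config("spark.datahub.rest.server", "http://localhost:8080")
        .getOrCreate();

    Dataset<Row> in1 = spark.read().option("header", "true").csv("data/in1.csv");
    Dataset<Row> in2 = spark.read().option("header", "true").csv("data/in2.csv");

    // persist() introduces an InMemoryRelation into the query plan; the cached
    // inputs (in1.csv, in2.csv) are still reported as upstream datasets below.
    Dataset<Row> joined = in1.join(in2, "id").persist(StorageLevel.MEMORY_ONLY());

    // InsertIntoHadoopFsRelationCommand -> output dataset
    joined.write().mode(SaveMode.Overwrite).csv("data/out_persist.csv");

    spark.stop();
  }
}
```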
### Spark commands not yet supported
- View related commands
- Cache commands and implications on lineage

DatahubSparkListener.java

@ -49,24 +49,22 @@ import scala.collection.JavaConversions;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractPartialFunction;
@Slf4j
public class DatahubSparkListener extends SparkListener {
private static final int THREAD_CNT = 16;
public static final String CONSUMER_TYPE_KEY = "spark.datahub.lineage.consumerTypes";
public static final String DATAHUB_EMITTER = "mcpEmitter";
public static final String DATABRICKS_CLUSTER_KEY = "databricks.cluster";
public static final String PIPELINE_KEY = "metadata.pipeline";
public static final String DATABRICKS_CLUSTER_KEY = "databricks.cluster";
public static final String PIPELINE_KEY = "metadata.pipeline";
public static final String PIPELINE_PLATFORM_INSTANCE_KEY = PIPELINE_KEY + ".platformInstance";
private final Map<String, AppStartEvent> appDetails = new ConcurrentHashMap<>();
private final Map<String, Map<Long, SQLQueryExecStartEvent>> appSqlDetails = new ConcurrentHashMap<>();
private final Map<String, ExecutorService> appPoolDetails = new ConcurrentHashMap<>();
private final Map<String, McpEmitter> appEmitters = new ConcurrentHashMap<>();
private final Map<String, Config> appConfig = new ConcurrentHashMap<>();
public DatahubSparkListener() {
log.info("DatahubSparkListener initialised.");
}
@ -85,21 +83,21 @@ public class DatahubSparkListener extends SparkListener {
@Override
public void run() {
appSqlDetails.get(ctx.applicationId())
.put(sqlStart.executionId(),
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), null));
appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(),
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), null));
log.debug("PLAN for execution id: " + getPipelineName(ctx) + ":" + sqlStart.executionId() + "\n");
log.debug(plan.toString());
Optional<? extends SparkDataset> outputDS = DatasetExtractor.asDataset(plan, ctx, true);
if (!outputDS.isPresent()) {
Optional<? extends Collection<SparkDataset>> outputDS = DatasetExtractor.asDataset(plan, ctx, true);
if (!outputDS.isPresent() || outputDS.get().isEmpty()) {
log.debug("Skipping execution as no output dataset present for execution id: " + ctx.applicationId() + ":"
+ sqlStart.executionId());
return;
}
DatasetLineage lineage = new DatasetLineage(sqlStart.description(), plan.toString(), outputDS.get());
// Here assumption is that there will be only single target for single sql query
DatasetLineage lineage = new DatasetLineage(sqlStart.description(), plan.toString(),
outputDS.get().iterator().next());
Collection<QueryPlan<?>> allInners = new ArrayList<>();
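// Walk every node of the logical plan: each node that DatasetExtractor can resolve
// contributes its dataset(s) as lineage sources; inner children (e.g. cached plans)
// are collected into allInners and traversed in a second pass further below.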
plan.collect(new AbstractPartialFunction<LogicalPlan, Void>() {
@ -107,8 +105,8 @@ public class DatahubSparkListener extends SparkListener {
@Override
public Void apply(LogicalPlan plan) {
log.debug("CHILD " + plan.getClass() + "\n" + plan + "\n-------------\n");
Optional<? extends SparkDataset> inputDS = DatasetExtractor.asDataset(plan, ctx, false);
inputDS.ifPresent(x -> lineage.addSource(x));
Optional<? extends Collection<SparkDataset>> inputDS = DatasetExtractor.asDataset(plan, ctx, false);
inputDS.ifPresent(x -> x.forEach(y -> lineage.addSource(y)));
allInners.addAll(JavaConversions.asJavaCollection(plan.innerChildren()));
return null;
}
@ -130,10 +128,10 @@ public class DatahubSparkListener extends SparkListener {
@Override
public Void apply(LogicalPlan plan) {
log.debug("INNER CHILD " + plan.getClass() + "\n" + plan + "\n-------------\n");
Optional<? extends SparkDataset> inputDS = DatasetExtractor.asDataset(plan, ctx, false);
Optional<? extends Collection<SparkDataset>> inputDS = DatasetExtractor.asDataset(plan, ctx, false);
inputDS.ifPresent(
x -> log.debug("source added for " + ctx.appName() + "/" + sqlStart.executionId() + ": " + x));
inputDS.ifPresent(x -> lineage.addSource(x));
inputDS.ifPresent(x -> x.forEach(y -> lineage.addSource(y)));
return null;
}
@ -144,9 +142,8 @@ public class DatahubSparkListener extends SparkListener {
});
}
SQLQueryExecStartEvent evt =
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), lineage);
SQLQueryExecStartEvent evt = new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx),
ctx.applicationId(), sqlStart.time(), sqlStart.executionId(), lineage);
appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(), evt);
@ -160,7 +157,7 @@ public class DatahubSparkListener extends SparkListener {
log.debug("Parsed execution id {}:{}", ctx.appName(), sqlStart.executionId());
}
}
@Override
public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
try {
@ -212,13 +209,13 @@ public class DatahubSparkListener extends SparkListener {
}
}
consumers().forEach(x -> {
x.accept(evt);
try {
x.close();
} catch (IOException e) {
log.warn("Failed to close lineage consumer", e);
}
});
x.accept(evt);
try {
x.close();
} catch (IOException e) {
log.warn("Failed to close lineage consumer", e);
}
});
}
return null;
}
@ -263,13 +260,11 @@ public class DatahubSparkListener extends SparkListener {
public Void apply(SparkContext sc) {
SQLQueryExecStartEvent start = appSqlDetails.get(sc.applicationId()).remove(sqlEnd.executionId());
if (start == null) {
log.error(
"Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId() + ":"
+ sqlEnd.executionId());
log.error("Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId()
+ ":" + sqlEnd.executionId());
} else if (start.getDatasetLineage() != null) {
SQLQueryExecEndEvent evt =
new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(), sc.applicationId(), sqlEnd.time(),
sqlEnd.executionId(), start);
SQLQueryExecEndEvent evt = new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(),
sc.applicationId(), sqlEnd.time(), sqlEnd.executionId(), start);
McpEmitter emitter = appEmitters.get(sc.applicationId());
if (emitter != null) {
emitter.accept(evt);
@ -279,7 +274,7 @@ public class DatahubSparkListener extends SparkListener {
}
});
}
private synchronized ExecutorService getOrCreateApplicationSetup(SparkContext ctx) {
ExecutorService pool = null;
@ -288,10 +283,11 @@ public class DatahubSparkListener extends SparkListener {
if (datahubConfig == null) {
Config datahubConf = LineageUtils.parseSparkConfig();
appConfig.put(appId, datahubConf);
Config pipelineConfig = datahubConf.hasPath(PIPELINE_KEY) ? datahubConf.getConfig(PIPELINE_KEY) : com.typesafe.config.ConfigFactory.empty();
Config pipelineConfig = datahubConf.hasPath(PIPELINE_KEY) ? datahubConf.getConfig(PIPELINE_KEY)
: com.typesafe.config.ConfigFactory.empty();
AppStartEvent evt = new AppStartEvent(LineageUtils.getMaster(ctx), getPipelineName(ctx), appId, ctx.startTime(),
ctx.sparkUser(), pipelineConfig);
appEmitters.computeIfAbsent(appId, s -> new McpEmitter(datahubConf)).accept(evt);
consumers().forEach(c -> c.accept(evt));
appDetails.put(appId, evt);
@ -315,14 +311,14 @@ public class DatahubSparkListener extends SparkListener {
name = datahubConfig.getString(DATABRICKS_CLUSTER_KEY) + "_" + cx.applicationId();
}
name = cx.appName();
//TODO: appending of platform instance needs to be done at central location like adding constructor to dataflowurl
// TODO: appending of platform instance needs to be done at central location
// like adding constructor to dataflowurl
if (datahubConfig.hasPath(PIPELINE_PLATFORM_INSTANCE_KEY)) {
name = datahubConfig.getString(PIPELINE_PLATFORM_INSTANCE_KEY) + "." + name;
}
return name;
}
private void processExecution(SparkListenerSQLExecutionStart sqlStart) {
QueryExecution queryExec = SQLExecution.getQueryExecution(sqlStart.executionId());
if (queryExec == null) {
@ -336,15 +332,16 @@ public class DatahubSparkListener extends SparkListener {
ExecutorService pool = getOrCreateApplicationSetup(ctx);
pool.execute(new SqlStartTask(sqlStart, plan, ctx));
}
private List<LineageConsumer> consumers() {
SparkConf conf = SparkEnv.get().conf();
if (conf.contains(CONSUMER_TYPE_KEY)) {
String consumerTypes = conf.get(CONSUMER_TYPE_KEY);
return StreamSupport.stream(Splitter.on(",").trimResults().split(consumerTypes).spliterator(), false)
.map(x -> LineageUtils.getConsumer(x)).filter(Objects::nonNull).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
private List<LineageConsumer> consumers() {
SparkConf conf = SparkEnv.get().conf();
if (conf.contains(CONSUMER_TYPE_KEY)) {
String consumerTypes = conf.get(CONSUMER_TYPE_KEY);
return StreamSupport.stream(Splitter.on(",").trimResults().split(consumerTypes).spliterator(), false)
.map(x -> LineageUtils.getConsumer(x)).filter(Objects::nonNull).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
}

DatasetExtractor.java

@ -1,13 +1,9 @@
package datahub.spark;
import datahub.spark.model.LineageUtils;
import datahub.spark.model.dataset.CatalogTableDataset;
import datahub.spark.model.dataset.HdfsPathDataset;
import datahub.spark.model.dataset.JdbcDataset;
import datahub.spark.model.dataset.SparkDataset;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -20,6 +16,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.FileSourceScanExec;
import org.apache.spark.sql.execution.RowDataSourceScanExec;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.columnar.InMemoryRelation;
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec;
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand;
import org.apache.spark.sql.execution.datasources.HadoopFsRelation;
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand;
@ -28,21 +29,29 @@ import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation;
import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand;
import org.apache.spark.sql.hive.execution.HiveTableScanExec;
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable;
import org.apache.spark.sql.sources.BaseRelation;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.FabricType;
import com.typesafe.config.Config;
import datahub.spark.model.LineageUtils;
import datahub.spark.model.dataset.CatalogTableDataset;
import datahub.spark.model.dataset.HdfsPathDataset;
import datahub.spark.model.dataset.JdbcDataset;
import datahub.spark.model.dataset.SparkDataset;
import lombok.extern.slf4j.Slf4j;
import scala.Option;
import scala.collection.JavaConversions;
import com.typesafe.config.Config;
import scala.runtime.AbstractFunction1;
@Slf4j
public class DatasetExtractor {
private static final Map<Class<? extends LogicalPlan>, PlanToDataset> PLAN_TO_DATASET = new HashMap<>();
private static final Map<Class<? extends SparkPlan>, SparkPlanToDataset> SPARKPLAN_TO_DATASET = new HashMap<>();
private static final Map<Class<? extends BaseRelation>, RelationToDataset> REL_TO_DATASET = new HashMap<>();
private static final Set<Class<? extends LogicalPlan>> OUTPUT_CMD = ImmutableSet.of(
InsertIntoHadoopFsRelationCommand.class, SaveIntoDataSourceCommand.class,
@ -51,24 +60,70 @@ public class DatasetExtractor {
private static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance";
// TODO InsertIntoHiveDirCommand, InsertIntoDataSourceDirCommand
private DatasetExtractor() {
}
private DatasetExtractor() {
}
private static interface PlanToDataset {
Optional<? extends SparkDataset> fromPlanNode(LogicalPlan plan, SparkContext ctx, Config datahubConfig);
Optional<? extends Collection<SparkDataset>> fromPlanNode(LogicalPlan plan, SparkContext ctx, Config datahubConfig);
}
private static interface RelationToDataset {
Optional<? extends SparkDataset> fromRelation(BaseRelation rel, SparkContext ctx, Config datahubConfig);
Optional<? extends Collection<SparkDataset>> fromRelation(BaseRelation rel, SparkContext ctx, Config datahubConfig);
}
private static interface SparkPlanToDataset {
Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(SparkPlan plan, SparkContext ctx,
Config datahubConfig);
}
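// SPARKPLAN_TO_DATASET resolves physical scan nodes (the leaves of a cached/persisted plan)
// back to their source datasets, delegating to REL_TO_DATASET or PLAN_TO_DATASET for the
// relation underlying each scan.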
static {
SPARKPLAN_TO_DATASET.put(FileSourceScanExec.class, (p, ctx, datahubConfig) -> {
BaseRelation baseRel = ((FileSourceScanExec) p).relation();
if (!REL_TO_DATASET.containsKey(baseRel.getClass())) {
return Optional.empty();
}
return REL_TO_DATASET.get(baseRel.getClass()).fromRelation(baseRel, ctx, datahubConfig);
});
SPARKPLAN_TO_DATASET.put(HiveTableScanExec.class, (p, ctx, datahubConfig) -> {
HiveTableRelation baseRel = ((HiveTableScanExec) p).relation();
if (!PLAN_TO_DATASET.containsKey(baseRel.getClass())) {
return Optional.empty();
}
return PLAN_TO_DATASET.get(baseRel.getClass()).fromPlanNode(baseRel, ctx, datahubConfig);
});
SPARKPLAN_TO_DATASET.put(RowDataSourceScanExec.class, (p, ctx, datahubConfig) -> {
BaseRelation baseRel = ((RowDataSourceScanExec) p).relation();
if (!REL_TO_DATASET.containsKey(baseRel.getClass())) {
return Optional.empty();
}
return REL_TO_DATASET.get(baseRel.getClass()).fromRelation(baseRel, ctx, datahubConfig);
});
SPARKPLAN_TO_DATASET.put(InMemoryTableScanExec.class, (p, ctx, datahubConfig) -> {
InMemoryRelation baseRel = ((InMemoryTableScanExec) p).relation();
if (!PLAN_TO_DATASET.containsKey(baseRel.getClass())) {
return Optional.empty();
}
return PLAN_TO_DATASET.get(baseRel.getClass()).fromPlanNode(baseRel, ctx, datahubConfig);
});
PLAN_TO_DATASET.put(InsertIntoHadoopFsRelationCommand.class, (p, ctx, datahubConfig) -> {
InsertIntoHadoopFsRelationCommand cmd = (InsertIntoHadoopFsRelationCommand) p;
if (cmd.catalogTable().isDefined()) {
return Optional.of(new CatalogTableDataset(cmd.catalogTable().get(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.catalogTable().get(),
getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
}
return Optional.of(new HdfsPathDataset(cmd.outputPath(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
return Optional.of(Collections.singletonList(new HdfsPathDataset(cmd.outputPath(),
getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
});
PLAN_TO_DATASET.put(LogicalRelation.class, (p, ctx, datahubConfig) -> {
@ -90,26 +145,31 @@ public class DatasetExtractor {
}
String tbl = options.get("dbtable");
return Optional.of(new JdbcDataset(url, tbl, getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
return Optional.of(Collections.singletonList(
new JdbcDataset(url, tbl, getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
});
PLAN_TO_DATASET.put(CreateDataSourceTableAsSelectCommand.class, (p, ctx, datahubConfig) -> {
CreateDataSourceTableAsSelectCommand cmd = (CreateDataSourceTableAsSelectCommand) p;
// TODO what of cmd.mode()
return Optional.of(new CatalogTableDataset(cmd.table(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.table(),
getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
});
PLAN_TO_DATASET.put(CreateHiveTableAsSelectCommand.class, (p, ctx, datahubConfig) -> {
CreateHiveTableAsSelectCommand cmd = (CreateHiveTableAsSelectCommand) p;
return Optional.of(new CatalogTableDataset(cmd.tableDesc(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.tableDesc(),
getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
});
PLAN_TO_DATASET.put(InsertIntoHiveTable.class, (p, ctx, datahubConfig) -> {
InsertIntoHiveTable cmd = (InsertIntoHiveTable) p;
return Optional.of(new CatalogTableDataset(cmd.table(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.table(),
getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
});
PLAN_TO_DATASET.put(HiveTableRelation.class, (p, ctx, datahubConfig) -> {
HiveTableRelation cmd = (HiveTableRelation) p;
return Optional.of(new CatalogTableDataset(cmd.tableMeta(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.tableMeta(),
getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
});
REL_TO_DATASET.put(HadoopFsRelation.class, (r, ctx, datahubConfig) -> {
@ -117,7 +177,8 @@ public class DatasetExtractor {
.map(p -> getDirectoryPath(p, ctx.hadoopConfiguration())).distinct().collect(Collectors.toList());
// TODO mapping to URN TBD
return Optional.of(new HdfsPathDataset(res.get(0), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
return Optional.of(Collections.singletonList(new HdfsPathDataset(res.get(0),
getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
});
REL_TO_DATASET.put(JDBCRelation.class, (r, ctx, datahubConfig) -> {
JDBCRelation rel = (JDBCRelation) r;
@ -126,16 +187,41 @@ public class DatasetExtractor {
return Optional.empty();
}
return Optional.of(new JdbcDataset(rel.jdbcOptions().url(), tbl.get(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)));
return Optional.of(Collections.singletonList(new JdbcDataset(rel.jdbcOptions().url(), tbl.get(),
getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
});
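// A persisted DataFrame appears in the logical plan as an InMemoryRelation; walk the
// leaves of its cached physical plan and map each supported scan node to its dataset(s).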
PLAN_TO_DATASET.put(InMemoryRelation.class, (plan, ctx, datahubConfig) -> {
SparkPlan cachedPlan = ((InMemoryRelation) plan).cachedPlan();
ArrayList<SparkDataset> datasets = new ArrayList<>();
cachedPlan.collectLeaves().toList().foreach(new AbstractFunction1<SparkPlan, Void>() {
@Override
public Void apply(SparkPlan leafPlan) {
if (SPARKPLAN_TO_DATASET.containsKey(leafPlan.getClass())) {
Optional<? extends Collection<SparkDataset>> dataset = SPARKPLAN_TO_DATASET.get(leafPlan.getClass())
.fromSparkPlanNode(leafPlan, ctx, datahubConfig);
dataset.ifPresent(x -> datasets.addAll(x));
} else {
log.error(leafPlan.getClass() + " is not yet supported. Please contact datahub team for further support.");
}
return null;
}
});
return datasets.isEmpty() ? Optional.empty() : Optional.of(datasets);
});
}
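// A single logical node (for example a cached InMemoryRelation) can resolve to more than
// one upstream dataset, hence the collection-valued return type.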
static Optional<? extends SparkDataset> asDataset(LogicalPlan logicalPlan, SparkContext ctx, boolean outputNode) {
static Optional<? extends Collection<SparkDataset>> asDataset(LogicalPlan logicalPlan, SparkContext ctx,
boolean outputNode) {
if (!outputNode && OUTPUT_CMD.contains(logicalPlan.getClass())) {
return Optional.empty();
}
if (!PLAN_TO_DATASET.containsKey(logicalPlan.getClass())) {
log.error(logicalPlan.getClass() + " is not supported yet. Please contact datahub team for further support. ");
return Optional.empty();
}
Config datahubconfig = LineageUtils.parseSparkConfig();

TestSparkJobsLineage.java

@ -1,14 +1,10 @@
package datahub.spark;
import datahub.spark.model.LineageUtils;
import datahub.spark.model.DatasetLineage;
import datahub.spark.model.LineageConsumer;
import datahub.spark.model.LineageEvent;
import datahub.spark.model.SQLQueryExecStartEvent;
import datahub.spark.model.dataset.CatalogTableDataset;
import datahub.spark.model.dataset.HdfsPathDataset;
import datahub.spark.model.dataset.JdbcDataset;
import datahub.spark.model.dataset.SparkDataset;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockserver.integration.ClientAndServer.startClientAndServer;
import static org.mockserver.model.HttpRequest.request;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
@ -21,20 +17,24 @@ import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.FixMethodOrder;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.junit.runners.MethodSorters;
import org.mockserver.integration.ClientAndServer;
import org.mockserver.matchers.Times;
import org.mockserver.model.HttpResponse;
@ -45,16 +45,25 @@ import org.testcontainers.containers.PostgreSQLContainer;
import com.linkedin.common.FabricType;
import static org.junit.Assert.*;
import static org.mockserver.integration.ClientAndServer.*;
import static org.mockserver.model.HttpRequest.*;
import datahub.spark.model.DatasetLineage;
import datahub.spark.model.LineageConsumer;
import datahub.spark.model.LineageEvent;
import datahub.spark.model.LineageUtils;
import datahub.spark.model.SQLQueryExecStartEvent;
import datahub.spark.model.dataset.CatalogTableDataset;
import datahub.spark.model.dataset.HdfsPathDataset;
import datahub.spark.model.dataset.JdbcDataset;
import datahub.spark.model.dataset.SparkDataset;
//!!!! IMP !!!!!!!!
//Add the test number before naming the test. This will ensure that tests run in specified order.
//This is necessary to have fixed query execution numbers. Otherwise tests will fail.
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TestSparkJobsLineage {
private static final boolean MOCK_GMS = Boolean.valueOf("true");
// if false, MCPs get written to real GMS server (see GMS_PORT)
// if false, MCPs get written to real GMS server (see GMS_PORT)
private static final boolean VERIFY_EXPECTED = MOCK_GMS && Boolean.valueOf("true");
// if false, "expected" JSONs are overwritten.
// if false, "expected" JSONs are overwritten.
private static final String APP_NAME = "sparkTestApp";
@ -71,14 +80,14 @@ public class TestSparkJobsLineage {
private static final int GMS_PORT = MOCK_GMS ? MOCK_PORT : 8080;
private static final String EXPECTED_JSON_ROOT = "src/test/resources/expected/";
private static final FabricType DATASET_ENV = FabricType.DEV;
private static final String PIPELINE_PLATFORM_INSTANCE = "test_machine";
private static final String DATASET_PLATFORM_INSTANCE = "test_dev_dataset";
@ClassRule
public static PostgreSQLContainer<?> db =
new PostgreSQLContainer<>("postgres:9.6.12").withDatabaseName("sparktestdb");
public static PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:9.6.12")
.withDatabaseName("sparktestdb");
private static SparkSession spark;
private static Properties jdbcConnnProperties;
private static DatasetLineageAccumulator acc;
@ -104,9 +113,10 @@ public class TestSparkJobsLineage {
public static void resetBaseExpectations() {
mockServer.when(request().withMethod("GET").withPath("/config").withHeader("Content-type", "application/json"),
Times.unlimited()).respond(org.mockserver.model.HttpResponse.response().withBody("{\"noCode\": true }"));
mockServer.when(
request().withMethod("POST").withPath("/aspects").withQueryStringParameter("action", "ingestProposal"),
Times.unlimited()).respond(HttpResponse.response().withStatusCode(200));
mockServer
.when(request().withMethod("POST").withPath("/aspects").withQueryStringParameter("action", "ingestProposal"),
Times.unlimited())
.respond(HttpResponse.response().withStatusCode(200));
}
public static void init() {
@ -120,10 +130,9 @@ public class TestSparkJobsLineage {
List<String> expected = Files.readAllLines(Paths.get(EXPECTED_JSON_ROOT, expectationFileName));
for (String content : expected) {
String swappedContent = addLocalPath(content);
mockServer.verify(request().withMethod("POST")
.withPath("/aspects")
.withQueryStringParameter("action", "ingestProposal")
.withBody(new JsonBody(swappedContent)), VerificationTimes.atLeast(1));
mockServer.verify(request().withMethod("POST").withPath("/aspects")
.withQueryStringParameter("action", "ingestProposal").withBody(new JsonBody(swappedContent)),
VerificationTimes.atLeast(1));
}
} catch (IOException ioe) {
throw new RuntimeException("failed to read expectations", ioe);
@ -146,18 +155,14 @@ public class TestSparkJobsLineage {
LineageUtils.registerConsumer("accumulator", acc);
init();
spark = SparkSession.builder()
.appName(APP_NAME)
.config("spark.master", MASTER)
spark = SparkSession.builder().appName(APP_NAME).config("spark.master", MASTER)
.config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
.config("spark.datahub.lineage.consumerTypes", "accumulator")
.config("spark.datahub.rest.server", "http://localhost:" + mockServer.getPort())
.config("spark.datahub.metadata.pipeline.platformInstance", PIPELINE_PLATFORM_INSTANCE)
.config("spark.datahub.metadata.dataset.platformInstance", DATASET_PLATFORM_INSTANCE)
.config("spark.datahub.metadata.dataset.env", DATASET_ENV.name())
.config("spark.sql.warehouse.dir", new File(WAREHOUSE_LOC).getAbsolutePath())
.enableHiveSupport()
.getOrCreate();
.config("spark.sql.warehouse.dir", new File(WAREHOUSE_LOC).getAbsolutePath()).enableHiveSupport().getOrCreate();
spark.sql("drop database if exists " + TEST_DB + " cascade");
spark.sql("create database " + TEST_DB);
@ -172,8 +177,8 @@ public class TestSparkJobsLineage {
}
private static void clear() {
mockServer.clear(
request().withMethod("POST").withPath("/aspects").withQueryStringParameter("action", "ingestProposal"));
mockServer
.clear(request().withMethod("POST").withPath("/aspects").withQueryStringParameter("action", "ingestProposal"));
}
@AfterClass
@ -240,15 +245,15 @@ public class TestSparkJobsLineage {
}
@Test
public void testHdfsInOut() throws Exception {
public void test1HdfsInOut() throws Exception {
Dataset<Row> df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv");
Dataset<Row> df2 = spark.read().option("header", "true").csv(DATA_DIR + "/in2.csv");
df1.createOrReplaceTempView("v1");
df2.createOrReplaceTempView("v2");
Dataset<Row> df =
spark.sql("select v1.c1 as a, v1.c2 as b, v2.c1 as c, v2.c2 as d from v1 join v2 on v1.id = v2.id");
Dataset<Row> df = spark
.sql("select v1.c1 as a, v1.c2 as b, v2.c1 as c, v2.c2 as d from v1 join v2 on v1.id = v2.id");
// InsertIntoHadoopFsRelationCommand
df.write().mode(SaveMode.Overwrite).csv(DATA_DIR + "/out.csv");
@ -260,17 +265,12 @@ public class TestSparkJobsLineage {
}
@Test
public void testHdfsInJdbcOut() throws Exception {
Dataset<Row> df1 = spark.read()
.option("header", "true")
.csv(DATA_DIR + "/in1.csv")
.withColumnRenamed("c1", "a")
public void test5HdfsInJdbcOut() throws Exception {
Dataset<Row> df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv").withColumnRenamed("c1", "a")
.withColumnRenamed("c2", "b");
Dataset<Row> df2 = spark.read()
.option("header", "true")
.csv(DATA_DIR + "/in2.csv")
.withColumnRenamed("c1", "c")
Dataset<Row> df2 = spark.read().option("header", "true").csv(DATA_DIR + "/in2.csv").withColumnRenamed("c1", "c")
.withColumnRenamed("c2", "d");
Dataset<Row> df = df1.join(df2, "id").drop("id");
@ -280,22 +280,20 @@ public class TestSparkJobsLineage {
df.write().mode(SaveMode.Overwrite).jdbc(db.getJdbcUrl(), "foo1", jdbcConnnProperties);
Thread.sleep(5000);
check(dsl(pgDs("foo1"), hdfsDs("in1.csv"), hdfsDs("in2.csv")), acc.getLineages().get(0));
{
if (VERIFY_EXPECTED) {
verify(1 * N);
}
}
@Test
public void testHdfsJdbcInJdbcOut() throws Exception {
public void test3HdfsJdbcInJdbcOut() throws Exception {
Connection c = db.createConnection("");
c.createStatement().execute("create table foo2 (a varchar(5), b int);");
c.createStatement().execute("insert into foo2 values('a', 4);");
c.close();
Dataset<Row> df1 = spark.read()
.option("header", "true")
.csv(DATA_DIR + "/in1.csv")
.withColumnRenamed("c1", "a")
Dataset<Row> df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv").withColumnRenamed("c1", "a")
.withColumnRenamed("c2", "b2");
Dataset<Row> df2 = spark.read().jdbc(db.getJdbcUrl(), "foo2", jdbcConnnProperties);
@ -313,17 +311,12 @@ public class TestSparkJobsLineage {
}
@Test
public void testHdfsInHiveOut() throws Exception {
Dataset<Row> df1 = spark.read()
.option("header", "true")
.csv(DATA_DIR + "/in1.csv")
.withColumnRenamed("c1", "a")
public void test2HdfsInHiveOut() throws Exception {
Dataset<Row> df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv").withColumnRenamed("c1", "a")
.withColumnRenamed("c2", "b");
Dataset<Row> df2 = spark.read()
.option("header", "true")
.csv(DATA_DIR + "/in2.csv")
.withColumnRenamed("c1", "c")
Dataset<Row> df2 = spark.read().option("header", "true").csv(DATA_DIR + "/in2.csv").withColumnRenamed("c1", "c")
.withColumnRenamed("c2", "d");
Dataset<Row> df = df1.join(df2, "id").drop("id");
@ -343,17 +336,12 @@ public class TestSparkJobsLineage {
}
@Test
public void testHiveInHiveOut() throws Exception {
Dataset<Row> df1 = spark.read()
.option("header", "true")
.csv(DATA_DIR + "/in1.csv")
.withColumnRenamed("c1", "a")
public void test4HiveInHiveOut() throws Exception {
Dataset<Row> df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv").withColumnRenamed("c1", "a")
.withColumnRenamed("c2", "b");
Dataset<Row> df2 = spark.read()
.option("header", "true")
.csv(DATA_DIR + "/in2.csv")
.withColumnRenamed("c1", "c")
Dataset<Row> df2 = spark.read().option("header", "true").csv(DATA_DIR + "/in2.csv").withColumnRenamed("c1", "c")
.withColumnRenamed("c2", "d");
df1.createOrReplaceTempView("v1");
@ -386,24 +374,19 @@ public class TestSparkJobsLineage {
}
@Test
public void testHdfsJdbcInJdbcOutTwoLevel() throws Exception {
public void test6HdfsJdbcInJdbcOutTwoLevel() throws Exception {
Connection c = db.createConnection("");
c.createStatement().execute("create table foo6 (a varchar(5), b int);");
c.createStatement().execute("insert into foo6 values('a', 4);");
c.close();
Dataset<Row> df1 = spark.read()
.option("header", "true")
.csv(DATA_DIR + "/in1.csv")
.withColumnRenamed("c1", "a")
Dataset<Row> df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv").withColumnRenamed("c1", "a")
.withColumnRenamed("c2", "b2");
Dataset<Row> df2 = spark.read().jdbc(db.getJdbcUrl(), "foo6", jdbcConnnProperties);
Dataset<Row> df3 = spark.read()
.option("header", "true")
.csv(DATA_DIR + "/in2.csv")
.withColumnRenamed("c1", "a")
Dataset<Row> df3 = spark.read().option("header", "true").csv(DATA_DIR + "/in2.csv").withColumnRenamed("c1", "a")
.withColumnRenamed("c2", "b3");
Dataset<Row> df = df1.join(df2, "a").drop("id").join(df3, "a");
@ -418,7 +401,73 @@ public class TestSparkJobsLineage {
}
}
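// Tests 7-9 exercise lineage for persisted (cached) inputs: HDFS files, JDBC tables,
// or a mix of both should still show up as lineage sources.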
@Test
public void test7HdfsInPersistHdfsOut() throws Exception {
Dataset<Row> df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in3.csv");
Dataset<Row> df2 = spark.read().option("header", "true").csv(DATA_DIR + "/in4.csv").withColumnRenamed("c2", "d")
.withColumnRenamed("c1", "c").withColumnRenamed("id", "id2");
Dataset<Row> df = df1.join(df2, df1.col("id").equalTo(df2.col("id2")), "inner")
.filter(df1.col("id").equalTo("id_filter")).persist(StorageLevel.MEMORY_ONLY());
df.show();
// InsertIntoHadoopFsRelationCommand
df.write().mode(SaveMode.Overwrite).csv(DATA_DIR + "/out_persist.csv");
Thread.sleep(5000);
check(dsl(hdfsDs("out_persist.csv"), hdfsDs("in3.csv"), hdfsDs("in4.csv")), acc.getLineages().get(0));
if (VERIFY_EXPECTED) {
verify(1 * N);
}
}
@Test
public void test8PersistHdfsJdbcInJdbcOut() throws Exception {
Connection c = db.createConnection("");
c.createStatement().execute("create table foo8 (a varchar(5), b int);");
c.createStatement().execute("insert into foo8 values('a', 4);");
c.close();
Dataset<Row> df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv").withColumnRenamed("c1", "a")
.withColumnRenamed("c2", "b2");
Dataset<Row> df2 = spark.read().jdbc(db.getJdbcUrl(), "foo8", jdbcConnnProperties).persist(StorageLevel.MEMORY_ONLY());
Dataset<Row> df = df1.join(df2, "a");
// SaveIntoDataSourceCommand
// JDBCRelation input
df.write().mode(SaveMode.Overwrite).jdbc(db.getJdbcUrl(), "foo9", jdbcConnnProperties);
Thread.sleep(5000);
check(dsl(pgDs("foo9"), hdfsDs("in1.csv"), pgDs("foo8")), acc.getLineages().get(0));
if (VERIFY_EXPECTED) {
verify(1 * N);
}
}
// This test cannot be executed individually. It depends upon previous tests to create tables in the database.
@Test
public void test9PersistJdbcInHdfsOut() throws Exception {
Connection c = db.createConnection("");
Dataset<Row> df1 = spark.read().jdbc(db.getJdbcUrl(), "foo9", jdbcConnnProperties);
df1 = df1.withColumnRenamed("b", "b1");
Dataset<Row> df2 = spark.read().jdbc(db.getJdbcUrl(), "foo8", jdbcConnnProperties).persist(StorageLevel.DISK_ONLY_2());
Dataset<Row> df = df1.join(df2, "a");
df.write().mode(SaveMode.Overwrite).csv(DATA_DIR + "/out_persist.csv");
Thread.sleep(5000);
check(dsl(hdfsDs("out_persist.csv"), pgDs("foo2"), pgDs("foo3")), acc.getLineages().get(0));
if (VERIFY_EXPECTED) {
verify(1 * N);
}
}
private static class DatasetLineageAccumulator implements LineageConsumer {
boolean closed = false;
private final List<DatasetLineage> lineages = new ArrayList<>();


@ -0,0 +1,3 @@
id,c1,c2
1,a,4
2,a,5


@ -0,0 +1,4 @@
id,c1,c2
1,a,4
2,b,5
3,b,6


@ -0,0 +1 @@
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_31)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in3.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in4.csv,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/out_persist.csv,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}


@ -0,0 +1 @@
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_33)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:postgres,test_dev_dataset.sparktestdb.foo8,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:postgres,test_dev_dataset.sparktestdb.foo9,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}


@ -0,0 +1 @@
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_34)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:postgres,test_dev_dataset.sparktestdb.foo8,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:postgres,test_dev_dataset.sparktestdb.foo9,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/out_persist.csv,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}