diff --git a/.gitignore b/.gitignore index 4e1bb91170..70a7c2a7e8 100644 --- a/.gitignore +++ b/.gitignore @@ -46,7 +46,7 @@ MANIFEST **/spark-lineage/metastore_db/ **/spark-lineage/**/derby.log **/spark-lineage/**/hive/ -**/spark-lineage/**/out.csv/ +**/spark-lineage/**/out*.csv/ .vscode #spark smoke test @@ -67,4 +67,4 @@ tmp* temp* # frontend assets -datahub-frontend/public/** \ No newline at end of file +datahub-frontend/public/** diff --git a/metadata-integration/java/spark-lineage/README.md b/metadata-integration/java/spark-lineage/README.md index 08e53050da..9483214460 100644 --- a/metadata-integration/java/spark-lineage/README.md +++ b/metadata-integration/java/spark-lineage/README.md @@ -93,6 +93,7 @@ This initial release has been tested with the following environments: Note that testing for other environments such as Databricks is planned in near future. ### Spark commands supported + Below is a list of Spark commands that are parsed currently: - InsertIntoHadoopFsRelationCommand - SaveIntoDataSourceCommand (jdbc) @@ -101,6 +102,12 @@ Below is a list of Spark commands that are parsed currently: Effectively, these support data sources/sinks corresponding to Hive, HDFS and JDBC. +DataFrame.persist command is supported for below LeafExecNodes: +- FileSourceScanExec +- HiveTableScanExec +- RowDataSourceScanExec +- InMemoryTableScanExec + ### Spark commands not yet supported - View related commands - Cache commands and implications on lineage diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java index 9d693f3926..88b52ddca5 100644 --- a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java +++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java @@ -49,24 +49,22 @@ import scala.collection.JavaConversions; import scala.runtime.AbstractFunction1; import scala.runtime.AbstractPartialFunction; - - @Slf4j public class DatahubSparkListener extends SparkListener { private static final int THREAD_CNT = 16; public static final String CONSUMER_TYPE_KEY = "spark.datahub.lineage.consumerTypes"; public static final String DATAHUB_EMITTER = "mcpEmitter"; - public static final String DATABRICKS_CLUSTER_KEY = "databricks.cluster"; - public static final String PIPELINE_KEY = "metadata.pipeline"; + public static final String DATABRICKS_CLUSTER_KEY = "databricks.cluster"; + public static final String PIPELINE_KEY = "metadata.pipeline"; public static final String PIPELINE_PLATFORM_INSTANCE_KEY = PIPELINE_KEY + ".platformInstance"; - + private final Map appDetails = new ConcurrentHashMap<>(); private final Map> appSqlDetails = new ConcurrentHashMap<>(); private final Map appPoolDetails = new ConcurrentHashMap<>(); private final Map appEmitters = new ConcurrentHashMap<>(); private final Map appConfig = new ConcurrentHashMap<>(); - + public DatahubSparkListener() { log.info("DatahubSparkListener initialised."); } @@ -85,21 +83,21 @@ public class DatahubSparkListener extends SparkListener { @Override public void run() { - appSqlDetails.get(ctx.applicationId()) - .put(sqlStart.executionId(), - new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(), - sqlStart.time(), sqlStart.executionId(), null)); + appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(), + new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), 
getPipelineName(ctx), ctx.applicationId(), + sqlStart.time(), sqlStart.executionId(), null)); log.debug("PLAN for execution id: " + getPipelineName(ctx) + ":" + sqlStart.executionId() + "\n"); log.debug(plan.toString()); - Optional outputDS = DatasetExtractor.asDataset(plan, ctx, true); - if (!outputDS.isPresent()) { + Optional> outputDS = DatasetExtractor.asDataset(plan, ctx, true); + if (!outputDS.isPresent() || outputDS.get().isEmpty()) { log.debug("Skipping execution as no output dataset present for execution id: " + ctx.applicationId() + ":" + sqlStart.executionId()); return; } - - DatasetLineage lineage = new DatasetLineage(sqlStart.description(), plan.toString(), outputDS.get()); + // Here assumption is that there will be only single target for single sql query + DatasetLineage lineage = new DatasetLineage(sqlStart.description(), plan.toString(), + outputDS.get().iterator().next()); Collection> allInners = new ArrayList<>(); plan.collect(new AbstractPartialFunction() { @@ -107,8 +105,8 @@ public class DatahubSparkListener extends SparkListener { @Override public Void apply(LogicalPlan plan) { log.debug("CHILD " + plan.getClass() + "\n" + plan + "\n-------------\n"); - Optional inputDS = DatasetExtractor.asDataset(plan, ctx, false); - inputDS.ifPresent(x -> lineage.addSource(x)); + Optional> inputDS = DatasetExtractor.asDataset(plan, ctx, false); + inputDS.ifPresent(x -> x.forEach(y -> lineage.addSource(y))); allInners.addAll(JavaConversions.asJavaCollection(plan.innerChildren())); return null; } @@ -130,10 +128,10 @@ public class DatahubSparkListener extends SparkListener { @Override public Void apply(LogicalPlan plan) { log.debug("INNER CHILD " + plan.getClass() + "\n" + plan + "\n-------------\n"); - Optional inputDS = DatasetExtractor.asDataset(plan, ctx, false); + Optional> inputDS = DatasetExtractor.asDataset(plan, ctx, false); inputDS.ifPresent( x -> log.debug("source added for " + ctx.appName() + "/" + sqlStart.executionId() + ": " + x)); - inputDS.ifPresent(x -> lineage.addSource(x)); + inputDS.ifPresent(x -> x.forEach(y -> lineage.addSource(y))); return null; } @@ -144,9 +142,8 @@ public class DatahubSparkListener extends SparkListener { }); } - SQLQueryExecStartEvent evt = - new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(), - sqlStart.time(), sqlStart.executionId(), lineage); + SQLQueryExecStartEvent evt = new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), + ctx.applicationId(), sqlStart.time(), sqlStart.executionId(), lineage); appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(), evt); @@ -160,7 +157,7 @@ public class DatahubSparkListener extends SparkListener { log.debug("Parsed execution id {}:{}", ctx.appName(), sqlStart.executionId()); } } - + @Override public void onApplicationStart(SparkListenerApplicationStart applicationStart) { try { @@ -212,13 +209,13 @@ public class DatahubSparkListener extends SparkListener { } } consumers().forEach(x -> { - x.accept(evt); - try { - x.close(); - } catch (IOException e) { - log.warn("Failed to close lineage consumer", e); - } - }); + x.accept(evt); + try { + x.close(); + } catch (IOException e) { + log.warn("Failed to close lineage consumer", e); + } + }); } return null; } @@ -263,13 +260,11 @@ public class DatahubSparkListener extends SparkListener { public Void apply(SparkContext sc) { SQLQueryExecStartEvent start = appSqlDetails.get(sc.applicationId()).remove(sqlEnd.executionId()); if (start == null) { - log.error( - "Execution 
end event received, but start event missing for appId/sql exec Id " + sc.applicationId() + ":" - + sqlEnd.executionId()); + log.error("Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId() + + ":" + sqlEnd.executionId()); } else if (start.getDatasetLineage() != null) { - SQLQueryExecEndEvent evt = - new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(), sc.applicationId(), sqlEnd.time(), - sqlEnd.executionId(), start); + SQLQueryExecEndEvent evt = new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(), + sc.applicationId(), sqlEnd.time(), sqlEnd.executionId(), start); McpEmitter emitter = appEmitters.get(sc.applicationId()); if (emitter != null) { emitter.accept(evt); @@ -279,7 +274,7 @@ public class DatahubSparkListener extends SparkListener { } }); } - + private synchronized ExecutorService getOrCreateApplicationSetup(SparkContext ctx) { ExecutorService pool = null; @@ -288,10 +283,11 @@ public class DatahubSparkListener extends SparkListener { if (datahubConfig == null) { Config datahubConf = LineageUtils.parseSparkConfig(); appConfig.put(appId, datahubConf); - Config pipelineConfig = datahubConf.hasPath(PIPELINE_KEY) ? datahubConf.getConfig(PIPELINE_KEY) : com.typesafe.config.ConfigFactory.empty(); + Config pipelineConfig = datahubConf.hasPath(PIPELINE_KEY) ? datahubConf.getConfig(PIPELINE_KEY) + : com.typesafe.config.ConfigFactory.empty(); AppStartEvent evt = new AppStartEvent(LineageUtils.getMaster(ctx), getPipelineName(ctx), appId, ctx.startTime(), ctx.sparkUser(), pipelineConfig); - + appEmitters.computeIfAbsent(appId, s -> new McpEmitter(datahubConf)).accept(evt); consumers().forEach(c -> c.accept(evt)); appDetails.put(appId, evt); @@ -315,14 +311,14 @@ public class DatahubSparkListener extends SparkListener { name = datahubConfig.getString(DATABRICKS_CLUSTER_KEY) + "_" + cx.applicationId(); } name = cx.appName(); - //TODO: appending of platform instance needs to be done at central location like adding constructor to dataflowurl + // TODO: appending of platform instance needs to be done at central location + // like adding constructor to dataflowurl if (datahubConfig.hasPath(PIPELINE_PLATFORM_INSTANCE_KEY)) { name = datahubConfig.getString(PIPELINE_PLATFORM_INSTANCE_KEY) + "." 
+ name; } return name; } - private void processExecution(SparkListenerSQLExecutionStart sqlStart) { QueryExecution queryExec = SQLExecution.getQueryExecution(sqlStart.executionId()); if (queryExec == null) { @@ -336,15 +332,16 @@ public class DatahubSparkListener extends SparkListener { ExecutorService pool = getOrCreateApplicationSetup(ctx); pool.execute(new SqlStartTask(sqlStart, plan, ctx)); } - private List consumers() { - SparkConf conf = SparkEnv.get().conf(); - if (conf.contains(CONSUMER_TYPE_KEY)) { - String consumerTypes = conf.get(CONSUMER_TYPE_KEY); - return StreamSupport.stream(Splitter.on(",").trimResults().split(consumerTypes).spliterator(), false) - .map(x -> LineageUtils.getConsumer(x)).filter(Objects::nonNull).collect(Collectors.toList()); - } else { - return Collections.emptyList(); - } + private List consumers() { + SparkConf conf = SparkEnv.get().conf(); + if (conf.contains(CONSUMER_TYPE_KEY)) { + String consumerTypes = conf.get(CONSUMER_TYPE_KEY); + return StreamSupport.stream(Splitter.on(",").trimResults().split(consumerTypes).spliterator(), false) + .map(x -> LineageUtils.getConsumer(x)).filter(Objects::nonNull).collect(Collectors.toList()); + } else { + return Collections.emptyList(); } + + } } \ No newline at end of file diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java index 95787343b0..4b3bf41b52 100644 --- a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java +++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java @@ -1,13 +1,9 @@ package datahub.spark; -import datahub.spark.model.LineageUtils; -import datahub.spark.model.dataset.CatalogTableDataset; -import datahub.spark.model.dataset.HdfsPathDataset; -import datahub.spark.model.dataset.JdbcDataset; -import datahub.spark.model.dataset.SparkDataset; -import lombok.extern.slf4j.Slf4j; - import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -20,6 +16,11 @@ import org.apache.hadoop.fs.Path; import org.apache.spark.SparkContext; import org.apache.spark.sql.catalyst.catalog.HiveTableRelation; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.execution.FileSourceScanExec; +import org.apache.spark.sql.execution.RowDataSourceScanExec; +import org.apache.spark.sql.execution.SparkPlan; +import org.apache.spark.sql.execution.columnar.InMemoryRelation; +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec; import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand; import org.apache.spark.sql.execution.datasources.HadoopFsRelation; import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand; @@ -28,21 +29,29 @@ import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand; import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions; import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation; import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand; +import org.apache.spark.sql.hive.execution.HiveTableScanExec; import org.apache.spark.sql.hive.execution.InsertIntoHiveTable; import org.apache.spark.sql.sources.BaseRelation; import com.google.common.collect.ImmutableSet; import com.linkedin.common.FabricType; +import 
com.typesafe.config.Config; +import datahub.spark.model.LineageUtils; +import datahub.spark.model.dataset.CatalogTableDataset; +import datahub.spark.model.dataset.HdfsPathDataset; +import datahub.spark.model.dataset.JdbcDataset; +import datahub.spark.model.dataset.SparkDataset; +import lombok.extern.slf4j.Slf4j; import scala.Option; import scala.collection.JavaConversions; - -import com.typesafe.config.Config; +import scala.runtime.AbstractFunction1; @Slf4j public class DatasetExtractor { - + private static final Map, PlanToDataset> PLAN_TO_DATASET = new HashMap<>(); + private static final Map, SparkPlanToDataset> SPARKPLAN_TO_DATASET = new HashMap<>(); private static final Map, RelationToDataset> REL_TO_DATASET = new HashMap<>(); private static final Set> OUTPUT_CMD = ImmutableSet.of( InsertIntoHadoopFsRelationCommand.class, SaveIntoDataSourceCommand.class, @@ -51,24 +60,70 @@ public class DatasetExtractor { private static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance"; // TODO InsertIntoHiveDirCommand, InsertIntoDataSourceDirCommand - private DatasetExtractor() { - - } + private DatasetExtractor() { + + } + private static interface PlanToDataset { - Optional fromPlanNode(LogicalPlan plan, SparkContext ctx, Config datahubConfig); + Optional> fromPlanNode(LogicalPlan plan, SparkContext ctx, Config datahubConfig); } private static interface RelationToDataset { - Optional fromRelation(BaseRelation rel, SparkContext ctx, Config datahubConfig); + Optional> fromRelation(BaseRelation rel, SparkContext ctx, Config datahubConfig); + } + + private static interface SparkPlanToDataset { + Optional> fromSparkPlanNode(SparkPlan plan, SparkContext ctx, + Config datahubConfig); } static { + + SPARKPLAN_TO_DATASET.put(FileSourceScanExec.class, (p, ctx, datahubConfig) -> { + + BaseRelation baseRel = ((FileSourceScanExec) p).relation(); + if (!REL_TO_DATASET.containsKey(baseRel.getClass())) { + return Optional.empty(); + } + return REL_TO_DATASET.get(baseRel.getClass()).fromRelation(baseRel, ctx, datahubConfig); + + }); + + SPARKPLAN_TO_DATASET.put(HiveTableScanExec.class, (p, ctx, datahubConfig) -> { + + HiveTableRelation baseRel = ((HiveTableScanExec) p).relation(); + if (!PLAN_TO_DATASET.containsKey(baseRel.getClass())) { + return Optional.empty(); + } + return PLAN_TO_DATASET.get(baseRel.getClass()).fromPlanNode(baseRel, ctx, datahubConfig); + + }); + + SPARKPLAN_TO_DATASET.put(RowDataSourceScanExec.class, (p, ctx, datahubConfig) -> { + BaseRelation baseRel = ((RowDataSourceScanExec) p).relation(); + if (!REL_TO_DATASET.containsKey(baseRel.getClass())) { + return Optional.empty(); + } + return REL_TO_DATASET.get(baseRel.getClass()).fromRelation(baseRel, ctx, datahubConfig); + }); + + SPARKPLAN_TO_DATASET.put(InMemoryTableScanExec.class, (p, ctx, datahubConfig) -> { + InMemoryRelation baseRel = ((InMemoryTableScanExec) p).relation(); + if (!PLAN_TO_DATASET.containsKey(baseRel.getClass())) { + return Optional.empty(); + } + return PLAN_TO_DATASET.get(baseRel.getClass()).fromPlanNode(baseRel, ctx, datahubConfig); + + }); + PLAN_TO_DATASET.put(InsertIntoHadoopFsRelationCommand.class, (p, ctx, datahubConfig) -> { InsertIntoHadoopFsRelationCommand cmd = (InsertIntoHadoopFsRelationCommand) p; if (cmd.catalogTable().isDefined()) { - return Optional.of(new CatalogTableDataset(cmd.catalogTable().get(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))); + return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.catalogTable().get(), + 
getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)))); } - return Optional.of(new HdfsPathDataset(cmd.outputPath(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))); + return Optional.of(Collections.singletonList(new HdfsPathDataset(cmd.outputPath(), + getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)))); }); PLAN_TO_DATASET.put(LogicalRelation.class, (p, ctx, datahubConfig) -> { @@ -90,26 +145,31 @@ public class DatasetExtractor { } String tbl = options.get("dbtable"); - return Optional.of(new JdbcDataset(url, tbl, getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))); + return Optional.of(Collections.singletonList( + new JdbcDataset(url, tbl, getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)))); }); PLAN_TO_DATASET.put(CreateDataSourceTableAsSelectCommand.class, (p, ctx, datahubConfig) -> { CreateDataSourceTableAsSelectCommand cmd = (CreateDataSourceTableAsSelectCommand) p; // TODO what of cmd.mode() - return Optional.of(new CatalogTableDataset(cmd.table(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))); + return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.table(), + getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)))); }); PLAN_TO_DATASET.put(CreateHiveTableAsSelectCommand.class, (p, ctx, datahubConfig) -> { CreateHiveTableAsSelectCommand cmd = (CreateHiveTableAsSelectCommand) p; - return Optional.of(new CatalogTableDataset(cmd.tableDesc(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))); + return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.tableDesc(), + getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)))); }); PLAN_TO_DATASET.put(InsertIntoHiveTable.class, (p, ctx, datahubConfig) -> { InsertIntoHiveTable cmd = (InsertIntoHiveTable) p; - return Optional.of(new CatalogTableDataset(cmd.table(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))); + return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.table(), + getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)))); }); PLAN_TO_DATASET.put(HiveTableRelation.class, (p, ctx, datahubConfig) -> { HiveTableRelation cmd = (HiveTableRelation) p; - return Optional.of(new CatalogTableDataset(cmd.tableMeta(), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))); + return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.tableMeta(), + getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)))); }); REL_TO_DATASET.put(HadoopFsRelation.class, (r, ctx, datahubConfig) -> { @@ -117,7 +177,8 @@ public class DatasetExtractor { .map(p -> getDirectoryPath(p, ctx.hadoopConfiguration())).distinct().collect(Collectors.toList()); // TODO mapping to URN TBD - return Optional.of(new HdfsPathDataset(res.get(0), getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))); + return Optional.of(Collections.singletonList(new HdfsPathDataset(res.get(0), + getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)))); }); REL_TO_DATASET.put(JDBCRelation.class, (r, ctx, datahubConfig) -> { JDBCRelation rel = (JDBCRelation) r; @@ -126,16 +187,41 @@ public class DatasetExtractor { return Optional.empty(); } - return Optional.of(new JdbcDataset(rel.jdbcOptions().url(), tbl.get(), 
getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))); + return Optional.of(Collections.singletonList(new JdbcDataset(rel.jdbcOptions().url(), tbl.get(), + getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)))); + }); + + PLAN_TO_DATASET.put(InMemoryRelation.class, (plan, ctx, datahubConfig) -> { + SparkPlan cachedPlan = ((InMemoryRelation) plan).cachedPlan(); + ArrayList datasets = new ArrayList<>(); + cachedPlan.collectLeaves().toList().foreach(new AbstractFunction1() { + + @Override + public Void apply(SparkPlan leafPlan) { + + if (SPARKPLAN_TO_DATASET.containsKey(leafPlan.getClass())) { + Optional> dataset = SPARKPLAN_TO_DATASET.get(leafPlan.getClass()) + .fromSparkPlanNode(leafPlan, ctx, datahubConfig); + dataset.ifPresent(x -> datasets.addAll(x)); + } else { + log.error(leafPlan.getClass() + " is not yet supported. Please contact datahub team for further support."); + } + return null; + } + }); + return datasets.isEmpty() ? Optional.empty() : Optional.of(datasets); }); } - static Optional asDataset(LogicalPlan logicalPlan, SparkContext ctx, boolean outputNode) { + static Optional> asDataset(LogicalPlan logicalPlan, SparkContext ctx, + boolean outputNode) { + if (!outputNode && OUTPUT_CMD.contains(logicalPlan.getClass())) { return Optional.empty(); } if (!PLAN_TO_DATASET.containsKey(logicalPlan.getClass())) { + log.error(logicalPlan.getClass() + " is not supported yet. Please contact datahub team for further support. "); return Optional.empty(); } Config datahubconfig = LineageUtils.parseSparkConfig(); diff --git a/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestSparkJobsLineage.java b/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestSparkJobsLineage.java index 4fff88007c..1b5ff4dac2 100644 --- a/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestSparkJobsLineage.java +++ b/metadata-integration/java/spark-lineage/src/test/java/datahub/spark/TestSparkJobsLineage.java @@ -1,14 +1,10 @@ package datahub.spark; -import datahub.spark.model.LineageUtils; -import datahub.spark.model.DatasetLineage; -import datahub.spark.model.LineageConsumer; -import datahub.spark.model.LineageEvent; -import datahub.spark.model.SQLQueryExecStartEvent; -import datahub.spark.model.dataset.CatalogTableDataset; -import datahub.spark.model.dataset.HdfsPathDataset; -import datahub.spark.model.dataset.JdbcDataset; -import datahub.spark.model.dataset.SparkDataset; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockserver.integration.ClientAndServer.startClientAndServer; +import static org.mockserver.model.HttpRequest.request; + import java.io.File; import java.io.IOException; import java.nio.file.Files; @@ -21,20 +17,24 @@ import java.util.List; import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; + import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.apache.spark.storage.StorageLevel; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.FixMethodOrder; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestRule; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import org.junit.runners.MethodSorters; import org.mockserver.integration.ClientAndServer; import 
org.mockserver.matchers.Times; import org.mockserver.model.HttpResponse; @@ -45,16 +45,25 @@ import org.testcontainers.containers.PostgreSQLContainer; import com.linkedin.common.FabricType; -import static org.junit.Assert.*; -import static org.mockserver.integration.ClientAndServer.*; -import static org.mockserver.model.HttpRequest.*; - +import datahub.spark.model.DatasetLineage; +import datahub.spark.model.LineageConsumer; +import datahub.spark.model.LineageEvent; +import datahub.spark.model.LineageUtils; +import datahub.spark.model.SQLQueryExecStartEvent; +import datahub.spark.model.dataset.CatalogTableDataset; +import datahub.spark.model.dataset.HdfsPathDataset; +import datahub.spark.model.dataset.JdbcDataset; +import datahub.spark.model.dataset.SparkDataset; +//!!!! IMP !!!!!!!! +//Add the test number before naming the test. This will ensure that tests run in specified order. +//This is necessary to have fixed query execution numbers. Otherwise tests will fail. +@FixMethodOrder(MethodSorters.NAME_ASCENDING) public class TestSparkJobsLineage { private static final boolean MOCK_GMS = Boolean.valueOf("true"); - // if false, MCPs get written to real GMS server (see GMS_PORT) + // if false, MCPs get written to real GMS server (see GMS_PORT) private static final boolean VERIFY_EXPECTED = MOCK_GMS && Boolean.valueOf("true"); - // if false, "expected" JSONs are overwritten. + // if false, "expected" JSONs are overwritten. private static final String APP_NAME = "sparkTestApp"; @@ -71,14 +80,14 @@ public class TestSparkJobsLineage { private static final int GMS_PORT = MOCK_GMS ? MOCK_PORT : 8080; private static final String EXPECTED_JSON_ROOT = "src/test/resources/expected/"; - + private static final FabricType DATASET_ENV = FabricType.DEV; private static final String PIPELINE_PLATFORM_INSTANCE = "test_machine"; private static final String DATASET_PLATFORM_INSTANCE = "test_dev_dataset"; - + @ClassRule - public static PostgreSQLContainer db = - new PostgreSQLContainer<>("postgres:9.6.12").withDatabaseName("sparktestdb"); + public static PostgreSQLContainer db = new PostgreSQLContainer<>("postgres:9.6.12") + .withDatabaseName("sparktestdb"); private static SparkSession spark; private static Properties jdbcConnnProperties; private static DatasetLineageAccumulator acc; @@ -104,9 +113,10 @@ public class TestSparkJobsLineage { public static void resetBaseExpectations() { mockServer.when(request().withMethod("GET").withPath("/config").withHeader("Content-type", "application/json"), Times.unlimited()).respond(org.mockserver.model.HttpResponse.response().withBody("{\"noCode\": true }")); - mockServer.when( - request().withMethod("POST").withPath("/aspects").withQueryStringParameter("action", "ingestProposal"), - Times.unlimited()).respond(HttpResponse.response().withStatusCode(200)); + mockServer + .when(request().withMethod("POST").withPath("/aspects").withQueryStringParameter("action", "ingestProposal"), + Times.unlimited()) + .respond(HttpResponse.response().withStatusCode(200)); } public static void init() { @@ -120,10 +130,9 @@ public class TestSparkJobsLineage { List expected = Files.readAllLines(Paths.get(EXPECTED_JSON_ROOT, expectationFileName)); for (String content : expected) { String swappedContent = addLocalPath(content); - mockServer.verify(request().withMethod("POST") - .withPath("/aspects") - .withQueryStringParameter("action", "ingestProposal") - .withBody(new JsonBody(swappedContent)), VerificationTimes.atLeast(1)); + mockServer.verify(request().withMethod("POST").withPath("/aspects") + 
.withQueryStringParameter("action", "ingestProposal").withBody(new JsonBody(swappedContent)), + VerificationTimes.atLeast(1)); } } catch (IOException ioe) { throw new RuntimeException("failed to read expectations", ioe); @@ -146,18 +155,14 @@ public class TestSparkJobsLineage { LineageUtils.registerConsumer("accumulator", acc); init(); - spark = SparkSession.builder() - .appName(APP_NAME) - .config("spark.master", MASTER) + spark = SparkSession.builder().appName(APP_NAME).config("spark.master", MASTER) .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") .config("spark.datahub.lineage.consumerTypes", "accumulator") .config("spark.datahub.rest.server", "http://localhost:" + mockServer.getPort()) .config("spark.datahub.metadata.pipeline.platformInstance", PIPELINE_PLATFORM_INSTANCE) .config("spark.datahub.metadata.dataset.platformInstance", DATASET_PLATFORM_INSTANCE) .config("spark.datahub.metadata.dataset.env", DATASET_ENV.name()) - .config("spark.sql.warehouse.dir", new File(WAREHOUSE_LOC).getAbsolutePath()) - .enableHiveSupport() - .getOrCreate(); + .config("spark.sql.warehouse.dir", new File(WAREHOUSE_LOC).getAbsolutePath()).enableHiveSupport().getOrCreate(); spark.sql("drop database if exists " + TEST_DB + " cascade"); spark.sql("create database " + TEST_DB); @@ -172,8 +177,8 @@ public class TestSparkJobsLineage { } private static void clear() { - mockServer.clear( - request().withMethod("POST").withPath("/aspects").withQueryStringParameter("action", "ingestProposal")); + mockServer + .clear(request().withMethod("POST").withPath("/aspects").withQueryStringParameter("action", "ingestProposal")); } @AfterClass @@ -240,15 +245,15 @@ public class TestSparkJobsLineage { } @Test - public void testHdfsInOut() throws Exception { + public void test1HdfsInOut() throws Exception { Dataset df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv"); Dataset df2 = spark.read().option("header", "true").csv(DATA_DIR + "/in2.csv"); df1.createOrReplaceTempView("v1"); df2.createOrReplaceTempView("v2"); - Dataset df = - spark.sql("select v1.c1 as a, v1.c2 as b, v2.c1 as c, v2.c2 as d from v1 join v2 on v1.id = v2.id"); + Dataset df = spark + .sql("select v1.c1 as a, v1.c2 as b, v2.c1 as c, v2.c2 as d from v1 join v2 on v1.id = v2.id"); // InsertIntoHadoopFsRelationCommand df.write().mode(SaveMode.Overwrite).csv(DATA_DIR + "/out.csv"); @@ -260,17 +265,12 @@ public class TestSparkJobsLineage { } @Test - public void testHdfsInJdbcOut() throws Exception { - Dataset df1 = spark.read() - .option("header", "true") - .csv(DATA_DIR + "/in1.csv") - .withColumnRenamed("c1", "a") + public void test5HdfsInJdbcOut() throws Exception { + + Dataset df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv").withColumnRenamed("c1", "a") .withColumnRenamed("c2", "b"); - Dataset df2 = spark.read() - .option("header", "true") - .csv(DATA_DIR + "/in2.csv") - .withColumnRenamed("c1", "c") + Dataset df2 = spark.read().option("header", "true").csv(DATA_DIR + "/in2.csv").withColumnRenamed("c1", "c") .withColumnRenamed("c2", "d"); Dataset df = df1.join(df2, "id").drop("id"); @@ -280,22 +280,20 @@ public class TestSparkJobsLineage { df.write().mode(SaveMode.Overwrite).jdbc(db.getJdbcUrl(), "foo1", jdbcConnnProperties); Thread.sleep(5000); check(dsl(pgDs("foo1"), hdfsDs("in1.csv"), hdfsDs("in2.csv")), acc.getLineages().get(0)); - { + if (VERIFY_EXPECTED) { verify(1 * N); } } @Test - public void testHdfsJdbcInJdbcOut() throws Exception { + public void test3HdfsJdbcInJdbcOut() throws Exception { 
+ Connection c = db.createConnection(""); c.createStatement().execute("create table foo2 (a varchar(5), b int);"); c.createStatement().execute("insert into foo2 values('a', 4);"); c.close(); - Dataset df1 = spark.read() - .option("header", "true") - .csv(DATA_DIR + "/in1.csv") - .withColumnRenamed("c1", "a") + Dataset df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv").withColumnRenamed("c1", "a") .withColumnRenamed("c2", "b2"); Dataset df2 = spark.read().jdbc(db.getJdbcUrl(), "foo2", jdbcConnnProperties); @@ -313,17 +311,12 @@ public class TestSparkJobsLineage { } @Test - public void testHdfsInHiveOut() throws Exception { - Dataset df1 = spark.read() - .option("header", "true") - .csv(DATA_DIR + "/in1.csv") - .withColumnRenamed("c1", "a") + public void test2HdfsInHiveOut() throws Exception { + + Dataset df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv").withColumnRenamed("c1", "a") .withColumnRenamed("c2", "b"); - Dataset df2 = spark.read() - .option("header", "true") - .csv(DATA_DIR + "/in2.csv") - .withColumnRenamed("c1", "c") + Dataset df2 = spark.read().option("header", "true").csv(DATA_DIR + "/in2.csv").withColumnRenamed("c1", "c") .withColumnRenamed("c2", "d"); Dataset df = df1.join(df2, "id").drop("id"); @@ -343,17 +336,12 @@ public class TestSparkJobsLineage { } @Test - public void testHiveInHiveOut() throws Exception { - Dataset df1 = spark.read() - .option("header", "true") - .csv(DATA_DIR + "/in1.csv") - .withColumnRenamed("c1", "a") + public void test4HiveInHiveOut() throws Exception { + + Dataset df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv").withColumnRenamed("c1", "a") .withColumnRenamed("c2", "b"); - Dataset df2 = spark.read() - .option("header", "true") - .csv(DATA_DIR + "/in2.csv") - .withColumnRenamed("c1", "c") + Dataset df2 = spark.read().option("header", "true").csv(DATA_DIR + "/in2.csv").withColumnRenamed("c1", "c") .withColumnRenamed("c2", "d"); df1.createOrReplaceTempView("v1"); @@ -386,24 +374,19 @@ public class TestSparkJobsLineage { } @Test - public void testHdfsJdbcInJdbcOutTwoLevel() throws Exception { + public void test6HdfsJdbcInJdbcOutTwoLevel() throws Exception { + Connection c = db.createConnection(""); c.createStatement().execute("create table foo6 (a varchar(5), b int);"); c.createStatement().execute("insert into foo6 values('a', 4);"); c.close(); - Dataset df1 = spark.read() - .option("header", "true") - .csv(DATA_DIR + "/in1.csv") - .withColumnRenamed("c1", "a") + Dataset df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv").withColumnRenamed("c1", "a") .withColumnRenamed("c2", "b2"); Dataset df2 = spark.read().jdbc(db.getJdbcUrl(), "foo6", jdbcConnnProperties); - Dataset df3 = spark.read() - .option("header", "true") - .csv(DATA_DIR + "/in2.csv") - .withColumnRenamed("c1", "a") + Dataset df3 = spark.read().option("header", "true").csv(DATA_DIR + "/in2.csv").withColumnRenamed("c1", "a") .withColumnRenamed("c2", "b3"); Dataset df = df1.join(df2, "a").drop("id").join(df3, "a"); @@ -418,7 +401,73 @@ public class TestSparkJobsLineage { } } + @Test + public void test7HdfsInPersistHdfsOut() throws Exception { + + Dataset df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in3.csv"); + + Dataset df2 = spark.read().option("header", "true").csv(DATA_DIR + "/in4.csv").withColumnRenamed("c2", "d") + .withColumnRenamed("c1", "c").withColumnRenamed("id", "id2"); + Dataset df = df1.join(df2, df1.col("id").equalTo(df2.col("id2")), "inner") + 
.filter(df1.col("id").equalTo("id_filter")).persist(StorageLevel.MEMORY_ONLY()); + + df.show(); + // InsertIntoHadoopFsRelationCommand + df.write().mode(SaveMode.Overwrite).csv(DATA_DIR + "/out_persist.csv"); + Thread.sleep(5000); + check(dsl(hdfsDs("out_persist.csv"), hdfsDs("in3.csv"), hdfsDs("in4.csv")), acc.getLineages().get(0)); + if (VERIFY_EXPECTED) { + verify(1 * N); + } + } + + @Test + public void test8PersistHdfsJdbcInJdbcOut() throws Exception { + + Connection c = db.createConnection(""); + c.createStatement().execute("create table foo8 (a varchar(5), b int);"); + c.createStatement().execute("insert into foo8 values('a', 4);"); + c.close(); + + Dataset df1 = spark.read().option("header", "true").csv(DATA_DIR + "/in1.csv").withColumnRenamed("c1", "a") + .withColumnRenamed("c2", "b2"); + + Dataset df2 = spark.read().jdbc(db.getJdbcUrl(), "foo8", jdbcConnnProperties).persist(StorageLevel.MEMORY_ONLY()); + + Dataset df = df1.join(df2, "a"); + + // SaveIntoDataSourceCommand + // JDBCRelation input + df.write().mode(SaveMode.Overwrite).jdbc(db.getJdbcUrl(), "foo9", jdbcConnnProperties); + Thread.sleep(5000); + check(dsl(pgDs("foo9"), hdfsDs("in1.csv"), pgDs("foo8")), acc.getLineages().get(0)); + if (VERIFY_EXPECTED) { + verify(1 * N); + } + } + + // This test cannot be executed individually. It depends upon previous tests to create tables in the database. + @Test + public void test9PersistJdbcInHdfsOut() throws Exception { + + Connection c = db.createConnection(""); + + Dataset df1 = spark.read().jdbc(db.getJdbcUrl(), "foo9", jdbcConnnProperties); + df1 = df1.withColumnRenamed("b", "b1"); + Dataset df2 = spark.read().jdbc(db.getJdbcUrl(), "foo8", jdbcConnnProperties).persist(StorageLevel.DISK_ONLY_2()); + + Dataset df = df1.join(df2, "a"); + + df.write().mode(SaveMode.Overwrite).csv(DATA_DIR + "/out_persist.csv"); + Thread.sleep(5000); + check(dsl(hdfsDs("out_persist.csv"), pgDs("foo2"), pgDs("foo3")), acc.getLineages().get(0)); + if (VERIFY_EXPECTED) { + verify(1 * N); + } + } + private static class DatasetLineageAccumulator implements LineageConsumer { + boolean closed = false; private final List lineages = new ArrayList<>(); diff --git a/metadata-integration/java/spark-lineage/src/test/resources/data/in3.csv/part1.csv b/metadata-integration/java/spark-lineage/src/test/resources/data/in3.csv/part1.csv new file mode 100644 index 0000000000..b65449abf6 --- /dev/null +++ b/metadata-integration/java/spark-lineage/src/test/resources/data/in3.csv/part1.csv @@ -0,0 +1,3 @@ +id,c1,c2 +1,a,4 +2,a,5 diff --git a/metadata-integration/java/spark-lineage/src/test/resources/data/in4.csv/part1.csv b/metadata-integration/java/spark-lineage/src/test/resources/data/in4.csv/part1.csv new file mode 100644 index 0000000000..cd1853d694 --- /dev/null +++ b/metadata-integration/java/spark-lineage/src/test/resources/data/in4.csv/part1.csv @@ -0,0 +1,4 @@ +id,c1,c2 +1,a,4 +2,b,5 +3,b,6 diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/testHdfsInOut.json b/metadata-integration/java/spark-lineage/src/test/resources/expected/test1HdfsInOut.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/testHdfsInOut.json rename to metadata-integration/java/spark-lineage/src/test/resources/expected/test1HdfsInOut.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/testHdfsInHiveOut.json b/metadata-integration/java/spark-lineage/src/test/resources/expected/test2HdfsInHiveOut.json similarity index 100% 
rename from metadata-integration/java/spark-lineage/src/test/resources/expected/testHdfsInHiveOut.json rename to metadata-integration/java/spark-lineage/src/test/resources/expected/test2HdfsInHiveOut.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/testHdfsJdbcInJdbcOut.json b/metadata-integration/java/spark-lineage/src/test/resources/expected/test3HdfsJdbcInJdbcOut.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/testHdfsJdbcInJdbcOut.json rename to metadata-integration/java/spark-lineage/src/test/resources/expected/test3HdfsJdbcInJdbcOut.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/testHiveInHiveOut.json b/metadata-integration/java/spark-lineage/src/test/resources/expected/test4HiveInHiveOut.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/testHiveInHiveOut.json rename to metadata-integration/java/spark-lineage/src/test/resources/expected/test4HiveInHiveOut.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/testHdfsInJdbcOut.json b/metadata-integration/java/spark-lineage/src/test/resources/expected/test5HdfsInJdbcOut.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/testHdfsInJdbcOut.json rename to metadata-integration/java/spark-lineage/src/test/resources/expected/test5HdfsInJdbcOut.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/testHdfsJdbcInJdbcOutTwoLevel.json b/metadata-integration/java/spark-lineage/src/test/resources/expected/test6HdfsJdbcInJdbcOutTwoLevel.json similarity index 100% rename from metadata-integration/java/spark-lineage/src/test/resources/expected/testHdfsJdbcInJdbcOutTwoLevel.json rename to metadata-integration/java/spark-lineage/src/test/resources/expected/test6HdfsJdbcInJdbcOutTwoLevel.json diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/test7HdfsInPersistHdfsOut.json b/metadata-integration/java/spark-lineage/src/test/resources/expected/test7HdfsInPersistHdfsOut.json new file mode 100644 index 0000000000..e5a0251fd3 --- /dev/null +++ b/metadata-integration/java/spark-lineage/src/test/resources/expected/test7HdfsInPersistHdfsOut.json @@ -0,0 +1 @@ +{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_31)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in3.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in4.csv,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/out_persist.csv,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}} diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/test8PersistHdfsJdbcInJdbcOut.json b/metadata-integration/java/spark-lineage/src/test/resources/expected/test8PersistHdfsJdbcInJdbcOut.json new file mode 100644 index 0000000000..a52415334e --- /dev/null +++ b/metadata-integration/java/spark-lineage/src/test/resources/expected/test8PersistHdfsJdbcInJdbcOut.json @@ -0,0 +1 @@ +{ "proposal" 
:{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_33)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:postgres,test_dev_dataset.sparktestdb.foo8,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:postgres,test_dev_dataset.sparktestdb.foo9,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}} diff --git a/metadata-integration/java/spark-lineage/src/test/resources/expected/test9PersistJdbcInHdfsOut.json b/metadata-integration/java/spark-lineage/src/test/resources/expected/test9PersistJdbcInHdfsOut.json new file mode 100644 index 0000000000..ee09f408fa --- /dev/null +++ b/metadata-integration/java/spark-lineage/src/test/resources/expected/test9PersistJdbcInHdfsOut.json @@ -0,0 +1 @@ +{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_34)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:postgres,test_dev_dataset.sparktestdb.foo8,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:postgres,test_dev_dataset.sparktestdb.foo9,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/out_persist.csv,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}