feat(ingestion): spark - support lineage for delta lake writes (#6834)

Author: danielli-ziprecruiter
Date: 2022-12-22 04:13:32 -07:00 (committed by GitHub)
parent 3b8686d012
commit a6470fc267
2 changed files with 11 additions and 6 deletions

```diff
@@ -186,10 +186,11 @@ Below is a list of Spark commands that are parsed currently:
 - InsertIntoHadoopFsRelationCommand
 - SaveIntoDataSourceCommand (jdbc)
+- SaveIntoDataSourceCommand (Delta Lake)
 - CreateHiveTableAsSelectCommand
 - InsertIntoHiveTable
-Effectively, these support data sources/sinks corresponding to Hive, HDFS and JDBC.
+Effectively, these support data sources/sinks corresponding to Hive, HDFS, JDBC, and Delta Lake.
 DataFrame.persist command is supported for below LeafExecNodes:
```

```diff
@@ -144,13 +144,17 @@ public class DatasetExtractor {
       Map<String, String> options = JavaConversions.mapAsJavaMap(cmd.options());
       String url = options.getOrDefault("url", ""); // e.g. jdbc:postgresql://localhost:5432/sparktestdb
-      if (!url.contains("jdbc")) {
+      if (url.contains("jdbc")) {
+        String tbl = options.get("dbtable");
+        return Optional.of(Collections.singletonList(
+            new JdbcDataset(url, tbl, getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
+      } else if (options.containsKey("path")) {
+        return Optional.of(Collections.singletonList(new HdfsPathDataset(new Path(options.get("path")),
+            getCommonPlatformInstance(datahubConfig), getIncludeScheme(datahubConfig),
+            getCommonFabricType(datahubConfig))));
+      } else {
         return Optional.empty();
       }
-      String tbl = options.get("dbtable");
-      return Optional.of(Collections.singletonList(
-          new JdbcDataset(url, tbl, getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
     });
     PLAN_TO_DATASET.put(CreateDataSourceTableAsSelectCommand.class, (p, ctx, datahubConfig) -> {
```
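To see why branching on the "path" option covers Delta Lake, compare the options a SaveIntoDataSourceCommand carries for each sink. The standalone sketch below mirrors the dispatch above; the option values and the classify helper are hypothetical and exist only to illustrate the branch logic, not to replace DatasetExtractor.

```java
import java.util.Map;

public class OptionsDispatchSketch {
  public static void main(String[] args) {
    // A JDBC write carries a "url" option, e.g. from
    // df.write().format("jdbc").option("url", ...).option("dbtable", ...)
    Map<String, String> jdbcOptions = Map.of(
        "url", "jdbc:postgresql://localhost:5432/sparktestdb",
        "dbtable", "orders");

    // A Delta Lake write carries no JDBC url, only the target "path", e.g. from
    // df.write().format("delta").save("/data/output/delta-table")
    Map<String, String> deltaOptions = Map.of("path", "/data/output/delta-table");

    System.out.println(classify(jdbcOptions));  // -> JdbcDataset
    System.out.println(classify(deltaOptions)); // -> HdfsPathDataset
  }

  // Mirrors the new dispatch: check for a JDBC url first, then fall back to
  // path-based sinks (HDFS and Delta Lake), else extract nothing.
  static String classify(Map<String, String> options) {
    String url = options.getOrDefault("url", "");
    if (url.contains("jdbc")) {
      return "JdbcDataset";
    } else if (options.containsKey("path")) {
      return "HdfsPathDataset";
    } else {
      return "no dataset extracted";
    }
  }
}
```

The design choice here is that the earlier code treated every SaveIntoDataSourceCommand as JDBC and bailed out otherwise; reordering the check lets the same plan node serve both JDBC and path-backed Delta Lake writes.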