diff --git a/metadata-integration/java/spark-lineage/README.md b/metadata-integration/java/spark-lineage/README.md
index 9c201af115..84b9f8c479 100644
--- a/metadata-integration/java/spark-lineage/README.md
+++ b/metadata-integration/java/spark-lineage/README.md
@@ -122,20 +122,20 @@ The Spark agent can be configured using Databricks Cluster [Spark configuration]
 
 ## Configuration Options
 
-| Field | Required | Default | Description |
-|-------------------------------------------------|----------|---------|-------------------------------------------------------------------------|
-| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:datahub-spark-lineage:0.8.23 |
-| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener |
-| spark.datahub.rest.server | ✅ | | Datahub server url eg: |
-| spark.datahub.rest.token | | | Authentication token. |
-| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! |
-| spark.datahub.metadata.pipeline.platformInstance| | | Pipeline level platform instance |
-| spark.datahub.metadata.dataset.platformInstance| | | dataset level platform instance |
-| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD |
-| spark.datahub.metadata.table.Platform | | hive | Platform for tables |
-| spark.datahub.metadata.include_scheme | | true | Include scheme (e.g. hdfs://, s3://) in dataset URN |
-| spark.datahub.coalesce_jobs | | false | Only one datajob(taask) will be emitted containing all input and output datasets for the spark application |
-| spark.datahub.parent.datajob_urn | | | Specified dataset will be set as upstream dataset for datajob created. Effective only when spark.datahub.coalesce_jobs is set to true |
+| Field | Required | Default | Description |
+|--------------------------------------------------|----------|---------|---------------------------------------------------------------------------------------------------------------------------------------|
+| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:datahub-spark-lineage:0.8.23 |
+| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener |
+| spark.datahub.rest.server | ✅ | | Datahub server url eg: |
+| spark.datahub.rest.token | | | Authentication token. |
+| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! |
+| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance |
+| spark.datahub.metadata.dataset.platformInstance | | | Dataset level platform instance |
+| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fall back to PROD |
+| spark.datahub.metadata.table.hive_platform_alias | | hive | By default, DataHub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` |
+| spark.datahub.metadata.include_scheme | | true | Include scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this value to false; it is set to true for backwards compatibility with previous versions |
+| spark.datahub.coalesce_jobs | | false | Only one datajob (task) will be emitted containing all input and output datasets for the spark application |
+| spark.datahub.parent.datajob_urn | | | Specified dataset will be set as upstream dataset for the datajob created. Effective only when spark.datahub.coalesce_jobs is set to true |
 
 ## What to Expect: The Metadata Model
 
diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java
index 472963550a..38951238e9 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java
@@ -58,7 +58,7 @@ public class DatasetExtractor {
       CreateDataSourceTableAsSelectCommand.class, CreateHiveTableAsSelectCommand.class, InsertIntoHiveTable.class);
   private static final String DATASET_ENV_KEY = "metadata.dataset.env";
   private static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance";
-  private static final String TABLE_PLATFORM_KEY = "metadata.table.platform";
+  private static final String TABLE_HIVE_PLATFORM_ALIAS = "metadata.table.hive_platform_alias";
   private static final String INCLUDE_SCHEME_KEY = "metadata.include_scheme";
 
   // TODO InsertIntoHiveDirCommand, InsertIntoDataSourceDirCommand
@@ -122,7 +122,7 @@ public class DatasetExtractor {
       InsertIntoHadoopFsRelationCommand cmd = (InsertIntoHadoopFsRelationCommand) p;
       if (cmd.catalogTable().isDefined()) {
         return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.catalogTable().get(),
-            getCommonPlatformInstance(datahubConfig), getTablePlatform(datahubConfig),
+            getCommonPlatformInstance(datahubConfig), getTableHivePlatformAlias(datahubConfig),
             getCommonFabricType(datahubConfig))));
       }
       return Optional.of(Collections.singletonList(new HdfsPathDataset(cmd.outputPath(),
@@ -157,26 +157,26 @@ public class DatasetExtractor {
       CreateDataSourceTableAsSelectCommand cmd = (CreateDataSourceTableAsSelectCommand) p;
       // TODO what of cmd.mode()
       return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.table(),
-          getCommonPlatformInstance(datahubConfig), getTablePlatform(datahubConfig),
+          getCommonPlatformInstance(datahubConfig), getTableHivePlatformAlias(datahubConfig),
          getCommonFabricType(datahubConfig))));
     });
     PLAN_TO_DATASET.put(CreateHiveTableAsSelectCommand.class, (p, ctx, datahubConfig) -> {
       CreateHiveTableAsSelectCommand cmd = (CreateHiveTableAsSelectCommand) p;
       return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.tableDesc(),
-          getCommonPlatformInstance(datahubConfig), getTablePlatform(datahubConfig),
+          getCommonPlatformInstance(datahubConfig), getTableHivePlatformAlias(datahubConfig),
           getCommonFabricType(datahubConfig))));
     });
     PLAN_TO_DATASET.put(InsertIntoHiveTable.class, (p, ctx, datahubConfig) -> {
       InsertIntoHiveTable cmd = (InsertIntoHiveTable) p;
       return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.table(),
-          getCommonPlatformInstance(datahubConfig), getTablePlatform(datahubConfig),
+          getCommonPlatformInstance(datahubConfig), getTableHivePlatformAlias(datahubConfig),
           getCommonFabricType(datahubConfig))));
     });
     PLAN_TO_DATASET.put(HiveTableRelation.class, (p, ctx, datahubConfig) -> {
       HiveTableRelation cmd = (HiveTableRelation) p;
       return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.tableMeta(),
-          getCommonPlatformInstance(datahubConfig), getTablePlatform(datahubConfig),
+          getCommonPlatformInstance(datahubConfig), getTableHivePlatformAlias(datahubConfig),
           getCommonFabricType(datahubConfig))));
     });
@@ -268,8 +268,8 @@ public class DatasetExtractor {
         : null;
   }
 
-  private static String getTablePlatform(Config datahubConfig) {
-    return datahubConfig.hasPath(TABLE_PLATFORM_KEY) ? datahubConfig.getString(TABLE_PLATFORM_KEY)
+  private static String getTableHivePlatformAlias(Config datahubConfig) {
+    return datahubConfig.hasPath(TABLE_HIVE_PLATFORM_ALIAS) ? datahubConfig.getString(TABLE_HIVE_PLATFORM_ALIAS)
         : "hive";
   }
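
As a usage note on the renamed option, below is a minimal sketch (not part of the patch above) of setting `spark.datahub.metadata.table.hive_platform_alias` from the Java Spark API so Hive-like tables are emitted under the Glue platform. The class name, app name, DataHub server URL, and query are placeholders; the sketch assumes the listener jar and its dependencies are available to the driver, and in practice these settings are just as commonly supplied via `spark-submit --conf` or `spark-defaults.conf`.

```java
import org.apache.spark.sql.SparkSession;

public class GlueAliasExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("glue-alias-example")                                          // placeholder app name
        .config("spark.jars.packages", "io.acryl:datahub-spark-lineage:0.8.23")
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .config("spark.datahub.rest.server", "http://localhost:8080")           // placeholder server URL
        // Tables resolved through the Hive metastore will be emitted under the
        // "glue" platform instead of the default "hive".
        .config("spark.datahub.metadata.table.hive_platform_alias", "glue")
        .enableHiveSupport()
        .getOrCreate();

    // Placeholder query; any Hive table read/write would be captured as lineage.
    spark.sql("INSERT INTO demo_db.demo_table SELECT * FROM demo_db.source_table");
    spark.stop();
  }
}
```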