From 3b071ea3ce37ae370db8cf4139891155c176dec2 Mon Sep 17 00:00:00 2001 From: kyungryun choi Date: Wed, 13 Aug 2025 18:07:10 +0900 Subject: [PATCH] feat(spark-lineage): add column-level lineage capture control (#14300) --- .../java/acryl-spark-lineage/README.md | 1 + .../main/java/datahub/spark/DatahubSparkListener.java | 3 +++ .../java/datahub/spark/conf/SparkConfigParser.java | 11 +++++++++++ 3 files changed, 15 insertions(+) diff --git a/metadata-integration/java/acryl-spark-lineage/README.md b/metadata-integration/java/acryl-spark-lineage/README.md index ac535fa798..1ea237dfad 100644 --- a/metadata-integration/java/acryl-spark-lineage/README.md +++ b/metadata-integration/java/acryl-spark-lineage/README.md @@ -219,6 +219,7 @@ information like tokens. | spark.datahub.s3.filename | | | The name of the file where metadata will be written if it is not set random filename will be used on s3 if s3 emitter is set | | spark.datahub.log.mcps | | true | Set this to true to log MCPS to the log. By default, it is enabled. | | spark.datahub.legacyLineageCleanup.enabled | | false | Set this to true to remove legacy lineages from older Spark Plugin runs. This will remove those lineages from the Datasets which it adds to DataJob. By default, it is disabled. | +| spark.datahub.captureColumnLevelLineage | | true | Set this to false to disable column-level lineage capture for improved performance on large datasets. By default, it is enabled. | | spark.datahub.capture_spark_plan | | false | Set this to true to capture the Spark plan. By default, it is disabled. | | spark.datahub.metadata.dataset.enableEnhancedMergeIntoExtraction | | false | Set this to true to enable enhanced table name extraction for Delta Lake MERGE INTO commands. This improves lineage tracking by including the target table name in the job name. By default, it is disabled. 
| diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java index 35c39b12e2..8daa9cbff9 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java @@ -266,6 +266,9 @@ public class DatahubSparkListener extends SparkListener { && datahubConf.getBoolean("capture_spark_plan")) { sparkEnv.conf().set("spark.openlineage.facets.spark.logicalPlan.disabled", "false"); } + if (!isCaptureColumnLevelLineage(datahubConf)) { + sparkEnv.conf().set("spark.openlineage.facets.columnLineage.disabled", "true"); + } } if (properties != null) { diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java index 30f997830c..e0e641b192 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java @@ -84,6 +84,8 @@ public class SparkConfigParser { public static final String ENABLE_ENHANCED_MERGE_INTO_EXTRACTION = "metadata.dataset.enableEnhancedMergeIntoExtraction"; + public static final String CAPTURE_COLUMN_LEVEL_LINEAGE = "captureColumnLevelLineage"; + public static final String TAGS_KEY = "tags"; public static final String DOMAINS_KEY = "domains"; @@ -180,6 +182,7 @@ public class SparkConfigParser { builder.removeLegacyLineage(SparkConfigParser.isLegacyLineageCleanupEnabled(sparkConfig)); builder.disableSymlinkResolution(SparkConfigParser.isDisableSymlinkResolution(sparkConfig)); builder.lowerCaseDatasetUrns(SparkConfigParser.isLowerCaseDatasetUrns(sparkConfig)); 
+ builder.captureColumnLevelLineage(SparkConfigParser.isCaptureColumnLevelLineage(sparkConfig)); builder.enhancedMergeIntoExtraction( SparkConfigParser.isEnhancedMergeIntoExtractionEnabled(sparkConfig)); try { @@ -404,4 +407,12 @@ public class SparkConfigParser { return datahubConfig.hasPath(ENABLE_ENHANCED_MERGE_INTO_EXTRACTION) && datahubConfig.getBoolean(ENABLE_ENHANCED_MERGE_INTO_EXTRACTION); } + + /** Returns the boolean stored at CAPTURE_COLUMN_LEVEL_LINEAGE in datahubConfig, or true when that path is not set. */ public static boolean isCaptureColumnLevelLineage(Config datahubConfig) { + if (datahubConfig.hasPath(CAPTURE_COLUMN_LEVEL_LINEAGE)) { + return datahubConfig.getBoolean(CAPTURE_COLUMN_LEVEL_LINEAGE); + } + + return true; + } }