feat(spark-lineage): add column-level lineage capture control (#14300)

This commit is contained in:
kyungryun choi 2025-08-13 18:07:10 +09:00 committed by GitHub
parent 4eb4d617b6
commit 3b071ea3ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 15 additions and 0 deletions

View File

@ -219,6 +219,7 @@ information like tokens.
| spark.datahub.s3.filename | | | The name of the file on S3 where metadata will be written when the S3 emitter is set. If it is not set, a random filename will be used. |
| spark.datahub.log.mcps | | true | Set this to true to log MCPS to the log. By default, it is enabled. |
| spark.datahub.legacyLineageCleanup.enabled | | false | Set this to true to remove legacy lineages from older Spark Plugin runs. This will remove those lineages from the Datasets which it adds to DataJob. By default, it is disabled. |
| spark.datahub.captureColumnLevelLineage | | true | Set this to false to disable column-level lineage capture for improved performance on large datasets. By default, it is enabled. |
| spark.datahub.capture_spark_plan | | false | Set this to true to capture the Spark plan. By default, it is disabled. |
| spark.datahub.metadata.dataset.enableEnhancedMergeIntoExtraction | | false | Set this to true to enable enhanced table name extraction for Delta Lake MERGE INTO commands. This improves lineage tracking by including the target table name in the job name. By default, it is disabled. |

View File

@ -266,6 +266,9 @@ public class DatahubSparkListener extends SparkListener {
&& datahubConf.getBoolean("capture_spark_plan")) {
sparkEnv.conf().set("spark.openlineage.facets.spark.logicalPlan.disabled", "false");
}
if (!isCaptureColumnLevelLineage(datahubConf)) {
sparkEnv.conf().set("spark.openlineage.facets.columnLineage.disabled", "true");
}
}
if (properties != null) {

View File

@ -84,6 +84,8 @@ public class SparkConfigParser {
public static final String ENABLE_ENHANCED_MERGE_INTO_EXTRACTION =
"metadata.dataset.enableEnhancedMergeIntoExtraction";
public static final String CAPTURE_COLUMN_LEVEL_LINEAGE = "captureColumnLevelLineage";
public static final String TAGS_KEY = "tags";
public static final String DOMAINS_KEY = "domains";
@ -180,6 +182,7 @@ public class SparkConfigParser {
builder.removeLegacyLineage(SparkConfigParser.isLegacyLineageCleanupEnabled(sparkConfig));
builder.disableSymlinkResolution(SparkConfigParser.isDisableSymlinkResolution(sparkConfig));
builder.lowerCaseDatasetUrns(SparkConfigParser.isLowerCaseDatasetUrns(sparkConfig));
builder.captureColumnLevelLineage(SparkConfigParser.isCaptureColumnLevelLineage(sparkConfig));
builder.enhancedMergeIntoExtraction(
SparkConfigParser.isEnhancedMergeIntoExtractionEnabled(sparkConfig));
try {
@ -404,4 +407,12 @@ public class SparkConfigParser {
return datahubConfig.hasPath(ENABLE_ENHANCED_MERGE_INTO_EXTRACTION)
&& datahubConfig.getBoolean(ENABLE_ENHANCED_MERGE_INTO_EXTRACTION);
}
/**
 * Reports whether column-level lineage capture is enabled.
 *
 * @param datahubConfig the configuration to inspect for the {@code CAPTURE_COLUMN_LEVEL_LINEAGE}
 *     path
 * @return the boolean stored under {@code CAPTURE_COLUMN_LEVEL_LINEAGE} when that path is present;
 *     {@code true} when the path is absent
 */
public static boolean isCaptureColumnLevelLineage(Config datahubConfig) {
  // A missing key counts as "enabled": the value is only consulted when the path exists.
  return !datahubConfig.hasPath(CAPTURE_COLUMN_LEVEL_LINEAGE)
      || datahubConfig.getBoolean(CAPTURE_COLUMN_LEVEL_LINEAGE);
}
}