feat(spark): Add SQLJobFacet to column-level lineage transformations (#15232)

This commit is contained in:
Anush Kumar 2025-11-07 02:34:54 -08:00 committed by GitHub
parent 4e07bfa885
commit d9f151e3e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 282 additions and 13 deletions

View File

@ -883,6 +883,7 @@ public class OpenLineageEventToDatahubTest {
builder.materializeDataset(true);
builder.includeSchemaMetadata(true);
builder.isSpark(true);
builder.captureColumnLevelLineage(true);
String olEvent =
IOUtils.toString(
@ -912,6 +913,61 @@ public class OpenLineageEventToDatahubTest {
}
}
@Test
public void testCaptureSQLJobFacet() throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.fabricType(FabricType.DEV);
builder.lowerCaseDatasetUrns(true);
builder.materializeDataset(true);
builder.includeSchemaMetadata(true);
builder.isSpark(true);
builder.captureColumnLevelLineage(true);
String olEvent =
IOUtils.toString(
this.getClass().getResourceAsStream("/ol_events/sample_spark_with_sql_facet.json"),
StandardCharsets.UTF_8);
OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent);
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build());
assertNotNull(datahubJob);
assertEquals(1, datahubJob.getInSet().size());
for (DatahubDataset dataset : datahubJob.getInSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:file,/spark-test/people.parquet,DEV)",
dataset.getUrn().toString());
}
for (DatahubDataset dataset : datahubJob.getOutSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:file,/spark-test/result_test,DEV)",
dataset.getUrn().toString());
// Verify that SQL query is included in transformOperation along with transformations
String transformOperation =
Objects.requireNonNull(dataset.getLineage().getFineGrainedLineages())
.get(0)
.getTransformOperation();
assertNotNull(transformOperation);
// The format should be: "-- DIRECT:IDENTITY,INDIRECT:FILTER\nSELECT age, name FROM people
// WHERE age > 18"
assertTrue(
transformOperation.contains("SELECT age, name FROM people WHERE age > 18"),
"Transform operation should contain SQL query");
assertTrue(
transformOperation.contains("DIRECT:IDENTITY")
&& transformOperation.contains("INDIRECT:FILTER"),
"Transform operation should contain transformation types");
// Verify the format: transformations should be prefixed with "-- " and followed by newline
// before SQL
assertTrue(
transformOperation.contains("-- ") && transformOperation.contains("\n"),
"Transform operation should have transformations prefixed with '-- ' and followed by newline before SQL");
}
}
@Test
public void testFlinkJobEvent() throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =

View File

@ -0,0 +1,193 @@
{
"eventTime": "2025-06-26T09:10:58.948Z",
"producer": "https://github.com/OpenLineage/OpenLineage/tree/1.33.0/integration/spark",
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
"eventType": "COMPLETE",
"run": {
"runId": "0197ab81-29e9-7df5-a5f0-0ecdcfcbab5d",
"facets": {
"parent": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.33.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-1-0/ParentRunFacet.json#/$defs/ParentRunFacet",
"run": {
"runId": "0197ab81-2733-7bbc-b360-b605458dfc8c"
},
"job": {
"namespace": "default",
"name": "simple_app_parquet_demo"
},
"root": {
"run": {
"runId": "0197ab81-2733-7bbc-b360-b605458dfc8c"
},
"job": {
"namespace": "default",
"name": "SimpleAppParquetDemo"
}
}
},
"processing_engine": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.33.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet",
"version": "3.5.5",
"name": "spark"
},
"environment-properties": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.33.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
"environment-properties": {}
},
"spark_properties": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.33.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
"properties": {
"spark.master": "local[*]",
"spark.app.name": "SimpleAppParquetDemo"
}
}
}
},
"job": {
"namespace": "default",
"name": "simple_app_parquet_demo.adaptive_spark_plan.spark-test_result_test",
"facets": {
"jobType": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.33.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet",
"processingType": "BATCH",
"integration": "SPARK",
"jobType": "SQL_JOB"
},
"sql": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.33.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/SQLJobFacet.json#/$defs/SQLJobFacet",
"query": "SELECT age, name FROM people WHERE age > 18"
}
}
},
"inputs": [
{
"namespace": "file",
"name": "/spark-test/people.parquet",
"facets": {
"dataSource": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.33.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet",
"name": "file",
"uri": "file"
},
"schema": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.33.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet",
"fields": [
{
"name": "age",
"type": "long"
},
{
"name": "name",
"type": "string"
}
]
}
},
"inputFacets": {
"inputStatistics": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.33.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/InputStatisticsInputDatasetFacet.json#/$defs/InputStatisticsInputDatasetFacet",
"size": 738,
"fileCount": 1
}
}
}
],
"outputs": [
{
"namespace": "file",
"name": "/spark-test/result_test",
"facets": {
"dataSource": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.33.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet",
"name": "file",
"uri": "file"
},
"columnLineage": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.33.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-2-0/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet",
"fields": {
"age": {
"inputFields": [
{
"namespace": "file",
"name": "/spark-test/people.parquet",
"field": "age",
"transformations": [
{
"type": "DIRECT",
"subtype": "IDENTITY",
"description": "",
"masking": false
},
{
"type": "INDIRECT",
"subtype": "FILTER",
"description": "",
"masking": false
}
]
}
]
},
"name": {
"inputFields": [
{
"namespace": "file",
"name": "/spark-test/people.parquet",
"field": "name",
"transformations": [
{
"type": "DIRECT",
"subtype": "IDENTITY",
"description": "",
"masking": false
}
]
}
]
}
}
},
"lifecycleStateChange": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.33.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet",
"lifecycleStateChange": "OVERWRITE"
},
"schema": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.33.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet",
"fields": [
{
"name": "age",
"type": "long"
},
{
"name": "name",
"type": "string"
}
]
}
},
"outputFacets": {
"outputStatistics": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.33.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-2/OutputStatisticsOutputDatasetFacet.json#/$defs/OutputStatisticsOutputDatasetFacet",
"rowCount": 1,
"size": 729,
"fileCount": 1
}
}
}
]
}

View File

@ -411,7 +411,7 @@ public class OpenLineageToDataHub {
}
private static UpstreamLineage getFineGrainedLineage(
OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig) {
OpenLineage.Dataset dataset, DatahubOpenlineageConfig mappingConfig, OpenLineage.Job job) {
FineGrainedLineageArray fgla = new FineGrainedLineageArray();
UpstreamArray upstreams = new UpstreamArray();
@ -480,6 +480,33 @@ public class OpenLineageToDataHub {
}
});
String combinedTransformations = "";
// Capture transformation information from OpenLineage
if (!transformationTexts.isEmpty()) {
List<String> sortedList =
transformationTexts.stream()
.sorted(String.CASE_INSENSITIVE_ORDER)
.collect(Collectors.toList());
combinedTransformations = String.join(",", sortedList);
}
// Extract SQL query from SQLJobFacet if available
if (job != null
&& job.getFacets() != null
&& job.getFacets().getSql() != null
&& job.getFacets().getSql().getQuery() != null) {
String sqlQuery = job.getFacets().getSql().getQuery();
if (!sqlQuery.trim().isEmpty()) {
if (!combinedTransformations.isEmpty()) {
// Add the OpenLineage Column Transformations as comments and then add the SQL
combinedTransformations = "-- " + combinedTransformations + "\n" + sqlQuery;
} else {
combinedTransformations = sqlQuery;
}
}
}
upstreamFields.sort(Comparator.comparing(Urn::toString));
fgl.setUpstreams(upstreamFields);
fgl.setConfidenceScore(0.5f);
@ -488,16 +515,7 @@ public class OpenLineageToDataHub {
downstreamsFields.sort(Comparator.comparing(Urn::toString));
fgl.setDownstreams(downstreamsFields);
fgl.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET);
// Capture transformation information from OpenLineage
if (!transformationTexts.isEmpty()) {
List<String> sortedList =
transformationTexts.stream()
.sorted(String::compareToIgnoreCase)
.collect(Collectors.toList());
fgl.setTransformOperation(String.join(",", sortedList));
}
fgl.setTransformOperation(combinedTransformations);
fgla.add(fgl);
}
@ -1119,7 +1137,8 @@ public class OpenLineageToDataHub {
builder.schemaMetadata(getSchemaMetadata(input, datahubConf));
}
if (datahubConf.isCaptureColumnLevelLineage()) {
UpstreamLineage upstreamLineage = getFineGrainedLineage(input, datahubConf);
UpstreamLineage upstreamLineage =
getFineGrainedLineage(input, datahubConf, event.getJob());
if (upstreamLineage != null) {
builder.lineage(upstreamLineage);
}
@ -1148,7 +1167,8 @@ public class OpenLineageToDataHub {
builder.schemaMetadata(getSchemaMetadata(output, datahubConf));
}
if (datahubConf.isCaptureColumnLevelLineage()) {
UpstreamLineage upstreamLineage = getFineGrainedLineage(output, datahubConf);
UpstreamLineage upstreamLineage =
getFineGrainedLineage(output, datahubConf, event.getJob());
if (upstreamLineage != null) {
builder.lineage(upstreamLineage);
}