feat(ingest/spark): Promote beta plugin (#10881)

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Tamas Nemeth 2024-07-25 14:46:32 +02:00 committed by GitHub
parent 56bb4c8111
commit f4fb89e799
137 changed files with 903 additions and 207 deletions

View File

@ -54,7 +54,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '12.16.1'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.16.0'
ext.openLineageVersion = '1.19.0'
ext.logbackClassicJava8 = '1.2.12'
ext.docker_registry = 'acryldata'
@ -111,6 +111,7 @@ project.ext.externalDependency = [
'avroCompiler': 'org.apache.avro:avro-compiler:1.11.3',
'awsGlueSchemaRegistrySerde': 'software.amazon.glue:schema-registry-serde:1.1.17',
'awsMskIamAuth': 'software.amazon.msk:aws-msk-iam-auth:2.0.3',
'awsS3': 'software.amazon.awssdk:s3:2.26.21',
'awsSecretsManagerJdbc': 'com.amazonaws.secretsmanager:aws-secretsmanager-jdbc:1.0.13',
'awsPostgresIamAuth': 'software.amazon.jdbc:aws-advanced-jdbc-wrapper:1.0.2',
'awsRds':'software.amazon.awssdk:rds:2.18.24',

View File

@ -562,7 +562,7 @@
}
},
{
"Path": "docs/metadata-integration/java/spark-lineage-beta",
"Path": "docs/metadata-integration/java/acryl-spark-lineage",
"imgPath": "img/logos/platforms/spark.svg",
"Title": "Spark",
"Description": "Spark is a data processing tool that enables fast and efficient processing of large-scale data sets using distributed computing.",

View File

@ -419,17 +419,13 @@ module.exports = {
},
{
type: "doc",
id: "metadata-integration/java/spark-lineage/README",
label: "Spark (Legacy)",
},
{
type: "doc",
id: "metadata-integration/java/spark-lineage-beta/README",
id: "metadata-integration/java/acryl-spark-lineage/README",
label: "Spark",
},
//"docker/airflow/local_airflow",
"metadata-ingestion/integration_docs/great-expectations",
"metadata-integration/java/datahub-protobuf/README",
//"metadata-integration/java/spark-lineage-legacy/README",
//"metadata-ingestion/source-docs-template",
{
type: "autogenerated",
@ -886,7 +882,7 @@ module.exports = {
//"docs/how/graph-onboarding",
//"docs/demo/graph-onboarding",
//"metadata-integration/java/spark-lineage/README",
// "metadata-integration/java/spark-lineage-beta/README.md
// "metadata-integration/java/acryl-spark-lineage/README.md
// "metadata-integration/java/openlineage-converter/README"
//"metadata-ingestion-modules/airflow-plugin/README"
//"metadata-ingestion-modules/dagster-plugin/README"

View File

@ -6,7 +6,7 @@ DataHub, now supports [OpenLineage](https://openlineage.io/) integration. With t
- **REST Endpoint Support**: DataHub now includes a REST endpoint that can understand OpenLineage events. This allows users to send lineage information directly to DataHub, enabling easy integration with various data processing frameworks.
- **[Spark Event Listener Plugin](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta)**: DataHub provides a Spark Event Listener plugin that seamlessly integrates OpenLineage's Spark plugin. This plugin enhances DataHub's OpenLineage support by offering additional features such as PathSpec support, column-level lineage, patch support and more.
- **[Spark Event Listener Plugin](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage)**: DataHub provides a Spark Event Listener plugin that seamlessly integrates OpenLineage's Spark plugin. This plugin enhances DataHub's OpenLineage support by offering additional features such as PathSpec support, column-level lineage, patch support and more.
## OpenLineage Support with DataHub
@ -73,7 +73,7 @@ The transport should look like this:
#### Known Limitations
With Spark and Airflow we recommend using the Spark Lineage or DataHub's Airflow plugin for tighter integration with DataHub.
- **[PathSpec](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta/#configuring-hdfs-based-dataset-urns) Support**: While the REST endpoint supports OpenLineage messages, full [PathSpec](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta/#configuring-hdfs-based-dataset-urns)) support is not yet available.
- **[PathSpec](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage/#configuring-hdfs-based-dataset-urns) Support**: While the REST endpoint supports OpenLineage messages, full [PathSpec](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage/#configuring-hdfs-based-dataset-urns) support is not yet available.
- **Column-level Lineage**: DataHub's current OpenLineage support does not provide full column-level lineage tracking.
- etc...
@ -83,10 +83,10 @@ DataHub's Spark Event Listener plugin enhances OpenLineage support by providing
#### How to Use
Follow the guides of the Spark Lineage plugin page for more information on how to set up the Spark Lineage plugin. The guide can be found [here](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta)
Follow the guide on the Spark Lineage plugin page for details on setting up the plugin. The guide can be found [here](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage)
## References
- [OpenLineage](https://openlineage.io/)
- [DataHub OpenAPI Guide](../api/openapi/openapi-usage-guide.md)
- [DataHub Spark Lineage Plugin](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta)
- [DataHub Spark Lineage Plugin](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage)

View File

@ -19,7 +19,7 @@ Integration can be divided into two concepts based on the method:
### Push-based Integration
Push-based integrations allow you to emit metadata directly from your data systems when metadata changes.
Examples of push-based integrations include [Airflow](../docs/lineage/airflow.md), [Spark](../metadata-integration/java/spark-lineage/README.md), [Great Expectations](./integration_docs/great-expectations.md) and [Protobuf Schemas](../metadata-integration/java/datahub-protobuf/README.md). This allows you to get low-latency metadata integration from the "active" agents in your data ecosystem.
Examples of push-based integrations include [Airflow](../docs/lineage/airflow.md), [Spark](../metadata-integration/java/acryl-spark-lineage/README.md), [Great Expectations](./integration_docs/great-expectations.md) and [Protobuf Schemas](../metadata-integration/java/datahub-protobuf/README.md). This allows you to get low-latency metadata integration from the "active" agents in your data ecosystem.
### Pull-based Integration

View File

@ -11,7 +11,7 @@ The alternative way to integrate is via the Hive connector. The [Hive starter re
## Databricks Spark
To complete the picture, we recommend adding push-based ingestion from your Spark jobs to see real-time activity and lineage between your Databricks tables and your Spark jobs. Use the Spark agent to push metadata to DataHub using the instructions [here](../../../../metadata-integration/java/spark-lineage-beta/README.md#configuration-instructions-databricks).
To complete the picture, we recommend adding push-based ingestion from your Spark jobs to see real-time activity and lineage between your Databricks tables and your Spark jobs. Use the Spark agent to push metadata to DataHub using the instructions [here](../../../../metadata-integration/java/acryl-spark-lineage/README.md#configuration-instructions-databricks).
## Watch the DataHub Talk at the Data and AI Summit 2022

View File

@ -24,7 +24,7 @@ When running jobs using spark-submit, the agent needs to be configured in the co
```text
#Configuring DataHub spark agent jar
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.13
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.15
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
```
@ -32,7 +32,7 @@ spark.datahub.rest.server http://localhost:8080
## spark-submit command line
```sh
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.13 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.15 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
```
### Configuration Instructions: Amazon EMR
@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it
stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html)
```text
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.13
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.15
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server https://your_datahub_host/gms
#If you have authentication set up then you also need to specify the Datahub access token
@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh
spark = SparkSession.builder
.master("spark://spark-master:7077")
.appName("test-application")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.13")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.15")
.config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
.config("spark.datahub.rest.server", "http://localhost:8080")
.enableHiveSupport()
@ -157,34 +157,44 @@ information like tokens.
## Configuration Options
| Field | Required | Default | Description |
|---------------------------------------------------------------------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:acryl-spark-lineage:0.2.13 |
| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener |
| spark.datahub.rest.server | ✅ | | Datahub server url eg:<http://localhost:8080> |
| spark.datahub.rest.token | | | Authentication token. |
| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! |
| spark.datahub.rest.max_retries | | 0 | Number of times a request retried if failed |
| spark.datahub.rest.retry_interval | | 10 | Number of seconds to wait between retries |
| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance |
| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance (it is usefult to set if you have it in your glue ingestion) |
| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD |
| spark.datahub.metadata.dataset.hivePlatformAlias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` |
| spark.datahub.metadata.include_scheme | | true | Include scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this value to false, it is set to true for backwards compatibility with previous versions |
| spark.datahub.metadata.remove_partition_pattern | | | Remove partition pattern. (e.g. /partition=\d+) It change database/table/partition=123 to database/table |
| spark.datahub.coalesce_jobs | | true | Only one datajob(task) will be emitted containing all input and output datasets for the spark application |
| spark.datahub.parent.datajob_urn | | | Specified dataset will be set as upstream dataset for datajob created. Effective only when spark.datahub.coalesce_jobs is set to true |
| spark.datahub.metadata.dataset.materialize | | false | Materialize Datasets in DataHub |
| spark.datahub.platform.s3.path_spec_list | | | List of pathspec per platform |
| spark.datahub.metadata.dataset.experimental_include_schema_metadata | false | | Emit dataset schema metadata based on the spark |
| spark.datahub.flow_name | | | If it is set it will be used as the DataFlow name otherwise it uses spark app name as flow_name |
| spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` |
| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow |
| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow |
| spark.datahub.stage_metadata_coalescing | | | Normally it coalesces and sends metadata at the onApplicationEnd event which is never called on Databricks or on Glue. You should enable this on Databricks if you want coalesced run. |
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default, it is disabled. |
| spark.datahub.metadata.dataset.lowerCaseUrns | | false | Set this to true to lowercase dataset urns. By default, it is disabled. |
| spark.datahub.disableSymlinkResolution | | false | Set this to true if you prefer using the s3 location instead of the Hive table. By default, it is disabled. |
| Field | Required | Default | Description |
|--------------------------------------------------------|----------|-----------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:acryl-spark-lineage:0.2.15 |
| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener |
| spark.datahub.emitter                                    |          | rest                  | Specify the way to emit metadata. By default it sends to DataHub using the REST emitter. Valid options are rest, kafka, file or s3                                                          |
| spark.datahub.rest.server | | http://localhost:8080 | Datahub server url eg:<http://localhost:8080> |
| spark.datahub.rest.token | | | Authentication token. |
| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! |
| spark.datahub.rest.max_retries | | 0 | Number of times a request retried if failed |
| spark.datahub.rest.retry_interval | | 10 | Number of seconds to wait between retries |
| spark.datahub.file.filename | | | The file where metadata will be written if file emitter is set |
| spark.datahub.kafka.bootstrap | | | The Kafka bootstrap server url to use if the Kafka emitter is set |
| spark.datahub.kafka.schema_registry_url | | | The Schema registry url to use if the Kafka emitter is set |
| spark.datahub.kafka.schema_registry_config. | | | Additional config to pass in to the Schema Registry Client |
| spark.datahub.kafka.producer_config. | | | Additional config to pass in to the Kafka producer. For example: `--conf "spark.datahub.kafka.producer_config.client.id=my_client_id"` |
| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance |
| spark.datahub.metadata.dataset.platformInstance          |          |                       | Dataset level platform instance (useful to set if you have it in your Glue ingestion)                                                                                                       |
| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD |
| spark.datahub.metadata.dataset.hivePlatformAlias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` |
| spark.datahub.metadata.include_scheme                    |          | true                  | Include the scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this to false; it defaults to true for backwards compatibility with previous versions   |
| spark.datahub.metadata.remove_partition_pattern          |          |                       | Remove partition pattern (e.g. /partition=\d+). It changes database/table/partition=123 to database/table                                                                                   |
| spark.datahub.coalesce_jobs | | true | Only one datajob(task) will be emitted containing all input and output datasets for the spark application |
| spark.datahub.parent.datajob_urn | | | Specified dataset will be set as upstream dataset for datajob created. Effective only when spark.datahub.coalesce_jobs is set to true |
| spark.datahub.metadata.dataset.materialize | | false | Materialize Datasets in DataHub |
| spark.datahub.platform.s3.path_spec_list | | | List of pathspec per platform |
| spark.datahub.metadata.dataset.include_schema_metadata   |          | false                 | Emit dataset schema metadata derived from the Spark execution. It is recommended to get schema information from platform-specific DataHub sources instead, as this is less reliable         |
| spark.datahub.flow_name | | | If it is set it will be used as the DataFlow name otherwise it uses spark app name as flow_name |
| spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` |
| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow |
| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow |
| spark.datahub.stage_metadata_coalescing                  |          |                       | Normally metadata is coalesced and sent at the onApplicationEnd event, which is never called on Databricks or on Glue. Enable this on Databricks if you want a coalesced run.               |
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default, it is disabled. |
| spark.datahub.metadata.dataset.lowerCaseUrns | | false | Set this to true to lowercase dataset urns. By default, it is disabled. |
| spark.datahub.disableSymlinkResolution | | false | Set this to true if you prefer using the s3 location instead of the Hive table. By default, it is disabled. |
| spark.datahub.s3.bucket                                  |          |                       | The name of the S3 bucket where metadata will be written if the S3 emitter is set                                                                                                           |
| spark.datahub.s3.prefix                                  |          |                       | The S3 prefix for the file where metadata will be written if the S3 emitter is set                                                                                                          |
| spark.datahub.s3.filename                                |          |                       | The name of the file where metadata will be written on S3 if the S3 emitter is set. If not set, a random filename will be used                                                              |
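As an illustration of the new emitter options, here is a minimal sketch (bucket, prefix and output path are hypothetical placeholders) of enabling the S3 emitter from a Java Spark application:

```java
import org.apache.spark.sql.SparkSession;

public class S3EmitterConfigExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .appName("datahub-s3-emitter-example")
            .config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.15")
            .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
            .config("spark.datahub.emitter", "s3")
            .config("spark.datahub.s3.bucket", "my-lineage-bucket")
            .config("spark.datahub.s3.prefix", "datahub/mcps")
            .getOrCreate();

    spark.range(10).write().mode("overwrite").parquet("/tmp/acryl_spark_lineage_example");

    // With coalesce_jobs enabled (the default), lineage is emitted when the application ends.
    spark.stop();
  }
}
```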
## What to Expect: The Metadata Model
@ -205,8 +215,6 @@ For Spark on Databricks,
The following custom properties in pipelines and tasks relate to the Spark UI:
- appName and appId in a pipeline can be used to determine the Spark application
- description and SQLQueryId in a task can be used to determine the Query Execution within the application on the SQL
tab of Spark UI
- Other custom properties of pipelines and tasks capture the start and end times of execution etc.
For Spark on Databricks, pipeline start time is the cluster start time.
@ -222,6 +230,7 @@ This initial release has been tested with the following environments:
- spark-submit of Python/Java applications to local and remote servers
- Standalone Java applications
- Databricks Standalone Cluster
- EMR
Testing with Databricks Standard and High-concurrency Cluster is not done yet.
@ -340,20 +349,32 @@ log4j.logger.datahub.client.rest=DEBUG
Use Java 8 to build the project. The project uses Gradle as the build tool. To build the project, run the following command:
```shell
./gradlew -PjavaClassVersionDefault=8 :metadata-integration:java:spark-lineage-beta:shadowJar
./gradlew -PjavaClassVersionDefault=8 :metadata-integration:java:acryl-spark-lineage:shadowJar
```
## Known limitations
## Changelog
### Version 0.2.15
- Add Kafka emitter to emit lineage to kafka
- Add File emitter to emit lineage to file
- Add S3 emitter to save mcps to s3
- Upgrading OpenLineage to 1.19.0
- Renaming project to acryl-datahub-spark-lineage
- Supporting OpenLineage 1.17+ glue identifier changes
- Fix handling of OpenLineage input/output where no facet was attached
### Version 0.2.14
- Fix warning about MeterFilter from Micrometer
### Version 0.2.13
- Silencing some chatty warnings in RddPathUtils
- Add kafka emitter to emit lineage to kafka
### Version 0.2.12
- Silencing some chatty warnings in RddPathUtils
### Version 0.2.11
- Add option to lowercase dataset URNs
- Add option to set platform instance and/or env per platform with `spark.datahub.platform.<platform_name>.env` and `spark.datahub.platform.<platform_name>.platform_instance` config parameter
- Fixing platform instance setting for datasets when `spark.datahub.metadata.dataset.platformInstance` is set

View File

@ -13,10 +13,16 @@ import com.linkedin.dataprocess.RunResultType;
import com.linkedin.domain.Domains;
import com.linkedin.mxe.MetadataChangeProposal;
import datahub.client.Emitter;
import datahub.client.file.FileEmitter;
import datahub.client.kafka.KafkaEmitter;
import datahub.client.rest.RestEmitter;
import datahub.client.s3.S3Emitter;
import datahub.event.EventFormatter;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.spark.conf.FileDatahubEmitterConfig;
import datahub.spark.conf.KafkaDatahubEmitterConfig;
import datahub.spark.conf.RestDatahubEmitterConfig;
import datahub.spark.conf.S3DatahubEmitterConfig;
import datahub.spark.conf.SparkLineageConf;
import io.datahubproject.openlineage.converter.OpenLineageToDataHub;
import io.datahubproject.openlineage.dataset.DatahubDataset;
@ -71,6 +77,27 @@ public class DatahubEventEmitter extends EventEmitter {
RestDatahubEmitterConfig datahubRestEmitterConfig =
(RestDatahubEmitterConfig) datahubConf.getDatahubEmitterConfig();
emitter = Optional.of(new RestEmitter(datahubRestEmitterConfig.getRestEmitterConfig()));
} else if (datahubConf.getDatahubEmitterConfig() instanceof KafkaDatahubEmitterConfig) {
KafkaDatahubEmitterConfig datahubKafkaEmitterConfig =
(KafkaDatahubEmitterConfig) datahubConf.getDatahubEmitterConfig();
try {
emitter =
Optional.of(new KafkaEmitter(datahubKafkaEmitterConfig.getKafkaEmitterConfig()));
} catch (IOException e) {
throw new RuntimeException(e);
}
} else if (datahubConf.getDatahubEmitterConfig() instanceof FileDatahubEmitterConfig) {
FileDatahubEmitterConfig datahubFileEmitterConfig =
(FileDatahubEmitterConfig) datahubConf.getDatahubEmitterConfig();
emitter = Optional.of(new FileEmitter(datahubFileEmitterConfig.getFileEmitterConfig()));
} else if (datahubConf.getDatahubEmitterConfig() instanceof S3DatahubEmitterConfig) {
S3DatahubEmitterConfig datahubFileEmitterConfig =
(S3DatahubEmitterConfig) datahubConf.getDatahubEmitterConfig();
try {
emitter = Optional.of(new S3Emitter(datahubFileEmitterConfig.getS3EmitterConfig()));
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
log.error(
"DataHub Transport {} not recognized. DataHub Lineage emission will not work",

View File

@ -5,9 +5,15 @@ import static io.openlineage.spark.agent.util.ScalaConversionUtils.*;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import datahub.client.file.FileEmitterConfig;
import datahub.client.kafka.KafkaEmitterConfig;
import datahub.client.rest.RestEmitterConfig;
import datahub.client.s3.S3EmitterConfig;
import datahub.spark.conf.DatahubEmitterConfig;
import datahub.spark.conf.FileDatahubEmitterConfig;
import datahub.spark.conf.KafkaDatahubEmitterConfig;
import datahub.spark.conf.RestDatahubEmitterConfig;
import datahub.spark.conf.S3DatahubEmitterConfig;
import datahub.spark.conf.SparkAppContext;
import datahub.spark.conf.SparkConfigParser;
import datahub.spark.conf.SparkLineageConf;
@ -98,57 +104,138 @@ public class DatahubSparkListener extends SparkListener {
public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
String emitterType =
sparkConf.hasPath(SparkConfigParser.TRANSPORT_KEY)
? sparkConf.getString(SparkConfigParser.TRANSPORT_KEY)
sparkConf.hasPath(SparkConfigParser.EMITTER_TYPE)
? sparkConf.getString(SparkConfigParser.EMITTER_TYPE)
: "rest";
if (emitterType.equals("rest")) {
String gmsUrl =
sparkConf.hasPath(SparkConfigParser.GMS_URL_KEY)
? sparkConf.getString(SparkConfigParser.GMS_URL_KEY)
: "http://localhost:8080";
String token =
sparkConf.hasPath(SparkConfigParser.GMS_AUTH_TOKEN)
? sparkConf.getString(SparkConfigParser.GMS_AUTH_TOKEN)
: null;
boolean disableSslVerification =
sparkConf.hasPath(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY)
&& sparkConf.getBoolean(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY);
switch (emitterType) {
case "rest":
String gmsUrl =
sparkConf.hasPath(SparkConfigParser.GMS_URL_KEY)
? sparkConf.getString(SparkConfigParser.GMS_URL_KEY)
: "http://localhost:8080";
String token =
sparkConf.hasPath(SparkConfigParser.GMS_AUTH_TOKEN)
? sparkConf.getString(SparkConfigParser.GMS_AUTH_TOKEN)
: null;
boolean disableSslVerification =
sparkConf.hasPath(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY)
&& sparkConf.getBoolean(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY);
int retry_interval_in_sec =
sparkConf.hasPath(SparkConfigParser.RETRY_INTERVAL_IN_SEC)
? sparkConf.getInt(SparkConfigParser.RETRY_INTERVAL_IN_SEC)
: 5;
int retry_interval_in_sec =
sparkConf.hasPath(SparkConfigParser.RETRY_INTERVAL_IN_SEC)
? sparkConf.getInt(SparkConfigParser.RETRY_INTERVAL_IN_SEC)
: 5;
int max_retries =
sparkConf.hasPath(SparkConfigParser.MAX_RETRIES)
? sparkConf.getInt(SparkConfigParser.MAX_RETRIES)
: 0;
int max_retries =
sparkConf.hasPath(SparkConfigParser.MAX_RETRIES)
? sparkConf.getInt(SparkConfigParser.MAX_RETRIES)
: 0;
log.info(
"REST Emitter Configuration: GMS url {}{}",
gmsUrl,
(sparkConf.hasPath(SparkConfigParser.GMS_URL_KEY) ? "" : "(default)"));
if (token != null) {
log.info("REST Emitter Configuration: Token {}", "XXXXX");
}
log.info(
"REST Emitter Configuration: GMS url {}{}",
gmsUrl,
(sparkConf.hasPath(SparkConfigParser.GMS_URL_KEY) ? "" : "(default)"));
if (token != null) {
log.info("REST Emitter Configuration: Token {}", "XXXXX");
}
if (disableSslVerification) {
log.warn("REST Emitter Configuration: ssl verification will be disabled.");
}
if (disableSslVerification) {
log.warn("REST Emitter Configuration: ssl verification will be disabled.");
}
RestEmitterConfig restEmitterConf =
RestEmitterConfig.builder()
.server(gmsUrl)
.token(token)
.disableSslVerification(disableSslVerification)
.maxRetries(max_retries)
.retryIntervalSec(retry_interval_in_sec)
.build();
return Optional.of(new RestDatahubEmitterConfig(restEmitterConf));
} else {
log.error(
"DataHub Transport {} not recognized. DataHub Lineage emission will not work",
emitterType);
RestEmitterConfig restEmitterConf =
RestEmitterConfig.builder()
.server(gmsUrl)
.token(token)
.disableSslVerification(disableSslVerification)
.maxRetries(max_retries)
.retryIntervalSec(retry_interval_in_sec)
.build();
return Optional.of(new RestDatahubEmitterConfig(restEmitterConf));
case "kafka":
KafkaEmitterConfig.KafkaEmitterConfigBuilder kafkaEmitterConfig =
KafkaEmitterConfig.builder();
if (sparkConf.hasPath(SparkConfigParser.KAFKA_EMITTER_BOOTSTRAP)) {
kafkaEmitterConfig.bootstrap(
sparkConf.getString(SparkConfigParser.KAFKA_EMITTER_BOOTSTRAP));
}
if (sparkConf.hasPath(SparkConfigParser.KAFKA_EMITTER_SCHEMA_REGISTRY_URL)) {
kafkaEmitterConfig.schemaRegistryUrl(
sparkConf.getString(SparkConfigParser.KAFKA_EMITTER_SCHEMA_REGISTRY_URL));
}
if (sparkConf.hasPath(KAFKA_EMITTER_SCHEMA_REGISTRY_CONFIG)) {
Map<String, String> schemaRegistryConfig = new HashMap<>();
sparkConf
.getConfig(KAFKA_EMITTER_SCHEMA_REGISTRY_CONFIG)
.entrySet()
.forEach(
entry -> {
schemaRegistryConfig.put(
entry.getKey(), entry.getValue().unwrapped().toString());
});
kafkaEmitterConfig.schemaRegistryConfig(schemaRegistryConfig);
}
if (sparkConf.hasPath(KAFKA_EMITTER_PRODUCER_CONFIG)) {
Map<String, String> kafkaConfig = new HashMap<>();
sparkConf
.getConfig(KAFKA_EMITTER_PRODUCER_CONFIG)
.entrySet()
.forEach(
entry -> {
kafkaConfig.put(entry.getKey(), entry.getValue().unwrapped().toString());
});
kafkaEmitterConfig.producerConfig(kafkaConfig);
}
return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build()));
case "file":
log.info("File Emitter Configuration: File emitter will be used");
FileEmitterConfig.FileEmitterConfigBuilder fileEmitterConfig = FileEmitterConfig.builder();
fileEmitterConfig.fileName(sparkConf.getString(SparkConfigParser.FILE_EMITTER_FILE_NAME));
return Optional.of(new FileDatahubEmitterConfig(fileEmitterConfig.build()));
case "s3":
log.info("S3 Emitter Configuration: S3 emitter will be used");
S3EmitterConfig.S3EmitterConfigBuilder s3EmitterConfig = S3EmitterConfig.builder();
if (sparkConf.hasPath(SparkConfigParser.S3_EMITTER_BUCKET)) {
s3EmitterConfig.bucketName(sparkConf.getString(SparkConfigParser.S3_EMITTER_BUCKET));
}
if (sparkConf.hasPath(SparkConfigParser.S3_EMITTER_PREFIX)) {
s3EmitterConfig.pathPrefix(sparkConf.getString(SparkConfigParser.S3_EMITTER_PREFIX));
}
if (sparkConf.hasPath(SparkConfigParser.S3_EMITTER_REGION)) {
s3EmitterConfig.region(sparkConf.getString(SparkConfigParser.S3_EMITTER_REGION));
}
if (sparkConf.hasPath(S3_EMITTER_PROFILE)) {
s3EmitterConfig.profileName(sparkConf.getString(S3_EMITTER_PROFILE));
}
if (sparkConf.hasPath(S3_EMITTER_ENDPOINT)) {
s3EmitterConfig.endpoint(sparkConf.getString(S3_EMITTER_ENDPOINT));
}
if (sparkConf.hasPath(S3_EMITTER_ACCESS_KEY)) {
s3EmitterConfig.accessKey(sparkConf.getString(S3_EMITTER_ACCESS_KEY));
}
if (sparkConf.hasPath(S3_EMITTER_SECRET_KEY)) {
s3EmitterConfig.secretKey(sparkConf.getString(S3_EMITTER_SECRET_KEY));
}
if (sparkConf.hasPath(S3_EMITTER_FILE_NAME)) {
s3EmitterConfig.fileName(sparkConf.getString(S3_EMITTER_FILE_NAME));
}
return Optional.of(new S3DatahubEmitterConfig(s3EmitterConfig.build()));
default:
log.error(
"DataHub Transport {} not recognized. DataHub Lineage emission will not work",
emitterType);
break;
}
return Optional.empty();
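The `kafka.producer_config.` and `kafka.schema_registry_config.` handling above relies on Typesafe Config's path flattening. A standalone sketch (with hypothetical keys, written as they arrive after the `spark.datahub.` prefix is stripped from the Spark conf) of how a dotted producer property resolves:

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.HashMap;
import java.util.Map;

public class ProducerConfigParsingSketch {
  public static void main(String[] args) {
    Config conf =
        ConfigFactory.parseString(
            "kafka.producer_config { client.id = my_client_id, acks = all }");

    Map<String, String> producerConfig = new HashMap<>();
    conf.getConfig("kafka.producer_config")
        .entrySet()
        .forEach(e -> producerConfig.put(e.getKey(), e.getValue().unwrapped().toString()));

    // producerConfig now contains {client.id=my_client_id, acks=all},
    // which KafkaEmitter passes straight into the Kafka producer properties.
    System.out.println(producerConfig);
  }
}
```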
@ -171,9 +258,9 @@ public class DatahubSparkListener extends SparkListener {
}
log.info("Datahub configuration: {}", datahubConf.root().render());
Optional<DatahubEmitterConfig> restEmitter = initializeEmitter(datahubConf);
Optional<DatahubEmitterConfig> emitterConfig = initializeEmitter(datahubConf);
SparkLineageConf sparkLineageConf =
SparkLineageConf.toSparkLineageConf(datahubConf, appContext, restEmitter.orElse(null));
SparkLineageConf.toSparkLineageConf(datahubConf, appContext, emitterConfig.orElse(null));
long elapsedTime = System.currentTimeMillis() - startTime;
log.debug("loadDatahubConfig completed successfully in {} ms", elapsedTime);
@ -182,6 +269,7 @@ public class DatahubSparkListener extends SparkListener {
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
long startTime = System.currentTimeMillis();
initializeContextFactoryIfNotInitialized();
log.debug("Application end called");
listener.onApplicationEnd(applicationEnd);

View File

@ -0,0 +1,18 @@
package datahub.spark.conf;
import datahub.client.file.FileEmitterConfig;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@Setter
@ToString
@Getter
public class FileDatahubEmitterConfig implements DatahubEmitterConfig {
final String type = "file";
FileEmitterConfig fileEmitterConfig;
public FileDatahubEmitterConfig(FileEmitterConfig fileEmitterConfig) {
this.fileEmitterConfig = fileEmitterConfig;
}
}
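A usage sketch (the file path is hypothetical): this is the wrapper that DatahubSparkListener builds for `spark.datahub.emitter=file`, which DatahubEventEmitter then unwraps into a FileEmitter:

```java
import datahub.client.file.FileEmitter;
import datahub.client.file.FileEmitterConfig;
import datahub.spark.conf.FileDatahubEmitterConfig;

public class FileEmitterConfigSketch {
  public static void main(String[] args) throws java.io.IOException {
    // Hypothetical output path; this is what spark.datahub.emitter=file resolves to.
    FileEmitterConfig fileConfig =
        FileEmitterConfig.builder().fileName("/tmp/datahub_mcps.json").build();
    FileDatahubEmitterConfig emitterConfig = new FileDatahubEmitterConfig(fileConfig);
    System.out.println(emitterConfig);

    // DatahubEventEmitter turns the config back into the concrete emitter:
    FileEmitter emitter = new FileEmitter(fileConfig);
    emitter.close();
  }
}
```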

View File

@ -0,0 +1,18 @@
package datahub.spark.conf;
import datahub.client.kafka.KafkaEmitterConfig;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@Setter
@ToString
@Getter
public class KafkaDatahubEmitterConfig implements DatahubEmitterConfig {
final String type = "kafka";
KafkaEmitterConfig kafkaEmitterConfig;
public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig) {
this.kafkaEmitterConfig = kafkaEmitterConfig;
}
}

View File

@ -0,0 +1,18 @@
package datahub.spark.conf;
import datahub.client.s3.S3EmitterConfig;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@Setter
@ToString
@Getter
public class S3DatahubEmitterConfig implements DatahubEmitterConfig {
final String type = "s3";
S3EmitterConfig s3EmitterConfig;
public S3DatahubEmitterConfig(S3EmitterConfig s3EmitterConfig) {
this.s3EmitterConfig = s3EmitterConfig;
}
}

View File

@ -25,12 +25,26 @@ import org.slf4j.LoggerFactory;
public class SparkConfigParser {
public static final String PARENT_JOB_KEY = "parent.datajob_urn";
public static final String TRANSPORT_KEY = "transport";
public static final String EMITTER_TYPE = "emitter";
public static final String GMS_URL_KEY = "rest.server";
public static final String GMS_AUTH_TOKEN = "rest.token";
public static final String FILE_EMITTER_FILE_NAME = "file.filename";
public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
public static final String MAX_RETRIES = "rest.max_retries";
public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec";
public static final String KAFKA_EMITTER_BOOTSTRAP = "kafka.bootstrap";
public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_URL = "kafka.schema_registry_url";
public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_CONFIG = "kafka.schema_registry_config";
public static final String KAFKA_EMITTER_PRODUCER_CONFIG = "kafka.producer_config";
public static final String S3_EMITTER_BUCKET = "s3.bucket";
public static final String S3_EMITTER_REGION = "s3.region";
public static final String S3_EMITTER_ENDPOINT = "s3.endpoint";
public static final String S3_EMITTER_PREFIX = "s3.prefix";
public static final String S3_EMITTER_ACCESS_KEY = "s3.access_key";
public static final String S3_EMITTER_SECRET_KEY = "s3.secret_key";
public static final String S3_EMITTER_PROFILE = "s3.profile";
public static final String S3_EMITTER_FILE_NAME = "s3.filename";
public static final String COALESCE_KEY = "coalesce_jobs";
public static final String PATCH_ENABLED = "patch.enabled";
@ -46,8 +60,10 @@ public class SparkConfigParser {
public static final String DATASET_MATERIALIZE_KEY = "metadata.dataset.materialize";
public static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance";
public static final String DATASET_INCLUDE_SCHEMA_METADATA =
public static final String DATASET_INCLUDE_SCHEMA_METADATA_DEPRECATED_ALIAS =
"metadata.dataset.experimental_include_schema_metadata";
public static final String DATASET_INCLUDE_SCHEMA_METADATA =
"metadata.dataset.include_schema_metadata";
public static final String SPARK_PLATFORM_INSTANCE_KEY = "platformInstance";
public static final String REMOVE_PARTITION_PATTERN = "metadata.remove_partition_pattern";
public static final String SPARK_APP_NAME = "spark.app.name";
@ -293,8 +309,13 @@ public class SparkConfigParser {
}
public static boolean isIncludeSchemaMetadata(Config datahubConfig) {
return datahubConfig.hasPath(DATASET_INCLUDE_SCHEMA_METADATA)
&& datahubConfig.getBoolean(DATASET_INCLUDE_SCHEMA_METADATA);
if (datahubConfig.hasPath(DATASET_INCLUDE_SCHEMA_METADATA)) {
return datahubConfig.getBoolean(DATASET_INCLUDE_SCHEMA_METADATA);
} else {
// TODO: Deprecate eventually
return datahubConfig.hasPath(DATASET_INCLUDE_SCHEMA_METADATA_DEPRECATED_ALIAS)
&& datahubConfig.getBoolean(DATASET_INCLUDE_SCHEMA_METADATA_DEPRECATED_ALIAS);
}
}
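A quick sketch of the fallback above, assuming (as elsewhere in the parser) that the `spark.datahub.` prefix has already been stripped from the keys:

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import datahub.spark.conf.SparkConfigParser;

public class SchemaMetadataFlagSketch {
  public static void main(String[] args) {
    // Only the deprecated experimental key is set: it still enables schema metadata.
    Config legacyOnly =
        ConfigFactory.parseString(
            "metadata.dataset.experimental_include_schema_metadata = true");
    // When both keys are set, the new key wins.
    Config newKeyWins =
        ConfigFactory.parseString(
            "metadata.dataset.include_schema_metadata = false\n"
                + "metadata.dataset.experimental_include_schema_metadata = true");

    System.out.println(SparkConfigParser.isIncludeSchemaMetadata(legacyOnly)); // true
    System.out.println(SparkConfigParser.isIncludeSchemaMetadata(newKeyWins)); // false
  }
}
```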
public static String getPipelineName(Config datahubConfig, SparkAppContext appContext) {

View File

@ -513,6 +513,33 @@ public class OpenLineageEventToDatahubTest extends TestCase {
}
}
public void testProcess_OL17_GlueOlEvent() throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.fabricType(FabricType.DEV);
String olEvent =
IOUtils.toString(
this.getClass().getResourceAsStream("/ol_events/sample_glue_ol_0_17_changes.json"),
StandardCharsets.UTF_8);
OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent);
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build());
assertNotNull(datahubJob);
for (DatahubDataset dataset : datahubJob.getInSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:glue,my_glue_database.my_glue_table,DEV)",
dataset.getUrn().toString());
}
for (DatahubDataset dataset : datahubJob.getOutSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:glue,my_glue_database.my_output_glue_table,DEV)",
dataset.getUrn().toString());
}
}
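The expected Glue URNs above follow from the symlink-name normalization added in OpenLineageToDataHub; as a standalone sketch:

```java
public class GlueSymlinkNameSketch {
  public static void main(String[] args) {
    // Mirrors the Glue TABLE symlink handling in OpenLineageToDataHub.
    String namespace = "arn:aws:glue:us-west-2:123456789012";
    String symlinkName = "table/my_glue_database/my_glue_table";

    String platform =
        (namespace.startsWith("aws:glue:") || namespace.startsWith("arn:aws:glue:"))
            ? "glue"
            : "hive"; // the real code falls back to the configured hivePlatformAlias

    String datasetName =
        symlinkName.startsWith("table/")
            ? symlinkName.replaceFirst("table/", "").replace("/", ".")
            : symlinkName;

    System.out.println(platform + " / " + datasetName); // glue / my_glue_database.my_glue_table
  }
}
```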
public void testProcessGlueOlEventSymlinkDisabled() throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
@ -754,4 +781,37 @@ public class OpenLineageEventToDatahubTest extends TestCase {
dataset.getSchemaMetadata().getPlatform().toString(), "urn:li:dataPlatform:redshift");
}
}
public void testProcessGCSInputsOutputs() throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.fabricType(FabricType.DEV);
builder.lowerCaseDatasetUrns(true);
builder.materializeDataset(true);
builder.includeSchemaMetadata(true);
String olEvent =
IOUtils.toString(
this.getClass().getResourceAsStream("/ol_events/gs_input_output.json"),
StandardCharsets.UTF_8);
OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent);
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build());
assertNotNull(datahubJob);
assertEquals(1, datahubJob.getInSet().size());
for (DatahubDataset dataset : datahubJob.getInSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:gcs,my-gs-input-bucket/path/to/my-input-file.csv,DEV)",
dataset.getUrn().toString());
}
assertEquals(1, datahubJob.getOutSet().size());
for (DatahubDataset dataset : datahubJob.getOutSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:gcs,my-gs-output-bucket/path/to/my-output-file.csv,DEV)",
dataset.getUrn().toString());
}
}
}

View File

@ -0,0 +1,62 @@
{
"eventTime": "2024-07-23T14:42:58.176Z",
"producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
"eventType": "COMPLETE",
"run": {
"runId": "0190e00b-80a0-7902-829f-10faa3197778",
"facets": {
"parent": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json#/$defs/ParentRunFacet",
"run": {
"runId": "0190e00b-80a0-7902-829f-10faa3197778"
},
"job": {
"namespace": "default",
"name": "my-job"
}
},
"processing_engine": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet",
"version": "3.3.4",
"name": "spark",
"openlineageAdapterVersion": "3.3.2_2.12"
},
"spark_properties": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
"properties": {
"spark.master": "spark://my-spark-master:7077",
"spark.app.name": "mySparkApp"
}
}
}
},
"job": {
"namespace": "default",
"name": "my_job",
"facets": {
"jobType": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.16.0/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet",
"processingType": "BATCH",
"integration": "SPARK",
"jobType": "RDD_JOB"
}
}
},
"inputs": [
{
"namespace": "gs://my-gs-input-bucket",
"name": "/path/to/my-input-file.csv"
}
],
"outputs": [
{
"namespace": "gs://my-gs-output-bucket",
"name": "/path/to/my-output-file.csv"
}
]
}

View File

@ -0,0 +1,168 @@
{
"eventTime": "2024-05-31T17:01:26.465Z",
"producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark",
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
"eventType": "START",
"run": {
"runId": "3ad2a5ec-1c8b-4bda-84f4-1492758af65c",
"facets": {
"parent": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json#/$defs/ParentRunFacet",
"run": {
"runId": "f03077f2-c077-4472-987e-b89b1c741c86"
},
"job": {
"namespace": "default",
"name": "simple_app_parquet_with_persist_without_coalesce_s3_demo"
}
},
"processing_engine": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet",
"version": "3.3.0-amzn-1",
"name": "spark"
},
"environment-properties": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark",
"_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
"environment-properties": {}
},
"spark_properties": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark",
"_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
"properties": {
"spark.master": "jes",
"spark.app.name": "SimpleAppParquetWithPersistWithoutCoalesceS3-Demo"
}
},
"spark_version": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark",
"_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
"spark-version": "3.3.0-amzn-1"
}
}
},
"job": {
"namespace": "default",
"name": "simple_app_parquet_with_persist_without_coalesce_s3_demo.execute_insert_into_hadoop_fs_relation_command.sample_data_output",
"facets": {
"jobType": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/2-0-2/JobTypeJobFacet.json#/$defs/JobTypeJobFacet",
"processingType": "BATCH",
"integration": "SPARK",
"jobType": "JOB"
}
}
},
"inputs": [
{
"namespace": "s3://my-bucket-test",
"name": "/sample_data/input_data.parquet",
"facets": {
"dataSource": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet",
"name": "s3://my-bucket-test",
"uri": "s3://my-bucket-test"
},
"schema": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet",
"fields": [
{
"name": "field_1",
"type": "integer"
},
{
"name": "field_2",
"type": "string"
}
]
},
"symlinks": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet",
"identifiers": [
{
"namespace": "arn:aws:glue:us-west-2:123456789012",
"name": "table/my_glue_database/my_glue_table",
"type": "TABLE"
}
]
}
},
"inputFacets": {}
}
],
"outputs": [
{
"namespace": "s3://my-bucket-test",
"name": "mydata _output",
"facets": {
"dataSource": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/DatasourceDatasetFacet.json#/$defs/DatasourceDatasetFacet",
"name": "s3://my-bucket-test",
"uri": "s3://my-bucket-test"
},
"schema": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json#/$defs/SchemaDatasetFacet",
"fields": [
{
"name": "field1",
"type": "long"
},
{
"name": "field2",
"type": "string"
}
]
},
"columnLineage": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-2/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet",
"fields": {
"field1": {
"inputFields": [
{
"namespace": "s3://my-bucket-test",
"name": "/output/field1.parquet",
"field": "field1"
}
]
},
"field2": {
"inputFields": [
{
"namespace": "s3://my-bucket-test",
"name": "/output/field2.parquet",
"field": "field2"
}
]
}
}
},
"symlinks": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/SymlinksDatasetFacet.json#/$defs/SymlinksDatasetFacet",
"identifiers": [
{
"namespace": "arn:aws:glue:us-east-1:123456789012/",
"name": "table/my_glue_database/my_output_glue_table",
"type": "TABLE"
}
]
},
"lifecycleStateChange": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.1/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/LifecycleStateChangeDatasetFacet.json#/$defs/LifecycleStateChangeDatasetFacet",
"lifecycleStateChange": "OVERWRITE"
}
},
"outputFacets": {}
}
]
}

View File

@ -21,13 +21,15 @@ dependencies {
exclude group: "org.apache.avro"
}
implementation externalDependency.avro
implementation externalDependency.httpClient
constraints {
implementation('commons-collections:commons-collections:3.2.2') {
because 'Vulnerability Issue'
}
}
compileOnly externalDependency.httpClient
implementation externalDependency.awsS3
implementation externalDependency.jacksonDataBind
runtimeOnly externalDependency.jna
@ -41,7 +43,6 @@ dependencies {
testImplementation externalDependency.mockServer
testImplementation externalDependency.mockServerClient
testImplementation externalDependency.testContainers
testImplementation externalDependency.httpClient
testRuntimeOnly externalDependency.logbackClassic
}
@ -105,6 +106,8 @@ shadowJar {
relocate 'com.thoughtworks.paranamer', 'datahub.shaded.com.thoughtworks.paranamer'
relocate 'org.xerial.snappy', 'datahub.shaded.org.xerial.snappy'
relocate 'org.apache.kafka', 'datahub.shaded.org.apache.kafka'
relocate 'org.apache.http', 'datahub.shaded.org.apache.http'
relocate 'software.amazon.awssdk', 'datahub.shaded.software.amazon.awssdk'
relocate 'io.confluent', 'datahub.shaded.io.confluent'
relocate 'org.apache.zookeeper', 'datahub.shaded.org.apache.zookeeper'
relocate 'org.apache.yetus', 'datahub.shaded.org.apache.yetus'
@ -121,7 +124,9 @@ shadowJar {
relocate 'org.eclipse.parsson', 'datahub.shaded.parsson'
relocate 'jakarta.json', 'datahub.shaded.json'
relocate 'io.netty', 'datahub.shaded.io.netty'
relocate 'org.apache.hc', 'datahub.shaded.org.apache.hc'
relocate 'org.reactivestreams', 'datahub.shaded.org.reactivestreams'
relocate 'software.amazon.eventstream', 'datahub.shaded.software.amazon.eventstream'
finalizedBy checkShadowJar
}

View File

@ -37,6 +37,10 @@ jar -tvf $jarFile |\
grep -v "MetadataChangeProposal.avsc" |\
grep -v "aix" |\
grep -v "com/sun/" |\
grep -v "mozilla" |\
grep -v "VersionInfo.java" |\
grep -v "mime.types"
if [ $? -ne 0 ]; then
echo "✅ No unexpected class paths found in ${jarFile}"

View File

@ -88,7 +88,7 @@ public class FileEmitter implements Emitter {
public MetadataWriteResponse get() throws InterruptedException, ExecutionException {
return MetadataWriteResponse.builder()
.success(true)
.responseContent("MCP witten to File")
.responseContent("MCP written to File " + config.getFileName())
.build();
}

View File

@ -51,6 +51,7 @@ public class KafkaEmitter implements Emitter {
kafkaConfigProperties.put("schema.registry.url", this.config.getSchemaRegistryUrl());
kafkaConfigProperties.putAll(config.getSchemaRegistryConfig());
kafkaConfigProperties.putAll(config.getProducerConfig());
producer = new KafkaProducer<>(kafkaConfigProperties);
_avroSerializer = new AvroSerializer();
}

View File

@ -0,0 +1,190 @@
package datahub.client.s3;
import com.linkedin.mxe.MetadataChangeProposal;
import datahub.client.Callback;
import datahub.client.Emitter;
import datahub.client.MetadataWriteResponse;
import datahub.client.file.FileEmitter;
import datahub.client.file.FileEmitterConfig;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.event.UpsertAspectRequest;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.profiles.ProfileFile;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
@Slf4j
public class S3Emitter implements Emitter {
private final Path temporaryFile;
private final FileEmitter fileEmitter;
private final S3Client client;
private final S3EmitterConfig config;
/**
* The default constructor
*
* @param config
*/
public S3Emitter(S3EmitterConfig config) throws IOException {
temporaryFile = Files.createTempFile("datahub_ingest_", "_mcps.json");
log.info("Emitter created temporary file: {}", this.temporaryFile.toFile());
FileEmitterConfig fileEmitterConfig =
FileEmitterConfig.builder()
.fileName(temporaryFile.toString())
.eventFormatter(config.getEventFormatter())
.build();
fileEmitter = new FileEmitter(fileEmitterConfig);
S3ClientBuilder s3ClientBuilder = S3Client.builder();
if (config.getRegion() != null) {
s3ClientBuilder.region(Region.of(config.getRegion()));
}
if (config.getEndpoint() != null) {
try {
s3ClientBuilder.endpointOverride(new URI(config.getEndpoint()));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
if (config.getAccessKey() != null && config.getSecretKey() != null) {
s3ClientBuilder.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(config.getAccessKey(), config.getSecretKey())));
} else {
DefaultCredentialsProvider.Builder credentialsProviderBuilder =
DefaultCredentialsProvider.builder();
if (config.getProfileName() != null) {
credentialsProviderBuilder.profileName(config.getProfileName());
}
if (config.getProfileFile() != null) {
credentialsProviderBuilder.profileFile(
ProfileFile.builder().content(Paths.get(config.getProfileFile())).build());
}
s3ClientBuilder.credentialsProvider(credentialsProviderBuilder.build());
}
this.client = s3ClientBuilder.build();
this.config = config;
}
private void deleteTemporaryFile() {
try {
Files.delete(temporaryFile);
log.debug("Emitter deleted temporary file: {}", this.temporaryFile.toFile());
} catch (IOException e) {
log.warn("Failed to delete temporary file {}", temporaryFile);
}
}
@Override
public void close() throws IOException {
log.debug("Closing file {}", this.temporaryFile.toFile());
fileEmitter.close();
String key = this.temporaryFile.getFileName().toString();
// If the target filename is set, use that as the key
if (config.getFileName() != null) {
key = config.getFileName();
}
if (config.getPathPrefix().endsWith("/")) {
key = config.getPathPrefix() + key;
} else {
key = config.getPathPrefix() + "/" + key;
}
if (key.startsWith("/")) {
key = key.substring(1);
}
PutObjectRequest objectRequest =
PutObjectRequest.builder().bucket(config.getBucketName()).key(key).build();
log.info(
"Uploading file {} to S3 with bucket {} and key: {}",
this.temporaryFile,
config.getBucketName(),
key);
PutObjectResponse response = client.putObject(objectRequest, this.temporaryFile);
deleteTemporaryFile();
if (!response.sdkHttpResponse().isSuccessful()) {
log.error("Failed to upload file to S3. Response: {}", response);
throw new IOException("Failed to upload file to S3. Response: " + response);
}
}
@Override
public Future<MetadataWriteResponse> emit(
@SuppressWarnings("rawtypes") MetadataChangeProposalWrapper mcpw, Callback callback)
throws IOException {
return fileEmitter.emit(mcpw, callback);
}
@Override
public Future<MetadataWriteResponse> emit(MetadataChangeProposal mcp, Callback callback)
throws IOException {
return fileEmitter.emit(mcp, callback);
}
@Override
public boolean testConnection() throws IOException, ExecutionException, InterruptedException {
throw new UnsupportedOperationException("testConnection not relevant for File Emitter");
}
@Override
public Future<MetadataWriteResponse> emit(List<UpsertAspectRequest> request, Callback callback)
throws IOException {
throw new UnsupportedOperationException("UpsertAspectRequest not relevant for File Emitter");
}
private Future<MetadataWriteResponse> createFailureFuture(String message) {
return new Future<MetadataWriteResponse>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public MetadataWriteResponse get() throws InterruptedException, ExecutionException {
return MetadataWriteResponse.builder().success(false).responseContent(message).build();
}
@Override
public MetadataWriteResponse get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return this.get();
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
};
}
}

View File

@ -0,0 +1,24 @@
package datahub.client.s3;
import datahub.event.EventFormatter;
import lombok.Builder;
import lombok.Value;
@Value
@Builder
public class S3EmitterConfig {
@Builder.Default @lombok.NonNull String bucketName = null;
@Builder.Default String pathPrefix = null;
@Builder.Default String fileName = null;
@Builder.Default
EventFormatter eventFormatter = new EventFormatter(EventFormatter.Format.PEGASUS_JSON);
@Builder.Default String region = null;
@Builder.Default String endpoint = null;
@Builder.Default String accessKey = null;
@Builder.Default String secretKey = null;
@Builder.Default String sessionToken = null;
@Builder.Default String profileFile = null;
@Builder.Default String profileName = null;
}
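A usage sketch of the emitter and its config together (bucket, prefix and region are hypothetical): MCPs are buffered in a local temporary file and uploaded to S3 when close() is called.

```java
import datahub.client.s3.S3Emitter;
import datahub.client.s3.S3EmitterConfig;
import java.io.IOException;

public class S3EmitterUsageSketch {
  public static void main(String[] args) throws IOException {
    S3EmitterConfig config =
        S3EmitterConfig.builder()
            .bucketName("my-lineage-bucket")
            .pathPrefix("datahub/mcps")
            .region("us-east-1")
            .build();

    S3Emitter emitter = new S3Emitter(config);
    // emit(...) calls are appended to a local temporary file here.
    emitter.close(); // uploads the temp file to s3://my-lineage-bucket/datahub/mcps/... and deletes it locally
  }
}
```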

View File

@ -113,12 +113,19 @@ public class OpenLineageToDataHub {
for (OpenLineage.SymlinksDatasetFacetIdentifiers symlink :
dataset.getFacets().getSymlinks().getIdentifiers()) {
if (symlink.getType().equals("TABLE")) {
if (symlink.getNamespace().startsWith("aws:glue:")) {
// Before OpenLineage 0.17.1 the namespace started with "aws:glue:" and after that it was
// changed to "arn:aws:glue:"
if (symlink.getNamespace().startsWith("aws:glue:")
|| symlink.getNamespace().startsWith("arn:aws:glue:")) {
namespace = "glue";
} else {
namespace = mappingConfig.getHivePlatformAlias();
}
datasetName = symlink.getName();
if (symlink.getName().startsWith("table/")) {
datasetName = symlink.getName().replaceFirst("table/", "").replace("/", ".");
} else {
datasetName = symlink.getName();
}
}
}
Optional<DatasetUrn> symlinkedUrn =
@ -761,10 +768,7 @@ public class OpenLineageToDataHub {
}
for (OpenLineage.InputDataset input :
event.getInputs().stream()
.filter(input -> input.getFacets() != null)
.distinct()
.collect(Collectors.toList())) {
event.getInputs().stream().distinct().collect(Collectors.toList())) {
Optional<DatasetUrn> datasetUrn = convertOpenlineageDatasetToDatasetUrn(input, datahubConf);
if (datasetUrn.isPresent()) {
DatahubDataset.DatahubDatasetBuilder builder = DatahubDataset.builder();
@ -791,10 +795,7 @@ public class OpenLineageToDataHub {
}
for (OpenLineage.OutputDataset output :
event.getOutputs().stream()
.filter(input -> input.getFacets() != null)
.distinct()
.collect(Collectors.toList())) {
event.getOutputs().stream().distinct().collect(Collectors.toList())) {
Optional<DatasetUrn> datasetUrn = convertOpenlineageDatasetToDatasetUrn(output, datahubConf);
if (datasetUrn.isPresent()) {
DatahubDataset.DatahubDatasetBuilder builder = DatahubDataset.builder();

View File

@ -1,79 +0,0 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/
package io.openlineage.spark.agent.lifecycle.plan;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.utils.DatasetIdentifier;
import io.openlineage.spark.agent.util.PathUtils;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark.api.QueryPlanVisitor;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand;
import scala.Option;
/**
* {@link LogicalPlan} visitor that matches an {@link InsertIntoHadoopFsRelationCommand} and
* extracts the output {@link OpenLineage.Dataset} being written.
*/
public class InsertIntoHadoopFsRelationVisitor
extends QueryPlanVisitor<InsertIntoHadoopFsRelationCommand, OpenLineage.OutputDataset> {
public InsertIntoHadoopFsRelationVisitor(OpenLineageContext context) {
super(context);
}
@Override
public List<OpenLineage.OutputDataset> apply(LogicalPlan x) {
InsertIntoHadoopFsRelationCommand command = (InsertIntoHadoopFsRelationCommand) x;
Option<CatalogTable> catalogTable = command.catalogTable();
OpenLineage.OutputDataset outputDataset;
if (catalogTable.isEmpty()) {
DatasetIdentifier di = PathUtils.fromURI(command.outputPath().toUri(), "file");
if (SaveMode.Overwrite == command.mode()) {
outputDataset =
outputDataset()
.getDataset(
di,
command.query().schema(),
OpenLineage.LifecycleStateChangeDatasetFacet.LifecycleStateChange.OVERWRITE);
} else {
outputDataset = outputDataset().getDataset(di, command.query().schema());
}
return Collections.singletonList(outputDataset);
} else {
if (SaveMode.Overwrite == command.mode()) {
return Collections.singletonList(
outputDataset()
.getDataset(
PathUtils.fromCatalogTable(catalogTable.get()),
catalogTable.get().schema(),
OpenLineage.LifecycleStateChangeDatasetFacet.LifecycleStateChange.CREATE));
} else {
return Collections.singletonList(
outputDataset()
.getDataset(
PathUtils.fromCatalogTable(catalogTable.get()), catalogTable.get().schema()));
}
}
}
@Override
public Optional<String> jobNameSuffix(InsertIntoHadoopFsRelationCommand command) {
if (command.catalogTable().isEmpty()) {
DatasetIdentifier di = PathUtils.fromURI(command.outputPath().toUri(), "file");
return Optional.of(trimPath(di.getName()));
}
return Optional.of(
trimPath(PathUtils.fromCatalogTable(command.catalogTable().get()).getName()));
}
}

View File

@ -2,7 +2,7 @@
:::note
This is our legacy Spark Integration which is replaced by [Acryl Spark Lineage](https://datahubproject.io/docs/metadata-integration/java/spark-lineage-beta)
This is our legacy Spark Integration which is replaced by [Acryl Spark Lineage](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage)
:::

View File

@ -37,7 +37,10 @@ jar -tvf $jarFile |\
grep -v "MetadataChangeProposal.avsc" |\
grep -v "aix" |\
grep -v "library.properties" |\
grep -v "rootdoc.txt"
grep -v "rootdoc.txt" |\
grep -v "VersionInfo.java" |\
grep -v "mime.types"
if [ $? -ne 0 ]; then
echo "✅ No unexpected class paths found in ${jarFile}"

Some files were not shown because too many files have changed in this diff.