docs: spark-lineage - configuration details for Amazon EMR (#5459)

Tamas Nemeth 2022-07-21 17:57:26 +02:00 committed by GitHub
parent afc7177450
commit 54ff4524dd


@@ -1,26 +1,41 @@
# Spark
To integrate Spark with DataHub, we provide a lightweight Java agent that listens for Spark application and job events and pushes metadata out to DataHub in real time. The agent listens to events such as application start/end and SQLExecution start/end to create pipelines (i.e. DataFlow) and tasks (i.e. DataJob) in DataHub, along with lineage to the datasets that are being read from and written to. Read on to learn how to configure this for different Spark scenarios.
## Configuring Spark agent
The Spark agent can be configured using a config file or while creating a Spark Session.
## Before you begin: Versions and Release Notes
Versioning of the jar artifact will follow the semantic versioning of the main [DataHub repo](https://github.com/datahub-project/datahub) and release notes will be available [here](https://github.com/datahub-project/datahub/releases).
Always check [the Maven central repository](https://search.maven.org/search?q=a:datahub-spark-lineage) for the latest released version.
### Configuration Instructions: spark-submit
When running jobs using spark-submit, the agent needs to be configured in the config file.
```text
spark.master                  spark://spark-master:7077
#Configuring datahub spark agent jar
spark.jars.packages           io.acryl:datahub-spark-lineage:0.8.23
spark.extraListeners          datahub.spark.DatahubSparkListener
spark.datahub.rest.server     http://localhost:8080
```
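If you prefer not to edit the config file, the same settings can be passed directly on the spark-submit command line via `--packages` and `--conf`; a minimal sketch (the application script name is a placeholder):
```text
spark-submit \
  --packages io.acryl:datahub-spark-lineage:0.8.23 \
  --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" \
  --conf "spark.datahub.rest.server=http://localhost:8080" \
  my_spark_job.py
```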
#### Configuration for Amazon EMR
Set the following spark-defaults configuration properties as stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html):
```text
spark.jars.packages           io.acryl:datahub-spark-lineage:0.8.23
spark.extraListeners          datahub.spark.DatahubSparkListener
spark.datahub.rest.server     https://your_datahub_host/gms
#If you have authentication set up then you also need to specify the DataHub access token
spark.datahub.rest.token      yourtoken
```
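On EMR, one way to supply these spark-defaults is a configuration classification at cluster creation time, per the AWS page linked above; a sketch (the host and token values are placeholders):
```json
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "io.acryl:datahub-spark-lineage:0.8.23",
      "spark.extraListeners": "datahub.spark.DatahubSparkListener",
      "spark.datahub.rest.server": "https://your_datahub_host/gms",
      "spark.datahub.rest.token": "yourtoken"
    }
  }
]
```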
### Configuration Instructions: Notebooks
When running interactive jobs from a notebook, the listener can be configured while building the Spark Session.
```python
@@ -35,6 +50,7 @@ spark = SparkSession.builder \
```
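For reference, a minimal PySpark sketch of what the builder configuration typically looks like, assuming a local DataHub GMS at http://localhost:8080 and placeholder master/app-name values:
```python
from pyspark.sql import SparkSession

# Configure the DataHub listener while building the Spark session
spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("test-application") \
    .config("spark.jars.packages", "io.acryl:datahub-spark-lineage:0.8.23") \
    .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") \
    .config("spark.datahub.rest.server", "http://localhost:8080") \
    .enableHiveSupport() \
    .getOrCreate()
```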
### Configuration Instructions: Standalone Java Applications
The configuration for standalone Java apps is very similar.
```java
@@ -54,7 +70,7 @@ spark = SparkSession.builder()
|-------------------------------------------------|----------|---------|-------------------------------------------------------------------------|
| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:datahub-spark-lineage:0.8.23 |
| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener |
| spark.datahub.rest.server | ✅ | | DataHub server URL, e.g. <http://localhost:8080> |
| spark.datahub.rest.token | | | Authentication token. |
| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance |
| spark.datahub.metadata.dataset.platformInstance | | | Dataset level platform instance |
@@ -62,7 +78,6 @@ spark = SparkSession.builder()
| spark.datahub.coalesce_jobs | | false | Only one datajob (task) will be emitted, containing all input and output datasets for the Spark application |
| spark.datahub.parent.datajob_urn | | | Specified dataset will be set as upstream dataset for the datajob created. Effective only when spark.datahub.coalesce_jobs is set to true |
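For example, to coalesce everything into a single task with a parent attached, the two properties above could be set together in spark-defaults; a sketch using a hypothetical Airflow task URN as the parent:
```text
spark.datahub.coalesce_jobs        true
spark.datahub.parent.datajob_urn   urn:li:dataJob:(urn:li:dataFlow:(airflow,my_dag,PROD),my_task)
```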
## What to Expect: The Metadata Model
As of current writing, the Spark agent produces metadata related to the Spark job, tasks and lineage edges to datasets.
@@ -71,23 +86,26 @@ As of current writing, the Spark agent produces metadata related to the Spark jo
- A task is created per unique Spark query execution within an app.
### Custom properties & relating to Spark UI
The following custom properties in pipelines and tasks relate to the Spark UI:
- appName and appId in a pipeline can be used to determine the Spark application
- description and SQLQueryId in a task can be used to determine the Query Execution within the application on the SQL tab of Spark UI
Other custom properties of pipelines and tasks capture the start and end times of execution etc.
The query plan is captured in the *queryPlan* property of a task.
### Spark versions supported
The primary version tested is Spark/Scala version 2.4.8/2.11.
This library has also been tested to work with Spark versions (2.2.0 - 2.4.8) and Scala versions (2.10 - 2.12).
For the Spark 3.x series, this has been tested to work with Spark 3.1.2 and 3.2.0 with Scala 2.12. Other combinations are not guaranteed to work currently.
Support for other Spark versions is planned in the very near future.
### Environments tested with
This initial release has been tested with the following environments:
- spark-submit of Python/Java applications to local and remote servers
- Jupyter notebooks with pyspark code
- Standalone Java applications
@@ -97,6 +115,7 @@ Note that testing for other environments such as Databricks is planned in near f
### Spark commands supported
Below is a list of Spark commands that are parsed currently:
- InsertIntoHadoopFsRelationCommand
- SaveIntoDataSourceCommand (jdbc)
- CreateHiveTableAsSelectCommand
@@ -105,12 +124,14 @@ Below is a list of Spark commands that are parsed currently:
Effectively, these support data sources/sinks corresponding to Hive, HDFS and JDBC.
The DataFrame.persist command is supported for the LeafExecNodes below (see the sketch after this list):
- FileSourceScanExec
- HiveTableScanExec
- RowDataSourceScanExec
- InMemoryTableScanExec
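A minimal PySpark sketch of lineage through a persisted DataFrame (paths are placeholders; a parquet read typically maps to a FileSourceScanExec and the write to an InsertIntoHadoopFsRelationCommand):
```python
# Assumes `spark` is a SparkSession configured with the DatahubSparkListener as shown earlier.
# Read, persist, and write back out; the agent emits lineage from the input to the output dataset.
df = spark.read.parquet("s3://my-bucket/input/")
df_cached = df.persist()
df_cached.write.mode("overwrite").parquet("s3://my-bucket/output/")
```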
### Spark commands not yet supported
- View related commands
- Cache commands and implications on lineage
- RDD jobs
@@ -127,21 +148,28 @@ DataFrame.persist command is supported for below LeafExecNodes:
- The following info logs are generated
On Spark context startup
```text
YY/MM/DD HH:mm:ss INFO DatahubSparkListener: DatahubSparkListener initialised.
YY/MM/DD HH:mm:ss INFO SparkContext: Registered listener datahub.spark.DatahubSparkListener
```
On application start
```
YY/MM/DD HH:mm:ss INFO DatahubSparkListener: Application started: SparkListenerApplicationStart(AppName,Some(local-1644489736794),1644489735772,user,None,None)
YY/MM/DD HH:mm:ss INFO McpEmitter: REST Emitter Configuration: GMS url <rest.server>
YY/MM/DD HH:mm:ss INFO McpEmitter: REST Emitter Configuration: Token XXXXX
```
On pushing data to server
```
YY/MM/DD HH:mm:ss INFO McpEmitter: MetadataWriteResponse(success=true, responseContent={"value":"<URN>"}, underlyingResponse=HTTP/1.1 200 OK [Date: day, DD month year HH:mm:ss GMT, Content-Type: application/json, X-RestLi-Protocol-Version: 2.0.0, Content-Length: 97, Server: Jetty(9.4.46.v20220331)] [Content-Length: 97,Chunked: false])
```
On application end
```
YY/MM/DD HH:mm:ss INFO DatahubSparkListener: Application ended : AppName AppID
```
@@ -154,6 +182,7 @@ log4j.logger.datahub.client.rest=DEBUG
```
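The debug-logging snippet is elided in the hunk above; a sketch of the relevant log4j.properties entries (the datahub.client.rest logger name comes from the hunk context, the datahub.spark logger is an assumption):
```text
log4j.logger.datahub.spark=DEBUG
log4j.logger.datahub.client.rest=DEBUG
```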
## Known limitations
- Only Postgres is supported for JDBC sources in this initial release. Support for other driver URL formats will be added in the future.
- Behavior with cached datasets is not fully specified/defined in the context of lineage.
- There is a possibility that very short-lived jobs that run within a few milliseconds may not be captured by the listener. This should not cause an issue for realistic Spark applications.