refactor(spark-lineage): enhance logging and documentation (#4113)

MugdhaHardikar-GSLab 2022-02-19 02:24:26 +05:30 committed by GitHub
parent 71c2b664de
commit a894424dc6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 49 additions and 5 deletions

View File

@@ -48,6 +48,14 @@ spark = SparkSession.builder()
.getOrCreate();
```
### Enable HTTPS and authentication token
Add the following properties to the Spark config:
```
spark.datahub.rest.server https://<server URL>
spark.datahub.rest.token <token>
```
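These properties can also be set when the SparkSession is built. The snippet below is a minimal sketch, assuming the listener is registered via `spark.extraListeners` as in the setup section above; the application name is a placeholder and `<server URL>`/`<token>` stand in for your own values.
```
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .appName("test-application")
    .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
    .config("spark.datahub.rest.server", "https://<server URL>")
    .config("spark.datahub.rest.token", "<token>")
    .getOrCreate();
```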
## What to Expect: The Metadata Model
At the time of writing, the Spark agent produces metadata for the Spark job and its tasks, along with lineage edges to datasets.
@@ -100,6 +108,37 @@ Effectively, these support data sources/sinks corresponding to Hive, HDFS and JD
- If Spark execution fails, an empty pipeline would still get created, but it may not have any tasks.
- For HDFS sources, the folder (name) is regarded as the dataset (name) to align with typical storage of parquet/csv formats, as illustrated in the sketch below.
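A minimal sketch of the folder-based naming, assuming a `spark` session configured with the DataHub listener as in the setup section above (the HDFS paths and namenode address are hypothetical placeholders):
```
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Read CSV from one HDFS folder and write Parquet to another.
Dataset<Row> df = spark.read().option("header", "true").csv("hdfs://namenode:8020/data/raw/events");
df.write().mode("overwrite").parquet("hdfs://namenode:8020/data/events");

// The lineage edges reference the folders /data/raw/events and /data/events,
// not the individual part files written inside them.
```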
### Debugging
- The following INFO logs are generated:
On Spark context startup:
```
YY/MM/DD HH:mm:ss INFO DatahubSparkListener: DatahubSparkListener initialised.
YY/MM/DD HH:mm:ss INFO SparkContext: Registered listener datahub.spark.DatahubSparkListener
```
On application start:
```
YY/MM/DD HH:mm:ss INFO DatahubSparkListener: Application started: SparkListenerApplicationStart(AppName,Some(local-1644489736794),1644489735772,user,None,None)
YY/MM/DD HH:mm:ss INFO McpEmitter: REST Emitter Configuration: GMS url <rest.server>
YY/MM/DD HH:mm:ss INFO McpEmitter: REST Emitter Configuration: Token XXXXX
```
On pushing data to the server:
```
YY/MM/DD HH:mm:ss INFO McpEmitter: MetadataWriteResponse(success=true, responseContent={"value":"<URN>"}, underlyingResponse=HTTP/1.1 200 OK [Date: day, DD month year HH:mm:ss GMT, Content-Type: application/json, X-RestLi-Protocol-Version: 2.0.0, Content-Length: 97, Server: Jetty(9.4.20.v20190813)] [Content-Length: 97,Chunked: false])
```
On application end:
```
YY/MM/DD HH:mm:ss INFO DatahubSparkListener: Application ended : AppName AppID
```
- To enable debug logs, add the below configuration to the log4j.properties file:
```
log4j.logger.datahub.spark=DEBUG
log4j.logger.datahub.client.rest=DEBUG
```
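- If Spark does not pick up the modified log4j.properties automatically, one common approach is to point the driver and executor JVMs at the file explicitly via the Spark config below (a sketch assuming a Spark build using log4j 1.x, which the bundled log4j.properties implies; the file path is a placeholder):
```
spark.driver.extraJavaOptions -Dlog4j.configuration=file:/path/to/log4j.properties
spark.executor.extraJavaOptions -Dlog4j.configuration=file:/path/to/log4j.properties
```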
## Known limitations
- Only Postgres is supported for JDBC sources in this initial release. Support for other driver URL formats will be added in the future.
- Behavior with cached datasets is not fully specified/defined in the context of lineage.

View File

@@ -62,6 +62,10 @@ public class DatahubSparkListener extends SparkListener {
private final Map<String, ExecutorService> appPoolDetails = new ConcurrentHashMap<>();
private final Map<String, McpEmitter> appEmitters = new ConcurrentHashMap<>();
public DatahubSparkListener() {
log.info("DatahubSparkListener initialised.");
}
private class SqlStartTask implements Runnable {
private final SparkListenerSQLExecutionStart sqlStart;
@@ -164,7 +168,7 @@ public class DatahubSparkListener extends SparkListener {
@Override
public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
try {
log.debug("App started: " + applicationStart); log.info("Application started: " + applicationStart);
LineageUtils.findSparkCtx().foreach(new AbstractFunction1<SparkContext, Void>() {
@Override
@@ -202,7 +206,7 @@ public class DatahubSparkListener extends SparkListener {
@Override
public Void apply(SparkContext sc) {
log.debug("Application end event received for appId :" + sc.appName()); log.info("Application ended : {} {}", sc.appName(), sc.applicationId());
AppStartEvent start = appDetails.remove(sc.appName());
appPoolDetails.remove(sc.appName()).shutdown();
appSqlDetails.remove(sc.appName());
@@ -313,7 +317,6 @@ public class DatahubSparkListener extends SparkListener {
.map(x -> LineageUtils.getConsumer(x)).filter(Objects::nonNull).collect(Collectors.toList());
} else {
return Collections.emptyList();
//singletonList(LineageUtils.getConsumer(DATAHUB_EMITTER));
}
}

View File

@@ -28,6 +28,7 @@ public class McpEmitter implements LineageConsumer {
if (emitter.isPresent()) {
mcpws.stream().map(mcpw -> {
try {
log.debug("emitting mcpw: " + mcpw);
return emitter.get().emit(mcpw);
} catch (IOException ioException) {
log.error("Failed to emit metadata to DataHub", ioException);
@@ -35,7 +36,7 @@ public class McpEmitter implements LineageConsumer {
}
}).filter(Objects::nonNull).collect(Collectors.toList()).forEach(future -> {
try {
future.get(); → log.info(future.get().toString());
} catch (InterruptedException | ExecutionException e) {
// log error, but don't impact thread
log.error("Failed to emit metadata to DataHub", e);

View File

@@ -5,4 +5,5 @@ log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.logger.com.linkedin.datahub.lineage=DEBUG → log4j.logger.datahub.spark=DEBUG
log4j.logger.datahub.client.rest=DEBUG