feat(datahub-client): add java file emitter (#5578)

Co-authored-by: Shirshanka Das <shirshanka@apache.org>
Mugdha Hardikar 2022-08-07 19:51:44 +05:30 committed by GitHub
parent 828a711684
commit c7f477813c
6 changed files with 2447 additions and 5 deletions

@@ -14,7 +14,7 @@ Follow the specific instructions for your build system to declare a dependency o
### Gradle
Add the following to your build.gradle.
```gradle
implementation 'io.acryl:datahub-client:0.0.1'
implementation 'io.acryl:datahub-client:__version__'
```
### Maven
Add the following to your `pom.xml`.
@@ -23,8 +23,8 @@ Add the following to your `pom.xml`.
<dependency>
<groupId>io.acryl</groupId>
<artifactId>datahub-client</artifactId>
<!-- replace with the latest version number -->
<version>0.0.1</version>
<!-- replace __version__ with the latest version number -->
<version>__version__</version>
</dependency>
```
@@ -95,7 +95,7 @@ emitter.emit(mcpw, new Callback() {
});
```
### Emitter Code
### REST Emitter Code
If you're interested in looking at the REST emitter code, it is available [here](./datahub-client/src/main/java/datahub/client/rest/RestEmitter.java).
@@ -161,6 +161,61 @@ else {
System.out.println("Kafka service is down.");
}
```
### Kafka Emitter Code
If you're interested in looking at the Kafka emitter code, it is available [here](./datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java).
## File Emitter
The File emitter writes metadata change proposal events (MCPs) into a JSON file that can later be handed off to the Python [File source](docs/generated/ingestion/sources/file.md) for ingestion. This works analogously to the [File sink](../../metadata-ingestion/sink_docs/file.md) in Python. This mechanism can be used when the system producing metadata events doesn't have a direct connection to DataHub's REST server or Kafka brokers. The generated JSON file can be transferred later and then ingested into DataHub using the [File source](docs/generated/ingestion/sources/file.md).
### Usage
```java
import datahub.client.file.FileEmitter;
import datahub.client.file.FileEmitterConfig;
import datahub.event.MetadataChangeProposalWrapper;
// ... followed by
// Define output file co-ordinates
String outputFile = "/my/path/output.json";
// Create File Emitter
FileEmitter emitter = new FileEmitter(FileEmitterConfig.builder().fileName(outputFile).build());
// A couple of sample metadata events
MetadataChangeProposalWrapper mcpwOne = MetadataChangeProposalWrapper.builder()
.entityType("dataset")
.entityUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my-dataset.user-table,PROD)")
.upsert()
.aspect(new DatasetProperties().setDescription("This is the canonical User profile dataset"))
.build();
MetadataChangeProposalWrapper mcpwTwo = MetadataChangeProposalWrapper.builder()
.entityType("dataset")
.entityUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my-dataset.fact-orders-table,PROD)")
.upsert()
.aspect(new DatasetProperties().setDescription("This is the canonical Fact table for orders"))
.build();
MetadataChangeProposalWrapper[] mcpws = { mcpwOne, mcpwTwo };
for (MetadataChangeProposalWrapper mcpw : mcpws) {
emitter.emit(mcpw);
}
emitter.close(); // calling close() is important to ensure the file is closed cleanly
```
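Like the REST and Kafka emitters, the File emitter's `emit` method also accepts an optional `Callback` if you want per-event notifications; since writes happen inline, the callback is invoked synchronously. A minimal sketch, reusing the `mcpwOne` event from the usage example above:
```java
import datahub.client.Callback;
import datahub.client.MetadataWriteResponse;
import datahub.client.file.FileEmitter;
import datahub.client.file.FileEmitterConfig;

// ... reusing mcpwOne from the usage example above
FileEmitter emitter = new FileEmitter(FileEmitterConfig.builder().fileName("/my/path/output.json").build());
emitter.emit(mcpwOne, new Callback() {
  @Override
  public void onCompletion(MetadataWriteResponse response) {
    // Invoked inline once the MCP has been appended to the file
    System.out.println("Write succeeded: " + response.isSuccess());
  }

  @Override
  public void onFailure(Throwable exception) {
    // Invoked if the emitter is already closed or the write fails
    System.err.println("Write failed: " + exception.getMessage());
  }
});
emitter.close();
```
The returned `Future` is already completed by the time `emit` returns, so calling `get()` on it will not block.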
### File Emitter Code
If you're interested in looking at the File emitter code, it is available [here](./datahub-client/src/main/java/datahub/client/file/FileEmitter.java).
### Support for S3, GCS etc.
The File emitter currently only supports writing to the local filesystem. If you're interested in adding support for S3, GCS, etc., contributions are welcome!
## Other Languages

@@ -0,0 +1,198 @@
package datahub.client.file;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.util.DefaultIndenter;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.data.template.JacksonDataTemplateCodec;
import com.linkedin.mxe.MetadataChangeProposal;
import datahub.client.Callback;
import datahub.client.Emitter;
import datahub.client.MetadataWriteResponse;
import datahub.event.EventFormatter;
import datahub.event.MetadataChangeProposalWrapper;
import datahub.event.UpsertAspectRequest;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FileEmitter implements Emitter {
private final EventFormatter eventFormatter;
private final FileEmitterConfig config;
private final ObjectMapper objectMapper = new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
private final JacksonDataTemplateCodec dataTemplateCodec = new JacksonDataTemplateCodec(objectMapper.getFactory());
private final BufferedWriter writer;
private final Future<MetadataWriteResponse> cachedSuccessFuture;
private final AtomicBoolean closed;
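// Tracks whether at least one MCP has been written, so records after the first are comma-separated.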
private boolean wroteSomething;
private static final String INDENT_4 = "    ";
/**
* The default constructor
*
* @param config the configuration for this emitter
*/
public FileEmitter(FileEmitterConfig config) {
this.config = config;
this.eventFormatter = this.config.getEventFormatter();
DefaultPrettyPrinter pp = new DefaultPrettyPrinter()
.withObjectIndenter(new DefaultIndenter(FileEmitter.INDENT_4, DefaultIndenter.SYS_LF))
.withArrayIndenter(new DefaultIndenter(FileEmitter.INDENT_4, DefaultIndenter.SYS_LF));
this.dataTemplateCodec.setPrettyPrinter(pp);
try {
FileWriter fw = new FileWriter(config.getFileName(), false);
this.writer = new BufferedWriter(fw);
this.writer.append("[");
this.writer.newLine();
this.closed = new AtomicBoolean(false);
} catch (IOException e) {
throw new RuntimeException("Error while creating file", e);
}
this.wroteSomething = false;
log.debug("Emitter created successfully for " + this.config.getFileName());
this.cachedSuccessFuture = new Future<MetadataWriteResponse>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public MetadataWriteResponse get() throws InterruptedException, ExecutionException {
return MetadataWriteResponse.builder().success(true).responseContent("MCP written to File").build();
}
@Override
public MetadataWriteResponse get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return this.get();
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
};
}
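// Writes the closing bracket of the JSON array and closes the writer; subsequent emits will fail.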
@Override
public void close() throws IOException {
this.writer.newLine();
this.writer.append("]");
this.writer.close();
this.closed.set(true);
log.debug("Emitter closed for {}", this.config.getFileName());
}
@Override
public Future<MetadataWriteResponse> emit(@SuppressWarnings("rawtypes") MetadataChangeProposalWrapper mcpw,
Callback callback) throws IOException {
return emit(this.eventFormatter.convert(mcpw), callback);
}
@Override
public Future<MetadataWriteResponse> emit(MetadataChangeProposal mcp, Callback callback) throws IOException {
if (this.closed.get()) {
String errorMsg = "File Emitter is already closed.";
log.error(errorMsg);
Future<MetadataWriteResponse> response = createFailureFuture(errorMsg);
if (callback != null) {
callback.onFailure(new Exception(errorMsg));
}
return response;
}
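// Serialize the MCP as pretty-printed JSON and append it as the next element of the JSON array.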
try {
String serializedMCP = this.dataTemplateCodec.mapToString(mcp.data());
if (wroteSomething) {
this.writer.append(",");
this.writer.newLine();
}
this.writer.append(serializedMCP);
wroteSomething = true;
log.debug("MCP written successfully: {}", serializedMCP);
Future<MetadataWriteResponse> response = this.cachedSuccessFuture;
if (callback != null) {
try {
callback.onCompletion(response.get());
} catch (InterruptedException | ExecutionException e) {
log.warn("Callback could not be executed.", e);
}
}
return response;
} catch (Throwable t) {
Future<MetadataWriteResponse> response = createFailureFuture(t.getMessage());
if (callback != null) {
try {
callback.onFailure(t);
} catch (Exception e) {
log.warn("Callback could not be executed.", e);
}
}
return response;
}
}
@Override
public boolean testConnection() throws IOException, ExecutionException, InterruptedException {
throw new UnsupportedOperationException("testConnection not relevant for File Emitter");
}
@Override
public Future<MetadataWriteResponse> emit(List<UpsertAspectRequest> request, Callback callback) throws IOException {
throw new UnsupportedOperationException("UpsertAspectRequest not relevant for File Emitter");
}
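// Builds an already-completed future carrying an unsuccessful MetadataWriteResponse with the given message.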
private Future<MetadataWriteResponse> createFailureFuture(String message) {
return new Future<MetadataWriteResponse>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public MetadataWriteResponse get() throws InterruptedException, ExecutionException {
return MetadataWriteResponse.builder().success(false).responseContent(message).build();
}
@Override
public MetadataWriteResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
return this.get();
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
};
}
}

@@ -0,0 +1,16 @@
package datahub.client.file;
import datahub.event.EventFormatter;
import lombok.Builder;
import lombok.Value;
@Value
@Builder
public class FileEmitterConfig {
@Builder.Default
@lombok.NonNull
private final String fileName = null;
@Builder.Default
private final EventFormatter eventFormatter = new EventFormatter(EventFormatter.Format.PEGASUS_JSON);
}

@@ -37,7 +37,7 @@ public class KafkaEmitter implements Emitter {
private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;
/**
* The default constructor, prefer using the `create` factory method.
* The default constructor
*
* @param config
* @throws IOException

@@ -0,0 +1,136 @@
package datahub.client.file;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import com.fasterxml.jackson.core.exc.StreamReadException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DatabindException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.data.DataMap;
import com.linkedin.data.template.JacksonDataTemplateCodec;
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.mxe.MetadataChangeProposal;
import datahub.client.Callback;
import datahub.client.MetadataWriteResponse;
import datahub.event.MetadataChangeProposalWrapper;
public class FileEmitterTest {
private final ObjectMapper objectMapper = new ObjectMapper();
private final JacksonDataTemplateCodec dataTemplateCodec = new JacksonDataTemplateCodec(objectMapper.getFactory());
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
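// Round-trips the MCPs from the golden file through the emitter and verifies the output matches the golden JSON.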
@Test
public void testFileEmitter() throws IOException {
InputStream goldenFileStream = ClassLoader.getSystemResourceAsStream("golden_files/mcps_golden.json");
String tempRoot = tempFolder.getRoot().toString();
String outputFile = tempRoot + "/test.json";
FileEmitter emitter = new FileEmitter(FileEmitterConfig.builder().fileName(outputFile).build());
for (MetadataChangeProposal mcp : this.getMCPs(goldenFileStream)) {
emitter.emit(mcp);
}
emitter.close();
goldenFileStream = ClassLoader.getSystemResourceAsStream("golden_files/mcps_golden.json");
this.assertEqualJsonFile(goldenFileStream, outputFile);
}
private void assertEqualJsonFile(InputStream file1, String file2) throws StreamReadException, DatabindException,
IOException {
TypeReference<List<Map<String, Object>>> typeRef = new TypeReference<List<Map<String, Object>>>() {
};
List<Map<String, Object>> map1 = this.objectMapper.readValue(file1, typeRef);
File f2 = new File(file2);
List<Map<String, Object>> map2 = this.objectMapper.readValue(f2, typeRef);
Assert.assertEquals(map1, map2);
}
private List<MetadataChangeProposal> getMCPs(InputStream fileStream) throws StreamReadException, DatabindException,
IOException {
ArrayList<MetadataChangeProposal> mcps = new ArrayList<MetadataChangeProposal>();
TypeReference<Map<String, Object>[]> typeRef = new TypeReference<Map<String, Object>[]>() {
};
Map<String, Object>[] maps = this.objectMapper.readValue(fileStream, typeRef);
for (Map<String, Object> map : maps) {
String json = objectMapper.writeValueAsString(map);
DataMap data = dataTemplateCodec.stringToMap(json);
mcps.add(new MetadataChangeProposal(data));
}
return mcps;
}
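// The success callback should receive the same response that the returned future resolves to.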
@Test
public void testSuccessCallback() throws Exception {
String tempRoot = tempFolder.getRoot().toString();
String outputFile = tempRoot + "/testCallBack.json";
FileEmitter emitter = new FileEmitter(FileEmitterConfig.builder().fileName(outputFile).build());
MetadataChangeProposalWrapper<?> mcpw = getMetadataChangeProposalWrapper("Test Dataset", "urn:li:dataset:foo");
AtomicReference<MetadataWriteResponse> callbackResponse = new AtomicReference<>();
Future<MetadataWriteResponse> future = emitter.emit(mcpw, new Callback() {
@Override
public void onCompletion(MetadataWriteResponse response) {
callbackResponse.set(response);
Assert.assertTrue(response.isSuccess());
}
@Override
public void onFailure(Throwable exception) {
Assert.fail("Should not be called");
}
});
Assert.assertEquals(callbackResponse.get(), future.get());
}
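// Emitting after close() should fail: onFailure is invoked and the future resolves to an unsuccessful response.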
@Test
public void testFailCallback() throws Exception {
String tempRoot = tempFolder.getRoot().toString();
String outputFile = tempRoot + "/testCallBack.json";
FileEmitter emitter = new FileEmitter(FileEmitterConfig.builder().fileName(outputFile).build());
emitter.close();
MetadataChangeProposalWrapper<?> mcpw = getMetadataChangeProposalWrapper("Test Dataset", "urn:li:dataset:foo");
Future<MetadataWriteResponse> future = emitter.emit(mcpw, new Callback() {
@Override
public void onCompletion(MetadataWriteResponse response) {
Assert.fail("Should not be called");
}
@Override
public void onFailure(Throwable exception) {
}
});
Assert.assertFalse(future.get().isSuccess());
}
private MetadataChangeProposalWrapper<?> getMetadataChangeProposalWrapper(String description, String entityUrn) {
return MetadataChangeProposalWrapper.builder()
.entityType("dataset")
.entityUrn(entityUrn)
.upsert()
.aspect(new DatasetProperties().setDescription(description))
.build();
}
}