mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-24 08:28:12 +00:00
parent
2260118232
commit
fc8a8030e4
@ -5,6 +5,7 @@ import datahub.event.MetadataChangeProposalWrapper;
|
||||
import datahub.event.UpsertAspectRequest;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
@ -29,6 +30,7 @@ public interface Emitter extends Closeable {
|
||||
* @param callback if not null, is called from the IO thread. Should be a quick operation.
|
||||
* @return a {@link Future} for callers to inspect the result of the operation or block until one is available
|
||||
* @throws IOException
|
||||
* @throws URISyntaxException
|
||||
*/
|
||||
Future<MetadataWriteResponse> emit(@Nonnull MetadataChangeProposalWrapper mcpw, Callback callback) throws IOException;
|
||||
|
||||
|
||||
@ -1,17 +1,5 @@
|
||||
package datahub.client.rest;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.linkedin.data.DataMap;
|
||||
import com.linkedin.data.template.JacksonDataTemplateCodec;
|
||||
import com.linkedin.mxe.MetadataChangeProposal;
|
||||
import datahub.client.Callback;
|
||||
import datahub.client.Emitter;
|
||||
import datahub.client.MetadataResponseFuture;
|
||||
import datahub.client.MetadataWriteResponse;
|
||||
import datahub.event.EventFormatter;
|
||||
import datahub.event.MetadataChangeProposalWrapper;
|
||||
import datahub.event.UpsertAspectRequest;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
@ -21,8 +9,9 @@ import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.apache.http.client.config.RequestConfig;
|
||||
@ -33,6 +22,21 @@ import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
||||
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.linkedin.data.DataMap;
|
||||
import com.linkedin.data.template.JacksonDataTemplateCodec;
|
||||
import com.linkedin.mxe.MetadataChangeProposal;
|
||||
|
||||
import datahub.client.Callback;
|
||||
import datahub.client.Emitter;
|
||||
import datahub.client.MetadataResponseFuture;
|
||||
import datahub.client.MetadataWriteResponse;
|
||||
import datahub.event.EventFormatter;
|
||||
import datahub.event.MetadataChangeProposalWrapper;
|
||||
import datahub.event.UpsertAspectRequest;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
||||
@ThreadSafe
|
||||
@Slf4j
|
||||
@ -154,7 +158,7 @@ public class RestEmitter implements Emitter {
|
||||
@Override
|
||||
public Future<MetadataWriteResponse> emit(MetadataChangeProposalWrapper mcpw,
|
||||
Callback callback) throws IOException {
|
||||
return emit(this.eventFormatter.convert(mcpw), callback);
|
||||
return emit(this.eventFormatter.convert(mcpw), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -309,4 +313,4 @@ public class RestEmitter implements Emitter {
|
||||
Future<HttpResponse> requestFuture = httpClient.execute(httpPost, httpCallback);
|
||||
return new MetadataResponseFuture(requestFuture, responseAtomicReference, responseLatch);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -7,6 +7,7 @@ import com.linkedin.data.ByteString;
|
||||
import com.linkedin.data.template.JacksonDataTemplateCodec;
|
||||
import com.linkedin.mxe.GenericAspect;
|
||||
import com.linkedin.mxe.MetadataChangeProposal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
@ -33,7 +34,8 @@ public class EventFormatter {
|
||||
|
||||
@SneakyThrows(URISyntaxException.class)
|
||||
public MetadataChangeProposal convert(MetadataChangeProposalWrapper mcpw) throws IOException {
|
||||
String serializedAspect = dataTemplateCodec.dataTemplateToString(mcpw.getAspect());
|
||||
|
||||
String serializedAspect = StringEscapeUtils.escapeJava(dataTemplateCodec.dataTemplateToString(mcpw.getAspect()));
|
||||
MetadataChangeProposal mcp = new MetadataChangeProposal().setEntityType(mcpw.getEntityType())
|
||||
.setAspectName(mcpw.getAspectName())
|
||||
.setEntityUrn(Urn.createFromString(mcpw.getEntityUrn()))
|
||||
|
||||
@ -0,0 +1,149 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package datahub.event;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StringWriter;
|
||||
import java.io.Writer;
|
||||
import java.util.Locale;
|
||||
|
||||
public class StringEscapeUtils {
|
||||
|
||||
private StringEscapeUtils() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Worker method for the {@link #escapeJavaScript(String)} method.
|
||||
*
|
||||
* @param out write to receieve the escaped string
|
||||
* @param str String to escape values in, may be null
|
||||
* @param escapeSingleQuote escapes single quotes if <code>true</code>
|
||||
* @param escapeForwardSlash TODO
|
||||
* @throws IOException if an IOException occurs
|
||||
*/
|
||||
private static void escapeJavaStyleString(Writer out, String str, boolean escapeSingleQuote,
|
||||
boolean escapeForwardSlash) throws IOException {
|
||||
if (out == null) {
|
||||
throw new IllegalArgumentException("The Writer must not be null");
|
||||
} else if (str != null) {
|
||||
int sz = str.length();
|
||||
|
||||
for (int i = 0; i < sz; ++i) {
|
||||
char ch = str.charAt(i);
|
||||
if (ch > 4095) {
|
||||
out.write("\\u" + hex(ch));
|
||||
} else if (ch > 255) {
|
||||
out.write("\\u0" + hex(ch));
|
||||
} else if (ch > 127) {
|
||||
out.write("\\u00" + hex(ch));
|
||||
} else if (ch < ' ') {
|
||||
switch (ch) {
|
||||
case '\b':
|
||||
out.write(92);
|
||||
out.write(98);
|
||||
break;
|
||||
case '\t':
|
||||
out.write(92);
|
||||
out.write(116);
|
||||
break;
|
||||
case '\n':
|
||||
out.write(92);
|
||||
out.write(110);
|
||||
break;
|
||||
case '\u000b':
|
||||
|
||||
case '\f':
|
||||
out.write(92);
|
||||
out.write(102);
|
||||
break;
|
||||
case '\r':
|
||||
out.write(92);
|
||||
out.write(114);
|
||||
break;
|
||||
default:
|
||||
if (ch > 15) {
|
||||
out.write("\\u00" + hex(ch));
|
||||
} else {
|
||||
out.write("\\u000" + hex(ch));
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
} else {
|
||||
out.write(ch);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an upper case hexadecimal <code>String</code> for the given
|
||||
* character.
|
||||
*
|
||||
* @param ch The character to convert.
|
||||
* @return An upper case hexadecimal <code>String</code>
|
||||
*/
|
||||
private static String hex(char ch) {
|
||||
return Integer.toHexString(ch).toUpperCase(Locale.ENGLISH);
|
||||
}
|
||||
|
||||
/**
|
||||
* Worker method for the {@link #escapeJavaScript(String)} method.
|
||||
*
|
||||
* @param str String to escape values in, may be null
|
||||
* @param escapeSingleQuotes escapes single quotes if <code>true</code>
|
||||
* @param escapeForwardSlash TODO
|
||||
* @return the escaped string
|
||||
*/
|
||||
private static String escapeJavaStyleString(String str, boolean escapeSingleQuotes, boolean escapeForwardSlash) throws IOException {
|
||||
if (str == null) {
|
||||
return null;
|
||||
} else {
|
||||
StringWriter writer = new StringWriter(str.length() * 2);
|
||||
escapeJavaStyleString(writer, str, escapeSingleQuotes, escapeForwardSlash);
|
||||
return writer.toString();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Escapes the characters in a <code>String</code> using Java String rules.
|
||||
* <p>
|
||||
* Deals correctly with quotes and control-chars (tab, backslash, cr, ff, etc.)
|
||||
* <p>
|
||||
* So a tab becomes the characters <code>'\\'</code> and <code>'t'</code>.
|
||||
* <p>
|
||||
* The only difference between Java strings and JavaScript strings
|
||||
* is that in JavaScript, a single quote must be escaped.
|
||||
* <p>
|
||||
* Example:
|
||||
* <pre>
|
||||
* input string: He didn't say, "Stop!"
|
||||
* output string: He didn't say, \"Stop!\"
|
||||
* </pre>
|
||||
*
|
||||
* @param str String to escape values in, may be null
|
||||
* @return String with escaped values, <code>null</code> if null string input
|
||||
*/
|
||||
public static String escapeJava(String str) throws IOException {
|
||||
return escapeJavaStyleString(str, false, false);
|
||||
}
|
||||
}
|
||||
@ -90,7 +90,7 @@ public class RestEmitterTest {
|
||||
+ ",\"value\":\"{\\\"description\\\":\\\"Test Dataset\\\"}\"}}}";
|
||||
Assert.assertEquals(expectedContent, contentString);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testExceptions() throws URISyntaxException, IOException, ExecutionException, InterruptedException {
|
||||
|
||||
|
||||
@ -1,13 +1,15 @@
|
||||
package datahub.event;
|
||||
|
||||
import com.linkedin.dataset.DatasetProperties;
|
||||
import com.linkedin.mxe.MetadataChangeProposal;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.testng.Assert;
|
||||
|
||||
import com.linkedin.dataset.DatasetProperties;
|
||||
import com.linkedin.mxe.MetadataChangeProposal;
|
||||
|
||||
|
||||
public class EventFormatterTest {
|
||||
|
||||
@ -25,4 +27,23 @@ public class EventFormatterTest {
|
||||
String content = mcp.getAspect().getValue().asString(StandardCharsets.UTF_8);
|
||||
Assert.assertEquals(content, "{\"description\":\"A test dataset\"}");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUtf8Encoding() throws URISyntaxException, IOException {
|
||||
|
||||
MetadataChangeProposalWrapper mcpw = MetadataChangeProposalWrapper.builder()
|
||||
.entityType("dataset")
|
||||
.entityUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my-dataset.user-table,PROD)")
|
||||
.upsert()
|
||||
.aspect(new DatasetProperties().setDescription("This is the canonical User profile dataset œ∑´´†¥¨ˆˆπ“‘åß∂ƒ©˙∆˚¬…æΩ≈ç√∫˜˜≤≥ç"))
|
||||
.build();
|
||||
EventFormatter eventFormatter = new EventFormatter();
|
||||
MetadataChangeProposal mcp = eventFormatter.convert(mcpw);
|
||||
Assert.assertEquals(mcp.getAspect().getContentType(), "application/json");
|
||||
String content = mcp.getAspect().getValue().asString(StandardCharsets.UTF_8);
|
||||
String expectedContent = "{\"description\":\"This is the canonical User profile dataset \\u0153\\u2211\\u00B4\\u00B4"
|
||||
+ "\\u2020\\u00A5\\u00A8\\u02C6\\u02C6\\u03C0\\u201C\\u2018\\u00E5\\u00DF\\u2202\\u0192\\u00A9\\u02D9\\u2206"
|
||||
+ "\\u02DA\\u00AC\\u2026\\u00E6\\u03A9\\u2248\\u00E7\\u221A\\u222B\\u02DC\\u02DC\\u2264\\u2265\\u00E7\"}";
|
||||
Assert.assertEquals(content, expectedContent);
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user