switch from avro schema to espresso schema format

Na Zhang 2017-03-22 14:49:45 -07:00 committed by Mars Lan
parent eee2fbf230
commit aacb722e4e
5 changed files with 67 additions and 23 deletions

View File

@@ -34,6 +34,7 @@ import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.ObjectNode;
 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
 import wherehows.common.Constant;
 import wherehows.common.utils.JdbcConnection;
@@ -73,7 +74,6 @@ public class EspressoPopulateSchemaMetadataEtl extends EtlJob {
     final Client r2Client = new TransportClientAdapter(http.getClient(Collections.<String, String>emptyMap()));
     RestClient _restClient = new RestClient(r2Client, restLiServerURL);
     logger.debug("restLiServerURL is: " + restLiServerURL);
-    System.out.println("restLiServerURL is: " + restLiServerURL);
     NamedParameterJdbcTemplate
         lNamedParameterJdbcTemplate = JdbcConnection.getNamedParameterJdbcTemplate(driverClassName,url,dbUserName,dbPassword);
@@ -93,45 +93,78 @@ public class EspressoPopulateSchemaMetadataEtl extends EtlJob {
       String urn = (String) row.get("urn");
       String lDatasetName = urn.substring(12).replace('/','.');
-      String lSchemaName = lDatasetName; // TO CHECK? IS THIS EXPECTED?
+      String lSchemaName = lDatasetName;
       String lOwnerName = ownerQueryResult.get("namespace") + ":" + ownerQueryResult.get("owner_id");
       String lSchemaJsonString = (String)row.get("schema");
       logger.info("The populating schema is: " + lSchemaJsonString);
-      String lTableSchemaData = " ";
-      String lDocumentSchemaData = " ";
+      String lTableSchemaData = null;
+      String lDocumentSchemaData = null;
       try {
         ObjectMapper mapper = new ObjectMapper();
         JsonNode rootNode = mapper.readTree(lSchemaJsonString);
         ArrayNode keySchemaNode = (ArrayNode) rootNode.get("keySchema");
-        if (keySchemaNode!= null) {
+        ObjectNode tableSchemaNode = mapper.createObjectNode();
+        ArrayNode resourceKeyPartsNode = mapper.createArrayNode();
+        if (keySchemaNode!= null)
+        {
           Iterator<JsonNode> slaidsIterator = keySchemaNode.getElements();
-          while (slaidsIterator.hasNext()) {
+          while (slaidsIterator.hasNext())
+          {
             JsonNode oneElementNode = slaidsIterator.next();
-            lTableSchemaData = mapper.writeValueAsString(oneElementNode);
+            resourceKeyPartsNode.add(oneElementNode);
           }
-        }
-        JsonNode valueSchemaNode = rootNode.path("valueSchema");
-        if (valueSchemaNode != null) {
-          lDocumentSchemaData = mapper.writeValueAsString(valueSchemaNode);
+          String name = "";
+          String doc = "";
+          if (rootNode.has("name") && rootNode.has("doc"))
+          {
+            name = rootNode.get("name").toString();
+            doc = rootNode.get("doc").toString();
+            tableSchemaNode.put("name",name);
+            tableSchemaNode.put("doc",doc);
+            tableSchemaNode.put("schemaType","TableSchema");
+            tableSchemaNode.put("recordType","/schemata/document/DUMMY/DUMMY");
+            tableSchemaNode.put("version",1);
+            tableSchemaNode.put("resourceKeyParts",resourceKeyPartsNode);
+            lTableSchemaData = tableSchemaNode.toString();
+          }
+          else
+          {
+            logger.info("Missing name or doc field. Corrupted schema : " + lSchemaName);
+          }
         }
+        JsonNode valueSchemaNode = rootNode.get("valueSchema");
+        if (valueSchemaNode != null && valueSchemaNode.has("name"))
+        {
+          lDocumentSchemaData = valueSchemaNode.toString();
+        }
+        else
+        {
+          logger.info("ValueSchema is missing name field in schema : " + lSchemaName);
+        }
       }
       catch (JsonParseException e) { e.printStackTrace(); }
       catch (JsonMappingException e) { e.printStackTrace(); }
       catch (IOException e) { e.printStackTrace(); }
-      if (lTableSchemaData != null && lDocumentSchemaData != null) // not found then create, avoid committing transaction error. Is this necessary?
+      if (lTableSchemaData != null && lDocumentSchemaData != null)
       {
         try {
           logger.info("table schema: " + lTableSchemaData);
           logger.info("valueSchema: " + lDocumentSchemaData);
-          System.out.println("lSchemaName is " + lSchemaName);
-          lRestClient.create(_restClient, lSchemaName,lTableSchemaData,lDocumentSchemaData,lDatasetName,lOwnerName); //"urn:li:corpuser:gmohanas"
+          lRestClient.create(_restClient, lSchemaName,lTableSchemaData,lDocumentSchemaData,lDatasetName,lOwnerName);
         } catch (Exception e) {
           logger.error(e.toString());
         }
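
For reference, the Espresso schema construction above can be exercised on its own. The sketch below (the class name and the sample schema string are made up for illustration and are not part of this commit) feeds an Avro-style {keySchema, valueSchema} blob through the same Jackson 1.x calls the hunk uses, turning the keySchema entries into resourceKeyParts of a TableSchema document and passing valueSchema through as the DocumentSchema payload:

    import java.util.Iterator;
    import org.codehaus.jackson.JsonNode;
    import org.codehaus.jackson.map.ObjectMapper;
    import org.codehaus.jackson.node.ArrayNode;
    import org.codehaus.jackson.node.ObjectNode;

    public class EspressoSchemaSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical example of the old-format schema stored in dict_dataset.schema
        String schemaJson = "{\"name\":\"Member\",\"doc\":\"member table\","
            + "\"keySchema\":[{\"name\":\"memberId\",\"type\":\"LONG\"}],"
            + "\"valueSchema\":{\"name\":\"Member\",\"type\":\"record\",\"fields\":[]}}";

        ObjectMapper mapper = new ObjectMapper();
        JsonNode rootNode = mapper.readTree(schemaJson);

        // keySchema entries become the resourceKeyParts array of the Espresso TableSchema
        ObjectNode tableSchemaNode = mapper.createObjectNode();
        ArrayNode resourceKeyPartsNode = mapper.createArrayNode();
        Iterator<JsonNode> keyParts = ((ArrayNode) rootNode.get("keySchema")).getElements();
        while (keyParts.hasNext()) {
          resourceKeyPartsNode.add(keyParts.next());
        }

        // toString() keeps the JSON-quoted value, matching what the hunk does with name/doc
        tableSchemaNode.put("name", rootNode.get("name").toString());
        tableSchemaNode.put("doc", rootNode.get("doc").toString());
        tableSchemaNode.put("schemaType", "TableSchema");
        tableSchemaNode.put("recordType", "/schemata/document/DUMMY/DUMMY");
        tableSchemaNode.put("version", 1);
        tableSchemaNode.put("resourceKeyParts", resourceKeyPartsNode);

        // valueSchema is forwarded unchanged as the DocumentSchema payload
        String tableSchemaData = tableSchemaNode.toString();
        String documentSchemaData = rootNode.get("valueSchema").toString();
        System.out.println(tableSchemaData);
        System.out.println(documentSchemaData);
      }
    }

Running it prints the two JSON strings the ETL job would hand to lRestClient.create(...).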
View File

@@ -46,7 +46,7 @@ public class OraclePopulateSchemaMetadataEtl extends EtlJob {
   public static final String GET_OWNER_BY_ID = "SELECT * FROM dataset_owner WHERE dataset_urn like :platform limit 1";
   public final static String GET_DATASETS_NAME_LIKE_PLAT =
-      "SELECT * FROM dict_dataset WHERE urn like :platform limit 1";
+      "SELECT * FROM dict_dataset WHERE urn like :platform";
   /** The property_name field in wh_property table for WhereHows database connection information */
   public String driverClassName = prop.getProperty(Constant.WH_DB_DRIVER_KEY);
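
For context on the query change above: dropping "limit 1" means the job now walks every dict_dataset row whose urn matches the platform pattern instead of a single sample. A minimal sketch of running the widened query through the same JdbcConnection helper the ETL jobs use (the connection settings and the LIKE pattern below are hypothetical, not taken from this commit):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
    import wherehows.common.utils.JdbcConnection;

    public class DatasetQuerySketch {
      public static void main(String[] args) {
        // Hypothetical connection settings; the jobs read theirs from wh_property via prop.getProperty(...)
        NamedParameterJdbcTemplate jdbc = JdbcConnection.getNamedParameterJdbcTemplate(
            "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/wherehows", "wherehows", "wherehows");

        Map<String, Object> params = new HashMap<String, Object>();
        params.put("platform", "oracle:///%"); // hypothetical platform prefix for the LIKE match

        // Without "limit 1", every matching dataset row comes back and gets processed
        List<Map<String, Object>> rows =
            jdbc.queryForList("SELECT * FROM dict_dataset WHERE urn like :platform", params);
        for (Map<String, Object> row : rows) {
          System.out.println(row.get("urn"));
        }
      }
    }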
View File

@@ -163,3 +163,7 @@ base.url.key=
 # dali
 dali.git.urn=
 git.committer.blacklist=
+# wherehows metadata store restli server
+wherehows.restli.server.url=
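
The new key gives the populate jobs the Rest.li endpoint of the metadata store. A tiny sketch of reading it straight from a job properties file (the file name below is hypothetical, and the Constant wrapper the job actually uses for this key is not shown in this commit):

    import java.io.FileInputStream;
    import java.util.Properties;

    public class RestliUrlConfigSketch {
      public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        // Hypothetical path; each ETL job loads its own .properties file
        prop.load(new FileInputStream("etl.properties"));
        String restLiServerURL = prop.getProperty("wherehows.restli.server.url");
        System.out.println("restLiServerURL is: " + restLiServerURL);
      }
    }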
View File

@@ -1,5 +1,6 @@
-Please get the extra library files, which may not be available in Maven/TypeSafe repository or Artifactory, and put them here. For example:
-* https://downloads.teradata.com/download/connectivity/jdbc-driver
-* http://download.oracle.com/otn/utilities_drivers/jdbc/121010/ojdbc7.jar
-* http://download.oracle.com/otn/utilities_drivers/jdbc/121010/orai18n.jar
+Please get the extra library files from linkedin artifactory:
+-metadata-store-api-rest-client-0.1.26.jar
+-metadata-store-api-data-template-0.1.26.jar
+-models-data-template-16.0.4.jar
+-resource-identity-urn-3.0.5.jar
View File

@@ -34,6 +34,10 @@ import com.linkedin.restli.client.RestClient;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.IdResponse;
+import com.linkedin.restli.client.Response;
+import com.linkedin.restli.client.RestLiResponseException;
+import com.linkedin.r2.RemoteInvocationException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -68,15 +72,17 @@ public class EspressoCreateSchemaMetadata {
         .setForeignKeysSpecs(new ForeignKeySpecMap());
     try {
       IdResponse<ComplexResourceKey<SchemaMetadataKey, EmptyRecord>> response =
           _restClient.sendRequest(_schemaMetadataBuilders.create().input(newSchemaMetadata).actorParam(new Urn(iUrn)).build()).getResponseEntity();
-      System.out.println(response.getId().getKey());
-    } catch (Exception e) {
-      System.out.println(_schemaMetadataBuilders.create().input(newSchemaMetadata).build());
-      System.out.println(e.fillInStackTrace());
+      Thread.sleep(10000);
+    } catch (RemoteInvocationException ex) {
+      RestLiResponseException ex2 = (RestLiResponseException)ex;
+      System.out.println(ex2.getDecodedResponse().getHeader("X-LI-R2-W-IC-1-serviceCallTraceData"));
     }
   }
   private static final SchemaMetadataRequestBuilders _schemaMetadataBuilders = new SchemaMetadataRequestBuilders();
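
One caveat in the new catch block: the unconditional cast assumes the RemoteInvocationException actually carries a decoded Rest.li response, which is not the case for connection-level failures. A hedged variation of the same handling (the helper and class names are made up; only the calls already visible in this hunk are reused) that guards the cast before pulling the trace header:

    import com.linkedin.r2.RemoteInvocationException;
    import com.linkedin.restli.client.RestLiResponseException;

    public class RestliErrorSketch {
      // Print the service-call trace header only when the failure came back as a Rest.li response
      static void printServiceCallTrace(RemoteInvocationException ex) {
        if (ex instanceof RestLiResponseException) {
          RestLiResponseException restLiEx = (RestLiResponseException) ex;
          System.out.println(
              restLiEx.getDecodedResponse().getHeader("X-LI-R2-W-IC-1-serviceCallTraceData"));
        } else {
          System.out.println("No Rest.li response attached: " + ex.getMessage());
        }
      }

      public static void main(String[] args) {
        // Example with a connection-level failure: no Rest.li response to decode
        printServiceCallTrace(new RemoteInvocationException("connection refused (example)"));
      }
    }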