Add schema and generated java to data model, refactor Gobblin audit processor (#732)

Yi (Alan) Wang 2017-09-11 15:26:06 -07:00 committed by GitHub
parent 6e87040768
commit d2a3fe58db
13 changed files with 162 additions and 35 deletions

View File

@@ -12,7 +12,8 @@ subprojects {
exclude "**/jython/requests/**"
exclude "**/pyparsing.py"
excludes(["**/*.json", "**/*.avsc", "**/*.avro", "**/*.conf", "**/*.yaml", "**/*.xml"])
excludes(["**/*.txt", "**/*.csv", "**/*.md"])
excludes(["**/*.txt", "**/*.csv", "**/*.md", "**/*.job", "**/*.properties", "**/*.template"])
excludes(["**/com/linkedin/events/**", "**/gobblin/metrics/**"])
}
plugins.withType(PlayPlugin) {

View File

@@ -17,3 +17,4 @@ modules.each { module ->
include "${module}"
}
gradle.ext.appBuildEnvironment = "opensource"

View File

@@ -129,4 +129,18 @@ public class StringUtil {
String string = String.valueOf(obj);
return string == null || string.equals("null") ? replacement : string;
}
/**
* Convert a map with arbitrary key and value types into a map from String to String,
* calling String.valueOf on each key and value
* @param map the map to convert
* @return a new map with stringified keys and values
*/
public static <T, K> Map<String, String> toStringMap(Map<T, K> map) {
final Map<String, String> newMap = new HashMap<>();
for (Map.Entry<T, K> entry : map.entrySet()) {
newMap.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
}
return newMap;
}
}
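A quick usage sketch (the key and value are illustrative; the CharSequence-keyed map mirrors how Avro records expose their map fields):

```java
import java.util.HashMap;
import java.util.Map;
import wherehows.common.utils.StringUtil;

class ToStringMapExample {
  static Map<String, String> example() {
    // Avro map fields are typically keyed by CharSequence/Utf8 rather than String;
    // toStringMap normalizes them so callers can do plain String lookups.
    Map<CharSequence, Object> raw = new HashMap<>();
    raw.put("dataset", "/data/tracking/PageViewEvent"); // hypothetical entry
    return StringUtil.toStringMap(raw); // {"dataset": "/data/tracking/PageViewEvent"}
  }
}
```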

View File

@@ -0,0 +1,39 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.utils;
import java.util.HashMap;
import java.util.Map;
import org.testng.Assert;
import org.testng.annotations.Test;
import static wherehows.common.utils.StringUtil.*;
public class StringUtilTest {
@Test
public void testToStringMap() {
CharSequence key = "foo";
Object value = "bar";
Map<CharSequence, Object> map = new HashMap<>();
map.put(key, value);
Map<String, String> newMap = toStringMap(map);
Assert.assertTrue(newMap.containsKey("foo"));
Assert.assertEquals(newMap.get("foo"), "bar");
}
}

View File

@@ -0,0 +1,23 @@
# Wherehows Data Model
This module contains the data model used by WhereHows, including the table DDLs for the MySQL DB, the Elasticsearch indices,
and the Avro schemas for Kafka events. It also includes the auto-generated Java classes for the Kafka events, to be used by other modules.
## Code Generation for Avro Schema
The Java code under src/main/java is auto-generated by avro-tools from the Avro schemas (.avsc) of the Kafka events
and should not be edited directly.
Note that we currently require Avro version 1.4 for Kafka-related tasks.
To generate the Java code, first download avro-tools-1.4.1, then run the following from wherehows-data-model/:
```
java -jar avro-tools-1.4.1.jar compile schema xxx.avsc src/main/java
```
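For orientation, the generated class is a plain Avro specific record. A rough sketch of its shape is below; the real avro-tools output also emits the SCHEMA$ constant and index-based get/put overrides, and avro-tools 1.4 types string fields as org.apache.avro.util.Utf8 rather than CharSequence:

```java
package gobblin.metrics;

// Sketch of the generated shape only; never hand-edit the real generated file.
public class GobblinTrackingEvent_audit extends org.apache.avro.specific.SpecificRecordBase
    implements org.apache.avro.specific.SpecificRecord {
  public long timestamp;                                     // event creation time
  public CharSequence namespace;                             // nullable per the schema
  public CharSequence name;                                  // event name
  public java.util.Map<CharSequence, CharSequence> metadata; // free-form event metadata
  // ... generated SCHEMA$, getSchema(), get(int), put(int, Object) elided ...
}
```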
## Build
```
$ ../gradlew build
BUILD SUCCESSFUL in 0s
4 actionable tasks: 2 executed, 2 up-to-date
```

View File

@@ -1,8 +1,17 @@
sourceSets {
main {
resources {
srcDir 'DDL'
srcDir 'ELASTICSEARCH'
}
plugins {
id "com.commercehub.gradle.plugin.avro" version "0.8.0" apply false
}
if (gradle.appBuildEnvironment == "opensource") {
apply plugin: "com.commercehub.gradle.plugin.avro"
avro {
fieldVisibility = "PUBLIC"
}
}
apply plugin: 'java'
dependencies {
compile externalDependency.avro
}
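The fieldVisibility = "PUBLIC" setting is what allows downstream code to read the generated records' fields directly instead of through accessors; the refactored GobblinTrackingAuditService below relies on exactly that:

```java
// Direct field access, enabled by fieldVisibility = "PUBLIC":
Long timestamp = record.timestamp;
Map<String, String> metadata = StringUtil.toStringMap(record.metadata);
```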

View File

@@ -0,0 +1,35 @@
{
"type": "record",
"name": "GobblinTrackingEvent_audit",
"namespace": "gobblin.metrics",
"fields": [
{
"name": "timestamp",
"type": "long",
"doc": "Time at which event was created.",
"default": 0
},
{
"name": "namespace",
"type": [
"string",
"null"
],
"doc": "Namespace used for filtering of events."
},
{
"name": "name",
"type": "string",
"doc": "Event name."
},
{
"name": "metadata",
"type": {
"type": "map",
"values": "string"
},
"doc": "Event metadata.",
"default": {}
}
]
}
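A minimal sketch of building one of these events with the generated class (this assumes CharSequence-typed string fields, whereas avro-tools 1.4 may generate Utf8; the metadata entry is illustrative):

```java
import gobblin.metrics.GobblinTrackingEvent_audit;
import java.util.HashMap;
import java.util.Map;

class AuditEventExample {
  static GobblinTrackingEvent_audit sampleEvent() {
    GobblinTrackingEvent_audit event = new GobblinTrackingEvent_audit();
    event.timestamp = System.currentTimeMillis();
    event.namespace = "gobblin.metrics";
    event.name = "DaliAutoPurgeAuditor"; // one of the audit names the processor checks
    Map<CharSequence, CharSequence> metadata = new HashMap<>();
    metadata.put("dataset", "/data/tracking/PageViewEvent"); // hypothetical dataset path
    event.metadata = metadata;
    return event;
  }
}
```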

View File

@@ -3,8 +3,9 @@ apply plugin: 'application'
mainClassName = "wherehows.main.ApplicationStart"
dependencies {
compile project(':wherehows-dao')
compile project(':wherehows-common')
compile project(':wherehows-dao')
compile project(':wherehows-data-model')
compile externalDependency.jackson_databind
compile externalDependency.jackson_core
compile externalDependency.jackson_annotations

View File

@@ -31,10 +31,10 @@ public class DummyProcessor extends KafkaMessageProcessor {
/**
* Simply print the message content
* @param record IndexedRecord
* @param indexedRecord IndexedRecord
*/
public void process(IndexedRecord record) {
log.info(record.toString());
public void process(IndexedRecord indexedRecord) {
log.info(indexedRecord.toString());
//System.out.println(record.toString());
}
}

View File

@@ -13,15 +13,16 @@
*/
package wherehows.processors;
import lombok.RequiredArgsConstructor;
import gobblin.metrics.GobblinTrackingEvent_audit;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import wherehows.dao.DaoFactory;
import wherehows.service.GobblinTrackingAuditService;
@Slf4j
@RequiredArgsConstructor
public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {
public class GobblinTrackingAuditProcessor extends KafkaMessageProcessor {
private static final String DALI_LIMITED_RETENTION_AUDITOR = "DaliLimitedRetentionAuditor";
private static final String DALI_AUTOPURGED_AUDITOR = "DaliAutoPurgeAuditor";
@@ -30,26 +31,35 @@ public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {
private final GobblinTrackingAuditService gobblinTrackingAuditService;
public GobblinTrackingAuditProcessor(DaoFactory daoFactory, KafkaProducer<String, IndexedRecord> producer) {
super(daoFactory, producer);
gobblinTrackingAuditService =
new GobblinTrackingAuditService(DAO_FACTORY.getDatasetClassificationDao(), DAO_FACTORY.getDictDatasetDao());
}
/**
* Process a Gobblin tracking event audit record
* @param record
* @param topic
* @return null
* @param indexedRecord
* @throws Exception
*/
public void process(GenericData.Record record, String topic) throws Exception {
public void process(IndexedRecord indexedRecord) throws Exception {
if (record == null || record.get("name") == null) {
if (indexedRecord == null || indexedRecord.getClass() != GobblinTrackingEvent_audit.class) {
log.debug("Event record type error");
return;
}
final String name = record.get("name").toString();
GobblinTrackingEvent_audit record = (GobblinTrackingEvent_audit) indexedRecord;
String name = String.valueOf(record.name);
// only handle "DaliLimitedRetentionAuditor","DaliAutoPurgeAuditor" and "DsIgnoreIDPCAuditor"
if (name.equals(DALI_LIMITED_RETENTION_AUDITOR) || name.equals(DALI_AUTOPURGED_AUDITOR) || name.equals(
DS_IGNORE_IDPC_AUDITOR)) {
// TODO: Re-enable this once it's fixed.
} else if (name.equals(METADATA_FILE_CLASSIFIER)) {
gobblinTrackingAuditService.updateHdfsDatasetSchema(record);
} else {
log.info("Gobblin audit message skipped.");
}
}
}
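With the service construction moved into the processor's constructor (and out of ProcessorFactory, per the removal below), wiring reduces to roughly this sketch, where daoFactory, producer, and event are assumed to be in scope:

```java
// The processor now builds its own GobblinTrackingAuditService from the DaoFactory.
GobblinTrackingAuditProcessor processor =
    new GobblinTrackingAuditProcessor(daoFactory, producer);
processor.process(event); // logs and skips anything that is not a GobblinTrackingEvent_audit
```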

View File

@@ -34,9 +34,9 @@ public abstract class KafkaMessageProcessor {
/**
* Abstract method 'process' to be implemented by specific processor
* @param record IndexedRecord
* @param indexedRecord IndexedRecord
* @throws Exception
*/
public abstract void process(IndexedRecord record) throws Exception;
public abstract void process(IndexedRecord indexedRecord) throws Exception;
}

View File

@@ -25,12 +25,6 @@ import wherehows.service.MetadataInventoryService;
public class ProcessorFactory {
private final DaoFactory daoFactory;
public GobblinTrackingAuditProcessor getGobblinTrackingAuditProcessor() {
GobblinTrackingAuditService service =
new GobblinTrackingAuditService(daoFactory.getDatasetClassificationDao(), daoFactory.getDictDatasetDao());
return new GobblinTrackingAuditProcessor(service);
}
public JobExecutionLineageProcessor getJobExecutionLineageProcessor() {
return new JobExecutionLineageProcessor(new JobExecutionLineageService());
}

View File

@@ -16,6 +16,7 @@ package wherehows.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import gobblin.metrics.GobblinTrackingEvent_audit;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -24,13 +25,12 @@ import java.util.regex.Pattern;
import javax.persistence.NoResultException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import wherehows.common.utils.StringUtil;
import wherehows.dao.table.DatasetClassificationDao;
import wherehows.dao.table.DictDatasetDao;
import wherehows.models.table.DatasetClassification;
import wherehows.models.table.DictDataset;
import wherehows.utils.StringUtil;
@Slf4j
@@ -66,9 +66,9 @@ public class GobblinTrackingAuditService {
.add(Pattern.compile("/output/"))
.build();
public void updateHdfsDatasetSchema(GenericData.Record record) throws Exception {
Long timestamp = (Long) record.get("timestamp");
Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));
public void updateHdfsDatasetSchema(GobblinTrackingEvent_audit record) throws Exception {
Long timestamp = record.timestamp;
Map<String, String> metadata = StringUtil.toStringMap(record.metadata);
String datasetName = metadata.get("dataset");
if (StringUtils.isEmpty(datasetName) || isDatasetNameBlacklisted(datasetName)) {
@@ -155,7 +155,7 @@ public class GobblinTrackingAuditService {
return "";
}
//TODO the retuen time should be timeStamp
//TODO the return time should be timeStamp
private int getsourceModifiedTime(String hdfsModifiedTime) {
long result = Long.parseLong(hdfsModifiedTime) / 1000;
if (hdfsModifiedTime == null || result > Integer.MAX_VALUE) {