Move kafka processors to module wherehows-ingestion (#1377)

Yi (Alan) Wang 2018-09-12 12:08:52 -07:00 committed by GitHub
parent ca7110ea11
commit ffa80b8b9f
18 changed files with 182 additions and 84 deletions

View File

@ -6,6 +6,7 @@ def modules = [
"wherehows-etl",
"wherehows-frontend",
"wherehows-hadoop",
"wherehows-ingestion",
"wherehows-kafka",
"wherehows-web",
]

View File

@ -0,0 +1,18 @@
apply plugin: 'java'

dependencies {
  compile project(':wherehows-common')
  compile project(':wherehows-dao')

  compile externalDependency.kafka_clients
  compile externalDependency.slf4j_api
  compileOnly externalDependency.lombok

  testCompile externalDependency.testng
  testCompile externalDependency.mockito
}

test {
  // enable TestNG support (default is JUnit)
  useTestNG()
}

View File

@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.converters;
package wherehows.ingestion.converters;
import com.linkedin.events.metadata.DeploymentDetail;
import com.linkedin.events.metadata.MetadataChangeEvent;

View File

@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.converters;
package wherehows.ingestion.converters;
import org.apache.avro.generic.IndexedRecord;

View File

@ -11,8 +11,10 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.processors;
package wherehows.ingestion.processors;
import java.util.Properties;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
@ -25,8 +27,9 @@ import wherehows.dao.DaoFactory;
@Slf4j
public class DummyProcessor extends KafkaMessageProcessor {
public DummyProcessor(DaoFactory daoFactory, String producerTopic, KafkaProducer<String, IndexedRecord> producer) {
super(producerTopic, producer);
public DummyProcessor(@Nonnull Properties config, @Nonnull DaoFactory daoFactory, @Nonnull String producerTopic,
@Nonnull KafkaProducer<String, IndexedRecord> producer) {
super(config, daoFactory, producerTopic, producer);
}
/**

View File

@ -11,11 +11,14 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.processors;
package wherehows.ingestion.processors;
import java.util.Properties;
import javax.annotation.Nonnull;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import wherehows.dao.DaoFactory;
/**
@ -23,11 +26,18 @@ import org.apache.kafka.clients.producer.ProducerRecord;
*/
public abstract class KafkaMessageProcessor {
final Properties _config;
final DaoFactory _daoFactory;
private final String _producerTopic;
private final KafkaProducer<String, IndexedRecord> _producer;
KafkaMessageProcessor(String producerTopic, KafkaProducer<String, IndexedRecord> producer) {
KafkaMessageProcessor(@Nonnull Properties config, @Nonnull DaoFactory daoFactory, @Nonnull String producerTopic,
@Nonnull KafkaProducer<String, IndexedRecord> producer) {
this._config = config;
this._daoFactory = daoFactory;
this._producerTopic = producerTopic;
this._producer = producer;
}
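
Reassembled from the added lines in the hunk above, the base class now carries the config and the DAO factory for every processor subclass:

public abstract class KafkaMessageProcessor {

  final Properties _config;
  final DaoFactory _daoFactory;

  private final String _producerTopic;
  private final KafkaProducer<String, IndexedRecord> _producer;

  KafkaMessageProcessor(@Nonnull Properties config, @Nonnull DaoFactory daoFactory, @Nonnull String producerTopic,
      @Nonnull KafkaProducer<String, IndexedRecord> producer) {
    this._config = config;
    this._daoFactory = daoFactory;
    this._producerTopic = producerTopic;
    this._producer = producer;
  }

  // abstract message-handling method(s) are unchanged by this commit and omitted here
}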

View File

@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.processors;
package wherehows.ingestion.processors;
import com.linkedin.events.metadata.ChangeAuditStamp;
import com.linkedin.events.metadata.ChangeType;
@ -20,21 +20,22 @@ import com.linkedin.events.metadata.DatasetSchema;
import com.linkedin.events.metadata.FailedMetadataChangeEvent;
import com.linkedin.events.metadata.MetadataChangeEvent;
import com.linkedin.events.metadata.Schemaless;
import com.typesafe.config.Config;
import java.util.Properties;
import java.util.Set;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import wherehows.common.exceptions.UnauthorizedException;
import wherehows.converters.KafkaLogCompactionConverter;
import wherehows.dao.DaoFactory;
import wherehows.dao.table.DatasetComplianceDao;
import wherehows.dao.table.DatasetOwnerDao;
import wherehows.dao.table.DictDatasetDao;
import wherehows.dao.table.FieldDetailDao;
import wherehows.models.table.DictDataset;
import wherehows.utils.ProcessorUtil;
import wherehows.ingestion.converters.KafkaLogCompactionConverter;
import wherehows.ingestion.utils.ProcessorUtil;
import static wherehows.common.utils.StringUtil.*;
@ -51,16 +52,16 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor {
private final static int MAX_DATASET_NAME_LENGTH = 400;
public MetadataChangeProcessor(Config config, DaoFactory daoFactory, String producerTopic,
KafkaProducer<String, IndexedRecord> producer) {
super(producerTopic, producer);
public MetadataChangeProcessor(@Nonnull Properties config, @Nonnull DaoFactory daoFactory,
@Nonnull String producerTopic, @Nonnull KafkaProducer<String, IndexedRecord> producer) {
super(config, daoFactory, producerTopic, producer);
_dictDatasetDao = daoFactory.getDictDatasetDao();
_fieldDetailDao = daoFactory.getDictFieldDetailDao();
_ownerDao = daoFactory.getDatasteOwnerDao();
_complianceDao = daoFactory.getDatasetComplianceDao();
_dictDatasetDao = _daoFactory.getDictDatasetDao();
_fieldDetailDao = _daoFactory.getDictFieldDetailDao();
_ownerDao = _daoFactory.getDatasteOwnerDao();
_complianceDao = _daoFactory.getDatasetComplianceDao();
_whitelistActors = ProcessorUtil.getWhitelistedActors(config, "whitelist.mce");
_whitelistActors = ProcessorUtil.getWhitelistedActors(_config, "whitelist.mce");
log.info("MCE whitelist: " + _whitelistActors);
}

View File

@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.processors;
package wherehows.ingestion.processors;
import com.linkedin.events.metadata.ChangeAuditStamp;
import com.linkedin.events.metadata.DataOrigin;
@ -19,11 +19,12 @@ import com.linkedin.events.metadata.DatasetIdentifier;
import com.linkedin.events.metadata.FailedMetadataInventoryEvent;
import com.linkedin.events.metadata.MetadataChangeEvent;
import com.linkedin.events.metadata.MetadataInventoryEvent;
import com.typesafe.config.Config;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang3.exception.ExceptionUtils;
@ -31,10 +32,10 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import wherehows.common.exceptions.UnauthorizedException;
import wherehows.dao.DaoFactory;
import wherehows.dao.view.DatasetViewDao;
import wherehows.utils.ProcessorUtil;
import wherehows.ingestion.utils.ProcessorUtil;
import static wherehows.ingestion.utils.ProcessorUtil.*;
import static wherehows.util.UrnUtil.*;
import static wherehows.utils.ProcessorUtil.*;
@Slf4j
@ -46,13 +47,13 @@ public class MetadataInventoryProcessor extends KafkaMessageProcessor {
private final DatasetViewDao _datasetViewDao;
public MetadataInventoryProcessor(Config config, DaoFactory daoFactory, String producerTopic,
KafkaProducer<String, IndexedRecord> producer) {
super(producerTopic, producer);
public MetadataInventoryProcessor(@Nonnull Properties config, @Nonnull DaoFactory daoFactory,
@Nonnull String producerTopic, @Nonnull KafkaProducer<String, IndexedRecord> producer) {
super(config, daoFactory, producerTopic, producer);
_datasetViewDao = daoFactory.getDatasetViewDao();
_datasetViewDao = _daoFactory.getDatasetViewDao();
_whitelistActors = ProcessorUtil.getWhitelistedActors(config, "whitelist.mie");
_whitelistActors = ProcessorUtil.getWhitelistedActors(_config, "whitelist.mie");
log.info("MIE whitelist: " + _whitelistActors);
}

View File

@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.processors;
package wherehows.ingestion.processors;
import com.google.common.annotations.VisibleForTesting;
import com.linkedin.events.metadata.ChangeAuditStamp;
@ -21,10 +21,11 @@ import com.linkedin.events.metadata.FailedMetadataLineageEvent;
import com.linkedin.events.metadata.JobStatus;
import com.linkedin.events.metadata.MetadataLineageEvent;
import com.linkedin.events.metadata.agent;
import com.typesafe.config.Config;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang.exception.ExceptionUtils;
@ -33,9 +34,9 @@ import wherehows.common.exceptions.SelfLineageException;
import wherehows.common.exceptions.UnauthorizedException;
import wherehows.dao.DaoFactory;
import wherehows.dao.table.LineageDao;
import wherehows.utils.ProcessorUtil;
import wherehows.ingestion.utils.ProcessorUtil;
import static wherehows.utils.ProcessorUtil.*;
import static wherehows.ingestion.utils.ProcessorUtil.*;
@Slf4j
@ -45,12 +46,13 @@ public class MetadataLineageProcessor extends KafkaMessageProcessor {
private final Set<String> _whitelistActors;
public MetadataLineageProcessor(Config config, DaoFactory daoFactory, String producerTopic,
KafkaProducer<String, IndexedRecord> producer) {
super(producerTopic, producer);
this._lineageDao = daoFactory.getLineageDao();
public MetadataLineageProcessor(@Nonnull Properties config, @Nonnull DaoFactory daoFactory,
@Nonnull String producerTopic, @Nonnull KafkaProducer<String, IndexedRecord> producer) {
super(config, daoFactory, producerTopic, producer);
_whitelistActors = ProcessorUtil.getWhitelistedActors(config, "whitelist.mle");
this._lineageDao = _daoFactory.getLineageDao();
_whitelistActors = ProcessorUtil.getWhitelistedActors(_config, "whitelist.mle");
log.info("MLE whitelist: " + _whitelistActors);
}

View File

@ -11,18 +11,18 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.utils;
package wherehows.ingestion.utils;
import com.linkedin.events.metadata.ChangeAuditStamp;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.events.metadata.DatasetIdentifier;
import com.linkedin.events.metadata.DeploymentDetail;
import com.linkedin.events.metadata.MetadataChangeEvent;
import com.typesafe.config.Config;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -64,13 +64,13 @@ public class ProcessorUtil {
/**
* Extract whitelisted actors from the given config and configPath
* @param config The {@link Config}
* @param config {@link Properties}
* @param configPath Key for the white list config
* @return A set of actor names or null if corresponding config doesn't exists
*/
@Nullable
public static Set<String> getWhitelistedActors(@Nonnull Config config, @Nonnull String configPath) {
String actors = config.hasPath(configPath) ? config.getString(configPath) : null;
public static Set<String> getWhitelistedActors(@Nonnull Properties config, @Nonnull String configPath) {
String actors = config.getProperty(configPath);
if (StringUtils.isEmpty(actors)) {
return null;
}
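
Putting the new lines together, the Properties-based lookup reads roughly as below; the tail of the method is cut off in the hunk above and is assumed here to split a semicolon-separated actor list, matching the "foo;bar" fixture used in the tests further down:

@Nullable
public static Set<String> getWhitelistedActors(@Nonnull Properties config, @Nonnull String configPath) {
  // Properties.getProperty returns null for a missing key, so the old hasPath check is no longer needed
  String actors = config.getProperty(configPath);
  if (StringUtils.isEmpty(actors)) {
    return null;
  }
  // assumed tail: split the semicolon-separated list into a set of actor names
  return new HashSet<>(Arrays.asList(actors.split(";")));
}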

View File

@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.converters;
package wherehows.ingestion.converters;
import com.linkedin.events.metadata.DatasetIdentifier;
import com.linkedin.events.metadata.DeploymentDetail;

View File

@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.processors;
package wherehows.ingestion.processors;
import com.linkedin.events.metadata.ChangeAuditStamp;
import com.linkedin.events.metadata.DatasetIdentifier;
@ -20,10 +20,10 @@ import com.linkedin.events.metadata.JobExecution;
import com.linkedin.events.metadata.JobStatus;
import com.linkedin.events.metadata.MetadataLineageEvent;
import com.linkedin.events.metadata.agent;
import com.typesafe.config.Config;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
@ -33,7 +33,7 @@ import wherehows.dao.table.LineageDao;
import static org.mockito.Mockito.*;
import static org.testng.Assert.*;
import static wherehows.util.ProcessorUtilTest.*;
import static wherehows.ingestion.util.ProcessorUtilTest.*;
public class MetadataLineageProcessorTest {
@ -42,12 +42,10 @@ public class MetadataLineageProcessorTest {
@BeforeTest
public void setup() {
Config mockConfig = mock(Config.class);
when(mockConfig.hasPath("whitelist.mle")).thenReturn(false);
DaoFactory mockDaoFactory = mock(DaoFactory.class);
when(mockDaoFactory.getLineageDao()).thenReturn(mock(LineageDao.class));
_processor = new MetadataLineageProcessor(mockConfig, mockDaoFactory, "topic", mock(KafkaProducer.class));
_processor = new MetadataLineageProcessor(new Properties(), mockDaoFactory, "topic", mock(KafkaProducer.class));
}
@Test

View File

@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.util;
package wherehows.ingestion.util;
import com.google.common.collect.ImmutableSet;
import com.linkedin.events.metadata.ChangeType;
@ -19,17 +19,16 @@ import com.linkedin.events.metadata.DataOrigin;
import com.linkedin.events.metadata.DatasetIdentifier;
import com.linkedin.events.metadata.DeploymentDetail;
import com.linkedin.events.metadata.MetadataChangeEvent;
import com.typesafe.config.Config;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.testng.annotations.Test;
import wherehows.utils.ProcessorUtil;
import wherehows.ingestion.utils.ProcessorUtil;
import static org.mockito.Mockito.*;
import static org.testng.Assert.*;
@ -61,9 +60,8 @@ public class ProcessorUtilTest {
@Test
public void testGetWhitelistedActors() {
Config config = mock(Config.class);
when(config.hasPath("whitelist")).thenReturn(true);
when(config.getString("whitelist")).thenReturn("foo;bar");
Properties config = new Properties();
config.put("whitelist", "foo;bar");
Set<String> actors = ProcessorUtil.getWhitelistedActors(config, "whitelist");
@ -72,23 +70,22 @@ public class ProcessorUtilTest {
@Test
public void testGetWhitelistedActorsNoPath() {
Config config = mock(Config.class);
when(config.hasPath("whitelist")).thenReturn(false);
Properties config = new Properties();
config.put("whitelist", "foo;bar");
Set<String> actors = ProcessorUtil.getWhitelistedActors(config, "whitelist");
Set<String> actors = ProcessorUtil.getWhitelistedActors(config, "no");
assertEquals(actors, null);
assertNull(actors);
}
@Test
public void testGetWhitelistedActorsEmptyValue() {
Config config = mock(Config.class);
when(config.hasPath("whitelist")).thenReturn(true);
when(config.getString("whitelist")).thenReturn("");
Properties config = new Properties();
config.put("whitelist", "");
Set<String> actors = ProcessorUtil.getWhitelistedActors(config, "whitelist");
assertEquals(actors, null);
assertNull(actors);
}
@Test

View File

@ -3,12 +3,8 @@ apply plugin: 'application'
mainClassName = "wherehows.main.ApplicationStart"
dependencies {
compile project(':wherehows-common')
compile project(':wherehows-dao')
compile externalDependency.jackson_databind
compile project(':wherehows-ingestion')
compile externalDependency.akka
compile externalDependency.mysql
compile externalDependency.guava
compile externalDependency.slf4j_api
compile externalDependency.logback
@ -18,7 +14,6 @@ dependencies {
compileOnly externalDependency.lombok
testCompile externalDependency.testng
testCompile externalDependency.mockito
}
test {

View File

@ -16,7 +16,6 @@ package wherehows.actors;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
@ -32,8 +31,9 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import wherehows.common.Constant;
import wherehows.common.utils.JobsUtil;
import wherehows.dao.DaoFactory;
import wherehows.ingestion.processors.KafkaMessageProcessor;
import wherehows.msgs.KafkaCommMsg;
import wherehows.processors.KafkaMessageProcessor;
import wherehows.utils.ConfigUtil;
import static wherehows.main.ApplicationStart.*;
import static wherehows.utils.KafkaClientUtil.*;
@ -50,7 +50,7 @@ public class KafkaClientMaster extends UntypedActor {
// List of kafka workers
private static List<ActorRef> _kafkaWorkers = new ArrayList<>();
private static final Config CONFIG = ConfigFactory.load();
private static final Properties CONFIG = ConfigUtil.configToProperties(ConfigFactory.load());
public KafkaClientMaster(String kafkaJobDir) {
this.KAFKA_JOB_DIR = kafkaJobDir;
@ -133,7 +133,8 @@ public class KafkaClientMaster extends UntypedActor {
// get processor instance
Class processorClass = Class.forName(processor);
Constructor<?> ctor = processorClass.getConstructor(Config.class, DaoFactory.class, String.class, KafkaProducer.class);
Constructor<?> ctor =
processorClass.getConstructor(Properties.class, DaoFactory.class, String.class, KafkaProducer.class);
KafkaMessageProcessor processorInstance =
(KafkaMessageProcessor) ctor.newInstance(CONFIG, DAO_FACTORY, producerTopic, producer);
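
Assembled from the new lines in the hunk above, the reflective wiring in KafkaClientMaster after this change: the typesafe config is flattened once into Properties, and each processor class is resolved against the new four-argument constructor.

private static final Properties CONFIG = ConfigUtil.configToProperties(ConfigFactory.load());

// inside the per-job setup, after resolving the processor class name:
Class processorClass = Class.forName(processor);
Constructor<?> ctor =
    processorClass.getConstructor(Properties.class, DaoFactory.class, String.class, KafkaProducer.class);
KafkaMessageProcessor processorInstance =
    (KafkaMessageProcessor) ctor.newInstance(CONFIG, DAO_FACTORY, producerTopic, producer);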

View File

@ -24,7 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import wherehows.processors.KafkaMessageProcessor;
import wherehows.ingestion.processors.KafkaMessageProcessor;
/**

View File

@ -0,0 +1,34 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package wherehows.utils;

import com.typesafe.config.Config;
import java.util.Properties;
import javax.annotation.Nonnull;


public class ConfigUtil {

  private ConfigUtil() {
  }

  /**
   * Convert typesafe {@link Config} to {@link Properties}
   */
  public static Properties configToProperties(@Nonnull Config config) {
    Properties properties = new Properties();
    config.entrySet().forEach(e -> properties.setProperty(e.getKey(), config.getString(e.getKey())));
    return properties;
  }
}
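
A quick illustration (not part of this commit) of what the conversion does: typesafe-config paths are flattened into dotted Properties keys, which is exactly the form the getProperty lookups in ProcessorUtil expect.

Config config = ConfigFactory.parseString("whitelist { mce = \"foo;bar\" }");
Properties props = ConfigUtil.configToProperties(config);
// the nested path is flattened to a dotted key
props.getProperty("whitelist.mce");  // -> "foo;bar"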

View File

@ -0,0 +1,37 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package wherehows.util;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Properties;
import org.testng.annotations.Test;
import wherehows.utils.ConfigUtil;

import static org.testng.Assert.*;


public class ConfigUtilTest {

  @Test
  public void testConfigToProperties() {
    Properties props = new Properties();
    props.put("foo", "foo1");
    props.put("bar", "bar2");

    Config config = ConfigFactory.parseProperties(props);

    assertEquals(ConfigUtil.configToProperties(config), props);
  }
}
}