From ffa80b8b9f8048b5285bccfda52bf87ee015c555 Mon Sep 17 00:00:00 2001 From: "Yi (Alan) Wang" Date: Wed, 12 Sep 2018 12:08:52 -0700 Subject: [PATCH] Move kafka processors to module wherehows-ingestion (#1377) --- settings.gradle | 1 + wherehows-ingestion/build.gradle | 18 +++++++++ .../KafkaLogCompactionConverter.java | 2 +- .../converters/KafkaMessageConverter.java | 2 +- .../ingestion}/processors/DummyProcessor.java | 9 +++-- .../processors/KafkaMessageProcessor.java | 14 ++++++- .../processors/MetadataChangeProcessor.java | 25 +++++++------ .../MetadataInventoryProcessor.java | 19 +++++----- .../processors/MetadataLineageProcessor.java | 20 +++++----- .../ingestion}/utils/ProcessorUtil.java | 10 ++--- .../KafkaLogCompactionConverterTest.java | 2 +- .../MetadataLineageProcessorTest.java | 10 ++--- .../ingestion}/util/ProcessorUtilTest.java | 27 ++++++-------- wherehows-kafka/build.gradle | 25 +++++-------- .../wherehows/actors/KafkaClientMaster.java | 9 +++-- .../java/wherehows/actors/KafkaWorker.java | 2 +- .../main/java/wherehows/utils/ConfigUtil.java | 34 +++++++++++++++++ .../java/wherehows/util/ConfigUtilTest.java | 37 +++++++++++++++++++ 18 files changed, 182 insertions(+), 84 deletions(-) create mode 100644 wherehows-ingestion/build.gradle rename {wherehows-kafka/src/main/java/wherehows => wherehows-ingestion/src/main/java/wherehows/ingestion}/converters/KafkaLogCompactionConverter.java (97%) rename {wherehows-kafka/src/main/java/wherehows => wherehows-ingestion/src/main/java/wherehows/ingestion}/converters/KafkaMessageConverter.java (95%) rename {wherehows-kafka/src/main/java/wherehows => wherehows-ingestion/src/main/java/wherehows/ingestion}/processors/DummyProcessor.java (75%) rename {wherehows-kafka/src/main/java/wherehows => wherehows-ingestion/src/main/java/wherehows/ingestion}/processors/KafkaMessageProcessor.java (74%) rename {wherehows-kafka/src/main/java/wherehows => wherehows-ingestion/src/main/java/wherehows/ingestion}/processors/MetadataChangeProcessor.java (88%) rename {wherehows-kafka/src/main/java/wherehows => wherehows-ingestion/src/main/java/wherehows/ingestion}/processors/MetadataInventoryProcessor.java (88%) rename {wherehows-kafka/src/main/java/wherehows => wherehows-ingestion/src/main/java/wherehows/ingestion}/processors/MetadataLineageProcessor.java (89%) rename {wherehows-kafka/src/main/java/wherehows => wherehows-ingestion/src/main/java/wherehows/ingestion}/utils/ProcessorUtil.java (91%) rename {wherehows-kafka/src/test/java/wherehows => wherehows-ingestion/src/test/java/wherehows/ingestion}/converters/KafkaLogCompactionConverterTest.java (98%) rename {wherehows-kafka/src/test/java/wherehows => wherehows-ingestion/src/test/java/wherehows/ingestion}/processors/MetadataLineageProcessorTest.java (91%) rename {wherehows-kafka/src/test/java/wherehows => wherehows-ingestion/src/test/java/wherehows/ingestion}/util/ProcessorUtilTest.java (84%) create mode 100644 wherehows-kafka/src/main/java/wherehows/utils/ConfigUtil.java create mode 100644 wherehows-kafka/src/test/java/wherehows/util/ConfigUtilTest.java diff --git a/settings.gradle b/settings.gradle index 198a7f619f..05c53e2071 100644 --- a/settings.gradle +++ b/settings.gradle @@ -6,6 +6,7 @@ def modules = [ "wherehows-etl", "wherehows-frontend", "wherehows-hadoop", + "wherehows-ingestion", "wherehows-kafka", "wherehows-web", ] diff --git a/wherehows-ingestion/build.gradle b/wherehows-ingestion/build.gradle new file mode 100644 index 0000000000..85927898aa --- /dev/null +++ b/wherehows-ingestion/build.gradle @@ -0,0 +1,18 @@ +apply plugin: 'java' + +dependencies { + compile project(':wherehows-common') + compile project(':wherehows-dao') + compile externalDependency.kafka_clients + compile externalDependency.slf4j_api + + compileOnly externalDependency.lombok + + testCompile externalDependency.testng + testCompile externalDependency.mockito +} + +test { + // enable TestNG support (default is JUnit) + useTestNG() +} diff --git a/wherehows-kafka/src/main/java/wherehows/converters/KafkaLogCompactionConverter.java b/wherehows-ingestion/src/main/java/wherehows/ingestion/converters/KafkaLogCompactionConverter.java similarity index 97% rename from wherehows-kafka/src/main/java/wherehows/converters/KafkaLogCompactionConverter.java rename to wherehows-ingestion/src/main/java/wherehows/ingestion/converters/KafkaLogCompactionConverter.java index b4acadffaf..bde7124865 100644 --- a/wherehows-kafka/src/main/java/wherehows/converters/KafkaLogCompactionConverter.java +++ b/wherehows-ingestion/src/main/java/wherehows/ingestion/converters/KafkaLogCompactionConverter.java @@ -11,7 +11,7 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package wherehows.converters; +package wherehows.ingestion.converters; import com.linkedin.events.metadata.DeploymentDetail; import com.linkedin.events.metadata.MetadataChangeEvent; diff --git a/wherehows-kafka/src/main/java/wherehows/converters/KafkaMessageConverter.java b/wherehows-ingestion/src/main/java/wherehows/ingestion/converters/KafkaMessageConverter.java similarity index 95% rename from wherehows-kafka/src/main/java/wherehows/converters/KafkaMessageConverter.java rename to wherehows-ingestion/src/main/java/wherehows/ingestion/converters/KafkaMessageConverter.java index cb01612d76..51500e5074 100644 --- a/wherehows-kafka/src/main/java/wherehows/converters/KafkaMessageConverter.java +++ b/wherehows-ingestion/src/main/java/wherehows/ingestion/converters/KafkaMessageConverter.java @@ -11,7 +11,7 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package wherehows.converters; +package wherehows.ingestion.converters; import org.apache.avro.generic.IndexedRecord; diff --git a/wherehows-kafka/src/main/java/wherehows/processors/DummyProcessor.java b/wherehows-ingestion/src/main/java/wherehows/ingestion/processors/DummyProcessor.java similarity index 75% rename from wherehows-kafka/src/main/java/wherehows/processors/DummyProcessor.java rename to wherehows-ingestion/src/main/java/wherehows/ingestion/processors/DummyProcessor.java index 79527ff62f..e622b97a1c 100644 --- a/wherehows-kafka/src/main/java/wherehows/processors/DummyProcessor.java +++ b/wherehows-ingestion/src/main/java/wherehows/ingestion/processors/DummyProcessor.java @@ -11,8 +11,10 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package wherehows.processors; +package wherehows.ingestion.processors; +import java.util.Properties; +import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.IndexedRecord; import org.apache.kafka.clients.producer.KafkaProducer; @@ -25,8 +27,9 @@ import wherehows.dao.DaoFactory; @Slf4j public class DummyProcessor extends KafkaMessageProcessor { - public DummyProcessor(DaoFactory daoFactory, String producerTopic, KafkaProducer producer) { - super(producerTopic, producer); + public DummyProcessor(@Nonnull Properties config, @Nonnull DaoFactory daoFactory, @Nonnull String producerTopic, + @Nonnull KafkaProducer producer) { + super(config, daoFactory, producerTopic, producer); } /** diff --git a/wherehows-kafka/src/main/java/wherehows/processors/KafkaMessageProcessor.java b/wherehows-ingestion/src/main/java/wherehows/ingestion/processors/KafkaMessageProcessor.java similarity index 74% rename from wherehows-kafka/src/main/java/wherehows/processors/KafkaMessageProcessor.java rename to wherehows-ingestion/src/main/java/wherehows/ingestion/processors/KafkaMessageProcessor.java index 5dbe01ce32..3ab20d9a6f 100644 --- a/wherehows-kafka/src/main/java/wherehows/processors/KafkaMessageProcessor.java +++ b/wherehows-ingestion/src/main/java/wherehows/ingestion/processors/KafkaMessageProcessor.java @@ -11,11 +11,14 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package wherehows.processors; +package wherehows.ingestion.processors; +import java.util.Properties; +import javax.annotation.Nonnull; import org.apache.avro.generic.IndexedRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import wherehows.dao.DaoFactory; /** @@ -23,11 +26,18 @@ import org.apache.kafka.clients.producer.ProducerRecord; */ public abstract class KafkaMessageProcessor { + final Properties _config; + + final DaoFactory _daoFactory; + private final String _producerTopic; private final KafkaProducer _producer; - KafkaMessageProcessor(String producerTopic, KafkaProducer producer) { + KafkaMessageProcessor(@Nonnull Properties config, @Nonnull DaoFactory daoFactory, @Nonnull String producerTopic, + @Nonnull KafkaProducer producer) { + this._config = config; + this._daoFactory = daoFactory; this._producerTopic = producerTopic; this._producer = producer; } diff --git a/wherehows-kafka/src/main/java/wherehows/processors/MetadataChangeProcessor.java b/wherehows-ingestion/src/main/java/wherehows/ingestion/processors/MetadataChangeProcessor.java similarity index 88% rename from wherehows-kafka/src/main/java/wherehows/processors/MetadataChangeProcessor.java rename to wherehows-ingestion/src/main/java/wherehows/ingestion/processors/MetadataChangeProcessor.java index bbbc211e51..60ebb48fd4 100644 --- a/wherehows-kafka/src/main/java/wherehows/processors/MetadataChangeProcessor.java +++ b/wherehows-ingestion/src/main/java/wherehows/ingestion/processors/MetadataChangeProcessor.java @@ -11,7 +11,7 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package wherehows.processors; +package wherehows.ingestion.processors; import com.linkedin.events.metadata.ChangeAuditStamp; import com.linkedin.events.metadata.ChangeType; @@ -20,21 +20,22 @@ import com.linkedin.events.metadata.DatasetSchema; import com.linkedin.events.metadata.FailedMetadataChangeEvent; import com.linkedin.events.metadata.MetadataChangeEvent; import com.linkedin.events.metadata.Schemaless; -import com.typesafe.config.Config; +import java.util.Properties; import java.util.Set; +import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.IndexedRecord; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.kafka.clients.producer.KafkaProducer; import wherehows.common.exceptions.UnauthorizedException; -import wherehows.converters.KafkaLogCompactionConverter; import wherehows.dao.DaoFactory; import wherehows.dao.table.DatasetComplianceDao; import wherehows.dao.table.DatasetOwnerDao; import wherehows.dao.table.DictDatasetDao; import wherehows.dao.table.FieldDetailDao; import wherehows.models.table.DictDataset; -import wherehows.utils.ProcessorUtil; +import wherehows.ingestion.converters.KafkaLogCompactionConverter; +import wherehows.ingestion.utils.ProcessorUtil; import static wherehows.common.utils.StringUtil.*; @@ -51,16 +52,16 @@ public class MetadataChangeProcessor extends KafkaMessageProcessor { private final static int MAX_DATASET_NAME_LENGTH = 400; - public MetadataChangeProcessor(Config config, DaoFactory daoFactory, String producerTopic, - KafkaProducer producer) { - super(producerTopic, producer); + public MetadataChangeProcessor(@Nonnull Properties config, @Nonnull DaoFactory daoFactory, + @Nonnull String producerTopic, @Nonnull KafkaProducer producer) { + super(config, daoFactory, producerTopic, producer); - _dictDatasetDao = daoFactory.getDictDatasetDao(); - _fieldDetailDao = daoFactory.getDictFieldDetailDao(); - _ownerDao = daoFactory.getDatasteOwnerDao(); - _complianceDao = daoFactory.getDatasetComplianceDao(); + _dictDatasetDao = _daoFactory.getDictDatasetDao(); + _fieldDetailDao = _daoFactory.getDictFieldDetailDao(); + _ownerDao = _daoFactory.getDatasteOwnerDao(); + _complianceDao = _daoFactory.getDatasetComplianceDao(); - _whitelistActors = ProcessorUtil.getWhitelistedActors(config, "whitelist.mce"); + _whitelistActors = ProcessorUtil.getWhitelistedActors(_config, "whitelist.mce"); log.info("MCE whitelist: " + _whitelistActors); } diff --git a/wherehows-kafka/src/main/java/wherehows/processors/MetadataInventoryProcessor.java b/wherehows-ingestion/src/main/java/wherehows/ingestion/processors/MetadataInventoryProcessor.java similarity index 88% rename from wherehows-kafka/src/main/java/wherehows/processors/MetadataInventoryProcessor.java rename to wherehows-ingestion/src/main/java/wherehows/ingestion/processors/MetadataInventoryProcessor.java index da32e56ae5..59dec67752 100644 --- a/wherehows-kafka/src/main/java/wherehows/processors/MetadataInventoryProcessor.java +++ b/wherehows-ingestion/src/main/java/wherehows/ingestion/processors/MetadataInventoryProcessor.java @@ -11,7 +11,7 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package wherehows.processors; +package wherehows.ingestion.processors; import com.linkedin.events.metadata.ChangeAuditStamp; import com.linkedin.events.metadata.DataOrigin; @@ -19,11 +19,12 @@ import com.linkedin.events.metadata.DatasetIdentifier; import com.linkedin.events.metadata.FailedMetadataInventoryEvent; import com.linkedin.events.metadata.MetadataChangeEvent; import com.linkedin.events.metadata.MetadataInventoryEvent; -import com.typesafe.config.Config; import java.util.List; +import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.IndexedRecord; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -31,10 +32,10 @@ import org.apache.kafka.clients.producer.KafkaProducer; import wherehows.common.exceptions.UnauthorizedException; import wherehows.dao.DaoFactory; import wherehows.dao.view.DatasetViewDao; -import wherehows.utils.ProcessorUtil; +import wherehows.ingestion.utils.ProcessorUtil; +import static wherehows.ingestion.utils.ProcessorUtil.*; import static wherehows.util.UrnUtil.*; -import static wherehows.utils.ProcessorUtil.*; @Slf4j @@ -46,13 +47,13 @@ public class MetadataInventoryProcessor extends KafkaMessageProcessor { private final DatasetViewDao _datasetViewDao; - public MetadataInventoryProcessor(Config config, DaoFactory daoFactory, String producerTopic, - KafkaProducer producer) { - super(producerTopic, producer); + public MetadataInventoryProcessor(@Nonnull Properties config, @Nonnull DaoFactory daoFactory, + @Nonnull String producerTopic, @Nonnull KafkaProducer producer) { + super(config, daoFactory, producerTopic, producer); - _datasetViewDao = daoFactory.getDatasetViewDao(); + _datasetViewDao = _daoFactory.getDatasetViewDao(); - _whitelistActors = ProcessorUtil.getWhitelistedActors(config, "whitelist.mie"); + _whitelistActors = ProcessorUtil.getWhitelistedActors(_config, "whitelist.mie"); log.info("MIE whitelist: " + _whitelistActors); } diff --git a/wherehows-kafka/src/main/java/wherehows/processors/MetadataLineageProcessor.java b/wherehows-ingestion/src/main/java/wherehows/ingestion/processors/MetadataLineageProcessor.java similarity index 89% rename from wherehows-kafka/src/main/java/wherehows/processors/MetadataLineageProcessor.java rename to wherehows-ingestion/src/main/java/wherehows/ingestion/processors/MetadataLineageProcessor.java index f6d5f06ecf..ea0c7dac34 100644 --- a/wherehows-kafka/src/main/java/wherehows/processors/MetadataLineageProcessor.java +++ b/wherehows-ingestion/src/main/java/wherehows/ingestion/processors/MetadataLineageProcessor.java @@ -11,7 +11,7 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package wherehows.processors; +package wherehows.ingestion.processors; import com.google.common.annotations.VisibleForTesting; import com.linkedin.events.metadata.ChangeAuditStamp; @@ -21,10 +21,11 @@ import com.linkedin.events.metadata.FailedMetadataLineageEvent; import com.linkedin.events.metadata.JobStatus; import com.linkedin.events.metadata.MetadataLineageEvent; import com.linkedin.events.metadata.agent; -import com.typesafe.config.Config; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import java.util.Set; +import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.IndexedRecord; import org.apache.commons.lang.exception.ExceptionUtils; @@ -33,9 +34,9 @@ import wherehows.common.exceptions.SelfLineageException; import wherehows.common.exceptions.UnauthorizedException; import wherehows.dao.DaoFactory; import wherehows.dao.table.LineageDao; -import wherehows.utils.ProcessorUtil; +import wherehows.ingestion.utils.ProcessorUtil; -import static wherehows.utils.ProcessorUtil.*; +import static wherehows.ingestion.utils.ProcessorUtil.*; @Slf4j @@ -45,12 +46,13 @@ public class MetadataLineageProcessor extends KafkaMessageProcessor { private final Set _whitelistActors; - public MetadataLineageProcessor(Config config, DaoFactory daoFactory, String producerTopic, - KafkaProducer producer) { - super(producerTopic, producer); - this._lineageDao = daoFactory.getLineageDao(); + public MetadataLineageProcessor(@Nonnull Properties config, @Nonnull DaoFactory daoFactory, + @Nonnull String producerTopic, @Nonnull KafkaProducer producer) { + super(config, daoFactory, producerTopic, producer); - _whitelistActors = ProcessorUtil.getWhitelistedActors(config, "whitelist.mle"); + this._lineageDao = _daoFactory.getLineageDao(); + + _whitelistActors = ProcessorUtil.getWhitelistedActors(_config, "whitelist.mle"); log.info("MLE whitelist: " + _whitelistActors); } diff --git a/wherehows-kafka/src/main/java/wherehows/utils/ProcessorUtil.java b/wherehows-ingestion/src/main/java/wherehows/ingestion/utils/ProcessorUtil.java similarity index 91% rename from wherehows-kafka/src/main/java/wherehows/utils/ProcessorUtil.java rename to wherehows-ingestion/src/main/java/wherehows/ingestion/utils/ProcessorUtil.java index 8d012d32f5..e8ef80c61c 100644 --- a/wherehows-kafka/src/main/java/wherehows/utils/ProcessorUtil.java +++ b/wherehows-ingestion/src/main/java/wherehows/ingestion/utils/ProcessorUtil.java @@ -11,18 +11,18 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package wherehows.utils; +package wherehows.ingestion.utils; import com.linkedin.events.metadata.ChangeAuditStamp; import com.linkedin.events.metadata.ChangeType; import com.linkedin.events.metadata.DatasetIdentifier; import com.linkedin.events.metadata.DeploymentDetail; import com.linkedin.events.metadata.MetadataChangeEvent; -import com.typesafe.config.Config; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -64,13 +64,13 @@ public class ProcessorUtil { /** * Extract whitelisted actors from the given config and configPath - * @param config The {@link Config} + * @param config {@link Properties} * @param configPath Key for the white list config * @return A set of actor names or null if corresponding config doesn't exists */ @Nullable - public static Set getWhitelistedActors(@Nonnull Config config, @Nonnull String configPath) { - String actors = config.hasPath(configPath) ? config.getString(configPath) : null; + public static Set getWhitelistedActors(@Nonnull Properties config, @Nonnull String configPath) { + String actors = config.getProperty(configPath); if (StringUtils.isEmpty(actors)) { return null; } diff --git a/wherehows-kafka/src/test/java/wherehows/converters/KafkaLogCompactionConverterTest.java b/wherehows-ingestion/src/test/java/wherehows/ingestion/converters/KafkaLogCompactionConverterTest.java similarity index 98% rename from wherehows-kafka/src/test/java/wherehows/converters/KafkaLogCompactionConverterTest.java rename to wherehows-ingestion/src/test/java/wherehows/ingestion/converters/KafkaLogCompactionConverterTest.java index e3d308ae57..6d3597bb32 100644 --- a/wherehows-kafka/src/test/java/wherehows/converters/KafkaLogCompactionConverterTest.java +++ b/wherehows-ingestion/src/test/java/wherehows/ingestion/converters/KafkaLogCompactionConverterTest.java @@ -11,7 +11,7 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package wherehows.converters; +package wherehows.ingestion.converters; import com.linkedin.events.metadata.DatasetIdentifier; import com.linkedin.events.metadata.DeploymentDetail; diff --git a/wherehows-kafka/src/test/java/wherehows/processors/MetadataLineageProcessorTest.java b/wherehows-ingestion/src/test/java/wherehows/ingestion/processors/MetadataLineageProcessorTest.java similarity index 91% rename from wherehows-kafka/src/test/java/wherehows/processors/MetadataLineageProcessorTest.java rename to wherehows-ingestion/src/test/java/wherehows/ingestion/processors/MetadataLineageProcessorTest.java index 51d1e1770b..ee5eb43caf 100644 --- a/wherehows-kafka/src/test/java/wherehows/processors/MetadataLineageProcessorTest.java +++ b/wherehows-ingestion/src/test/java/wherehows/ingestion/processors/MetadataLineageProcessorTest.java @@ -11,7 +11,7 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package wherehows.processors; +package wherehows.ingestion.processors; import com.linkedin.events.metadata.ChangeAuditStamp; import com.linkedin.events.metadata.DatasetIdentifier; @@ -20,10 +20,10 @@ import com.linkedin.events.metadata.JobExecution; import com.linkedin.events.metadata.JobStatus; import com.linkedin.events.metadata.MetadataLineageEvent; import com.linkedin.events.metadata.agent; -import com.typesafe.config.Config; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; @@ -33,7 +33,7 @@ import wherehows.dao.table.LineageDao; import static org.mockito.Mockito.*; import static org.testng.Assert.*; -import static wherehows.util.ProcessorUtilTest.*; +import static wherehows.ingestion.util.ProcessorUtilTest.*; public class MetadataLineageProcessorTest { @@ -42,12 +42,10 @@ public class MetadataLineageProcessorTest { @BeforeTest public void setup() { - Config mockConfig = mock(Config.class); - when(mockConfig.hasPath("whitelist.mle")).thenReturn(false); DaoFactory mockDaoFactory = mock(DaoFactory.class); when(mockDaoFactory.getLineageDao()).thenReturn(mock(LineageDao.class)); - _processor = new MetadataLineageProcessor(mockConfig, mockDaoFactory, "topic", mock(KafkaProducer.class)); + _processor = new MetadataLineageProcessor(new Properties(), mockDaoFactory, "topic", mock(KafkaProducer.class)); } @Test diff --git a/wherehows-kafka/src/test/java/wherehows/util/ProcessorUtilTest.java b/wherehows-ingestion/src/test/java/wherehows/ingestion/util/ProcessorUtilTest.java similarity index 84% rename from wherehows-kafka/src/test/java/wherehows/util/ProcessorUtilTest.java rename to wherehows-ingestion/src/test/java/wherehows/ingestion/util/ProcessorUtilTest.java index 260e4ad33f..00adea3bfa 100644 --- a/wherehows-kafka/src/test/java/wherehows/util/ProcessorUtilTest.java +++ b/wherehows-ingestion/src/test/java/wherehows/ingestion/util/ProcessorUtilTest.java @@ -11,7 +11,7 @@ * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package wherehows.util; +package wherehows.ingestion.util; import com.google.common.collect.ImmutableSet; import com.linkedin.events.metadata.ChangeType; @@ -19,17 +19,16 @@ import com.linkedin.events.metadata.DataOrigin; import com.linkedin.events.metadata.DatasetIdentifier; import com.linkedin.events.metadata.DeploymentDetail; import com.linkedin.events.metadata.MetadataChangeEvent; -import com.typesafe.config.Config; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; import org.testng.annotations.Test; -import wherehows.utils.ProcessorUtil; +import wherehows.ingestion.utils.ProcessorUtil; -import static org.mockito.Mockito.*; import static org.testng.Assert.*; @@ -61,9 +60,8 @@ public class ProcessorUtilTest { @Test public void testGetWhitelistedActors() { - Config config = mock(Config.class); - when(config.hasPath("whitelist")).thenReturn(true); - when(config.getString("whitelist")).thenReturn("foo;bar"); + Properties config = new Properties(); + config.put("whitelist", "foo;bar"); Set actors = ProcessorUtil.getWhitelistedActors(config, "whitelist"); @@ -72,23 +70,22 @@ public class ProcessorUtilTest { @Test public void testGetWhitelistedActorsNoPath() { - Config config = mock(Config.class); - when(config.hasPath("whitelist")).thenReturn(false); + Properties config = new Properties(); + config.put("whitelist", "foo;bar"); - Set actors = ProcessorUtil.getWhitelistedActors(config, "whitelist"); + Set actors = ProcessorUtil.getWhitelistedActors(config, "no"); - assertEquals(actors, null); + assertNull(actors); } @Test public void testGetWhitelistedActorsEmptyValue() { - Config config = mock(Config.class); - when(config.hasPath("whitelist")).thenReturn(true); - when(config.getString("whitelist")).thenReturn(""); + Properties config = new Properties(); + config.put("whitelist", ""); Set actors = ProcessorUtil.getWhitelistedActors(config, "whitelist"); - assertEquals(actors, null); + assertNull(actors); } @Test diff --git a/wherehows-kafka/build.gradle b/wherehows-kafka/build.gradle index dffa164d94..ca0fc2549a 100644 --- a/wherehows-kafka/build.gradle +++ b/wherehows-kafka/build.gradle @@ -3,25 +3,20 @@ apply plugin: 'application' mainClassName = "wherehows.main.ApplicationStart" dependencies { - compile project(':wherehows-common') - compile project(':wherehows-dao') - compile externalDependency.jackson_databind - compile externalDependency.akka - compile externalDependency.mysql - compile externalDependency.guava - compile externalDependency.slf4j_api - compile externalDependency.logback + compile project(':wherehows-ingestion') + compile externalDependency.akka + compile externalDependency.slf4j_api + compile externalDependency.logback - compile externalDependency.kafka_clients - compile externalDependency.confluent_avro_serde + compile externalDependency.kafka_clients + compile externalDependency.confluent_avro_serde - compileOnly externalDependency.lombok + compileOnly externalDependency.lombok - testCompile externalDependency.testng - testCompile externalDependency.mockito + testCompile externalDependency.testng } test { - // enable TestNG support (default is JUnit) - useTestNG() + // enable TestNG support (default is JUnit) + useTestNG() } diff --git a/wherehows-kafka/src/main/java/wherehows/actors/KafkaClientMaster.java b/wherehows-kafka/src/main/java/wherehows/actors/KafkaClientMaster.java index ac2d6c6f46..720f6b256e 100644 --- a/wherehows-kafka/src/main/java/wherehows/actors/KafkaClientMaster.java +++ b/wherehows-kafka/src/main/java/wherehows/actors/KafkaClientMaster.java @@ -16,7 +16,6 @@ package wherehows.actors; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; -import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.lang.reflect.Constructor; import java.util.ArrayList; @@ -32,8 +31,9 @@ import org.apache.kafka.clients.producer.KafkaProducer; import wherehows.common.Constant; import wherehows.common.utils.JobsUtil; import wherehows.dao.DaoFactory; +import wherehows.ingestion.processors.KafkaMessageProcessor; import wherehows.msgs.KafkaCommMsg; -import wherehows.processors.KafkaMessageProcessor; +import wherehows.utils.ConfigUtil; import static wherehows.main.ApplicationStart.*; import static wherehows.utils.KafkaClientUtil.*; @@ -50,7 +50,7 @@ public class KafkaClientMaster extends UntypedActor { // List of kafka workers private static List _kafkaWorkers = new ArrayList<>(); - private static final Config CONFIG = ConfigFactory.load(); + private static final Properties CONFIG = ConfigUtil.configToProperties(ConfigFactory.load()); public KafkaClientMaster(String kafkaJobDir) { this.KAFKA_JOB_DIR = kafkaJobDir; @@ -133,7 +133,8 @@ public class KafkaClientMaster extends UntypedActor { // get processor instance Class processorClass = Class.forName(processor); - Constructor ctor = processorClass.getConstructor(Config.class, DaoFactory.class, String.class, KafkaProducer.class); + Constructor ctor = + processorClass.getConstructor(Properties.class, DaoFactory.class, String.class, KafkaProducer.class); KafkaMessageProcessor processorInstance = (KafkaMessageProcessor) ctor.newInstance(CONFIG, DAO_FACTORY, producerTopic, producer); diff --git a/wherehows-kafka/src/main/java/wherehows/actors/KafkaWorker.java b/wherehows-kafka/src/main/java/wherehows/actors/KafkaWorker.java index 9424bf4b8b..f346338aa6 100644 --- a/wherehows-kafka/src/main/java/wherehows/actors/KafkaWorker.java +++ b/wherehows-kafka/src/main/java/wherehows/actors/KafkaWorker.java @@ -24,7 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SerializationException; -import wherehows.processors.KafkaMessageProcessor; +import wherehows.ingestion.processors.KafkaMessageProcessor; /** diff --git a/wherehows-kafka/src/main/java/wherehows/utils/ConfigUtil.java b/wherehows-kafka/src/main/java/wherehows/utils/ConfigUtil.java new file mode 100644 index 0000000000..1df37f7ce3 --- /dev/null +++ b/wherehows-kafka/src/main/java/wherehows/utils/ConfigUtil.java @@ -0,0 +1,34 @@ +/** + * Copyright 2015 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package wherehows.utils; + +import com.typesafe.config.Config; +import java.util.Properties; +import javax.annotation.Nonnull; + + +public class ConfigUtil { + + private ConfigUtil() { + } + + /** + * Convert typesafe {@link Config} to {@link Properties} + */ + public static Properties configToProperties(@Nonnull Config config) { + Properties properties = new Properties(); + config.entrySet().forEach(e -> properties.setProperty(e.getKey(), config.getString(e.getKey()))); + return properties; + } +} diff --git a/wherehows-kafka/src/test/java/wherehows/util/ConfigUtilTest.java b/wherehows-kafka/src/test/java/wherehows/util/ConfigUtilTest.java new file mode 100644 index 0000000000..afcb320afd --- /dev/null +++ b/wherehows-kafka/src/test/java/wherehows/util/ConfigUtilTest.java @@ -0,0 +1,37 @@ +/** + * Copyright 2015 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package wherehows.util; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import java.util.Properties; +import org.testng.annotations.Test; +import wherehows.utils.ConfigUtil; + +import static org.testng.Assert.*; + + +public class ConfigUtilTest { + + @Test + public void testConfigToProperties() { + Properties props = new Properties(); + props.put("foo", "foo1"); + props.put("bar", "bar2"); + + Config config = ConfigFactory.parseProperties(props); + + assertEquals(ConfigUtil.configToProperties(config), props); + } +}