Get kafka job id from application.conf and then get ref_id and configs from DB
parent dbbdb6e2fb
commit 3d3b2a8075
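The change in brief: KafkaConsumerMaster now reads the list of Kafka consumer ETL job ids from application.conf, looks each job up in the WhereHows database to resolve its ref_id, and loads the Kafka properties stored for that ref_id, replacing the previously hard-coded Gobblin job ref id (50). A minimal sketch of the config entry, using the sample id 44 that appears commented out in application.conf further down:

# backend-service application.conf -- enable the consumer by listing job ids
kafka.consumer.etl.jobid=[44]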
@@ -28,16 +28,21 @@ import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
+import metadata.etl.models.EtlJobName;
+import models.daos.ClusterDao;
+import models.daos.EtlJobDao;
 import org.apache.avro.generic.GenericData;

 import msgs.KafkaResponseMsg;
 import play.Logger;
+import play.Play;
 import utils.JdbcUtil;
 import utils.KafkaConfig;
 import utils.KafkaConfig.Topic;
 import wherehows.common.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import wherehows.common.kafka.schemaregistry.client.SchemaRegistryClient;
 import wherehows.common.schemas.AbstractRecord;
+import wherehows.common.utils.ClusterUtil;
 import wherehows.common.writers.DatabaseWriter;

@@ -48,6 +53,7 @@ import wherehows.common.writers.DatabaseWriter;
  */
 public class KafkaConsumerMaster extends UntypedActor {

+  private static List<Integer> _kafkaJobList;
   private static ConsumerConnector _consumer;
   private static Properties _kafkaConfig;
   private static Map<String, Topic> _kafkaTopics;
@@ -64,13 +70,32 @@ public class KafkaConsumerMaster extends UntypedActor {

  @Override
  public void preStart() throws Exception {
    Logger.info("Start the KafkaConsumerMaster actor...");
    _kafkaJobList = Play.application().configuration().getIntList("kafka.consumer.etl.jobid", null);
    if (_kafkaJobList == null || _kafkaJobList.size() == 0) {
      Logger.error("Kafka job id error, kafkaJobList: " + _kafkaJobList);
      getContext().stop(getSelf());
    }
    Logger.info("Start the KafkaConsumerMaster... Kafka etl job id list: " + _kafkaJobList);

    // handle 1 kafka connection
    Map<String, Object> kafkaEtlJob = EtlJobDao.getEtlJobById(_kafkaJobList.get(0));
    final int kafkaJobRefId = Integer.parseInt(kafkaEtlJob.get("ref_id").toString());
    final String kafkaJobName = kafkaEtlJob.get("wh_etl_job_name").toString();

    if (!kafkaJobName.equals(EtlJobName.KAFKA_CONSUMER_ETL.name())) {
      Logger.error("Kafka job info error: job name '" + kafkaJobName + "' not equal "
          + EtlJobName.KAFKA_CONSUMER_ETL.name());
      getContext().stop(getSelf());
    }

    // get Kafka configurations from database
-   KafkaConfig.updateKafkaProperties();
+   KafkaConfig.updateKafkaProperties(kafkaJobRefId);
    _kafkaConfig = KafkaConfig.getProperties();
    _kafkaTopics = KafkaConfig.getTopics();

    // get list of cluster information from database and update ClusterUtil
    ClusterUtil.updateClusterInfo(ClusterDao.getClusterInfo());

    for (String topic : _kafkaTopics.keySet()) {
      // get the processor class and method
      final Class processorClass = Class.forName(_kafkaTopics.get(topic).processor);
@@ -125,7 +150,7 @@ public class KafkaConsumerMaster extends UntypedActor {
      final AbstractRecord record = kafkaMsg.getRecord();

      if (record != null && _kafkaTopics.containsKey(topic)) {
-       Logger.info("Writing to DB kafka event record: " + topic);
+       Logger.debug("Writing to DB kafka event record: " + topic);
        final DatabaseWriter dbWriter = _topicDbWriter.get(topic);

        try {
@@ -145,6 +170,7 @@ public class KafkaConsumerMaster extends UntypedActor {

  @Override
  public void postStop() {
    Logger.info("Terminating KafkaConsumerMaster...");
    if (_consumer != null) {
      _consumer.shutdown();
      _kafkaConfig.clear();
backend-service/app/models/daos/ClusterDao.java (new file, +51)
@@ -0,0 +1,51 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package models.daos;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import utils.JdbcUtil;
import wherehows.common.schemas.ClusterInfo;


public class ClusterDao {

  private static final String GET_CLUSTER_INFO = "SELECT * FROM cfg_cluster";

  public static List<ClusterInfo> getClusterInfo() throws Exception {
    List<Map<String, Object>> rows = JdbcUtil.wherehowsJdbcTemplate.queryForList(GET_CLUSTER_INFO);

    List<ClusterInfo> clusters = new ArrayList<>();
    for (Map<String, Object> row : rows) {
      String clusterCode = row.get("cluster_code").toString();
      // skip the default placeholder row
      if (clusterCode.equals("[all]")) {
        continue;
      }

      int clusterId = Integer.parseInt(row.get("cluster_id").toString());
      String clusterShortName = row.get("cluster_short_name").toString();
      String datacenterCode = row.get("data_center_code").toString();
      String clusterType = row.get("cluster_type").toString();
      String deploymentTierCode = row.get("deployment_tier_code").toString();

      clusters.add(
          new ClusterInfo(clusterId, clusterCode, clusterShortName, datacenterCode, clusterType, deploymentTierCode));
    }

    return clusters;
  }
}
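KafkaConsumerMaster primes the in-memory cluster list from this new DAO during preStart. A minimal sketch of that wiring, using only names that appear in this diff:

// at startup: load cfg_cluster rows (the "[all]" placeholder is skipped) and
// publish them to the static list that the Kafka processors consult
List<ClusterInfo> clusters = ClusterDao.getClusterInfo();
ClusterUtil.updateClusterInfo(clusters);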
@@ -18,6 +18,7 @@ import java.util.Map;
 import java.util.Properties;
 import metadata.etl.models.EtlJobName;
 import models.daos.EtlJobPropertyDao;
 import play.Logger;


 /**
@@ -29,8 +30,7 @@ public class KafkaConfig {
  // Map of <topic_name, topic_content>
  private static Map<String, Topic> _topics = new HashMap<>();

- public static final EtlJobName KAFKA_GOBBLIN_JOBNAME = EtlJobName.KAFKA_CONSUMER_GOBBLIN_ETL;
- public static final Integer KAFKA_GOBBLIN_JOB_REFID = 50;
+ public static final EtlJobName KAFKA_JOBNAME = EtlJobName.KAFKA_CONSUMER_ETL;

  /**
   * Class for storing Kafka Topic info
@@ -53,15 +53,19 @@ public class KafkaConfig {
   * Update Kafka properties and topics from etl_job_properties table
   * @throws Exception
   */
- public static void updateKafkaProperties() throws Exception {
-   Properties props = EtlJobPropertyDao.getJobProperties(KAFKA_GOBBLIN_JOBNAME, KAFKA_GOBBLIN_JOB_REFID);
+ public static void updateKafkaProperties(int kafkaJobRefId) throws Exception {
+   Properties props = EtlJobPropertyDao.getJobProperties(KAFKA_JOBNAME, kafkaJobRefId);
+   if (props == null || props.size() < 5) {
+     Logger.error("Fail to update Kafka job properties for " + KAFKA_JOBNAME.name()
+         + ", job ref id: " + kafkaJobRefId);
+     return;
+   } else {
+     Logger.info("Get Kafka job properties for " + KAFKA_JOBNAME.name() + ", job ref id: " + kafkaJobRefId);
+   }

-   String[] topics = ((String) props.get("kafka.topics")).split("\\s*,\\s*");
-   props.remove("kafka.topics");
-   String[] processors = ((String) props.get("kafka.processors")).split("\\s*,\\s*");
-   props.remove("kafka.processors");
-   String[] dbTables = ((String) props.get("kafka.db.tables")).split("\\s*,\\s*");
-   props.remove("kafka.db.tables");
+   String[] topics = ((String) props.remove("kafka.topics")).split("\\s*,\\s*");
+   String[] processors = ((String) props.remove("kafka.processors")).split("\\s*,\\s*");
+   String[] dbTables = ((String) props.remove("kafka.db.tables")).split("\\s*,\\s*");

    _props.clear();
    _props.putAll(props);
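The consolidation above works because java.util.Properties inherits remove() from Hashtable, and remove() returns the value previously mapped to the key, so get-then-remove collapses into one call. A self-contained illustration (class name is ours, not from the diff):

import java.util.Properties;

public class RemoveReturnsValue {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("kafka.topics", "TopicA, TopicB");
    // remove() deletes the entry and returns the old value in one step
    String[] topics = ((String) props.remove("kafka.topics")).split("\\s*,\\s*");
    // prints: 2 topics; still present? false
    System.out.println(topics.length + " topics; still present? " + props.containsKey("kafka.topics"));
  }
}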
@@ -66,4 +66,6 @@ logger.application=DEBUG
 # if this variable is set, only the job ids in this list will be scheduled
 # scheduler.jobid.whitelist=[1,2,3,4,5,6,7,8,9]
 scheduler.check.interval=10
+# start the following list of kafka consumer etl jobs
+# kafka.consumer.etl.jobid=[44]
 application.global=shared.Global
@@ -13,12 +13,15 @@
 */
package metadata.etl.kafka;

import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.ClusterInfo;
import wherehows.common.schemas.GobblinTrackingCompactionRecord;
import wherehows.common.schemas.Record;
import wherehows.common.utils.ClusterUtil;


/**
@@ -31,7 +34,6 @@ public class GobblinTrackingCompactionProcessor extends KafkaConsumerProcessor {
  private final String UrnRegex = "^(\\/\\w+\\/\\w+\\/[\\w-]+)\\/([\\w-]+)\\/(\\d+.*)$";
  private final Pattern UrnPattern = Pattern.compile(UrnRegex);


  /**
   * Process a Gobblin tracking event compaction record
   * @param record
@@ -53,7 +55,8 @@ public class GobblinTrackingCompactionProcessor extends KafkaConsumerProcessor {
    final Map<String, String> metadata = (Map<String, String>) record.get("metadata");

    final String jobContext = "Gobblin:" + name;
-   final String cluster = parseClusterIdentifier(metadata.get("clusterIdentifier")).get("cluster");
+   final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
+   // final String cluster = parseClusterIdentifier(metadata.get("clusterIdentifier")).get("cluster");
    final String projectName = metadata.get("azkabanProjectName");
    final String flowId = metadata.get("azkabanFlowId");
    final String jobId = metadata.get("azkabanJobId");
@@ -13,12 +13,15 @@
 */
package metadata.etl.kafka;

import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.avro.generic.GenericData;
import wherehows.common.schemas.ClusterInfo;
import wherehows.common.schemas.GobblinTrackingDistcpNgRecord;
import wherehows.common.schemas.Record;
import wherehows.common.utils.ClusterUtil;


/**
@@ -51,7 +54,7 @@ public class GobblinTrackingDistcpNgProcessor extends KafkaConsumerProcessor {
    final Map<String, String> metadata = (Map<String, String>) record.get("metadata");

    final String jobContext = "DistcpNG:" + name;
-   final String cluster = parseClusterIdentifier(metadata.get("clusterIdentifier")).get("cluster");
+   final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
    final String projectName = metadata.get("azkabanProjectName");
    final String flowId = metadata.get("azkabanFlowId");
    final String jobId = metadata.get("azkabanJobId");
@@ -19,6 +19,7 @@ import java.util.regex.Pattern;
 import org.apache.avro.generic.GenericData;
 import wherehows.common.schemas.GobblinTrackingLumosRecord;
 import wherehows.common.schemas.Record;
+import wherehows.common.utils.ClusterUtil;


 public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
@@ -59,7 +60,7 @@ public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
    logger.info("Processing Gobblin tracking event record: " + name + ", timestamp: " + timestamp);

    final String jobContext = "Lumos:" + name;
-   final String cluster = parseClusterIdentifier(metadata.get("clusterIdentifier")).get("cluster");
+   final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));
    final String projectName = metadata.get("azkabanProjectName");
    final String flowId = metadata.get("azkabanFlowId");
    final String jobId = metadata.get("azkabanJobId");
@@ -13,10 +13,6 @@
 */
package metadata.etl.kafka;

-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.avro.generic.GenericData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,11 +27,6 @@ public abstract class KafkaConsumerProcessor {

  protected static final Logger logger = LoggerFactory.getLogger(KafkaConsumerProcessor.class);

- // for 'ltx1-holdemrm01.grid.linkedin.com:8032', extract 'ltx1', 'holdem'
- private final String ClusterIdentifierRegex = "^(\\w{4})-(\\w+)\\w{2}\\d{2}\\.grid.*$";
- private final Pattern ClusterIdentifierPattern = Pattern.compile(ClusterIdentifierRegex);

  /**
   * Abstract method 'process' to be implemented by specific processor
   * input Kafka record, process information and write to DB.
@@ -59,24 +50,4 @@ public abstract class KafkaConsumerProcessor {
    }
  }

- /**
-  * Parse cluster identifier to get datacenter and cluster
-  * if length > 10 and in the form of 'ltx1-holdemrm01.grid.linkedin.com:8032', extract 'ltx1', 'holdem'
-  * otherwise, put the original String as cluster
-  * @param text String
-  * @return Map<String, String>
-  */
- protected Map<String, String> parseClusterIdentifier(String text) {
-   final Map<String, String> map = new HashMap<>(4);
-   if (text.length() > 10) {
-     final Matcher m = ClusterIdentifierPattern.matcher(text);
-     if (m.find()) {
-       map.put("datacenter", m.group(1));
-       map.put("cluster", m.group(2));
-     }
-   } else {
-     map.put("cluster", text);
-   }
-   return map;
- }
}
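For context on what was deleted: the old helper only understood LinkedIn grid hostnames of one fixed shape, which is why it is replaced by the ClusterUtil.matchClusterCode lookup against the cfg_cluster list. A standalone sketch of the removed regex, with the sample host from the removed comment (class name is ours):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ClusterRegexDemo {
  public static void main(String[] args) {
    // removed pattern: 4-char datacenter, '-', cluster name, 2 letters + 2 digits, '.grid...'
    Pattern p = Pattern.compile("^(\\w{4})-(\\w+)\\w{2}\\d{2}\\.grid.*$");
    Matcher m = p.matcher("ltx1-holdemrm01.grid.linkedin.com:8032");
    if (m.find()) {
      // prints: datacenter=ltx1 cluster=holdem
      System.out.println("datacenter=" + m.group(1) + " cluster=" + m.group(2));
    }
  }
}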
@@ -16,6 +16,7 @@ package metadata.etl.kafka;
 import org.apache.avro.generic.GenericData;
 import wherehows.common.schemas.MetastoreAuditRecord;
 import wherehows.common.schemas.Record;
+import wherehows.common.utils.ClusterUtil;


 public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
@@ -35,28 +36,28 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
    logger.info("Processing Metastore Audit event record.");

    final GenericData.Record auditHeader = (GenericData.Record) record.get("auditHeader");
-   final String server = utf8ToString(auditHeader.get("server"));
+   final String server = ClusterUtil.matchClusterCode(utf8ToString(auditHeader.get("server")));
    final String instance = utf8ToString(auditHeader.get("instance"));
    final String appName = utf8ToString(auditHeader.get("appName"));

    String eventName;
    GenericData.Record content;
-   final Object oldOne;
-   final Object newOne;
+   final Object oldInfo;
+   final Object newInfo;

    // check if it is MetastoreTableAuditEvent
    if (record.get("metastoreTableAuditContent") != null) {
      eventName = "MetastoreTableAuditEvent";
      content = (GenericData.Record) record.get("metastoreTableAuditContent");
-     oldOne = content.get("oldTable");
-     newOne = content.get("newTable");
+     oldInfo = content.get("oldTable");
+     newInfo = content.get("newTable");
    }
    // check if it is MetastorePartitionAuditEvent
    else if (record.get("metastorePartitionAuditContent") != null) {
      eventName = "MetastorePartitionAuditEvent";
      content = (GenericData.Record) record.get("metastorePartitionAuditContent");
-     oldOne = content.get("oldPartition");
-     newOne = content.get("newPartition");
+     oldInfo = content.get("oldPartition");
+     newInfo = content.get("newPartition");
    }
    else {
      throw new IllegalArgumentException("Unknown Metastore Audit event: " + record);
@@ -70,7 +71,7 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
    final String isDataDeleted = utf8ToString(content.get("isDataDeleted"));

    // use newOne, if null, use oldOne
-   final GenericData.Record rec = newOne != null ? (GenericData.Record) newOne : (GenericData.Record) oldOne;
+   final GenericData.Record rec = newInfo != null ? (GenericData.Record) newInfo : (GenericData.Record) oldInfo;
    final String dbName = utf8ToString(rec.get("dbName"));
    final String tableName = utf8ToString(rec.get("tableName"));
    final String partition = utf8ToString(rec.get("values"));
@@ -84,8 +85,8 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
      // set null partition to '?' for primary key
      eventRecord.setTableInfo(dbName, tableName, (partition != null ? partition : "?"),
          location, owner, createTime, lastAccessTime);
-     eventRecord.setOldOne(utf8ToString(oldOne));
-     eventRecord.setNewOne(utf8ToString(newOne));
+     eventRecord.setOldInfo(utf8ToString(oldInfo));
+     eventRecord.setNewInfo(utf8ToString(newInfo));
    }
    return eventRecord;
  }
@@ -29,7 +29,7 @@ public enum EtlJobName {
  ELASTICSEARCH_EXECUTION_INDEX_ETL(EtlType.OPERATION, RefIdType.APP),
  TREEBUILDER_EXECUTION_DATASET_ETL(EtlType.OPERATION, RefIdType.APP),
  ORACLE_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.APP),
  KAFKA_CONSUMER_GOBBLIN_ETL(EtlType.DATASET, RefIdType.APP),
  KAFKA_CONSUMER_ETL(EtlType.OPERATION, RefIdType.DB),
  ;

  EtlType etlType;
@@ -0,0 +1,57 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package wherehows.common.schemas;


public class ClusterInfo {
  int clusterId;
  String clusterCode;
  String clusterShortName;
  String datacenterCode;
  String clusterType;
  String deploymentTierCode;

  public ClusterInfo(int clusterId, String clusterCode, String clusterShortName, String datacenterCode,
      String clusterType, String deploymentTierCode) {
    this.clusterId = clusterId;
    this.clusterCode = clusterCode;
    this.clusterShortName = clusterShortName;
    this.datacenterCode = datacenterCode;
    this.clusterType = clusterType;
    this.deploymentTierCode = deploymentTierCode;
  }

  public int getClusterId() {
    return clusterId;
  }

  public String getClusterCode() {
    return clusterCode;
  }

  public String getClusterShortName() {
    return clusterShortName;
  }

  public String getDatacenterCode() {
    return datacenterCode;
  }

  public String getClusterType() {
    return clusterType;
  }

  public String getDeploymentTierCode() {
    return deploymentTierCode;
  }
}
@@ -227,19 +227,19 @@ public class MetastoreAuditRecord extends AbstractRecord {
    this.lastAccessTime = lastAccessTime;
  }

- public String getOldOne() {
+ public String getOldInfo() {
    return oldInfo;
  }

- public void setOldOne(String oldOne) {
-   this.oldInfo = oldOne;
+ public void setOldInfo(String oldInfo) {
+   this.oldInfo = oldInfo;
  }

- public String getNewOne() {
+ public String getNewInfo() {
    return newInfo;
  }

- public void setNewOne(String newOne) {
-   this.newInfo = newOne;
+ public void setNewInfo(String newInfo) {
+   this.newInfo = newInfo;
  }
}
@@ -0,0 +1,63 @@
/**
 * Copyright 2015 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package wherehows.common.utils;

import java.util.ArrayList;
import java.util.List;
import wherehows.common.schemas.ClusterInfo;


public class ClusterUtil {

  private static final List<ClusterInfo> CLUSTER_LIST = new ArrayList<>();

  public static void updateClusterInfo(List<ClusterInfo> clusterList) {
    if (clusterList != null && clusterList.size() > 0) {
      CLUSTER_LIST.clear();
      CLUSTER_LIST.addAll(clusterList);
    }
  }

  /**
   * match the clusterName to the list of cluster info, return the matching clusterCode
   * @param clusterName String
   * @return clusterCode
   */
  public static String matchClusterCode(String clusterName) {
    if (clusterName == null) {
      return null;
    }

    // first use cluster code to find match
    for (ClusterInfo cluster : CLUSTER_LIST) {
      if (clusterName.contains(cluster.getClusterCode())) {
        return cluster.getClusterCode();
      }
    }

    // second round use cluster short name to find match
    for (ClusterInfo cluster : CLUSTER_LIST) {
      if (clusterName.contains(cluster.getClusterShortName())) {
        return cluster.getClusterCode();
      }
    }

    // no match found, return original string
    return clusterName;
  }

  public static List<ClusterInfo> getClusterList() {
    return CLUSTER_LIST;
  }
}
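matchClusterCode is deliberately forgiving: it tries a cluster-code substring match first, then a short-name match, and otherwise returns the input unchanged. A usage sketch with a made-up cfg_cluster row (the field values are illustrative, not from this diff):

import java.util.Arrays;
import wherehows.common.schemas.ClusterInfo;
import wherehows.common.utils.ClusterUtil;

public class ClusterUtilDemo {
  public static void main(String[] args) {
    // hypothetical row: cluster code "holdem-prod", short name "holdem"
    ClusterUtil.updateClusterInfo(Arrays.asList(
        new ClusterInfo(1, "holdem-prod", "holdem", "ltx1", "grid", "PROD")));

    // short-name match -> returns the cluster code "holdem-prod"
    System.out.println(ClusterUtil.matchClusterCode("ltx1-holdemrm01.grid.linkedin.com:8032"));
    // no match -> falls through and returns the input itself
    System.out.println(ClusterUtil.matchClusterCode("unknown-host"));
  }
}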