Kafka ETL to fetch queuing pipeline, also add topic blacklist (#509)

Yi (Alan) Wang 2017-05-15 18:26:25 -07:00 committed by Mars Lan
parent 225d1fc6ec
commit 675dadd374
3 changed files with 36 additions and 19 deletions


@@ -251,6 +251,7 @@ public class Constant {
   public static final String ESPRESSO_OUTPUT_KEY = "espresso.metadata";
   public static final String VOLDEMORT_OUTPUT_KEY = "voldemort.metadata";
   public static final String KAFKA_OUTPUT_KEY = "kafka.metadata";
+  public static final String KAFKA_BLACKLIST_KEY = "kafka.topic_blacklist";
   // metadata-store restli server
   public static final String WH_RESTLI_SERVER_URL = "wherehows.restli.server.url";
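The new KAFKA_BLACKLIST_KEY names the job property that carries a comma-separated list of topics to skip. As a minimal sketch (not WhereHows' actual loader, which lives on the Java side), this is roughly how a .properties entry would end up in the flat args dict that the Jython ETL scripts index:

    # Hypothetical illustration only: parse a job .properties file into
    # the key/value dict ('args') read via keys such as
    # Constant.KAFKA_BLACKLIST_KEY.
    def load_job_properties(path):
        args = {}
        with open(path) as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue  # skip blank lines and comments
                key, _, value = line.partition('=')
                args[key.strip()] = value.strip()
        return args

    # e.g. args['kafka.topic_blacklist'] -> 'topic_a,topic_b'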


@@ -34,6 +34,8 @@ class KafkaExtract:
         temp_dir = FileUtil.etl_temp_dir(args, "KAFKA")
         self.output_file = open(os.path.join(temp_dir, args[Constant.KAFKA_OUTPUT_KEY]), 'w')
+        self.blacklist = [x.strip() for x in args[Constant.KAFKA_BLACKLIST_KEY].split(',')]
         self.d2_proxys = []
         proxy_urls = [x.strip() for x in args[Constant.D2_PROXY_URL].split(',')]
         for url in proxy_urls:
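The blacklist parsing added above is plain comma-splitting plus whitespace trimming; one subtlety worth noting is that an empty property still produces a one-element list. A standalone illustration:

    raw = ' topic_a, topic_b ,topic_c'
    blacklist = [x.strip() for x in raw.split(',')]
    print(blacklist)  # ['topic_a', 'topic_b', 'topic_c']

    # Edge case: ''.split(',') == [''], so an unset property yields ['']
    # rather than []. Harmless here, since no real topic name is ''.
    print([x.strip() for x in ''.split(',')])  # ['']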
@@ -52,33 +54,40 @@
         '''
         get KAFKA metadata from nuage
         '''
-        headers = {'Accept': 'application/json'}
-        payload = {'q': 'type', 'type': 'KAFKA', 'subType': 'KAFKA_TRACKING', 'fields': 'subType,fabric,name'}
-        resp = requests.get(self.d2_proxy_url + '/nuageDatabases', params=payload, headers=headers, verify=False)
-        if resp.status_code != 200:
-            self.logger.error(resp.text)
-        all_tables = resp.json()
-        # merge the same name to one
         merged_all_tables = {}  # {name : {'fabrics':[], subType:''}}
-        for one_table in all_tables['elements']:
-            name = one_table['name']
-            fabric = one_table['fabric']
-            subType = one_table['subType'] if 'subType' in one_table else None
-            if name in merged_all_tables:
-                merged_all_tables[name]['fabrics'].append(fabric)
-            else:
-                merged_all_tables[name] = {'fabrics': [fabric], 'subType': subType}
-        self.logger.info("Found {} topics for KAFKA TRACKING".format(len(merged_all_tables)))
+        # query for topics list
+        headers = {'Accept': 'application/json'}
+        pipelines = ['KAFKA_TRACKING', 'KAFKA_QUEUING']
+        for pipeline in pipelines:
+            payload = {'q': 'type', 'type': 'KAFKA', 'subType': pipeline, 'fields': 'subType,fabric,name'}
+            resp = requests.get(self.d2_proxy_url + '/nuageDatabases', params=payload, headers=headers, verify=False)
+            if resp.status_code != 200:
+                self.logger.error(resp.text)
+            all_tables = resp.json()
+            # merge the same name to one
+            topics_count = 0
+            for one_table in all_tables['elements']:
+                name = one_table['name']
+                fabric = one_table['fabric']
+                subType = one_table['subType'] if 'subType' in one_table else None
+                if name in merged_all_tables:
+                    merged_all_tables[name]['fabrics'].append(fabric)
+                else:
+                    merged_all_tables[name] = {'fabrics': [fabric], 'subType': subType}
+                topics_count += 1
+            self.logger.info("Found {} topics for {}".format(topics_count, pipeline))
         # query for topic schema
         skip_pattern_startswith = re.compile("^(_|test|tmp).*$")
         skip_pattern_endswith = re.compile("^.*(test|testing|tmp)\d*$")
         table_count = 0
         for name, value in merged_all_tables.items():
-            if skip_pattern_startswith.match(name) or skip_pattern_endswith.match(name):
+            if name in self.blacklist or skip_pattern_startswith.match(name) or skip_pattern_endswith.match(name):
                 continue
             if 'PROD' in value['fabrics']:
                 fabric = 'PROD'
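Taken together, the filter now drops a topic if it is explicitly blacklisted or if it matches either skip regex (names starting with _, test, or tmp, or ending in test/testing/tmp plus optional digits). A standalone sketch of that logic, using made-up topic names:

    import re

    # Same patterns as in the diff above; topic names are hypothetical.
    skip_pattern_startswith = re.compile(r"^(_|test|tmp).*$")
    skip_pattern_endswith = re.compile(r"^.*(test|testing|tmp)\d*$")
    blacklist = ['PageViewEvent_deprecated']

    topics = ['PageViewEvent', '_schemas', 'tmp_topic', 'MyTopic_test2',
              'PageViewEvent_deprecated', 'AdClickEvent']
    kept = [t for t in topics
            if t not in blacklist
            and not skip_pattern_startswith.match(t)
            and not skip_pattern_endswith.match(t)]
    print(kept)  # ['PageViewEvent', 'AdClickEvent']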


@@ -164,6 +164,13 @@ base.url.key=
 dali.git.urn=
 git.committer.blacklist=
+# Nuage
+d2.proxy.url=
+espresso.metadata=
+voldemort.metadata=
+kafka.metadata=
+kafka.topic_blacklist=
 # wherehows metadata store restli server
 wherehows.restli.server.url=
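For illustration only (these values are placeholders, not shipped defaults), the new key takes a comma-separated topic list, matching the strip-and-split parsing in KafkaExtract:

    kafka.topic_blacklist=SomeDeprecatedTopic, internal_debug_topic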