modify Kafka ETL job to unescape schema string (#392)

This commit is contained in:
Yi (Alan) Wang 2017-03-31 17:22:42 -07:00 committed by Mars Lan
parent 3697b06c23
commit afaa30487e

View File

@@ -62,12 +62,12 @@ class KafkaTransform:
# databaseSpec : topicSchemas : schema
schema_string = ''
properties = {}
if 'databaseSpec' in content:
if 'com.linkedin.nuage.KafkaTopic' in content['databaseSpec']:
if 'topicSchemas' in content['databaseSpec']['com.linkedin.nuage.KafkaTopic']:
versions = len(content['databaseSpec']['com.linkedin.nuage.KafkaTopic']['topicSchemas'])
if versions > 0:
schema_string = content['databaseSpec']['com.linkedin.nuage.KafkaTopic']['topicSchemas'][versions - 1]['schema']
if 'databaseSpec' in content and 'com.linkedin.nuage.KafkaTopic' in content['databaseSpec'] and 'topicSchemas' in \
content['databaseSpec']['com.linkedin.nuage.KafkaTopic']:
topic_schemas = content['databaseSpec']['com.linkedin.nuage.KafkaTopic']['topicSchemas']
versions = len(topic_schemas)
if versions > 0:
schema_string = topic_schemas[versions - 1]['schema'].decode('string_escape')
for p_key in content.keys():
if p_key not in EXCLUDED_ATTRS_IN_PROP:
@@ -85,7 +85,7 @@ class KafkaTransform:
self.logger.debug("{} doesn't contain schema fields".format(urn))
self.conn_cursor.executemany(self.dataset_cmd, [self.db_id, dataset_type, urn, name,
json.dumps(schema_string) if len(schema_string) > 0 else None,
schema_string if len(schema_string) > 0 else None,
schema_type, json.dumps(properties), json.dumps(fields), source,
location_prefix, parent_name, self.wh_etl_exec_id])