diff --git a/metadata-etl/src/main/resources/jython/KafkaTransform.py b/metadata-etl/src/main/resources/jython/KafkaTransform.py
index 715e5a2cf1..a2acd4b23b 100644
--- a/metadata-etl/src/main/resources/jython/KafkaTransform.py
+++ b/metadata-etl/src/main/resources/jython/KafkaTransform.py
@@ -62,12 +62,12 @@ class KafkaTransform:
     # databaseSpec : topicSchemas : schema
     schema_string = ''
     properties = {}
-    if 'databaseSpec' in content:
-      if 'com.linkedin.nuage.KafkaTopic' in content['databaseSpec']:
-        if 'topicSchemas' in content['databaseSpec']['com.linkedin.nuage.KafkaTopic']:
-          versions = len(content['databaseSpec']['com.linkedin.nuage.KafkaTopic']['topicSchemas'])
-          if versions > 0:
-            schema_string = content['databaseSpec']['com.linkedin.nuage.KafkaTopic']['topicSchemas'][versions - 1]['schema']
+    if 'databaseSpec' in content and 'com.linkedin.nuage.KafkaTopic' in content['databaseSpec'] and 'topicSchemas' in \
+        content['databaseSpec']['com.linkedin.nuage.KafkaTopic']:
+      topic_schemas = content['databaseSpec']['com.linkedin.nuage.KafkaTopic']['topicSchemas']
+      versions = len(topic_schemas)
+      if versions > 0:
+        schema_string = topic_schemas[versions - 1]['schema'].decode('string_escape')

     for p_key in content.keys():
       if p_key not in EXCLUDED_ATTRS_IN_PROP:
@@ -85,7 +85,7 @@ class KafkaTransform:
         self.logger.debug("{} doesn't contain schema fields".format(urn))

     self.conn_cursor.executemany(self.dataset_cmd, [self.db_id, dataset_type, urn, name,
-                                 json.dumps(schema_string) if len(schema_string) > 0 else None,
+                                 schema_string if len(schema_string) > 0 else None,
                                  schema_type, json.dumps(properties), json.dumps(fields), source, location_prefix,
                                  parent_name, self.wh_etl_exec_id])
