mirror of https://github.com/datahub-project/datahub.git, synced 2025-12-27 09:58:14 +00:00
Fix Hive column parser parent path bug
parent 5ff4d35f51
commit 0b5c421311
@@ -45,13 +45,14 @@ public class SchedulerActor extends UntypedActor {
       throws Exception {
     if (message.equals("checking")) {
       List<Map<String, Object>> dueJobs = EtlJobDao.getDueJobs();
-      Logger.info("running " + dueJobs.size() + " jobs");
       Set<Integer> whiteList = Global.getWhiteList();
+      Logger.info("total " + dueJobs.size() + " jobs due, white list : " + whiteList);
       for (Map<String, Object> dueJob : dueJobs) {
         Integer whEtlJobId = ((Long) dueJob.get("wh_etl_job_id")).intValue();
         if (whiteList != null && !whiteList.contains(whEtlJobId)) {
           continue; // if we config the white list and it's not in white list, skip this job
         }
+        Logger.info("running job: job id :" + whEtlJobId);
         EtlJobName etlJobName = EtlJobName.valueOf((String) dueJob.get("wh_etl_job_name"));
         EtlType etlType = EtlType.valueOf((String) dueJob.get("wh_etl_type"));
         Integer refId = (Integer) dueJob.get("ref_id");
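For readers skimming the diff: a null white list means no filtering at all, while a configured one silently drops any job it does not contain. A minimal Python sketch of that gate (the function and key names are mine, mirroring the Java above):

from typing import Dict, Iterable, Iterator, Optional, Set

def runnable_job_ids(due_jobs: Iterable[Dict[str, int]],
                     white_list: Optional[Set[int]]) -> Iterator[int]:
    """Yield the ids of due jobs that pass the white-list gate."""
    for job in due_jobs:
        job_id = int(job["wh_etl_job_id"])  # stored as a long in the jobs table
        # Skip only when a white list is configured AND it excludes this job.
        if white_list is not None and job_id not in white_list:
            continue
        yield job_id

jobs = [{"wh_etl_job_id": 1}, {"wh_etl_job_id": 2}]
assert list(runnable_job_ids(jobs, None)) == [1, 2]  # no white list: run all
assert list(runnable_job_ids(jobs, {2})) == [2]      # configured: filter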
@@ -63,6 +63,9 @@ class HiveColumnParser:
         if inner:
             self.prefix = column_name
             column.update(self._parse_complex(simple_type, inner, self.sort_id))
+
+        # reset prefix after each outermost field
+        self.prefix = ''
         return column

     def is_scalar_type(self, type_string):
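This hunk is the parent-path bug named in the commit title: self.prefix carries the dotted path of the complex column currently being expanded, and before this change it was never cleared, so whatever top-level column came next inherited a stale parent path. A toy reduction of the bookkeeping (the class below is illustrative, not the real HiveColumnParser):

class ToyColumnParser:
    """Illustrative reduction of the prefix bookkeeping, not the real parser."""

    def __init__(self, reset_prefix=True):
        self.prefix = ''               # dotted parent path of the current field
        self.reset_prefix = reset_prefix

    def parse_column(self, column_name, inner=None):
        parent_path = self.prefix      # '' marks a top-level column
        if inner:
            self.prefix = column_name  # children get 'column_name.child' paths
        if self.reset_prefix:
            self.prefix = ''           # the fix: clear after each outermost field
        return parent_path

buggy = ToyColumnParser(reset_prefix=False)
fixed = ToyColumnParser()
for parser in (buggy, fixed):
    parser.parse_column('testcomplex', inner='querytagger:string')
# The buggy variant leaks 'testcomplex' into the next column's parent path.
assert buggy.parse_column('extracolumn') == 'testcomplex'
assert fixed.parse_column('extracolumn') == ''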
@@ -72,6 +75,8 @@ class HiveColumnParser:
     def _parse_type(self, type_string):
         pattern = re.compile(r"^([a-z]+[(),0-9]*)(<(.+)>)?( comment '(.*)')?$", re.IGNORECASE)
         match = re.search(pattern, type_string)
+        if match is None:
+            return None, None, None
         return match.group(1), match.group(3), match.group(5)

     def _parse_complex(self, simple_type, inner, parent_id):
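The two added lines guard against re.search returning None, where the old code would raise AttributeError on .group() for any type string the pattern cannot handle. A stand-alone exercise of the same pattern (the sample inputs are mine):

import re

pattern = re.compile(r"^([a-z]+[(),0-9]*)(<(.+)>)?( comment '(.*)')?$", re.IGNORECASE)

def parse_type(type_string):
    match = re.search(pattern, type_string)
    if match is None:           # the added guard: tolerate unparsable types
        return None, None, None
    return match.group(1), match.group(3), match.group(5)

print(parse_type("decimal(15,2)"))             # ('decimal(15,2)', None, None)
print(parse_type("struct<aaa:int,bbb:char>"))  # ('struct', 'aaa:int,bbb:char', None)
print(parse_type("int comment '1~1024'"))      # ('int', None, '1~1024')
print(parse_type("???"))                       # (None, None, None), no AttributeError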
@@ -124,7 +124,8 @@ class HiveLoad:
         , default_value=nullif(@default_value,'null')
         , data_size=nullif(@data_size,'null')
         , namespace=nullif(@namespace,'null')
-        , description=nullif(@description,'null');
+        , description=nullif(@description,'null')
+        , last_modified=NULL;
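For context, MySQL's NULLIF(a, b) yields NULL when a equals b, so this SET clause converts the literal string 'null' emitted by the extract step into real SQL NULLs; the added line also forces last_modified to NULL on every load. A rough Python analogue of the per-row effect (the sample data is mine):

def nullif(value, sentinel):
    """Mimics SQL NULLIF: None when value equals the sentinel, else value."""
    return None if value == sentinel else value

raw = {"namespace": "null", "description": "a hive column", "data_size": "1024"}
loaded = {key: nullif(value, "null") for key, value in raw.items()}
loaded["last_modified"] = None  # the new line clears this column on every load
print(loaded)
# {'namespace': None, 'description': 'a hive column', 'data_size': '1024',
#  'last_modified': None}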
@@ -45,7 +45,7 @@ position:string,results:struct<numsearchresults:decimal(15,2),results:array<stru
 additionalinfo:map<string,string>>>> comment 'com.brother.innobella.printer',\
 additionalinfo:map<string,string>>>,searchtime:int comment '1~1024',\
 extratag:uniontype<int,double,struct<aaa:int,bbb:char>,array<varchar>> comment '*',\
-querytagger:string>", "ColumnName": "testcomplex"}]}
+querytagger:string>", "ColumnName": "testcomplex"},{"Comment":null,"TypeName":"string","ColumnName":"extracolumn"}]}
 '''

 expect_result_complex = [['hdfs:///test/urn', 1, 0, '', u'testcomplex', u'struct', None, None, None, None, None],
@@ -72,7 +72,8 @@ expect_result_complex = [['hdfs:///test/urn', 1, 0, '', u'testcomplex', u'struct
 ['hdfs:///test/urn', 22, 21, u'testcomplex.extratag.type2', u'aaa', u'int', None, None, None, None, None],
 ['hdfs:///test/urn', 23, 21, u'testcomplex.extratag.type2', u'bbb', u'char', None, None, None, None, None],
 ['hdfs:///test/urn', 24, 18, u'testcomplex.extratag', 'type3', u'array', None, None, None, None, None],
-['hdfs:///test/urn', 25, 1, u'testcomplex', u'querytagger', u'string', None, None, None, None, None]]
+['hdfs:///test/urn', 25, 1, u'testcomplex', u'querytagger', u'string', None, None, None, None, None],
+['hdfs:///test/urn', 26, 0, u'', u'extracolumn', u'string', None, None, None, None, None]]
 class HiveColumnParserTest(unittest.TestCase):

     def test_parse_simple(self):
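The appended expected row is the regression check for the prefix fix: extracolumn follows the deeply nested testcomplex in the fixture, yet its parent path must be the empty string and its parent id 0, i.e. top-level again. Decoding the tuple (the field names are my reading of the surrounding rows):

# Field names are assumptions inferred from the expected rows above.
(urn, sort_id, parent_sort_id, parent_path,
 name, data_type) = ('hdfs:///test/urn', 26, 0, u'', u'extracolumn', u'string')

# A leaked prefix would have produced parent_path == u'testcomplex';
# the fix keeps a column that follows a complex type top-level.
assert parent_path == u'' and parent_sort_id == 0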