mirror of
https://github.com/datahub-project/datahub.git
synced 2026-01-07 07:17:01 +00:00
parent
938658a1cc
commit
5db7f71c8b
@ -19,6 +19,7 @@ import datetime
|
||||
from wherehows.common.schemas import SampleDataRecord
|
||||
from wherehows.common import Constant
|
||||
from org.slf4j import LoggerFactory
|
||||
from wherehows.common.writers import FileWriter
|
||||
|
||||
|
||||
class OracleExtract:
|
||||
@ -190,7 +191,7 @@ class OracleExtract:
|
||||
self.logger.info("")
|
||||
|
||||
|
||||
def format_table_metadata(self, rows):
|
||||
def format_table_metadata(self, rows,schema):
|
||||
'''
|
||||
add table info from rows into schema
|
||||
:param rows: input. each row is a database with all it's tables
|
||||
@ -201,10 +202,38 @@ class OracleExtract:
|
||||
table_record = {}
|
||||
table_idx = 0
|
||||
field_idx = 0
|
||||
db_dict = {}
|
||||
|
||||
db_output_dict = {}
|
||||
db_idx = len(schema) - 1
|
||||
db_output_idx = -1
|
||||
|
||||
for row in rows:
|
||||
table_name_key = "%s.%s" % (row[0], row[1])
|
||||
table_urn = "oracle:///%s/%s" % (row[0], row[1])
|
||||
if row[0] not in db_dict:
|
||||
schema.append({'database': row[0], 'type': 'Oracle', 'tables': []})
|
||||
db_idx += 1
|
||||
db_dict[row[0]] = db_idx
|
||||
full_name = ''
|
||||
if row[0]:
|
||||
full_name = row[0]
|
||||
if row[1]:
|
||||
full_name += '.' + row[1]
|
||||
elif row[1]:
|
||||
full_name = row[1]
|
||||
original_name = row[0] + '.' + row[1]
|
||||
if full_name not in db_output_dict:
|
||||
schema[db_idx]['tables'].append(
|
||||
{'name': row[1], 'type': 'Oracle', 'columns': [],'original_name': original_name})
|
||||
db_output_idx += 1
|
||||
db_output_dict[full_name] = db_output_idx
|
||||
schema[db_idx]['tables'][db_output_idx]['columns'].append(
|
||||
{'name': row[4], 'nullable': row[6], 'dataType': row[5] ,
|
||||
'maxByteLength': row[7] , 'precision': row[8] ,
|
||||
'scale': row[9] ,'default': row[12]})
|
||||
column_idx = len(schema[db_idx]['tables'][db_output_idx]['columns']) - 1
|
||||
datetime.datetime.now(), db_output_idx + 1, len(rows), row[0]))
|
||||
|
||||
if 'urn' not in table_record or table_urn != table_record['urn']:
|
||||
# This is a new table. Let's push the previous table record into output_list
|
||||
@ -265,6 +294,7 @@ class OracleExtract:
|
||||
fullname = ''
|
||||
columns = []
|
||||
rows_data = []
|
||||
fullname = database_name + '."' + table_name + '"'
|
||||
|
||||
sql = 'SELECT * FROM %s WHERE ROWNUM<=10' % fullname
|
||||
curs_td = self.conn_db.cursor()
|
||||
@ -332,12 +362,13 @@ class OracleExtract:
|
||||
:param collect_sample:
|
||||
:return:
|
||||
"""
|
||||
schema = []
|
||||
if database_name is None and table_name is None: # default route: process everything
|
||||
begin = datetime.datetime.now().strftime("%H:%M:%S")
|
||||
# table info
|
||||
rows = self.get_table_info(None, None)
|
||||
self.get_extra_table_info()
|
||||
self.format_table_metadata(rows)
|
||||
self.format_table_metadata(rows,schema)
|
||||
end = datetime.datetime.now().strftime("%H:%M:%S")
|
||||
self.logger.info("Collecting table info [%s -> %s]" % (str(begin), str(end)))
|
||||
|
||||
@ -348,12 +379,13 @@ class OracleExtract:
|
||||
csv_columns = ['dataset_urn', 'sort_id', 'name', 'data_type', 'nullable',
|
||||
'size', 'precision', 'scale', 'default_value', 'doc']
|
||||
self.write_csv(field_output_file, csv_columns, self.field_output_list)
|
||||
|
||||
scaned_dict = {}
|
||||
if sample:
|
||||
csvfile = open(sample_output_file, 'wb')
|
||||
open(sample_output_file, 'wb')
|
||||
os.chmod(sample_output_file, 0666)
|
||||
writer = csv.DictWriter(csvfile, fieldnames=csv_columns, delimiter='\x1A', lineterminator='\n',
|
||||
quoting=csv.QUOTE_NONE, quotechar='\1', escapechar='\0')
|
||||
sample_file_writer = FileWriter(sample_output_file)
|
||||
##writer = csv.DictWriter(csvfile, fieldnames=csv_columns, delimiter='\x1A', lineterminator='\n',
|
||||
## quoting=csv.QUOTE_NONE, quotechar='\1', escapechar='\0')
|
||||
self.logger.info("Writing to CSV file {}".format(sample_output_file))
|
||||
|
||||
# collect sample data
|
||||
@ -373,8 +405,10 @@ class OracleExtract:
|
||||
(ref_urn, sample_data) = self.get_sample_data(database_name, table_name)
|
||||
sample_record = SampleDataRecord('oracle', '/' + database_name + '/' + table_name, '', sample_data)
|
||||
scaned_dict[table_name] = {'ref_urn': ref_urn, 'data': sample_data}
|
||||
writer.writerow(sample_record)
|
||||
csvfile.close()
|
||||
sample_file_writer.append(sample_record)
|
||||
sample_file_writer.close()
|
||||
#writer.writerow(sample_record)
|
||||
#csvfile.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@ -314,7 +314,7 @@ class OracleLoad:
|
||||
begin = datetime.datetime.now().strftime("%H:%M:%S")
|
||||
self.load_tables()
|
||||
self.load_fields()
|
||||
# self.load_sample()
|
||||
self.load_sample()
|
||||
end = datetime.datetime.now().strftime("%H:%M:%S")
|
||||
self.logger.info("Load Oracle metadata [%s -> %s]" % (str(begin), str(end)))
|
||||
finally:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user