From 35c0781f59a8a801ffdca74e33f5970826e43208 Mon Sep 17 00:00:00 2001 From: Mars Lan Date: Tue, 25 Apr 2017 15:11:02 -0700 Subject: [PATCH] Use WH_APP_FOLDER property as the base directory for temp files for various ETL jobs (#451) * Use WH_APP_FOLDER property as the base directory for temp files in Oracle ETL, instead of the full path defined in wh_etl_job_property, which is often /var/tmp/{something}. * Move common code to FileUtil.py and move Voldemort's temp files too. * Move Kafka ETL temp files. * Move Espresso ETL temp files. * Move Multiproduct ETL temp files. * Move CodeSearch ETL temp files. * Move Teradata ETL temp files. --- .../resources/jython/CodeSearchExtract.py | 16 +++++++++--- .../main/resources/jython/CodeSearchLoad.py | 10 +++++-- .../main/resources/jython/EspressoExtract.py | 14 +++++++--- .../resources/jython/EspressoTransform.py | 11 ++++++-- .../src/main/resources/jython/FileUtil.py | 26 +++++++++++++++++++ .../src/main/resources/jython/KafkaExtract.py | 15 ++++++++--- .../main/resources/jython/KafkaTransform.py | 10 +++++-- .../resources/jython/MultiproductExtract.py | 19 +++++++++----- .../main/resources/jython/MultiproductLoad.py | 15 ++++++++--- .../main/resources/jython/OracleExtract.py | 25 +++++++++++++----- .../src/main/resources/jython/OracleLoad.py | 17 ++++++++---- .../main/resources/jython/TeradataExtract.py | 19 +++++++++++--- .../src/main/resources/jython/TeradataLoad.py | 18 ++++++++----- .../resources/jython/TeradataTransform.py | 15 ++++++++--- .../main/resources/jython/VoldemortExtract.py | 14 +++++++--- .../resources/jython/VoldemortTransform.py | 11 ++++++-- .../espresso/EspressoMetadataEtlTest.java | 9 +++---- .../dataset/kafka/KafkaMetadataEtlTest.java | 9 +++---- .../dataset/oracle/OracleMetadataEtlTest.java | 6 ++--- .../teradata/TeradataMetadataEtlTest.java | 3 --- .../voldemort/VoldemortMetadataEtlTest.java | 9 +++---- .../etl/git/CodeSearchMetadataEtlTest.java | 6 ++--- 
.../etl/git/MultiproductMetadataEtlTest.java | 6 ++--- 23 files changed, 211 insertions(+), 92 deletions(-) create mode 100644 metadata-etl/src/main/resources/jython/FileUtil.py diff --git a/metadata-etl/src/main/resources/jython/CodeSearchExtract.py b/metadata-etl/src/main/resources/jython/CodeSearchExtract.py index c1bde8d32b..b080b63e2c 100644 --- a/metadata-etl/src/main/resources/jython/CodeSearchExtract.py +++ b/metadata-etl/src/main/resources/jython/CodeSearchExtract.py @@ -12,14 +12,19 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # -import sys,os,re +import os +import re import requests import subprocess +import sys + from wherehows.common import Constant from wherehows.common.schemas import SCMOwnerRecord from wherehows.common.writers import FileWriter from org.slf4j import LoggerFactory +import FileUtil + class CodeSearchExtract: """ @@ -33,10 +38,13 @@ class CodeSearchExtract: # limit_multiproduct = None # limit_plugin = None - def __init__(self): + def __init__(self, args): self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__) self.base_url = args[Constant.BASE_URL_KEY] - self.code_search_committer_writer = FileWriter(args[Constant.DATABASE_SCM_REPO_OUTPUT_KEY]) + + temp_dir = FileUtil.etl_temp_dir(args, "CODESEARCH") + self.code_search_committer_writer = FileWriter( + os.path.join(temp_dir, args[Constant.DATABASE_SCM_REPO_OUTPUT_KEY])) def run(self): offset_min = 1 @@ -191,5 +199,5 @@ class CodeSearchExtract: if __name__ == "__main__": args = sys.argv[1] - e = CodeSearchExtract() + e = CodeSearchExtract(args) e.run() diff --git a/metadata-etl/src/main/resources/jython/CodeSearchLoad.py b/metadata-etl/src/main/resources/jython/CodeSearchLoad.py index 66b9e0f1c3..87da933502 100644 --- a/metadata-etl/src/main/resources/jython/CodeSearchLoad.py +++ b/metadata-etl/src/main/resources/jython/CodeSearchLoad.py @@ -12,10 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. # +import datetime +import os +import sys from com.ziclix.python.sql import zxJDBC from wherehows.common import Constant from org.slf4j import LoggerFactory -import sys, os, datetime + +import FileUtil class CodeSearchLoad: @@ -26,7 +30,9 @@ class CodeSearchLoad: password = args[Constant.WH_DB_PASSWORD_KEY] JDBC_DRIVER = args[Constant.WH_DB_DRIVER_KEY] JDBC_URL = args[Constant.WH_DB_URL_KEY] - self.database_scm_repo_file = args[Constant.DATABASE_SCM_REPO_OUTPUT_KEY] + + temp_dir = FileUtil.etl_temp_dir(args, "CODESEARCH") + self.database_scm_repo_file = os.path.join(temp_dir, args[Constant.DATABASE_SCM_REPO_OUTPUT_KEY]) self.app_id = args[Constant.APP_ID_KEY] self.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY] diff --git a/metadata-etl/src/main/resources/jython/EspressoExtract.py b/metadata-etl/src/main/resources/jython/EspressoExtract.py index bc08a57d76..1e8c0b1ffc 100644 --- a/metadata-etl/src/main/resources/jython/EspressoExtract.py +++ b/metadata-etl/src/main/resources/jython/EspressoExtract.py @@ -12,20 +12,26 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# -import sys, json +import json +import os +import sys + from datetime import datetime from jython import requests from wherehows.common import Constant from org.slf4j import LoggerFactory +import FileUtil + class EspressoExtract: - def __init__(self): + def __init__(self, args): self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__) requests.packages.urllib3.disable_warnings() - self.output_file = open(args[Constant.ESPRESSO_OUTPUT_KEY], 'w') + temp_dir = FileUtil.etl_temp_dir(args, "ESPRESSO") + self.output_file = open(os.path.join(temp_dir, args[Constant.ESPRESSO_OUTPUT_KEY]), 'w') self.d2_proxys = [] proxy_urls = [x.strip() for x in args[Constant.D2_PROXY_URL].split(',')] @@ -111,5 +117,5 @@ class EspressoExtract: if __name__ == "__main__": args = sys.argv[1] - e = EspressoExtract() + e = EspressoExtract(args) e.run() diff --git a/metadata-etl/src/main/resources/jython/EspressoTransform.py b/metadata-etl/src/main/resources/jython/EspressoTransform.py index 591f557881..987dbebe2e 100644 --- a/metadata-etl/src/main/resources/jython/EspressoTransform.py +++ b/metadata-etl/src/main/resources/jython/EspressoTransform.py @@ -12,11 +12,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# -import sys, datetime, json +import datetime +import json +import os +import sys + from com.ziclix.python.sql import zxJDBC from wherehows.common import Constant from org.slf4j import LoggerFactory +import FileUtil + class EspressoTransform: @@ -28,7 +34,8 @@ class EspressoTransform: JDBC_DRIVER = args[Constant.WH_DB_DRIVER_KEY] JDBC_URL = args[Constant.WH_DB_URL_KEY] - self.input_file = open(args[Constant.ESPRESSO_OUTPUT_KEY], 'r') + temp_dir = FileUtil.etl_temp_dir(args, "ESPRESSO") + self.input_file = open(os.path.join(temp_dir, args[Constant.ESPRESSO_OUTPUT_KEY]), 'r') self.db_id = args[Constant.DB_ID_KEY] self.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY] diff --git a/metadata-etl/src/main/resources/jython/FileUtil.py b/metadata-etl/src/main/resources/jython/FileUtil.py new file mode 100644 index 0000000000..0ecdf5cace --- /dev/null +++ b/metadata-etl/src/main/resources/jython/FileUtil.py @@ -0,0 +1,26 @@ +# +# Copyright 2015 LinkedIn Corp. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# + +import os + +from wherehows.common import Constant + +def etl_temp_dir(args, etl_type): + dir = os.path.join(args[Constant.WH_APP_FOLDER_KEY], etl_type, args[Constant.WH_EXEC_ID_KEY]) + if not os.path.exists(dir): + os.makedirs(dir) + + return dir + + diff --git a/metadata-etl/src/main/resources/jython/KafkaExtract.py b/metadata-etl/src/main/resources/jython/KafkaExtract.py index c65b363142..1eedf05fa0 100644 --- a/metadata-etl/src/main/resources/jython/KafkaExtract.py +++ b/metadata-etl/src/main/resources/jython/KafkaExtract.py @@ -12,20 +12,27 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # -import sys, json, re +import json +import re +import os +import sys + from datetime import datetime from jython import requests from wherehows.common import Constant from org.slf4j import LoggerFactory +import FileUtil + class KafkaExtract: - def __init__(self): + def __init__(self, args): self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__) requests.packages.urllib3.disable_warnings() - self.output_file = open(args[Constant.KAFKA_OUTPUT_KEY], 'w') + temp_dir = FileUtil.etl_temp_dir(args, "KAFKA") + self.output_file = open(os.path.join(temp_dir, args[Constant.KAFKA_OUTPUT_KEY]), 'w') self.d2_proxys = [] proxy_urls = [x.strip() for x in args[Constant.D2_PROXY_URL].split(',')] @@ -112,5 +119,5 @@ class KafkaExtract: if __name__ == "__main__": args = sys.argv[1] - e = KafkaExtract() + e = KafkaExtract(args) e.run() diff --git a/metadata-etl/src/main/resources/jython/KafkaTransform.py b/metadata-etl/src/main/resources/jython/KafkaTransform.py index a2acd4b23b..2e70f3a853 100644 --- a/metadata-etl/src/main/resources/jython/KafkaTransform.py +++ b/metadata-etl/src/main/resources/jython/KafkaTransform.py @@ -12,11 +12,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# -import sys, datetime, json +import datetime +import json +import os +import sys from com.ziclix.python.sql import zxJDBC from wherehows.common import Constant from org.slf4j import LoggerFactory +import FileUtil + class KafkaTransform: @@ -28,7 +33,8 @@ class KafkaTransform: JDBC_DRIVER = args[Constant.WH_DB_DRIVER_KEY] JDBC_URL = args[Constant.WH_DB_URL_KEY] - self.input_file = open(args[Constant.KAFKA_OUTPUT_KEY], 'r') + temp_dir = FileUtil.etl_temp_dir(args, "KAFKA") + self.input_file = open(os.path.join(temp_dir, args[Constant.KAFKA_OUTPUT_KEY]), 'r') self.db_id = args[Constant.DB_ID_KEY] self.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY] diff --git a/metadata-etl/src/main/resources/jython/MultiproductExtract.py b/metadata-etl/src/main/resources/jython/MultiproductExtract.py index dc0eb272ca..38ad264aae 100644 --- a/metadata-etl/src/main/resources/jython/MultiproductExtract.py +++ b/metadata-etl/src/main/resources/jython/MultiproductExtract.py @@ -12,9 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# -import sys, os, re +import os +import re +import sys import datetime import xml.etree.ElementTree as ET + from jython import requests from wherehows.common import Constant from wherehows.common.schemas import MultiproductProjectRecord @@ -23,17 +26,21 @@ from wherehows.common.schemas import MultiproductRepoOwnerRecord from wherehows.common.writers import FileWriter from org.slf4j import LoggerFactory +import FileUtil + class MultiproductLoad: - def __init__(self): + def __init__(self, args): self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__) requests.packages.urllib3.disable_warnings() self.app_id = int(args[Constant.APP_ID_KEY]) self.wh_exec_id = long(args[Constant.WH_EXEC_ID_KEY]) - self.project_writer = FileWriter(args[Constant.GIT_PROJECT_OUTPUT_KEY]) - self.repo_writer = FileWriter(args[Constant.PRODUCT_REPO_OUTPUT_KEY]) - self.repo_owner_writer = FileWriter(args[Constant.PRODUCT_REPO_OWNER_OUTPUT_KEY]) + + temp_dir = FileUtil.etl_temp_dir(args, "MULTIPRODUCT") + self.project_writer = FileWriter(os.path.join(temp_dir, args[Constant.GIT_PROJECT_OUTPUT_KEY])) + self.repo_writer = FileWriter(os.path.join(temp_dir, args[Constant.PRODUCT_REPO_OUTPUT_KEY])) + self.repo_owner_writer = FileWriter(os.path.join(temp_dir, args[Constant.PRODUCT_REPO_OWNER_OUTPUT_KEY])) self.multiproduct = {} self.git_repo = {} @@ -303,5 +310,5 @@ class MultiproductLoad: if __name__ == "__main__": args = sys.argv[1] - e = MultiproductLoad() + e = MultiproductLoad(args) e.run() diff --git a/metadata-etl/src/main/resources/jython/MultiproductLoad.py b/metadata-etl/src/main/resources/jython/MultiproductLoad.py index 77b1a66211..497a131de9 100644 --- a/metadata-etl/src/main/resources/jython/MultiproductLoad.py +++ b/metadata-etl/src/main/resources/jython/MultiproductLoad.py @@ -12,10 +12,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# +import datetime +import os +import sys + from com.ziclix.python.sql import zxJDBC from wherehows.common import Constant from org.slf4j import LoggerFactory -import sys, os, datetime + +import FileUtil class MultiproductLoad: @@ -26,9 +31,11 @@ class MultiproductLoad: password = args[Constant.WH_DB_PASSWORD_KEY] JDBC_DRIVER = args[Constant.WH_DB_DRIVER_KEY] JDBC_URL = args[Constant.WH_DB_URL_KEY] - self.mp_gitli_project_file = args[Constant.GIT_PROJECT_OUTPUT_KEY] - self.product_repo_file = args[Constant.PRODUCT_REPO_OUTPUT_KEY] - self.product_repo_owner_file = args[Constant.PRODUCT_REPO_OWNER_OUTPUT_KEY] + + temp_dir = FileUtil.etl_temp_dir(args, "MULTIPRODUCT") + self.mp_gitli_project_file = os.path.join(temp_dir, args[Constant.GIT_PROJECT_OUTPUT_KEY]) + self.product_repo_file = os.path.join(temp_dir, args[Constant.PRODUCT_REPO_OUTPUT_KEY]) + self.product_repo_owner_file = os.path.join(temp_dir, args[Constant.PRODUCT_REPO_OWNER_OUTPUT_KEY]) self.app_id = args[Constant.APP_ID_KEY] self.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY] diff --git a/metadata-etl/src/main/resources/jython/OracleExtract.py b/metadata-etl/src/main/resources/jython/OracleExtract.py index 363cadae4c..6e990d4c56 100644 --- a/metadata-etl/src/main/resources/jython/OracleExtract.py +++ b/metadata-etl/src/main/resources/jython/OracleExtract.py @@ -12,15 +12,20 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# -from com.ziclix.python.sql import zxJDBC -import sys, os, re -import json, csv +import csv import datetime +import json +import os +import sys + +from com.ziclix.python.sql import zxJDBC from wherehows.common.schemas import SampleDataRecord from wherehows.common import Constant from org.slf4j import LoggerFactory from wherehows.common.writers import FileWriter +import FileUtil + class OracleExtract: table_dict = {} @@ -427,6 +432,11 @@ if __name__ == "__main__": collect_sample = bool(args[Constant.ORA_LOAD_SAMPLE]) e.databases = args[Constant.ORA_EXCLUDE_DATABASES_KEY].split(',') + temp_dir = FileUtil.etl_temp_dir(args, "ORACLE"); + table_output_file = os.path.join(temp_dir, args[Constant.ORA_SCHEMA_OUTPUT_KEY]) + field_output_file = os.path.join(temp_dir, args[Constant.ORA_FIELD_OUTPUT_KEY]) + sample_output_file = os.path.join(temp_dir, args[Constant.ORA_SAMPLE_OUTPUT_KEY]) + try: e.conn_db.cursor().execute("ALTER SESSION SET TIME_ZONE = 'US/Pacific'") e.conn_db.cursor().execute("ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS'") @@ -434,10 +444,11 @@ if __name__ == "__main__": ('WhereHows (Jython)', os.getpid())) e.conn_db.commit() - e.run(None, None, - args[Constant.ORA_SCHEMA_OUTPUT_KEY], - args[Constant.ORA_FIELD_OUTPUT_KEY], - args[Constant.ORA_SAMPLE_OUTPUT_KEY], + e.run(None, + None, + table_output_file, + field_output_file, + sample_output_file, sample=collect_sample) finally: e.conn_db.cursor().close() diff --git a/metadata-etl/src/main/resources/jython/OracleLoad.py b/metadata-etl/src/main/resources/jython/OracleLoad.py index 954bc7f275..6c31f835d1 100644 --- a/metadata-etl/src/main/resources/jython/OracleLoad.py +++ b/metadata-etl/src/main/resources/jython/OracleLoad.py @@ -12,10 +12,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# +import datetime +import os +import sys + from com.ziclix.python.sql import zxJDBC from wherehows.common import Constant from org.slf4j import LoggerFactory -import sys, datetime + +import FileUtil class OracleLoad: @@ -26,9 +31,6 @@ class OracleLoad: password = args[Constant.WH_DB_PASSWORD_KEY] JDBC_DRIVER = args[Constant.WH_DB_DRIVER_KEY] JDBC_URL = args[Constant.WH_DB_URL_KEY] - self.input_table_file = args[Constant.ORA_SCHEMA_OUTPUT_KEY] - self.input_field_file = args[Constant.ORA_FIELD_OUTPUT_KEY] - self.input_sample_file = args[Constant.ORA_SAMPLE_OUTPUT_KEY] self.db_id = args[Constant.DB_ID_KEY] self.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY] @@ -39,6 +41,11 @@ class OracleLoad: lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT] self.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time) + temp_dir = FileUtil.etl_temp_dir(args, "ORACLE"); + self.input_table_file = os.path.join(temp_dir, args[Constant.ORA_SCHEMA_OUTPUT_KEY]) + self.input_field_file = os.path.join(temp_dir, args[Constant.ORA_FIELD_OUTPUT_KEY]) + self.input_sample_file = os.path.join(temp_dir, args[Constant.ORA_SAMPLE_OUTPUT_KEY]) + self.logger.info("Load Oracle Metadata into {}, db_id {}, wh_exec_id {}" .format(JDBC_URL, self.db_id, self.wh_etl_exec_id)) @@ -161,7 +168,7 @@ class OracleLoad: select x.field_id, s.* from stg_dict_field_detail s join dict_field_detail x - on s.dataset_id = x.dataset_id + on s.dataset_id = x.dataset_id and s.field_name = x.field_name and s.parent_path <=> x.parent_path where s.db_id = {db_id} diff --git a/metadata-etl/src/main/resources/jython/TeradataExtract.py b/metadata-etl/src/main/resources/jython/TeradataExtract.py index aa2051bfe2..379fa4dedf 100644 --- a/metadata-etl/src/main/resources/jython/TeradataExtract.py +++ b/metadata-etl/src/main/resources/jython/TeradataExtract.py @@ -12,15 +12,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# -from com.ziclix.python.sql import zxJDBC -import sys, os, re, json import datetime +import json +import os +import re +import sys + +from com.ziclix.python.sql import zxJDBC from distutils.util import strtobool from wherehows.common.schemas import SampleDataRecord from wherehows.common.writers import FileWriter from wherehows.common import Constant from org.slf4j import LoggerFactory +import FileUtil + class TeradataExtract: def __init__(self): @@ -555,17 +561,22 @@ if __name__ == "__main__": if datetime.datetime.now().strftime('%a') not in args[Constant.TD_COLLECT_SAMPLE_DATA_DAYS]: do_sample = False + temp_dir = FileUtil.etl_temp_dir(args, "TERADATA") + try: e.conn_td.cursor().execute( "SET QUERY_BAND = 'script=%s; pid=%d; ' FOR SESSION;" % ('TeradataExtract.py', os.getpid())) e.conn_td.commit() - e.log_file = args[Constant.TD_LOG_KEY] + e.log_file = os.path.join(temp_dir, args[Constant.TD_LOG_KEY]) e.databases = args[Constant.TD_TARGET_DATABASES_KEY].split(',') e.default_database = args[Constant.TD_DEFAULT_DATABASE_KEY] index_type = {'P': 'Primary Index', 'K': 'Primary Key', 'S': 'Secondary Index', 'Q': 'Partitioned Primary Index', 'J': 'Join Index', 'U': 'Unique Index'} - e.run(None, None, args[Constant.TD_SCHEMA_OUTPUT_KEY], args[Constant.TD_SAMPLE_OUTPUT_KEY], sample=do_sample) + schema_output_file = os.path.join(temp_dir, args[Constant.TD_SCHEMA_OUTPUT_KEY]) + sample_output_file = os.path.join(temp_dir, args[Constant.TD_SAMPLE_OUTPUT_KEY]) + + e.run(None, None, schema_output_file, sample_output_file, sample=do_sample) finally: e.conn_td.close() diff --git a/metadata-etl/src/main/resources/jython/TeradataLoad.py b/metadata-etl/src/main/resources/jython/TeradataLoad.py index 99e0e106e8..94dac261fc 100644 --- a/metadata-etl/src/main/resources/jython/TeradataLoad.py +++ b/metadata-etl/src/main/resources/jython/TeradataLoad.py @@ -12,12 +12,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# -import sys, datetime +import datetime +import os +import sys + from com.ziclix.python.sql import zxJDBC from distutils.util import strtobool from wherehows.common import Constant from org.slf4j import LoggerFactory +import FileUtil + class TeradataLoad: def __init__(self): @@ -189,8 +194,8 @@ class TeradataLoad: @dummy ) set - data_precision=nullif(@precision,''), - data_scale=nullif(@scale,''), + data_precision=nullif(@precision,''), + data_scale=nullif(@scale,''), db_id = {db_id}; analyze table stg_dict_field_detail; @@ -404,9 +409,10 @@ if __name__ == "__main__": JDBC_DRIVER = args[Constant.WH_DB_DRIVER_KEY] JDBC_URL = args[Constant.WH_DB_URL_KEY] - l.input_file = args[Constant.TD_METADATA_KEY] - l.input_field_file = args[Constant.TD_FIELD_METADATA_KEY] - l.input_sampledata_file = args[Constant.TD_SAMPLE_OUTPUT_KEY] + temp_dir = FileUtil.etl_temp_dir(args, "TERADATA") + l.input_file = os.path.join(temp_dir, args[Constant.TD_METADATA_KEY]) + l.input_field_file = os.path.join(temp_dir, args[Constant.TD_FIELD_METADATA_KEY]) + l.input_sampledata_file = os.path.join(temp_dir, args[Constant.TD_SAMPLE_OUTPUT_KEY]) do_sample = False if Constant.TD_LOAD_SAMPLE in args: diff --git a/metadata-etl/src/main/resources/jython/TeradataTransform.py b/metadata-etl/src/main/resources/jython/TeradataTransform.py index 91934c0b63..ed7765a7db 100644 --- a/metadata-etl/src/main/resources/jython/TeradataTransform.py +++ b/metadata-etl/src/main/resources/jython/TeradataTransform.py @@ -12,15 +12,19 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# -import json import datetime -import sys, os +import json +import os +import sys import time + from wherehows.common.writers import FileWriter from wherehows.common.schemas import DatasetSchemaRecord, DatasetFieldRecord from wherehows.common import Constant from org.slf4j import LoggerFactory +import FileUtil + class TeradataTransform: def __init__(self): @@ -132,5 +136,10 @@ if __name__ == "__main__": t = TeradataTransform() t.log_file = args['teradata.log'] - t.transform(args[Constant.TD_SCHEMA_OUTPUT_KEY], args[Constant.TD_METADATA_KEY], args[Constant.TD_FIELD_METADATA_KEY]) + temp_dir = FileUtil.etl_temp_dir(args, "TERADATA") + input = os.path.join(temp_dir, args[Constant.TD_SCHEMA_OUTPUT_KEY]) + td_metadata = os.path.join(temp_dir, args[Constant.TD_METADATA_KEY]) + td_field_metadata = os.path.join(temp_dir, args[Constant.TD_FIELD_METADATA_KEY]) + + t.transform(input, td_metadata, td_field_metadata) diff --git a/metadata-etl/src/main/resources/jython/VoldemortExtract.py b/metadata-etl/src/main/resources/jython/VoldemortExtract.py index f029216394..63cdc694ed 100644 --- a/metadata-etl/src/main/resources/jython/VoldemortExtract.py +++ b/metadata-etl/src/main/resources/jython/VoldemortExtract.py @@ -12,20 +12,26 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# -import sys, json +import json +import os +import sys + from datetime import datetime from jython import requests from wherehows.common import Constant from org.slf4j import LoggerFactory +import FileUtil + class VoldemortExtract: - def __init__(self): + def __init__(self, args): self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__) requests.packages.urllib3.disable_warnings() - self.output_file = open(args[Constant.VOLDEMORT_OUTPUT_KEY], 'w') + temp_dir = FileUtil.etl_temp_dir(args, "VOLDEMORT") + self.output_file = open(os.path.join(temp_dir, args[Constant.VOLDEMORT_OUTPUT_KEY]), 'w') self.d2_proxys = [] proxy_urls = [x.strip() for x in args[Constant.D2_PROXY_URL].split(',')] @@ -117,5 +123,5 @@ class VoldemortExtract: if __name__ == "__main__": args = sys.argv[1] - e = VoldemortExtract() + e = VoldemortExtract(args) e.run() diff --git a/metadata-etl/src/main/resources/jython/VoldemortTransform.py b/metadata-etl/src/main/resources/jython/VoldemortTransform.py index 9a3bc2beca..a31f1e3caf 100644 --- a/metadata-etl/src/main/resources/jython/VoldemortTransform.py +++ b/metadata-etl/src/main/resources/jython/VoldemortTransform.py @@ -12,11 +12,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# -import sys, datetime, json +import datetime +import json +import os +import sys + from com.ziclix.python.sql import zxJDBC from wherehows.common import Constant from org.slf4j import LoggerFactory +import FileUtil + class VoldemortTransform: @@ -28,7 +34,8 @@ class VoldemortTransform: JDBC_DRIVER = args[Constant.WH_DB_DRIVER_KEY] JDBC_URL = args[Constant.WH_DB_URL_KEY] - self.input_file = open(args[Constant.VOLDEMORT_OUTPUT_KEY], 'r') + temp_dir = FileUtil.etl_temp_dir(args, "VOLDEMORT") + self.input_file = open(os.path.join(temp_dir, args[Constant.VOLDEMORT_OUTPUT_KEY]), 'r') self.db_id = args[Constant.DB_ID_KEY] self.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY] diff --git a/metadata-etl/src/test/java/metadata/etl/dataset/espresso/EspressoMetadataEtlTest.java b/metadata-etl/src/test/java/metadata/etl/dataset/espresso/EspressoMetadataEtlTest.java index a796cf336f..7683f9c96d 100644 --- a/metadata-etl/src/test/java/metadata/etl/dataset/espresso/EspressoMetadataEtlTest.java +++ b/metadata-etl/src/test/java/metadata/etl/dataset/espresso/EspressoMetadataEtlTest.java @@ -26,22 +26,19 @@ public class EspressoMetadataEtlTest { _etl = new EspressoMetadataEtl(70, 0L); } - @Test - public void extractTest() + private void extractTest() throws Exception { _etl.extract(); // check file } - @Test - public void transformTest() + private void transformTest() throws Exception { _etl.transform(); // check staging tables in database } - @Test - public void loadTest() + private void loadTest() throws Exception { _etl.load(); // check final tables in database diff --git a/metadata-etl/src/test/java/metadata/etl/dataset/kafka/KafkaMetadataEtlTest.java b/metadata-etl/src/test/java/metadata/etl/dataset/kafka/KafkaMetadataEtlTest.java index 9452f98ef3..2d561b17ff 100644 --- a/metadata-etl/src/test/java/metadata/etl/dataset/kafka/KafkaMetadataEtlTest.java +++ b/metadata-etl/src/test/java/metadata/etl/dataset/kafka/KafkaMetadataEtlTest.java @@ -26,22 +26,19 @@ public class 
KafkaMetadataEtlTest { _etl = new KafkaMetadataEtl(200, 0L); } - @Test - public void extractTest() + private void extractTest() throws Exception { _etl.extract(); // check file } - @Test - public void transformTest() + private void transformTest() throws Exception { _etl.transform(); // check staging tables in database } - @Test - public void loadTest() + private void loadTest() throws Exception { _etl.load(); // check final tables in database diff --git a/metadata-etl/src/test/java/metadata/etl/dataset/oracle/OracleMetadataEtlTest.java b/metadata-etl/src/test/java/metadata/etl/dataset/oracle/OracleMetadataEtlTest.java index 7874d7e89e..d4773330fe 100644 --- a/metadata-etl/src/test/java/metadata/etl/dataset/oracle/OracleMetadataEtlTest.java +++ b/metadata-etl/src/test/java/metadata/etl/dataset/oracle/OracleMetadataEtlTest.java @@ -26,15 +26,13 @@ public class OracleMetadataEtlTest { _etl = new OracleMetadataEtl(80, 0L); } - @Test - public void extractTest() + private void extractTest() throws Exception { _etl.extract(); // check the csv file } - @Test - public void loadTest() + private void loadTest() throws Exception { _etl.load(); // check in database diff --git a/metadata-etl/src/test/java/metadata/etl/dataset/teradata/TeradataMetadataEtlTest.java b/metadata-etl/src/test/java/metadata/etl/dataset/teradata/TeradataMetadataEtlTest.java index 6719ee6e8a..44a729bfa5 100644 --- a/metadata-etl/src/test/java/metadata/etl/dataset/teradata/TeradataMetadataEtlTest.java +++ b/metadata-etl/src/test/java/metadata/etl/dataset/teradata/TeradataMetadataEtlTest.java @@ -28,21 +28,18 @@ public class TeradataMetadataEtlTest { t.run(); } - @Test(groups = {"needConfig"}) public void testExtract() throws Exception { TeradataMetadataEtl t = new TeradataMetadataEtl(3, 0L); t.extract(); } - @Test(groups = {"needConfig"}) public void testTransform() throws Exception { TeradataMetadataEtl t = new TeradataMetadataEtl(3, 0L); t.transform(); } - @Test(groups = {"needConfig"}) public void 
testLoad() throws Exception { TeradataMetadataEtl t = new TeradataMetadataEtl(3, 0L); diff --git a/metadata-etl/src/test/java/metadata/etl/dataset/voldemort/VoldemortMetadataEtlTest.java b/metadata-etl/src/test/java/metadata/etl/dataset/voldemort/VoldemortMetadataEtlTest.java index 75905ccaf4..79909014b3 100644 --- a/metadata-etl/src/test/java/metadata/etl/dataset/voldemort/VoldemortMetadataEtlTest.java +++ b/metadata-etl/src/test/java/metadata/etl/dataset/voldemort/VoldemortMetadataEtlTest.java @@ -26,22 +26,19 @@ public class VoldemortMetadataEtlTest { _etl = new VoldemortMetadataEtl(50, 0L); } - @Test - public void extractTest() + private void extractTest() throws Exception { _etl.extract(); // check file } - @Test - public void transformTest() + private void transformTest() throws Exception { _etl.transform(); // check staging tables in database } - @Test - public void loadTest() + private void loadTest() throws Exception { _etl.load(); // check final tables in database diff --git a/metadata-etl/src/test/java/metadata/etl/git/CodeSearchMetadataEtlTest.java b/metadata-etl/src/test/java/metadata/etl/git/CodeSearchMetadataEtlTest.java index ed96d15589..d92095eea4 100644 --- a/metadata-etl/src/test/java/metadata/etl/git/CodeSearchMetadataEtlTest.java +++ b/metadata-etl/src/test/java/metadata/etl/git/CodeSearchMetadataEtlTest.java @@ -26,15 +26,13 @@ public class CodeSearchMetadataEtlTest { _etl = new CodeSearchMetadataEtl(800, 0L); } - @Test - public void extractTest() + private void extractTest() throws Exception { _etl.extract(); // check the csv file } - @Test - public void loadTest() + private void loadTest() throws Exception { _etl.load(); // check in database diff --git a/metadata-etl/src/test/java/metadata/etl/git/MultiproductMetadataEtlTest.java b/metadata-etl/src/test/java/metadata/etl/git/MultiproductMetadataEtlTest.java index 52abff189f..1ed3aa0397 100644 --- a/metadata-etl/src/test/java/metadata/etl/git/MultiproductMetadataEtlTest.java +++ 
b/metadata-etl/src/test/java/metadata/etl/git/MultiproductMetadataEtlTest.java @@ -26,15 +26,13 @@ public class MultiproductMetadataEtlTest { _etl = new MultiproductMetadataEtl(500, 0L); } - @Test - public void extractTest() + private void extractTest() throws Exception { _etl.extract(); // check the csv file } - @Test - public void loadTest() + private void loadTest() throws Exception { _etl.load(); // check in database