From 3c51365e125ed752a9cede63799b290f40c07b8e Mon Sep 17 00:00:00 2001
From: Zhen Chen
Date: Tue, 15 Dec 2015 11:44:05 -0800
Subject: [PATCH 1/2] parse the xml format of gitorious project page instead
 of the html page

---
 .../java/metadata/etl/git/GitMetadataEtl.java | 11 ++++++-----
 .../java/wherehows/common/utils/GitUtil.java  | 19 ++++++++++++-------
 .../wherehows/common/utils/GitUtilTest.java   |  2 +-
 3 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/metadata-etl/src/main/java/metadata/etl/git/GitMetadataEtl.java b/metadata-etl/src/main/java/metadata/etl/git/GitMetadataEtl.java
index a5dc523a42..0ebe0aefb0 100644
--- a/metadata-etl/src/main/java/metadata/etl/git/GitMetadataEtl.java
+++ b/metadata-etl/src/main/java/metadata/etl/git/GitMetadataEtl.java
@@ -16,6 +16,7 @@ package metadata.etl.git;
 import java.io.File;
 import java.io.InputStream;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import metadata.etl.EtlJob;
 import org.slf4j.Logger;
@@ -56,13 +57,13 @@ public class GitMetadataEtl extends EtlJob {
     }
     FileWriter fw = new FileWriter(localDir + "/" + COMMIT_OUTPUT_FILE);
     for (String project : projects) {
-      List<String> repos = GitUtil.getRepoListFromProject(GitUtil.getHttpsUrl(gitHost, project));
-      for (String repo : repos) {
-        String repoUri = GitUtil.getGitUrl(gitHost, repo);
+      Map<String, String> repos = GitUtil.getRepoListFromProject(GitUtil.getHttpsUrl(gitHost, project));
+      for (String repo : repos.keySet()) {
+        String repoUri = repos.get(repo);
         String repoDir = localDir + "/" + repo;
         GitUtil.clone(repoUri, repoDir);
-        List<GitUtil.CommitMetadata> commitMetadatas = GitUtil.getRepoMetadata(repoDir);
-        for (GitUtil.CommitMetadata m : commitMetadatas) {
+        List<GitUtil.CommitMetadata> commitMetadataList = GitUtil.getRepoMetadata(repoDir);
+        for (GitUtil.CommitMetadata m : commitMetadataList) {
           fw.append(new GitCommitRecord(m, repoUri));
         }
       }
diff --git a/wherehows-common/src/main/java/wherehows/common/utils/GitUtil.java b/wherehows-common/src/main/java/wherehows/common/utils/GitUtil.java
index 3a79870fd5..01054d75b1 100644
--- a/wherehows-common/src/main/java/wherehows/common/utils/GitUtil.java
+++ b/wherehows-common/src/main/java/wherehows/common/utils/GitUtil.java
@@ -17,8 +17,10 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.eclipse.jgit.api.Git;
 import org.eclipse.jgit.api.errors.GitAPIException;
@@ -80,15 +82,18 @@ public class GitUtil {
    * @return List of path of repositories e.g. project/repo
    * @throws IOException
    */
-  public static List<String> getRepoListFromProject(String projectUrl) throws IOException {
+  public static Map<String, String> getRepoListFromProject(String projectUrl) throws IOException {
 
-    List<String> repoList = new LinkedList<>();
-    Document doc = Jsoup.connect(projectUrl).get();
-    Elements repos = doc.getElementsByClass("repository");
+    Map<String, String> repoList = new HashMap<>();
+    Document doc = Jsoup.connect(projectUrl).data("format", "xml").get();
+    Elements repos = doc.getElementsByTag("repositories");
+    Elements mainlines = repos.first().getElementsByTag("mainlines");
+    Elements repo = mainlines.first().getElementsByTag("repository");
 
-    for (Element e : repos) {
-      String repo = e.children().first().text();
-      repoList.add(repo.trim());
+    for (Element e : repo) {
+      String repoName = e.getElementsByTag("name").first().text();
+      String repoUrl = e.getElementsByTag("clone_url").first().text();
+      repoList.put(repoName.trim(), repoUrl.trim());
     }
 
     return repoList;
diff --git a/wherehows-common/src/test/java/wherehows/common/utils/GitUtilTest.java b/wherehows-common/src/test/java/wherehows/common/utils/GitUtilTest.java
index 5302a7cb21..d5b79e7ab2 100644
--- a/wherehows-common/src/test/java/wherehows/common/utils/GitUtilTest.java
+++ b/wherehows-common/src/test/java/wherehows/common/utils/GitUtilTest.java
@@ -29,7 +29,7 @@ public class GitUtilTest {
 
   @Test
   public void testGetRepoListFromProject() throws Exception {
-    //List<String> repos = GitUtil.getRepoListFromProject("git://git.example.com/project");
+    //Map<String, String> repos = GitUtil.getRepoListFromProject("git://git.example.com/project");
     //Assert.assertTrue(repos.size() > 0);
   }
 

From 9ebe0b22ad8e18475b6ec912ea2b999cb739e134 Mon Sep 17 00:00:00 2001
From: Zhen Chen
Date: Wed, 16 Dec 2015 14:41:47 -0800
Subject: [PATCH 2/2] move close connection into finally block

---
 .../main/resources/jython/AzkabanExtract.py   |  17 +-
 .../resources/jython/DatasetTreeBuilder.py    |  30 +-
 .../main/resources/jython/FlowTreeBuilder.py  |  40 +-
 .../src/main/resources/jython/GitLoad.py      |  42 +-
 .../src/main/resources/jython/GitTransform.py |  83 ++--
 .../src/main/resources/jython/HdfsLoad.py     |  10 +-
 .../src/main/resources/jython/LdapExtract.py  | 364 +++++++++---------
 .../src/main/resources/jython/LdapLoad.py     |  62 +--
 .../main/resources/jython/LdapTransform.py    | 253 ++++++------
 .../src/main/resources/jython/OozieExtract.py |  20 +-
 .../src/main/resources/jython/OwnerLoad.py    | 141 +++----
 .../main/resources/jython/OwnerTransform.py   | 161 ++++----
 .../main/resources/jython/SchedulerLoad.py    |  20 +-
 .../resources/jython/SchedulerTransform.py    |   2 +
 .../main/resources/jython/TeradataExtract.py  |  20 +-
 .../src/main/resources/jython/TeradataLoad.py |  10 +-
 16 files changed, 652 insertions(+), 623 deletions(-)

diff --git a/metadata-etl/src/main/resources/jython/AzkabanExtract.py b/metadata-etl/src/main/resources/jython/AzkabanExtract.py
index e0a4b81845..3fa901ff6b 100644
--- a/metadata-etl/src/main/resources/jython/AzkabanExtract.py
+++ b/metadata-etl/src/main/resources/jython/AzkabanExtract.py
@@ -65,16 +65,18 @@ class AzkabanExtract:
       print e
 
   def run(self):
-    self.collect_flow_jobs(self.metadata_folder + "/flow.csv", self.metadata_folder + "/job.csv", self.metadata_folder + "/dag.csv")
-    self.collect_flow_owners(self.metadata_folder + "/owner.csv")
-    self.collect_flow_schedules(self.metadata_folder + "/schedule.csv")
-    self.collect_flow_execs(self.metadata_folder + "/flow_exec.csv", self.metadata_folder + "/job_exec.csv", self.lookback_period)
-    self.az_cursor.close()
-    self.az_con.close()
+    try:
+      self.collect_flow_jobs(self.metadata_folder + "/flow.csv", self.metadata_folder + "/job.csv", self.metadata_folder + "/dag.csv")
+      self.collect_flow_owners(self.metadata_folder + "/owner.csv")
+      self.collect_flow_schedules(self.metadata_folder + "/schedule.csv")
+      self.collect_flow_execs(self.metadata_folder + "/flow_exec.csv", self.metadata_folder + "/job_exec.csv", self.lookback_period)
+    finally:
+      self.az_cursor.close()
+      self.az_con.close()
 
   def collect_flow_jobs(self, flow_file, job_file, dag_file):
     print "collect flow&jobs"
-    query = "SELECT f.*, p.name as project_name FROM project_flows f inner join projects p on f.project_id = p.id and f.version = p.version where p.active = 1"
+    query = "SELECT distinct f.*, p.name as project_name FROM project_flows f inner join projects p on f.project_id = p.id and f.version = p.version where p.active = 1"
     self.az_cursor.execute(query)
     rows = DbUtil.dict_cursor(self.az_cursor)
     flow_writer = FileWriter(flow_file)
@@ -89,7 +91,6 @@ class AzkabanExtract:
           unzipped_content = gzip.GzipFile(mode='r', fileobj=StringIO.StringIO(row[json_column].tostring())).read()
           try:
             row[json_column] = json.loads(unzipped_content)
-            #print json.dumps(row[json_column], indent=4)
           except:
             pass
 
diff --git a/metadata-etl/src/main/resources/jython/DatasetTreeBuilder.py b/metadata-etl/src/main/resources/jython/DatasetTreeBuilder.py
index 56ca5802d1..719c41a8ad 100644
--- a/metadata-etl/src/main/resources/jython/DatasetTreeBuilder.py
+++ b/metadata-etl/src/main/resources/jython/DatasetTreeBuilder.py
@@ -26,21 +26,23 @@ class DatasetTreeBuilder:
     jdbc_driver = args[Constant.WH_DB_DRIVER_KEY]
     jdbc_url = args[Constant.WH_DB_URL_KEY]
     conn_mysql = zxJDBC.connect(jdbc_url, username, password, jdbc_driver)
-    query = "select distinct id, concat(SUBSTRING_INDEX(urn, ':///', 1), '/', SUBSTRING_INDEX(urn, ':///', -1)) p from dict_dataset order by urn"
     cur = conn_mysql.cursor()
-    cur.execute(query)
-    datasets = cur.fetchall()
-    self.dataset_dict = dict()
-    for dataset in datasets:
-      current = self.dataset_dict
-      path_arr = dataset[1].split('/')
-      for name in path_arr:
-        current = current.setdefault(name, {})
-      current["__ID_OF_DATASET__"] = dataset[0]
-    self.file_name = args[Constant.DATASET_TREE_FILE_NAME_KEY]
-    self.value = []
-    cur.close()
-    conn_mysql.close()
+    try:
+      query = "select distinct id, concat(SUBSTRING_INDEX(urn, ':///', 1), '/', SUBSTRING_INDEX(urn, ':///', -1)) p from dict_dataset order by urn"
+      cur.execute(query)
+      datasets = cur.fetchall()
+      self.dataset_dict = dict()
+      for dataset in datasets:
+        current = self.dataset_dict
+        path_arr = dataset[1].split('/')
+        for name in path_arr:
+          current = current.setdefault(name, {})
+        current["__ID_OF_DATASET__"] = dataset[0]
+      self.file_name = args[Constant.DATASET_TREE_FILE_NAME_KEY]
+      self.value = []
+    finally:
+      cur.close()
+      conn_mysql.close()
 
   def build_trie_helper(self, depth, path, current, current_dict):
     nodes = []
diff --git a/metadata-etl/src/main/resources/jython/FlowTreeBuilder.py b/metadata-etl/src/main/resources/jython/FlowTreeBuilder.py
index 17db0a572a..286129537b 100644
--- a/metadata-etl/src/main/resources/jython/FlowTreeBuilder.py
+++ b/metadata-etl/src/main/resources/jython/FlowTreeBuilder.py
@@ -26,27 +26,29 @@ class FlowTreeBuilder:
     jdbc_driver = args[Constant.WH_DB_DRIVER_KEY]
     jdbc_url = args[Constant.WH_DB_URL_KEY]
     conn_mysql = zxJDBC.connect(jdbc_url, username, password, jdbc_driver)
-    query = "select distinct f.flow_id, f.flow_name, f.flow_group, ca.app_code from flow f join cfg_application ca on f.app_id = ca.app_id order by app_code, flow_name"
     cur = conn_mysql.cursor()
-    cur.execute(query)
-    flows = cur.fetchall()
-    self.flow_dict = dict()
-    for flow in flows:
-      current = self.flow_dict
-      # if needed, use flow[3].replace(' ', '.')
-      current = current.setdefault(flow[3], {})
-      if flow[2] is not None:
-        current = current.setdefault(flow[2], {})
-      # for oozie
-      else:
-        current = current.setdefault('NA', {})
-      current = current.setdefault(flow[1], {})
-      current["__ID_OF_FLOW__"] = flow[0]
-    self.file_name = args[Constant.FLOW_TREE_FILE_NAME_KEY]
-    self.value = []
-    cur.close()
-    conn_mysql.close()
+    try:
+      query = "select distinct f.flow_id, f.flow_name, f.flow_group, ca.app_code from flow f join cfg_application ca on f.app_id = ca.app_id order by app_code, flow_name"
+      cur.execute(query)
+      flows = cur.fetchall()
+      self.flow_dict = dict()
+      for flow in flows:
+        current = self.flow_dict
+        # if needed, use flow[3].replace(' ', '.')
+        current = current.setdefault(flow[3], {})
+        if flow[2] is not None:
+          current = current.setdefault(flow[2], {})
+        # for oozie
+        else:
+          current = current.setdefault('NA', {})
+        current = current.setdefault(flow[1], {})
+        current["__ID_OF_FLOW__"] = flow[0]
+      self.file_name = args[Constant.FLOW_TREE_FILE_NAME_KEY]
+      self.value = []
+    finally:
+      cur.close()
+      conn_mysql.close()
 
   def build_trie_helper(self, depth, current, current_dict):
     nodes = []
diff --git a/metadata-etl/src/main/resources/jython/GitLoad.py b/metadata-etl/src/main/resources/jython/GitLoad.py
index ff6b8328c1..e6c527fa17 100644
--- a/metadata-etl/src/main/resources/jython/GitLoad.py
+++ b/metadata-etl/src/main/resources/jython/GitLoad.py
@@ -20,22 +20,23 @@ import sys
 
 class GitLoad:
+  def __init__(self, args):
+    self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
+                                 args[Constant.WH_DB_USERNAME_KEY],
+                                 args[Constant.WH_DB_PASSWORD_KEY],
+                                 args[Constant.WH_DB_DRIVER_KEY])
+    self.wh_cursor = self.wh_con.cursor()
+    self.app_id = int(args[Constant.APP_ID_KEY])
-    def __init__(self, args):
-        self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
-                                     args[Constant.WH_DB_USERNAME_KEY],
-                                     args[Constant.WH_DB_PASSWORD_KEY],
-                                     args[Constant.WH_DB_DRIVER_KEY])
-        self.wh_cursor = self.wh_con.cursor()
-        self.app_id = int(args[Constant.APP_ID_KEY])
 
+  def run(self):
+    try:
+      self.load_from_stg()
+    finally:
+      self.wh_cursor.close()
+      self.wh_con.close()
-    def run(self):
-        self.load_from_stg()
-        self.wh_cursor.close()
-        self.wh_con.close()
 
-    def load_from_stg(self):
-        query = """
+  def load_from_stg(self):
+    query = """
         INSERT IGNORE INTO source_code_commit_info (
         app_id, repository_urn, commit_id, file_path, file_name, commit_time,
         committer_name, committer_email, author_name, author_email,
         message, created_time, wh_etl_exec_id
        )
        select app_id, repository_urn, commit_id, file_path, file_name, commit_time, committer_name, committer_email, author_name, author_email, message, unix_timestamp(NOW()), {wh_etl_exec_id}
        from stg_source_code_commit_info s
        where s.app_id = {app_id}
        """.format(app_id=self.app_id)
-        print query
-        self.wh_cursor.execute(query)
-        self.wh_con.commit()
+    print query
+    self.wh_cursor.execute(query)
+    self.wh_con.commit()
 
+
 if __name__ == "__main__":
-    props = sys.argv[1]
-    git = GitLoad(props)
-    git.run()
+  props = sys.argv[1]
+  git = GitLoad(props)
+  git.run()
diff --git a/metadata-etl/src/main/resources/jython/GitTransform.py b/metadata-etl/src/main/resources/jython/GitTransform.py
index 23da6ec9da..a846a78505 100644
--- a/metadata-etl/src/main/resources/jython/GitTransform.py
+++ b/metadata-etl/src/main/resources/jython/GitTransform.py
@@ -20,16 +20,16 @@ import sys
 
 class OwnerTransform:
-    _tables = {"source_code_commit": {"columns": "repository_urn, commit_id, file_path, file_name, commit_time, committer_name, committer_email, author_name, author_email, message",
-                                      "file": "commit.csv",
-                                      "table": "stg_source_code_commit_info"}
-    }
+  _tables = {"source_code_commit": {"columns": "repository_urn, commit_id, file_path, file_name, commit_time, committer_name, committer_email, author_name, author_email, message",
+                                    "file": "commit.csv",
+                                    "table": "stg_source_code_commit_info"}
+  }
 
-    _clear_staging_tempalte = """
+  _clear_staging_tempalte = """
         DELETE FROM {table}
         """
 
-    _read_file_template = """
+  _read_file_template = """
         LOAD DATA LOCAL INFILE '{folder}/{file}'
         INTO TABLE {table}
        FIELDS TERMINATED BY '\x1a' ESCAPED BY '\0'
        LINES TERMINATED BY '\n'
        ({columns})
        SET app_id = {app_id},
        wh_etl_exec_id = {wh_etl_exec_id};
        """
 
-    def __init__(self, args):
-        self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
-                                     args[Constant.WH_DB_USERNAME_KEY],
-                                     args[Constant.WH_DB_PASSWORD_KEY],
-                                     args[Constant.WH_DB_DRIVER_KEY])
-        self.wh_cursor = self.wh_con.cursor()
-        self.app_id = int(args[Constant.APP_ID_KEY])
-        self.wh_etl_exec_id = int(args[Constant.WH_EXEC_ID_KEY])
-        self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
-        self.metadata_folder = self.app_folder + "/" + str(self.app_id)
+  def __init__(self, args):
+    self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
+                                 args[Constant.WH_DB_USERNAME_KEY],
+                                 args[Constant.WH_DB_PASSWORD_KEY],
+                                 args[Constant.WH_DB_DRIVER_KEY])
+    self.wh_cursor = self.wh_con.cursor()
+    self.app_id = int(args[Constant.APP_ID_KEY])
+    self.wh_etl_exec_id = int(args[Constant.WH_EXEC_ID_KEY])
+    self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
+    self.metadata_folder = self.app_folder + "/" + str(self.app_id)
 
-    def run(self):
-        self.read_file_to_stg()
-        self.wh_cursor.close()
-        self.wh_con.close()
+  def run(self):
+    try:
+      self.read_file_to_stg()
+    finally:
+      self.wh_cursor.close()
+      self.wh_con.close()
 
-    def read_file_to_stg(self):
-        t = self._tables["source_code_commit"]
+  def read_file_to_stg(self):
+    t = self._tables["source_code_commit"]
 
-        # Clear stagging table
-        query = self._clear_staging_tempalte.format(table=t.get("table"))
-        print query
-        self.wh_cursor.execute(query)
-        self.wh_con.commit()
+    # Clear stagging table
+    query = self._clear_staging_tempalte.format(table=t.get("table"))
+    print query
+    self.wh_cursor.execute(query)
+    self.wh_con.commit()
+
+    # Load file into stagging table
+    query = self._read_file_template.format(folder=self.metadata_folder,
+                                            file=t.get("file"),
+                                            table=t.get("table"),
+                                            columns=t.get("columns"),
+                                            app_id=self.app_id,
+                                            wh_etl_exec_id=self.wh_etl_exec_id)
+    print query
+    self.wh_cursor.execute(query)
+    self.wh_con.commit()
 
-        # Load file into stagging table
-        query = self._read_file_template.format(folder=self.metadata_folder,
-                                                file=t.get("file"),
-                                                table=t.get("table"),
-                                                columns=t.get("columns"),
-                                                app_id=self.app_id,
-                                                wh_etl_exec_id=self.wh_etl_exec_id)
-        print query
-        self.wh_cursor.execute(query)
-        self.wh_con.commit()
 
 if __name__ == "__main__":
-    props = sys.argv[1]
-    ot = OwnerTransform(props)
-    ot.run()
+  props = sys.argv[1]
+  ot = OwnerTransform(props)
+  ot.run()
diff --git a/metadata-etl/src/main/resources/jython/HdfsLoad.py b/metadata-etl/src/main/resources/jython/HdfsLoad.py
index 4ca3505566..0b45d0cdb8 100644
--- a/metadata-etl/src/main/resources/jython/HdfsLoad.py
+++ b/metadata-etl/src/main/resources/jython/HdfsLoad.py
@@ -263,7 +263,9 @@ if __name__ == "__main__":
   l.db_id = args[Constant.DB_ID_KEY]
   l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
   l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
-  l.load_metadata()
-  l.load_field()
-  l.load_sample()
-  l.conn_mysql.close()
+  try:
+    l.load_metadata()
+    l.load_field()
+    l.load_sample()
+  finally:
+    l.conn_mysql.close()
diff --git a/metadata-etl/src/main/resources/jython/LdapExtract.py b/metadata-etl/src/main/resources/jython/LdapExtract.py
index 0cc9531f19..ac61a59904 100644
--- a/metadata-etl/src/main/resources/jython/LdapExtract.py
+++ b/metadata-etl/src/main/resources/jython/LdapExtract.py
@@ -24,215 +24,211 @@ from java.io import FileWriter
 
 class LdapExtract:
+  def __init__(self, args):
+    self.args = args
+    self.app_id = int(args[Constant.APP_ID_KEY])
+    self.group_app_id = int(args[Constant.LDAP_GROUP_APP_ID_KEY])
+    self.wh_exec_id = long(args[Constant.WH_EXEC_ID_KEY])
+    self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
+    self.metadata_folder = self.app_folder + "/" + str(self.app_id)
+    if not os.path.exists(self.metadata_folder):
+      try:
+        os.makedirs(self.metadata_folder)
+      except Exception as e:
+        print e
-    def __init__(self, args):
-        self.args = args
-        self.app_id = int(args[Constant.APP_ID_KEY])
-        self.group_app_id = int(args[Constant.LDAP_GROUP_APP_ID_KEY])
-        self.wh_exec_id = long(args[Constant.WH_EXEC_ID_KEY])
-        self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
-        self.metadata_folder = self.app_folder + "/" + str(self.app_id)
-        if not os.path.exists(self.metadata_folder):
-            try:
-                os.makedirs(self.metadata_folder)
-            except Exception as e:
-                print e
 
+    self.ldap_user = set()
+    self.group_map = dict()
+    self.group_flatten_map = dict()
-        self.ldap_user = set()
-        self.group_map = dict()
-        self.group_flatten_map = dict()
 
+  def split_property(self, property_value):
+    return re.split('\s*\'\s*,\s*\'\s*', property_value.strip('\' \t\n\r\f\v'))
-    def split_property(self, property_value):
-        return re.split('\s*\'\s*,\s*\'\s*', property_value.strip('\' \t\n\r\f\v'))
 
+  def fetch_ldap_user(self, file):
+    """
+    fetch ldap user from ldap server
+    :param file: output file name
+    """
-    def fetch_ldap_user(self, file):
-        """
-        fetch ldap user from ldap server
-        :param file: output file name
-        """
 
+    # Setup LDAP Context Options
+    settings = Hashtable()
+    settings.put(Context.INITIAL_CONTEXT_FACTORY, self.args[Constant.LDAP_CONTEXT_FACTORY_KEY])
+    settings.put(Context.PROVIDER_URL, self.args[Constant.LDAP_CONTEXT_PROVIDER_URL_KEY])
+    settings.put(Context.SECURITY_PRINCIPAL, self.args[Constant.LDAP_CONTEXT_SECURITY_PRINCIPAL_KEY])
+    settings.put(Context.SECURITY_CREDENTIALS, self.args[Constant.LDAP_CONTEXT_SECURITY_CREDENTIALS_KEY])
-        # Setup LDAP Context Options
-        settings = Hashtable()
-        settings.put(Context.INITIAL_CONTEXT_FACTORY, self.args[Constant.LDAP_CONTEXT_FACTORY_KEY])
-        settings.put(Context.PROVIDER_URL, self.args[Constant.LDAP_CONTEXT_PROVIDER_URL_KEY])
-        settings.put(Context.SECURITY_PRINCIPAL, self.args[Constant.LDAP_CONTEXT_SECURITY_PRINCIPAL_KEY])
-        settings.put(Context.SECURITY_CREDENTIALS, self.args[Constant.LDAP_CONTEXT_SECURITY_CREDENTIALS_KEY])
 
+    # Connect to LDAP Server
+    ctx = InitialDirContext(settings)
-        # Connect to LDAP Server
-        ctx = InitialDirContext(settings)
 
+    # load the java Hashtable out of the ldap server
+    # Query starting point and query target
+    search_target = '(objectClass=person)'
+    return_attributes_standard = ['user_id', 'distinct_name', 'name', 'display_name', 'title', 'employee_number', 'manager', 'mail', 'department_number', 'department', 'start_date', 'mobile']
+    return_attributes_actual = self.split_property(self.args[Constant.LDAP_SEARCH_RETURN_ATTRS_KEY])
+    return_attributes_map = dict(zip(return_attributes_standard, return_attributes_actual))
-        # load the java Hashtable out of the ldap server
-        # Query starting point and query target
-        search_target = '(objectClass=person)'
-        return_attributes_standard = ['user_id', 'distinct_name', 'name', 'display_name', 'title', 'employee_number', 'manager', 'mail', 'department_number', 'department', 'start_date', 'mobile']
-        return_attributes_actual = self.split_property(self.args[Constant.LDAP_SEARCH_RETURN_ATTRS_KEY])
-        return_attributes_map = dict(zip(return_attributes_standard, return_attributes_actual))
 
+    ctls = SearchControls()
+    ctls.setReturningAttributes(return_attributes_actual)
+    ctls.setSearchScope(SearchControls.SUBTREE_SCOPE)
+    ldap_records = []
-        ctls = SearchControls()
-        ctls.setReturningAttributes(return_attributes_actual)
-        ctls.setSearchScope(SearchControls.SUBTREE_SCOPE)
-        ldap_records = []
 
+    # domain format should look like : 'OU=domain1','OU=domain2','OU=domain3,OU=subdomain3'
+    org_units = self.split_property(self.args[Constant.LDAP_SEARCH_DOMAINS_KEY])
-        # domain format should look like : 'OU=domain1','OU=domain2','OU=domain3,OU=subdomain3'
-        org_units = self.split_property(self.args[Constant.LDAP_SEARCH_DOMAINS_KEY])
 
+    for search_unit in org_units:
+      search_result = ctx.search(search_unit, search_target, ctls)
-        for search_unit in org_units:
-            search_result = ctx.search(search_unit, search_target, ctls)
 
+      # print search_return_attributes
+      for person in search_result:
+        ldap_user_tuple = [self.app_id]
+        if search_unit == self.args[Constant.LDAP_INACTIVE_DOMAIN_KEY]:
+          ldap_user_tuple.append('N')
+        else:
+          ldap_user_tuple.append('Y')
+        person_attributes = person.getAttributes()
+        user_id = person_attributes.get(return_attributes_map['user_id'])
+        user_id = re.sub(r"\r|\n", '', user_id.get(0)).strip().encode('utf8')
+        self.ldap_user.add(user_id)
-            # print search_return_attributes
-            for person in search_result:
-                ldap_user_tuple = [self.app_id]
-                if search_unit == self.args[Constant.LDAP_INACTIVE_DOMAIN_KEY]:
-                    ldap_user_tuple.append('N')
-                else:
-                    ldap_user_tuple.append('Y')
-                person_attributes = person.getAttributes()
-                user_id = person_attributes.get(return_attributes_map['user_id'])
-                user_id = re.sub(r"\r|\n", '', user_id.get(0)).strip().encode('utf8')
-                self.ldap_user.add(user_id)
 
+        for attr_name in return_attributes_actual:
+          attr = person_attributes.get(attr_name)
+          if attr:
+            attr = re.sub(r"\r|\n", '', attr.get(0)).strip().encode('utf8')
+            # special fix for start_date
+            if attr_name == return_attributes_map['start_date'] and len(attr) == 4:
+              attr += '0101'
+            ldap_user_tuple.append(attr)
+          else:
+            ldap_user_tuple.append("")
-                for attr_name in return_attributes_actual:
-                    attr = person_attributes.get(attr_name)
-                    if attr:
-                        attr = re.sub(r"\r|\n", '', attr.get(0)).strip().encode('utf8')
-                        # special fix for start_date
-                        if attr_name == return_attributes_map['start_date'] and len(attr) == 4:
-                            attr += '0101'
-                        ldap_user_tuple.append(attr)
-                    else:
-                        ldap_user_tuple.append("")
 
+        ldap_user_tuple.append(self.wh_exec_id)
+        ldap_records.append(ldap_user_tuple)
-                ldap_user_tuple.append(self.wh_exec_id)
-                ldap_records.append(ldap_user_tuple)
 
+    print "%d records found in ldap search" % (len(self.ldap_user))
-        print "%d records found in ldap search" % (len(self.ldap_user))
 
+    csv_writer = csv.writer(open(file, "w"), delimiter='\x1a', quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
+    csv_writer.writerows(ldap_records)
-        csv_writer = csv.writer(open(file, "w"), delimiter='\x1a', quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
-        csv_writer.writerows(ldap_records)
 
+  def fetch_ldap_group(self, file):
+    """
+    fetch group mapping from group ldap server
+    :param file: output file name
+    """
+    settings = Hashtable()
+    settings.put(Context.INITIAL_CONTEXT_FACTORY, self.args[Constant.LDAP_GROUP_CONTEXT_FACTORY_KEY])
+    settings.put(Context.PROVIDER_URL, self.args[Constant.LDAP_GROUP_CONTEXT_PROVIDER_URL_KEY])
+    settings.put(Context.SECURITY_PRINCIPAL, self.args[Constant.LDAP_GROUP_CONTEXT_SECURITY_PRINCIPAL_KEY])
+    settings.put(Context.SECURITY_CREDENTIALS, self.args[Constant.LDAP_GROUP_CONTEXT_SECURITY_CREDENTIALS_KEY])
 
+    ctx = InitialDirContext(settings)
+    search_target = "(objectClass=posixGroup)"
+    return_attributes_standard = ['group_id', 'member_ids']
+    return_attributes_actual = self.split_property(self.args[Constant.LDAP_GROUP_SEARCH_RETURN_ATTRS_KEY])
+    return_attributes_map = dict(zip(return_attributes_standard, return_attributes_actual))
+    ctls = SearchControls()
+    ctls.setReturningAttributes(return_attributes_actual)
+    ctls.setSearchScope(SearchControls.SUBTREE_SCOPE)
 
+    ldap_records = []
+    org_units = self.split_property(self.args[Constant.LDAP_GROUP_SEARCH_DOMAINS_KEY])
+    for search_unit in org_units:
+      results = ctx.search(search_unit, search_target, ctls)
+      for r in results:
+        person_attributes = r.getAttributes()
+        group = person_attributes.get(return_attributes_map['group_id']).get(0)
+        group = re.sub(r"\r|\n", '', group).strip().encode('utf8')
+        # skip special group that contains all group users
+        if group == 'users':
+          continue
+        members = person_attributes.get(return_attributes_map['member_ids'])
+        if members:
+          self.group_map[group] = members
+          sort_id = 0
+          for member in members.getAll():
+            member = re.sub(r"\r|\n", '', member).strip().encode('utf8')
+            ldap_group_tuple = [self.group_app_id]
+            ldap_group_tuple.append(group)
+            ldap_group_tuple.append(sort_id)
+            if member in self.ldap_user:
+              ldap_group_tuple.append(self.app_id)
+            else:
+              ldap_group_tuple.append(self.group_app_id)
+            ldap_group_tuple.append(member)
+            ldap_group_tuple.append(self.wh_exec_id)
+            ldap_records.append(ldap_group_tuple)
+            sort_id += 1
+        else:
+          pass
+    print "%d records found in group accounts" % (len(self.group_map))
 
+    csv_writer = csv.writer(open(file, "w"), delimiter='\x1a', quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
+    csv_writer.writerows(ldap_records)
 
+  def fetch_ldap_group_flatten(self, file):
+    """
+    Flatten the group - user map by recursive extending inner-group members
+    :param file: output file name
+    """
+    ldap_records = []
+    for group in self.group_map:
+      all_users = self.get_all_users_for_group(group, self.ldap_user, self.group_map, set())
+      self.group_flatten_map[group] = all_users
+      sort_id = 0
+      for u in all_users:
+        ldap_group_flatten_tuple = [self.group_app_id]
+        ldap_group_flatten_tuple.append(group)
+        ldap_group_flatten_tuple.append(sort_id)
+        ldap_group_flatten_tuple.append(self.app_id)
+        ldap_group_flatten_tuple.append(u)
+        ldap_group_flatten_tuple.append(self.wh_exec_id)
+        ldap_records.append(ldap_group_flatten_tuple)
+        sort_id += 1
 
+    csv_writer = csv.writer(open(file, "w"), delimiter='\x1a', quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
+    csv_writer.writerows(ldap_records)
 
+  def get_all_users_for_group(self, current, user_set, group_map, previous):
+    """
+    Recursive method that calculate all users for current group
+    :param current: current group name
+    :param user_set: the user set that contains all user ids
+    :param group_map: the original group user map before extend
+    :param previous: previous visited group name
+    :return: ordered list of users
+    """
+    ret = []
+    # base condition
+    if current in user_set:
+      ret.append(current)
+      return ret
-    def get_all_users_for_group(self, current, user_set, group_map, previous):
-        """
-        Recursive method that calculate all users for current group
-        :param current: current group name
-        :param user_set: the user set that contains all user ids
-        :param group_map: the original group user map before extend
-        :param previous: previous visited group name
-        :return: ordered list of users
-        """
-        ret = []
-        # base condition
-        if current in user_set:
-            ret.append(current)
-            return ret
 
+    # cyclic condition
+    if current in previous:
+      return ret
-        # cyclic condition
-        if current in previous:
-            return ret
 
+    # avoid duplicate computation
+    if current in self.group_flatten_map:
+      return self.group_flatten_map[current]
-        # avoid duplicate computation
-        if current in self.group_flatten_map:
-            return self.group_flatten_map[current]
 
+    # current is a group
+    if current in group_map:
+      members = group_map[current]
+      previous.add(current)
+      for member in members.getAll():
+        member = re.sub(r"\r|\n", '', member).strip().encode('utf8')
+        next_ret = self.get_all_users_for_group(member, user_set, group_map, previous)
+        for i in next_ret:
+          if i not in ret:
+            ret.append(i)
+    return ret
-        # current is a group
-        if current in group_map:
-            members = group_map[current]
-            previous.add(current)
-            for member in members.getAll():
-                member = re.sub(r"\r|\n", '', member).strip().encode('utf8')
-                next_ret = self.get_all_users_for_group(member, user_set, group_map, previous)
-                for i in next_ret:
-                    if i not in ret:
-                        ret.append(i)
-        return ret
 
+  def run(self):
+    self.fetch_ldap_user(self.metadata_folder + "/ldap_user_record.csv")
+    self.fetch_ldap_group(self.metadata_folder + "/ldap_group_record.csv")
+    self.fetch_ldap_group_flatten(self.metadata_folder + "/ldap_group_flatten_record.csv")
 
+
 if __name__ == "__main__":
-    props = sys.argv[1]
-    ldap = LdapExtract(props)
-    ldap.run()
-
-
-
-
+  props = sys.argv[1]
+  ldap = LdapExtract(props)
+  ldap.run()
diff --git a/metadata-etl/src/main/resources/jython/LdapLoad.py b/metadata-etl/src/main/resources/jython/LdapLoad.py
index 3866ebc75f..0e0bdc289b 100644
--- a/metadata-etl/src/main/resources/jython/LdapLoad.py
+++ b/metadata-etl/src/main/resources/jython/LdapLoad.py
@@ -20,24 +20,25 @@ import sys
 
 class LdapLoad:
+  def __init__(self, args):
+    self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
+                                 args[Constant.WH_DB_USERNAME_KEY],
+                                 args[Constant.WH_DB_PASSWORD_KEY],
+                                 args[Constant.WH_DB_DRIVER_KEY])
+    self.wh_cursor = self.wh_con.cursor()
+    self.app_id = int(args[Constant.APP_ID_KEY])
+    self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
+    self.metadata_folder = self.app_folder + "/" + str(self.app_id)
-    def __init__(self, args):
-        self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
-                                     args[Constant.WH_DB_USERNAME_KEY],
-                                     args[Constant.WH_DB_PASSWORD_KEY],
-                                     args[Constant.WH_DB_DRIVER_KEY])
-        self.wh_cursor = self.wh_con.cursor()
-        self.app_id = int(args[Constant.APP_ID_KEY])
-        self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
-        self.metadata_folder = self.app_folder + "/" + str(self.app_id)
 
+  def run(self):
+    try:
+      self.load_from_stg()
+    finally:
+      self.wh_cursor.close()
+      self.wh_con.close()
-    def run(self):
-        self.load_from_stg()
-        self.wh_cursor.close()
-        self.wh_con.close()
 
-    def load_from_stg(self):
-        query = """
+  def load_from_stg(self):
+    query = """
         INSERT INTO dir_external_user_info
         (app_id, user_id, urn, full_name, display_name, title,
          employee_number,
         modified_time = unix_timestamp(NOW()),
        wh_etl_exec_id = s.wh_etl_exec_id
        """
-        print query
-        self.wh_cursor.execute(query)
-        self.wh_con.commit()
+    print query
+    self.wh_cursor.execute(query)
+    self.wh_con.commit()
 
-        query = """
+    query = """
         INSERT INTO dir_external_group_user_map
         (app_id, group_id, sort_id, user_app_id, user_id, created_time, wh_etl_exec_id)
         SELECT app_id, group_id, sort_id, user_app_id, user_id, unix_timestamp(NOW()), wh_etl_exec_id
         FROM stg_dir_external_group_user_map s
         ON DUPLICATE KEY UPDATE
         modified_time = unix_timestamp(NOW()),
         wh_etl_exec_id = s.wh_etl_exec_id
        """
-        print query
-        self.wh_cursor.execute(query)
-        self.wh_con.commit()
+    print query
+    self.wh_cursor.execute(query)
+    self.wh_con.commit()
 
-        query = """
+    query = """
         INSERT INTO dir_external_group_user_map_flatten
         (app_id, group_id, sort_id, user_app_id, user_id, created_time, wh_etl_exec_id)
         SELECT app_id, group_id, sort_id, user_app_id, user_id, unix_timestamp(NOW()), wh_etl_exec_id
         FROM stg_dir_external_group_user_map_flatten s
         ON DUPLICATE KEY UPDATE
         modified_time = unix_timestamp(NOW()),
         wh_etl_exec_id = s.wh_etl_exec_id
        """
-        print query
-        self.wh_cursor.execute(query)
-        self.wh_con.commit()
+    print query
+    self.wh_cursor.execute(query)
+    self.wh_con.commit()
 
+
 if __name__ == "__main__":
-    props = sys.argv[1]
-    lt = LdapLoad(props)
-    lt.run()
+  props = sys.argv[1]
+  lt = LdapLoad(props)
+  lt.run()
diff --git a/metadata-etl/src/main/resources/jython/LdapTransform.py b/metadata-etl/src/main/resources/jython/LdapTransform.py
index e4209f1943..562f664cee 100644
--- a/metadata-etl/src/main/resources/jython/LdapTransform.py
+++ b/metadata-etl/src/main/resources/jython/LdapTransform.py
@@ -20,31 +20,32 @@ import sys
 
 class LdapTransform:
-    _tables = {"ldap_user": {"columns": "app_id, is_active, user_id, urn, full_name, display_name, title, employee_number, manager_urn, email, department_id, department_name, start_date, mobile_phone, wh_etl_exec_id",
-                             "file": "ldap_user_record.csv",
-                             "table": "stg_dir_external_user_info",
-                             "nullif_columns":
-                                 {"department_id": "''",
-                                  "employee_number": 0,
-                                  "start_date": "'0000-00-00'",
-                                  "manager_urn": "''",
-                                  "department_name": "''",
-                                  "mobile_phone": "''",
-                                  "email": "''",
-                                  "title": "''"}
-                             },
-               "ldap_group": {"columns": "app_id, group_id, sort_id, user_app_id, user_id, wh_etl_exec_id",
-                              "file": "ldap_group_record.csv",
-                              "table": "stg_dir_external_group_user_map",
-                              "nullif_columns": {"user_id": "''"}
-                              },
-               "ldap_group_flatten": {"columns": "app_id, group_id, sort_id, user_app_id, user_id, wh_etl_exec_id",
-                                      "file": "ldap_group_flatten_record.csv",
-                                      "table": "stg_dir_external_group_user_map_flatten"
-                                      }
-               }
+  _tables = {"ldap_user": {
+    "columns": "app_id, is_active, user_id, urn, full_name, display_name, title, employee_number, manager_urn, email, department_id, department_name, start_date, mobile_phone, wh_etl_exec_id",
+    "file": "ldap_user_record.csv",
+    "table": "stg_dir_external_user_info",
+    "nullif_columns":
+      {"department_id": "''",
+       "employee_number": 0,
+       "start_date": "'0000-00-00'",
+       "manager_urn": "''",
+       "department_name": "''",
+       "mobile_phone": "''",
+       "email": "''",
+       "title": "''"}
+  },
+    "ldap_group": {"columns": "app_id, group_id, sort_id, user_app_id, user_id, wh_etl_exec_id",
+                   "file": "ldap_group_record.csv",
+                   "table": "stg_dir_external_group_user_map",
+                   "nullif_columns": {"user_id": "''"}
+                   },
+    "ldap_group_flatten": {"columns": "app_id, group_id, sort_id, user_app_id, user_id, wh_etl_exec_id",
+                           "file": "ldap_group_flatten_record.csv",
+                           "table": "stg_dir_external_group_user_map_flatten"
+                           }
+  }
 
-    _read_file_template = """
+  _read_file_template = """
         LOAD DATA LOCAL INFILE '{folder}/{file}'
        INTO TABLE {table}
        FIELDS TERMINATED BY '\x1a' ESCAPED BY '\0'
        LINES TERMINATED BY '\n'
        ({columns});
        """
 
-    _update_column_to_null_template = """
+  _update_column_to_null_template = """
        UPDATE {table} stg
        SET {column} = NULL
        WHERE {column} = {column_value} and app_id = {app_id}
        """
 
-    _update_manager_info = """
+  _update_manager_info = """
        update {table} stg
        join (select t1.app_id, t1.user_id, t1.employee_number, t2.user_id as manager_user_id, t2.employee_number as manager_employee_number from
              {table} t1 join {table} t2 on t1.manager_urn = t2.urn and t1.app_id = t2.app_id
        WHERE stg.app_id = {app_id}
        """
 
-    _get_manager_edge = """
+  _get_manager_edge = """
        select user_id, manager_user_id from {table} stg
        where app_id = {app_id}
        """
 
-    _update_hierarchy_info = """
+  _update_hierarchy_info = """
        update {table} stg
        set org_hierarchy = CASE {org_hierarchy_long_string} END,
            org_hierarchy_depth = CASE {org_hierarchy_depth_long_string} END
        where app_id = {app_id} and user_id in ({user_ids})
        """
 
-    _update_hierarchy_info_per_row = """
+  _update_hierarchy_info_per_row = """
        update {table} stg
        set org_hierarchy = '{org_hierarchy}',
            org_hierarchy_depth = {org_hierarchy_depth}
        where app_id = {app_id} and user_id = '{user_id}'
        """
 
-    _clear_staging_tempalte = """
+  _clear_staging_tempalte = """
        DELETE FROM {table} where app_id = {app_id}
        """
 
-    def __init__(self, args):
-        self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
-                                     args[Constant.WH_DB_USERNAME_KEY],
-                                     args[Constant.WH_DB_PASSWORD_KEY],
-                                     args[Constant.WH_DB_DRIVER_KEY])
-        self.wh_cursor = self.wh_con.cursor()
-        self.app_id = int(args[Constant.APP_ID_KEY])
-        self.group_app_id = int(args[Constant.LDAP_GROUP_APP_ID_KEY])
-        self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
-        self.metadata_folder = self.app_folder + "/" + str(self.app_id)
-        self.ceo_user_id = args[Constant.LDAP_CEO_USER_ID_KEY]
+  def __init__(self, args):
+    self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
+                                 args[Constant.WH_DB_USERNAME_KEY],
+                                 args[Constant.WH_DB_PASSWORD_KEY],
+                                 args[Constant.WH_DB_DRIVER_KEY])
+    self.wh_cursor = self.wh_con.cursor()
+    self.app_id = int(args[Constant.APP_ID_KEY])
+    self.group_app_id = int(args[Constant.LDAP_GROUP_APP_ID_KEY])
+    self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
+    self.metadata_folder = self.app_folder + "/" + str(self.app_id)
+    self.ceo_user_id = args[Constant.LDAP_CEO_USER_ID_KEY]
 
-    def run(self):
-        self.read_file_to_stg()
-        self.update_null_value()
-        self.update_manager_info()
-        self.update_hierarchy_info()
-        self.wh_cursor.close()
-        self.wh_con.close()
+  def run(self):
+    try:
+      self.read_file_to_stg()
+      self.update_null_value()
+      self.update_manager_info()
+      self.update_hierarchy_info()
+    finally:
+      self.wh_cursor.close()
+      self.wh_con.close()
 
-    def read_file_to_stg(self):
+  def read_file_to_stg(self):
 
-        for table in self._tables:
-            t = self._tables[table]
-            # Clear stagging table
-            query = self._clear_staging_tempalte.format(table=t.get("table"), app_id=self.app_id)
-            print query
-            self.wh_cursor.execute(query)
-            self.wh_con.commit()
+    for table in self._tables:
+      t = self._tables[table]
+      # Clear stagging table
+      query = self._clear_staging_tempalte.format(table=t.get("table"), app_id=self.app_id)
+      print query
+      self.wh_cursor.execute(query)
+      self.wh_con.commit()
 
-            # Load file into stagging table
-            query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"), columns=t.get("columns"))
-            print query
-            self.wh_cursor.execute(query)
-            self.wh_con.commit()
+      # Load file into stagging table
+      query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"), columns=t.get("columns"))
+      print query
+      self.wh_cursor.execute(query)
+      self.wh_con.commit()
 
-    def update_null_value(self):
-        for table in self._tables:
-            t = self._tables[table]
-            if 'nullif_columns' in t:
-                for column in t['nullif_columns']:
-                    query = self._update_column_to_null_template.format(table=t.get("table"), column=column, column_value=t['nullif_columns'][column], app_id=self.app_id)
-                    print query
-                    self.wh_cursor.execute(query)
-                    self.wh_con.commit()
+  def update_null_value(self):
+    for table in self._tables:
+      t = self._tables[table]
+      if 'nullif_columns' in t:
+        for column in t['nullif_columns']:
+          query = self._update_column_to_null_template.format(table=t.get("table"), column=column, column_value=t['nullif_columns'][column], app_id=self.app_id)
+          print query
+          self.wh_cursor.execute(query)
+          self.wh_con.commit()
 
-    def update_manager_info(self):
-        t = self._tables["ldap_user"]
-        query = self._update_manager_info.format(table=t.get("table"), app_id=self.app_id)
-        print query
-        self.wh_cursor.execute(query)
-        self.wh_con.commit()
+  def update_manager_info(self):
+    t = self._tables["ldap_user"]
+    query = self._update_manager_info.format(table=t.get("table"), app_id=self.app_id)
+    print query
+    self.wh_cursor.execute(query)
+    self.wh_con.commit()
 
-    def update_hierarchy_info(self):
-        t = self._tables["ldap_user"]
-        query = self._get_manager_edge.format(table=t.get("table"), app_id=self.app_id)
-        print query
-        self.wh_cursor.execute(query)
-        pair = dict()
-        hierarchy = dict()
+  def update_hierarchy_info(self):
+    t = self._tables["ldap_user"]
+    query = self._get_manager_edge.format(table=t.get("table"), app_id=self.app_id)
+    print query
+    self.wh_cursor.execute(query)
+    pair = dict()
+    hierarchy = dict()
 
-        for row in self.wh_cursor:
-            pair[row[0]] = row[1]
+    for row in self.wh_cursor:
+      pair[row[0]] = row[1]
 
-        for user in pair:
-            self.find_path_for_user(user, pair, hierarchy)
+    for user in pair:
+      self.find_path_for_user(user, pair, hierarchy)
 
-        case_org_hierarchy_template = " WHEN user_id = '{user_id}' THEN '{org_hierarchy}' "
-        case_org_hierarchy_depth_template = " WHEN user_id = '{user_id}' THEN {org_hierarchy_depth} "
-        user_ids = []
-        org_hierarchy_long_string = ""
-        org_hierarchy_depth_long_string = ""
-        count = 0
-        for user in hierarchy:
-            if hierarchy[user] is not None:
-                user_ids.append("'" + user + "'")
-                org_hierarchy_long_string += case_org_hierarchy_template.format(user_id=user, org_hierarchy=hierarchy[user][0])
-                org_hierarchy_depth_long_string += case_org_hierarchy_depth_template.format(user_id=user, org_hierarchy_depth=hierarchy[user][1])
-                count += 1
-                if count % 1000 == 0:
-                    query = self._update_hierarchy_info.format(table=t.get("table"), app_id=self.app_id, user_ids=",".join(user_ids), org_hierarchy_long_string=org_hierarchy_long_string, org_hierarchy_depth_long_string=org_hierarchy_depth_long_string)
-                    # print query
-                    self.wh_cursor.executemany(query)
-                    user_ids = []
-                    org_hierarchy_long_string = ""
-                    org_hierarchy_depth_long_string = ""
-        self.wh_con.commit()
+    case_org_hierarchy_template = " WHEN user_id = '{user_id}' THEN '{org_hierarchy}' "
+    case_org_hierarchy_depth_template = " WHEN user_id = '{user_id}' THEN {org_hierarchy_depth} "
+    user_ids = []
+    org_hierarchy_long_string = ""
+    org_hierarchy_depth_long_string = ""
+    count = 0
+    for user in hierarchy:
+      if hierarchy[user] is not None:
+        user_ids.append("'" + user + "'")
+        org_hierarchy_long_string += case_org_hierarchy_template.format(user_id=user, org_hierarchy=hierarchy[user][0])
+        org_hierarchy_depth_long_string += case_org_hierarchy_depth_template.format(user_id=user, org_hierarchy_depth=hierarchy[user][1])
+        count += 1
+        if count % 1000 == 0:
+          query = self._update_hierarchy_info.format(table=t.get("table"), app_id=self.app_id, user_ids=",".join(user_ids), org_hierarchy_long_string=org_hierarchy_long_string,
+                                                     org_hierarchy_depth_long_string=org_hierarchy_depth_long_string)
+          # print query
+          self.wh_cursor.executemany(query)
+          user_ids = []
+          org_hierarchy_long_string = ""
+          org_hierarchy_depth_long_string = ""
+    self.wh_con.commit()
 
-    def find_path_for_user(self, start, pair, hierarchy):
-        if start in hierarchy:
-            return hierarchy[start]
+  def find_path_for_user(self, start, pair, hierarchy):
+    if start in hierarchy:
+      return hierarchy[start]
 
-        if start == self.ceo_user_id:
-            return "/" + start, 0
+    if start == self.ceo_user_id:
+      return "/" + start, 0
 
-        if start is None:
-            return None
+    if start is None:
+      return None
 
-        next = self.find_path_for_user(pair[start], pair, hierarchy)
+    next = self.find_path_for_user(pair[start], pair, hierarchy)
 
-        if next:
-            current = next[0] + "/" + start, next[1] + 1
-        else:
-            current = None
+    if next:
+      current = next[0] + "/" + start, next[1] + 1
+    else:
+      current = None
 
-        hierarchy[start] = current
-        return current
+    hierarchy[start] = current
+    return current
 
+
 if __name__ == "__main__":
-    props = sys.argv[1]
-    lt = LdapTransform(props)
-    lt.run()
+  props = sys.argv[1]
+  lt = LdapTransform(props)
+  lt.run()
diff --git a/metadata-etl/src/main/resources/jython/OozieExtract.py b/metadata-etl/src/main/resources/jython/OozieExtract.py
index b508c7a0fb..1d0cfccb44 100644
--- a/metadata-etl/src/main/resources/jython/OozieExtract.py
+++ b/metadata-etl/src/main/resources/jython/OozieExtract.py
@@ -63,15 +63,17 @@ class OozieExtract:
     print "Oozie version: ", self.oz_version[0]
 
   def run(self):
-    self.collect_flow_jobs(self.metadata_folder + "/flow.csv",
-                           self.metadata_folder + "/job.csv",
-                           self.metadata_folder + "/dag.csv")
-    self.collect_flow_owners(self.metadata_folder + "/owner.csv")
-    self.collect_flow_schedules(self.metadata_folder + "/schedule.csv")
-    self.collect_flow_execs(self.metadata_folder + "/flow_exec.csv", self.lookback_period)
-    self.collect_job_execs(self.metadata_folder + "/job_exec.csv", self.lookback_period)
-    self.oz_cursor.close()
-    self.oz_con.close()
+    try:
+      self.collect_flow_jobs(self.metadata_folder + "/flow.csv",
+                             self.metadata_folder + "/job.csv",
+                             self.metadata_folder + "/dag.csv")
+      self.collect_flow_owners(self.metadata_folder + "/owner.csv")
+      self.collect_flow_schedules(self.metadata_folder + "/schedule.csv")
+      self.collect_flow_execs(self.metadata_folder + "/flow_exec.csv", self.lookback_period)
+      self.collect_job_execs(self.metadata_folder + "/job_exec.csv", self.lookback_period)
+    finally:
+      self.oz_cursor.close()
+      self.oz_con.close()
 
   def collect_flow_jobs(self, flow_file, job_file, dag_file):
     print "collect flow&jobs"
diff --git a/metadata-etl/src/main/resources/jython/OwnerLoad.py b/metadata-etl/src/main/resources/jython/OwnerLoad.py
index 6d283505b2..8e5d130db9 100644
--- a/metadata-etl/src/main/resources/jython/OwnerLoad.py
b/metadata-etl/src/main/resources/jython/OwnerLoad.py @@ -22,25 +22,48 @@ import sys class OwnerLoad: + def __init__(self, args): + self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY], + args[Constant.WH_DB_USERNAME_KEY], + args[Constant.WH_DB_PASSWORD_KEY], + args[Constant.WH_DB_DRIVER_KEY]) + self.wh_cursor = self.wh_con.cursor() + self.wh_exec_id = long(args[Constant.WH_EXEC_ID_KEY]) + self.app_folder = args[Constant.WH_APP_FOLDER_KEY] - def __init__(self, args): - self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY], - args[Constant.WH_DB_USERNAME_KEY], - args[Constant.WH_DB_PASSWORD_KEY], - args[Constant.WH_DB_DRIVER_KEY]) - self.wh_cursor = self.wh_con.cursor() - self.wh_exec_id = long(args[Constant.WH_EXEC_ID_KEY]) - self.app_folder = args[Constant.WH_APP_FOLDER_KEY] + def run(self): + try: + cmd = """ + INSERT INTO dataset_owner (dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, db_ids, is_group, is_active, source_time, created_time, wh_etl_exec_id) + SELECT * FROM (SELECT dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, group_concat(db_id ORDER BY db_id SEPARATOR ",") db_ids, is_group, is_active, source_time, unix_timestamp(NOW()) time_created, {wh_etl_exec_id} + FROM stg_dataset_owner s + WHERE s.dataset_id is not null and s.owner_id is not null and s.owner_id != '' and s.app_id is not null + GROUP BY s.dataset_id, s.owner_id, s.sort_id, s.namespace, s.owner_type, s.owner_sub_type) sb + ON DUPLICATE KEY UPDATE + dataset_urn = sb.dataset_urn, + sort_id = COALESCE(@sort_id, sb.sort_id), + owner_type = COALESCE(@owner_type, sb.owner_type), + owner_sub_type = COALESCE(@owner_sub_type, sb.owner_sub_type), + app_id = sb.app_id, + is_active = sb.is_active, + db_ids = sb.db_ids, + source_time = sb.source_time, + wh_etl_exec_id = {wh_etl_exec_id}, + modified_time = unix_timestamp(NOW()) + """.format(wh_etl_exec_id=self.wh_exec_id) + print cmd + self.wh_cursor.execute(cmd) + self.wh_con.commit() - def run(self): - cmd = """ + # matching parent level urns + template = """ INSERT INTO dataset_owner (dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, db_ids, is_group, is_active, source_time, created_time, wh_etl_exec_id) - SELECT * FROM (SELECT dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, group_concat(db_id ORDER BY db_id SEPARATOR ",") db_ids, is_group, is_active, source_time, unix_timestamp(NOW()) time_created, {wh_etl_exec_id} - FROM stg_dataset_owner s - WHERE s.dataset_id is not null and s.owner_id is not null and s.owner_id != '' and s.app_id is not null - GROUP BY s.dataset_id, s.owner_id, s.sort_id, s.namespace, s.owner_type, s.owner_sub_type) sb + select * FROM (select distinct d.id, d.urn, s.owner_id, s.sort_id, s.namespace, s.app_id, s.owner_type, owner_sub_type, group_concat(s.db_id ORDER BY db_id SEPARATOR ",") db_ids, s.is_group, s.is_active, s.source_time, unix_timestamp(NOW()) time_created, {wh_etl_exec_id} + from stg_dataset_owner s join dict_dataset d on s.dataset_urn = substring(d.urn, 1, char_length(d.urn) - char_length(substring_index(d.urn, '/', -{lvl})) - 1) + WHERE s.owner_id is not null and s.owner_id != '' and s.app_id is not null + group by d.id, s.owner_id, s.sort_id, s.namespace, s.owner_type, s.owner_sub_type) sb ON DUPLICATE KEY UPDATE - dataset_urn = sb.dataset_urn, + dataset_urn = sb.urn, sort_id = COALESCE(@sort_id, sb.sort_id), owner_type = COALESCE(@owner_type, sb.owner_type), 
owner_sub_type = COALESCE(@owner_sub_type, sb.owner_sub_type), @@ -50,68 +73,46 @@ class OwnerLoad: source_time = sb.source_time, wh_etl_exec_id = {wh_etl_exec_id}, modified_time = unix_timestamp(NOW()) - """.format(wh_etl_exec_id=self.wh_exec_id) + """ + + for l in range(1, 6): + cmd = template.format(wh_etl_exec_id=self.wh_exec_id, lvl=l) print cmd self.wh_cursor.execute(cmd) self.wh_con.commit() - # matching parent level urns - template = """ - INSERT INTO dataset_owner (dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, db_ids, is_group, is_active, source_time, created_time, wh_etl_exec_id) - select * FROM (select distinct d.id, d.urn, s.owner_id, s.sort_id, s.namespace, s.app_id, s.owner_type, owner_sub_type, group_concat(s.db_id ORDER BY db_id SEPARATOR ",") db_ids, s.is_group, s.is_active, s.source_time, unix_timestamp(NOW()) time_created, {wh_etl_exec_id} - from stg_dataset_owner s join dict_dataset d on s.dataset_urn = substring(d.urn, 1, char_length(d.urn) - char_length(substring_index(d.urn, '/', -{lvl})) - 1) - WHERE s.owner_id is not null and s.owner_id != '' and s.app_id is not null - group by d.id, s.owner_id, s.sort_id, s.namespace, s.owner_type, s.owner_sub_type) sb - ON DUPLICATE KEY UPDATE - dataset_urn = sb.urn, - sort_id = COALESCE(@sort_id, sb.sort_id), - owner_type = COALESCE(@owner_type, sb.owner_type), - owner_sub_type = COALESCE(@owner_sub_type, sb.owner_sub_type), - app_id = sb.app_id, - is_active = sb.is_active, - db_ids = sb.db_ids, - source_time = sb.source_time, - wh_etl_exec_id = {wh_etl_exec_id}, - modified_time = unix_timestamp(NOW()) - """ + # put all unmatched dataset in to another table for future reference - for l in range(1, 6): - cmd = template.format(wh_etl_exec_id=self.wh_exec_id, lvl=l) - print cmd - self.wh_cursor.execute(cmd) - self.wh_con.commit() + cmd = """ + INSERT INTO stg_dataset_owner_unmatched (dataset_urn, owner_id, sort_id, app_id, namespace, owner_type, owner_sub_type, is_group, db_name, db_id, is_active, source_time) + SELECT dataset_urn, owner_id, sort_id, app_id, namespace, owner_type, owner_sub_type, is_group, db_name, db_id, is_active, source_time + FROM stg_dataset_owner s where dataset_id is null and is_parent_urn = 'N' + ON DUPLICATE KEY UPDATE + sort_id = s.sort_id, + owner_type = s.owner_type, + owner_sub_type = s.owner_sub_type, + is_active = s.is_active, + source_time = s.source_time; + """ + self.wh_cursor.execute(cmd) + self.wh_con.commit() - # put all unmatched dataset in to another table for future reference + # delete the entries that matched with dataset id in this round - cmd = """ - INSERT INTO stg_dataset_owner_unmatched (dataset_urn, owner_id, sort_id, app_id, namespace, owner_type, owner_sub_type, is_group, db_name, db_id, is_active, source_time) - SELECT dataset_urn, owner_id, sort_id, app_id, namespace, owner_type, owner_sub_type, is_group, db_name, db_id, is_active, source_time - FROM stg_dataset_owner s where dataset_id is null and is_parent_urn = 'N' - ON DUPLICATE KEY UPDATE - sort_id = s.sort_id, - owner_type = s.owner_type, - owner_sub_type = s.owner_sub_type, - is_active = s.is_active, - source_time = s.source_time; - """ - self.wh_cursor.execute(cmd) - self.wh_con.commit() + cmd = """ + DELETE u FROM stg_dataset_owner_unmatched u + JOIN (SELECT DISTINCT dataset_urn, dataset_id FROM stg_dataset_owner) s + ON u.dataset_urn = s.dataset_urn + WHERE s.dataset_id IS NOT NULL; + """ + self.wh_cursor.execute(cmd) + self.wh_con.commit() + finally: + self.wh_cursor.close() 
+ self.wh_con.close() - # delete the entries that matched with dataset id in this round - - cmd = """ - DELETE u FROM stg_dataset_owner_unmatched u - JOIN (SELECT DISTINCT dataset_urn, dataset_id FROM stg_dataset_owner) s - ON u.dataset_urn = s.dataset_urn - WHERE s.dataset_id IS NOT NULL; - """ - self.wh_cursor.execute(cmd) - self.wh_con.commit() - - self.wh_cursor.close() - self.wh_con.close() if __name__ == "__main__": - props = sys.argv[1] - ot = OwnerLoad(props) - ot.run() + props = sys.argv[1] + ot = OwnerLoad(props) + ot.run() diff --git a/metadata-etl/src/main/resources/jython/OwnerTransform.py b/metadata-etl/src/main/resources/jython/OwnerTransform.py index dbce8dd8ce..baad79cc81 100644 --- a/metadata-etl/src/main/resources/jython/OwnerTransform.py +++ b/metadata-etl/src/main/resources/jython/OwnerTransform.py @@ -20,16 +20,16 @@ import sys class OwnerTransform: - _tables = {"dataset_owner": {"columns": "dataset_urn, owner_id, sort_id, namespace, db_name, source_time", - "file": "dataset_owner.csv", - "table": "stg_dataset_owner"} - } + _tables = {"dataset_owner": {"columns": "dataset_urn, owner_id, sort_id, namespace, db_name, source_time", + "file": "dataset_owner.csv", + "table": "stg_dataset_owner"} + } - _clear_staging_tempalte = """ + _clear_staging_tempalte = """ DELETE FROM {table} """ - _read_file_template = """ + _read_file_template = """ LOAD DATA LOCAL INFILE '{folder}/{file}' INTO TABLE {table} FIELDS TERMINATED BY '\x1a' ESCAPED BY '\0' @@ -37,21 +37,21 @@ class OwnerTransform: ({columns}); """ - _update_dataset_id_template = """ + _update_dataset_id_template = """ UPDATE {table} stg JOIN dict_dataset dd ON stg.dataset_urn = dd.urn SET stg.dataset_id = dd.id """ - _update_database_id_template = """ + _update_database_id_template = """ UPDATE {table} stg JOIN cfg_database cd ON stg.db_name = cd.db_code SET stg.db_id = cd.db_id """ - _update_app_id_template = """ + _update_app_id_template = """ UPDATE {table} stg join dir_external_user_info ldap on stg.owner_id = ldap.user_id @@ -60,7 +60,7 @@ class OwnerTransform: stg.is_active = ldap.is_active """ - _update_group_app_id_template = """ + _update_group_app_id_template = """ UPDATE {table} stg join dir_external_group_user_map ldap on stg.owner_id = ldap.group_id @@ -69,7 +69,7 @@ class OwnerTransform: stg.is_active = 'Y' """ - _update_owner_type_template = """ + _update_owner_type_template = """ UPDATE {table} stg join dir_external_user_info ldap on stg.owner_id = ldap.user_id @@ -77,88 +77,91 @@ class OwnerTransform: stg.owner_sub_type = CASE WHEN ldap.department_id = 4020 THEN 'DWH' ELSE 'BA' END """ - _update_parent_flag = """ + _update_parent_flag = """ update {table} s join dict_dataset d on s.dataset_urn = substring(d.urn, 1, char_length(d.urn) - char_length(substring_index(d.urn, '/', -{lvl})) - 1) set s.is_parent_urn = 'Y' """ - def __init__(self, args): - self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY], - args[Constant.WH_DB_USERNAME_KEY], - args[Constant.WH_DB_PASSWORD_KEY], - args[Constant.WH_DB_DRIVER_KEY]) - self.wh_cursor = self.wh_con.cursor() - self.db_id = int(args[Constant.DB_ID_KEY]) - self.app_folder = args[Constant.WH_APP_FOLDER_KEY] - self.metadata_folder = self.app_folder + "/" + str(self.db_id) + def __init__(self, args): + self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY], + args[Constant.WH_DB_USERNAME_KEY], + args[Constant.WH_DB_PASSWORD_KEY], + args[Constant.WH_DB_DRIVER_KEY]) + self.wh_cursor = self.wh_con.cursor() + self.db_id = int(args[Constant.DB_ID_KEY]) + 
diff --git a/metadata-etl/src/main/resources/jython/SchedulerLoad.py b/metadata-etl/src/main/resources/jython/SchedulerLoad.py
index 75d04e473a..9b7dea8476 100644
--- a/metadata-etl/src/main/resources/jython/SchedulerLoad.py
+++ b/metadata-etl/src/main/resources/jython/SchedulerLoad.py
@@ -32,15 +32,17 @@ class SchedulerLoad:
     self.wh_cursor = self.wh_con.cursor()
 
   def run(self):
-    self.load_flows()
-    self.load_jobs()
-    self.load_flow_dags()
-    self.load_flow_schedules()
-    self.load_flow_owner_permissions()
-    self.load_flow_executions()
-    self.load_job_executions()
-    self.wh_cursor.close()
-    self.wh_con.close()
+    try:
+      self.load_flows()
+      self.load_jobs()
+      self.load_flow_dags()
+      self.load_flow_schedules()
+      self.load_flow_owner_permissions()
+      self.load_flow_executions()
+      self.load_job_executions()
+    finally:
+      self.wh_cursor.close()
+      self.wh_con.close()
 
   def load_flows(self):
     cmd = """
diff --git a/metadata-etl/src/main/resources/jython/SchedulerTransform.py b/metadata-etl/src/main/resources/jython/SchedulerTransform.py
index 92356c5f5b..09dfdf7971 100644
--- a/metadata-etl/src/main/resources/jython/SchedulerTransform.py
+++ b/metadata-etl/src/main/resources/jython/SchedulerTransform.py
@@ -84,6 +84,7 @@ class SchedulerTransform:
     self.metadata_folder = self.app_folder + "/" + str(scheduler_type) + "/" + str(self.app_id)
 
   def run(self):
+    try:
       self.read_flow_file_to_stg()
       self.read_job_file_to_stg()
       self.read_dag_file_to_stg()
@@ -91,6 +92,7 @@ class SchedulerTransform:
       self.read_flow_schedule_file_to_stg()
       self.read_flow_exec_file_to_stg()
       self.read_job_exec_file_to_stg()
+    finally:
       self.wh_cursor.close()
       self.wh_con.close()
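Both loaders above get the same treatment: the ETL steps move under try and the teardown moves to finally, so a failure in any step can no longer leak the cursor or the JDBC connection. The shape of the change, reduced to a sketch with illustrative step names:

    # Sketch only: 'steps' stands in for load_flows, load_jobs, etc.
    def run(self, steps):
        try:
            for step in steps:
                step()              # an exception here still reaches the caller...
        finally:
            self.wh_cursor.close()  # ...but the cursor and connection are
            self.wh_con.close()     # closed on success and failure alike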
diff --git a/metadata-etl/src/main/resources/jython/TeradataExtract.py b/metadata-etl/src/main/resources/jython/TeradataExtract.py
index 364a7111b3..f21ce9e012 100644
--- a/metadata-etl/src/main/resources/jython/TeradataExtract.py
+++ b/metadata-etl/src/main/resources/jython/TeradataExtract.py
@@ -538,14 +538,16 @@ if __name__ == "__main__":
 
   e = TeradataExtract()
   e.conn_td = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
-  e.conn_td.cursor().execute("SET QUERY_BAND = 'script=%s; pid=%d; ' FOR SESSION;" % ('TeradataExtract.py', os.getpid()))
-  e.conn_td.commit()
-  e.log_file = args[Constant.TD_LOG_KEY]
-  e.databases = args[Constant.TD_TARGET_DATABASES_KEY].split(',')
-  e.default_database = args[Constant.TD_DEFAULT_DATABASE_KEY]
-  index_type = {'P': 'Primary Index', 'K': 'Primary Key', 'S': 'Secondary Index', 'Q': 'Partitioned Primary Index',
-                'J': 'Join Index', 'U': 'Unique Index'}
+  try:
+    e.conn_td.cursor().execute("SET QUERY_BAND = 'script=%s; pid=%d; ' FOR SESSION;" % ('TeradataExtract.py', os.getpid()))
+    e.conn_td.commit()
+    e.log_file = args[Constant.TD_LOG_KEY]
+    e.databases = args[Constant.TD_TARGET_DATABASES_KEY].split(',')
+    e.default_database = args[Constant.TD_DEFAULT_DATABASE_KEY]
+    index_type = {'P': 'Primary Index', 'K': 'Primary Key', 'S': 'Secondary Index', 'Q': 'Partitioned Primary Index',
+                  'J': 'Join Index', 'U': 'Unique Index'}
 
-  e.run(None, None, args[Constant.TD_SCHEMA_OUTPUT_KEY], args[Constant.TD_SAMPLE_OUTPUT_KEY])
-  e.conn_td.close()
+    e.run(None, None, args[Constant.TD_SCHEMA_OUTPUT_KEY], args[Constant.TD_SAMPLE_OUTPUT_KEY])
+  finally:
+    e.conn_td.close()
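The SET QUERY_BAND statement retained at the top of the new try block tags the session with the script name and pid, so the extractor's queries can be attributed in Teradata's query logging. A minimal sketch of that session setup; the host and credentials are placeholders, not values from this patch:

    import os
    from com.ziclix.python.sql import zxJDBC

    conn_td = zxJDBC.connect('jdbc:teradata://td-host', 'td_user', 'td_pass',
                             'com.teradata.jdbc.TeraDriver')  # placeholder connection
    try:
        cursor = conn_td.cursor()
        cursor.execute("SET QUERY_BAND = 'script=%s; pid=%d; ' FOR SESSION;"
                       % ('TeradataExtract.py', os.getpid()))
        conn_td.commit()
        # ... extraction queries run under this query band ...
    finally:
        conn_td.close()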
diff --git a/metadata-etl/src/main/resources/jython/TeradataLoad.py b/metadata-etl/src/main/resources/jython/TeradataLoad.py
index e62cdf5f13..e001e4c9fb 100644
--- a/metadata-etl/src/main/resources/jython/TeradataLoad.py
+++ b/metadata-etl/src/main/resources/jython/TeradataLoad.py
@@ -253,7 +253,9 @@ if __name__ == "__main__":
   l.db_id = args[Constant.DB_ID_KEY]
   l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
   l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
-  l.load_metadata()
-  l.load_field()
-  l.load_sample()
-  l.conn_mysql.close()
+  try:
+    l.load_metadata()
+    l.load_field()
+    l.load_sample()
+  finally:
+    l.conn_mysql.close()
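One possible follow-up, sketched purely as an alternative and not part of this patch: the try/finally teardown now repeats in every script, and Jython 2.5+ supports with-statements, so it could be factored into a small context manager. The helper name below is hypothetical:

    from contextlib import contextmanager
    from com.ziclix.python.sql import zxJDBC

    @contextmanager
    def jdbc_connection(url, user, password, driver):
        # hypothetical helper: open a connection/cursor pair, always close both
        con = zxJDBC.connect(url, user, password, driver)
        cursor = con.cursor()
        try:
            yield con, cursor
        finally:
            cursor.close()
            con.close()

    # usage sketch:
    # with jdbc_connection(url, user, password, driver) as (con, cursor):
    #     cursor.execute(cmd)
    #     con.commit()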