From d6f77a6f06e1948589218215fe39a01ef69683ac Mon Sep 17 00:00:00 2001 From: Zhen Chen Date: Wed, 16 Dec 2015 17:01:55 -0800 Subject: [PATCH 1/2] fix ldap org hierachy is empty bug and minor changes --- build.gradle | 2 ++ data-model/DDL/ETL_DDL/git_metadata.sql | 14 ++++++++++++++ .../src/main/resources/jython/LdapTransform.py | 5 +++++ wherehows-common/build.gradle | 1 + .../wherehows/common/schemas/GitCommitRecord.java | 2 +- .../main/java/wherehows/common/utils/GitUtil.java | 14 +++++++++++++- 6 files changed, 36 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 2b476498ce..f174a19639 100644 --- a/build.gradle +++ b/build.gradle @@ -52,6 +52,8 @@ subprojects { "akka" : "com.typesafe.akka:akka-actor_2.10:2.2.0", "jgit" : "org.eclipse.jgit:org.eclipse.jgit:4.1.1.201511131810-r", "jsoup" : "org.jsoup:jsoup:1.8.3", + "commons_io" : "commons-io:commons-io:2.4", + "jackson_databind" : "com.fasterxml.jackson.core:jackson-databind:2.6.1", "jackson_core" : "com.fasterxml.jackson.core:jackson-core:2.6.1", diff --git a/data-model/DDL/ETL_DDL/git_metadata.sql b/data-model/DDL/ETL_DDL/git_metadata.sql index df9bf86738..dee56a1af9 100644 --- a/data-model/DDL/ETL_DDL/git_metadata.sql +++ b/data-model/DDL/ETL_DDL/git_metadata.sql @@ -1,3 +1,17 @@ +-- +-- Copyright 2015 LinkedIn Corp. All rights reserved. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- + CREATE TABLE `source_code_commit_info` ( `app_id` SMALLINT(5) UNSIGNED DEFAULT NULL, `repository_urn` VARCHAR(300) CHAR SET latin1 NOT NULL COMMENT 'the git repo urn', diff --git a/metadata-etl/src/main/resources/jython/LdapTransform.py b/metadata-etl/src/main/resources/jython/LdapTransform.py index 562f664cee..b3e3b2e875 100644 --- a/metadata-etl/src/main/resources/jython/LdapTransform.py +++ b/metadata-etl/src/main/resources/jython/LdapTransform.py @@ -182,6 +182,11 @@ class LdapTransform: user_ids = [] org_hierarchy_long_string = "" org_hierarchy_depth_long_string = "" + + query = self._update_hierarchy_info.format(table=t.get("table"), app_id=self.app_id, user_ids=",".join(user_ids), org_hierarchy_long_string=org_hierarchy_long_string, + org_hierarchy_depth_long_string=org_hierarchy_depth_long_string) + # print query + self.wh_cursor.executemany(query) self.wh_con.commit() def find_path_for_user(self, start, pair, hierarchy): diff --git a/wherehows-common/build.gradle b/wherehows-common/build.gradle index 2e8baed9a7..0afb9e00af 100644 --- a/wherehows-common/build.gradle +++ b/wherehows-common/build.gradle @@ -7,6 +7,7 @@ dependencies { compile externalDependency.spring_jdbc compile externalDependency.jgit compile externalDependency.jsoup + compile externalDependency.commons_io testCompile externalDependency.testng testCompile project(":metadata-etl") } diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/GitCommitRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/GitCommitRecord.java index 04a4104374..b84eb635a4 100644 --- a/wherehows-common/src/main/java/wherehows/common/schemas/GitCommitRecord.java +++ b/wherehows-common/src/main/java/wherehows/common/schemas/GitCommitRecord.java @@ -41,7 +41,7 @@ public class GitCommitRecord extends AbstractRecord { this.gitRepoUrn = gitRepoUrn; this.commitId = commitMetadata.getCommitId(); this.filePath = commitMetadata.getFilePath(); - this.fileName = FilenameUtils.getName(this.filePath); + this.fileName = commitMetadata.getFileName(); this.commitTime = commitMetadata.getCommitTime().getTime() / 1000; this.committerName = commitMetadata.getCommitter(); this.committerEmail = commitMetadata.getCommitterEmail(); diff --git a/wherehows-common/src/main/java/wherehows/common/utils/GitUtil.java b/wherehows-common/src/main/java/wherehows/common/utils/GitUtil.java index 01054d75b1..990f2f363e 100644 --- a/wherehows-common/src/main/java/wherehows/common/utils/GitUtil.java +++ b/wherehows-common/src/main/java/wherehows/common/utils/GitUtil.java @@ -22,6 +22,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.FilenameUtils; import org.eclipse.jgit.api.Git; import org.eclipse.jgit.api.errors.GitAPIException; import org.eclipse.jgit.lib.Constants; @@ -134,6 +135,7 @@ public class GitUtil { for (RevCommit r : commitLog) { CommitMetadata metadata = new CommitMetadata(r.getName()); metadata.setFilePath(filePath); + metadata.setFileName(FilenameUtils.getName(filePath)); metadata.setMessage(r.getShortMessage().trim()); // Difference between committer and author // refer to: http://git-scm.com/book/ch2-3.html @@ -172,6 +174,7 @@ public class GitUtil { String committerEmail; String authorEmail; String filePath; + String fileName; public CommitMetadata() { } @@ -181,7 +184,7 @@ public class GitUtil { } public CommitMetadata(String commitId, String author, String committer, Date commitTime, String message, - String committerEmail, String authorEmail, String filePath) { + String committerEmail, String authorEmail, String filePath, String fileName) { this.commitId = commitId; this.author = author; this.committer = committer; @@ -190,6 +193,7 @@ public class GitUtil { this.committerEmail = committerEmail; this.authorEmail = authorEmail; this.filePath = filePath; + this.fileName = fileName; } public String getCommitId() { @@ -255,6 +259,14 @@ public class GitUtil { public void setFilePath(String filePath) { this.filePath = filePath; } + + public String getFileName() { + return fileName; + } + + public void setFileName(String fileName) { + this.fileName = fileName; + } } } From 7fca60a527d32335f251b4ed2bc5d9779a812e54 Mon Sep 17 00:00:00 2001 From: Zhen Chen Date: Thu, 17 Dec 2015 16:26:15 -0800 Subject: [PATCH 2/2] add ref flow id for sub flow jobs --- data-model/DDL/ETL_DDL/executor_metadata.sql | 5 ++ .../main/resources/jython/AzkabanExtract.py | 2 + .../main/resources/jython/AzkabanTransform.py | 2 +- .../main/resources/jython/SchedulerLoad.py | 5 +- .../resources/jython/SchedulerTransform.py | 21 ++++++ .../common/schemas/AzkabanJobRecord.java | 73 +++++++++++++++++++ .../wherehows/common/utils/StringUtil.java | 10 ++- 7 files changed, 114 insertions(+), 4 deletions(-) diff --git a/data-model/DDL/ETL_DDL/executor_metadata.sql b/data-model/DDL/ETL_DDL/executor_metadata.sql index 4a48cfdcbe..e4c6673ffd 100644 --- a/data-model/DDL/ETL_DDL/executor_metadata.sql +++ b/data-model/DDL/ETL_DDL/executor_metadata.sql @@ -94,6 +94,7 @@ CREATE TABLE flow_job ( job_path VARCHAR(1024) COMMENT 'job path from top level', job_type_id SMALLINT COMMENT 'type id of the job', job_type VARCHAR(63) COMMENT 'type of the job', + ref_flow_id INT UNSIGNED DEFAULT NULL COMMENT 'the reference flow id of the job if the job is a subflow', pre_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run before this job', post_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run after this job', is_current CHAR(1) COMMENT 'determine if it is a current job', @@ -104,6 +105,7 @@ CREATE TABLE flow_job ( wh_etl_exec_id BIGINT COMMENT 'wherehows etl execution id that create this record', PRIMARY KEY (app_id, job_id, dag_version), INDEX flow_id_idx (app_id, flow_id), + INDEX ref_flow_id_idx (app_id, ref_flow_id), INDEX job_path_idx (app_id, job_path(255)) ) ENGINE = InnoDB @@ -122,6 +124,8 @@ CREATE TABLE stg_flow_job ( job_path VARCHAR(1024) COMMENT 'job path from top level', job_type_id SMALLINT COMMENT 'type id of the job', job_type VARCHAR(63) COMMENT 'type of the job', + ref_flow_id INT UNSIGNED DEFAULT NULL COMMENT 'the reference flow id of the job if the job is a subflow', + ref_flow_path VARCHAR(1024) COMMENT 'the reference flow path of the job if the job is a subflow', pre_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run before this job', post_jobs VARCHAR(4096) COMMENT 'comma separated job ids that run after this job', is_current CHAR(1) COMMENT 'determine if it is a current job', @@ -131,6 +135,7 @@ CREATE TABLE stg_flow_job ( INDEX (app_id, job_id, dag_version), INDEX flow_id_idx (app_id, flow_id), INDEX flow_path_idx (app_id, flow_path(255)), + INDEX ref_flow_path_idx (app_id, ref_flow_path(255)), INDEX job_path_idx (app_id, job_path(255)), INDEX job_type_idx (job_type) ) diff --git a/metadata-etl/src/main/resources/jython/AzkabanExtract.py b/metadata-etl/src/main/resources/jython/AzkabanExtract.py index 3fa901ff6b..20fd00d289 100644 --- a/metadata-etl/src/main/resources/jython/AzkabanExtract.py +++ b/metadata-etl/src/main/resources/jython/AzkabanExtract.py @@ -118,6 +118,8 @@ class AzkabanExtract: node['jobType'], 'Y', self.wh_exec_id) + if node['jobType'] == 'flow': + job_record.setRefFlowPath(row['project_name'] + ":" + node['embeddedFlowId']) job_writer.append(job_record) # job dag diff --git a/metadata-etl/src/main/resources/jython/AzkabanTransform.py b/metadata-etl/src/main/resources/jython/AzkabanTransform.py index 49efd9e9a9..d7e7def2f6 100644 --- a/metadata-etl/src/main/resources/jython/AzkabanTransform.py +++ b/metadata-etl/src/main/resources/jython/AzkabanTransform.py @@ -22,7 +22,7 @@ import sys class AzkabanTransform(SchedulerTransform): SchedulerTransform._tables["flows"]["columns"] = "app_id, flow_name, flow_group, flow_path, flow_level, source_modified_time, source_version, is_active, wh_etl_exec_id" - SchedulerTransform._tables["jobs"]["columns"] = "app_id, flow_path, source_version, job_name, job_path, job_type, is_current, wh_etl_exec_id" + SchedulerTransform._tables["jobs"]["columns"] = "app_id, flow_path, source_version, job_name, job_path, job_type, ref_flow_path, is_current, wh_etl_exec_id" SchedulerTransform._tables["owners"]["columns"] = "app_id, flow_path, owner_id, permissions, owner_type, wh_etl_exec_id" SchedulerTransform._tables["flow_execs"]["columns"] = "app_id, flow_name, flow_path, source_version, flow_exec_id, flow_exec_status, attempt_id, executed_by, start_time, end_time, wh_etl_exec_id" SchedulerTransform._tables["job_execs"]["columns"] = "app_id, flow_path, source_version, flow_exec_id, job_name, job_path, job_exec_id, job_exec_status, attempt_id, start_time, end_time, wh_etl_exec_id" diff --git a/metadata-etl/src/main/resources/jython/SchedulerLoad.py b/metadata-etl/src/main/resources/jython/SchedulerLoad.py index 9b7dea8476..18b7d12892 100644 --- a/metadata-etl/src/main/resources/jython/SchedulerLoad.py +++ b/metadata-etl/src/main/resources/jython/SchedulerLoad.py @@ -81,9 +81,9 @@ class SchedulerLoad: self.wh_con.commit() cmd = """ - INSERT INTO flow_job (app_id, flow_id, first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, pre_jobs, post_jobs, + INSERT INTO flow_job (app_id, flow_id, first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, ref_flow_id, pre_jobs, post_jobs, is_current, is_first, is_last, created_time, modified_time, wh_etl_exec_id) - SELECT app_id, flow_id, source_version first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, pre_jobs, post_jobs, + SELECT app_id, flow_id, source_version first_source_version, dag_version, job_id, job_name, job_path, job_type_id, job_type, ref_flow_id, pre_jobs, post_jobs, 'Y', is_first, is_last, unix_timestamp(NOW()) created_time, NULL, wh_etl_exec_id FROM stg_flow_job s WHERE s.app_id = {app_id} @@ -94,6 +94,7 @@ class SchedulerLoad: job_path = s.job_path, job_type_id = s.job_type_id, job_type = s.job_type, + ref_flow_id = s.ref_flow_id, pre_jobs = s.pre_jobs, post_jobs = s.post_jobs, is_current = 'Y', diff --git a/metadata-etl/src/main/resources/jython/SchedulerTransform.py b/metadata-etl/src/main/resources/jython/SchedulerTransform.py index 09dfdf7971..c89fd7667b 100644 --- a/metadata-etl/src/main/resources/jython/SchedulerTransform.py +++ b/metadata-etl/src/main/resources/jython/SchedulerTransform.py @@ -149,6 +149,27 @@ class SchedulerTransform: self.wh_cursor.execute(query) self.wh_con.commit() + # ad hoc fix for null values, need better solution by changing the load script + query = """ + UPDATE {table} stg + SET stg.ref_flow_path = null + WHERE stg.ref_flow_path = 'null' and stg.app_id = {app_id} + """.format(table=t.get("table"), app_id=self.app_id) + print query + self.wh_cursor.execute(query) + self.wh_con.commit() + + # Update sub flow id from mapping table + query = """ + UPDATE {table} stg + JOIN flow_source_id_map fm + ON stg.app_id = fm.app_id AND stg.ref_flow_path = fm.source_id_string + SET stg.ref_flow_id = fm.flow_id WHERE stg.app_id = {app_id} + """.format(table=t.get("table"), app_id=self.app_id) + print query + self.wh_cursor.execute(query) + self.wh_con.commit() + # Insert new job into job map to generate job id query = """ INSERT INTO job_source_id_map (app_id, source_id_string) diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/AzkabanJobRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/AzkabanJobRecord.java index 894670db91..254ef9debb 100644 --- a/wherehows-common/src/main/java/wherehows/common/schemas/AzkabanJobRecord.java +++ b/wherehows-common/src/main/java/wherehows/common/schemas/AzkabanJobRecord.java @@ -27,6 +27,7 @@ public class AzkabanJobRecord extends AbstractRecord { String jobName; String jobPath; String jobType; + String refFlowPath; Character isCurrent; Long whExecId; @@ -51,9 +52,81 @@ public class AzkabanJobRecord extends AbstractRecord { allFields.add(jobName); allFields.add(jobPath); allFields.add(jobType); + allFields.add(refFlowPath); allFields.add(isCurrent); allFields.add(whExecId); return allFields; } + public Integer getAppId() { + return appId; + } + + public void setAppId(Integer appId) { + this.appId = appId; + } + + public String getFlowPath() { + return flowPath; + } + + public void setFlowPath(String flowPath) { + this.flowPath = flowPath; + } + + public Integer getSourceVersion() { + return sourceVersion; + } + + public void setSourceVersion(Integer sourceVersion) { + this.sourceVersion = sourceVersion; + } + + public String getJobName() { + return jobName; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public String getJobPath() { + return jobPath; + } + + public void setJobPath(String jobPath) { + this.jobPath = jobPath; + } + + public String getJobType() { + return jobType; + } + + public void setJobType(String jobType) { + this.jobType = jobType; + } + + public String getRefFlowPath() { + return refFlowPath; + } + + public void setRefFlowPath(String refFlowPath) { + this.refFlowPath = refFlowPath; + } + + public Character getIsCurrent() { + return isCurrent; + } + + public void setIsCurrent(Character isCurrent) { + this.isCurrent = isCurrent; + } + + public Long getWhExecId() { + return whExecId; + } + + public void setWhExecId(Long whExecId) { + this.whExecId = whExecId; + } } diff --git a/wherehows-common/src/main/java/wherehows/common/utils/StringUtil.java b/wherehows-common/src/main/java/wherehows/common/utils/StringUtil.java index 13d90b1bfe..adbaade3ae 100644 --- a/wherehows-common/src/main/java/wherehows/common/utils/StringUtil.java +++ b/wherehows-common/src/main/java/wherehows/common/utils/StringUtil.java @@ -20,12 +20,20 @@ public class StringUtil { public static String toDbString(Object object) { if (object != null) { - return "'" + object.toString().replace("\'", "\\\'").replace("\"", "\\\"") + "'"; + return "'" + object.toString().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"") + "'"; } else { return "null"; } } + public static String toCsvString(Object object) { + if (object != null) { + return "\"" + object.toString().replace("\\", "\\\\").replace("\'", "\\\'").replace("\"", "\\\"") + "\""; + } else { + return "\\N"; + } + } + public static String replace(String s, String target, Object replacement) { if (replacement != null) { return s.replace(target, "'" + replacement.toString().replace("\'", "\\\'").replace("\"", "\\\"") + "'");