From 5946f355ff908a0069872e12aaee091946dadb47 Mon Sep 17 00:00:00 2001 From: SunZhaonan Date: Tue, 1 Dec 2015 16:39:58 -0800 Subject: [PATCH] Fix default database id bug caused by multithreading --- .../metadata/etl/lineage/AzLineageExtractor.java | 4 ++-- .../etl/lineage/AzLineageExtractorMaster.java | 2 +- .../java/metadata/etl/lineage/AzLogParser.java | 14 ++++++-------- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java index e7c78083fe..362bc8b55f 100644 --- a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java +++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractor.java @@ -61,8 +61,8 @@ public class AzLineageExtractor { // normalize and combine the path LineageCombiner lineageCombiner = new LineageCombiner(message.connection); lineageCombiner.addAll(oneAzkabanJobLineage); - - List lineageFromLog = AzLogParser.getLineageFromLog(log, message.azkabanJobExecution); + Integer defaultDatabaseId = Integer.valueOf(message.prop.getProperty(Constant.AZ_DEFAULT_HADOOP_DATABASE_ID_KEY)); + List lineageFromLog = AzLogParser.getLineageFromLog(log, message.azkabanJobExecution, defaultDatabaseId); lineageCombiner.addAll(lineageFromLog); return lineageCombiner.getCombinedLineage(); diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractorMaster.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractorMaster.java index 18fd4c663a..00bce1cff1 100644 --- a/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractorMaster.java +++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzLineageExtractorMaster.java @@ -90,7 +90,7 @@ public class AzLineageExtractorMaster { Connection conn = DriverManager.getConnection(connUrl); DatabaseWriter databaseWriter = new DatabaseWriter(connUrl, "stg_job_execution_data_lineage"); - AzLogParser.initialize(conn, 
Integer.valueOf(prop.getProperty(Constant.AZ_DEFAULT_HADOOP_DATABASE_ID_KEY))); + AzLogParser.initialize(conn); PathAnalyzer.initialize(conn); int timeout = 30; // default 30 minutes for one job if (prop.containsKey(Constant.LINEAGE_ACTOR_TIMEOUT_KEY)) diff --git a/metadata-etl/src/main/java/metadata/etl/lineage/AzLogParser.java b/metadata-etl/src/main/java/metadata/etl/lineage/AzLogParser.java index fc6fd6edc0..14a2ba0ff4 100644 --- a/metadata-etl/src/main/java/metadata/etl/lineage/AzLogParser.java +++ b/metadata-etl/src/main/java/metadata/etl/lineage/AzLogParser.java @@ -34,7 +34,6 @@ public class AzLogParser { static List logLineagePatterns; static List logHadoopIdPatterns; - static int defaultDatabaseId; /** * Parse the hadoop job id from the log. @@ -65,18 +64,17 @@ public class AzLogParser { /** * initialize, download the regex info into cache */ - public synchronized static void initialize(Connection conn, int defaultDatabaseId) + public synchronized static void initialize(Connection conn) throws SQLException { if (logHadoopIdPatterns != null && logLineagePatterns != null) { return; } - loadLineagePatterns(conn, defaultDatabaseId); - loadHadoopIdPatterns(conn, defaultDatabaseId); + loadLineagePatterns(conn); + loadHadoopIdPatterns(conn); } - private static void loadLineagePatterns(Connection conn, int defaultDatabaseId) + private static void loadLineagePatterns(Connection conn) throws SQLException { - AzLogParser.defaultDatabaseId = defaultDatabaseId; logLineagePatterns = new ArrayList<>(); String cmd = "SELECT regex, database_type, database_name_index, dataset_index, operation_type, source_target_type, " + "record_count_index, record_byte_index, insert_count_index, insert_byte_index, " @@ -95,7 +93,7 @@ public class AzLogParser { } } - private static void loadHadoopIdPatterns(Connection conn, int defaultDatabaseId) + private static void loadHadoopIdPatterns(Connection conn) throws SQLException { logHadoopIdPatterns = new ArrayList<>(); String cmd = "SELECT 
regex FROM log_reference_job_id_pattern WHERE is_active = 1"; @@ -111,7 +109,7 @@ public class AzLogParser { * @param azkabanJobExecRecord contain the job execution info to construct the result * @return */ - public static List getLineageFromLog(String log, AzkabanJobExecRecord azkabanJobExecRecord) { + public static List getLineageFromLog(String log, AzkabanJobExecRecord azkabanJobExecRecord, Integer defaultDatabaseId) { List result = new ArrayList<>();