From b48aa9f759cf902749cf7efe5fc1ed6195208dda Mon Sep 17 00:00:00 2001 From: baifuyu Date: Sun, 10 Dec 2023 20:13:10 +0800 Subject: [PATCH] bugfix --- .../model/record/SPGRecordTypeEnum.java | 1 - .../model/pipeline/PipelineTest.groovy | 28 ++++++ .../builder/model/BuilderJsonUtilsTest.groovy | 6 -- .../model/pipeline/PipelineTest.groovy | 45 ---------- builder/runner/local/builder.sh | 13 +++ .../runner/local/LocalBuilderMain.java | 62 +++++++++++++ .../runner/local/LocalBuilderRunner.java | 15 +--- .../runner/local/runtime/BuilderMetric.java | 2 +- .../local/runtime/DefaultRecordCollector.java | 86 ------------------- .../local/runtime/ErrorRecordCollector.java | 28 ------ .../local/src/main/resources/logback.xml | 19 ---- .../local/src/test/resources/TaxOfRiskApp.csv | 2 + 12 files changed, 110 insertions(+), 197 deletions(-) create mode 100644 builder/model/src/test/groovy/com/antgroup/openspg/builder/model/pipeline/PipelineTest.groovy delete mode 100644 builder/model/src/test/java/com/antgroup/openspg/builder/model/BuilderJsonUtilsTest.groovy delete mode 100644 builder/model/src/test/java/com/antgroup/openspg/builder/model/pipeline/PipelineTest.groovy create mode 100644 builder/runner/local/builder.sh delete mode 100644 builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/runtime/DefaultRecordCollector.java delete mode 100644 builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/runtime/ErrorRecordCollector.java delete mode 100644 builder/runner/local/src/main/resources/logback.xml create mode 100644 builder/runner/local/src/test/resources/TaxOfRiskApp.csv diff --git a/builder/model/src/main/java/com/antgroup/openspg/builder/model/record/SPGRecordTypeEnum.java b/builder/model/src/main/java/com/antgroup/openspg/builder/model/record/SPGRecordTypeEnum.java index 843011c8..2dada6de 100644 --- a/builder/model/src/main/java/com/antgroup/openspg/builder/model/record/SPGRecordTypeEnum.java +++ b/builder/model/src/main/java/com/antgroup/openspg/builder/model/record/SPGRecordTypeEnum.java @@ -13,7 +13,6 @@ package com.antgroup.openspg.builder.model.record; - public enum SPGRecordTypeEnum { ENTITY, CONCEPT, diff --git a/builder/model/src/test/groovy/com/antgroup/openspg/builder/model/pipeline/PipelineTest.groovy b/builder/model/src/test/groovy/com/antgroup/openspg/builder/model/pipeline/PipelineTest.groovy new file mode 100644 index 00000000..31bcae66 --- /dev/null +++ b/builder/model/src/test/groovy/com/antgroup/openspg/builder/model/pipeline/PipelineTest.groovy @@ -0,0 +1,28 @@ +package com.antgroup.openspg.builder.model.pipeline + +import com.antgroup.openspg.builder.model.BuilderJsonUtils +import com.antgroup.openspg.builder.model.pipeline.config.CsvSourceNodeConfig +import com.antgroup.openspg.builder.model.pipeline.config.GraphStoreSinkNodeConfig +import com.antgroup.openspg.builder.model.pipeline.config.SPGTypeMappingNodeConfig +import spock.lang.Specification + +class PipelineTest extends Specification { + + def "testPipelineSer"() { + given: + Node node1 = new Node("1", "csv", new CsvSourceNodeConfig( + "./data/TaxOfRiskApp.csv", 2, ["id"])); + Node node2 = new Node("2", "mapping", + new SPGTypeMappingNodeConfig("RiskMining.TaxOfRiskUser", [], [])); + Node node3 = new Node("3", "sink", new GraphStoreSinkNodeConfig()); + + + Edge edge1 = new Edge("1", "2"); + Edge edge2 = new Edge("2", "3"); + + Pipeline pipeline = new Pipeline([node1, node2, node3], [edge1, edge2]); + + expect: + BuilderJsonUtils.serialize(pipeline).contains(BuilderJsonUtils.DEFAULT_TYPE_FIELD_NAME) + } +} diff --git a/builder/model/src/test/java/com/antgroup/openspg/builder/model/BuilderJsonUtilsTest.groovy b/builder/model/src/test/java/com/antgroup/openspg/builder/model/BuilderJsonUtilsTest.groovy deleted file mode 100644 index 01445570..00000000 --- a/builder/model/src/test/java/com/antgroup/openspg/builder/model/BuilderJsonUtilsTest.groovy +++ /dev/null @@ -1,6 +0,0 @@ -package com.antgroup.openspg.builder.model - -import spock.lang.Specification - -class BuilderJsonUtilsTest extends Specification { -} diff --git a/builder/model/src/test/java/com/antgroup/openspg/builder/model/pipeline/PipelineTest.groovy b/builder/model/src/test/java/com/antgroup/openspg/builder/model/pipeline/PipelineTest.groovy deleted file mode 100644 index 157e5079..00000000 --- a/builder/model/src/test/java/com/antgroup/openspg/builder/model/pipeline/PipelineTest.groovy +++ /dev/null @@ -1,45 +0,0 @@ -package com.antgroup.openspg.builder.model.pipeline - -import com.antgroup.openspg.builder.model.BuilderJsonUtils -import com.antgroup.openspg.builder.model.pipeline.config.CsvSourceNodeConfig -import com.antgroup.openspg.builder.model.pipeline.config.GraphStoreSinkNodeConfig -import com.antgroup.openspg.builder.model.pipeline.config.SPGTypeMappingNodeConfig -import com.antgroup.openspg.server.common.model.datasource.connection.GraphStoreConnectionInfo -import com.google.common.collect.Lists -import spock.lang.Specification - -class PipelineTest extends Specification { - - def "testPipelineSer"() { - given: - Node node1 = - new Node( - "1", - "csv", - new CsvSourceNodeConfig("./data/TaxOfRiskApp.csv", 1, Lists.newArrayList("id"))); - Node node2 = - new Node( - "2", - "mapping", - new SPGTypeMappingNodeConfig( - "RiskMining.TaxOfRiskUser", new ArrayList<>(), new ArrayList<>())); - - Map params = new HashMap<>(); - params.put("graphName", "default"); - params.put("timeout", "5000"); - params.put("host", "127.0.0.1"); - params.put("accessId", "admin"); - params.put("accessKey", "73@TuGraph"); - Node node3 = - new Node("3", "sink", new GraphStoreSinkNodeConfig(new GraphStoreConnectionInfo().setScheme("tugraph") - .setParams(params))); - - Edge edge1 = new Edge("1", "2"); - Edge edge2 = new Edge("2", "3"); - - Pipeline pipeline = new Pipeline(Lists.newArrayList(node1, node2, node3), Lists.newArrayList(edge1, edge2)); - - expect: - BuilderJsonUtils.serialize(pipeline).contains(BuilderJsonUtils.DEFAULT_TYPE_FIELD_NAME) - } -} diff --git a/builder/runner/local/builder.sh b/builder/runner/local/builder.sh new file mode 100644 index 00000000..191bc4b3 --- /dev/null +++ b/builder/runner/local/builder.sh @@ -0,0 +1,13 @@ +java -jar \ + -Dcloudext.graphstore.drivers=com.antgroup.openspg.cloudext.impl.graphstore.tugraph.TuGraphStoreClientDriver \ + -Dcloudext.searchengine.drivers=com.antgroup.openspg.cloudext.impl.searchengine.elasticsearch.ElasticSearchEngineClientDriver \ + ./target/builder-runner-local-0.0.1-SNAPSHOT-jar-with-dependencies.jar \ + --projectId 2 \ + --jobName "TaxOfRiskApp" \ + --pipeline "{\"nodes\":[{\"id\":\"1\",\"name\":\"csv\",\"nodeConfig\":{\"@type\":\"CSV_SOURCE\",\"startRow\":2,\"url\":\"./src/test/resources/TaxOfRiskApp.csv\",\"columns\":[\"id\"],\"type\":\"CSV_SOURCE\"}},{\"id\":\"2\",\"name\":\"mapping\",\"nodeConfig\":{\"@type\":\"SPG_TYPE_MAPPING\",\"spgType\":\"RiskMining.TaxOfRiskUser\",\"mappingFilters\":[],\"mappingConfigs\":[],\"type\":\"SPG_TYPE_MAPPING\"}},{\"id\":\"3\",\"name\":\"sink\",\"nodeConfig\":{\"@type\":\"GRAPH_SINK\",\"type\":\"GRAPH_SINK\"}}],\"edges\":[{\"from\":\"1\",\"to\":\"2\"},{\"from\":\"2\",\"to\":\"3\"}]}" \ + --pythonExec "/usr/local/bin/python3.9" \ + --pythonPaths "/usr/local/lib/python3.9/site-packages;./python" \ + --schemaUrl "http://localhost:8887" \ + --parallelism "1" \ + --alterOperation "UPSERT" \ + --logFile TaxOfRiskApp.log diff --git a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/LocalBuilderMain.java b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/LocalBuilderMain.java index 7dfb09fa..74d3955a 100644 --- a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/LocalBuilderMain.java +++ b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/LocalBuilderMain.java @@ -1,11 +1,19 @@ package com.antgroup.openspg.builder.runner.local; +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.encoder.PatternLayoutEncoder; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.ConsoleAppender; +import ch.qos.logback.core.FileAppender; import com.antgroup.openspg.builder.core.runtime.BuilderContext; import com.antgroup.openspg.builder.core.runtime.impl.DefaultBuilderCatalog; import com.antgroup.openspg.builder.model.BuilderJsonUtils; import com.antgroup.openspg.builder.model.exception.PipelineConfigException; import com.antgroup.openspg.builder.model.pipeline.Pipeline; import com.antgroup.openspg.builder.model.record.RecordAlterOperationEnum; +import com.antgroup.openspg.common.util.StringUtils; import com.antgroup.openspg.core.schema.model.type.ProjectSchema; import com.antgroup.openspg.server.api.facade.ApiResponse; import com.antgroup.openspg.server.api.facade.client.SchemaFacade; @@ -14,6 +22,7 @@ import com.antgroup.openspg.server.api.http.client.HttpSchemaFacade; import com.antgroup.openspg.server.api.http.client.util.ConnectionInfo; import com.antgroup.openspg.server.api.http.client.util.HttpClientBootstrap; import org.apache.commons.cli.*; +import org.slf4j.LoggerFactory; public class LocalBuilderMain { @@ -25,6 +34,7 @@ public class LocalBuilderMain { private static final String SCHEMA_URL_OPTION = "schemaUrl"; private static final String PARALLELISM_OPTION = "parallelism"; private static final String ALTER_OPERATION_OPTION = "alterOperation"; + private static final String LOG_FILE_OPTION = "logFile"; public static void main(String[] args) throws Exception { CommandLine commandLine = parseArgs(args); @@ -44,6 +54,7 @@ public class LocalBuilderMain { options.addOption(PARALLELISM_OPTION, PARALLELISM_OPTION, true, "parallelism"); options.addOption( ALTER_OPERATION_OPTION, ALTER_OPERATION_OPTION, true, "alter operation, upsert or delete"); + options.addOption(LOG_FILE_OPTION, LOG_FILE_OPTION, true, "log file"); CommandLine commandLine = null; HelpFormatter helper = new HelpFormatter(); @@ -58,6 +69,9 @@ public class LocalBuilderMain { } private static void run(CommandLine commandLine) throws Exception { + String logFileName = commandLine.getOptionValue(LOG_FILE_OPTION); + setUpLogFile(logFileName); + long projectId = Long.parseLong(commandLine.getOptionValue(PROJECT_ID_OPTION)); String jobName = commandLine.getOptionValue(JOB_NAME_OPTION); @@ -106,4 +120,52 @@ public class LocalBuilderMain { } throw new PipelineConfigException(""); } + + private static void setUpLogFile(String logFileName) { + LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); + loggerContext.reset(); + + PatternLayoutEncoder patternLayoutEncoder = new PatternLayoutEncoder(); + patternLayoutEncoder.setPattern("%d [%X{traceId}] [%X{rpcId}] [%t] %-5p %c{2} - %m%n"); + patternLayoutEncoder.setContext(loggerContext); + patternLayoutEncoder.start(); + + FileAppender fileAppender = null; + if (StringUtils.isNotBlank(logFileName)) { + fileAppender = new FileAppender<>(); + fileAppender.setFile(logFileName); + fileAppender.setEncoder(patternLayoutEncoder); + fileAppender.setContext(loggerContext); + fileAppender.setAppend(false); + fileAppender.start(); + } + + ConsoleAppender consoleAppender = new ConsoleAppender<>(); + consoleAppender.setEncoder(patternLayoutEncoder); + consoleAppender.setContext(loggerContext); + consoleAppender.start(); + + Logger brpcLogger = loggerContext.getLogger("com.baidu.brpc"); + brpcLogger.setLevel(Level.ERROR); + brpcLogger.setAdditive(false); + if (fileAppender != null) { + brpcLogger.addAppender(fileAppender); + } + brpcLogger.addAppender(consoleAppender); + + Logger dtflysLogger = loggerContext.getLogger("com.dtflys.forest"); + dtflysLogger.setLevel(Level.ERROR); + dtflysLogger.setAdditive(false); + if (fileAppender != null) { + dtflysLogger.addAppender(fileAppender); + } + dtflysLogger.addAppender(consoleAppender); + + Logger rootLogger = loggerContext.getLogger("root"); + if (fileAppender != null) { + rootLogger.addAppender(fileAppender); + } + rootLogger.addAppender(consoleAppender); + rootLogger.setLevel(Level.INFO); + } } diff --git a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/LocalBuilderRunner.java b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/LocalBuilderRunner.java index 3c428381..7e086f58 100644 --- a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/LocalBuilderRunner.java +++ b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/LocalBuilderRunner.java @@ -15,8 +15,6 @@ import com.antgroup.openspg.builder.runner.local.physical.sink.SinkWriterFactory import com.antgroup.openspg.builder.runner.local.physical.source.BaseSourceReader; import com.antgroup.openspg.builder.runner.local.physical.source.SourceReaderFactory; import com.antgroup.openspg.builder.runner.local.runtime.BuilderMetric; -import com.antgroup.openspg.builder.runner.local.runtime.DefaultRecordCollector; -import com.antgroup.openspg.builder.runner.local.runtime.ErrorRecordCollector; import com.antgroup.openspg.common.util.thread.ThreadUtils; import com.codahale.metrics.Counter; import com.codahale.metrics.Meter; @@ -36,7 +34,6 @@ public class LocalBuilderRunner implements BuilderRunner { private BaseSourceReader sourceReader = null; private BaseSinkWriter sinkWriter = null; private BuilderMetric builderMetric = null; - private ErrorRecordCollector errorRecordCollector = null; private final int parallelism; private final ThreadPoolExecutor threadPoolExecutor; @@ -67,9 +64,6 @@ public class LocalBuilderRunner implements BuilderRunner { // 构建指标统计,并将构建指标输出到log builderMetric = new BuilderMetric(context.getJobName()); builderMetric.reportToLog(); - - // 错误记录收集,将构建错误的记录收集到csv文件中 - errorRecordCollector = new DefaultRecordCollector(context.getJobName(), null); } @Override @@ -90,9 +84,11 @@ public class LocalBuilderRunner implements BuilderRunner { results = builderExecutor.eval(records); } catch (BuilderRecordException e) { errorCnt.inc(records.size()); - // todo + log.error("builder record error", e); + } + if (CollectionUtils.isNotEmpty(results)) { + sinkWriter.write(results); } - sinkWriter.write(results); records = Collections.unmodifiableList(sourceReader.read()); } }, @@ -125,9 +121,6 @@ public class LocalBuilderRunner implements BuilderRunner { if (builderMetric != null) { builderMetric.close(); } - if (errorRecordCollector != null) { - errorRecordCollector.close(); - } if (threadPoolExecutor != null) { threadPoolExecutor.shutdownNow(); } diff --git a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/runtime/BuilderMetric.java b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/runtime/BuilderMetric.java index e93803ac..782f17c7 100644 --- a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/runtime/BuilderMetric.java +++ b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/runtime/BuilderMetric.java @@ -41,7 +41,7 @@ public class BuilderMetric implements Serializable { public void reportToLog() { reporter = Slf4jReporter.forRegistry(metricRegistry).outputTo(log).build(); - reporter.start(1, TimeUnit.SECONDS); + reporter.start(1, TimeUnit.MINUTES); } public void close() { diff --git a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/runtime/DefaultRecordCollector.java b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/runtime/DefaultRecordCollector.java deleted file mode 100644 index 91f17ced..00000000 --- a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/runtime/DefaultRecordCollector.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2023 Ant Group CO., Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. - */ - -package com.antgroup.openspg.builder.runner.local.runtime; - -import com.antgroup.openspg.builder.model.exception.BuilderRecordException; -import com.antgroup.openspg.builder.model.record.BuilderRecord; -import com.antgroup.openspg.cloudext.interfaces.tablestore.TableFileHandler; -import com.antgroup.openspg.cloudext.interfaces.tablestore.TableStoreClient; -import com.antgroup.openspg.cloudext.interfaces.tablestore.TableStoreClientDriverManager; -import com.antgroup.openspg.cloudext.interfaces.tablestore.cmd.TableFileCreateCmd; -import com.antgroup.openspg.cloudext.interfaces.tablestore.model.ColumnMeta; -import com.antgroup.openspg.server.common.model.datasource.connection.TableStoreConnectionInfo; - -public class DefaultRecordCollector implements ErrorRecordCollector { - - private static final String RECORD_ID = "recordId"; - private static final String COMPONENT = "componentName"; - private static final String ERROR_MSG = "errorMsg"; - - private final String tableName; - private final TableStoreConnectionInfo connInfo; - private volatile TableFileHandler tableFileHandler; - - public DefaultRecordCollector(String tableName, TableStoreConnectionInfo connInfo) { - this.connInfo = connInfo; - this.tableName = tableName; - } - - private void init() { - if (tableFileHandler == null) { - synchronized (this) { - if (tableFileHandler == null) { - TableStoreClient tableStoreClient = TableStoreClientDriverManager.getClient(connInfo); - tableFileHandler = - tableStoreClient.create( - new TableFileCreateCmd( - tableName, - new ColumnMeta[] { - new ColumnMeta(RECORD_ID), - new ColumnMeta(COMPONENT), - new ColumnMeta(ERROR_MSG) - })); - } - } - } - } - - @Override - public boolean haveCollected() { - return tableFileHandler != null; - } - - @Override - public String getTableName() { - if (tableFileHandler != null) { - return tableFileHandler.getTableName(); - } - return null; - } - - @Override - public void collectRecord(BuilderRecord record, BuilderRecordException e) { - init(); - // tableFileHandler.write( - // new TableRecord( - // new Object[] {record.getRecordId(), e.getProcessor().getName(), e.getMessage()})); - } - - @Override - public void close() throws Exception { - if (tableFileHandler != null) { - tableFileHandler.close(); - } - } -} diff --git a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/runtime/ErrorRecordCollector.java b/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/runtime/ErrorRecordCollector.java deleted file mode 100644 index 27bcce9d..00000000 --- a/builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/runtime/ErrorRecordCollector.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2023 Ant Group CO., Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. - */ - -package com.antgroup.openspg.builder.runner.local.runtime; - -import com.antgroup.openspg.builder.model.exception.BuilderRecordException; -import com.antgroup.openspg.builder.model.record.BuilderRecord; - -public interface ErrorRecordCollector { - - boolean haveCollected(); - - String getTableName(); - - void collectRecord(BuilderRecord record, BuilderRecordException e); - - void close() throws Exception; -} diff --git a/builder/runner/local/src/main/resources/logback.xml b/builder/runner/local/src/main/resources/logback.xml deleted file mode 100644 index 3ce0754b..00000000 --- a/builder/runner/local/src/main/resources/logback.xml +++ /dev/null @@ -1,19 +0,0 @@ - - - - %-4relative [%thread] %-5level %logger{35} - %msg %n - - - - - - - - - - -     - - - - \ No newline at end of file diff --git a/builder/runner/local/src/test/resources/TaxOfRiskApp.csv b/builder/runner/local/src/test/resources/TaxOfRiskApp.csv new file mode 100644 index 00000000..b9bcd829 --- /dev/null +++ b/builder/runner/local/src/test/resources/TaxOfRiskApp.csv @@ -0,0 +1,2 @@ +id +赌博应用