This commit is contained in:
baifuyu 2023-12-10 20:13:10 +08:00
parent c926772b05
commit b48aa9f759
12 changed files with 110 additions and 197 deletions

View File

@ -13,7 +13,6 @@
package com.antgroup.openspg.builder.model.record;
public enum SPGRecordTypeEnum {
ENTITY,
CONCEPT,

View File

@ -0,0 +1,28 @@
package com.antgroup.openspg.builder.model.pipeline
import com.antgroup.openspg.builder.model.BuilderJsonUtils
import com.antgroup.openspg.builder.model.pipeline.config.CsvSourceNodeConfig
import com.antgroup.openspg.builder.model.pipeline.config.GraphStoreSinkNodeConfig
import com.antgroup.openspg.builder.model.pipeline.config.SPGTypeMappingNodeConfig
import spock.lang.Specification
/**
 * Verifies that a builder {@code Pipeline} serializes with the polymorphic
 * type discriminator emitted by {@code BuilderJsonUtils}.
 */
class PipelineTest extends Specification {

    def "testPipelineSer"() {
        given: "a linear csv -> mapping -> sink pipeline"
        def source = new Node("1", "csv",
                new CsvSourceNodeConfig("./data/TaxOfRiskApp.csv", 2, ["id"]))
        def mapping = new Node("2", "mapping",
                new SPGTypeMappingNodeConfig("RiskMining.TaxOfRiskUser", [], []))
        def sink = new Node("3", "sink", new GraphStoreSinkNodeConfig())
        def pipeline = new Pipeline(
                [source, mapping, sink],
                [new Edge("1", "2"), new Edge("2", "3")])

        expect: "the serialized JSON carries the default type field"
        BuilderJsonUtils.serialize(pipeline).contains(BuilderJsonUtils.DEFAULT_TYPE_FIELD_NAME)
    }
}

View File

@ -1,6 +0,0 @@
package com.antgroup.openspg.builder.model
import spock.lang.Specification
// Placeholder specification for BuilderJsonUtils; no test cases implemented yet.
class BuilderJsonUtilsTest extends Specification {
}

View File

@ -1,45 +0,0 @@
package com.antgroup.openspg.builder.model.pipeline
import com.antgroup.openspg.builder.model.BuilderJsonUtils
import com.antgroup.openspg.builder.model.pipeline.config.CsvSourceNodeConfig
import com.antgroup.openspg.builder.model.pipeline.config.GraphStoreSinkNodeConfig
import com.antgroup.openspg.builder.model.pipeline.config.SPGTypeMappingNodeConfig
import com.antgroup.openspg.server.common.model.datasource.connection.GraphStoreConnectionInfo
import com.google.common.collect.Lists
import spock.lang.Specification
// Spock specification covering JSON serialization of a builder Pipeline whose
// sink is wired to a TuGraph graph store via an inline connection config.
class PipelineTest extends Specification {
// Serializes a csv -> mapping -> sink pipeline and checks that the
// polymorphic type discriminator is present in the output JSON.
def "testPipelineSer"() {
given:
// Source node: reads ./data/TaxOfRiskApp.csv starting at row 1, single "id" column.
Node node1 =
new Node(
"1",
"csv",
new CsvSourceNodeConfig("./data/TaxOfRiskApp.csv", 1, Lists.newArrayList("id")));
// Mapping node: maps raw records onto the RiskMining.TaxOfRiskUser SPG type,
// with no mapping filters or mapping configs.
Node node2 =
new Node(
"2",
"mapping",
new SPGTypeMappingNodeConfig(
"RiskMining.TaxOfRiskUser", new ArrayList<>(), new ArrayList<>()));
// Connection parameters for the TuGraph-backed sink.
// NOTE(review): accessId/accessKey look like local test fixtures — confirm
// they are not real credentials before publishing.
Map<String, Object> params = new HashMap<>();
params.put("graphName", "default");
params.put("timeout", "5000");
params.put("host", "127.0.0.1");
params.put("accessId", "admin");
params.put("accessKey", "73@TuGraph");
// Sink node: writes to the graph store described by the connection info above.
Node node3 =
new Node("3", "sink", new GraphStoreSinkNodeConfig(new GraphStoreConnectionInfo().setScheme("tugraph")
.setParams(params)));
// Edges wire the nodes into a linear 1 -> 2 -> 3 pipeline.
Edge edge1 = new Edge("1", "2");
Edge edge2 = new Edge("2", "3");
Pipeline pipeline = new Pipeline(Lists.newArrayList(node1, node2, node3), Lists.newArrayList(edge1, edge2));
expect:
// The serializer must emit the default type field so the concrete node
// config subtype can be resolved on deserialization.
BuilderJsonUtils.serialize(pipeline).contains(BuilderJsonUtils.DEFAULT_TYPE_FIELD_NAME)
}
}

View File

@ -0,0 +1,13 @@
# Launches the local builder runner for the "TaxOfRiskApp" job.
# The -D system properties select the TuGraph graph-store driver and the
# Elasticsearch search-engine driver before the jar's main class starts.
# --pipeline carries the full pipeline definition as JSON
# (csv source -> SPG type mapping -> graph sink); keep it in sync with the
# schema served at --schemaUrl.
# NOTE(review): --logFile is consumed by LocalBuilderMain to reconfigure
# logback at runtime — confirm the path is writable in the target environment.
java -jar \
-Dcloudext.graphstore.drivers=com.antgroup.openspg.cloudext.impl.graphstore.tugraph.TuGraphStoreClientDriver \
-Dcloudext.searchengine.drivers=com.antgroup.openspg.cloudext.impl.searchengine.elasticsearch.ElasticSearchEngineClientDriver \
./target/builder-runner-local-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
--projectId 2 \
--jobName "TaxOfRiskApp" \
--pipeline "{\"nodes\":[{\"id\":\"1\",\"name\":\"csv\",\"nodeConfig\":{\"@type\":\"CSV_SOURCE\",\"startRow\":2,\"url\":\"./src/test/resources/TaxOfRiskApp.csv\",\"columns\":[\"id\"],\"type\":\"CSV_SOURCE\"}},{\"id\":\"2\",\"name\":\"mapping\",\"nodeConfig\":{\"@type\":\"SPG_TYPE_MAPPING\",\"spgType\":\"RiskMining.TaxOfRiskUser\",\"mappingFilters\":[],\"mappingConfigs\":[],\"type\":\"SPG_TYPE_MAPPING\"}},{\"id\":\"3\",\"name\":\"sink\",\"nodeConfig\":{\"@type\":\"GRAPH_SINK\",\"type\":\"GRAPH_SINK\"}}],\"edges\":[{\"from\":\"1\",\"to\":\"2\"},{\"from\":\"2\",\"to\":\"3\"}]}" \
--pythonExec "/usr/local/bin/python3.9" \
--pythonPaths "/usr/local/lib/python3.9/site-packages;./python" \
--schemaUrl "http://localhost:8887" \
--parallelism "1" \
--alterOperation "UPSERT" \
--logFile TaxOfRiskApp.log

View File

@ -1,11 +1,19 @@
package com.antgroup.openspg.builder.runner.local;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.ConsoleAppender;
import ch.qos.logback.core.FileAppender;
import com.antgroup.openspg.builder.core.runtime.BuilderContext;
import com.antgroup.openspg.builder.core.runtime.impl.DefaultBuilderCatalog;
import com.antgroup.openspg.builder.model.BuilderJsonUtils;
import com.antgroup.openspg.builder.model.exception.PipelineConfigException;
import com.antgroup.openspg.builder.model.pipeline.Pipeline;
import com.antgroup.openspg.builder.model.record.RecordAlterOperationEnum;
import com.antgroup.openspg.common.util.StringUtils;
import com.antgroup.openspg.core.schema.model.type.ProjectSchema;
import com.antgroup.openspg.server.api.facade.ApiResponse;
import com.antgroup.openspg.server.api.facade.client.SchemaFacade;
@ -14,6 +22,7 @@ import com.antgroup.openspg.server.api.http.client.HttpSchemaFacade;
import com.antgroup.openspg.server.api.http.client.util.ConnectionInfo;
import com.antgroup.openspg.server.api.http.client.util.HttpClientBootstrap;
import org.apache.commons.cli.*;
import org.slf4j.LoggerFactory;
public class LocalBuilderMain {
@ -25,6 +34,7 @@ public class LocalBuilderMain {
private static final String SCHEMA_URL_OPTION = "schemaUrl";
private static final String PARALLELISM_OPTION = "parallelism";
private static final String ALTER_OPERATION_OPTION = "alterOperation";
private static final String LOG_FILE_OPTION = "logFile";
public static void main(String[] args) throws Exception {
CommandLine commandLine = parseArgs(args);
@ -44,6 +54,7 @@ public class LocalBuilderMain {
options.addOption(PARALLELISM_OPTION, PARALLELISM_OPTION, true, "parallelism");
options.addOption(
ALTER_OPERATION_OPTION, ALTER_OPERATION_OPTION, true, "alter operation, upsert or delete");
options.addOption(LOG_FILE_OPTION, LOG_FILE_OPTION, true, "log file");
CommandLine commandLine = null;
HelpFormatter helper = new HelpFormatter();
@ -58,6 +69,9 @@ public class LocalBuilderMain {
}
private static void run(CommandLine commandLine) throws Exception {
String logFileName = commandLine.getOptionValue(LOG_FILE_OPTION);
setUpLogFile(logFileName);
long projectId = Long.parseLong(commandLine.getOptionValue(PROJECT_ID_OPTION));
String jobName = commandLine.getOptionValue(JOB_NAME_OPTION);
@ -106,4 +120,52 @@ public class LocalBuilderMain {
}
throw new PipelineConfigException("");
}
/**
 * Programmatically reconfigures logback for this run: output always goes to the
 * console, and additionally to {@code logFileName} when one was supplied.
 * Chatty third-party loggers (brpc, forest) are capped at ERROR with
 * additivity off so they do not also propagate to the root logger.
 *
 * @param logFileName path of the log file to write; console-only when null/blank
 */
private static void setUpLogFile(String logFileName) {
  LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
  // Drop any configuration loaded from logback.xml so only the appenders
  // assembled below remain active.
  loggerContext.reset();

  PatternLayoutEncoder encoder = new PatternLayoutEncoder();
  encoder.setPattern("%d [%X{traceId}] [%X{rpcId}] [%t] %-5p %c{2} - %m%n");
  encoder.setContext(loggerContext);
  encoder.start();

  // Optional file appender; append=false truncates any previous run's file.
  FileAppender<ILoggingEvent> fileAppender = null;
  if (StringUtils.isNotBlank(logFileName)) {
    fileAppender = new FileAppender<>();
    fileAppender.setFile(logFileName);
    fileAppender.setEncoder(encoder);
    fileAppender.setContext(loggerContext);
    fileAppender.setAppend(false);
    fileAppender.start();
  }

  ConsoleAppender<ILoggingEvent> consoleAppender = new ConsoleAppender<>();
  consoleAppender.setEncoder(encoder);
  consoleAppender.setContext(loggerContext);
  consoleAppender.start();

  // Silence noisy RPC/HTTP client loggers; the wiring was previously
  // duplicated per logger — configure them uniformly instead.
  for (String noisyLoggerName : new String[] {"com.baidu.brpc", "com.dtflys.forest"}) {
    Logger noisyLogger = loggerContext.getLogger(noisyLoggerName);
    noisyLogger.setLevel(Level.ERROR);
    noisyLogger.setAdditive(false);
    attachAppenders(noisyLogger, fileAppender, consoleAppender);
  }

  // Use the canonical root-logger name rather than the lowercase alias "root".
  Logger rootLogger = loggerContext.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME);
  attachAppenders(rootLogger, fileAppender, consoleAppender);
  rootLogger.setLevel(Level.INFO);
}

/** Attaches the console appender and, when present, the file appender to {@code logger}. */
private static void attachAppenders(
    Logger logger,
    FileAppender<ILoggingEvent> fileAppender,
    ConsoleAppender<ILoggingEvent> consoleAppender) {
  if (fileAppender != null) {
    logger.addAppender(fileAppender);
  }
  logger.addAppender(consoleAppender);
}
}

View File

@ -15,8 +15,6 @@ import com.antgroup.openspg.builder.runner.local.physical.sink.SinkWriterFactory
import com.antgroup.openspg.builder.runner.local.physical.source.BaseSourceReader;
import com.antgroup.openspg.builder.runner.local.physical.source.SourceReaderFactory;
import com.antgroup.openspg.builder.runner.local.runtime.BuilderMetric;
import com.antgroup.openspg.builder.runner.local.runtime.DefaultRecordCollector;
import com.antgroup.openspg.builder.runner.local.runtime.ErrorRecordCollector;
import com.antgroup.openspg.common.util.thread.ThreadUtils;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
@ -36,7 +34,6 @@ public class LocalBuilderRunner implements BuilderRunner {
private BaseSourceReader<?> sourceReader = null;
private BaseSinkWriter<?> sinkWriter = null;
private BuilderMetric builderMetric = null;
private ErrorRecordCollector errorRecordCollector = null;
private final int parallelism;
private final ThreadPoolExecutor threadPoolExecutor;
@ -67,9 +64,6 @@ public class LocalBuilderRunner implements BuilderRunner {
// 构建指标统计并将构建指标输出到log
builderMetric = new BuilderMetric(context.getJobName());
builderMetric.reportToLog();
// 错误记录收集将构建错误的记录收集到csv文件中
errorRecordCollector = new DefaultRecordCollector(context.getJobName(), null);
}
@Override
@ -90,9 +84,11 @@ public class LocalBuilderRunner implements BuilderRunner {
results = builderExecutor.eval(records);
} catch (BuilderRecordException e) {
errorCnt.inc(records.size());
// todo
log.error("builder record error", e);
}
if (CollectionUtils.isNotEmpty(results)) {
sinkWriter.write(results);
}
sinkWriter.write(results);
records = Collections.unmodifiableList(sourceReader.read());
}
},
@ -125,9 +121,6 @@ public class LocalBuilderRunner implements BuilderRunner {
if (builderMetric != null) {
builderMetric.close();
}
if (errorRecordCollector != null) {
errorRecordCollector.close();
}
if (threadPoolExecutor != null) {
threadPoolExecutor.shutdownNow();
}

View File

@ -41,7 +41,7 @@ public class BuilderMetric implements Serializable {
public void reportToLog() {
reporter = Slf4jReporter.forRegistry(metricRegistry).outputTo(log).build();
reporter.start(1, TimeUnit.SECONDS);
reporter.start(1, TimeUnit.MINUTES);
}
public void close() {

View File

@ -1,86 +0,0 @@
/*
* Copyright 2023 Ant Group CO., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.builder.runner.local.runtime;
import com.antgroup.openspg.builder.model.exception.BuilderRecordException;
import com.antgroup.openspg.builder.model.record.BuilderRecord;
import com.antgroup.openspg.cloudext.interfaces.tablestore.TableFileHandler;
import com.antgroup.openspg.cloudext.interfaces.tablestore.TableStoreClient;
import com.antgroup.openspg.cloudext.interfaces.tablestore.TableStoreClientDriverManager;
import com.antgroup.openspg.cloudext.interfaces.tablestore.cmd.TableFileCreateCmd;
import com.antgroup.openspg.cloudext.interfaces.tablestore.model.ColumnMeta;
import com.antgroup.openspg.server.common.model.datasource.connection.TableStoreConnectionInfo;
/**
 * Error-record collector that lazily creates a table file (via the table-store
 * client driver) with recordId/componentName/errorMsg columns.
 *
 * <p>NOTE(review): the actual write in {@link #collectRecord} is commented out,
 * so at present only the table is created — confirm whether persisting error
 * records is still intended.
 */
public class DefaultRecordCollector implements ErrorRecordCollector {
// Column names of the error-record table.
private static final String RECORD_ID = "recordId";
private static final String COMPONENT = "componentName";
private static final String ERROR_MSG = "errorMsg";
private final String tableName;
private final TableStoreConnectionInfo connInfo;
// Lazily initialized by init(); volatile is required for the
// double-checked locking below to publish safely.
private volatile TableFileHandler tableFileHandler;
// Stores the target table name and connection info; no I/O happens here.
public DefaultRecordCollector(String tableName, TableStoreConnectionInfo connInfo) {
this.connInfo = connInfo;
this.tableName = tableName;
}
// Creates the backing table file on first use. Double-checked locking keeps
// the common (already-initialized) path lock-free.
private void init() {
if (tableFileHandler == null) {
synchronized (this) {
if (tableFileHandler == null) {
TableStoreClient tableStoreClient = TableStoreClientDriverManager.getClient(connInfo);
tableFileHandler =
tableStoreClient.create(
new TableFileCreateCmd(
tableName,
new ColumnMeta[] {
new ColumnMeta(RECORD_ID),
new ColumnMeta(COMPONENT),
new ColumnMeta(ERROR_MSG)
}));
}
}
}
}
// True once init() has run, i.e. the error table has been created.
@Override
public boolean haveCollected() {
return tableFileHandler != null;
}
// Name of the created table, or null if nothing has been collected yet.
@Override
public String getTableName() {
if (tableFileHandler != null) {
return tableFileHandler.getTableName();
}
return null;
}
// Ensures the table exists; the record write itself is currently disabled.
@Override
public void collectRecord(BuilderRecord record, BuilderRecordException e) {
init();
// tableFileHandler.write(
// new TableRecord(
// new Object[] {record.getRecordId(), e.getProcessor().getName(), e.getMessage()}));
}
// Releases the table file handle if one was created.
@Override
public void close() throws Exception {
if (tableFileHandler != null) {
tableFileHandler.close();
}
}
}

View File

@ -1,28 +0,0 @@
/*
* Copyright 2023 Ant Group CO., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.builder.runner.local.runtime;
import com.antgroup.openspg.builder.model.exception.BuilderRecordException;
import com.antgroup.openspg.builder.model.record.BuilderRecord;
/** Collects builder records that failed during processing, backed by a table. */
public interface ErrorRecordCollector {
// True once at least the backing error table has been set up.
boolean haveCollected();
// Name of the backing error table, or null when nothing was collected.
String getTableName();
// Records a failed record together with the exception that caused the failure.
void collectRecord(BuilderRecord record, BuilderRecordException e);
// Releases any resources held by the collector.
void close() throws Exception;
}

View File

@ -1,19 +0,0 @@
<!-- Test logging configuration: everything to stdout; noisy RPC/HTTP client
     loggers (brpc, forest) restricted to ERROR and kept out of root. -->
<Configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%-4relative [%thread] %-5level %logger{35} - %msg %n</pattern>
    </encoder>
  </appender>

  <logger name="com.baidu.brpc" level="ERROR" additivity="false">
    <appender-ref ref="STDOUT"/>
  </logger>
  <logger name="com.dtflys.forest" level="ERROR" additivity="false">
    <appender-ref ref="STDOUT"/>
  </logger>

  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>
</Configuration>

View File

@ -0,0 +1,2 @@
id
赌博应用
1 id
2 赌博应用