feat(reasoner): support thinker in knext (#344)

Co-authored-by: matthewhyx <matthew.hyx@antgroup.com>
This commit is contained in:
FishJoy 2024-08-26 13:38:49 +08:00 committed by GitHub
parent f0e552e9ec
commit b48dd2cbe5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 647 additions and 165 deletions

View File

@ -18,6 +18,7 @@ from knext.command.sub_command.config import list_config
from knext.command.sub_command.project import create_project
from knext.command.sub_command.project import list_project
from knext.command.sub_command.reasoner import execute_reasoner_job
from knext.command.sub_command.thinker import execute_thinker_job
from knext.command.sub_command.schema import commit_schema
from knext.command.sub_command.schema import list_schema
from knext.command.sub_command.schema import reg_concept_rule
@ -79,5 +80,15 @@ def reasoner() -> None:
reasoner.command("execute")(execute_reasoner_job)
@_main.group()
def thinker() -> None:
"""Thinker client."""
pass
thinker.command("execute")(execute_thinker_job)
if __name__ == "__main__":
_main()

View File

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
import click
from knext.thinker.client import ThinkerClient
@click.option("--subject", help="The subject of reasoning goal, eg: id,type or type")
@click.option("--predicate", help="The predicate of reasoning goal, eg: type")
@click.option("--object", help="The object of reasoning goal, eg: id,type or type")
@click.option("--mode", help="Reasoning mode, eg: spo or node")
@click.option("--params", help="Reasoning context")
def execute_thinker_job(subject="", predicate="", object="", mode="spo", params=""):
"""
Submit asynchronous reasoner jobs to server by providing DSL file or string.
"""
client = ThinkerClient()
client.execute(subject, predicate, object, mode, params)

View File

@ -16,6 +16,9 @@ from knext.schema.client import SchemaClient
from knext.schema.model.base import SpgTypeEnum
combo_seperator = "\0+\0"
def is_blank(text):
if not text:
return True
@ -95,7 +98,7 @@ class SPGConceptRuleMarkLang:
return
reasoning_po_match = re.match(
r"^\[([^\]]+)\]->\(`?([a-zA-Z0-9\.]+)`?/`([^`]+)`\):$",
r"^\[([^\]]+)\]->\(`?([a-zA-Z0-9\.]+)`?/`([^`]+)`(\+`([^`]+)`)?\):$",
expression,
)
if reasoning_po_match:
@ -103,16 +106,19 @@ class SPGConceptRuleMarkLang:
"please define namespace first"
)
combo_add = reasoning_po_match.group(5)
self.predicate = reasoning_po_match.group(1)
self.dst_concept = (
reasoning_po_match.group(2),
reasoning_po_match.group(3),
reasoning_po_match.group(3)
if combo_add is None
else reasoning_po_match.group(3) + combo_seperator + combo_add,
)
self.is_reasoning = True
return
reasoning_spo_match = re.match(
r"^\(`?([a-zA-Z0-9\.]+)`?/`([^`]+)`\)-\[([^\]]+)\]->\(`([a-zA-Z0-9\.]+)`/`([^`]+)`\):$",
r"^\(`?([a-zA-Z0-9\.]+)`?/`([^`]+)`\)-\[([^\]]+)\]->\(`([a-zA-Z0-9\.]+)`/`([^`]+)`(\+`([^`]+)`)?\):$",
expression,
)
if reasoning_spo_match:
@ -125,9 +131,12 @@ class SPGConceptRuleMarkLang:
reasoning_spo_match.group(2),
)
self.predicate = reasoning_spo_match.group(3)
combo_add = reasoning_po_match.group(7)
self.dst_concept = (
reasoning_spo_match.group(4),
reasoning_spo_match.group(5),
reasoning_spo_match.group(5)
if combo_add is None
else reasoning_spo_match.group(5) + combo_seperator + combo_add,
)
self.is_reasoning = True
return
@ -215,6 +224,9 @@ class SPGConceptRuleMarkLang:
break
if self.is_reasoning:
if combo_seperator in object_name:
names = object_name.split(combo_seperator)
object_name = f"{names[0]}`+{object_type}/`{names[1]}"
if (
subject_type is None
and self.predicate is None
@ -230,7 +242,7 @@ class SPGConceptRuleMarkLang:
head = f"DefinePriority ({object_type})" + " {\n"
else:
head = (
f"Define (:{object_type}/`{object_name}`)-[:{predicate_name}]->"
f"Define (:{subject_type}/`{subject_name}`)-[:{predicate_name}]->"
f"(:{object_type}/`{object_name}`)" + " {\n"
)
elif subject_name is None:

View File

@ -0,0 +1,10 @@
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.

View File

@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
import os
from knext.common.base.client import Client
class ThinkerClient(Client):
"""SPG Thinker Client."""
def __init__(self, host_addr: str = None, project_id: int = None):
super().__init__(host_addr, project_id)
def execute(self, subject="", predicate="", object="", mode="spo", params=""):
"""
Execute a synchronous builder job in local runner.
"""
import subprocess
from knext.reasoner import lib
from knext.common import env
jar_path = os.path.join(lib.__path__[0], lib.LOCAL_REASONER_JAR)
java_cmd = [
"java",
"-cp",
jar_path,
"com.antgroup.openspg.reasoner.runner.local.thinker.LocalThinkerMain",
"--projectId",
self._project_id,
"--subject",
subject or "",
"--predicate",
predicate or "",
"--object",
object or "",
"--mode",
mode,
"--params" or "",
params,
"--schemaUrl",
os.environ.get("KNEXT_HOST_ADDR") or env.LOCAL_SCHEMA_URL,
"--graphStateClass",
os.environ.get("KNEXT_GRAPH_STATE_CLASS") or lib.LOCAL_GRAPH_STATE_CLASS,
"--graphStoreUrl",
os.environ.get("KNEXT_GRAPH_STORE_URL") or lib.LOCAL_GRAPH_STORE_URL,
]
subprocess.call(java_cmd)

View File

@ -68,6 +68,10 @@
<groupId>com.antgroup.openspg.reasoner</groupId>
<artifactId>reasoner-warehouse-common</artifactId>
</dependency>
<dependency>
<groupId>com.antgroup.openspg.reasoner</groupId>
<artifactId>reasoner-thinker</artifactId>
</dependency>
<dependency>
<groupId>com.antgroup.openspg.reasoner</groupId>
<artifactId>reasoner-cloudext-warehouse</artifactId>

View File

@ -13,13 +13,6 @@
package com.antgroup.openspg.reasoner.runner.local;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.ConsoleAppender;
import ch.qos.logback.core.FileAppender;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.antgroup.openspg.reasoner.catalog.impl.KgSchemaConnectionInfo;
@ -42,7 +35,6 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.LoggerFactory;
@Slf4j
public class LocalReasonerMain {
@ -149,36 +141,36 @@ public class LocalReasonerMain {
try {
cmd = parser.parse(options, args);
String logFileName = cmd.getOptionValue(LOG_FILE_OPTION);
setUpLogFile(logFileName);
String logFileName = cmd.getOptionValue(ParamsKey.LOG_FILE_OPTION);
LogUtil.setUpLogFile(logFileName);
projectId = Long.parseLong(cmd.getOptionValue(PROJECT_ID_OPTION));
projectId = Long.parseLong(cmd.getOptionValue(ParamsKey.PROJECT_ID_OPTION));
dsl = cmd.getOptionValue(QUERY_OPTION);
dsl = cmd.getOptionValue(ParamsKey.QUERY_OPTION);
if (StringUtils.isEmpty(dsl)) {
throw new ParseException("please provide query dsl!");
}
outputFile = cmd.getOptionValue(OUTPUT_OPTION);
outputFile = cmd.getOptionValue(ParamsKey.OUTPUT_OPTION);
if (StringUtils.isEmpty(outputFile)) {
outputFile = null;
}
schemaUri = cmd.getOptionValue(SCHEMA_URL_OPTION);
schemaUri = cmd.getOptionValue(ParamsKey.SCHEMA_URL_OPTION);
if (StringUtils.isEmpty(schemaUri)) {
throw new ParseException("please provide openspg schema uri!");
}
graphLoaderClass = cmd.getOptionValue(GRAPH_LOADER_CLASS_OPTION);
graphStateClass = cmd.getOptionValue(GRAPH_STATE_CLASS_OPTION);
graphStateUrl = cmd.getOptionValue(GRAPH_STORE_URL_OPTION);
graphLoaderClass = cmd.getOptionValue(ParamsKey.GRAPH_LOADER_CLASS_OPTION);
graphStateClass = cmd.getOptionValue(ParamsKey.GRAPH_STATE_CLASS_OPTION);
graphStateUrl = cmd.getOptionValue(ParamsKey.GRAPH_STORE_URL_OPTION);
if (StringUtils.isEmpty(graphStateUrl)) {
graphStateUrl = null;
}
String startIdListJson = cmd.getOptionValue(START_ID_OPTION);
String startIdListJson = cmd.getOptionValue(ParamsKey.START_ID_OPTION);
if (StringUtils.isBlank(startIdListJson)) {
startIdList = Collections.emptyList();
} else {
startIdList = JSON.parseObject(startIdListJson, new TypeReference<List<List<String>>>() {});
}
String paramsJson = cmd.getOptionValue(PARAMs_OPTION);
String paramsJson = cmd.getOptionValue(ParamsKey.PARAMs_OPTION);
if (StringUtils.isNotEmpty(paramsJson)) {
params = new HashMap<>(JSON.parseObject(paramsJson));
}
@ -203,81 +195,33 @@ public class LocalReasonerMain {
return task;
}
protected static final String PROJECT_ID_OPTION = "projectId";
protected static final String QUERY_OPTION = "query";
protected static final String OUTPUT_OPTION = "output";
protected static final String SCHEMA_URL_OPTION = "schemaUrl";
protected static final String GRAPH_STATE_CLASS_OPTION = "graphStateClass";
protected static final String GRAPH_LOADER_CLASS_OPTION = "graphLoaderClass";
protected static final String GRAPH_STORE_URL_OPTION = "graphStoreUrl";
protected static final String START_ID_OPTION = "startIdList";
protected static final String PARAMs_OPTION = "params";
protected static final String LOG_FILE_OPTION = "logFile";
protected static Options getOptions() {
Options options = new Options();
options.addRequiredOption(PROJECT_ID_OPTION, PROJECT_ID_OPTION, true, "project id");
options.addRequiredOption(QUERY_OPTION, QUERY_OPTION, true, "query dsl string");
options.addOption(OUTPUT_OPTION, OUTPUT_OPTION, true, "output file name");
options.addRequiredOption(SCHEMA_URL_OPTION, SCHEMA_URL_OPTION, true, "schema url");
options.addOption(
GRAPH_STATE_CLASS_OPTION, GRAPH_STATE_CLASS_OPTION, true, "graph state class name");
options.addOption(
GRAPH_LOADER_CLASS_OPTION, GRAPH_LOADER_CLASS_OPTION, true, "graph loader class name");
options.addRequiredOption(
GRAPH_STORE_URL_OPTION, GRAPH_STORE_URL_OPTION, true, "graph store url");
options.addOption(START_ID_OPTION, START_ID_OPTION, true, "start id list");
options.addOption(PARAMs_OPTION, PARAMs_OPTION, true, "params");
options.addOption(LOG_FILE_OPTION, LOG_FILE_OPTION, true, "log file name");
ParamsKey.PROJECT_ID_OPTION, ParamsKey.PROJECT_ID_OPTION, true, "project id");
options.addRequiredOption(
ParamsKey.QUERY_OPTION, ParamsKey.QUERY_OPTION, true, "query dsl string");
options.addOption(ParamsKey.OUTPUT_OPTION, ParamsKey.OUTPUT_OPTION, true, "output file name");
options.addRequiredOption(
ParamsKey.SCHEMA_URL_OPTION, ParamsKey.SCHEMA_URL_OPTION, true, "schema url");
options.addOption(
ParamsKey.GRAPH_STATE_CLASS_OPTION,
ParamsKey.GRAPH_STATE_CLASS_OPTION,
true,
"graph state class name");
options.addOption(
ParamsKey.GRAPH_LOADER_CLASS_OPTION,
ParamsKey.GRAPH_LOADER_CLASS_OPTION,
true,
"graph loader class name");
options.addRequiredOption(
ParamsKey.GRAPH_STORE_URL_OPTION,
ParamsKey.GRAPH_STORE_URL_OPTION,
true,
"graph store url");
options.addOption(ParamsKey.START_ID_OPTION, ParamsKey.START_ID_OPTION, true, "start id list");
options.addOption(ParamsKey.PARAMs_OPTION, ParamsKey.PARAMs_OPTION, true, "params");
options.addOption(ParamsKey.LOG_FILE_OPTION, ParamsKey.LOG_FILE_OPTION, true, "log file name");
return options;
}
protected static void setUpLogFile(String logFileName) {
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
loggerContext.reset();
PatternLayoutEncoder patternLayoutEncoder = new PatternLayoutEncoder();
patternLayoutEncoder.setPattern("%d [%X{traceId}] [%X{rpcId}] [%t] %-5p %c{2} - %m%n");
patternLayoutEncoder.setContext(loggerContext);
patternLayoutEncoder.start();
FileAppender<ILoggingEvent> fileAppender = null;
if (StringUtils.isNotBlank(logFileName)) {
fileAppender = new FileAppender<>();
fileAppender.setFile(logFileName);
fileAppender.setEncoder(patternLayoutEncoder);
fileAppender.setContext(loggerContext);
fileAppender.setAppend(false);
fileAppender.start();
}
ConsoleAppender<ILoggingEvent> consoleAppender = new ConsoleAppender<>();
consoleAppender.setEncoder(patternLayoutEncoder);
consoleAppender.setContext(loggerContext);
consoleAppender.start();
Logger brpcLogger = loggerContext.getLogger("com.baidu.brpc");
brpcLogger.setLevel(Level.ERROR);
brpcLogger.setAdditive(false);
if (fileAppender != null) {
brpcLogger.addAppender(fileAppender);
}
brpcLogger.addAppender(consoleAppender);
Logger dtflysLogger = loggerContext.getLogger("com.dtflys.forest");
dtflysLogger.setLevel(Level.ERROR);
dtflysLogger.setAdditive(false);
if (fileAppender != null) {
dtflysLogger.addAppender(fileAppender);
}
dtflysLogger.addAppender(consoleAppender);
Logger rootLogger = loggerContext.getLogger("root");
if (fileAppender != null) {
rootLogger.addAppender(fileAppender);
}
rootLogger.addAppender(consoleAppender);
rootLogger.setLevel(Level.INFO);
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.runner.local;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.ConsoleAppender;
import ch.qos.logback.core.FileAppender;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.LoggerFactory;
public class LogUtil {
public static void setUpLogFile(String logFileName) {
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
loggerContext.reset();
PatternLayoutEncoder patternLayoutEncoder = new PatternLayoutEncoder();
patternLayoutEncoder.setPattern("%d [%X{traceId}] [%X{rpcId}] [%t] %-5p %c{2} - %m%n");
patternLayoutEncoder.setContext(loggerContext);
patternLayoutEncoder.start();
FileAppender<ILoggingEvent> fileAppender = null;
if (StringUtils.isNotBlank(logFileName)) {
fileAppender = new FileAppender<>();
fileAppender.setFile(logFileName);
fileAppender.setEncoder(patternLayoutEncoder);
fileAppender.setContext(loggerContext);
fileAppender.setAppend(false);
fileAppender.start();
}
ConsoleAppender<ILoggingEvent> consoleAppender = new ConsoleAppender<>();
consoleAppender.setEncoder(patternLayoutEncoder);
consoleAppender.setContext(loggerContext);
consoleAppender.start();
Logger brpcLogger = loggerContext.getLogger("com.baidu.brpc");
brpcLogger.setLevel(Level.ERROR);
brpcLogger.setAdditive(false);
if (fileAppender != null) {
brpcLogger.addAppender(fileAppender);
}
brpcLogger.addAppender(consoleAppender);
Logger dtflysLogger = loggerContext.getLogger("com.dtflys.forest");
dtflysLogger.setLevel(Level.ERROR);
dtflysLogger.setAdditive(false);
if (fileAppender != null) {
dtflysLogger.addAppender(fileAppender);
}
dtflysLogger.addAppender(consoleAppender);
Logger rootLogger = loggerContext.getLogger("root");
if (fileAppender != null) {
rootLogger.addAppender(fileAppender);
}
rootLogger.addAppender(consoleAppender);
rootLogger.setLevel(Level.INFO);
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.runner.local;
public class ParamsKey {
public static final String PROJECT_ID_OPTION = "projectId";
public static final String QUERY_OPTION = "query";
public static final String OUTPUT_OPTION = "output";
public static final String SCHEMA_URL_OPTION = "schemaUrl";
public static final String GRAPH_STATE_CLASS_OPTION = "graphStateClass";
public static final String GRAPH_LOADER_CLASS_OPTION = "graphLoaderClass";
public static final String GRAPH_STORE_URL_OPTION = "graphStoreUrl";
public static final String START_ID_OPTION = "startIdList";
public static final String PARAMs_OPTION = "params";
public static final String LOG_FILE_OPTION = "logFile";
public static final String SUBJECT = "subject";
public static final String PREDICATE = "predicate";
public static final String OBJECT = "object";
public static final String MODE = "mode";
}

View File

@ -0,0 +1,229 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.runner.local.thinker;
import com.alibaba.fastjson.JSON;
import com.antgroup.openspg.reasoner.catalog.impl.KgSchemaConnectionInfo;
import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId;
import com.antgroup.openspg.reasoner.graphstate.GraphState;
import com.antgroup.openspg.reasoner.graphstate.impl.MemGraphState;
import com.antgroup.openspg.reasoner.runner.local.LogUtil;
import com.antgroup.openspg.reasoner.runner.local.ParamsKey;
import com.antgroup.openspg.reasoner.runner.local.load.graph.AbstractLocalGraphLoader;
import com.antgroup.openspg.reasoner.thinker.Thinker;
import com.antgroup.openspg.reasoner.thinker.catalog.LogicCatalog;
import com.antgroup.openspg.reasoner.thinker.catalog.OpenSPGLogicCatalog;
import com.antgroup.openspg.reasoner.thinker.engine.DefaultThinker;
import com.antgroup.openspg.reasoner.thinker.logic.Result;
import com.antgroup.openspg.reasoner.thinker.logic.graph.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.cli.*;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@Slf4j
public class LocalThinkerMain {
public static void main(String[] args) {
doMain(args);
System.exit(0);
}
public static void doMain(String[] args) {
ThinkerParams task = parseArgs(args);
if (null == task) {
System.exit(1);
}
GraphState<IVertexId> graphState = loadGraph(task);
LogicCatalog logicCatalog = new OpenSPGLogicCatalog(task.getProjectId(), task.getConnInfo());
logicCatalog.init();
Thinker thinker = new DefaultThinker(graphState, logicCatalog);
List<Result> result;
if (task.getMode().toLowerCase().equals("spo")) {
result =
thinker.find(
task.getTriple().getSubject(),
task.getTriple().getPredicate(),
task.getTriple().getObject(),
task.getParams());
} else {
result = thinker.find((Node) task.getTriple().getSubject(), task.getParams());
}
if (CollectionUtils.isEmpty(result)) {
log.error("local runner return null");
return;
}
log.info("result:\n {}", result);
}
private static ThinkerParams parseArgs(String[] args) {
Options options = getThinkerOptions();
CommandLineParser parser = new DefaultParser();
HelpFormatter formatter = new HelpFormatter();
CommandLine cmd;
long projectId;
String schemaUri;
String graphStateClass;
String graphLoaderClass;
String graphStateUrl;
Element s = Element.ANY;
Element p = Element.ANY;
Element o = Element.ANY;
String mode = "spo";
Map<String, Object> params = new HashMap<>(3);
try {
cmd = parser.parse(options, args);
String logFileName = cmd.getOptionValue(ParamsKey.LOG_FILE_OPTION);
LogUtil.setUpLogFile(logFileName);
projectId = Long.parseLong(cmd.getOptionValue(ParamsKey.PROJECT_ID_OPTION));
schemaUri = cmd.getOptionValue(ParamsKey.SCHEMA_URL_OPTION);
if (StringUtils.isEmpty(schemaUri)) {
throw new ParseException("please provide openspg schema uri!");
}
graphLoaderClass = cmd.getOptionValue(ParamsKey.GRAPH_LOADER_CLASS_OPTION);
graphStateClass = cmd.getOptionValue(ParamsKey.GRAPH_STATE_CLASS_OPTION);
graphStateUrl = cmd.getOptionValue(ParamsKey.GRAPH_STORE_URL_OPTION);
if (StringUtils.isEmpty(graphStateUrl)) {
graphStateUrl = null;
}
String paramsJson = cmd.getOptionValue(ParamsKey.PARAMs_OPTION);
if (StringUtils.isNotEmpty(paramsJson)) {
params = new HashMap<>(JSON.parseObject(paramsJson));
}
String subject = cmd.getOptionValue(ParamsKey.SUBJECT);
if (StringUtils.isNotBlank(subject)) {
s = strToElement(subject, false);
}
String predicate = cmd.getOptionValue(ParamsKey.PREDICATE);
if (StringUtils.isNotBlank(predicate)) {
p = strToElement(predicate, true);
}
String object = cmd.getOptionValue(ParamsKey.OBJECT);
if (StringUtils.isNotBlank(object)) {
o = strToElement(object, false);
}
if (s == Element.ANY && p == Element.ANY && o == Element.ANY) {
throw new RuntimeException(
"subject, predicate, object cannot all be empty at the same time.");
}
String m = cmd.getOptionValue(ParamsKey.MODE);
if (StringUtils.isNotBlank(m)) {
mode = m;
}
} catch (Exception e) {
log.error(e.getMessage());
formatter.printHelp("ThinkerLocalMain", options);
return null;
}
ThinkerParams task = new ThinkerParams();
task.setTriple(new Triple(s, p, o));
task.setConnInfo(new KgSchemaConnectionInfo(schemaUri, ""));
task.setGraphLoadClass(graphLoaderClass);
task.setGraphStateClassName(graphStateClass);
task.setGraphStateInitString(graphStateUrl);
task.setProjectId(projectId);
task.setParams(params);
task.setMode(mode);
return task;
}
private static Element strToElement(String content, Boolean isPredicate) {
String[] parts = StringUtils.split(content, ",");
if (parts.length == 2) {
return new Entity(parts[0], parts[1]);
} else if (parts.length == 1) {
if (isPredicate) {
return new Predicate(parts[0]);
} else {
return new Node(parts[0]);
}
} else {
throw new RuntimeException("format error, require id,type or type");
}
}
private static Options getThinkerOptions() {
Options options = new Options();
options.addRequiredOption(
ParamsKey.PROJECT_ID_OPTION, ParamsKey.PROJECT_ID_OPTION, true, "project id");
options.addRequiredOption(
ParamsKey.SCHEMA_URL_OPTION, ParamsKey.SCHEMA_URL_OPTION, true, "schema url");
options.addOption(
ParamsKey.SUBJECT, ParamsKey.SUBJECT, true, "query subject, eg: id,type or type");
options.addOption(ParamsKey.PREDICATE, ParamsKey.PREDICATE, true, "query predicate, eg: type");
options.addOption(
ParamsKey.OBJECT, ParamsKey.OBJECT, true, "query object, eg: id,type or type");
options.addOption(ParamsKey.OUTPUT_OPTION, ParamsKey.OUTPUT_OPTION, true, "output file name");
options.addOption(
ParamsKey.GRAPH_STATE_CLASS_OPTION,
ParamsKey.GRAPH_STATE_CLASS_OPTION,
true,
"graph state class name");
options.addOption(
ParamsKey.GRAPH_LOADER_CLASS_OPTION,
ParamsKey.GRAPH_LOADER_CLASS_OPTION,
true,
"graph loader class name");
options.addOption(
ParamsKey.GRAPH_STORE_URL_OPTION,
ParamsKey.GRAPH_STORE_URL_OPTION,
true,
"graph store url");
options.addOption(ParamsKey.PARAMs_OPTION, ParamsKey.PARAMs_OPTION, true, "params");
options.addOption(ParamsKey.LOG_FILE_OPTION, ParamsKey.LOG_FILE_OPTION, true, "log file name");
options.addOption(ParamsKey.MODE, ParamsKey.MODE, true, "infer mode, eg: spo or node");
return options;
}
protected static GraphState<IVertexId> loadGraph(ThinkerParams params) {
GraphState<IVertexId> graphState;
String graphStateClass = params.getGraphStateClassName();
if (StringUtils.isNotEmpty(graphStateClass)) {
try {
graphState =
(GraphState<IVertexId>)
Class.forName(graphStateClass)
.getConstructor(String.class)
.newInstance(params.getGraphStateInitString());
} catch (Exception e) {
throw new RuntimeException("can not create graph state from " + graphStateClass, e);
}
return graphState;
}
MemGraphState memGraphState = new MemGraphState();
String graphLoadClass = params.getGraphLoadClass();
if (StringUtils.isBlank(graphLoadClass)) {
return memGraphState;
}
AbstractLocalGraphLoader graphLoader;
try {
graphLoader =
(AbstractLocalGraphLoader) Class.forName(graphLoadClass).getConstructor().newInstance();
} catch (Exception e) {
throw new RuntimeException("can not create graph loader from name " + graphLoadClass, e);
}
graphLoader.setGraphState(memGraphState);
graphLoader.load();
return memGraphState;
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.runner.local.thinker;
import com.antgroup.openspg.reasoner.catalog.impl.KgSchemaConnectionInfo;
import com.antgroup.openspg.reasoner.thinker.logic.graph.Triple;
import java.io.Serializable;
import java.util.Map;
import lombok.Data;
@Data
public class ThinkerParams implements Serializable {
private KgSchemaConnectionInfo connInfo = null;
private Long projectId;
private Triple triple;
private Map<String, Object> params;
/** Choose between graphLoadClass and graphState, or specify a class name */
private String graphLoadClass = null;
/** User specified the name of graphstate */
private String graphStateClassName = null;
private String graphStateInitString = null;
private String mode = "spo";
}

View File

@ -1,68 +0,0 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.thinker.catalog;
import com.antgroup.openspg.reasoner.lube.catalog.AbstractConnection;
import com.antgroup.openspg.reasoner.lube.catalog.Catalog;
import com.antgroup.openspg.reasoner.lube.catalog.SemanticPropertyGraph;
import com.antgroup.openspg.reasoner.lube.catalog.struct.Field;
import com.antgroup.openspg.reasoner.thinker.logic.LogicNetwork;
import com.antgroup.openspg.reasoner.thinker.logic.rule.Rule;
import java.util.ArrayList;
import java.util.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
public class DefaultLogicCatalog extends LogicCatalog {
private Catalog kgCatalog;
private List<Rule> rules;
private DefaultLogicCatalog() {
rules = new ArrayList<>();
}
public DefaultLogicCatalog(List<Rule> rules, Catalog kgCatalog) {
this.rules = rules;
this.kgCatalog = kgCatalog;
}
@Override
public LogicNetwork loadLogicNetwork() {
LogicNetwork logicNetwork = new LogicNetwork();
for (Rule r : rules) {
logicNetwork.addRule(r);
}
return logicNetwork;
}
@Override
public SemanticPropertyGraph getKnowledgeGraph() {
return kgCatalog.getKnowledgeGraph();
}
@Override
public Map<AbstractConnection, Set<String>> getConnections() {
return kgCatalog.getConnections();
}
@Override
public Set<Field> getDefaultNodeProperties() {
return kgCatalog.getDefaultNodeProperties();
}
@Override
public Set<Field> getDefaultEdgeProperties() {
return kgCatalog.getDefaultEdgeProperties();
}
}

View File

@ -0,0 +1,108 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.reasoner.thinker.catalog;
import com.antgroup.openspg.core.schema.model.semantic.TripleSemantic;
import com.antgroup.openspg.core.schema.model.type.BaseSPGType;
import com.antgroup.openspg.core.schema.model.type.ProjectSchema;
import com.antgroup.openspg.core.schema.model.type.SPGTypeEnum;
import com.antgroup.openspg.reasoner.catalog.impl.KgSchemaConnectionInfo;
import com.antgroup.openspg.reasoner.catalog.impl.OpenSPGCatalog;
import com.antgroup.openspg.reasoner.common.exception.SystemError;
import com.antgroup.openspg.reasoner.lube.catalog.AbstractConnection;
import com.antgroup.openspg.reasoner.lube.catalog.Catalog;
import com.antgroup.openspg.reasoner.lube.catalog.SemanticPropertyGraph;
import com.antgroup.openspg.reasoner.lube.catalog.struct.Field;
import com.antgroup.openspg.reasoner.thinker.SimplifyThinkerParser;
import com.antgroup.openspg.reasoner.thinker.logic.LogicNetwork;
import com.antgroup.openspg.reasoner.thinker.logic.rule.Rule;
import com.antgroup.openspg.server.api.facade.ApiResponse;
import com.antgroup.openspg.server.api.facade.client.ConceptFacade;
import com.antgroup.openspg.server.api.facade.client.SchemaFacade;
import com.antgroup.openspg.server.api.facade.dto.schema.request.ProjectSchemaRequest;
import com.antgroup.openspg.server.api.facade.dto.schema.request.SPGTypeRequest;
import com.antgroup.openspg.server.api.http.client.HttpConceptFacade;
import com.antgroup.openspg.server.api.http.client.HttpSchemaFacade;
import com.antgroup.openspg.server.api.http.client.util.ConnectionInfo;
import com.antgroup.openspg.server.api.http.client.util.HttpClientBootstrap;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
public class OpenSPGLogicCatalog extends LogicCatalog {
private Catalog openSPGCatalog;
private Long projectId;
private SchemaFacade schemaFacade;
private ConceptFacade conceptFacade;
public OpenSPGLogicCatalog(Long projectId, KgSchemaConnectionInfo connInfo) {
this.openSPGCatalog = new OpenSPGCatalog(projectId, connInfo, null);
HttpClientBootstrap.init(new ConnectionInfo(connInfo.uri()));
this.projectId = projectId;
this.schemaFacade = new HttpSchemaFacade();
this.conceptFacade = new HttpConceptFacade();
}
@Override
public LogicNetwork loadLogicNetwork() {
ProjectSchemaRequest request = new ProjectSchemaRequest();
request.setProjectId(projectId);
ApiResponse<ProjectSchema> projectSchema = schemaFacade.queryProjectSchema(request);
if (!projectSchema.isSuccess()) {
throw new SystemError("Cannot get schema for projectId=" + projectId, null);
}
Set<String> conceptTypes =
projectSchema.getData().getSpgTypes().stream()
.filter(e -> e.getSpgTypeEnum() == SPGTypeEnum.CONCEPT_TYPE)
.map(BaseSPGType::getName)
.collect(Collectors.toSet());
SPGTypeRequest spgTypeRequest = new SPGTypeRequest(StringUtils.join(conceptTypes, ","));
ApiResponse<List<TripleSemantic>> response =
conceptFacade.getReasoningConceptsDetail(spgTypeRequest);
if (!response.isSuccess()) {
throw new SystemError("Cannot get schema for projectId=" + projectId, null);
}
LogicNetwork logicNetwork = new LogicNetwork();
for (TripleSemantic ts : response.getData()) {
SimplifyThinkerParser parser = new SimplifyThinkerParser();
Rule rule = parser.parseSimplifyDsl(ts.getLogicalRule().getContent(), null).head();
logicNetwork.addRule(rule);
}
return logicNetwork;
}
@Override
public SemanticPropertyGraph getKnowledgeGraph() {
return this.openSPGCatalog.getKnowledgeGraph();
}
@Override
public scala.collection.immutable.Map<AbstractConnection, scala.collection.immutable.Set<String>>
getConnections() {
return this.openSPGCatalog.getConnections();
}
@Override
public scala.collection.immutable.Set<Field> getDefaultNodeProperties() {
return this.openSPGCatalog.getDefaultNodeProperties();
}
@Override
public scala.collection.immutable.Set<Field> getDefaultEdgeProperties() {
return this.openSPGCatalog.getDefaultEdgeProperties();
}
}

View File

@ -26,6 +26,7 @@ import com.antgroup.openspg.server.api.http.server.HttpBizCallback;
import com.antgroup.openspg.server.api.http.server.HttpBizTemplate;
import com.antgroup.openspg.server.biz.common.util.AssertUtils;
import com.antgroup.openspg.server.biz.schema.ConceptManager;
import com.google.common.collect.Lists;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
@ -73,7 +74,8 @@ public class ConceptController extends BaseController {
@Override
public List<TripleSemantic> action() {
return conceptManager.getReasoningConceptsDetail(request.getNameList());
List<String> nameList = Lists.newArrayList(request.getName().split(","));
return conceptManager.getReasoningConceptsDetail(nameList);
}
});
}