From b48dd2cbe5091c9bd1b7db28924e01a61b967eaa Mon Sep 17 00:00:00 2001 From: FishJoy Date: Mon, 26 Aug 2024 13:38:49 +0800 Subject: [PATCH] feat(reasoner): support thinker in knext (#344) Co-authored-by: matthewhyx --- python/knext/knext/command/knext_cli.py | 11 + .../knext/command/sub_command/thinker.py | 28 +++ .../knext/schema/marklang/concept_rule_ml.py | 22 +- python/knext/knext/thinker/__init__.py | 10 + python/knext/knext/thinker/client.py | 59 +++++ reasoner/runner/local-runner/pom.xml | 4 + .../runner/local/LocalReasonerMain.java | 126 +++------- .../reasoner/runner/local/LogUtil.java | 74 ++++++ .../reasoner/runner/local/ParamsKey.java | 31 +++ .../local/thinker/LocalThinkerMain.java | 229 ++++++++++++++++++ .../runner/local/thinker/ThinkerParams.java | 38 +++ .../thinker/catalog/DefaultLogicCatalog.java | 68 ------ .../thinker/catalog/OpenSPGLogicCatalog.java | 108 +++++++++ .../server/openapi/ConceptController.java | 4 +- 14 files changed, 647 insertions(+), 165 deletions(-) create mode 100644 python/knext/knext/command/sub_command/thinker.py create mode 100644 python/knext/knext/thinker/__init__.py create mode 100644 python/knext/knext/thinker/client.py create mode 100644 reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/LogUtil.java create mode 100644 reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/ParamsKey.java create mode 100644 reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/thinker/LocalThinkerMain.java create mode 100644 reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/thinker/ThinkerParams.java delete mode 100644 reasoner/thinker/src/main/java/com/antgroup/openspg/reasoner/thinker/catalog/DefaultLogicCatalog.java create mode 100644 reasoner/thinker/src/main/java/com/antgroup/openspg/reasoner/thinker/catalog/OpenSPGLogicCatalog.java diff --git a/python/knext/knext/command/knext_cli.py b/python/knext/knext/command/knext_cli.py index 4f4a1f66..c44ac006 100644 --- a/python/knext/knext/command/knext_cli.py +++ b/python/knext/knext/command/knext_cli.py @@ -18,6 +18,7 @@ from knext.command.sub_command.config import list_config from knext.command.sub_command.project import create_project from knext.command.sub_command.project import list_project from knext.command.sub_command.reasoner import execute_reasoner_job +from knext.command.sub_command.thinker import execute_thinker_job from knext.command.sub_command.schema import commit_schema from knext.command.sub_command.schema import list_schema from knext.command.sub_command.schema import reg_concept_rule @@ -79,5 +80,15 @@ def reasoner() -> None: reasoner.command("execute")(execute_reasoner_job) + +@_main.group() +def thinker() -> None: + """Thinker client.""" + pass + + +thinker.command("execute")(execute_thinker_job) + + if __name__ == "__main__": _main() diff --git a/python/knext/knext/command/sub_command/thinker.py b/python/knext/knext/command/sub_command/thinker.py new file mode 100644 index 00000000..9e0399e5 --- /dev/null +++ b/python/knext/knext/command/sub_command/thinker.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +# Copyright 2023 OpenSPG Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. + +import click + +from knext.thinker.client import ThinkerClient + + +@click.option("--subject", help="The subject of reasoning goal, eg: id,type or type") +@click.option("--predicate", help="The predicate of reasoning goal, eg: type") +@click.option("--object", help="The object of reasoning goal, eg: id,type or type") +@click.option("--mode", help="Reasoning mode, eg: spo or node") +@click.option("--params", help="Reasoning context") +def execute_thinker_job(subject="", predicate="", object="", mode="spo", params=""): + """ + Submit asynchronous reasoner jobs to server by providing DSL file or string. + """ + client = ThinkerClient() + client.execute(subject, predicate, object, mode, params) diff --git a/python/knext/knext/schema/marklang/concept_rule_ml.py b/python/knext/knext/schema/marklang/concept_rule_ml.py index 45868e4d..98e9099e 100644 --- a/python/knext/knext/schema/marklang/concept_rule_ml.py +++ b/python/knext/knext/schema/marklang/concept_rule_ml.py @@ -16,6 +16,9 @@ from knext.schema.client import SchemaClient from knext.schema.model.base import SpgTypeEnum +combo_seperator = "\0+\0" + + def is_blank(text): if not text: return True @@ -95,7 +98,7 @@ class SPGConceptRuleMarkLang: return reasoning_po_match = re.match( - r"^\[([^\]]+)\]->\(`?([a-zA-Z0-9\.]+)`?/`([^`]+)`\):$", + r"^\[([^\]]+)\]->\(`?([a-zA-Z0-9\.]+)`?/`([^`]+)`(\+`([^`]+)`)?\):$", expression, ) if reasoning_po_match: @@ -103,16 +106,19 @@ class SPGConceptRuleMarkLang: "please define namespace first" ) + combo_add = reasoning_po_match.group(5) self.predicate = reasoning_po_match.group(1) self.dst_concept = ( reasoning_po_match.group(2), - reasoning_po_match.group(3), + reasoning_po_match.group(3) + if combo_add is None + else reasoning_po_match.group(3) + combo_seperator + combo_add, ) self.is_reasoning = True return reasoning_spo_match = re.match( - r"^\(`?([a-zA-Z0-9\.]+)`?/`([^`]+)`\)-\[([^\]]+)\]->\(`([a-zA-Z0-9\.]+)`/`([^`]+)`\):$", + r"^\(`?([a-zA-Z0-9\.]+)`?/`([^`]+)`\)-\[([^\]]+)\]->\(`([a-zA-Z0-9\.]+)`/`([^`]+)`(\+`([^`]+)`)?\):$", expression, ) if reasoning_spo_match: @@ -125,9 +131,12 @@ class SPGConceptRuleMarkLang: reasoning_spo_match.group(2), ) self.predicate = reasoning_spo_match.group(3) + combo_add = reasoning_po_match.group(7) self.dst_concept = ( reasoning_spo_match.group(4), - reasoning_spo_match.group(5), + reasoning_spo_match.group(5) + if combo_add is None + else reasoning_spo_match.group(5) + combo_seperator + combo_add, ) self.is_reasoning = True return @@ -215,6 +224,9 @@ class SPGConceptRuleMarkLang: break if self.is_reasoning: + if combo_seperator in object_name: + names = object_name.split(combo_seperator) + object_name = f"{names[0]}`+{object_type}/`{names[1]}" if ( subject_type is None and self.predicate is None @@ -230,7 +242,7 @@ class SPGConceptRuleMarkLang: head = f"DefinePriority ({object_type})" + " {\n" else: head = ( - f"Define (:{object_type}/`{object_name}`)-[:{predicate_name}]->" + f"Define (:{subject_type}/`{subject_name}`)-[:{predicate_name}]->" f"(:{object_type}/`{object_name}`)" + " {\n" ) elif subject_name is None: diff --git a/python/knext/knext/thinker/__init__.py b/python/knext/knext/thinker/__init__.py new file mode 100644 index 00000000..6f6914a4 --- /dev/null +++ b/python/knext/knext/thinker/__init__.py @@ -0,0 +1,10 @@ +# Copyright 2023 OpenSPG Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. diff --git a/python/knext/knext/thinker/client.py b/python/knext/knext/thinker/client.py new file mode 100644 index 00000000..84541d77 --- /dev/null +++ b/python/knext/knext/thinker/client.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 -*- +# Copyright 2023 OpenSPG Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. +import os + +from knext.common.base.client import Client + + +class ThinkerClient(Client): + """SPG Thinker Client.""" + + def __init__(self, host_addr: str = None, project_id: int = None): + super().__init__(host_addr, project_id) + + def execute(self, subject="", predicate="", object="", mode="spo", params=""): + """ + Execute a synchronous builder job in local runner. + """ + + import subprocess + from knext.reasoner import lib + from knext.common import env + + jar_path = os.path.join(lib.__path__[0], lib.LOCAL_REASONER_JAR) + + java_cmd = [ + "java", + "-cp", + jar_path, + "com.antgroup.openspg.reasoner.runner.local.thinker.LocalThinkerMain", + "--projectId", + self._project_id, + "--subject", + subject or "", + "--predicate", + predicate or "", + "--object", + object or "", + "--mode", + mode, + "--params" or "", + params, + "--schemaUrl", + os.environ.get("KNEXT_HOST_ADDR") or env.LOCAL_SCHEMA_URL, + "--graphStateClass", + os.environ.get("KNEXT_GRAPH_STATE_CLASS") or lib.LOCAL_GRAPH_STATE_CLASS, + "--graphStoreUrl", + os.environ.get("KNEXT_GRAPH_STORE_URL") or lib.LOCAL_GRAPH_STORE_URL, + ] + + subprocess.call(java_cmd) diff --git a/reasoner/runner/local-runner/pom.xml b/reasoner/runner/local-runner/pom.xml index 94d80fce..4d1a65de 100644 --- a/reasoner/runner/local-runner/pom.xml +++ b/reasoner/runner/local-runner/pom.xml @@ -68,6 +68,10 @@ com.antgroup.openspg.reasoner reasoner-warehouse-common + + com.antgroup.openspg.reasoner + reasoner-thinker + com.antgroup.openspg.reasoner reasoner-cloudext-warehouse diff --git a/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/LocalReasonerMain.java b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/LocalReasonerMain.java index d75b6349..7a927999 100644 --- a/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/LocalReasonerMain.java +++ b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/LocalReasonerMain.java @@ -13,13 +13,6 @@ package com.antgroup.openspg.reasoner.runner.local; -import ch.qos.logback.classic.Level; -import ch.qos.logback.classic.Logger; -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.classic.encoder.PatternLayoutEncoder; -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.ConsoleAppender; -import ch.qos.logback.core.FileAppender; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import com.antgroup.openspg.reasoner.catalog.impl.KgSchemaConnectionInfo; @@ -42,7 +35,6 @@ import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.lang3.StringUtils; -import org.slf4j.LoggerFactory; @Slf4j public class LocalReasonerMain { @@ -149,36 +141,36 @@ public class LocalReasonerMain { try { cmd = parser.parse(options, args); - String logFileName = cmd.getOptionValue(LOG_FILE_OPTION); - setUpLogFile(logFileName); + String logFileName = cmd.getOptionValue(ParamsKey.LOG_FILE_OPTION); + LogUtil.setUpLogFile(logFileName); - projectId = Long.parseLong(cmd.getOptionValue(PROJECT_ID_OPTION)); + projectId = Long.parseLong(cmd.getOptionValue(ParamsKey.PROJECT_ID_OPTION)); - dsl = cmd.getOptionValue(QUERY_OPTION); + dsl = cmd.getOptionValue(ParamsKey.QUERY_OPTION); if (StringUtils.isEmpty(dsl)) { throw new ParseException("please provide query dsl!"); } - outputFile = cmd.getOptionValue(OUTPUT_OPTION); + outputFile = cmd.getOptionValue(ParamsKey.OUTPUT_OPTION); if (StringUtils.isEmpty(outputFile)) { outputFile = null; } - schemaUri = cmd.getOptionValue(SCHEMA_URL_OPTION); + schemaUri = cmd.getOptionValue(ParamsKey.SCHEMA_URL_OPTION); if (StringUtils.isEmpty(schemaUri)) { throw new ParseException("please provide openspg schema uri!"); } - graphLoaderClass = cmd.getOptionValue(GRAPH_LOADER_CLASS_OPTION); - graphStateClass = cmd.getOptionValue(GRAPH_STATE_CLASS_OPTION); - graphStateUrl = cmd.getOptionValue(GRAPH_STORE_URL_OPTION); + graphLoaderClass = cmd.getOptionValue(ParamsKey.GRAPH_LOADER_CLASS_OPTION); + graphStateClass = cmd.getOptionValue(ParamsKey.GRAPH_STATE_CLASS_OPTION); + graphStateUrl = cmd.getOptionValue(ParamsKey.GRAPH_STORE_URL_OPTION); if (StringUtils.isEmpty(graphStateUrl)) { graphStateUrl = null; } - String startIdListJson = cmd.getOptionValue(START_ID_OPTION); + String startIdListJson = cmd.getOptionValue(ParamsKey.START_ID_OPTION); if (StringUtils.isBlank(startIdListJson)) { startIdList = Collections.emptyList(); } else { startIdList = JSON.parseObject(startIdListJson, new TypeReference>>() {}); } - String paramsJson = cmd.getOptionValue(PARAMs_OPTION); + String paramsJson = cmd.getOptionValue(ParamsKey.PARAMs_OPTION); if (StringUtils.isNotEmpty(paramsJson)) { params = new HashMap<>(JSON.parseObject(paramsJson)); } @@ -203,81 +195,33 @@ public class LocalReasonerMain { return task; } - protected static final String PROJECT_ID_OPTION = "projectId"; - protected static final String QUERY_OPTION = "query"; - protected static final String OUTPUT_OPTION = "output"; - protected static final String SCHEMA_URL_OPTION = "schemaUrl"; - protected static final String GRAPH_STATE_CLASS_OPTION = "graphStateClass"; - protected static final String GRAPH_LOADER_CLASS_OPTION = "graphLoaderClass"; - protected static final String GRAPH_STORE_URL_OPTION = "graphStoreUrl"; - protected static final String START_ID_OPTION = "startIdList"; - protected static final String PARAMs_OPTION = "params"; - protected static final String LOG_FILE_OPTION = "logFile"; - protected static Options getOptions() { Options options = new Options(); - - options.addRequiredOption(PROJECT_ID_OPTION, PROJECT_ID_OPTION, true, "project id"); - options.addRequiredOption(QUERY_OPTION, QUERY_OPTION, true, "query dsl string"); - options.addOption(OUTPUT_OPTION, OUTPUT_OPTION, true, "output file name"); - options.addRequiredOption(SCHEMA_URL_OPTION, SCHEMA_URL_OPTION, true, "schema url"); - options.addOption( - GRAPH_STATE_CLASS_OPTION, GRAPH_STATE_CLASS_OPTION, true, "graph state class name"); - options.addOption( - GRAPH_LOADER_CLASS_OPTION, GRAPH_LOADER_CLASS_OPTION, true, "graph loader class name"); options.addRequiredOption( - GRAPH_STORE_URL_OPTION, GRAPH_STORE_URL_OPTION, true, "graph store url"); - options.addOption(START_ID_OPTION, START_ID_OPTION, true, "start id list"); - options.addOption(PARAMs_OPTION, PARAMs_OPTION, true, "params"); - options.addOption(LOG_FILE_OPTION, LOG_FILE_OPTION, true, "log file name"); + ParamsKey.PROJECT_ID_OPTION, ParamsKey.PROJECT_ID_OPTION, true, "project id"); + options.addRequiredOption( + ParamsKey.QUERY_OPTION, ParamsKey.QUERY_OPTION, true, "query dsl string"); + options.addOption(ParamsKey.OUTPUT_OPTION, ParamsKey.OUTPUT_OPTION, true, "output file name"); + options.addRequiredOption( + ParamsKey.SCHEMA_URL_OPTION, ParamsKey.SCHEMA_URL_OPTION, true, "schema url"); + options.addOption( + ParamsKey.GRAPH_STATE_CLASS_OPTION, + ParamsKey.GRAPH_STATE_CLASS_OPTION, + true, + "graph state class name"); + options.addOption( + ParamsKey.GRAPH_LOADER_CLASS_OPTION, + ParamsKey.GRAPH_LOADER_CLASS_OPTION, + true, + "graph loader class name"); + options.addRequiredOption( + ParamsKey.GRAPH_STORE_URL_OPTION, + ParamsKey.GRAPH_STORE_URL_OPTION, + true, + "graph store url"); + options.addOption(ParamsKey.START_ID_OPTION, ParamsKey.START_ID_OPTION, true, "start id list"); + options.addOption(ParamsKey.PARAMs_OPTION, ParamsKey.PARAMs_OPTION, true, "params"); + options.addOption(ParamsKey.LOG_FILE_OPTION, ParamsKey.LOG_FILE_OPTION, true, "log file name"); return options; } - - protected static void setUpLogFile(String logFileName) { - LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); - loggerContext.reset(); - - PatternLayoutEncoder patternLayoutEncoder = new PatternLayoutEncoder(); - patternLayoutEncoder.setPattern("%d [%X{traceId}] [%X{rpcId}] [%t] %-5p %c{2} - %m%n"); - patternLayoutEncoder.setContext(loggerContext); - patternLayoutEncoder.start(); - - FileAppender fileAppender = null; - if (StringUtils.isNotBlank(logFileName)) { - fileAppender = new FileAppender<>(); - fileAppender.setFile(logFileName); - fileAppender.setEncoder(patternLayoutEncoder); - fileAppender.setContext(loggerContext); - fileAppender.setAppend(false); - fileAppender.start(); - } - - ConsoleAppender consoleAppender = new ConsoleAppender<>(); - consoleAppender.setEncoder(patternLayoutEncoder); - consoleAppender.setContext(loggerContext); - consoleAppender.start(); - - Logger brpcLogger = loggerContext.getLogger("com.baidu.brpc"); - brpcLogger.setLevel(Level.ERROR); - brpcLogger.setAdditive(false); - if (fileAppender != null) { - brpcLogger.addAppender(fileAppender); - } - brpcLogger.addAppender(consoleAppender); - - Logger dtflysLogger = loggerContext.getLogger("com.dtflys.forest"); - dtflysLogger.setLevel(Level.ERROR); - dtflysLogger.setAdditive(false); - if (fileAppender != null) { - dtflysLogger.addAppender(fileAppender); - } - dtflysLogger.addAppender(consoleAppender); - - Logger rootLogger = loggerContext.getLogger("root"); - if (fileAppender != null) { - rootLogger.addAppender(fileAppender); - } - rootLogger.addAppender(consoleAppender); - rootLogger.setLevel(Level.INFO); - } } diff --git a/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/LogUtil.java b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/LogUtil.java new file mode 100644 index 00000000..9b42bdd2 --- /dev/null +++ b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/LogUtil.java @@ -0,0 +1,74 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ + +package com.antgroup.openspg.reasoner.runner.local; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.encoder.PatternLayoutEncoder; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.ConsoleAppender; +import ch.qos.logback.core.FileAppender; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.LoggerFactory; + +public class LogUtil { + public static void setUpLogFile(String logFileName) { + LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); + loggerContext.reset(); + + PatternLayoutEncoder patternLayoutEncoder = new PatternLayoutEncoder(); + patternLayoutEncoder.setPattern("%d [%X{traceId}] [%X{rpcId}] [%t] %-5p %c{2} - %m%n"); + patternLayoutEncoder.setContext(loggerContext); + patternLayoutEncoder.start(); + + FileAppender fileAppender = null; + if (StringUtils.isNotBlank(logFileName)) { + fileAppender = new FileAppender<>(); + fileAppender.setFile(logFileName); + fileAppender.setEncoder(patternLayoutEncoder); + fileAppender.setContext(loggerContext); + fileAppender.setAppend(false); + fileAppender.start(); + } + + ConsoleAppender consoleAppender = new ConsoleAppender<>(); + consoleAppender.setEncoder(patternLayoutEncoder); + consoleAppender.setContext(loggerContext); + consoleAppender.start(); + + Logger brpcLogger = loggerContext.getLogger("com.baidu.brpc"); + brpcLogger.setLevel(Level.ERROR); + brpcLogger.setAdditive(false); + if (fileAppender != null) { + brpcLogger.addAppender(fileAppender); + } + brpcLogger.addAppender(consoleAppender); + + Logger dtflysLogger = loggerContext.getLogger("com.dtflys.forest"); + dtflysLogger.setLevel(Level.ERROR); + dtflysLogger.setAdditive(false); + if (fileAppender != null) { + dtflysLogger.addAppender(fileAppender); + } + dtflysLogger.addAppender(consoleAppender); + + Logger rootLogger = loggerContext.getLogger("root"); + if (fileAppender != null) { + rootLogger.addAppender(fileAppender); + } + rootLogger.addAppender(consoleAppender); + rootLogger.setLevel(Level.INFO); + } +} diff --git a/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/ParamsKey.java b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/ParamsKey.java new file mode 100644 index 00000000..08079d2a --- /dev/null +++ b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/ParamsKey.java @@ -0,0 +1,31 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ + +package com.antgroup.openspg.reasoner.runner.local; + +public class ParamsKey { + public static final String PROJECT_ID_OPTION = "projectId"; + public static final String QUERY_OPTION = "query"; + public static final String OUTPUT_OPTION = "output"; + public static final String SCHEMA_URL_OPTION = "schemaUrl"; + public static final String GRAPH_STATE_CLASS_OPTION = "graphStateClass"; + public static final String GRAPH_LOADER_CLASS_OPTION = "graphLoaderClass"; + public static final String GRAPH_STORE_URL_OPTION = "graphStoreUrl"; + public static final String START_ID_OPTION = "startIdList"; + public static final String PARAMs_OPTION = "params"; + public static final String LOG_FILE_OPTION = "logFile"; + public static final String SUBJECT = "subject"; + public static final String PREDICATE = "predicate"; + public static final String OBJECT = "object"; + public static final String MODE = "mode"; +} diff --git a/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/thinker/LocalThinkerMain.java b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/thinker/LocalThinkerMain.java new file mode 100644 index 00000000..eda010cf --- /dev/null +++ b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/thinker/LocalThinkerMain.java @@ -0,0 +1,229 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ + +package com.antgroup.openspg.reasoner.runner.local.thinker; + +import com.alibaba.fastjson.JSON; +import com.antgroup.openspg.reasoner.catalog.impl.KgSchemaConnectionInfo; +import com.antgroup.openspg.reasoner.common.graph.vertex.IVertexId; +import com.antgroup.openspg.reasoner.graphstate.GraphState; +import com.antgroup.openspg.reasoner.graphstate.impl.MemGraphState; +import com.antgroup.openspg.reasoner.runner.local.LogUtil; +import com.antgroup.openspg.reasoner.runner.local.ParamsKey; +import com.antgroup.openspg.reasoner.runner.local.load.graph.AbstractLocalGraphLoader; +import com.antgroup.openspg.reasoner.thinker.Thinker; +import com.antgroup.openspg.reasoner.thinker.catalog.LogicCatalog; +import com.antgroup.openspg.reasoner.thinker.catalog.OpenSPGLogicCatalog; +import com.antgroup.openspg.reasoner.thinker.engine.DefaultThinker; +import com.antgroup.openspg.reasoner.thinker.logic.Result; +import com.antgroup.openspg.reasoner.thinker.logic.graph.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.cli.*; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +@Slf4j +public class LocalThinkerMain { + public static void main(String[] args) { + doMain(args); + System.exit(0); + } + + public static void doMain(String[] args) { + ThinkerParams task = parseArgs(args); + if (null == task) { + System.exit(1); + } + GraphState graphState = loadGraph(task); + LogicCatalog logicCatalog = new OpenSPGLogicCatalog(task.getProjectId(), task.getConnInfo()); + logicCatalog.init(); + Thinker thinker = new DefaultThinker(graphState, logicCatalog); + List result; + if (task.getMode().toLowerCase().equals("spo")) { + result = + thinker.find( + task.getTriple().getSubject(), + task.getTriple().getPredicate(), + task.getTriple().getObject(), + task.getParams()); + } else { + result = thinker.find((Node) task.getTriple().getSubject(), task.getParams()); + } + if (CollectionUtils.isEmpty(result)) { + log.error("local runner return null"); + return; + } + log.info("result:\n {}", result); + } + + private static ThinkerParams parseArgs(String[] args) { + Options options = getThinkerOptions(); + + CommandLineParser parser = new DefaultParser(); + HelpFormatter formatter = new HelpFormatter(); + CommandLine cmd; + + long projectId; + String schemaUri; + String graphStateClass; + String graphLoaderClass; + String graphStateUrl; + Element s = Element.ANY; + Element p = Element.ANY; + Element o = Element.ANY; + String mode = "spo"; + Map params = new HashMap<>(3); + try { + cmd = parser.parse(options, args); + + String logFileName = cmd.getOptionValue(ParamsKey.LOG_FILE_OPTION); + LogUtil.setUpLogFile(logFileName); + + projectId = Long.parseLong(cmd.getOptionValue(ParamsKey.PROJECT_ID_OPTION)); + + schemaUri = cmd.getOptionValue(ParamsKey.SCHEMA_URL_OPTION); + if (StringUtils.isEmpty(schemaUri)) { + throw new ParseException("please provide openspg schema uri!"); + } + graphLoaderClass = cmd.getOptionValue(ParamsKey.GRAPH_LOADER_CLASS_OPTION); + graphStateClass = cmd.getOptionValue(ParamsKey.GRAPH_STATE_CLASS_OPTION); + graphStateUrl = cmd.getOptionValue(ParamsKey.GRAPH_STORE_URL_OPTION); + if (StringUtils.isEmpty(graphStateUrl)) { + graphStateUrl = null; + } + String paramsJson = cmd.getOptionValue(ParamsKey.PARAMs_OPTION); + if (StringUtils.isNotEmpty(paramsJson)) { + params = new HashMap<>(JSON.parseObject(paramsJson)); + } + String subject = cmd.getOptionValue(ParamsKey.SUBJECT); + if (StringUtils.isNotBlank(subject)) { + s = strToElement(subject, false); + } + String predicate = cmd.getOptionValue(ParamsKey.PREDICATE); + if (StringUtils.isNotBlank(predicate)) { + p = strToElement(predicate, true); + } + String object = cmd.getOptionValue(ParamsKey.OBJECT); + if (StringUtils.isNotBlank(object)) { + o = strToElement(object, false); + } + if (s == Element.ANY && p == Element.ANY && o == Element.ANY) { + throw new RuntimeException( + "subject, predicate, object cannot all be empty at the same time."); + } + String m = cmd.getOptionValue(ParamsKey.MODE); + if (StringUtils.isNotBlank(m)) { + mode = m; + } + } catch (Exception e) { + log.error(e.getMessage()); + formatter.printHelp("ThinkerLocalMain", options); + return null; + } + + ThinkerParams task = new ThinkerParams(); + task.setTriple(new Triple(s, p, o)); + task.setConnInfo(new KgSchemaConnectionInfo(schemaUri, "")); + task.setGraphLoadClass(graphLoaderClass); + task.setGraphStateClassName(graphStateClass); + task.setGraphStateInitString(graphStateUrl); + task.setProjectId(projectId); + task.setParams(params); + task.setMode(mode); + return task; + } + + private static Element strToElement(String content, Boolean isPredicate) { + String[] parts = StringUtils.split(content, ","); + if (parts.length == 2) { + return new Entity(parts[0], parts[1]); + } else if (parts.length == 1) { + if (isPredicate) { + return new Predicate(parts[0]); + } else { + return new Node(parts[0]); + } + } else { + throw new RuntimeException("format error, require id,type or type"); + } + } + + private static Options getThinkerOptions() { + Options options = new Options(); + options.addRequiredOption( + ParamsKey.PROJECT_ID_OPTION, ParamsKey.PROJECT_ID_OPTION, true, "project id"); + options.addRequiredOption( + ParamsKey.SCHEMA_URL_OPTION, ParamsKey.SCHEMA_URL_OPTION, true, "schema url"); + options.addOption( + ParamsKey.SUBJECT, ParamsKey.SUBJECT, true, "query subject, eg: id,type or type"); + options.addOption(ParamsKey.PREDICATE, ParamsKey.PREDICATE, true, "query predicate, eg: type"); + options.addOption( + ParamsKey.OBJECT, ParamsKey.OBJECT, true, "query object, eg: id,type or type"); + options.addOption(ParamsKey.OUTPUT_OPTION, ParamsKey.OUTPUT_OPTION, true, "output file name"); + options.addOption( + ParamsKey.GRAPH_STATE_CLASS_OPTION, + ParamsKey.GRAPH_STATE_CLASS_OPTION, + true, + "graph state class name"); + options.addOption( + ParamsKey.GRAPH_LOADER_CLASS_OPTION, + ParamsKey.GRAPH_LOADER_CLASS_OPTION, + true, + "graph loader class name"); + options.addOption( + ParamsKey.GRAPH_STORE_URL_OPTION, + ParamsKey.GRAPH_STORE_URL_OPTION, + true, + "graph store url"); + options.addOption(ParamsKey.PARAMs_OPTION, ParamsKey.PARAMs_OPTION, true, "params"); + options.addOption(ParamsKey.LOG_FILE_OPTION, ParamsKey.LOG_FILE_OPTION, true, "log file name"); + options.addOption(ParamsKey.MODE, ParamsKey.MODE, true, "infer mode, eg: spo or node"); + return options; + } + + protected static GraphState loadGraph(ThinkerParams params) { + GraphState graphState; + String graphStateClass = params.getGraphStateClassName(); + if (StringUtils.isNotEmpty(graphStateClass)) { + try { + graphState = + (GraphState) + Class.forName(graphStateClass) + .getConstructor(String.class) + .newInstance(params.getGraphStateInitString()); + } catch (Exception e) { + throw new RuntimeException("can not create graph state from " + graphStateClass, e); + } + return graphState; + } + + MemGraphState memGraphState = new MemGraphState(); + String graphLoadClass = params.getGraphLoadClass(); + if (StringUtils.isBlank(graphLoadClass)) { + return memGraphState; + } + AbstractLocalGraphLoader graphLoader; + try { + graphLoader = + (AbstractLocalGraphLoader) Class.forName(graphLoadClass).getConstructor().newInstance(); + } catch (Exception e) { + throw new RuntimeException("can not create graph loader from name " + graphLoadClass, e); + } + graphLoader.setGraphState(memGraphState); + graphLoader.load(); + return memGraphState; + } +} diff --git a/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/thinker/ThinkerParams.java b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/thinker/ThinkerParams.java new file mode 100644 index 00000000..8f743158 --- /dev/null +++ b/reasoner/runner/local-runner/src/main/java/com/antgroup/openspg/reasoner/runner/local/thinker/ThinkerParams.java @@ -0,0 +1,38 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ + +package com.antgroup.openspg.reasoner.runner.local.thinker; + +import com.antgroup.openspg.reasoner.catalog.impl.KgSchemaConnectionInfo; +import com.antgroup.openspg.reasoner.thinker.logic.graph.Triple; +import java.io.Serializable; +import java.util.Map; +import lombok.Data; + +@Data +public class ThinkerParams implements Serializable { + private KgSchemaConnectionInfo connInfo = null; + private Long projectId; + private Triple triple; + private Map params; + + /** Choose between graphLoadClass and graphState, or specify a class name */ + private String graphLoadClass = null; + + /** User specified the name of graphstate */ + private String graphStateClassName = null; + + private String graphStateInitString = null; + + private String mode = "spo"; +} diff --git a/reasoner/thinker/src/main/java/com/antgroup/openspg/reasoner/thinker/catalog/DefaultLogicCatalog.java b/reasoner/thinker/src/main/java/com/antgroup/openspg/reasoner/thinker/catalog/DefaultLogicCatalog.java deleted file mode 100644 index c1cf86c7..00000000 --- a/reasoner/thinker/src/main/java/com/antgroup/openspg/reasoner/thinker/catalog/DefaultLogicCatalog.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2023 OpenSPG Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. - */ - -package com.antgroup.openspg.reasoner.thinker.catalog; - -import com.antgroup.openspg.reasoner.lube.catalog.AbstractConnection; -import com.antgroup.openspg.reasoner.lube.catalog.Catalog; -import com.antgroup.openspg.reasoner.lube.catalog.SemanticPropertyGraph; -import com.antgroup.openspg.reasoner.lube.catalog.struct.Field; -import com.antgroup.openspg.reasoner.thinker.logic.LogicNetwork; -import com.antgroup.openspg.reasoner.thinker.logic.rule.Rule; -import java.util.ArrayList; -import java.util.List; -import scala.collection.immutable.Map; -import scala.collection.immutable.Set; - -public class DefaultLogicCatalog extends LogicCatalog { - private Catalog kgCatalog; - private List rules; - - private DefaultLogicCatalog() { - rules = new ArrayList<>(); - } - - public DefaultLogicCatalog(List rules, Catalog kgCatalog) { - this.rules = rules; - this.kgCatalog = kgCatalog; - } - - @Override - public LogicNetwork loadLogicNetwork() { - LogicNetwork logicNetwork = new LogicNetwork(); - for (Rule r : rules) { - logicNetwork.addRule(r); - } - return logicNetwork; - } - - @Override - public SemanticPropertyGraph getKnowledgeGraph() { - return kgCatalog.getKnowledgeGraph(); - } - - @Override - public Map> getConnections() { - return kgCatalog.getConnections(); - } - - @Override - public Set getDefaultNodeProperties() { - return kgCatalog.getDefaultNodeProperties(); - } - - @Override - public Set getDefaultEdgeProperties() { - return kgCatalog.getDefaultEdgeProperties(); - } -} diff --git a/reasoner/thinker/src/main/java/com/antgroup/openspg/reasoner/thinker/catalog/OpenSPGLogicCatalog.java b/reasoner/thinker/src/main/java/com/antgroup/openspg/reasoner/thinker/catalog/OpenSPGLogicCatalog.java new file mode 100644 index 00000000..b7a561c0 --- /dev/null +++ b/reasoner/thinker/src/main/java/com/antgroup/openspg/reasoner/thinker/catalog/OpenSPGLogicCatalog.java @@ -0,0 +1,108 @@ +/* + * Copyright 2023 OpenSPG Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. + */ + +package com.antgroup.openspg.reasoner.thinker.catalog; + +import com.antgroup.openspg.core.schema.model.semantic.TripleSemantic; +import com.antgroup.openspg.core.schema.model.type.BaseSPGType; +import com.antgroup.openspg.core.schema.model.type.ProjectSchema; +import com.antgroup.openspg.core.schema.model.type.SPGTypeEnum; +import com.antgroup.openspg.reasoner.catalog.impl.KgSchemaConnectionInfo; +import com.antgroup.openspg.reasoner.catalog.impl.OpenSPGCatalog; +import com.antgroup.openspg.reasoner.common.exception.SystemError; +import com.antgroup.openspg.reasoner.lube.catalog.AbstractConnection; +import com.antgroup.openspg.reasoner.lube.catalog.Catalog; +import com.antgroup.openspg.reasoner.lube.catalog.SemanticPropertyGraph; +import com.antgroup.openspg.reasoner.lube.catalog.struct.Field; +import com.antgroup.openspg.reasoner.thinker.SimplifyThinkerParser; +import com.antgroup.openspg.reasoner.thinker.logic.LogicNetwork; +import com.antgroup.openspg.reasoner.thinker.logic.rule.Rule; +import com.antgroup.openspg.server.api.facade.ApiResponse; +import com.antgroup.openspg.server.api.facade.client.ConceptFacade; +import com.antgroup.openspg.server.api.facade.client.SchemaFacade; +import com.antgroup.openspg.server.api.facade.dto.schema.request.ProjectSchemaRequest; +import com.antgroup.openspg.server.api.facade.dto.schema.request.SPGTypeRequest; +import com.antgroup.openspg.server.api.http.client.HttpConceptFacade; +import com.antgroup.openspg.server.api.http.client.HttpSchemaFacade; +import com.antgroup.openspg.server.api.http.client.util.ConnectionInfo; +import com.antgroup.openspg.server.api.http.client.util.HttpClientBootstrap; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; + +public class OpenSPGLogicCatalog extends LogicCatalog { + private Catalog openSPGCatalog; + private Long projectId; + private SchemaFacade schemaFacade; + private ConceptFacade conceptFacade; + + public OpenSPGLogicCatalog(Long projectId, KgSchemaConnectionInfo connInfo) { + this.openSPGCatalog = new OpenSPGCatalog(projectId, connInfo, null); + HttpClientBootstrap.init(new ConnectionInfo(connInfo.uri())); + this.projectId = projectId; + this.schemaFacade = new HttpSchemaFacade(); + this.conceptFacade = new HttpConceptFacade(); + } + + @Override + public LogicNetwork loadLogicNetwork() { + ProjectSchemaRequest request = new ProjectSchemaRequest(); + request.setProjectId(projectId); + ApiResponse projectSchema = schemaFacade.queryProjectSchema(request); + if (!projectSchema.isSuccess()) { + throw new SystemError("Cannot get schema for projectId=" + projectId, null); + } + Set conceptTypes = + projectSchema.getData().getSpgTypes().stream() + .filter(e -> e.getSpgTypeEnum() == SPGTypeEnum.CONCEPT_TYPE) + .map(BaseSPGType::getName) + .collect(Collectors.toSet()); + + SPGTypeRequest spgTypeRequest = new SPGTypeRequest(StringUtils.join(conceptTypes, ",")); + ApiResponse> response = + conceptFacade.getReasoningConceptsDetail(spgTypeRequest); + if (!response.isSuccess()) { + throw new SystemError("Cannot get schema for projectId=" + projectId, null); + } + LogicNetwork logicNetwork = new LogicNetwork(); + for (TripleSemantic ts : response.getData()) { + SimplifyThinkerParser parser = new SimplifyThinkerParser(); + Rule rule = parser.parseSimplifyDsl(ts.getLogicalRule().getContent(), null).head(); + logicNetwork.addRule(rule); + } + + return logicNetwork; + } + + @Override + public SemanticPropertyGraph getKnowledgeGraph() { + return this.openSPGCatalog.getKnowledgeGraph(); + } + + @Override + public scala.collection.immutable.Map> + getConnections() { + return this.openSPGCatalog.getConnections(); + } + + @Override + public scala.collection.immutable.Set getDefaultNodeProperties() { + return this.openSPGCatalog.getDefaultNodeProperties(); + } + + @Override + public scala.collection.immutable.Set getDefaultEdgeProperties() { + return this.openSPGCatalog.getDefaultEdgeProperties(); + } +} diff --git a/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/ConceptController.java b/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/ConceptController.java index 75637818..de5bdf34 100644 --- a/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/ConceptController.java +++ b/server/api/http-server/src/main/java/com/antgroup/openspg/server/api/http/server/openapi/ConceptController.java @@ -26,6 +26,7 @@ import com.antgroup.openspg.server.api.http.server.HttpBizCallback; import com.antgroup.openspg.server.api.http.server.HttpBizTemplate; import com.antgroup.openspg.server.biz.common.util.AssertUtils; import com.antgroup.openspg.server.biz.schema.ConceptManager; +import com.google.common.collect.Lists; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; @@ -73,7 +74,8 @@ public class ConceptController extends BaseController { @Override public List action() { - return conceptManager.getReasoningConceptsDetail(request.getNameList()); + List nameList = Lists.newArrayList(request.getName().split(",")); + return conceptManager.getReasoningConceptsDetail(nameList); } }); }