fix(thinker): support thinker server runner (#389)

Co-authored-by: FishJoy <chengqiang.cq@antgroup.com>
This commit is contained in:
royzhao 2024-11-05 11:50:24 +08:00 committed by GitHub
parent 0e687f74e5
commit 37514d94f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 980 additions and 32 deletions

Binary file not shown.

View File

@ -15,3 +15,4 @@ from knext.reasoner.rest.models import *
from knext.project.rest.models import *
from knext.search.rest.models import *
from knext.graph_algo.rest.models import *
from knext.thinker.rest.models import *

View File

@ -12,6 +12,9 @@
import os
from knext.common.base.client import Client
from knext.common.rest import Configuration, ApiClient
from knext.thinker import rest
from knext.thinker.rest import ThinkerTaskRequest, ThinkerTaskResponse
class ThinkerClient(Client):
@ -20,40 +23,32 @@ class ThinkerClient(Client):
def __init__(self, host_addr: str = None, project_id: int = None):
super().__init__(host_addr, project_id)
self._rest_client: rest.ThinkerApi = rest.ThinkerApi(
api_client=ApiClient(configuration=Configuration(host=host_addr))
)
def execute(self, subject="", predicate="", object="", mode="spo", params=""):
"""
Execute a synchronous builder job in local runner.
"""
req: ThinkerTaskRequest = ThinkerTaskRequest(
project_id=self._project_id,
subject=subject,
predicate=predicate,
object=object,
mode=mode,
params=params,
)
rep: ThinkerTaskResponse = self._rest_client.reason_thinker_post(
thinker_task_request=req
)
print(rep)
import subprocess
from knext.reasoner import lib
from knext.common import env
jar_path = os.path.join(lib.__path__[0], lib.LOCAL_REASONER_JAR)
java_cmd = [
"java",
"-cp",
jar_path,
"com.antgroup.openspg.reasoner.runner.local.thinker.LocalThinkerMain",
"--projectId",
self._project_id,
"--subject",
subject or "",
"--predicate",
predicate or "",
"--object",
object or "",
"--mode",
mode,
"--params" or "",
params,
"--schemaUrl",
os.environ.get("KNEXT_HOST_ADDR") or env.LOCAL_SCHEMA_URL,
"--graphStateClass",
os.environ.get("KNEXT_GRAPH_STATE_CLASS") or lib.LOCAL_GRAPH_STATE_CLASS,
"--graphStoreUrl",
os.environ.get("KNEXT_GRAPH_STORE_URL") or lib.LOCAL_GRAPH_STORE_URL,
]
subprocess.call(java_cmd)
if __name__ == "__main__":
sc = ThinkerClient("http://127.0.0.1:8887", 2)
sc.execute(
subject="DiseaseLevel",
mode="node",
params='{"spg.reasoner.thinker.strict":·true,·"收缩压":150}',
)

View File

@ -0,0 +1,33 @@
# coding: utf-8
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
# flake8: noqa
"""
knext
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
"""
from __future__ import absolute_import
__version__ = "1"
# import models into sdk package
from knext.thinker.rest.models.thinker_task_request import ThinkerTaskRequest
from knext.thinker.rest.models.thinker_task_response import ThinkerTaskResponse
# import apis into sdk package
from knext.thinker.rest.thinker_api import ThinkerApi

View File

@ -0,0 +1,28 @@
# coding: utf-8
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
# flake8: noqa
"""
knext
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
"""
from __future__ import absolute_import
# import models into model package
from knext.thinker.rest.models.thinker_task_request import ThinkerTaskRequest
from knext.thinker.rest.models.thinker_task_response import ThinkerTaskResponse

View File

@ -0,0 +1,277 @@
# coding: utf-8
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
"""
knext
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
"""
import pprint
import re # noqa: F401
import six
from knext.common.rest.configuration import Configuration
class ThinkerTaskRequest(object):
"""NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
"""
"""
Attributes:
openapi_types (dict): The key is attribute name
and the value is attribute type.
attribute_map (dict): The key is attribute name
and the value is json key in definition.
"""
openapi_types = {
"project_id": "int",
"subject": "str",
"predicate": "str",
"object": "str",
"mode": "str",
"params": "str",
}
attribute_map = {
"project_id": "projectId",
"subject": "subject",
"predicate": "predicate",
"object": "object",
"mode": "mode",
"params": "params",
}
def __init__(
self,
project_id=None,
subject=None,
predicate=None,
object=None,
mode=None,
params=None,
local_vars_configuration=None,
): # noqa: E501
"""ThinkerTaskRequest - a model defined in OpenAPI""" # noqa: E501
if local_vars_configuration is None:
local_vars_configuration = Configuration()
self.local_vars_configuration = local_vars_configuration
self._project_id = None
self._subject = None
self._predicate = None
self._object = None
self._mode = None
self._params = None
self.discriminator = None
self.project_id = project_id
if subject is not None:
self.subject = subject
if predicate is not None:
self.predicate = predicate
if object is not None:
self.object = object
if mode is not None:
self.mode = mode
if params is not None:
self.params = params
@property
def project_id(self):
"""Gets the project_id of this ThinkerTaskRequest. # noqa: E501
:return: The project_id of this ThinkerTaskRequest. # noqa: E501
:rtype: int
"""
return self._project_id
@project_id.setter
def project_id(self, project_id):
"""Sets the project_id of this ThinkerTaskRequest.
:param project_id: The project_id of this ThinkerTaskRequest. # noqa: E501
:type: int
"""
if (
self.local_vars_configuration.client_side_validation and project_id is None
): # noqa: E501
raise ValueError(
"Invalid value for `project_id`, must not be `None`"
) # noqa: E501
self._project_id = project_id
@property
def subject(self):
"""Gets the subject of this ThinkerTaskRequest. # noqa: E501
:return: The subject of this ThinkerTaskRequest. # noqa: E501
:rtype: str
"""
return self._subject
@subject.setter
def subject(self, subject):
"""Sets the subject of this ThinkerTaskRequest.
:param subject: The subject of this ThinkerTaskRequest. # noqa: E501
:type: str
"""
self._subject = subject
@property
def predicate(self):
"""Gets the predicate of this ThinkerTaskRequest. # noqa: E501
:return: The predicate of this ThinkerTaskRequest. # noqa: E501
:rtype: str
"""
return self._predicate
@predicate.setter
def predicate(self, predicate):
"""Sets the predicate of this ThinkerTaskRequest.
:param predicate: The predicate of this ThinkerTaskRequest. # noqa: E501
:type: str
"""
self._predicate = predicate
@property
def object(self):
"""Gets the object of this ThinkerTaskRequest. # noqa: E501
:return: The object of this ThinkerTaskRequest. # noqa: E501
:rtype: str
"""
return self._object
@object.setter
def object(self, object):
"""Sets the object of this ThinkerTaskRequest.
:param object: The object of this ThinkerTaskRequest. # noqa: E501
:type: str
"""
self._object = object
@property
def mode(self):
"""Gets the mode of this ThinkerTaskRequest. # noqa: E501
:return: The mode of this ThinkerTaskRequest. # noqa: E501
:rtype: str
"""
return self._mode
@mode.setter
def mode(self, mode):
"""Sets the mode of this ThinkerTaskRequest.
:param mode: The mode of this ThinkerTaskRequest. # noqa: E501
:type: str
"""
self._mode = mode
@property
def params(self):
"""Gets the params of this ThinkerTaskRequest. # noqa: E501
:return: The params of this ThinkerTaskRequest. # noqa: E501
:rtype: str
"""
return self._params
@params.setter
def params(self, params):
"""Sets the params of this ThinkerTaskRequest.
:param params: The params of this ThinkerTaskRequest. # noqa: E501
:type: str
"""
self._params = params
def to_dict(self):
"""Returns the model properties as a dict"""
result = {}
for attr, _ in six.iteritems(self.openapi_types):
value = getattr(self, attr)
if isinstance(value, list):
result[attr] = list(
map(lambda x: x.to_dict() if hasattr(x, "to_dict") else x, value)
)
elif hasattr(value, "to_dict"):
result[attr] = value.to_dict()
elif isinstance(value, dict):
result[attr] = dict(
map(
lambda item: (item[0], item[1].to_dict())
if hasattr(item[1], "to_dict")
else item,
value.items(),
)
)
else:
result[attr] = value
return result
def to_str(self):
"""Returns the string representation of the model"""
return pprint.pformat(self.to_dict())
def __repr__(self):
"""For `print` and `pprint`"""
return self.to_str()
def __eq__(self, other):
"""Returns true if both objects are equal"""
if not isinstance(other, ThinkerTaskRequest):
return False
return self.to_dict() == other.to_dict()
def __ne__(self, other):
"""Returns true if both objects are not equal"""
if not isinstance(other, ThinkerTaskRequest):
return True
return self.to_dict() != other.to_dict()

View File

@ -0,0 +1,194 @@
# coding: utf-8
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
"""
knext
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
"""
import pprint
import re # noqa: F401
import six
from knext.common.rest.configuration import Configuration
class ThinkerTaskResponse(object):
"""NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
"""
"""
Attributes:
openapi_types (dict): The key is attribute name
and the value is attribute type.
attribute_map (dict): The key is attribute name
and the value is json key in definition.
"""
openapi_types = {"project_id": "int", "task_id": "str", "result": "list[object]"}
attribute_map = {"project_id": "projectId", "task_id": "taskId", "result": "result"}
def __init__(
self, project_id=None, task_id=None, result=None, local_vars_configuration=None
): # noqa: E501
"""ThinkerTaskResponse - a model defined in OpenAPI""" # noqa: E501
if local_vars_configuration is None:
local_vars_configuration = Configuration()
self.local_vars_configuration = local_vars_configuration
self._project_id = None
self._task_id = None
self._result = None
self.discriminator = None
self.project_id = project_id
self.task_id = task_id
self.result = result
@property
def project_id(self):
"""Gets the project_id of this ThinkerTaskResponse. # noqa: E501
:return: The project_id of this ThinkerTaskResponse. # noqa: E501
:rtype: int
"""
return self._project_id
@project_id.setter
def project_id(self, project_id):
"""Sets the project_id of this ThinkerTaskResponse.
:param project_id: The project_id of this ThinkerTaskResponse. # noqa: E501
:type: int
"""
if (
self.local_vars_configuration.client_side_validation and project_id is None
): # noqa: E501
raise ValueError(
"Invalid value for `project_id`, must not be `None`"
) # noqa: E501
self._project_id = project_id
@property
def task_id(self):
"""Gets the task_id of this ThinkerTaskResponse. # noqa: E501
:return: The task_id of this ThinkerTaskResponse. # noqa: E501
:rtype: str
"""
return self._task_id
@task_id.setter
def task_id(self, task_id):
"""Sets the task_id of this ThinkerTaskResponse.
:param task_id: The task_id of this ThinkerTaskResponse. # noqa: E501
:type: str
"""
if (
self.local_vars_configuration.client_side_validation and task_id is None
): # noqa: E501
raise ValueError(
"Invalid value for `task_id`, must not be `None`"
) # noqa: E501
self._task_id = task_id
@property
def result(self):
"""Gets the result of this ThinkerTaskResponse. # noqa: E501
:return: The result of this ThinkerTaskResponse. # noqa: E501
:rtype: list[object]
"""
return self._result
@result.setter
def result(self, result):
"""Sets the result of this ThinkerTaskResponse.
:param result: The result of this ThinkerTaskResponse. # noqa: E501
:type: list[object]
"""
if (
self.local_vars_configuration.client_side_validation and result is None
): # noqa: E501
raise ValueError(
"Invalid value for `result`, must not be `None`"
) # noqa: E501
self._result = result
def to_dict(self):
"""Returns the model properties as a dict"""
result = {}
for attr, _ in six.iteritems(self.openapi_types):
value = getattr(self, attr)
if isinstance(value, list):
result[attr] = list(
map(lambda x: x.to_dict() if hasattr(x, "to_dict") else x, value)
)
elif hasattr(value, "to_dict"):
result[attr] = value.to_dict()
elif isinstance(value, dict):
result[attr] = dict(
map(
lambda item: (item[0], item[1].to_dict())
if hasattr(item[1], "to_dict")
else item,
value.items(),
)
)
else:
result[attr] = value
return result
def to_str(self):
"""Returns the string representation of the model"""
return pprint.pformat(self.to_dict())
def __repr__(self):
"""For `print` and `pprint`"""
return self.to_str()
def __eq__(self, other):
"""Returns true if both objects are equal"""
if not isinstance(other, ThinkerTaskResponse):
return False
return self.to_dict() == other.to_dict()
def __ne__(self, other):
"""Returns true if both objects are not equal"""
if not isinstance(other, ThinkerTaskResponse):
return True
return self.to_dict() != other.to_dict()

View File

@ -0,0 +1,161 @@
# coding: utf-8
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
"""
knext
No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator) # noqa: E501
The version of the OpenAPI document: 1.0.0
Generated by: https://openapi-generator.tech
"""
from __future__ import absolute_import
import re # noqa: F401
# python 2 and python 3 compatibility library
import six
from knext.common.rest.api_client import ApiClient
from knext.common.rest.exceptions import ApiTypeError, ApiValueError # noqa: F401
class ThinkerApi(object):
"""NOTE: This class is auto generated by OpenAPI Generator
Ref: https://openapi-generator.tech
Do not edit the class manually.
"""
def __init__(self, api_client=None):
if api_client is None:
api_client = ApiClient()
self.api_client = api_client
def reason_thinker_post(self, **kwargs): # noqa: E501
"""thinker # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.reason_thinker_post(async_req=True)
>>> result = thread.get()
:param async_req bool: execute request asynchronously
:param ThinkerTaskRequest thinker_task_request:
:param _preload_content: if False, the urllib3.HTTPResponse object will
be returned without reading/decoding response
data. Default is True.
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: ThinkerTaskResponse
If the method is called asynchronously,
returns the request thread.
"""
kwargs["_return_http_data_only"] = True
return self.reason_thinker_post_with_http_info(**kwargs) # noqa: E501
def reason_thinker_post_with_http_info(self, **kwargs): # noqa: E501
"""thinker # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.reason_thinker_post_with_http_info(async_req=True)
>>> result = thread.get()
:param async_req bool: execute request asynchronously
:param ThinkerTaskRequest thinker_task_request:
:param _return_http_data_only: response data without head status code
and headers
:param _preload_content: if False, the urllib3.HTTPResponse object will
be returned without reading/decoding response
data. Default is True.
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: tuple(ThinkerTaskResponse, status_code(int), headers(HTTPHeaderDict))
If the method is called asynchronously,
returns the request thread.
"""
local_var_params = locals()
all_params = ["thinker_task_request"]
all_params.extend(
[
"async_req",
"_return_http_data_only",
"_preload_content",
"_request_timeout",
]
)
for key, val in six.iteritems(local_var_params["kwargs"]):
if key not in all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method reason_thinker_post" % key
)
local_var_params[key] = val
del local_var_params["kwargs"]
collection_formats = {}
path_params = {}
query_params = []
header_params = {}
form_params = []
local_var_files = {}
body_params = None
if "thinker_task_request" in local_var_params:
body_params = local_var_params["thinker_task_request"]
# HTTP header `Accept`
header_params["Accept"] = self.api_client.select_header_accept(
["application/json"]
) # noqa: E501
# HTTP header `Content-Type`
header_params[
"Content-Type"
] = self.api_client.select_header_content_type( # noqa: E501
["application/json"]
) # noqa: E501
# Authentication setting
auth_settings = [] # noqa: E501
return self.api_client.call_api(
"/reason/thinker",
"POST",
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type="ThinkerTaskResponse", # noqa: E501
auth_settings=auth_settings,
async_req=local_var_params.get("async_req"),
_return_http_data_only=local_var_params.get(
"_return_http_data_only"
), # noqa: E501
_preload_content=local_var_params.get("_preload_content", True),
_request_timeout=local_var_params.get("_request_timeout"),
collection_formats=collection_formats,
)

View File

@ -145,7 +145,7 @@ public class LocalThinkerMain {
return task;
}
private static Element strToElement(String content, Boolean isPredicate) {
public static Element strToElement(String content, Boolean isPredicate) {
String[] parts = StringUtils.split(content, ",");
if (parts.length == 2) {
return new Entity(parts[0], parts[1]);
@ -193,6 +193,23 @@ public class LocalThinkerMain {
return options;
}
public static GraphState<IVertexId> loadGraph(String graphStateClass, String graphStoreUrl) {
GraphState<IVertexId> graphState;
if (StringUtils.isNotEmpty(graphStateClass)) {
try {
graphState =
(GraphState<IVertexId>)
Class.forName(graphStateClass)
.getConstructor(String.class)
.newInstance(graphStoreUrl);
} catch (Exception e) {
throw new RuntimeException("can not create graph state from " + graphStateClass, e);
}
return graphState;
}
return null;
}
protected static GraphState<IVertexId> loadGraph(ThinkerParams params) {
GraphState<IVertexId> graphState;
String graphStateClass = params.getGraphStateClassName();

View File

@ -0,0 +1,25 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.server.api.facade.dto.service.request;
import lombok.Data;
@Data
public class ThinkerTaskRequest {
private Long projectId;
private String subject;
private String predicate;
private String object;
private String mode;
private String params;
}

View File

@ -0,0 +1,23 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.server.api.facade.dto.service.response;
import java.util.List;
import lombok.Data;
@Data
public class ThinkerTaskResponse {
private List<Object> result;
private String taskId;
private Long projectId;
}

View File

@ -15,7 +15,9 @@ package com.antgroup.openspg.server.api.http.server.openapi;
import com.antgroup.openspg.core.schema.model.type.ProjectSchema;
import com.antgroup.openspg.server.api.facade.dto.service.request.ReasonerTaskRequest;
import com.antgroup.openspg.server.api.facade.dto.service.request.ThinkerTaskRequest;
import com.antgroup.openspg.server.api.facade.dto.service.response.ReasonerTaskResponse;
import com.antgroup.openspg.server.api.facade.dto.service.response.ThinkerTaskResponse;
import com.antgroup.openspg.server.api.http.server.BaseController;
import com.antgroup.openspg.server.api.http.server.HttpBizCallback;
import com.antgroup.openspg.server.api.http.server.HttpBizTemplate;
@ -48,6 +50,20 @@ public class ReasonController extends BaseController {
});
}
@RequestMapping(method = RequestMethod.POST, value = "/thinker")
public ResponseEntity<Object> reason(@RequestBody ThinkerTaskRequest request) {
return HttpBizTemplate.execute(
new HttpBizCallback<ThinkerTaskResponse>() {
@Override
public void check() {}
@Override
public ThinkerTaskResponse action() {
return reasonerManager.thinker(request);
}
});
}
@RequestMapping(method = RequestMethod.GET, value = "/schema")
public ResponseEntity<Object> getReasonSchema(@RequestParam Long projectId) {
return HttpBizTemplate.execute(

View File

@ -14,9 +14,13 @@ package com.antgroup.openspg.server.biz.service;
import com.antgroup.openspg.core.schema.model.type.ProjectSchema;
import com.antgroup.openspg.server.api.facade.dto.service.request.ReasonerTaskRequest;
import com.antgroup.openspg.server.api.facade.dto.service.request.ThinkerTaskRequest;
import com.antgroup.openspg.server.api.facade.dto.service.response.ReasonerTaskResponse;
import com.antgroup.openspg.server.api.facade.dto.service.response.ThinkerTaskResponse;
public interface ReasonerManager {
ThinkerTaskResponse thinker(ThinkerTaskRequest request);
ReasonerTaskResponse reason(ReasonerTaskRequest request);
ProjectSchema getReasonSchema(Long projectId);

View File

@ -12,14 +12,20 @@
*/
package com.antgroup.openspg.server.biz.service.impl;
import com.antgroup.kg.reasoner.thinker.logic.Result;
import com.antgroup.openspg.core.schema.model.type.ProjectSchema;
import com.antgroup.openspg.server.api.facade.dto.service.request.ReasonerTaskRequest;
import com.antgroup.openspg.server.api.facade.dto.service.request.ThinkerTaskRequest;
import com.antgroup.openspg.server.api.facade.dto.service.response.ReasonerTaskResponse;
import com.antgroup.openspg.server.api.facade.dto.service.response.ThinkerTaskResponse;
import com.antgroup.openspg.server.biz.common.ProjectManager;
import com.antgroup.openspg.server.biz.service.ReasonerManager;
import com.antgroup.openspg.server.common.model.reasoner.ReasonerTask;
import com.antgroup.openspg.server.common.model.reasoner.ThinkerTask;
import com.antgroup.openspg.server.core.reasoner.service.CatalogService;
import com.antgroup.openspg.server.core.reasoner.service.ReasonerService;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -30,6 +36,26 @@ public class ReasonerManagerImpl implements ReasonerManager {
@Autowired private ReasonerService reasonerService;
@Autowired private CatalogService catalogService;
@Override
public ThinkerTaskResponse thinker(ThinkerTaskRequest request) {
ThinkerTask task = new ThinkerTask();
task.setTaskId(UUID.randomUUID().toString());
task.setMode(request.getMode());
task.setObject(request.getObject());
task.setPredicate(request.getPredicate());
task.setSubject(request.getSubject());
task.setProjectId(request.getProjectId());
task.setParams(request.getParams());
String graphStoreUrl = getGraphStoreUrl(request.getProjectId());
task.setGraphStoreUrl(graphStoreUrl);
List<Result> res = reasonerService.thinker(task);
ThinkerTaskResponse response = new ThinkerTaskResponse();
response.setProjectId(request.getProjectId());
response.setTaskId(task.getTaskId());
response.setResult(new ArrayList<>(res));
return response;
}
@Override
public ReasonerTaskResponse reason(ReasonerTaskRequest request) {
String graphStoreUrl = getGraphStoreUrl(request.getProjectId());

View File

@ -0,0 +1,27 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.server.common.model.reasoner;
import lombok.Data;
@Data
public class ThinkerTask {
private String taskId;
private Long projectId;
private String subject;
private String predicate;
private String object;
private String mode;
private String params;
private String graphStoreUrl;
}

View File

@ -12,8 +12,13 @@
*/
package com.antgroup.openspg.server.core.reasoner.service;
import com.antgroup.kg.reasoner.thinker.logic.Result;
import com.antgroup.openspg.server.common.model.reasoner.ReasonerTask;
import com.antgroup.openspg.server.common.model.reasoner.ThinkerTask;
import java.util.List;
public interface ReasonerService {
ReasonerTask runTask(ReasonerTask request);
List<Result> thinker(ThinkerTask request);
}

View File

@ -13,17 +13,29 @@
package com.antgroup.openspg.server.core.reasoner.service.impl;
import com.alibaba.fastjson.JSON;
import com.antgroup.kg.reasoner.thinker.logic.Result;
import com.antgroup.kg.reasoner.thinker.logic.graph.Element;
import com.antgroup.kg.reasoner.thinker.logic.graph.Triple;
import com.antgroup.openspg.reasoner.catalog.impl.KgSchemaConnectionInfo;
import com.antgroup.openspg.reasoner.lube.catalog.Catalog;
import com.antgroup.openspg.reasoner.runner.local.thinker.LocalThinkerMain;
import com.antgroup.openspg.reasoner.runner.local.thinker.ThinkerParams;
import com.antgroup.openspg.reasoner.udf.impl.UdfMngImpl;
import com.antgroup.openspg.server.common.model.reasoner.ReasonerTask;
import com.antgroup.openspg.server.common.model.reasoner.StatusEnum;
import com.antgroup.openspg.server.common.model.reasoner.ThinkerTask;
import com.antgroup.openspg.server.common.service.config.AppEnvConfig;
import com.antgroup.openspg.server.core.reasoner.service.CatalogService;
import com.antgroup.openspg.server.core.reasoner.service.ReasonerService;
import com.antgroup.openspg.server.core.reasoner.service.runner.ReasonerRunner;
import com.antgroup.openspg.server.core.reasoner.service.runner.ThinkerRunner;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -65,4 +77,51 @@ public class ReasonerServiceImpl implements ReasonerService {
}
return request;
}
@Override
public List<Result> thinker(ThinkerTask request) {
Element s = Element.ANY;
Element p = Element.ANY;
Element o = Element.ANY;
String mode = "spo";
String subject = request.getSubject();
if (StringUtils.isNotBlank(subject)) {
s = LocalThinkerMain.strToElement(subject, false);
}
String predicate = request.getPredicate();
if (StringUtils.isNotBlank(predicate)) {
p = LocalThinkerMain.strToElement(predicate, true);
}
String object = request.getObject();
if (StringUtils.isNotBlank(object)) {
o = LocalThinkerMain.strToElement(object, false);
}
if (s == Element.ANY && p == Element.ANY && o == Element.ANY) {
throw new RuntimeException(
"subject, predicate, object cannot all be empty at the same time.");
}
String m = request.getMode();
if (StringUtils.isNotBlank(m)) {
mode = m;
}
ThinkerParams task = new ThinkerParams();
task.setTriple(new Triple(s, p, o));
task.setConnInfo(new KgSchemaConnectionInfo(appEnvConfig.getSchemaUri(), ""));
task.setGraphStateInitString(request.getGraphStoreUrl());
task.setProjectId(request.getProjectId());
Map<String, Object> params = new HashMap<>(3);
if (StringUtils.isNotEmpty(request.getParams())) {
params = new HashMap<>(JSON.parseObject(request.getParams()));
}
task.setParams(params);
task.setMode(mode);
ThinkerRunner thinkerRunner = new ThinkerRunner(task);
return thinkerRunner.run();
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2023 OpenSPG Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied.
*/
package com.antgroup.openspg.server.core.reasoner.service.runner;
import com.antgroup.kg.reasoner.thinker.Thinker;
import com.antgroup.kg.reasoner.thinker.catalog.LogicCatalog;
import com.antgroup.kg.reasoner.thinker.engine.DefaultThinker;
import com.antgroup.kg.reasoner.thinker.logic.Result;
import com.antgroup.kg.reasoner.thinker.logic.graph.Node;
import com.antgroup.openspg.reasoner.runner.local.thinker.LocalThinkerMain;
import com.antgroup.openspg.reasoner.runner.local.thinker.OpenSPGLogicCatalog;
import com.antgroup.openspg.reasoner.runner.local.thinker.ThinkerParams;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ThinkerRunner {
private final ThinkerParams task;
private final String graphStateClass =
"com.antgroup.openspg.reasoner.warehouse.cloudext.CloudExtGraphState";
public ThinkerRunner(ThinkerParams task) {
this.task = task;
}
public List<Result> run() {
LogicCatalog logicCatalog = new OpenSPGLogicCatalog(task.getProjectId(), task.getConnInfo());
logicCatalog.init();
Thinker thinker =
new DefaultThinker(
LocalThinkerMain.loadGraph(graphStateClass, task.getGraphStateInitString()),
logicCatalog);
List<Result> result;
if (task.getMode().toLowerCase().equals("spo")) {
result =
thinker.find(
task.getTriple().getSubject(),
task.getTriple().getPredicate(),
task.getTriple().getObject(),
task.getParams());
} else {
result = thinker.find((Node) task.getTriple().getObject(), task.getParams());
}
return result;
}
}