feat(CA): init CA (Controllable Agent) library (#184)

mfz-ant 2024-04-10 17:07:50 +08:00 committed by GitHub
parent 98be8a8282
commit 303e8f9569
6 changed files with 234 additions and 0 deletions


@@ -0,0 +1,34 @@
# CA Library
The CA (Controllable Agent) library streamlines computation and scheduling within Python applications. It provides a modular framework that separates computation logic from scheduling logic and can interface with remote Large Language Models (LLMs).
## Modules
The library is composed of several modules, each with a distinct purpose:
### Base Module
The Base Module is the core of the CA library. It provides the essential building blocks for constructing computational workflows and ensures that computation logic can run independently of the scheduling mechanism, letting developers focus on the algorithmic aspects without worrying about the underlying infrastructure.
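As a rough sketch of the intended usage (based on the base module code in this change, with a hypothetical `AddOneModule` for illustration): a module subclasses `CABaseModule`, implements only `invoke`, and is called like a function. By default the call returns an asyncio task, while `return_as_native=True` blocks until the resolved result is available.

```python
from knext.ca.module.base import CABaseModule


class AddOneModule(CABaseModule):
    # Hypothetical module: only the computation logic lives here;
    # scheduling is handled by the base class
    def invoke(self, x):
        return x + 1


module = AddOneModule()
# return_as_native=True drives the scheduled task to completion and
# returns a plain Python value instead of an asyncio task
print(module(x=41, return_as_native=True))  # 42
```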
### LLM Module
The LLM Module extends the functionality of the CA library by allowing users to invoke Large Language Models that are hosted remotely. This module provides an interface to communicate with LLM services, enabling the integration of sophisticated language processing capabilities into the user's applications.
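A minimal sketch of calling the LLM Module, mirroring the `main()` function in the module code from this change; the import path and the `localhost` Triton endpoint are assumptions for illustration.

```python
# Import path assumed; the change only shows knext.ca.module.base explicitly
from knext.ca.module.llm import LLMModule

# Assumed: a Triton Inference Server on localhost:8000 serving "vllm_model"
llm = LLMModule(url="localhost")

# Direct, synchronous call into the module's computation logic
print(llm.invoke(prompt="What is Triton Inference Server?", max_output_len=32, temperature=0))

# Or schedule it through the base class and block for the final string
print(llm(prompt="What is Triton Inference Server?", return_as_native=True))
```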
## Service
The CA library also includes service components that facilitate the deployment and invocation of algorithms composed of multiple modules.
- **Deployment**: This service deploys a collection of modules as a cohesive algorithm onto a desired runtime environment, ensuring that all modules are correctly instantiated and interconnected so they function as a single algorithmic unit.
- **Invocation**: Once deployed, the invocation service executes the algorithm: it provides an interface to trigger the computation, routes input data to the appropriate modules, and aggregates the results for output, as sketched below.
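As a sketch of how invocation composes modules today (hypothetical `Square` and `Add` modules shown for illustration, using the base module from this change): each module call returns an asyncio task, downstream modules accept those tasks as inputs and await them before computing, and `return_as_native=True` on the final call drives the whole pipeline to completion.

```python
from knext.ca.module.base import CABaseModule


class Square(CABaseModule):
    def invoke(self, x):
        return x * x


class Add(CABaseModule):
    def invoke(self, a, b):
        return a + b


square, add = Square(), Add()
# Each call returns an asyncio task; nothing has executed yet
t1 = square(x=3)
t2 = square(x=4)
# The final call awaits the pending tasks, routes their results into Add,
# and returns a plain Python value
print(add(a=t1, b=t2, return_as_native=True))  # 25
```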
## TODO
The development roadmap for the CA library includes several enhancements aimed at improving the user experience and expanding the library's capabilities:
- **Develop Automatic Deployment**: An automatic deployment feature will let users deploy their algorithms with minimal setup, eliminating the need to start containers and services manually and making the deployment of complex algorithms more user-friendly.
- **Develop LLM LoRA Deployment Capability**: To simplify the deployment of models that use LoRA (Low-Rank Adaptation), a specialized service will automate the deployment process, letting users focus on the model's functionality without worrying about the intricacies of deployment.
- **Develop Agent Capability**: An agent-based model will be introduced to drive algorithmic workflows using large models. This approach would enable more dynamic and intelligent orchestration of computational processes, potentially leading to improved efficiency and adaptability.


@@ -0,0 +1,10 @@
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.


@@ -0,0 +1,10 @@
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.


@@ -0,0 +1,76 @@
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
import asyncio
import functools
import inspect


def is_awaitable(value):
    # Coroutines, tasks and futures are all treated as awaitables
    return inspect.isawaitable(value)


async def resolve_awaitables(value):
    # If the value is an awaitable, resolve it, then recurse in case the
    # result itself contains awaitables
    if is_awaitable(value):
        value = await value
        return await resolve_awaitables(value)
    # If the value is a list, resolve each item
    if isinstance(value, list):
        return [await resolve_awaitables(item) for item in value]
    # If the value is a dict, resolve each value
    if isinstance(value, dict):
        return {k: await resolve_awaitables(v) for k, v in value.items()}
    # If the value is neither an awaitable, list, nor dict, return it as is
    return value


class CABaseModule(object):
    def __init__(self):
        # Event loop used to schedule and drive asynchronous invocations
        self.tg = asyncio.get_event_loop()

    def __call__(self, **kwargs):
        # Schedule invoke() as a task; by default the task is returned so
        # downstream modules can chain on it, while return_as_native=True
        # blocks until the resolved result is available
        return_as_native = kwargs.pop("return_as_native", False)
        task = self.tg.create_task(self._async_invoke(**kwargs))
        if return_as_native:
            return self.tg.run_until_complete(self.wait_all_tasks_finished(task))
        else:
            return task

    async def wait_all_tasks_finished(self, task):
        return await resolve_awaitables(task)

    async def _async_invoke(self, **kwargs):
        # Await any upstream tasks passed in as kwargs before invoking
        resolved_kwargs = await resolve_awaitables(kwargs)
        # Run the (potentially blocking) invoke() in the default executor so
        # it does not block the event loop
        func = functools.partial(self.invoke, **resolved_kwargs)
        result = await self.tg.run_in_executor(None, func)
        return result

    async def _resolve_kwargs(self, kwargs):
        # Shallow variant of resolve_awaitables for keyword arguments
        resolved = {}
        for key, value in kwargs.items():
            if is_awaitable(value):
                resolved[key] = await value
            else:
                resolved[key] = value
        return resolved

    def invoke(self, **kwargs):
        # Subclasses implement their computation logic here
        raise NotImplementedError


@@ -0,0 +1,94 @@
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.
import json

import requests

from knext.ca.module.base import CABaseModule


class LLMClient(object):
    def __init__(self, url) -> None:
        # HTTP endpoints of the model server (Triton-style v2 API) for "vllm_model"
        self.generate_url = f"http://{url}:8000/v2/models/vllm_model/generate"
        self.fetch_config_url = f"http://{url}:8000/v2/models/vllm_model/config"
        self.headers = {"Content-Type": "application/json"}

    def call_service(self, prompt, max_tokens=32, temperature=0):
data = {
"text_input": prompt,
"parameters": {
"stream": False,
"temperature": temperature,
"max_tokens": max_tokens,
},
}
response = requests.post(
self.generate_url, headers=self.headers, data=json.dumps(data)
)
if response.status_code == 200:
# print(f'response from server: {response.json()}')
response_result = response.json()["text_output"]
newline_pos = response_result.find("\n")
if newline_pos != -1:
return response_result[newline_pos + 1 :]
else:
return response_result
else:
return f"Error: {response.status_code} - {response.text}"
    def display_model_config(self):
        response = requests.get(self.fetch_config_url)
        if response.status_code == 200:
            # Parse the result into JSON format
            config = response.json()
            # Structured printing of JSON results
            print(json.dumps(config, indent=2))
        else:
            print(f"Error: {response.status_code} - {response.text}")


class LLMModule(CABaseModule):
    def __init__(self, url):
        super(LLMModule, self).__init__()
        self.llm_client = LLMClient(url)

    def invoke(
        self,
        prompt,
        max_output_len=64,
        temperature=0,
    ):
        result = self.llm_client.call_service(
            prompt=prompt,
            max_tokens=max_output_len,
            temperature=temperature,
        )
        return result


def main():
    url = "localhost"
    llm_module = LLMModule(url=url)
    prompt = "What is Triton Inference Server?"
    response = llm_module.invoke(
        prompt=prompt,
        max_output_len=10,
        temperature=0.5,
    )
    print(f"response: {response}")


if __name__ == "__main__":
    main()


@@ -0,0 +1,10 @@
# Copyright 2023 OpenSPG Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied.