model interfaces

(cherry picked from commit 4c556c39f6fdefd8755d9afcec247086f5d47380)
didicout 2023-12-05 21:51:26 +08:00 committed by Qu
parent 425285ad79
commit de35b970b7
13 changed files with 259 additions and 104 deletions


@@ -1,54 +0,0 @@
from abc import ABC, abstractmethod


class ModelExecutor(ABC):
    """
    Corresponds to xflow AntLLM.
    """

    @classmethod
    def from_config(cls,
                    args='sys',
                    **kwargs):
        pass

    def __init__(self,
                 backend_model,
                 backend_tokenizer,
                 init_args,
                 **kwargs):
        self.backend_model = backend_model
        self.backend_tokenizer = backend_tokenizer
        self.init_args = init_args
        self.kwargs = kwargs


class LLMExecutor(ModelExecutor):
    @abstractmethod
    def sft_train(self, args=None, callbacks=None, **kwargs):
        raise NotImplementedError("")

    @abstractmethod
    def rl_tuning(self, args=None, callbacks=None, **kwargs):
        raise NotImplementedError("")

    @abstractmethod
    def batch_inference(self, args, **kwargs):
        pass

    @abstractmethod
    def inference(self, input, inference_args, **kwargs):
        raise NotImplementedError()


class HfLLMExecutor(ModelExecutor):
    pass


class DeepKEExecutor(ModelExecutor):
    pass


@@ -1,50 +0,0 @@
# coding: utf-8
# Copyright (c) Antfin, Inc. All rights reserved.
from abc import ABC


class ModelInvoker(ABC):
    """
    Corresponds to xflow ModelHubEntry.
    """

    def submit_sft(self, submit_mode='k8s'):
        pass

    def submit_rl_tuning(self, submit_mode='k8s'):
        pass

    @classmethod
    def deploy(cls, args, deploy_mode='k8s'):
        pass

    def inference(self, input, **kwargs):
        """
        Fetch inference results from an existing model service.

        Args:
            input: the inference input.
            **kwargs: extra inference arguments.

        Returns:
        """
        pass

    @classmethod
    def from_config(cls, args='sys'):
        return cls()


class OpenAI(ModelInvoker):
    def __init__(self, token):
        self.token = token

    def inference(self, input, **kwargs):
        import requests
        requests.post(url="https://api.openai.com",
                      params={"input": input, "token": self.token})


@@ -0,0 +1 @@
from .base import NNExecutor, LLMExecutor


@@ -0,0 +1,70 @@
from abc import ABC, abstractmethod


class NNExecutor(ABC):
    """
    Entry point of model execution in a certain pod.
    """

    @classmethod
    def from_config(cls, nn_config, **kwargs):
        pass

    def __init__(self,
                 backend_model,
                 backend_tokenizer,
                 init_args,
                 **kwargs):
        self.backend_model = backend_model
        self.backend_tokenizer = backend_tokenizer
        self.init_args = init_args
        self.kwargs = kwargs


class LLMExecutor(NNExecutor):
    @classmethod
    def from_config(cls, nn_config: dict, **kwargs):
        """
        Args:
            nn_config: the model configuration.
        """
        # TODO
        pass

    @abstractmethod
    def sft(self, args=None, callbacks=None, **kwargs):
        """
        The entry point of SFT execution in a certain pod.
        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support SFT.")

    @abstractmethod
    def rl_tuning(self, args=None, callbacks=None, **kwargs):
        """
        The entry point of RL-Tuning execution in a certain pod.
        """
        raise NotImplementedError(f"{self.__class__.__name__} does not support RL-Tuning.")

    def batch_inference(self, args, **kwargs):
        pass

    @abstractmethod
    def inference(self, data, **kwargs):
        """
        The entry point of inference. Usually for local invokers or model services.
        """
        raise NotImplementedError()


class HfLLMExecutor(NNExecutor):
    pass


class DeepKeExecutor(NNExecutor):
    pass
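
For illustration, a minimal concrete executor against the LLMExecutor interface above might look like the sketch below. The EchoLLMExecutor name and its echo behavior are hypothetical and not part of this commit; a real subclass would load an actual backend model and tokenizer.

from nn4k.executor import LLMExecutor


class EchoLLMExecutor(LLMExecutor):
    """Hypothetical executor that echoes its input; for illustration only."""

    @classmethod
    def from_config(cls, nn_config: dict, **kwargs):
        # A real executor would load backend_model and backend_tokenizer here.
        return cls(backend_model=None, backend_tokenizer=None, init_args=nn_config)

    def sft(self, args=None, callbacks=None, **kwargs):
        raise NotImplementedError(f"{self.__class__.__name__} does not support SFT.")

    def rl_tuning(self, args=None, callbacks=None, **kwargs):
        raise NotImplementedError(f"{self.__class__.__name__} does not support RL-Tuning.")

    def inference(self, data, **kwargs):
        return data  # echo the input back unchanged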


@@ -0,0 +1 @@
from .base import NNInvoker, LLMInvoker


@@ -0,0 +1,83 @@
# coding: utf-8
# Copyright (c) Antfin, Inc. All rights reserved.
import json
import os
from abc import ABC
from typing import Union

from nn4k.executor import LLMExecutor
from nn4k.executor import NNExecutor
from nn4k.nnhub import SimpleNNHub


class NNInvoker(ABC):
    """
    Invoking entry interfaces for NN models.
    One NNInvoker object is for one NN model.
    - Interfaces starting with "submit_" submit a batch task to a remote execution engine.
    - Interfaces starting with "remote_" query a remote service for results.
    - Interfaces starting with "local_" run something locally.
    """

    hub = SimpleNNHub()

    def __init__(self, nn_executor: NNExecutor) -> None:
        if os.getenv("NN4K_DEBUG") is None:
            raise EnvironmentError(
                "In prod env, only NNInvoker.from_config is allowed for creating an nn_invoker.")
        super().__init__()
        self._nn_executor: NNExecutor = nn_executor


class LLMInvoker(NNInvoker):
    def __init__(self, nn_executor: LLMExecutor) -> None:
        super().__init__(nn_executor)

    def submit_inference(self, submit_mode='k8s'):
        pass

    def submit_sft(self, submit_mode='k8s'):
        pass

    def submit_rl_tuning(self, submit_mode='k8s'):
        pass

    # def deploy(cls, args, deploy_mode='k8s'):
    #     pass

    def remote_inference(self, input, **kwargs):
        """
        Fetch inference results from an existing model service.

        Args:
            input: the inference input.
            **kwargs: extra inference arguments.

        Returns:
        """
        pass

    def local_inference(self, data, **kwargs):
        return self._nn_executor.inference(data, **kwargs)

    def init_local_model(self):
        name = self._nn_config.get("nn_name")
        version = self._nn_config.get("nn_version")
        self._nn_executor: LLMExecutor = self.hub.get_model_executor(name, version)

    @classmethod
    def from_config(cls, nn_config: Union[str, dict]):
        try:
            if isinstance(nn_config, str):
                with open(nn_config, "r") as f:
                    nn_config = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            raise ValueError("cannot decode config file") from e
        if nn_config.get("invoker_type", "LLM") == "LLM":
            o = cls.__new__(cls)
            o._nn_config = nn_config
            return o
        elif nn_config.get("invoker_type", "LLM") == "OpenAI":
            from nn4k.invoker.openai_invoker import OpenAIInvoker
            return OpenAIInvoker.from_config(nn_config)
        else:
            raise ValueError(f"unsupported invoker_type: {nn_config.get('invoker_type')}")
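
A hedged sketch of the intended local flow, assuming an executor named `llama2` has already been published to the shared hub (the config values here are hypothetical). Note that from_config builds the invoker via __new__, so the NN4K_DEBUG guard in __init__ is not hit on this path.

from nn4k.invoker import LLMInvoker

invoker = LLMInvoker.from_config({"invoker_type": "LLM",
                                  "nn_name": "llama2",
                                  "nn_version": "default"})
invoker.init_local_model()                 # resolves an LLMExecutor from the hub
result = invoker.local_inference("hello")  # delegates to NNExecutor.inference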


@@ -0,0 +1,21 @@
from typing import Union

from nn4k.invoker import NNInvoker


class OpenAIInvoker(NNInvoker):
    @classmethod
    def from_config(cls, nn_config: Union[str, dict]):
        import openai

        o = cls.__new__(cls)
        o._openai_client = openai.OpenAI()
        o._open_ai_model = nn_config.get("open_ai_model")
        # TODO config key
        # TODO complete
        return o

    def remote_inference(self, input, **kwargs):
        # TODO
        pass
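
The remote_inference body is left as a TODO above; one plausible completion, assuming the openai>=1.0 client API and treating input as a single user prompt, could be:

    # Hypothetical completion of remote_inference; not part of this commit.
    def remote_inference(self, input, **kwargs):
        completion = self._openai_client.chat.completions.create(
            model=self._open_ai_model,
            messages=[{"role": "user", "content": input}],
            **kwargs,
        )
        return completion.choices[0].message.content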


@@ -0,0 +1,83 @@
from abc import ABC, abstractmethod
from typing import Optional, Tuple, Type, Union

from nn4k.executor import NNExecutor


class NNHub(ABC):
    @abstractmethod
    def publish(self,
                model_executor: Union[NNExecutor, Tuple[Type[NNExecutor], tuple, dict, tuple]],
                name: str,
                version: str = None) -> str:
        """
        Publish a model (executor) to the hub.

        Args:
            model_executor: An NNExecutor object, which is pickleable,
                or a tuple of (class, args, kwargs, weight_ids) for creating an NNExecutor,
                while all these 4 arguments are pickleable.
            name: The name of a model, like `llama2`.
                We do not have a `namespace`. Use a joined name like `alibaba/qwen` to support such features.
            version: Optional. Auto-generate a version if this param is not given.

        Returns:
            The published model version.
        """
        pass

    @abstractmethod
    def get_model_executor(self, name: str, version: str = None) -> Optional[NNExecutor]:
        """
        Get a ModelExecutor instance from the hub.

        Args:
            name: The name of a model.
            version: The version of a model. Get the default version if this param is not given.

        Returns:
            The ModelExecutor instance. None for not found.
        """
        pass

    def start_service(self, name: str, version: str, service_id: str = None, **kwargs):
        raise NotImplementedError("This hub does not support starting model services.")

    def stop_service(self, name: str, version: str, service_id: str = None, **kwargs):
        raise NotImplementedError("This hub does not support stopping model services.")

    def get_service(self, name: str, version: str, service_id: str = None):
        raise NotImplementedError("This hub does not support model services.")


class SimpleNNHub(NNHub):
    def __init__(self) -> None:
        super().__init__()
        self._model_executors = {}
        # TODO: init executor info.

    def _add_executor(self,
                      executor: Union[NNExecutor, Tuple[Type[NNExecutor], tuple, dict, tuple]],
                      name: str,
                      version: str = None):
        if version is None:
            version = 'default'
        if self._model_executors.get(name) is None:
            self._model_executors[name] = {version: executor}
        else:
            self._model_executors[name][version] = executor

    def publish(self, model_executor: NNExecutor, name: str, version: str = None) -> str:
        print("WARNING: You are using SimpleNNHub which can only maintain models "
              "in memory without data persistence!")
        if version is None:
            version = 'default'
        self._add_executor(model_executor, name, version)
        return version

    def get_model_executor(self, name: str, version: str = None) -> Optional[NNExecutor]:
        if version is None:
            version = 'default'
        if self._model_executors.get(name) is None:
            return None
        return self._model_executors[name].get(version)
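
A minimal usage sketch of SimpleNNHub, reusing the hypothetical EchoLLMExecutor from the executor example above:

hub = SimpleNNHub()
executor = EchoLLMExecutor(backend_model=None, backend_tokenizer=None, init_args={})

version = hub.publish(executor, name="llama2")  # stored under version 'default'
assert version == 'default'

assert hub.get_model_executor("llama2") is executor  # version omitted -> 'default'
assert hub.get_model_executor("unknown") is None     # not found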