diff --git a/agent/canvas.py b/agent/canvas.py index 8e94c639c..4b859fbf3 100644 --- a/agent/canvas.py +++ b/agent/canvas.py @@ -169,6 +169,7 @@ class Canvas: def run(self, running_hint_text = "is running...🕞", **kwargs): if not running_hint_text or not isinstance(running_hint_text, str): running_hint_text = "is running...🕞" + bypass_begin = bool(kwargs.get("bypass_begin", False)) if self.answer: cpn_id = self.answer[0] @@ -188,6 +189,12 @@ class Canvas: if not self.path: self.components["begin"]["obj"].run(self.history, **kwargs) self.path.append(["begin"]) + if bypass_begin: + cpn = self.get_component("begin") + downstream = cpn["downstream"] + self.path.append(downstream) + + self.path.append([]) diff --git a/agent/component/answer.py b/agent/component/answer.py index 67dcbc63f..c8c3439c0 100644 --- a/agent/component/answer.py +++ b/agent/component/answer.py @@ -64,14 +64,17 @@ class Answer(ComponentBase, ABC): for ii, row in stream.iterrows(): answer += row.to_dict()["content"] yield {"content": answer} - else: + elif stream is not None: for st in stream(): res = st yield st - if self._param.post_answers: + if self._param.post_answers and res: res["content"] += random.choice(self._param.post_answers) yield res + if res is None: + res = {"content": ""} + self.set_output(res) def set_exception(self, e): diff --git a/agent/component/base.py b/agent/component/base.py index 8bdddb38e..e35a84e64 100644 --- a/agent/component/base.py +++ b/agent/component/base.py @@ -442,7 +442,6 @@ class ComponentBase(ABC): elif q.get("value"): outs.append(pd.DataFrame([{"content": q["value"]}])) return outs - def get_input(self): if self._param.debug_inputs: return pd.DataFrame([{"content": v["value"]} for v in self._param.debug_inputs if v.get("value")]) diff --git a/api/db/services/canvas_service.py b/api/db/services/canvas_service.py index 5d98878fc..118403474 100644 --- a/api/db/services/canvas_service.py +++ b/api/db/services/canvas_service.py @@ -294,8 +294,22 @@ def completionOpenAI(tenant_id, agent_id, question, session_id=None, stream=True "source": "agent", "dsl": cvs.dsl } + canvas.messages.append({"role": "user", "content": question, "id": message_id}) + canvas.add_user_input(question) + API4ConversationService.save(**conv) conv = API4Conversation(**conv) + if not conv.message: + conv.message = [] + conv.message.append({ + "role": "user", + "content": question, + "id": message_id + }) + + if not conv.reference: + conv.reference = [] + conv.reference.append({"chunks": [], "doc_aggs": []}) # Handle existing session else: @@ -331,7 +345,7 @@ def completionOpenAI(tenant_id, agent_id, question, session_id=None, stream=True if stream: try: completion_tokens = 0 - for ans in canvas.run(stream=True): + for ans in canvas.run(stream=True, bypass_begin=True): if ans.get("running_status"): completion_tokens += len(tiktokenenc.encode(ans.get("content", ""))) yield "data: " + json.dumps( @@ -394,7 +408,7 @@ def completionOpenAI(tenant_id, agent_id, question, session_id=None, stream=True else: # Non-streaming mode try: all_answer_content = "" - for answer in canvas.run(stream=False): + for answer in canvas.run(stream=False, bypass_begin=True): if answer.get("running_status"): continue