"""Callback handlers that surface LangChain tool events as SSE lines.

Two handlers are provided:

* ``AsyncVentCallback`` pushes raw (truncated) tool events onto an
  ``asyncio.Queue``.
* ``SyncVentCallback`` first summarises each tool event through an LLM call
  and appends the streamed summary pieces to a shared list.

Both handlers also record every emitted piece through ``append_line``.
"""
import json
import os
from asyncio import Queue
from typing import Iterator

from langchain_core.callbacks import AsyncCallbackHandler, BaseCallbackHandler
from openai import OpenAI

from config.settings import TOOL_SUMMARY_PROMPT
from service.history_manager import append_line

# Initialise the OpenAI-compatible client for Alibaba Cloud Bailian (DashScope).
client = OpenAI(
    api_key=os.getenv("LLM_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

# Model name sent with every summarisation request.
SUMMARIZER_MODEL = "qwen3.7-max"


class AsyncVentCallback(AsyncCallbackHandler):
    """Async variant of the vent callback handler.

    Tool-start and tool-end events are serialised as SSE ``data:`` lines and
    put on an ``asyncio.Queue`` supplied by the caller. The handler methods
    are coroutines because they ``await`` the queue, hence the
    ``AsyncCallbackHandler`` base class.

    NOTE(review): ``append_line`` is called synchronously before each queue
    put; whether it can block the event loop depends on its implementation,
    which is not visible here.
    """

    def __init__(self, queue: Queue, session_id: str):
        """Store the output queue and the session identifier.

        Parameters
        ----------
        queue : Queue
            Queue that receives the formatted SSE lines.
        session_id : str
            Identifier passed to ``append_line`` for every event.
        """
        self.queue = queue
        self.session_id = session_id

    async def _put(self, typ: str, content: str) -> None:
        """Record one event and enqueue it as an SSE ``data:`` line."""
        append_line(self.session_id, typ, content)
        # ensure_ascii=False keeps the Chinese text readable in the payload.
        data = json.dumps({"type": typ, "content": content}, ensure_ascii=False)
        await self.queue.put(f"data: {data}\n\n")

    async def on_tool_start(self, serialized: dict, input_str: str, **kwargs) -> None:
        """Emit the selected tool name and its (truncated) call arguments."""
        # ``serialized`` may be None according to the callback manager's
        # signature, so fall back to an empty mapping.
        tool_name = (serialized or {}).get("name", "")
        await self._put("tool_select", f"选择工具:{tool_name}")
        # str() mirrors on_tool_end and avoids a TypeError on non-str input.
        await self._put("tool_params", f"调用参数:{str(input_str)[:600]}")

    async def on_tool_end(self, output: str, **kwargs) -> None:
        """Emit the tool output, truncated to 800 characters."""
        await self._put("tool_result", f"工具返回:{str(output)[:800]}")


class SyncVentCallback(BaseCallbackHandler):
    """Sync variant of the vent callback handler.

    Each tool-start / tool-end event is summarised through an LLM call and
    every streamed summary piece is appended, formatted as an SSE ``data:``
    line, to the shared ``output_buffer`` list. The caller is expected to
    drain that list itself; this class only appends to it.
    """

    def __init__(self, session_id: str, output_buffer: list):
        """Initialise the sync callback.

        Parameters
        ----------
        session_id : str
            Identifier passed to ``append_line`` for every emitted piece.
        output_buffer : list
            Shared mutable list that receives the formatted SSE lines.
        """
        self.session_id = session_id
        self.output_buffer = output_buffer

    def _add_msg(self, typ: str, content) -> None:
        """Summarise ``content`` and append each summary piece to the buffer.

        Parameters
        ----------
        typ : str
            Event type written into the SSE payload and the history line.
        content
            Raw event description; dicts are JSON-encoded, anything else is
            converted with ``str``.
        """
        # Convert the raw content to a string.
        if isinstance(content, dict):
            content_str = json.dumps(content, ensure_ascii=False)
        else:
            content_str = str(content)

        # The summary arrives as a stream; forward each piece as its own SSE
        # line so the consumer can interleave it with model output.
        for piece in _summarize_tool_event(content_str):
            append_line(self.session_id, typ, piece)
            data = json.dumps({"type": typ, "content": piece}, ensure_ascii=False)
            self.output_buffer.append(f"data: {data}\n\n")

    def on_tool_start(self, serialized: dict, input_str: str, **kwargs) -> None:
        """Summarise and emit the selected tool's name and description."""
        # ``serialized`` may be None according to the callback manager's
        # signature, so fall back to an empty mapping.
        info = serialized or {}
        tool_name = info.get("name", "")
        description = info.get("description")
        self._add_msg(
            "tool_select_" + tool_name,
            f"选择工具:{tool_name},工具介绍:{description}",
        )

    def on_tool_end(self, output, **kwargs) -> None:
        """Summarise and emit the tool output, truncated to 2000 characters."""
        # The tool name is read from the callback kwargs when provided.
        tool_name = kwargs.get("name", "未知工具")
        self._add_msg("tool_result_" + tool_name, f"工具返回:{str(output)[:2000]}")


def _summarize_tool_event(raw_content: str) -> Iterator[str]:
    """Stream an LLM-generated summary of a tool-start or tool-end event.

    The prompt is ``TOOL_SUMMARY_PROMPT`` followed by ``raw_content``, sent as
    a single user message to ``SUMMARIZER_MODEL`` with streaming enabled and
    ``enable_thinking`` set to ``False`` in the request's extra body. Only the
    ``content`` field of each streamed delta is yielded.

    Parameters
    ----------
    raw_content : str
        Raw description of the tool selection or tool result.

    Yields
    ------
    str
        Successive non-empty content fragments from the streaming completion.
    """
    prompt = f"""
    {TOOL_SUMMARY_PROMPT}
    {raw_content}
    """
    messages = [{"role": "user", "content": prompt}]
    completion = client.chat.completions.create(
        model=SUMMARIZER_MODEL,
        messages=messages,
        extra_body={"enable_thinking": False},
        stream=True,
    )
    for chunk in completion:
        # Some stream chunks carry no choices; skip them.
        if not chunk.choices:
            continue
        # A delta may lack ``content`` or carry an empty one; yield only
        # real text.
        fragment = getattr(chunk.choices[0].delta, "content", None)
        if fragment:
            yield fragment