| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
import json
import os
from asyncio import Queue
from collections.abc import Iterator

from langchain_core.callbacks import AsyncCallbackHandler, BaseCallbackHandler
from openai import OpenAI

from config.settings import TOOL_SUMMARY_PROMPT
from service.history_manager import append_line
# OpenAI-compatible client for Alibaba Cloud Bailian (DashScope), built once
# at import time. The API key is read from the LLM_API_KEY environment variable.
client = OpenAI(
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    api_key=os.getenv("LLM_API_KEY"),
)

# Model id passed to the streaming summarisation call in _summarize_tool_event.
# NOTE(review): confirm this model id is actually served by the DashScope endpoint.
SUMMARIZER_MODEL = "qwen3.7-max"
class AsyncVentCallback(AsyncCallbackHandler):
    """Async callback handler that forwards tool events to an SSE queue.

    Intended for LangGraph's async streaming (``astream``). Each tool-start
    and tool-end event is recorded via ``append_line`` and pushed onto an
    ``asyncio.Queue`` as a ready-to-send SSE ``data:`` line. It subclasses
    ``AsyncCallbackHandler`` because its handler methods ``await`` on the
    queue.
    """

    def __init__(self, queue: Queue, session_id: str):
        """Store the output queue and the conversation identifier.

        Parameters
        ----------
        queue : asyncio.Queue
            Receives fully formatted SSE lines (``str``).
        session_id : str
            Identifier passed to ``append_line`` for every event.
        """
        self.queue = queue
        self.session_id = session_id

    async def _put(self, typ: str, content: str) -> None:
        """Record one event via ``append_line`` and enqueue it as an SSE line."""
        append_line(self.session_id, typ, content)
        data = json.dumps({"type": typ, "content": content}, ensure_ascii=False)
        await self.queue.put(f"data: {data}\n\n")

    async def on_tool_start(self, serialized: dict, input_str: str, **kwargs) -> None:
        """Emit the selected tool's name and its input (truncated to 600 chars)."""
        # Guard against a missing ``serialized`` payload.
        tool_name = (serialized or {}).get("name", "")
        await self._put("tool_select", f"选择工具:{tool_name}")
        await self._put("tool_params", f"调用参数:{input_str[:600]}")

    async def on_tool_end(self, output, **kwargs) -> None:
        """Emit the tool's result (truncated to 800 chars)."""
        # When a tool is run from a tool call, ``output`` may be a message
        # object (e.g. ToolMessage) whose ``str()`` is a field repr rather
        # than the result text; prefer its ``content``. Plain strings and
        # objects without a ``content`` attribute are used unchanged.
        text = getattr(output, "content", output)
        await self._put("tool_result", f"工具返回:{str(text)[:800]}")
class SyncVentCallback(BaseCallbackHandler):
    """Sync callback handler that turns tool events into summarised SSE lines.

    Intended for LangGraph's synchronous streaming (``stream``). Each
    tool-start / tool-end event is summarised by a streaming LLM call
    (``_summarize_tool_event``). Every streamed fragment of that summary is
    recorded via ``append_line`` and appended to the shared ``output_buffer``
    as an SSE ``data:`` line, which the caller drains between stream chunks
    so tool summaries are interleaved with model output. It subclasses the
    sync ``BaseCallbackHandler`` because the synchronous stream loop cannot
    await async handlers.
    """

    def __init__(self, session_id: str, output_buffer: list):
        """Initialise the sync callback.

        Parameters
        ----------
        session_id : str
            Identifier for the current conversation, passed to ``append_line``.
        output_buffer : list
            Shared mutable list used to interleave tool-summary SSE lines with
            model output. The caller pops items from this list between
            synchronous streaming chunks.
        """
        self.session_id = session_id
        self.output_buffer = output_buffer

    def _add_msg(self, typ: str, content) -> None:
        """Summarise ``content`` and emit each summary fragment.

        Parameters
        ----------
        typ : str
            Event type, used both for ``append_line`` and as the ``"type"``
            field of the SSE payload.
        content : dict or object
            Raw event description. Dicts are JSON-encoded (non-ASCII kept
            as-is); anything else is converted with ``str()``.
        """
        if isinstance(content, dict):
            content_str = json.dumps(content, ensure_ascii=False)
        else:
            content_str = str(content)
        # NOTE(review): each streamed fragment becomes its own append_line
        # entry; confirm the history store expects fragments rather than one
        # complete summary.
        for piece in _summarize_tool_event(content_str):
            append_line(self.session_id, typ, piece)
            data = json.dumps({"type": typ, "content": piece}, ensure_ascii=False)
            self.output_buffer.append(f"data: {data}\n\n")

    def on_tool_start(self, serialized: dict, input_str: str, **kwargs) -> None:
        """Summarise and emit the selection of a tool (name and description)."""
        info = serialized or {}
        # ``or`` also covers keys that are present but None, which would
        # otherwise break the concatenation / print as "None".
        tool_name = info.get("name") or ""
        description = info.get("description") or ""
        self._add_msg("tool_select_" + tool_name, f"选择工具:{tool_name},工具介绍:{description}")

    def on_tool_end(self, output, **kwargs) -> None:
        """Summarise and emit a tool's result (truncated to 2000 chars)."""
        # ``name`` may be missing or None; fall back to a placeholder so the
        # string concatenation below cannot raise.
        tool_name = kwargs.get("name") or "未知工具"
        # When a tool is run from a tool call, ``output`` may be a message
        # object (e.g. ToolMessage) whose ``str()`` is a field repr rather
        # than the result text; prefer its ``content``.
        text = getattr(output, "content", output)
        self._add_msg("tool_result_" + tool_name, f"工具返回:{str(text)[:2000]}")
def _summarize_tool_event(raw_content: str) -> Iterator[str]:
    """Stream an LLM summary of a tool-start or tool-end event.

    The prompt is ``TOOL_SUMMARY_PROMPT`` followed by ``raw_content``. It is
    sent to ``SUMMARIZER_MODEL`` through the module-level DashScope-compatible
    ``client`` with streaming enabled and thinking disabled
    (``enable_thinking=False``). Only non-empty ``delta.content`` fragments
    are yielded; chunks with no choices or no content are skipped. The
    summary's length and tone are whatever ``TOOL_SUMMARY_PROMPT`` asks for
    (presumably short and layperson-friendly; its text lives in
    ``config.settings``).

    This is a generator: the request is sent when iteration starts.

    Parameters
    ----------
    raw_content : str
        Raw description of the tool selection or tool result.

    Yields
    ------
    str
        Successive non-empty content fragments from the streaming completion.
    """
    prompt = f"\n{TOOL_SUMMARY_PROMPT}\n{raw_content}\n"
    completion = client.chat.completions.create(
        model=SUMMARIZER_MODEL,
        messages=[{"role": "user", "content": prompt}],
        extra_body={"enable_thinking": False},
        stream=True,
    )
    for chunk in completion:
        if not chunk.choices:
            continue
        text = getattr(chunk.choices[0].delta, "content", None)
        if text:
            yield text
|