callback.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
import json
import os
from asyncio import Queue
from collections.abc import Iterator

from langchain_core.callbacks import AsyncCallbackHandler, BaseCallbackHandler
from openai import OpenAI

from config.settings import TOOL_SUMMARY_PROMPT
from service.history_manager import append_line
  8. # 初始化阿里云百炼client
  9. client = OpenAI(
  10. api_key=os.getenv("LLM_API_KEY"),
  11. base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
  12. )
  13. SUMMARIZER_MODEL = "qwen3.7-max"
  14. class AsyncVentCallback(AsyncCallbackHandler):
  15. """Async variant of the vent callback handler.
  16. Used with LangGraph's async streaming (astream) so tool-start and tool-end
  17. events are pushed through an asyncio.Queue to the SSE event loop without
  18. blocking. This variant must be an AsyncCallbackHandler because it calls
  19. await inside the handler methods.
  20. """
  21. def __init__(self, queue: Queue, session_id: str):
  22. self.queue = queue
  23. self.session_id = session_id
  24. async def _put(self, typ: str, content: str):
  25. append_line(self.session_id, typ, content)
  26. data = json.dumps({"type": typ, "content": content}, ensure_ascii=False)
  27. await self.queue.put(f"data: {data}\n\n")
  28. async def on_tool_start(self, serialized: dict, input_str: str, **kwargs):
  29. tool_name = serialized.get("name", "")
  30. await self._put("tool_select", f"选择工具:{tool_name}")
  31. await self._put("tool_params", f"调用参数:{input_str[:600]}")
  32. async def on_tool_end(self, output: str, **kwargs):
  33. await self._put("tool_result", f"工具返回:{str(output)[:800]}")
  34. class SyncVentCallback(BaseCallbackHandler):
  35. """Sync variant of the vent callback handler.
  36. Used with LangGraph's synchronous streaming (stream) so tool-start and
  37. tool-end events are interleaved with model output. After summarising the
  38. tool event via an LLM call, the result is appended into the shared
  39. `output_buffer` list, which the caller drains inline during each
  40. iteration step. This variant must be a BaseCallbackHandler (sync) because
  41. the synchronous stream loop cannot await async handlers.
  42. """
  43. def __init__(self, session_id: str, output_buffer: list):
  44. """Initialise the sync callback.
  45. Parameters
  46. ----------
  47. session_id : str
  48. Stable identifier for the current conversation.
  49. output_buffer : list
  50. Shared mutable list used to interleave tool-summary SSE lines with
  51. model output. The caller pops items from this list between
  52. synchronous streaming chunks.
  53. """
  54. self.session_id = session_id
  55. self.output_buffer = output_buffer
  56. def _add_msg(self, typ: str, content):
  57. # 原始内容转字符串
  58. content_str = json.dumps(content, ensure_ascii=False) if isinstance(content, dict) else str(content)
  59. summary = _summarize_tool_event(content_str)
  60. # Stream each piece of the summary into the output buffer so the
  61. # outer generator loop can yield SSE lines interleaved with model
  62. # output.
  63. for piece in summary:
  64. append_line(self.session_id, typ, piece)
  65. data = json.dumps({"type": typ, "content": piece}, ensure_ascii=False)
  66. self.output_buffer.append(f"data: {data}\n\n")
  67. def on_tool_start(self, serialized: dict, input_str: str, **kwargs):
  68. tool_name = serialized.get("name", "")
  69. description = serialized.get("description")
  70. self._add_msg("tool_select_" + tool_name, f"选择工具:{tool_name},工具介绍:{description}")
  71. def on_tool_end(self, output, **kwargs):
  72. tool_name = kwargs.get("name", "未知工具")
  73. self._add_msg("tool_result_" + tool_name, f"工具返回:{str(output)[:2000]}")
  74. def _summarize_tool_event(raw_content: str) -> str:
  75. """Summarise a tool-start or tool-end event via the Qwen LLM.
  76. Builds a Chinese-language prompt instructing the model to produce a
  77. layperson-friendly summary (≤ 100 chars) and streams the result from the
  78. DashScope-compatible endpoint. Only the `content` field of each delta
  79. is yielded; `reasoning` / thinking chunks are discarded.
  80. Parameters
  81. ----------
  82. raw_content : str
  83. Raw description of the tool selection or tool result.
  84. Yields
  85. ------
  86. str
  87. Successive non-empty content fragments from the streaming completion.
  88. """
  89. prompt = f"""
  90. {TOOL_SUMMARY_PROMPT}
  91. {raw_content}
  92. """
  93. messages = [{"role": "user", "content": prompt}]
  94. completion = client.chat.completions.create(
  95. model=SUMMARIZER_MODEL,
  96. messages=messages,
  97. extra_body={"enable_thinking": False},
  98. stream=True,
  99. )
  100. for chunk in completion:
  101. if not chunk.choices:
  102. continue
  103. delta = chunk.choices[0].delta
  104. if hasattr(delta, "content") and delta.content:
  105. resp_content = delta.content
  106. if resp_content:
  107. yield resp_content