| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- import asyncio
- import json
- import uuid
- import traceback
- from typing import AsyncIterable, Optional
- from langchain_core.messages import HumanMessage
- from core.callback import AsyncVentCallback
- from utils.pdf_builder import markdown_to_pdf, get_pdf_download_url
- from config.settings import (
- MAX_AGENT_ITERATIONS,
- REPORT_OUTPUT_DIR,
- STREAM_BATCH_SIZE,
- STREAM_BATCH_DELAY,
- )
- from core.agent_builder import review_agent, calculation_agent
- from service.session_manager import session_manager
- from service.history_manager import get_or_create_title, append_line
async def _stream_text_chunks(queue: asyncio.Queue, text: str, event_type: str = "model_thinking") -> None:
    """Feed ``text`` into ``queue`` piece by piece as SSE ``data:`` frames.

    ``text`` is cut into consecutive slices of ``STREAM_BATCH_SIZE`` characters.
    Each slice becomes one frame whose JSON payload is
    ``{"type": event_type, "content": <slice>}``. After every frame the
    coroutine sleeps ``STREAM_BATCH_DELAY`` seconds to pace the output.
    Nothing is enqueued for an empty ``text``.
    """
    slice_starts = range(0, len(text), STREAM_BATCH_SIZE)
    for begin in slice_starts:
        piece = text[begin:begin + STREAM_BATCH_SIZE]
        payload = json.dumps({"type": event_type, "content": piece}, ensure_ascii=False)
        frame = "data: " + payload + "\n\n"
        await queue.put(frame)
        # Throttle so the client receives a steady trickle rather than a burst.
        await asyncio.sleep(STREAM_BATCH_DELAY)
async def _yield_with_heartbeat(queue: asyncio.Queue, timeout_seconds: float = 120) -> AsyncIterable[str]:
    """Relay items from ``queue``, emitting a heartbeat frame while the queue is idle.

    Every item taken from the queue is yielded unchanged. If no item arrives
    within ``timeout_seconds``, a ``{"type": "timeout", ...}`` SSE frame is
    yielded instead and waiting resumes.

    A ``None`` item is an end-of-stream marker. It is not yielded, and the
    generator finishes. Without such a marker the relay would never end, and
    an SSE response built on it would stay open indefinitely. Producers that
    never enqueue ``None`` keep the previous endless-relay behaviour.

    Args:
        queue: Source of pre-formatted SSE frames (and optionally the ``None`` marker).
        timeout_seconds: Idle time before each heartbeat frame is emitted.
    """
    # The heartbeat frame never changes, so build it once.
    heartbeat = "data: " + json.dumps({"type": "timeout", "content": "处理中..."}, ensure_ascii=False) + "\n\n"
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=timeout_seconds)
        except asyncio.TimeoutError:
            yield heartbeat
            continue
        if item is None:
            return
        yield item
async def stream_review_async(message: str, file_name: str, parse_data: dict, session_id: Optional[str]) -> AsyncIterable[str]:
    """Run the review agent over an uploaded file and stream the result as SSE frames.

    Args:
        message: The user's instruction text.
        file_name: Name of the uploaded file; included in the prompt and in the history log.
        parse_data: Parsed file contents; interpolated into the prompt via ``str()``.
        session_id: Existing session id; when falsy a fresh UUID4 string is used.

    Yields:
        SSE ``data: <json>`` frames, each terminated by a blank line.
        The background run emits:
        - a ``model_thinking`` chunk for every non-empty AI message;
        - then a ``pdf_download`` frame (only when a final report was produced);
        - then a ``done`` frame carrying ``session_id``.
        If anything raises, a single ``error`` frame is emitted instead.
        Any items ``AsyncVentCallback`` puts on the shared queue are relayed as-is,
        and a ``timeout`` heartbeat is emitted after each 120 s of silence.
        The generator returns once the run has finished and all of its frames
        have been relayed.
    """
    queue: asyncio.Queue = asyncio.Queue(1000)
    session_id = session_id or str(uuid.uuid4())
    user_content = f"{message} {file_name} \n {parse_data}"
    # No history file yet -> the LLM generates a title and writes it as the first
    # line; otherwise the stored title is read back.
    # NOTE(review): this is a synchronous call made on the event loop. If it really
    # calls the LLM it stalls every other request meanwhile. Consider
    # asyncio.to_thread once get_or_create_title is confirmed thread-safe.
    get_or_create_title(session_id, user_content)
    callback = AsyncVentCallback(queue, session_id)
    # Persist the user's message to the session history log.
    append_line(session_id, "user_message", f"{message} {file_name}")

    async def run() -> None:
        """Drive the agent, enqueue its output, and always finish with the ``None`` end marker."""
        try:
            state = session_manager.get_or_create(session_id)
            state.messages.append({"role": "user", "content": user_content})
            # NOTE(review): only the current turn is sent to the agent. Earlier turns
            # recorded in state.messages are not replayed; confirm that is intended.
            config = {
                "callbacks": [callback],
                # NOTE(review): "max_iterations" does not appear to be a RunnableConfig
                # key that a LangGraph agent reads; "recursion_limit" is what bounds
                # the run. Confirm MAX_AGENT_ITERATIONS is actually enforced.
                "max_iterations": MAX_AGENT_ITERATIONS,
                "recursion_limit": 20,
            }
            last_response = ""
            async for chunk in review_agent.astream(
                {"messages": [HumanMessage(content=user_content)]},
                stream_mode="updates",
                config=config,
            ):
                # Each chunk maps a node name to that node's state update.
                for update in chunk.values():
                    for msg in update.get("messages", []):
                        if msg.type == "ai" and msg.content:
                            last_response = msg.content
                            await _stream_text_chunks(queue, msg.content)
                            append_line(session_id, "model_thinking", msg.content)

            # The last non-empty AI message is taken as the final report.
            full_report = ""
            if last_response:
                full_report = last_response.strip()
                state.messages.append({"role": "assistant", "content": full_report})
                session_manager.update(session_id, state)

            if full_report:
                output_filename = f"{uuid.uuid4()}.pdf"
                output_path = f"{REPORT_OUTPUT_DIR}/{output_filename}"
                markdown_to_pdf(full_report, output_path)
                download_url = get_pdf_download_url(output_filename)
                pdf_data = json.dumps({"type": "pdf_download", "content": download_url}, ensure_ascii=False)
                append_line(session_id, "pdf_download", download_url)
                await queue.put(f"data: {pdf_data}\n\n")

            done_data = json.dumps({"type": "done", "content": "审查完成", "session_id": session_id}, ensure_ascii=False)
            await queue.put(f"data: {done_data}\n\n")
        except Exception as e:
            # Report the failure to the history log, stdout and the client.
            # The traceback is truncated to its first 800 characters.
            err_msg = str(e) + "\n" + traceback.format_exc()[:800]
            append_line(session_id, "error", err_msg)
            err_data = json.dumps({"type": "error", "content": err_msg}, ensure_ascii=False)
            print(err_msg)
            await queue.put(f"data: {err_data}\n\n")
        finally:
            # End-of-stream marker: lets the consumer below stop instead of relaying
            # heartbeats forever after "done"/"error".
            await queue.put(None)

    # Hold a strong reference: the event loop keeps only weak references to tasks,
    # so an otherwise unreferenced task can be garbage-collected mid-run.
    task = asyncio.create_task(run())
    # NOTE(review): if the client disconnects, this generator is closed but `task`
    # keeps running. Once 1000 frames are queued unread, its next put blocks forever.
    # Decide whether the run should be cancelled on disconnect.
    async for item in _yield_with_heartbeat(queue):
        if item is None:
            break
        yield item
async def stream_calculation_async(user_msg: str, session_id: Optional[str]) -> AsyncIterable[str]:
    """Run the calculation agent on a chat message and stream the reply as SSE frames.

    Args:
        user_msg: The user's message; sent to the agent and logged to the history.
        session_id: Existing session id; when falsy a fresh UUID4 string is used.

    Yields:
        SSE ``data: <json>`` frames, each terminated by a blank line.
        The background run emits:
        - a ``model_thinking`` chunk for every non-empty AI message;
        - then a ``done`` frame carrying ``session_id``.
        If anything raises, a single ``error`` frame is emitted instead.
        Any items ``AsyncVentCallback`` puts on the shared queue are relayed as-is,
        and a ``timeout`` heartbeat is emitted after each 120 s of silence.
        The generator returns once the run has finished and all of its frames
        have been relayed.
    """
    queue: asyncio.Queue = asyncio.Queue(1000)
    session_id = session_id or str(uuid.uuid4())
    # NOTE(review): synchronous call on the event loop. If it generates a title with
    # the LLM it blocks other requests meanwhile.
    get_or_create_title(session_id, user_msg)
    callback = AsyncVentCallback(queue, session_id)
    # Persist the user's message to the session history log.
    append_line(session_id, "user_message", user_msg)

    async def run() -> None:
        """Drive the agent, enqueue its output, and always finish with the ``None`` end marker."""
        try:
            state = session_manager.get_or_create(session_id)
            state.messages.append({"role": "user", "content": user_msg})
            # NOTE(review): only the current turn is sent to the agent. Earlier turns
            # recorded in state.messages are not replayed; confirm that is intended.
            config = {
                "callbacks": [callback],
                # NOTE(review): "max_iterations" does not appear to be a RunnableConfig
                # key that a LangGraph agent reads; "recursion_limit" is what bounds
                # the run. Confirm MAX_AGENT_ITERATIONS is actually enforced.
                "max_iterations": MAX_AGENT_ITERATIONS,
                "recursion_limit": 20,
            }
            last_ai_content = ""
            async for chunk in calculation_agent.astream(
                {"messages": [HumanMessage(content=user_msg)]},
                stream_mode="updates",
                config=config,
            ):
                # Each chunk maps a node name to that node's state update.
                for update in chunk.values():
                    for msg in update.get("messages", []):
                        if msg.type == "ai" and msg.content:
                            last_ai_content = msg.content
                            await _stream_text_chunks(queue, msg.content)
                            append_line(session_id, "model_thinking", msg.content)

            # The assistant turn is recorded even when no AI content was produced ("").
            state.messages.append({"role": "assistant", "content": last_ai_content})
            session_manager.update(session_id, state)
            done_data = json.dumps({"type": "done", "content": "对话结束", "session_id": session_id}, ensure_ascii=False)
            await queue.put(f"data: {done_data}\n\n")
        except Exception as e:
            # Report the failure to the history log, stdout and the client.
            # The traceback is truncated to its first 800 characters.
            err_msg = str(e) + "\n" + traceback.format_exc()[:800]
            append_line(session_id, "error", err_msg)
            err_data = json.dumps({"type": "error", "content": err_msg}, ensure_ascii=False)
            print(err_msg)
            await queue.put(f"data: {err_data}\n\n")
        finally:
            # End-of-stream marker: lets the consumer below stop instead of relaying
            # heartbeats forever after "done"/"error".
            await queue.put(None)

    # Hold a strong reference: the event loop keeps only weak references to tasks,
    # so an otherwise unreferenced task can be garbage-collected mid-run.
    task = asyncio.create_task(run())
    # NOTE(review): if the client disconnects, `task` keeps running. Once 1000 frames
    # are queued unread, its next put blocks forever. Decide whether to cancel it.
    async for item in _yield_with_heartbeat(queue):
        if item is None:
            break
        yield item
|