import asyncio
import json
import uuid
import traceback
from typing import AsyncIterable, Awaitable, Callable, Optional

from langchain_core.messages import HumanMessage

from core.callback import AsyncVentCallback
from utils.pdf_builder import markdown_to_pdf, get_pdf_download_url
from config.settings import (
    MAX_AGENT_ITERATIONS,
    REPORT_OUTPUT_DIR,
    STREAM_BATCH_SIZE,
    STREAM_BATCH_DELAY,
)
from core.agent_builder import review_agent, calculation_agent
from service.session_manager import session_manager
from service.history_manager import get_or_create_title, append_line

# Private marker a producer puts on the queue to say "no more frames".
# Compared by identity, so it can never collide with a real SSE string.
_STREAM_END = object()


def _sse_frame(payload: dict) -> str:
    """Serialize *payload* as a single SSE ``data:`` frame (non-ASCII kept as-is)."""
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"


def _agent_config(callback: AsyncVentCallback) -> dict:
    """Build the run config shared by both agents."""
    return {
        "callbacks": [callback],
        "max_iterations": MAX_AGENT_ITERATIONS,
        "recursion_limit": 20,
    }


def _ai_message_contents(chunk: dict):
    """Yield the non-empty ``content`` of every AI message in one "updates" chunk."""
    for update in chunk.values():
        for msg in update.get("messages", []):
            if msg.type == "ai" and msg.content:
                yield msg.content


async def _stream_text_chunks(queue: asyncio.Queue, text: str, event_type: str = "model_thinking") -> None:
    """Split text into chunks and push them to the async queue as SSE data frames.

    Args:
        queue: Destination queue; one frame is enqueued per chunk.
        text: Text to split into slices of ``STREAM_BATCH_SIZE`` characters.
        event_type: Value of the ``type`` field in every emitted frame.
    """
    for start in range(0, len(text), STREAM_BATCH_SIZE):
        chunk = text[start:start + STREAM_BATCH_SIZE]
        await queue.put(_sse_frame({"type": event_type, "content": chunk}))
        # Pause between frames so chunks are paced rather than sent in one burst.
        await asyncio.sleep(STREAM_BATCH_DELAY)


async def _yield_with_heartbeat(queue: asyncio.Queue, timeout_seconds: int = 120) -> AsyncIterable[str]:
    """Yield items from the queue, sending a heartbeat on timeout.

    The generator finishes when the end-of-stream marker is dequeued. When
    nothing arrives within ``timeout_seconds`` a ``"timeout"`` frame is yielded
    instead and waiting resumes.
    """
    while True:
        # Only the wait itself is guarded, so a TimeoutError raised elsewhere
        # is never mistaken for an idle queue.
        try:
            item = await asyncio.wait_for(queue.get(), timeout=timeout_seconds)
        except asyncio.TimeoutError:
            yield _sse_frame({"type": "timeout", "content": "处理中..."})
            continue
        if item is _STREAM_END:
            return
        yield item


async def _run_job(queue: asyncio.Queue, session_id: str, job: Callable[[], Awaitable[None]]) -> None:
    """Run *job*, report a failure as an ``"error"`` frame, then close the stream.

    The end-of-stream marker is enqueued after success and after a failure.
    It is skipped when the task is cancelled: cancellation comes from the
    consumer shutting down, so nobody is left to read it and a blocking put on
    a full queue would never return.
    """
    cancelled = False
    try:
        await job()
    except asyncio.CancelledError:
        cancelled = True
        raise
    except Exception as e:
        # NOTE(review): the traceback excerpt is sent to the client as well as
        # logged — confirm that exposing it is intended.
        err_msg = str(e) + "\n" + traceback.format_exc()[:800]
        append_line(session_id, "error", err_msg)
        print(err_msg)
        await queue.put(_sse_frame({"type": "error", "content": err_msg}))
    finally:
        if not cancelled:
            await queue.put(_STREAM_END)


async def stream_review_async(message: str, file_name: str, parse_data: dict, session_id: Optional[str]) -> AsyncIterable[str]:
    """Run the review agent and stream its progress as SSE frames.

    Frames carry AI message text in chunks (``"model_thinking"``), then a
    ``"pdf_download"`` frame when a report was produced, then ``"done"``. A
    failure yields an ``"error"`` frame instead. The stream ends after the
    final frame.

    Args:
        message: User instruction text.
        file_name: Name of the file under review, included in the prompt.
        parse_data: Parsed file data, interpolated into the prompt.
        session_id: Existing session id; a new UUID is generated when falsy.
    """
    queue = asyncio.Queue(1000)
    session_id = session_id or str(uuid.uuid4())
    user_content = f"{message} {file_name} \n {parse_data}"
    # If no history file exists yet, the LLM generates a title that is written
    # as its first line; otherwise the existing title is read.
    get_or_create_title(session_id, user_content)
    callback = AsyncVentCallback(queue, session_id)
    # Persist the user message to the history log.
    append_line(session_id, "user_message", f"{message} {file_name}")

    async def run() -> None:
        state = session_manager.get_or_create(session_id)
        state.messages.append({"role": "user", "content": user_content})

        last_response = None
        async for chunk in review_agent.astream(
            {"messages": [HumanMessage(content=user_content)]},
            stream_mode="updates",
            config=_agent_config(callback),
        ):
            for content in _ai_message_contents(chunk):
                last_response = content
                await _stream_text_chunks(queue, content)
                append_line(session_id, "model_thinking", content)

        # The last AI message is treated as the final report.
        full_report = ""
        if last_response:
            full_report = last_response.strip()
            state.messages.append({"role": "assistant", "content": full_report})
            session_manager.update(session_id, state)

        if full_report:
            output_filename = f"{uuid.uuid4()}.pdf"
            output_path = f"{REPORT_OUTPUT_DIR}/{output_filename}"
            # NOTE(review): called synchronously; if PDF rendering is slow it
            # will stall the event loop — confirm.
            markdown_to_pdf(full_report, output_path)
            download_url = get_pdf_download_url(output_filename)
            append_line(session_id, "pdf_download", download_url)
            await queue.put(_sse_frame({"type": "pdf_download", "content": download_url}))

        await queue.put(_sse_frame({"type": "done", "content": "审查完成", "session_id": session_id}))

    # Keep a reference: the event loop only holds tasks weakly, so an
    # unreferenced task may be garbage-collected before it finishes.
    producer = asyncio.create_task(_run_job(queue, session_id, run))
    try:
        async for item in _yield_with_heartbeat(queue):
            yield item
    finally:
        # The consumer stopped early (e.g. client disconnected): stop the
        # producer so it cannot block forever on a full queue.
        if not producer.done():
            producer.cancel()


async def stream_calculation_async(user_msg: str, session_id: Optional[str]) -> AsyncIterable[str]:
    """Run the calculation agent and stream its progress as SSE frames.

    Frames carry AI message text in chunks (``"model_thinking"``) followed by
    ``"done"``. A failure yields an ``"error"`` frame instead. The stream ends
    after the final frame.

    Args:
        user_msg: User message sent to the agent.
        session_id: Existing session id; a new UUID is generated when falsy.
    """
    queue = asyncio.Queue(1000)
    session_id = session_id or str(uuid.uuid4())
    get_or_create_title(session_id, user_msg)
    callback = AsyncVentCallback(queue, session_id)
    append_line(session_id, "user_message", user_msg)

    async def run() -> None:
        state = session_manager.get_or_create(session_id)
        state.messages.append({"role": "user", "content": user_msg})

        last_ai_content = ""
        async for chunk in calculation_agent.astream(
            {"messages": [HumanMessage(content=user_msg)]},
            stream_mode="updates",
            config=_agent_config(callback),
        ):
            for content in _ai_message_contents(chunk):
                last_ai_content = content
                await _stream_text_chunks(queue, content)
                append_line(session_id, "model_thinking", content)

        # Recorded even when the agent produced no AI text (empty content).
        state.messages.append({"role": "assistant", "content": last_ai_content})
        session_manager.update(session_id, state)

        await queue.put(_sse_frame({"type": "done", "content": "对话结束", "session_id": session_id}))

    # Keep a reference: the event loop only holds tasks weakly, so an
    # unreferenced task may be garbage-collected before it finishes.
    producer = asyncio.create_task(_run_job(queue, session_id, run))
    try:
        async for item in _yield_with_heartbeat(queue):
            yield item
    finally:
        # The consumer stopped early (e.g. client disconnected): stop the
        # producer so it cannot block forever on a full queue.
        if not producer.done():
            producer.cancel()