import json
import logging
import os
import time
import uuid
from typing import Generator, Iterable, Iterator, Optional, Tuple

from fastapi import UploadFile
from langchain_core.messages import HumanMessage

import utils.util
from core.agent_builder import review_agent, calculation_agent, comparison_agent, intent_agent
from core.callback import SyncVentCallback
from parser.base_parser import get_upload_file_hash
from service.pdf_parse_service import parse_pdf
from utils.dox_builder import get_docx_download_url, convert_markdown_to_docx
from config.settings import MAX_AGENT_ITERATIONS, REPORT_OUTPUT_DIR, INTENT_RECOGNITION_PROMPT, JSON_CACHE_DIR
from service.session_manager import session_manager
from service.history_manager import get_or_create_title, append_line

logger = logging.getLogger(__name__)

# Separator that may appear in the streamed report text; only the part after
# its last occurrence is converted into the Word document.
_REPORT_SPLIT_MARKER = "|||SPLIT_CONTENT|||"


def format_sse_event(event_type: str, content: str, **extra) -> str:
    """Format a server-sent event with the given type and content.

    :param event_type: value of the ``type`` field.
    :param content: value of the ``content`` field.
    :param extra: optional additional top-level fields (e.g. ``session_id``),
        serialized after ``type`` and ``content``.
    :return: a ``data: <json>`` SSE line terminated by a blank line.
    """
    payload = {"type": event_type, "content": content, **extra}
    data = json.dumps(payload, ensure_ascii=False)
    return f"data: {data}\n\n"


def _parse_intent_payload(raw: str) -> dict:
    """Parse the intent agent's reply as a JSON object.

    Tolerates replies wrapped in Markdown code fences (``` or ```json).
    Returns an empty dict (and logs a warning) when the reply is not a JSON
    object, so the caller falls back to its default intent.
    """
    text = raw.strip()
    if text.startswith("```"):
        # Drop the opening fence line and anything after the closing fence.
        text = text.split("\n", 1)[1] if "\n" in text else ""
        text = text.rsplit("```", 1)[0].strip()
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        logger.warning("Intent agent returned non-JSON content: %r", raw)
        return {}
    if not isinstance(data, dict):
        logger.warning("Intent agent returned a non-object JSON value: %r", raw)
        return {}
    return data


async def detect_intent(session_id: str, user_msg: str, has_file: bool) -> str:
    """Unified intent-recognition entry point.

    :param session_id: conversation thread id; a fresh UUID is used if empty.
    :param user_msg: the user's text instruction.
    :param has_file: whether the user uploaded a file.
    :return: the ``intent_type`` string from the intent agent's JSON reply,
        or ``"unknown"`` if it is missing or the reply cannot be parsed.
    """
    session_id = session_id or str(uuid.uuid4())
    config = {
        "configurable": {"thread_id": session_id},
    }
    prompt = f"用户指令:{user_msg}\n是否上传文件:{has_file}"
    resp = await intent_agent.ainvoke(
        {"messages": [HumanMessage(content=prompt)]},
        config=config,
    )
    reply = resp["messages"][-1].content.strip()
    intent_data = _parse_intent_payload(reply)
    return intent_data.get("intent_type", "unknown")


def _is_pdf_filename(filename: Optional[str]) -> bool:
    """Return True when *filename* is non-empty and ends in ``.pdf`` (any case)."""
    return bool(filename) and filename.lower().endswith(".pdf")


def _drain_tool_messages(buffer: list) -> Iterator[str]:
    """Yield and remove every pending message in *buffer*, oldest first."""
    while buffer:
        yield buffer.pop(0)


def _load_ventilation_plan(
    file: UploadFile,
    cached_notice: str,
    invalid_notice: str,
) -> Generator[str, None, Tuple[str, object]]:
    """Load the parsed plan for *file* from the JSON cache, or parse the PDF.

    Yields SSE status events while working. Returns ``(hash_id, plan)`` via
    ``yield from``; ``plan`` is ``None`` when the document was rejected
    (an ``error_message`` entry was present), in which case *invalid_notice*
    has already been yielded.
    """
    hash_id = get_upload_file_hash(file)
    cache_path = os.path.join(JSON_CACHE_DIR, f"{hash_id}.json")
    if os.path.exists(cache_path):
        yield format_sse_event("system", cached_notice)
        with open(cache_path, "r", encoding="utf-8") as f:
            ventilation_plan = json.load(f)
    else:
        yield format_sse_event("system", "首次阅读该文档,预计需要5分钟...\n")
        ventilation_plan = parse_pdf(file)
    # Validate both cached and freshly parsed results so a cached error
    # payload is never handed to the agent.
    if "error_message" in ventilation_plan:
        yield format_sse_event("system", invalid_notice)
        return hash_id, None
    return hash_id, ventilation_plan


def _stream_agent(
    agent,
    agent_input: dict,
    session_id: str,
    recursion_limit: int,
) -> Generator[str, None, str]:
    """Run *agent* in ``stream_mode="messages"`` and relay its output as SSE.

    Tool messages collected by ``SyncVentCallback`` are flushed before each
    model chunk and once more after the stream ends. Non-tool chunks with
    content are logged via ``append_line`` and emitted as ``model_thinking``
    events. Returns the concatenated model text via ``yield from``.
    """
    tool_message_buffer: list = []
    callback = SyncVentCallback(session_id=session_id, output_buffer=tool_message_buffer)
    config = {
        "configurable": {"thread_id": session_id},
        "callbacks": [callback],
        "max_iterations": MAX_AGENT_ITERATIONS,
        "recursion_limit": recursion_limit,
    }
    parts: list = []
    for msg_chunk, _meta in agent.stream(agent_input, stream_mode="messages", config=config):
        # Emit tool messages first so they precede the model text they relate to.
        yield from _drain_tool_messages(tool_message_buffer)
        if msg_chunk.content and msg_chunk.type != "tool":
            parts.append(msg_chunk.content)
            append_line(session_id, "model_thinking", msg_chunk.content)
            yield format_sse_event("model_thinking", msg_chunk.content)
    # Flush any tool messages produced after the last model chunk.
    yield from _drain_tool_messages(tool_message_buffer)
    return "".join(parts)


def _emit_word_report(session_id: str, state, full_report: str) -> Iterator[str]:
    """Persist the assistant reply and yield a ``word_download`` event.

    Does nothing when *full_report* is empty. Otherwise records the reply in
    the session state, converts the text after the last split marker to a
    .docx file under ``REPORT_OUTPUT_DIR``, and yields its download URL.
    """
    if not full_report:
        return
    state.messages.append({"role": "assistant", "content": full_report})
    session_manager.update(session_id, state)

    output_filename = f"{uuid.uuid4()}.docx"
    output_path = os.path.join(REPORT_OUTPUT_DIR, output_filename)
    report_body = full_report.split(_REPORT_SPLIT_MARKER)[-1]
    convert_markdown_to_docx(report_body, output_path)

    download_url = get_docx_download_url(output_filename)
    append_line(session_id, "word_download", download_url)
    yield format_sse_event("word_download", download_url)


def _stream_pdf_analysis(
    file: UploadFile,
    message: str,
    session_id: Optional[str],
    *,
    agent,
    recursion_limit: int,
    cached_notice: str,
    invalid_notice: str,
    done_notice: str,
) -> Iterator[str]:
    """Shared pipeline: read a ventilation-plan PDF, run *agent*, emit a Word report.

    1. Reject non-PDF uploads.
    2. Load the parsed plan from cache or parse the PDF.
    3. Record the user turn in history and session state.
    4. Stream the agent's output.
    5. Generate the Word report, emit ``done``, remember the uploaded file hash.
    """
    filename = file.filename
    if not _is_pdf_filename(filename):
        yield format_sse_event("system", "目前仅支持PDF文件")
        return

    yield format_sse_event("system", f"正在阅读文档《{filename}》...\n")

    hash_id, ventilation_plan = yield from _load_ventilation_plan(file, cached_notice, invalid_notice)
    if ventilation_plan is None:
        return

    session_id = session_id or str(uuid.uuid4())
    user_message = f"{message} {filename}"
    get_or_create_title(session_id, user_message)
    append_line(session_id, "user_message", user_message)

    state = session_manager.get_or_create(session_id)
    state.messages.append({"role": "user", "content": user_message})

    # Only the short instruction goes into ``messages``; the parsed plan is
    # mounted on the agent's custom state as ``vent_plan_data`` (intended to
    # keep the large payload out of the LLM context).
    agent_input = {
        "messages": [HumanMessage(content=user_message)],
        "vent_plan_data": ventilation_plan,
    }
    full_report = yield from _stream_agent(agent, agent_input, session_id, recursion_limit)

    yield from _emit_word_report(session_id, state, full_report)

    yield format_sse_event("done", done_notice, session_id=session_id)
    utils.util.update_user_setting("last_upload_file", hash_id)


def stream_review_sync(file: UploadFile, message: str, session_id: str) -> Iterable[str]:
    """Stream a review of an uploaded ventilation-plan PDF as SSE events.

    :param file: the uploaded document (only ``.pdf`` is accepted).
    :param message: the user's instruction; the filename is appended to it.
    :param session_id: conversation id; a fresh UUID is used if empty.
    :return: an iterator of SSE-formatted strings.
    """
    yield from _stream_pdf_analysis(
        file,
        message,
        session_id,
        agent=review_agent,
        recursion_limit=20,
        cached_notice="已阅读过该文档,直接开始分析...\n",
        invalid_notice="检测到该文档不包含配风计划相关内容,目前我仅支持配风计划相关内容分析与审查。\n",
        done_notice="审查完成",
    )


def stream_calculation_sync(user_msg: str, session_id: Optional[str] = None) -> Iterable[str]:
    """Stream a calculation-agent conversation turn as SSE events (sync version).

    :param user_msg: the user's message.
    :param session_id: conversation id; a fresh UUID is used if omitted.
    :return: an iterator of SSE-formatted strings ending with a ``done`` event.
    """
    session_id = session_id or str(uuid.uuid4())
    get_or_create_title(session_id, user_msg)
    append_line(session_id, "user_message", user_msg)

    state = session_manager.get_or_create(session_id)
    state.messages.append({"role": "user", "content": user_msg})

    full_response = yield from _stream_agent(
        calculation_agent,
        {"messages": [HumanMessage(content=user_msg)]},
        session_id,
        20,
    )

    state.messages.append({"role": "assistant", "content": full_response})
    session_manager.update(session_id, state)

    yield format_sse_event("done", "对话结束", session_id=session_id)


def stream_comparison_sync(file: UploadFile, message: str, session_id: Optional[str] = None) -> Iterable[str]:
    """Stream the required-airflow comparison and check as SSE events.

    Follows the same pattern as ``stream_review_sync``:
    1. Parse the PDF (or read the cached parse result).
    2. Stream ``comparison_agent`` over its tool chain.
    3. Generate a Word report and emit its download link.

    :param file: the uploaded document (only ``.pdf`` is accepted).
    :param message: the user's instruction; the filename is appended to it.
    :param session_id: conversation id; a fresh UUID is used if omitted.
    :return: an iterator of SSE-formatted strings.
    """
    yield from _stream_pdf_analysis(
        file,
        message,
        session_id,
        agent=comparison_agent,
        recursion_limit=40,
        cached_notice="已阅读过该文档,直接开始需风量对照分析...\n",
        invalid_notice="检测到该文档不包含配风计划相关内容,目前仅支持配风计划需风量对照与分析。\n",
        done_notice="需风量对照检查完成",
    )