| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- import json
- import os
- import time
- import uuid
- from typing import Iterable, Optional
- from fastapi import UploadFile
- from langchain_core.messages import HumanMessage
- import utils.util
- from core.agent_builder import review_agent, calculation_agent, comparison_agent, intent_agent
- from core.callback import SyncVentCallback
- from parser.base_parser import get_upload_file_hash
- from service.pdf_parse_service import parse_pdf
- from utils.dox_builder import get_docx_download_url, convert_markdown_to_docx
- from config.settings import MAX_AGENT_ITERATIONS, REPORT_OUTPUT_DIR, INTENT_RECOGNITION_PROMPT, JSON_CACHE_DIR
- from service.session_manager import session_manager
- from service.history_manager import get_or_create_title, append_line
def format_sse_event(event_type: str, content: str) -> str:
    """Serialize one server-sent event frame.

    The payload is a JSON object with ``type`` and ``content`` keys, written
    on a single ``data:`` line and terminated by the blank line that ends an
    SSE frame. Non-ASCII characters are emitted as-is instead of being
    escaped.
    """
    payload = {"type": event_type, "content": content}
    return "data: " + json.dumps(payload, ensure_ascii=False) + "\n\n"
def _parse_intent_type(raw: str) -> str:
    """Pull ``intent_type`` out of the model's reply, or return ``"unknown"``."""
    text = raw.strip()
    if text.startswith("```"):
        # Models often wrap JSON in a Markdown fence (``` or ```json):
        # drop the opening fence line and everything from the closing fence.
        text = text.partition("\n")[2]
        text = text.rsplit("```", 1)[0].strip()
    try:
        payload = json.loads(text)
    except json.JSONDecodeError:
        # An unparsable reply is treated like a reply without an intent.
        return "unknown"
    if not isinstance(payload, dict):
        return "unknown"
    return payload.get("intent_type", "unknown")


async def detect_intent(session_id: str, user_msg: str, has_file: bool) -> str:
    """Single entry point for intent recognition.

    Sends the user's instruction and the upload flag to ``intent_agent`` and
    reads the intent label from the last message of the reply, which is
    expected to be a JSON object.

    Args:
        session_id: Conversation id used as the agent's ``thread_id``. A
            random UUID is generated when it is falsy.
        user_msg: The user's text instruction.
        has_file: Whether a file was uploaded with the request.

    Returns:
        The reply's ``intent_type`` value, or ``"unknown"`` when the key is
        missing or the reply is not a JSON object (a surrounding Markdown
        code fence is tolerated).
    """
    session_id = session_id or str(uuid.uuid4())
    config = {
        "configurable": {"thread_id": session_id},
    }
    prompt = f"用户指令:{user_msg}\n是否上传文件:{has_file}"
    resp = await intent_agent.ainvoke(
        {"messages": [HumanMessage(content=prompt)]},
        config=config,
    )
    return _parse_intent_type(resp["messages"][-1].content)
def stream_review_sync(file: UploadFile, message: str, session_id: Optional[str] = None) -> Iterable[str]:
    """Stream a ventilation-plan review of an uploaded PDF as SSE frames.

    The generator validates the upload, loads the parsed plan from the JSON
    cache (keyed by the upload's hash) or parses the PDF, streams the
    ``review_agent`` output chunk by chunk, converts the final report to a
    Word document and finishes with a ``done`` event.

    Args:
        file: The uploaded file. Only names ending in ``.pdf`` (any letter
            case) are accepted; a missing name is rejected the same way.
        message: The user's instruction. The file name is appended to it
            before it is sent to the agent and written to the history.
        session_id: Conversation id used as the agent ``thread_id`` and as
            the history/session key. A random UUID is generated when falsy.

    Yields:
        SSE-formatted strings: ``system`` progress notices, whatever the
        tool callback queued in the shared buffer (presumably pre-formatted
        SSE frames - confirm against ``SyncVentCallback``),
        ``model_thinking`` chunks, a ``word_download`` link when a report
        was produced, and a final ``done`` event carrying the session id.
    """
    # UploadFile.filename can be None; treat that like any non-PDF upload
    # instead of failing on None.endswith().
    filename = file.filename or ""
    if not filename.lower().endswith(".pdf"):
        yield format_sse_event("system", "目前仅支持PDF文件")
        return
    yield format_sse_event("system", f"正在阅读文档《{filename}》...\n")

    # The parsed plan is cached on disk under the upload's hash.
    hash_id = get_upload_file_hash(file)
    cache_path = f"{JSON_CACHE_DIR}/{hash_id}.json"
    if os.path.exists(cache_path):
        yield format_sse_event("system", "已阅读过该文档,直接开始分析...\n")
        with open(cache_path, "r", encoding="utf-8") as f:
            ventilation_plan = json.load(f)
    else:
        yield format_sse_event("system", "首次阅读该文档,预计需要5分钟...\n")
        ventilation_plan = parse_pdf(file)
    # An "error_message" key marks a document without ventilation-plan content.
    if "error_message" in ventilation_plan:
        yield format_sse_event("system", "检测到该文档不包含配风计划相关内容,目前我仅支持配风计划相关内容分析与审查。\n")
        return

    session_id = session_id or str(uuid.uuid4())
    user_message = f"{message} {filename}"
    get_or_create_title(session_id, user_message)
    append_line(session_id, "user_message", user_message)

    report_parts = []
    tool_message_buffer = []

    # Session state
    state = session_manager.get_or_create(session_id)
    state.messages.append({"role": "user", "content": user_message})

    # The callback shares tool_message_buffer with this generator.
    callback = SyncVentCallback(session_id=session_id, output_buffer=tool_message_buffer)
    config = {
        "configurable": {"thread_id": session_id},
        "callbacks": [callback],
        "max_iterations": MAX_AGENT_ITERATIONS,
        "recursion_limit": 20,
    }

    # Only the short instruction goes into ``messages``; the parsed plan is
    # attached as ``vent_plan_data`` so the large payload stays out of the
    # LLM message context.
    agent_input = {
        "messages": [HumanMessage(content=user_message)],
        "vent_plan_data": ventilation_plan,
    }

    for msg_chunk, _meta in review_agent.stream(
        agent_input,
        stream_mode="messages",
        config=config,
    ):
        # Emit queued tool messages first.
        while tool_message_buffer:
            yield tool_message_buffer.pop(0)
        if msg_chunk.content and msg_chunk.type != "tool":
            report_parts.append(msg_chunk.content)
            append_line(session_id, "model_thinking", msg_chunk.content)
            yield format_sse_event("model_thinking", msg_chunk.content)

    # Flush tool messages that arrived after the last chunk.
    while tool_message_buffer:
        yield tool_message_buffer.pop(0)

    # Generate the Word report only when the agent produced text.
    full_report = "".join(report_parts)
    if full_report:
        state.messages.append({"role": "assistant", "content": full_report})
        session_manager.update(session_id, state)

        output_filename = f"{uuid.uuid4()}.docx"
        output_path = f"{REPORT_OUTPUT_DIR}/{output_filename}"
        # Only the text after the last split marker goes into the document.
        report_body = full_report.split("|||SPLIT_CONTENT|||")[-1]
        convert_markdown_to_docx(report_body, output_path)
        download_url = get_docx_download_url(output_filename)
        append_line(session_id, "word_download", download_url)
        yield format_sse_event("word_download", download_url)

    # Record the upload before yielding "done": statements after a
    # generator's final yield never run if the consumer stops iterating
    # once it has seen that event.
    utils.util.update_user_setting("last_upload_file", hash_id)
    done = json.dumps({"type": "done", "content": "审查完成", "session_id": session_id}, ensure_ascii=False)
    yield f"data: {done}\n\n"
def stream_calculation_sync(user_msg: str, session_id: Optional[str] = None) -> Iterable[str]:
    """Stream one calculation conversation turn as SSE frames (synchronous).

    Records the user's message in the history and session state, streams
    ``calculation_agent`` output as ``model_thinking`` events interleaved
    with whatever the tool callback queued, stores the concatenated reply in
    the session state and ends with a ``done`` event carrying the session
    id. A random UUID is used as the session id when none is supplied.
    """

    def _flush(pending):
        # Hand out queued callback output oldest-first, emptying the list.
        while pending:
            yield pending.pop(0)

    sid = session_id or str(uuid.uuid4())
    get_or_create_title(sid, user_msg)
    append_line(sid, "user_message", user_msg)

    answer = ""
    queued_frames = []

    session_state = session_manager.get_or_create(sid)
    session_state.messages.append({"role": "user", "content": user_msg})

    run_config = {
        "configurable": {"thread_id": sid},
        "callbacks": [SyncVentCallback(session_id=sid, output_buffer=queued_frames)],
        "max_iterations": MAX_AGENT_ITERATIONS,
        "recursion_limit": 20,
    }
    agent_stream = calculation_agent.stream(
        {"messages": [HumanMessage(content=user_msg)]},
        stream_mode="messages",
        config=run_config,
    )

    for chunk, _meta in agent_stream:
        # Tool output queued by the callback goes out before model text.
        yield from _flush(queued_frames)
        if not chunk.content or chunk.type == "tool":
            continue
        answer = answer + chunk.content
        append_line(sid, "model_thinking", chunk.content)
        frame = json.dumps({"type": "model_thinking", "content": chunk.content}, ensure_ascii=False)
        yield "data: " + frame + "\n\n"

    # Anything queued after the final chunk still has to be delivered.
    yield from _flush(queued_frames)

    session_state.messages.append({"role": "assistant", "content": answer})
    session_manager.update(sid, session_state)

    closing = json.dumps({"type": "done", "content": "对话结束", "session_id": sid}, ensure_ascii=False)
    yield "data: " + closing + "\n\n"
def stream_comparison_sync(file: UploadFile, message: str, session_id: Optional[str] = None) -> Iterable[str]:
    """Stream an air-demand comparison and check of an uploaded PDF as SSE frames.

    Follows the same pattern as ``stream_review_sync``:

    1. Parse the PDF, or read the parsed plan from the JSON cache.
    2. Stream ``comparison_agent`` through its tool chain. The original
       notes describe five steps: extract the plan's required air volume,
       fetch the model-solved air volume, fetch the monitored air volume,
       associate and merge the three, then run the compliance rule checks.
    3. Generate a Word report and emit its download link.

    Args:
        file: The uploaded file. Only names ending in ``.pdf`` (any letter
            case) are accepted; a missing name is rejected the same way.
        message: The user's instruction. The file name is appended to it
            before it is sent to the agent and written to the history.
        session_id: Conversation id used as the agent ``thread_id`` and as
            the history/session key. A random UUID is generated when falsy.

    Yields:
        SSE-formatted strings: ``system`` progress notices, whatever the
        tool callback queued in the shared buffer (presumably pre-formatted
        SSE frames - confirm against ``SyncVentCallback``),
        ``model_thinking`` chunks, a ``word_download`` link when a report
        was produced, and a final ``done`` event carrying the session id.
    """
    # UploadFile.filename can be None; treat that like any non-PDF upload
    # instead of failing on None.endswith().
    filename = file.filename or ""
    if not filename.lower().endswith(".pdf"):
        yield format_sse_event("system", "目前仅支持PDF文件")
        return
    yield format_sse_event("system", f"正在阅读文档《{filename}》...\n")

    # Hash the upload, then read the cached plan or parse the PDF.
    hash_id = get_upload_file_hash(file)
    cache_path = f"{JSON_CACHE_DIR}/{hash_id}.json"
    if os.path.exists(cache_path):
        yield format_sse_event("system", "已阅读过该文档,直接开始需风量对照分析...\n")
        with open(cache_path, "r", encoding="utf-8") as f:
            ventilation_plan = json.load(f)
    else:
        yield format_sse_event("system", "首次阅读该文档,预计需要5分钟...\n")
        ventilation_plan = parse_pdf(file)
    # An "error_message" key marks a document without ventilation-plan content.
    if "error_message" in ventilation_plan:
        yield format_sse_event("system", "检测到该文档不包含配风计划相关内容,目前仅支持配风计划需风量对照与分析。\n")
        return

    session_id = session_id or str(uuid.uuid4())
    user_message = f"{message} {filename}"
    get_or_create_title(session_id, user_message)
    append_line(session_id, "user_message", user_message)

    report_parts = []
    tool_message_buffer = []

    # Session state
    state = session_manager.get_or_create(session_id)
    state.messages.append({"role": "user", "content": user_message})

    # The callback shares tool_message_buffer with this generator.
    callback = SyncVentCallback(session_id=session_id, output_buffer=tool_message_buffer)
    config = {
        "configurable": {"thread_id": session_id},
        "callbacks": [callback],
        "max_iterations": MAX_AGENT_ITERATIONS,
        "recursion_limit": 40,
    }

    # Attach the parsed plan as ``vent_plan_data`` in the agent's custom
    # state; ``messages`` carries only the short instruction.
    agent_input = {
        "messages": [HumanMessage(content=user_message)],
        "vent_plan_data": ventilation_plan,
    }

    for msg_chunk, _meta in comparison_agent.stream(
        agent_input,
        stream_mode="messages",
        config=config,
    ):
        # Emit queued tool messages first.
        while tool_message_buffer:
            yield tool_message_buffer.pop(0)
        if msg_chunk.content and msg_chunk.type != "tool":
            report_parts.append(msg_chunk.content)
            append_line(session_id, "model_thinking", msg_chunk.content)
            yield format_sse_event("model_thinking", msg_chunk.content)

    # Flush tool messages that arrived after the last chunk.
    while tool_message_buffer:
        yield tool_message_buffer.pop(0)

    # Generate the Word report only when the agent produced text.
    full_report = "".join(report_parts)
    if full_report:
        state.messages.append({"role": "assistant", "content": full_report})
        session_manager.update(session_id, state)

        output_filename = f"{uuid.uuid4()}.docx"
        output_path = f"{REPORT_OUTPUT_DIR}/{output_filename}"
        # Only the text after the last split marker goes into the document.
        report_body = full_report.split("|||SPLIT_CONTENT|||")[-1]
        convert_markdown_to_docx(report_body, output_path)
        download_url = get_docx_download_url(output_filename)
        append_line(session_id, "word_download", download_url)
        yield format_sse_event("word_download", download_url)

    # Record the upload before yielding "done": statements after a
    # generator's final yield never run if the consumer stops iterating
    # once it has seen that event.
    utils.util.update_user_setting("last_upload_file", hash_id)
    done = json.dumps({"type": "done", "content": "需风量对照检查完成", "session_id": session_id}, ensure_ascii=False)
    yield f"data: {done}\n\n"
|