""" 需风量对照与检查工具集 工具1: get_plan_required_wind — 获取配风计划需风量 工具2: get_model_calculated_wind — 获取模型解算风量 工具3: get_sensor_wind — 获取监测风量 工具4: merge_wind_by_location — 使用LLM按名称关联合并风量数据 工具5: check_wind_compliance — 风量合规规则检查 """ import json import os import logging from typing import Annotated, Any from langchain.tools import tool, InjectedState from langchain.chat_models import init_chat_model from openai import OpenAI from service.data_service import VentDataService from config.settings import JSON_CACHE_DIR, BASE_URL, LLM_API_KEY, LLM_BASE_URL, LLM_MODEL from service.auth import AuthService import requests logging.basicConfig(level=logging.INFO, format="【%(name)s】%(message)s") logger = logging.getLogger("VentComparisonTool") # 模块级存储,用于同一个 Agent 调用链中工具间传递中间结果 _store: dict = {} # 独立初始化 LLM model(避免循环导入) # _merge_model = init_chat_model( # model=LLM_MODEL, # api_key=LLM_API_KEY, # base_url=LLM_BASE_URL, # temperature=0, # streaming=True, # model_provider="openai", # extra_body={"enable_thinking": False}, # timeout=300 # ) client = OpenAI( api_key=os.getenv("LLM_API_KEY"), base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", ) MERGED_MODEL = "qwen3.7-max" # ============================================================ # 工具1: 获取配风计划需风量 # ============================================================ @tool def get_plan_required_wind( state: Annotated[dict, InjectedState] ) -> str: """ 从用户上传的配风计划文件中提取各用风地点的需风量数据。 自动读取解析缓存(基于文件hashID),若未解析则先解析再缓存。 提取字段: - coal_faces: face_name → q_max - tunneling_faces: face_name → q_max - chambers: name → qd - other_points: name → Qo 或其他风量字段 返回统一列表 [{"plan_name":"地点名","plan_q":需风量值}] """ vent_plan = state.get("vent_plan_data", {}) if not vent_plan: return json.dumps({"error": "未读取到配风计划数据,请先上传PDF文件"}, ensure_ascii=False) logger.info("===== 开始提取配风计划需风量 =====") plan_list = [] # 采煤工作面 coal_faces = vent_plan.get("coal_faces", []) for face in coal_faces: name = face.get("face_name", "") q_max = face.get("q_max", 0) if name: plan_list.append({"plan_name": name, "plan_q": q_max if q_max else 0}) logger.info(f" 采煤工作面: {name} -> 需风量 {q_max}") # 掘进工作面 tunneling_faces = vent_plan.get("tunneling_faces", []) for face in tunneling_faces: name = face.get("face_name", "") q_max = face.get("q_max", 0) if name: plan_list.append({"plan_name": name, "plan_q": q_max if q_max else 0}) logger.info(f" 掘进工作面: {name} -> 需风量 {q_max}") # 硐室 chambers = vent_plan.get("chambers", []) for chamber in chambers: name = chamber.get("name", "") qd = chamber.get("qd", 0) if name: plan_list.append({"plan_name": name, "plan_q": qd if qd else 0}) logger.info(f" 硐室: {name} -> 需风量 {qd}") # 其他用风地点 other_points = vent_plan.get("other_points", []) for point in other_points: name = point.get("name", "") q_val = point.get("Qo") or point.get("q_max") or 0 if name: plan_list.append({"plan_name": name, "plan_q": q_val if q_val else 0}) logger.info(f" 其他地点: {name} -> 需风量 {q_val}") logger.info(f"===== 提取完成,共 {len(plan_list)} 个地点 =====") result = {"plan_list": plan_list, "count": len(plan_list)} # 写入模块级存储供后续工具使用 _store["plan_wind_result"] = result logger.info(f"===== 提取完成,共 {len(plan_list)} 个地点 =====") return json.dumps(result, ensure_ascii=False) # ============================================================ # 工具2: 获取模型解算风量 # ============================================================ @tool def get_model_calculated_wind() -> str: """ 获取通风模型解算风量数据。 通过 GET /Vmodel/agent/get/model/wind/{model_id} 接口拉取。 返回所有地点的模型解算风量列表。 获取成功后自动将结果存入模块级存储供后续工具使用。 """ logger.info("===== 开始获取模型解算风量 =====") data_service = VentDataService() default_model_id = data_service._get_default_model_id() if not default_model_id: logger.warning("无法获取默认模型ID") return json.dumps({"error": "无法获取默认模型ID,请确认通风系统已配置默认模型"}, ensure_ascii=False) logger.info(f"默认模型ID: {default_model_id}") try: url = f"{BASE_URL}/Vmodel/agent/get/model/wind/{default_model_id}" headers = {"X-Access-Token": AuthService.get_token()} if AuthService.get_token() else {} logger.info(f"请求地址: {url}") resp = requests.get(url, headers=headers, timeout=30) resp.raise_for_status() data = resp.json() if data.get("success"): model_list = data.get("result", []) formatted = [] for item in model_list: formatted.append({ "name": item.get("name", ""), "fq": item.get("fq", 0) }) logger.info(f"获取到 {len(formatted)} 条模型解算风量数据") result = {"model_list": formatted, "count": len(formatted)} _store["model_wind_result"] = result return json.dumps(result, ensure_ascii=False) else: logger.warning(f"接口返回失败: {data.get('message', '')}") return json.dumps({"error": f"获取模型解算风量失败: {data.get('message', '')}"}, ensure_ascii=False) except Exception as e: logger.error(f"获取模型解算风量异常: {str(e)}") return json.dumps({"error": f"获取模型解算风量异常: {str(e)}"}, ensure_ascii=False) # ============================================================ # 工具3: 获取监测风量 # ============================================================ @tool def get_sensor_wind() -> str: """ 获取传感器监测风量数据。 通过 GET /Vmodel/agent/get/sensor/wind/{model_id} 接口拉取。 返回所有传感器的监测风量列表。 获取成功后自动将结果存入模块级存储供后续工具使用。 """ logger.info("===== 开始获取传感器监测风量 =====") data_service = VentDataService() default_model_id = data_service._get_default_model_id() if not default_model_id: logger.warning("无法获取默认模型ID") return json.dumps({"error": "无法获取默认模型ID,请确认通风系统已配置默认模型"}, ensure_ascii=False) logger.info(f"默认模型ID: {default_model_id}") try: url = f"{BASE_URL}/Vmodel/agent/get/sensor/wind/{default_model_id}" headers = {"X-Access-Token": AuthService.get_token()} if AuthService.get_token() else {} logger.info(f"请求地址: {url}") resp = requests.get(url, headers=headers, timeout=30) resp.raise_for_status() data = resp.json() if data.get("success"): sensor_list = data.get("result", []) formatted = [] for item in sensor_list: formatted.append({ "tunName": item.get("tunName", ""), "location": item.get("location", ""), "fq": item.get("fq", 0) }) logger.info(f"获取到 {len(formatted)} 条监测风量数据") result = {"sensor_list": formatted, "count": len(formatted)} _store["sensor_wind_result"] = result return json.dumps(result, ensure_ascii=False) else: logger.warning(f"接口返回失败: {data.get('message', '')}") return json.dumps({"error": f"获取监测风量失败: {data.get('message', '')}"}, ensure_ascii=False) except Exception as e: logger.error(f"获取监测风量异常: {str(e)}") return json.dumps({"error": f"获取监测风量异常: {str(e)}"}, ensure_ascii=False) # ============================================================ # 工具4: LLM 名称关联合并风量数据 # ============================================================ @tool def merge_wind_by_location() -> str: """ 使用大模型的语义理解能力,将配风计划、模型解算、传感器监测三份数据 按地点名称进行语义关联与合并。 从前序工具自动存入的模块级存储中读取数据 合并规则: - 以配风计划中的地点为主(plan_name / plan_q) - 从模型解算数据中匹配对应地点的 model_fq - 从传感器监测数据中匹配对应地点的 wind_fq - 忽略两个外部接口中无法匹配到配风计划的冗余地点 返回:合并后的JSON数组。 """ logger.info("===== 开始LLM名称关联合并风量数据 =====") store = _store plan_data = store.get("plan_wind_result") model_data = store.get("model_wind_result") sensor_data = store.get("sensor_wind_result") # 校验数据完整性 missing = [] if not plan_data: missing.append("配风计划需风量(plan_wind_result)") if not model_data: missing.append("模型解算风量(model_wind_result)") if not sensor_data: missing.append("监测风量(sensor_wind_result)") if missing: return json.dumps({"error": f"缺少数据源: {', '.join(missing)},请确保前序工具已执行并保存结果"}, ensure_ascii=False) plan_data_json = json.dumps(plan_data, ensure_ascii=False) model_data_json = json.dumps(model_data, ensure_ascii=False) sensor_data_json = json.dumps(sensor_data, ensure_ascii=False) prompt = f"""你是一个煤矿通风专业的数据关联专家。请将以下三份数据按地点名称进行语义关联与合并。 关联规则: 1. 以配风计划需风量中的地点为基准(plan_name),这是核心列表 2. 从模型解算风量中找出名称语义最相似的地点,提取其 fq 作为 model_fq 3. 从监测风量中找出名称语义最相似的地点,提取其 fq 作为 wind_fq。监测风量中的 tunName 比 location 更具参考价值 4. 如果某个配风计划地点在模型解算/监测中找不到对应项,则对应字段填 null 5. 忽略模型解算和监测中无法与任何配风计划地点关联的冗余数据 6. 名称匹配需要语义理解,例如:22210综采工作面 约等于 22210 综采工作面 约等于 22210工作面 ============================ 配风计划需风量: {plan_data_json} 模型解算风量: {model_data_json} 监测风量: {sensor_data_json} ============================ 请输出严格JSON数组(不要包含任何解释文字),格式如下: [{{ "location": "地点名称", "plan_fq": 数值, "model_fq": 数值或null, "wind_fq": 数值或null }}] 注意: - plan_fq 使用配风计划中的 plan_q 值 - model_fq 使用模型解算中的 fq 值,找不到则为 null - wind_fq 使用监测风量中的 fq 值,找不到则为 null - 只输出JSON数组,开头不要包含```json标记,结尾不要包含```标记 """ try: messages = [{"role": "user", "content": prompt}] completion = client.chat.completions.create( model=MERGED_MODEL, messages=messages, extra_body={"enable_thinking": False}, stream=False, ) result_text = completion.choices[0].message.content # 清洗可能的markdown标记 if result_text.startswith("```"): lines = result_text.split("\n") if lines[0].startswith("```"): lines = lines[1:] if lines and lines[-1].startswith("```"): lines = lines[:-1] result_text = "\n".join(lines) logger.info(f"LLM合并结果前500字符: {result_text[:500]}") # 验证是否为有效JSON merged = json.loads(result_text) result = {"merged_list": merged, "count": len(merged)} with open("result_output.json", "w", encoding="utf-8") as f: json.dump(result, f, ensure_ascii=False, indent=2) _store["merged_wind_result"] = result return json.dumps(result, ensure_ascii=False) except json.JSONDecodeError as e: logger.error(f"LLM返回非JSON格式: {result_text[:300]}") return json.dumps({"error": f"LLM合并结果格式异常: {str(e)}", "raw": result_text[:500]}, ensure_ascii=False) except Exception as e: logger.error(f"LLM合并异常: {str(e)}") return json.dumps({"error": f"LLM合并异常: {str(e)}"}, ensure_ascii=False) # ============================================================ # 工具5: 风量合规规则检查 # ============================================================ @tool def check_wind_compliance() -> str: """ 从模块级存储中读取合并后的风量数据进行合规性规则检查。 检查规则: 1. 每个地点:解算风量(model_fq) 且 监测风量(wind_fq) >= 需风量(plan_fq) 2. 每个地点:进风量(取解算风量和监测风量中较大者) > 需风量 * 150% 视为过量 返回:合规与不合规地点的分类结果及详细分析 """ logger.info("===== 开始风量合规规则检查 =====") merged_data = _store.get("merged_wind_result") if not merged_data: return json.dumps({"error": "未获取到合并后的风量数据,请确保 merge_wind_by_location 已执行"}, ensure_ascii=False) merged_list = merged_data.get("merged_list", []) if not merged_list: return json.dumps({"error": "合并后的风量数据为空"}, ensure_ascii=False) compliant = [] non_compliant = [] for item in merged_list: location = item.get("location", "未知地点") plan_fq = item.get("plan_fq") or 0 model_fq = item.get("model_fq") wind_fq = item.get("wind_fq") issues = [] # 规则1:解算风量 且 监测风量 >= 需风量 model_check = True wind_check = True if model_fq is not None and model_fq > 0: if model_fq < plan_fq: model_check = False diff = plan_fq - model_fq issues.append(f"解算风量({model_fq:.1f}) < 需风量({plan_fq:.1f}),差额{diff:.1f}") elif plan_fq > 0: model_check = False issues.append(f"缺少解算风量数据,无法满足需风量({plan_fq:.1f})") if wind_fq is not None and wind_fq > 0: if wind_fq < plan_fq: wind_check = False diff = plan_fq - wind_fq issues.append(f"监测风量({wind_fq:.1f}) < 需风量({plan_fq:.1f}),差额{diff:.1f}") elif plan_fq > 0: wind_check = False issues.append(f"缺少监测风量数据,无法满足需风量({plan_fq:.1f})") # 规则2:进风量 > 需风量 * 150% over_supply = False actual_wind = max( model_fq if model_fq else 0, wind_fq if wind_fq else 0 ) if plan_fq > 0 and actual_wind > 0: ratio = actual_wind / plan_fq if ratio > 1.5: over_supply = True issues.append(f"进风量({actual_wind:.1f}) > 需风量*150%({plan_fq * 1.5:.1f}),供风过量{ratio*100:.0f}%") entry = { "location": location, "plan_fq": plan_fq, "model_fq": model_fq, "wind_fq": wind_fq, "issues": issues, "model_check": model_check, "wind_check": wind_check, "over_supply": over_supply, "is_compliant": len(issues) == 0 } if entry["is_compliant"]: compliant.append(entry) else: non_compliant.append(entry) logger.info(f"合规: {len(compliant)}, 不合规: {len(non_compliant)}") total = len(merged_list) compliant_count = len(compliant) result = { "compliant": compliant, "non_compliant": non_compliant, "summary": { "total": total, "compliant_count": compliant_count, "non_compliant_count": len(non_compliant), "compliance_rate": f"{compliant_count / total * 100:.1f}%" if total else "0%" } } return json.dumps(result, ensure_ascii=False, indent=2) if __name__ == '__main__': # 本地调试代码 print("vent_comparison_tools 加载成功")