| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439 |
- """
- 需风量对照与检查工具集
- 工具1: get_plan_required_wind — 获取配风计划需风量
- 工具2: get_model_calculated_wind — 获取模型解算风量
- 工具3: get_sensor_wind — 获取监测风量
- 工具4: merge_wind_by_location — 使用LLM按名称关联合并风量数据
- 工具5: check_wind_compliance — 风量合规规则检查
- """
- import json
- import os
- import logging
- from typing import Annotated, Any
- from langchain.tools import tool, InjectedState
- from langchain.chat_models import init_chat_model
- from openai import OpenAI
- from service.data_service import VentDataService
- from config.settings import JSON_CACHE_DIR, BASE_URL, LLM_API_KEY, LLM_BASE_URL, LLM_MODEL
- from service.auth import AuthService
- import requests
- logging.basicConfig(level=logging.INFO, format="【%(name)s】%(message)s")
- logger = logging.getLogger("VentComparisonTool")
- # 模块级存储,用于同一个 Agent 调用链中工具间传递中间结果
- _store: dict = {}
- # 独立初始化 LLM model(避免循环导入)
- # _merge_model = init_chat_model(
- # model=LLM_MODEL,
- # api_key=LLM_API_KEY,
- # base_url=LLM_BASE_URL,
- # temperature=0,
- # streaming=True,
- # model_provider="openai",
- # extra_body={"enable_thinking": False},
- # timeout=300
- # )
- client = OpenAI(
- api_key=os.getenv("LLM_API_KEY"),
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- )
- MERGED_MODEL = "qwen3.7-max"
- # ============================================================
- # 工具1: 获取配风计划需风量
- # ============================================================
- @tool
- def get_plan_required_wind(
- state: Annotated[dict, InjectedState]
- ) -> str:
- """
- 从用户上传的配风计划文件中提取各用风地点的需风量数据。
- 自动读取解析缓存(基于文件hashID),若未解析则先解析再缓存。
- 提取字段:
- - coal_faces: face_name → q_max
- - tunneling_faces: face_name → q_max
- - chambers: name → qd
- - other_points: name → Qo 或其他风量字段
- 返回统一列表 [{"plan_name":"地点名","plan_q":需风量值}]
- """
- vent_plan = state.get("vent_plan_data", {})
- if not vent_plan:
- return json.dumps({"error": "未读取到配风计划数据,请先上传PDF文件"}, ensure_ascii=False)
- logger.info("===== 开始提取配风计划需风量 =====")
- plan_list = []
- # 采煤工作面
- coal_faces = vent_plan.get("coal_faces", [])
- for face in coal_faces:
- name = face.get("face_name", "")
- q_max = face.get("q_max", 0)
- if name:
- plan_list.append({"plan_name": name, "plan_q": q_max if q_max else 0})
- logger.info(f" 采煤工作面: {name} -> 需风量 {q_max}")
- # 掘进工作面
- tunneling_faces = vent_plan.get("tunneling_faces", [])
- for face in tunneling_faces:
- name = face.get("face_name", "")
- q_max = face.get("q_max", 0)
- if name:
- plan_list.append({"plan_name": name, "plan_q": q_max if q_max else 0})
- logger.info(f" 掘进工作面: {name} -> 需风量 {q_max}")
- # 硐室
- chambers = vent_plan.get("chambers", [])
- for chamber in chambers:
- name = chamber.get("name", "")
- qd = chamber.get("qd", 0)
- if name:
- plan_list.append({"plan_name": name, "plan_q": qd if qd else 0})
- logger.info(f" 硐室: {name} -> 需风量 {qd}")
- # 其他用风地点
- other_points = vent_plan.get("other_points", [])
- for point in other_points:
- name = point.get("name", "")
- q_val = point.get("Qo") or point.get("q_max") or 0
- if name:
- plan_list.append({"plan_name": name, "plan_q": q_val if q_val else 0})
- logger.info(f" 其他地点: {name} -> 需风量 {q_val}")
- logger.info(f"===== 提取完成,共 {len(plan_list)} 个地点 =====")
- result = {"plan_list": plan_list, "count": len(plan_list)}
- # 写入模块级存储供后续工具使用
- _store["plan_wind_result"] = result
- logger.info(f"===== 提取完成,共 {len(plan_list)} 个地点 =====")
- return json.dumps(result, ensure_ascii=False)
- # ============================================================
- # 工具2: 获取模型解算风量
- # ============================================================
- @tool
- def get_model_calculated_wind() -> str:
- """
- 获取通风模型解算风量数据。
- 通过 GET /Vmodel/agent/get/model/wind/{model_id} 接口拉取。
- 返回所有地点的模型解算风量列表。
- 获取成功后自动将结果存入模块级存储供后续工具使用。
- """
- logger.info("===== 开始获取模型解算风量 =====")
- data_service = VentDataService()
- default_model_id = data_service._get_default_model_id()
- if not default_model_id:
- logger.warning("无法获取默认模型ID")
- return json.dumps({"error": "无法获取默认模型ID,请确认通风系统已配置默认模型"}, ensure_ascii=False)
- logger.info(f"默认模型ID: {default_model_id}")
- try:
- url = f"{BASE_URL}/Vmodel/agent/get/model/wind/{default_model_id}"
- headers = {"X-Access-Token": AuthService.get_token()} if AuthService.get_token() else {}
- logger.info(f"请求地址: {url}")
- resp = requests.get(url, headers=headers, timeout=30)
- resp.raise_for_status()
- data = resp.json()
- if data.get("success"):
- model_list = data.get("result", [])
- formatted = []
- for item in model_list:
- formatted.append({
- "name": item.get("name", ""),
- "fq": item.get("fq", 0)
- })
- logger.info(f"获取到 {len(formatted)} 条模型解算风量数据")
- result = {"model_list": formatted, "count": len(formatted)}
- _store["model_wind_result"] = result
- return json.dumps(result, ensure_ascii=False)
- else:
- logger.warning(f"接口返回失败: {data.get('message', '')}")
- return json.dumps({"error": f"获取模型解算风量失败: {data.get('message', '')}"}, ensure_ascii=False)
- except Exception as e:
- logger.error(f"获取模型解算风量异常: {str(e)}")
- return json.dumps({"error": f"获取模型解算风量异常: {str(e)}"}, ensure_ascii=False)
- # ============================================================
- # 工具3: 获取监测风量
- # ============================================================
- @tool
- def get_sensor_wind() -> str:
- """
- 获取传感器监测风量数据。
- 通过 GET /Vmodel/agent/get/sensor/wind/{model_id} 接口拉取。
- 返回所有传感器的监测风量列表。
- 获取成功后自动将结果存入模块级存储供后续工具使用。
- """
- logger.info("===== 开始获取传感器监测风量 =====")
- data_service = VentDataService()
- default_model_id = data_service._get_default_model_id()
- if not default_model_id:
- logger.warning("无法获取默认模型ID")
- return json.dumps({"error": "无法获取默认模型ID,请确认通风系统已配置默认模型"}, ensure_ascii=False)
- logger.info(f"默认模型ID: {default_model_id}")
- try:
- url = f"{BASE_URL}/Vmodel/agent/get/sensor/wind/{default_model_id}"
- headers = {"X-Access-Token": AuthService.get_token()} if AuthService.get_token() else {}
- logger.info(f"请求地址: {url}")
- resp = requests.get(url, headers=headers, timeout=30)
- resp.raise_for_status()
- data = resp.json()
- if data.get("success"):
- sensor_list = data.get("result", [])
- formatted = []
- for item in sensor_list:
- formatted.append({
- "tunName": item.get("tunName", ""),
- "location": item.get("location", ""),
- "fq": item.get("fq", 0)
- })
- logger.info(f"获取到 {len(formatted)} 条监测风量数据")
- result = {"sensor_list": formatted, "count": len(formatted)}
- _store["sensor_wind_result"] = result
- return json.dumps(result, ensure_ascii=False)
- else:
- logger.warning(f"接口返回失败: {data.get('message', '')}")
- return json.dumps({"error": f"获取监测风量失败: {data.get('message', '')}"}, ensure_ascii=False)
- except Exception as e:
- logger.error(f"获取监测风量异常: {str(e)}")
- return json.dumps({"error": f"获取监测风量异常: {str(e)}"}, ensure_ascii=False)
- # ============================================================
- # 工具4: LLM 名称关联合并风量数据
- # ============================================================
- @tool
- def merge_wind_by_location() -> str:
- """
- 使用大模型的语义理解能力,将配风计划、模型解算、传感器监测三份数据
- 按地点名称进行语义关联与合并。
- 从前序工具自动存入的模块级存储中读取数据
- 合并规则:
- - 以配风计划中的地点为主(plan_name / plan_q)
- - 从模型解算数据中匹配对应地点的 model_fq
- - 从传感器监测数据中匹配对应地点的 wind_fq
- - 忽略两个外部接口中无法匹配到配风计划的冗余地点
- 返回:合并后的JSON数组。
- """
- logger.info("===== 开始LLM名称关联合并风量数据 =====")
- store = _store
- plan_data = store.get("plan_wind_result")
- model_data = store.get("model_wind_result")
- sensor_data = store.get("sensor_wind_result")
- # 校验数据完整性
- missing = []
- if not plan_data:
- missing.append("配风计划需风量(plan_wind_result)")
- if not model_data:
- missing.append("模型解算风量(model_wind_result)")
- if not sensor_data:
- missing.append("监测风量(sensor_wind_result)")
- if missing:
- return json.dumps({"error": f"缺少数据源: {', '.join(missing)},请确保前序工具已执行并保存结果"}, ensure_ascii=False)
- plan_data_json = json.dumps(plan_data, ensure_ascii=False)
- model_data_json = json.dumps(model_data, ensure_ascii=False)
- sensor_data_json = json.dumps(sensor_data, ensure_ascii=False)
- prompt = f"""你是一个煤矿通风专业的数据关联专家。请将以下三份数据按地点名称进行语义关联与合并。
- 关联规则:
- 1. 以配风计划需风量中的地点为基准(plan_name),这是核心列表
- 2. 从模型解算风量中找出名称语义最相似的地点,提取其 fq 作为 model_fq
- 3. 从监测风量中找出名称语义最相似的地点,提取其 fq 作为 wind_fq。监测风量中的 tunName 比 location 更具参考价值
- 4. 如果某个配风计划地点在模型解算/监测中找不到对应项,则对应字段填 null
- 5. 忽略模型解算和监测中无法与任何配风计划地点关联的冗余数据
- 6. 名称匹配需要语义理解,例如:22210综采工作面 约等于 22210 综采工作面 约等于 22210工作面
-
- ============================
- 配风计划需风量:
- {plan_data_json}
-
- 模型解算风量:
- {model_data_json}
-
- 监测风量:
- {sensor_data_json}
- ============================
-
- 请输出严格JSON数组(不要包含任何解释文字),格式如下:
- [{{
- "location": "地点名称",
- "plan_fq": 数值,
- "model_fq": 数值或null,
- "wind_fq": 数值或null
- }}]
-
- 注意:
- - plan_fq 使用配风计划中的 plan_q 值
- - model_fq 使用模型解算中的 fq 值,找不到则为 null
- - wind_fq 使用监测风量中的 fq 值,找不到则为 null
- - 只输出JSON数组,开头不要包含```json标记,结尾不要包含```标记
- """
- try:
- messages = [{"role": "user", "content": prompt}]
- completion = client.chat.completions.create(
- model=MERGED_MODEL,
- messages=messages,
- extra_body={"enable_thinking": False},
- stream=False,
- )
- result_text = completion.choices[0].message.content
- # 清洗可能的markdown标记
- if result_text.startswith("```"):
- lines = result_text.split("\n")
- if lines[0].startswith("```"):
- lines = lines[1:]
- if lines and lines[-1].startswith("```"):
- lines = lines[:-1]
- result_text = "\n".join(lines)
- logger.info(f"LLM合并结果前500字符: {result_text[:500]}")
- # 验证是否为有效JSON
- merged = json.loads(result_text)
- result = {"merged_list": merged, "count": len(merged)}
- with open("result_output.json", "w", encoding="utf-8") as f:
- json.dump(result, f, ensure_ascii=False, indent=2)
- _store["merged_wind_result"] = result
- return json.dumps(result, ensure_ascii=False)
- except json.JSONDecodeError as e:
- logger.error(f"LLM返回非JSON格式: {result_text[:300]}")
- return json.dumps({"error": f"LLM合并结果格式异常: {str(e)}", "raw": result_text[:500]}, ensure_ascii=False)
- except Exception as e:
- logger.error(f"LLM合并异常: {str(e)}")
- return json.dumps({"error": f"LLM合并异常: {str(e)}"}, ensure_ascii=False)
- # ============================================================
- # 工具5: 风量合规规则检查
- # ============================================================
- @tool
- def check_wind_compliance() -> str:
- """
- 从模块级存储中读取合并后的风量数据进行合规性规则检查。
- 检查规则:
- 1. 每个地点:解算风量(model_fq) 且 监测风量(wind_fq) >= 需风量(plan_fq)
- 2. 每个地点:进风量(取解算风量和监测风量中较大者) > 需风量 * 150% 视为过量
- 返回:合规与不合规地点的分类结果及详细分析
- """
- logger.info("===== 开始风量合规规则检查 =====")
- merged_data = _store.get("merged_wind_result")
- if not merged_data:
- return json.dumps({"error": "未获取到合并后的风量数据,请确保 merge_wind_by_location 已执行"}, ensure_ascii=False)
- merged_list = merged_data.get("merged_list", [])
- if not merged_list:
- return json.dumps({"error": "合并后的风量数据为空"}, ensure_ascii=False)
- compliant = []
- non_compliant = []
- for item in merged_list:
- location = item.get("location", "未知地点")
- plan_fq = item.get("plan_fq") or 0
- model_fq = item.get("model_fq")
- wind_fq = item.get("wind_fq")
- issues = []
- # 规则1:解算风量 且 监测风量 >= 需风量
- model_check = True
- wind_check = True
- if model_fq is not None and model_fq > 0:
- if model_fq < plan_fq:
- model_check = False
- diff = plan_fq - model_fq
- issues.append(f"解算风量({model_fq:.1f}) < 需风量({plan_fq:.1f}),差额{diff:.1f}")
- elif plan_fq > 0:
- model_check = False
- issues.append(f"缺少解算风量数据,无法满足需风量({plan_fq:.1f})")
- if wind_fq is not None and wind_fq > 0:
- if wind_fq < plan_fq:
- wind_check = False
- diff = plan_fq - wind_fq
- issues.append(f"监测风量({wind_fq:.1f}) < 需风量({plan_fq:.1f}),差额{diff:.1f}")
- elif plan_fq > 0:
- wind_check = False
- issues.append(f"缺少监测风量数据,无法满足需风量({plan_fq:.1f})")
- # 规则2:进风量 > 需风量 * 150%
- over_supply = False
- actual_wind = max(
- model_fq if model_fq else 0,
- wind_fq if wind_fq else 0
- )
- if plan_fq > 0 and actual_wind > 0:
- ratio = actual_wind / plan_fq
- if ratio > 1.5:
- over_supply = True
- issues.append(f"进风量({actual_wind:.1f}) > 需风量*150%({plan_fq * 1.5:.1f}),供风过量{ratio*100:.0f}%")
- entry = {
- "location": location,
- "plan_fq": plan_fq,
- "model_fq": model_fq,
- "wind_fq": wind_fq,
- "issues": issues,
- "model_check": model_check,
- "wind_check": wind_check,
- "over_supply": over_supply,
- "is_compliant": len(issues) == 0
- }
- if entry["is_compliant"]:
- compliant.append(entry)
- else:
- non_compliant.append(entry)
- logger.info(f"合规: {len(compliant)}, 不合规: {len(non_compliant)}")
- total = len(merged_list)
- compliant_count = len(compliant)
- result = {
- "compliant": compliant,
- "non_compliant": non_compliant,
- "summary": {
- "total": total,
- "compliant_count": compliant_count,
- "non_compliant_count": len(non_compliant),
- "compliance_rate": f"{compliant_count / total * 100:.1f}%" if total else "0%"
- }
- }
- return json.dumps(result, ensure_ascii=False, indent=2)
- if __name__ == '__main__':
- # 本地调试代码
- print("vent_comparison_tools 加载成功")
|