import json from typing import Annotated, Any from datetime import datetime import logging from langchain.tools import tool, InjectedState from service.data_service import VentDataService # 日志配置 logging.basicConfig(level=logging.INFO, format="【%(name)s】%(message)s") logger = logging.getLogger("FormReviewTool") def _extract_mine_name(ventilation_plan: dict) -> str: """Extract the mine name string from a ventilation plan dict.""" return ventilation_plan.get("mine_info", {}).get("mine_name", "") def map_temperature_to_velocity_range(temp_c: float) -> str: """根据进风流温度,返回对应的采面风速范围字符串""" if temp_c < 20: return "1.0" elif 20 <= temp_c < 23: return "1.0 ~ 1.5" elif 23 <= temp_c < 26: return "1.5 ~ 1.8" elif 26 <= temp_c < 28: return "1.8 ~ 2.5" elif 28 <= temp_c <= 30: return "2.5 ~ 3.0" else: return "超出范围" @tool def form_review( state: Annotated[dict, InjectedState] ) -> str: """ 配风计划形式审查综合工具。 检查五项合规性: 1. 四级签字(编制人、通风科长、通风副总、总工程师)是否齐全 2. 编制时间是否早于配风月份(即是否为上月编制) 3. 所有用风地点是否具备风量计算与核验过程 4. 是否列出全部用风地点 5. 多回风井场景下是否按区域分别计算需风量 """ # 从注入状态获取配风数据 ventilation_plan = state.get("vent_plan_data", {}) if not ventilation_plan: return "❌ 错误:未读取到配风计划数据" logger.info("===== 开始执行形式审查 =====") audit_results: list[str] = [] passed_count = 0 failed_count = 0 mine_basic_info = ventilation_plan.get("mine_info", {}) coal_faces = ventilation_plan.get("coal_faces", []) tunneling_faces = ventilation_plan.get("tunneling_faces", []) chambers = ventilation_plan.get("chambers", []) other_points = ventilation_plan.get("other_points", []) # 检查1:四级审核签字 logger.info("执行检查:1/5 审核签字校验") try: required_signatures: list[tuple[str, str]] = [ ("reviewer", "编制人"), ("ventilation_chief", "通风科长"), ("ventilation_deputy", "通风副总"), ("chief_engineer", "总工程师") ] missing_signatures: list[str] = [] for field, name in required_signatures: val = mine_basic_info.get(field, "") if val is None or str(val).strip() == "": missing_signatures.append(name) if missing_signatures: result_line = f"❌ 签字审查不通过:缺失 {','.join(missing_signatures)}" audit_results.append(result_line) failed_count += 1 logger.warning(result_line) else: result_line = "✅ 签字审查通过:四级签字齐全" audit_results.append(result_line) passed_count += 1 logger.info(result_line) except Exception as e: result_line = f"❌ 签字审查异常:{str(e)}" audit_results.append(result_line) failed_count += 1 logger.error(result_line) # 检查2:编制时间是否为上月 logger.info("执行检查:2/5 编制时间合规性校验") try: plan_month = mine_basic_info.get("vent_date", "") preparation_date = mine_basic_info.get("preparation_date", "") logger.info(f"→ 配风月份:{plan_month}") logger.info(f"→ 编制日期:{preparation_date}") if not plan_month or not preparation_date: result_line = "❌ 时间审查不通过:日期字段为空" audit_results.append(result_line) failed_count += 1 logger.warning(result_line) else: py, pm = map(int, plan_month.split("-")) prep_dt = datetime.strptime(preparation_date[:10], "%Y-%m-%d") is_timely = (prep_dt.year < py) or (prep_dt.year == py and prep_dt.month < pm) if is_timely: result_line = "✅ 时间审查通过:编制时间符合上月要求" audit_results.append(result_line) passed_count += 1 logger.info(result_line) else: result_line = "❌ 时间审查不通过:编制时间非上月" audit_results.append(result_line) failed_count += 1 logger.warning(result_line) except Exception as e: result_line = f"❌ 时间审查异常:{str(e)}" audit_results.append(result_line) failed_count += 1 logger.error(result_line) # 检查3:风量计算核验过程 logger.info("执行检查:3/5 风量计算核验过程完整性校验") try: locations_without_calc: list[str] = [] for item in coal_faces: if not item.get("has_calc_check_process"): locations_without_calc.append(f"回采工作面「{item.get('face_name', '')}」") for item in tunneling_faces: if not item.get("has_calc_check_process"): locations_without_calc.append(f"掘进工作面「{item.get('face_name', '')}」") for item in chambers: if not item.get("has_calc_check_process"): locations_without_calc.append(f"硐室「{item.get('name', '')}」") for item in other_points: if not item.get("has_calc_check_process"): locations_without_calc.append(f"其他地点「{item.get('name', '')}」") logger.info(f"→ 缺失计算过程地点数量:{len(locations_without_calc)}") if locations_without_calc: lack_str = ";".join(locations_without_calc) result_line = f"❌ 计算过程不通过:以下 {len(locations_without_calc)} 个地点未提供风量计算与核验过程:{lack_str}" audit_results.append(result_line) failed_count += 1 logger.warning(result_line) else: result_line = "✅ 计算过程通过:全部地点均具备完整风量计算与核验过程" audit_results.append(result_line) passed_count += 1 logger.info(result_line) except Exception as e: result_line = f"❌ 计算过程检查异常:{str(e)}" audit_results.append(result_line) failed_count += 1 logger.error(result_line) # 检查4:是否列出全部用风地点 logger.info("执行检查:4/5 用风地点清单完整性校验") try: lists_all_locations = mine_basic_info.get("list_all_vent_place", False) logger.info(f"→ list_all_vent_place = {lists_all_locations}") if lists_all_locations: result_line = "✅ 用风地点通过:已全部列出" audit_results.append(result_line) passed_count += 1 logger.info(result_line) else: result_line = "❌ 用风地点不通过:未全部列出" audit_results.append(result_line) failed_count += 1 logger.warning(result_line) except Exception as e: result_line = f"❌ 用风地点异常:{str(e)}" audit_results.append(result_line) failed_count += 1 logger.error(result_line) # 检查5:多回风井分区计算 logger.info("执行检查:5/5 多回风井分区计算校验") try: return_well_count = mine_basic_info.get("return_well_count", 1) has_zone_calculation = mine_basic_info.get("return_well_zone_calc", False) logger.info(f"→ 回风井数量:{return_well_count}") logger.info(f"→ 是否分区计算:{has_zone_calculation}") if return_well_count >= 2: if has_zone_calculation: result_line = "✅ 回风井分区通过:已按区域计算" audit_results.append(result_line) passed_count += 1 logger.info(result_line) else: result_line = "❌ 回风井分区不通过:多回风井未分区计算" audit_results.append(result_line) failed_count += 1 logger.warning(result_line) else: result_line = "✅ 回风井分区无需校验:单回风井" audit_results.append(result_line) passed_count += 1 logger.info(result_line) except Exception as e: result_line = f"❌ 回风井分区异常:{str(e)}" audit_results.append(result_line) failed_count += 1 logger.error(result_line) # 汇总输出 final_status = "✅ 形式审查全部通过" if failed_count == 0 else "❌ 形式审查存在不合格项" output = "".join(audit_results) logger.info(f"===== 形式审查完成 =====") logger.info(f"通过:{passed_count} 项 | 不通过:{failed_count} 项 | 结果:{final_status}") return f""" 【形式审查汇总结果】 {output} -------------------------------- 最终结论:{final_status} 通过项:{passed_count} 项 不通过项:{failed_count} """ @tool def check_current_month_plan( state: Annotated[dict, InjectedState] ) -> str: """ 验证配风计划月份是否为当月最新版本。 规则:将配风计划中的 vent_date 字段与当前系统年月比对, 一致则判定为最新版,否则提示版本过期。 """ ventilation_plan = state.get("vent_plan_data", {}) mine_basic_info = ventilation_plan.get("mine_info", {}) plan_month = mine_basic_info.get("vent_date", "") logger.info("开始检查配风计划是否为当月最新版本 (verify_month_version)") current_system_month = datetime.now().strftime("%Y-%m") logger.info(f"当前系统月份:{current_system_month}") logger.info(f"配风计划月份:{plan_month}") if not plan_month: logger.error("检查失败:plan_month 字段为空,无法校验版本") return "检查失败:配风计划月份不能为空" if len(plan_month) != 7 or plan_month[4] != "-": logger.error(f"检查失败:日期格式错误,必须为 YYYY-MM,当前值:{plan_month}") return f"检查失败:日期格式错误,应为 YYYY-MM,实际为 {plan_month}" if plan_month == current_system_month: logger.info("检查通过:当前配风计划是当月最新版本") return f"检查通过:是当月最新版(当前月份:{current_system_month},计划月份:{plan_month})" else: logger.warning("检查不通过:当前配风计划不是当月最新版本") return f"检查不通过:非当月最新版(当前月份:{current_system_month},计划月份:{plan_month})" @tool def check_data_by_gas_report( state: Annotated[dict, InjectedState] ) -> str: """ 校验配风计划中瓦斯与二氧化碳数据是否与瓦斯等级鉴定报告一致。 自动拉取瓦斯鉴定报告,抽取配风计划中的采煤工作面、掘进工作面、 其他用风地点数据,构造结构化提示交由大模型逐条比对。 """ ventilation_plan = state.get("vent_plan_data", {}) logger.info("===== 开始执行瓦斯、二氧化碳数据一致性校验 =====") data_service = VentDataService() try: mine_name = _extract_mine_name(ventilation_plan) if not mine_name: raise ValueError("矿井名称为空") except Exception as e: err = "无法从配风计划中获取矿井名称" logger.error(f"{err}:{str(e)}") return f"校验失败:{err}" logger.info("正在调用接口获取瓦斯等级鉴定报告...") gas_identification_report = data_service.get_gas_identify(mine_name) # 构造校验提示词 prompt = f""" 你是专业的煤矿通风瓦斯、二氧化碳数据校验专家,请严格执行【配风计划基础数据一致性审查】。 请将以下配风计划中的瓦斯、二氧化碳数据进行逐项比对,输出结构化校验结果。 ============================ 【第一份数据:配风计划】 "采煤工作面":{ventilation_plan.get("coal_faces", [])} "掘进工作面":{ventilation_plan.get("tunneling_faces", [])} "其他用风地点": {ventilation_plan.get("other_points", [])} 【第二份数据:瓦斯等级鉴定报告】 "检测地点":{gas_identification_report.get("result", [])} ============================ 【校验规则 1 —— 瓦斯数据一致性】 1. 平均绝对瓦斯涌出量 qcg 必须与 瓦斯等级鉴定报告avg_abs_gas 一致 2. 平均绝对二氧化碳涌出量 qcc 必须与 瓦斯等级鉴定报告 avg_abs_co2 一致 3. 瓦斯/二氧化碳不均匀系数kcg/kcc 必须与 瓦斯等级鉴定报告gas_uneven_coeff/co2_uneven_coeff 一致 若地点无鉴定报告,提示:未找到XXX地点的瓦斯/二氧化碳鉴定报告,请重新上传。 【输出要求】 1. 按分类逐条输出;2. 标注 通过/不一致/缺失;3. 语言专业简洁。 请开始校验: """ logger.info("已生成数据校验提示词,交付大模型进行比对") return f""" ==== 配风计划基础数据一致性检查 ==== 已完成数据获取: - 矿井名称:{mine_name} - 配风计划:已加载 - 瓦斯鉴定报告:{'已获取' if gas_identification_report else '获取失败/为空'} 【请大模型基于以下规则与数据执行比对校验】 {prompt} """ @tool def check_data_by_face_design( state: Annotated[dict, InjectedState] ) -> str: """ 校验配风计划工作面参数是否与作业规程一致。 拉取工作面作业规程,比对工作面长度、平均采高、有效断面积等参数, 构造结构化提示交由大模型逐项校验。 """ ventilation_plan = state.get("vent_plan_data", {}) logger.info("===== 开始执行工作面设计规程一致性校验 =====") data_service = VentDataService() try: mine_name = _extract_mine_name(ventilation_plan) if not mine_name: raise ValueError("矿井名称为空") except Exception as e: err = "无法从配风计划中获取矿井名称" logger.error(f"{err}:{str(e)}") return f"校验失败:{err}" logger.info("正在调用接口获取工作面作业规程...") face_design_documents = data_service.get_coal_working_rule(mine_name) prompt = f""" 你是专业的煤矿通风数据校验专家,请执行【配风计划基础数据一致性审查】。 ============================ 【第一份数据:配风计划】 {ventilation_plan.get("coal_faces", [])} 【第二份数据:工作面作业规程】 {face_design_documents} ============================ 【校验规则】 1. 工作面长度 face_length 与作业规程 faceLength 一致 2. 平均采高 average_mining_height 与作业规程 avgMiningHeight 一致 3. 平均有效断面积 scf 与作业 avg_section_area 一致 缺失文档则提示:缺少xxx工作面作业规程。 【输出要求】 逐条校验、标注结果、最后给出总体合格/不合格。 请开始校验: """ logger.info("已生成数据校验提示词,交付大模型进行比对") return f""" ==== 配风计划基础数据一致性检查 ==== 已完成数据获取: - 矿井名称:{mine_name} - 配风计划:已加载 - 工作面作业规程:{'已获取' if len(face_design_documents) > 0 else '获取失败/为空'} 【请大模型基于以下规则与数据执行比对校验】 {prompt} """ @tool def check_data_by_vent_report( state: Annotated[dict, InjectedState] ) -> str: """ 核验配风计划工作面风速是否与测风报表温度数据匹配。 拉取当月测风报表,提取各测点的温度数据,按温度区间映射标准风速范围, 构造结构化提示交由大模型比对工作面风速是否处于合理区间。 """ ventilation_plan = state.get("vent_plan_data", {}) logger.info("===== 开始执行测风报表温度一致性校验 =====") data_service = VentDataService() try: mine_basic_info = ventilation_plan.get("mine_info", {}) mine_name = mine_basic_info.get("mine_name", "") plan_month = mine_basic_info.get("vent_date", "") if not mine_name: raise ValueError("矿井名称为空") except Exception as e: err = "无法从配风计划基础信息获取数据" logger.error(f"{err}:{str(e)}") return f"校验失败:{err}" logger.info("正在调用接口获取测风报表...") ventilation_report = data_service.get_vent_report_data(mine_name, plan_month) if not ventilation_report: return f"未获取到{mine_name}的{plan_month}的测风报表" vent_report_result = ventilation_report.get("result", {}) if not vent_report_result: return f"未获取到{mine_name}的{plan_month}的测风报表" measurement_points = vent_report_result.get("testPointList", []) velocity_records: list[dict[str, Any]] = [] for point in measurement_points: velocity_record: dict[str, Any] = { "point_name": point.get("pointName", ""), "point_desc": point.get("pointDesc", ""), "temp_c": point.get("tempC", 0.0), "coal_v": map_temperature_to_velocity_range(point.get("tempC", 0.0)) } velocity_records.append(velocity_record) prompt = f""" 你是专业煤矿通风校验专家,根据测风报表温度校验工作面风速合理性。 ============================ 【第一份数据:配风计划】 {ventilation_plan.get("coal_faces", [])} 【第二份数据:测风报表温度数据】 {velocity_records} ============================ 【校验规则】 1. 工作面风速 vcf 匹配对应温度区间标准风速范围 2. 比对 average_temperature 与报表 temp_c 缺失数据提示对应地点缺失报表。 【输出要求】 逐条标注:通过/不合理/缺失,清晰说明偏差。 请开始校验: """ logger.info("已生成数据校验提示词,交付大模型进行比对") return f""" ==== 配风计划基础数据一致性检查 ==== 已完成数据获取: - 矿井名称:{mine_name} - 配风计划:已加载 - 测风报表当月温度数据:{'已获取' if velocity_records else '获取失败'} 【请大模型基于以下规则与数据执行比对校验】 {prompt} """ # ==================== 本地调试代码(仅本地运行,线上Agent不执行) ==================== if __name__ == '__main__': with open(r"D:\workspace\py\vent_agent\check_agent\配风计划(新).json", encoding="utf-8") as f: data = json.load(f) # 模拟 InjectedState 注入的状态对象 mock_state = {"vent_plan_data": data} # 调试示例 res = form_review(state=mock_state) print(res)