| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480 |
- import json
- from typing import Annotated, Any
- from datetime import datetime
- import logging
- from langchain.tools import tool, InjectedState
- from service.data_service import VentDataService
- # 日志配置
- logging.basicConfig(level=logging.INFO, format="【%(name)s】%(message)s")
- logger = logging.getLogger("FormReviewTool")
- def _extract_mine_name(ventilation_plan: dict) -> str:
- """Extract the mine name string from a ventilation plan dict."""
- return ventilation_plan.get("mine_info", {}).get("mine_name", "")
- def map_temperature_to_velocity_range(temp_c: float) -> str:
- """根据进风流温度,返回对应的采面风速范围字符串"""
- if temp_c < 20:
- return "1.0"
- elif 20 <= temp_c < 23:
- return "1.0 ~ 1.5"
- elif 23 <= temp_c < 26:
- return "1.5 ~ 1.8"
- elif 26 <= temp_c < 28:
- return "1.8 ~ 2.5"
- elif 28 <= temp_c <= 30:
- return "2.5 ~ 3.0"
- else:
- return "超出范围"
- @tool
- def form_review(
- state: Annotated[dict, InjectedState]
- ) -> str:
- """
- 配风计划形式审查综合工具。
- 检查五项合规性:
- 1. 四级签字(编制人、通风科长、通风副总、总工程师)是否齐全
- 2. 编制时间是否早于配风月份(即是否为上月编制)
- 3. 所有用风地点是否具备风量计算与核验过程
- 4. 是否列出全部用风地点
- 5. 多回风井场景下是否按区域分别计算需风量
- """
- # 从注入状态获取配风数据
- ventilation_plan = state.get("vent_plan_data", {})
- if not ventilation_plan:
- return "❌ 错误:未读取到配风计划数据"
- logger.info("===== 开始执行形式审查 =====")
- audit_results: list[str] = []
- passed_count = 0
- failed_count = 0
- mine_basic_info = ventilation_plan.get("mine_info", {})
- coal_faces = ventilation_plan.get("coal_faces", [])
- tunneling_faces = ventilation_plan.get("tunneling_faces", [])
- chambers = ventilation_plan.get("chambers", [])
- other_points = ventilation_plan.get("other_points", [])
- # 检查1:四级审核签字
- logger.info("执行检查:1/5 审核签字校验")
- try:
- required_signatures: list[tuple[str, str]] = [
- ("reviewer", "编制人"),
- ("ventilation_chief", "通风科长"),
- ("ventilation_deputy", "通风副总"),
- ("chief_engineer", "总工程师")
- ]
- missing_signatures: list[str] = []
- for field, name in required_signatures:
- val = mine_basic_info.get(field, "")
- if val is None or str(val).strip() == "":
- missing_signatures.append(name)
- if missing_signatures:
- result_line = f"❌ 签字审查不通过:缺失 {','.join(missing_signatures)}"
- audit_results.append(result_line)
- failed_count += 1
- logger.warning(result_line)
- else:
- result_line = "✅ 签字审查通过:四级签字齐全"
- audit_results.append(result_line)
- passed_count += 1
- logger.info(result_line)
- except Exception as e:
- result_line = f"❌ 签字审查异常:{str(e)}"
- audit_results.append(result_line)
- failed_count += 1
- logger.error(result_line)
- # 检查2:编制时间是否为上月
- logger.info("执行检查:2/5 编制时间合规性校验")
- try:
- plan_month = mine_basic_info.get("vent_date", "")
- preparation_date = mine_basic_info.get("preparation_date", "")
- logger.info(f"→ 配风月份:{plan_month}")
- logger.info(f"→ 编制日期:{preparation_date}")
- if not plan_month or not preparation_date:
- result_line = "❌ 时间审查不通过:日期字段为空"
- audit_results.append(result_line)
- failed_count += 1
- logger.warning(result_line)
- else:
- py, pm = map(int, plan_month.split("-"))
- prep_dt = datetime.strptime(preparation_date[:10], "%Y-%m-%d")
- is_timely = (prep_dt.year < py) or (prep_dt.year == py and prep_dt.month < pm)
- if is_timely:
- result_line = "✅ 时间审查通过:编制时间符合上月要求"
- audit_results.append(result_line)
- passed_count += 1
- logger.info(result_line)
- else:
- result_line = "❌ 时间审查不通过:编制时间非上月"
- audit_results.append(result_line)
- failed_count += 1
- logger.warning(result_line)
- except Exception as e:
- result_line = f"❌ 时间审查异常:{str(e)}"
- audit_results.append(result_line)
- failed_count += 1
- logger.error(result_line)
- # 检查3:风量计算核验过程
- logger.info("执行检查:3/5 风量计算核验过程完整性校验")
- try:
- locations_without_calc: list[str] = []
- for item in coal_faces:
- if not item.get("has_calc_check_process"):
- locations_without_calc.append(f"回采工作面「{item.get('face_name', '')}」")
- for item in tunneling_faces:
- if not item.get("has_calc_check_process"):
- locations_without_calc.append(f"掘进工作面「{item.get('face_name', '')}」")
- for item in chambers:
- if not item.get("has_calc_check_process"):
- locations_without_calc.append(f"硐室「{item.get('name', '')}」")
- for item in other_points:
- if not item.get("has_calc_check_process"):
- locations_without_calc.append(f"其他地点「{item.get('name', '')}」")
- logger.info(f"→ 缺失计算过程地点数量:{len(locations_without_calc)}")
- if locations_without_calc:
- lack_str = ";".join(locations_without_calc)
- result_line = f"❌ 计算过程不通过:以下 {len(locations_without_calc)} 个地点未提供风量计算与核验过程:{lack_str}"
- audit_results.append(result_line)
- failed_count += 1
- logger.warning(result_line)
- else:
- result_line = "✅ 计算过程通过:全部地点均具备完整风量计算与核验过程"
- audit_results.append(result_line)
- passed_count += 1
- logger.info(result_line)
- except Exception as e:
- result_line = f"❌ 计算过程检查异常:{str(e)}"
- audit_results.append(result_line)
- failed_count += 1
- logger.error(result_line)
- # 检查4:是否列出全部用风地点
- logger.info("执行检查:4/5 用风地点清单完整性校验")
- try:
- lists_all_locations = mine_basic_info.get("list_all_vent_place", False)
- logger.info(f"→ list_all_vent_place = {lists_all_locations}")
- if lists_all_locations:
- result_line = "✅ 用风地点通过:已全部列出"
- audit_results.append(result_line)
- passed_count += 1
- logger.info(result_line)
- else:
- result_line = "❌ 用风地点不通过:未全部列出"
- audit_results.append(result_line)
- failed_count += 1
- logger.warning(result_line)
- except Exception as e:
- result_line = f"❌ 用风地点异常:{str(e)}"
- audit_results.append(result_line)
- failed_count += 1
- logger.error(result_line)
- # 检查5:多回风井分区计算
- logger.info("执行检查:5/5 多回风井分区计算校验")
- try:
- return_well_count = mine_basic_info.get("return_well_count", 1)
- has_zone_calculation = mine_basic_info.get("return_well_zone_calc", False)
- logger.info(f"→ 回风井数量:{return_well_count}")
- logger.info(f"→ 是否分区计算:{has_zone_calculation}")
- if return_well_count >= 2:
- if has_zone_calculation:
- result_line = "✅ 回风井分区通过:已按区域计算"
- audit_results.append(result_line)
- passed_count += 1
- logger.info(result_line)
- else:
- result_line = "❌ 回风井分区不通过:多回风井未分区计算"
- audit_results.append(result_line)
- failed_count += 1
- logger.warning(result_line)
- else:
- result_line = "✅ 回风井分区无需校验:单回风井"
- audit_results.append(result_line)
- passed_count += 1
- logger.info(result_line)
- except Exception as e:
- result_line = f"❌ 回风井分区异常:{str(e)}"
- audit_results.append(result_line)
- failed_count += 1
- logger.error(result_line)
- # 汇总输出
- final_status = "✅ 形式审查全部通过" if failed_count == 0 else "❌ 形式审查存在不合格项"
- output = "".join(audit_results)
- logger.info(f"===== 形式审查完成 =====")
- logger.info(f"通过:{passed_count} 项 | 不通过:{failed_count} 项 | 结果:{final_status}")
- return f"""
- 【形式审查汇总结果】
- {output}
- --------------------------------
- 最终结论:{final_status}
- 通过项:{passed_count} 项
- 不通过项:{failed_count}
- """
- @tool
- def check_current_month_plan(
- state: Annotated[dict, InjectedState]
- ) -> str:
- """
- 验证配风计划月份是否为当月最新版本。
- 规则:将配风计划中的 vent_date 字段与当前系统年月比对,
- 一致则判定为最新版,否则提示版本过期。
- """
- ventilation_plan = state.get("vent_plan_data", {})
- mine_basic_info = ventilation_plan.get("mine_info", {})
- plan_month = mine_basic_info.get("vent_date", "")
- logger.info("开始检查配风计划是否为当月最新版本 (verify_month_version)")
- current_system_month = datetime.now().strftime("%Y-%m")
- logger.info(f"当前系统月份:{current_system_month}")
- logger.info(f"配风计划月份:{plan_month}")
- if not plan_month:
- logger.error("检查失败:plan_month 字段为空,无法校验版本")
- return "检查失败:配风计划月份不能为空"
- if len(plan_month) != 7 or plan_month[4] != "-":
- logger.error(f"检查失败:日期格式错误,必须为 YYYY-MM,当前值:{plan_month}")
- return f"检查失败:日期格式错误,应为 YYYY-MM,实际为 {plan_month}"
- if plan_month == current_system_month:
- logger.info("检查通过:当前配风计划是当月最新版本")
- return f"检查通过:是当月最新版(当前月份:{current_system_month},计划月份:{plan_month})"
- else:
- logger.warning("检查不通过:当前配风计划不是当月最新版本")
- return f"检查不通过:非当月最新版(当前月份:{current_system_month},计划月份:{plan_month})"
- @tool
- def check_data_by_gas_report(
- state: Annotated[dict, InjectedState]
- ) -> str:
- """
- 校验配风计划中瓦斯与二氧化碳数据是否与瓦斯等级鉴定报告一致。
- 自动拉取瓦斯鉴定报告,抽取配风计划中的采煤工作面、掘进工作面、
- 其他用风地点数据,构造结构化提示交由大模型逐条比对。
- """
- ventilation_plan = state.get("vent_plan_data", {})
- logger.info("===== 开始执行瓦斯、二氧化碳数据一致性校验 =====")
- data_service = VentDataService()
- try:
- mine_name = _extract_mine_name(ventilation_plan)
- if not mine_name:
- raise ValueError("矿井名称为空")
- except Exception as e:
- err = "无法从配风计划中获取矿井名称"
- logger.error(f"{err}:{str(e)}")
- return f"校验失败:{err}"
- logger.info("正在调用接口获取瓦斯等级鉴定报告...")
- gas_identification_report = data_service.get_gas_identify(mine_name)
- # 构造校验提示词
- prompt = f"""
- 你是专业的煤矿通风瓦斯、二氧化碳数据校验专家,请严格执行【配风计划基础数据一致性审查】。
- 请将以下配风计划中的瓦斯、二氧化碳数据进行逐项比对,输出结构化校验结果。
- ============================
- 【第一份数据:配风计划】
- "采煤工作面":{ventilation_plan.get("coal_faces", [])}
- "掘进工作面":{ventilation_plan.get("tunneling_faces", [])}
- "其他用风地点": {ventilation_plan.get("other_points", [])}
- 【第二份数据:瓦斯等级鉴定报告】
- "检测地点":{gas_identification_report.get("result", [])}
- ============================
- 【校验规则 1 —— 瓦斯数据一致性】
- 1. 平均绝对瓦斯涌出量 qcg 必须与 瓦斯等级鉴定报告avg_abs_gas 一致
- 2. 平均绝对二氧化碳涌出量 qcc 必须与 瓦斯等级鉴定报告 avg_abs_co2 一致
- 3. 瓦斯/二氧化碳不均匀系数kcg/kcc 必须与 瓦斯等级鉴定报告gas_uneven_coeff/co2_uneven_coeff 一致
- 若地点无鉴定报告,提示:未找到XXX地点的瓦斯/二氧化碳鉴定报告,请重新上传。
- 【输出要求】
- 1. 按分类逐条输出;2. 标注 通过/不一致/缺失;3. 语言专业简洁。
- 请开始校验:
- """
- logger.info("已生成数据校验提示词,交付大模型进行比对")
- return f"""
- ==== 配风计划基础数据一致性检查 ====
- 已完成数据获取:
- - 矿井名称:{mine_name}
- - 配风计划:已加载
- - 瓦斯鉴定报告:{'已获取' if gas_identification_report else '获取失败/为空'}
- 【请大模型基于以下规则与数据执行比对校验】
- {prompt}
- """
- @tool
- def check_data_by_face_design(
- state: Annotated[dict, InjectedState]
- ) -> str:
- """
- 校验配风计划工作面参数是否与作业规程一致。
- 拉取工作面作业规程,比对工作面长度、平均采高、有效断面积等参数,
- 构造结构化提示交由大模型逐项校验。
- """
- ventilation_plan = state.get("vent_plan_data", {})
- logger.info("===== 开始执行工作面设计规程一致性校验 =====")
- data_service = VentDataService()
- try:
- mine_name = _extract_mine_name(ventilation_plan)
- if not mine_name:
- raise ValueError("矿井名称为空")
- except Exception as e:
- err = "无法从配风计划中获取矿井名称"
- logger.error(f"{err}:{str(e)}")
- return f"校验失败:{err}"
- logger.info("正在调用接口获取工作面作业规程...")
- face_design_documents = data_service.get_coal_working_rule(mine_name)
- prompt = f"""
- 你是专业的煤矿通风数据校验专家,请执行【配风计划基础数据一致性审查】。
- ============================
- 【第一份数据:配风计划】
- {ventilation_plan.get("coal_faces", [])}
- 【第二份数据:工作面作业规程】
- {face_design_documents}
- ============================
- 【校验规则】
- 1. 工作面长度 face_length 与作业规程 faceLength 一致
- 2. 平均采高 average_mining_height 与作业规程 avgMiningHeight 一致
- 3. 平均有效断面积 scf 与作业 avg_section_area 一致
- 缺失文档则提示:缺少xxx工作面作业规程。
- 【输出要求】
- 逐条校验、标注结果、最后给出总体合格/不合格。
- 请开始校验:
- """
- logger.info("已生成数据校验提示词,交付大模型进行比对")
- return f"""
- ==== 配风计划基础数据一致性检查 ====
- 已完成数据获取:
- - 矿井名称:{mine_name}
- - 配风计划:已加载
- - 工作面作业规程:{'已获取' if len(face_design_documents) > 0 else '获取失败/为空'}
- 【请大模型基于以下规则与数据执行比对校验】
- {prompt}
- """
- @tool
- def check_data_by_vent_report(
- state: Annotated[dict, InjectedState]
- ) -> str:
- """
- 核验配风计划工作面风速是否与测风报表温度数据匹配。
- 拉取当月测风报表,提取各测点的温度数据,按温度区间映射标准风速范围,
- 构造结构化提示交由大模型比对工作面风速是否处于合理区间。
- """
- ventilation_plan = state.get("vent_plan_data", {})
- logger.info("===== 开始执行测风报表温度一致性校验 =====")
- data_service = VentDataService()
- try:
- mine_basic_info = ventilation_plan.get("mine_info", {})
- mine_name = mine_basic_info.get("mine_name", "")
- plan_month = mine_basic_info.get("vent_date", "")
- if not mine_name:
- raise ValueError("矿井名称为空")
- except Exception as e:
- err = "无法从配风计划基础信息获取数据"
- logger.error(f"{err}:{str(e)}")
- return f"校验失败:{err}"
- logger.info("正在调用接口获取测风报表...")
- ventilation_report = data_service.get_vent_report_data(mine_name, plan_month)
- if not ventilation_report:
- return f"未获取到{mine_name}的{plan_month}的测风报表"
- vent_report_result = ventilation_report.get("result", {})
- if not vent_report_result:
- return f"未获取到{mine_name}的{plan_month}的测风报表"
- measurement_points = vent_report_result.get("testPointList", [])
- velocity_records: list[dict[str, Any]] = []
- for point in measurement_points:
- velocity_record: dict[str, Any] = {
- "point_name": point.get("pointName", ""),
- "point_desc": point.get("pointDesc", ""),
- "temp_c": point.get("tempC", 0.0),
- "coal_v": map_temperature_to_velocity_range(point.get("tempC", 0.0))
- }
- velocity_records.append(velocity_record)
- prompt = f"""
- 你是专业煤矿通风校验专家,根据测风报表温度校验工作面风速合理性。
- ============================
- 【第一份数据:配风计划】
- {ventilation_plan.get("coal_faces", [])}
- 【第二份数据:测风报表温度数据】
- {velocity_records}
- ============================
- 【校验规则】
- 1. 工作面风速 vcf 匹配对应温度区间标准风速范围
- 2. 比对 average_temperature 与报表 temp_c
- 缺失数据提示对应地点缺失报表。
- 【输出要求】
- 逐条标注:通过/不合理/缺失,清晰说明偏差。
- 请开始校验:
- """
- logger.info("已生成数据校验提示词,交付大模型进行比对")
- return f"""
- ==== 配风计划基础数据一致性检查 ====
- 已完成数据获取:
- - 矿井名称:{mine_name}
- - 配风计划:已加载
- - 测风报表当月温度数据:{'已获取' if velocity_records else '获取失败'}
- 【请大模型基于以下规则与数据执行比对校验】
- {prompt}
- """
- # ==================== 本地调试代码(仅本地运行,线上Agent不执行) ====================
- if __name__ == '__main__':
- with open(r"D:\workspace\py\vent_agent\check_agent\配风计划(新).json", encoding="utf-8") as f:
- data = json.load(f)
- # 模拟 InjectedState 注入的状态对象
- mock_state = {"vent_plan_data": data}
- # 调试示例
- res = form_review(state=mock_state)
- print(res)
|