from config.field_mappings import ( HEADING_FACE_FIELDS, OTHER_FIELDS, ROOM_FIELDS, SYSTEM_FIELDS, WORK_FACE_FIELDS, ) from service.data_service import VentDataService _TYPE_TO_TRANSLATED_KEY = { "采煤工作面": "回采工作面及备用工作面", "备用工作面": "回采工作面及备用工作面", "掘进工作面": "掘进工作面", "硐室": "硐室", "其他用风地点": "其他用风地点", "回风系统": "回风系统", } _RECALCULATE_DISPATCH = { "掘进工作面": lambda svc, data: svc.calc_heading_face(data), "硐室": lambda svc, data: svc.calc_room(data), "采煤工作面": lambda svc, data: svc.clac_ret_work_face(data), "备用工作面": lambda svc, data: svc.clac_ret_work_face(data), "其他用风地点": lambda svc, data: svc.clac_other(data), "回风系统": lambda svc, data: svc.clac_system(data), } _SIMULATE_DISPATCH = { "掘进工作面": lambda svc, params: svc.calc_heading_face(params), "硐室": lambda svc, params: svc.calc_room(params), "采煤工作面": lambda svc, params: svc.clac_ret_work_face({"face": params}), "备用工作面": lambda svc, params: svc.clac_ret_work_face({"refFace": params}), "其他用风地点": lambda svc, params: svc.clac_other(params), "回风系统": lambda svc, params: svc.clac_system(params), } _FIELD_TRANSLATE_DISPATCH = { "掘进工作面": HEADING_FACE_FIELDS, "采煤工作面": WORK_FACE_FIELDS, "备用工作面": WORK_FACE_FIELDS, "硐室": ROOM_FIELDS, "其他用风地点": OTHER_FIELDS, "回风系统": SYSTEM_FIELDS, } _TYPE_MAPPING = { "retWorkFaceList": "采煤工作面", "headingFaceList": "掘进工作面", "roomList": "硐室", "otherList": "其他用风地点", "ventilationSystemList": "回风系统", } _PLACE_TYPE_MAPPING = { "retWorkFaceList": "采煤工作面", "headingFaceList": "掘进工作面", "roomList": "硐室", "otherList": "其他用风地点", "ventilationSystemList": "回风系统", } def fetch_ventilation_data( query_place: str, query_type: str, ) -> str: """ 查询矿井需风量,根据用户询问的地点返回详细需风量或全矿汇总。 参数: - query_place (string): 地点名称,如“测试采煤面A”。全矿汇总时传空字符串 ''。 - query_type (string): 地点类型,必须从以下选项中选择一个: '采煤工作面', '备用工作面', '掘进工作面', '硐室', '其他用风地点', '回风系统'。 全矿汇总时传空字符串 ''。 """ if query_place in ("", "None", "null", None): query_place = "" if query_type in ("", "None", "null", None): query_type = "" data_service = VentDataService() try: translated_data = data_service.get_translated_data() if not translated_data: return "❌ 未获取到任何数据。" except Exception as e: return f"❌ 获取数据失败:{str(e)}" if not query_place and not query_type: return _build_full_summary(translated_data) if query_place and not query_type: return "请提供地点类型(如“采煤工作面”),以便我为您查找。当前支持的类型:采煤工作面、备用工作面、掘进工作面、硐室、其他用风地点、回风系统。" list_key = _TYPE_TO_TRANSLATED_KEY.get(query_type) if not list_key: return f"❌ 不支持的类型「{query_type}」" items = translated_data.get(list_key, []) valid_items = [it for it in items if it.get("名称") and str(it["名称"]).strip() not in ("", "None", "null")] return str(valid_items) def _build_full_summary(translated_data: dict) -> str: data_service = VentDataService() try: summary = data_service.get_summary() except Exception as e: return f"❌ 汇总失败:{e}" lines = [ "===== 全矿井需风量汇总 =====", f"• 采煤/备用工作面总需风量:{summary['q_cf']} m³/min", f"• 掘进工作面总需风量:{summary['q_hf']} m³/min", f"• 硐室总需风量:{summary['q_room']} m³/min", f"• 其他用风地点总需风量:{summary['q_other']} m³/min", f"• 回风系统汇总需风量:{summary['q_system_total']} m³/min", "--- 各回风系统明细 ---", ] for sys in translated_data.get("回风系统", []): name = sys.get("名称", "未知") qra = sys.get("系统总需风量(m³/min)", "—") kaq = sys.get("矿井通风系数", "—") lines.append(f" {name}:{qra} m³/min(系数 {kaq})") return "".join(lines) def recompute_ventilation( place_name: str, parameters: dict, ) -> str: """ 根据修改后的参数重新计算需风量。 参数: - place_name: 地点名称 - parameters: 计算条件参数参数。必须按照条件参数字典将用户自然语言描述转为英文键(注意用户描述不需要和字典中文释义严格对应,语义一致即可),例如瓦斯不均匀系数为1.2->{"khg":"1.2"} 仅需传入要变更的字段,未传入的字段将保持原值。 """ data_service = VentDataService() try: raw_data = data_service.fetch_all_data() except Exception as e: return f"❌ 获取数据失败:{e}" target = None place_type: str | None = None for list_key, place_type_candidate in _PLACE_TYPE_MAPPING.items(): items = raw_data.get(list_key) or [] for item in items: if item.get("strName") == place_name: target = item place_type = place_type_candidate break if target: break if not target: return f"未找到地点「{place_name}」" updated = target.copy() updated.update(parameters) try: calc_fn = _RECALCULATE_DISPATCH.get(place_type) if calc_fn is not None: return calc_fn(data_service, updated) return f"暂不支持 {place_type} 的动态重算" except Exception as e: return f"❌ 计算失败:{e}" def simulate_ventilation( place_type: str, parameters: dict, ) -> str: """ 根据用户提供的用风地点类型以及计算条件参数调用外部计算接口模拟地点的用风量。 参数: - place_type: 用风地点类型,必须从以下选项中选择一个:'采煤工作面', '备用工作面', '掘进工作面', '硐室', '其他用风地点', '回风系统'。 - parameters: 计算条件参数参数。需要按照条件参数字典将用户自然语言描述转为英文键(注意用户描述不需要和字典中文释义严格对应,语义一致即可),例如瓦斯不均匀系数为1.2->{"khg":"1.2"} """ data_service = VentDataService() try: calc_fn = _SIMULATE_DISPATCH.get(place_type) if calc_fn is not None: return calc_fn(data_service, parameters) return f"暂不支持 {place_type} 的动态重算" except Exception as e: return f"❌ 计算失败:{e}" def fetch_field_translation(place_type: str) -> str: """ 根据用风地点类型查询计算条件参数的字典,字典中有标准的英文key和该key的中文自然语言释义 参数 - place_type: 用风地点类型,必须从以下选项中选择一个:'采煤工作面', '备用工作面', '掘进工作面', '硐室', '其他用风地点', '回风系统'。 返回 该类型的计算参数数据字典 """ result = _FIELD_TRANSLATE_DISPATCH.get(place_type) if result is not None: return result return "未找到" + place_type + "相关字段定义"