from typing import Any, Dict, Optional

import requests

from config.field_mappings import TOP_LEVEL_MAP, TYPE_MAP
from config.settings import (
    BASE_URL,
    COAL_WORKING_API,
    GAS_IDENTIFY_API,
    VENT_REPORT_API,
)
from service.auth import AuthService


class VentDataService:
    """External data access service for mine ventilation.

    Fetches gas grade identification reports, working-face operating
    procedures and ventilation reports over HTTP, retrieves and translates
    the air-demand data set, and forwards air-demand calculation requests
    to the backend.
    """

    # Top-level list key in the raw payload -> key into TYPE_MAP.
    # Also serves as the canonical set of list keys that are filtered by
    # model ID in ``fetch_all_data``.
    _LIST_KEY_TO_TYPE_MAP: Dict[str, str] = {
        "retWorkFaceList": "采煤工作面",
        "headingFaceList": "掘进工作面",
        "roomList": "硐室",
        "otherList": "其他用风地点",
        "ventilationSystemList": "回风系统",
    }

    def __init__(self) -> None:
        self.base_url = BASE_URL

    # ── private HTTP helpers ──────────────────────────────────────────

    def _post_json(self, url: str, payload: dict) -> dict:
        """POST ``payload`` as JSON and return the decoded response body.

        Business-level fields (``success``, ``obj`` ...) are not inspected.

        Raises:
            requests.RequestException: on network failure, timeout, or a
                non-2xx status (via ``raise_for_status``).
        """
        headers: Dict[str, str] = {
            "X-Access-Token": AuthService.get_token(),
            "Content-Type": "application/json",
        }
        resp = requests.post(url, json=payload, headers=headers, timeout=15)
        resp.raise_for_status()
        return resp.json()

    def _get_json(self, url: str, params: Optional[dict] = None) -> dict:
        """GET ``url`` and return the decoded body, or ``{}`` on non-200.

        Business-level fields are not inspected.

        Raises:
            requests.RequestException: on network failure or timeout.
        """
        # An explicit timeout keeps a stalled server from blocking the
        # caller indefinitely (requests has no default timeout).
        resp = requests.get(url, params=params or {}, timeout=15)
        return resp.json() if resp.status_code == 200 else {}

    # ── data fetching ─────────────────────────────────────────────────

    def get_gas_identify(self, mine_name: str) -> dict:
        """Fetch gas grade identification data for ``mine_name``.

        Returns ``{}`` if the request raises or the status is not 200.
        """
        try:
            return self._get_json(GAS_IDENTIFY_API, {"mineName": mine_name})
        except Exception as e:
            print(f"[data_service] get_gas_identify 异常: {e}")
            return {}

    def get_coal_working_rule(self, mine_name: str) -> list:
        """Fetch working-face operating procedure data for ``mine_name``.

        Returns ``[]`` if the request raises, the status is not 200, or the
        payload is empty.
        """
        try:
            # _get_json yields {} on non-200; normalise every empty payload
            # to [] so the failure value matches the annotated return type.
            return self._get_json(COAL_WORKING_API, {"mineName": mine_name}) or []
        except Exception as e:
            print(f"[data_service] get_coal_working_rule 异常: {e}")
            return []

    def get_vent_report_data(self, mine_name: str, date_month: str) -> dict:
        """Fetch ventilation report data for ``mine_name`` and ``date_month``.

        Returns ``{}`` if the request raises or the status is not 200.
        """
        try:
            return self._get_json(
                VENT_REPORT_API,
                {"mineName": mine_name, "dateMonth": date_month},
            )
        except Exception as e:
            print(f"[data_service] get_vent_report_data 异常: {e}")
            return {}

    def _get_default_model_id(self) -> Optional[str]:
        """Return the default model version ID as a string, or ``None``.

        Reads ``result.param.records[0].defaultmodelid`` from the model
        parameter list endpoint. Any failure is logged and yields ``None``,
        which callers treat as "do not filter".
        """
        url = f"{self.base_url}/Vmodel/modelParamPub/list"
        headers = {"X-Access-Token": AuthService.get_token()}
        try:
            resp = requests.get(url, headers=headers, timeout=10)
            resp.raise_for_status()
            data = resp.json()
            records = data.get("result", {}).get("param", {}).get("records", [])
            if records:
                default_id = records[0].get("defaultmodelid")
                return str(default_id) if default_id is not None else None
            return None
        except Exception as e:
            print(f"⚠️ 获取默认模型ID失败: {e},将不过滤数据")
            return None

    def fetch_all_data(self) -> Dict[str, Any]:
        """Fetch the full data set and return the ``obj`` part of the reply.

        When a default model ID is available, each known list in ``obj`` is
        filtered down to the entries whose ``nmodelID`` matches it.

        Raises:
            Exception: if the response's ``success`` flag is falsy.
            requests.RequestException: propagated from the HTTP call.
        """
        url = f"{self.base_url}/ventanaly-jingtaifengliang/xufengController/getNeedqAllData"
        data = self._post_json(url, {})
        if not data.get("success"):
            raise Exception(f"数据接口错误: {data.get('msg')}")

        obj = data["obj"]
        default_model_id = self._get_default_model_id()
        if default_model_id is None:
            return obj

        for list_key in self._LIST_KEY_TO_TYPE_MAP:
            entries = obj.get(list_key)
            if not isinstance(entries, list):
                continue
            # Non-dict entries cannot carry a model ID, so they never match.
            obj[list_key] = [
                item
                for item in entries
                if isinstance(item, dict)
                and str(item.get("nmodelID") or "") == default_model_id
            ]
        return obj

    @staticmethod
    def _sum_field(items: list, field: str) -> float:
        """Sum ``field`` over ``items``, counting missing or null values as 0."""
        return sum((x.get(field) or 0) for x in items)

    def get_summary(self) -> Dict[str, float]:
        """Return per-category totals computed from ``fetch_all_data``.

        ``q_cf``, ``q_hf``, ``q_room`` and ``q_other`` are the sums of
        ``fqneed`` over the respective lists; ``q_system_total`` is the sum
        of ``qra`` over ``ventilationSystemList``.
        """
        obj = self.fetch_all_data()
        return {
            "q_cf": self._sum_field(obj.get("retWorkFaceList") or [], "fqneed"),
            "q_hf": self._sum_field(obj.get("headingFaceList") or [], "fqneed"),
            "q_room": self._sum_field(obj.get("roomList") or [], "fqneed"),
            "q_other": self._sum_field(obj.get("otherList") or [], "fqneed"),
            "q_system_total": self._sum_field(
                obj.get("ventilationSystemList") or [], "qra"
            ),
        }

    def get_translated_data(self) -> Dict[str, Any]:
        """Return the full data set with keys translated via the field maps."""
        raw_data = self.fetch_all_data()
        return self._translate_obj(raw_data)

    # ── translation helpers ───────────────────────────────────────────

    def _translate_obj(self, obj: Dict[str, Any]) -> Dict[str, Any]:
        """Translate the keys of ``obj`` and of the dicts inside its lists.

        Top-level keys are mapped through ``TOP_LEVEL_MAP``. For list
        values, each dict item's keys are mapped through the ``TYPE_MAP``
        entry selected by ``_LIST_KEY_TO_TYPE_MAP``. Keys without a mapping
        are kept as-is; non-dict list items are passed through unchanged.
        """
        result: Dict[str, Any] = {}
        for key, value in obj.items():
            cn_key = TOP_LEVEL_MAP.get(key, key)
            if not isinstance(value, list):
                result[cn_key] = value
                continue

            list_type_name = self._LIST_KEY_TO_TYPE_MAP.get(key, "")
            field_map: Dict[str, str] = TYPE_MAP.get(list_type_name, {})
            result[cn_key] = [
                {field_map.get(fk, fk): fv for fk, fv in item.items()}
                if isinstance(item, dict)
                else item
                for item in value
            ]
        return result

    # ── ventilation calculators ───────────────────────────────────────

    def _post_calc(self, url_path: str, params: dict) -> str:
        """POST a calculation request and return ``obj`` as a string.

        Request failures and a falsy ``success`` flag are reported as a
        human-readable error string rather than raised.
        """
        url = f"{self.base_url}/{url_path}"
        try:
            data = self._post_json(url, params)
        except Exception as e:
            return f"❌ 计算接口网络请求失败:{e}"
        if not data.get("success"):
            return f"❌ 计算接口调用失败:{data.get('msg') or data.get('message')}"
        return str(data["obj"])

    def calc_heading_face(self, params: dict) -> str:
        """Forward ``params`` to the ``simulateNeedqHeadingFace`` endpoint."""
        return self._post_calc(
            "ventanaly-jingtaifengliang/xufengController/simulateNeedqHeadingFace",
            params,
        )

    def calc_room(self, params: dict) -> str:
        """Forward ``params`` to the ``getNeedqRoom`` endpoint."""
        return self._post_calc(
            "ventanaly-jingtaifengliang/xufengController/getNeedqRoom",
            params,
        )

    def calculate_working_face_ventilation(self, params: dict) -> str:
        """Compute air demand for a mining / standby working face.

        Formerly named ``clac_ret_work_face``.
        """
        return self._post_calc(
            "ventanaly-jingtaifengliang/xufengController/simulateNeedqRetWorkFace",
            params,
        )

    def calculate_other_point_ventilation(self, params: dict) -> str:
        """Compute air demand for other air-consuming locations.

        Formerly named ``clac_other``.
        """
        return self._post_calc(
            "ventanaly-jingtaifengliang/xufengController/simulateNeedqOther",
            params,
        )

    def calculate_system_ventilation(self, params: dict) -> str:
        """Compute total air demand of a return-air system locally.

        ``qra = (qcf + qsc + qhf + qur + qrl) * kaq``. Missing or null terms
        count as 0. Returns the string form of a copy of ``params`` with
        ``qra`` added, or an error message when ``kaq`` is missing or falsy.

        Formerly named ``clac_system``.
        """
        kaq = params.get("kaq")
        if not kaq:
            return "矿井通风需风系数不能为空"

        # Same left-to-right order as the original expression.
        demand = sum(
            (params.get(name) or 0) for name in ("qcf", "qsc", "qhf", "qur", "qrl")
        )
        result = params.copy()
        result["qra"] = demand * kaq
        return str(result)

    # ── backward-compatible aliases for external callers ──────────────

    def clac_ret_work_face(self, params: dict) -> str:
        """[deprecated] Use ``calculate_working_face_ventilation`` instead."""
        return self.calculate_working_face_ventilation(params)

    def clac_other(self, params: dict) -> str:
        """[deprecated] Use ``calculate_other_point_ventilation`` instead."""
        return self.calculate_other_point_ventilation(params)

    def clac_system(self, params: dict) -> str:
        """[deprecated] Use ``calculate_system_ventilation`` instead."""
        return self.calculate_system_ventilation(params)