data_service.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. from typing import Any, Dict, Optional
  2. import requests
  3. from config.field_mappings import TOP_LEVEL_MAP, TYPE_MAP
  4. from config.settings import BASE_URL, COAL_WORKING_API, GAS_IDENTIFY_API, VENT_REPORT_API
  5. from service.auth import AuthService
  6. class VentDataService:
  7. """外部数据获取服务:瓦斯等级鉴定报告、工作面作业规程"""
  8. def __init__(self) -> None:
  9. self.base_url = BASE_URL
  10. # ── private HTTP helpers ──────────────────────────────────────────
  11. def _post_json(self, url: str, payload: dict) -> dict:
  12. """发送 JSON POST 请求并返回解析后的响应 dict(不处理业务字段)。"""
  13. headers: Dict[str, str] = {
  14. "X-Access-Token": AuthService.get_token(),
  15. "Content-Type": "application/json",
  16. }
  17. resp = requests.post(url, json=payload, headers=headers, timeout=15)
  18. resp.raise_for_status()
  19. return resp.json()
  20. def _get_json(self, url: str, params: dict | None = None) -> dict:
  21. """发送 GET 请求并返回解析后的响应 dict(不处理业务字段)。"""
  22. resp = requests.get(url, params=params or {})
  23. return resp.json() if resp.status_code == 200 else {}
  24. # ── data fetching ─────────────────────────────────────────────────
  25. def get_gas_identify(self, mine_name: str) -> dict:
  26. """调用接口获取瓦斯等级鉴定数据"""
  27. try:
  28. return self._get_json(GAS_IDENTIFY_API, {"mineName": mine_name})
  29. except Exception as e:
  30. print(f"[data_service] get_gas_identify 异常: {e}")
  31. return {}
  32. def get_coal_working_rule(self, mine_name: str) -> list:
  33. """调用接口获取工作面作业规程数据"""
  34. try:
  35. return self._get_json(COAL_WORKING_API, {"mineName": mine_name})
  36. except Exception as e:
  37. print(f"[data_service] get_coal_working_rule 异常: {e}")
  38. return []
  39. def get_vent_report_data(self, mine_name: str, date_month: str) -> dict:
  40. """调用接口获取工作面作业规程数据"""
  41. try:
  42. return self._get_json(
  43. VENT_REPORT_API,
  44. {"mineName": mine_name, "dateMonth": date_month},
  45. )
  46. except Exception as e:
  47. print(f"[data_service] get_vent_report_data 异常: {e}")
  48. return {}
  49. def _get_default_model_id(self) -> Optional[str]:
  50. """获取默认的模型版本 ID"""
  51. url = f"{self.base_url}/Vmodel/modelParamPub/list"
  52. headers = {"X-Access-Token": AuthService.get_token()}
  53. try:
  54. resp = requests.get(url, headers=headers, timeout=10)
  55. resp.raise_for_status()
  56. data = resp.json()
  57. records = data.get("result", {}).get("param", {}).get("records", [])
  58. if records:
  59. default_id = records[0].get("defaultmodelid")
  60. return str(default_id) if default_id is not None else None
  61. return None
  62. except Exception as e:
  63. print(f"⚠️ 获取默认模型ID失败: {e},将不过滤数据")
  64. return None
  65. def fetch_all_data(self) -> Dict[str, Any]:
  66. """获取全量数据,并根据默认模型ID过滤,返回原始 JSON 的 'obj' 部分"""
  67. url = f"{self.base_url}/ventanaly-jingtaifengliang/xufengController/getNeedqAllData"
  68. data = self._post_json(url, {})
  69. if not data.get("success"):
  70. raise Exception(f"数据接口错误: {data.get('msg')}")
  71. obj = data["obj"]
  72. default_model_id = self._get_default_model_id()
  73. if default_model_id is not None:
  74. for list_key in (
  75. "retWorkFaceList",
  76. "headingFaceList",
  77. "roomList",
  78. "otherList",
  79. "ventilationSystemList",
  80. ):
  81. if list_key in obj and isinstance(obj[list_key], list):
  82. obj[list_key] = [
  83. item
  84. for item in obj[list_key]
  85. if str(item.get("nmodelID") or "") == default_model_id
  86. ]
  87. return obj
  88. def get_summary(self) -> Dict[str, float]:
  89. obj = self.fetch_all_data()
  90. ret_work_faces: list = obj.get("retWorkFaceList") or []
  91. heading_faces: list = obj.get("headingFaceList") or []
  92. rooms: list = obj.get("roomList") or []
  93. others: list = obj.get("otherList") or []
  94. vent_systems: list = obj.get("ventilationSystemList") or []
  95. return {
  96. "q_cf": sum(x.get("fqneed", 0) for x in ret_work_faces),
  97. "q_hf": sum(x.get("fqneed", 0) for x in heading_faces),
  98. "q_room": sum(x.get("fqneed", 0) for x in rooms),
  99. "q_other": sum(x.get("fqneed", 0) for x in others),
  100. "q_system_total": sum(x.get("qra", 0) for x in vent_systems),
  101. }
  102. def get_translated_data(self) -> Dict[str, Any]:
  103. """返回完整翻译后的数据,所有 key 均为中文"""
  104. raw_data = self.fetch_all_data()
  105. return self._translate_obj(raw_data)
  106. # ── translation helpers ───────────────────────────────────────────
  107. # 列表键 → TYPE_MAP 键的查找表
  108. _LIST_KEY_TO_TYPE_MAP: Dict[str, str] = {
  109. "retWorkFaceList": "采煤工作面",
  110. "headingFaceList": "掘进工作面",
  111. "roomList": "硐室",
  112. "otherList": "其他用风地点",
  113. "ventilationSystemList": "回风系统",
  114. }
  115. def _translate_obj(self, obj: Dict[str, Any]) -> Dict[str, Any]:
  116. """将原始 obj 中列表字段的 key 翻译为中文。"""
  117. result: Dict[str, Any] = {}
  118. for key, value in obj.items():
  119. cn_key = TOP_LEVEL_MAP.get(key, key)
  120. if isinstance(value, list):
  121. list_type_name = self._LIST_KEY_TO_TYPE_MAP.get(key, "")
  122. field_map: Dict[str, str] = TYPE_MAP.get(list_type_name, {})
  123. translated_list = []
  124. for item in value:
  125. translated_item: Dict[str, Any] = {}
  126. for fk, fv in item.items():
  127. cn_fk = field_map.get(fk, fk)
  128. translated_item[cn_fk] = fv
  129. translated_list.append(translated_item)
  130. result[cn_key] = translated_list
  131. else:
  132. result[cn_key] = value
  133. return result
  134. # ── ventilation calculators ───────────────────────────────────────
  135. def _post_calc(self, url_path: str, params: dict) -> str:
  136. """通用计算请求:POST → 校验 success → 返回 obj 的字符串形式。"""
  137. url = f"{self.base_url}/{url_path}"
  138. try:
  139. data = self._post_json(url, params)
  140. except Exception as e:
  141. return f"❌ 计算接口网络请求失败:{e}"
  142. if not data.get("success"):
  143. return f"❌ 计算接口调用失败:{data.get('msg') or data.get('message')}"
  144. return str(data["obj"])
  145. def calc_heading_face(self, params: dict) -> str:
  146. return self._post_calc(
  147. "ventanaly-jingtaifengliang/xufengController/simulateNeedqHeadingFace",
  148. params,
  149. )
  150. def calc_room(self, params: dict) -> str:
  151. return self._post_calc(
  152. "ventanaly-jingtaifengliang/xufengController/getNeedqRoom",
  153. params,
  154. )
  155. def calculate_working_face_ventilation(self, params: dict) -> str:
  156. """计算采煤/备用工作面需风量。原名 clac_ret_work_face。"""
  157. return self._post_calc(
  158. "ventanaly-jingtaifengliang/xufengController/simulateNeedqRetWorkFace",
  159. params,
  160. )
  161. def calculate_other_point_ventilation(self, params: dict) -> str:
  162. """计算其他用风地点需风量。原名 clac_other。"""
  163. return self._post_calc(
  164. "ventanaly-jingtaifengliang/xufengController/simulateNeedqOther",
  165. params,
  166. )
  167. def calculate_system_ventilation(self, params: dict) -> str:
  168. """计算回风系统总需风量。原名 clac_system。"""
  169. kaq = params.get("kaq")
  170. if not kaq:
  171. return "矿井通风需风系数不能为空"
  172. qcf = params.get("qcf", 0)
  173. qsc = params.get("qsc", 0)
  174. qhf = params.get("qhf", 0)
  175. qur = params.get("qur", 0)
  176. qrl = params.get("qrl", 0)
  177. qra = (qcf + qsc + qhf + qur + qrl) * kaq
  178. result = params.copy()
  179. result["qra"] = qra
  180. return str(result)
  181. # ── backward-compatible aliases for external callers ──────────────
  182. def clac_ret_work_face(self, params: dict) -> str:
  183. """[deprecated] 请使用 calculate_working_face_ventilation。"""
  184. return self.calculate_working_face_ventilation(params)
  185. def clac_other(self, params: dict) -> str:
  186. """[deprecated] 请使用 calculate_other_point_ventilation。"""
  187. return self.calculate_other_point_ventilation(params)
  188. def clac_system(self, params: dict) -> str:
  189. """[deprecated] 请使用 calculate_system_ventilation。"""
  190. return self.calculate_system_ventilation(params)