| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- from typing import Any, Dict, Optional
- import requests
- from config.field_mappings import TOP_LEVEL_MAP, TYPE_MAP
- from config.settings import BASE_URL, COAL_WORKING_API, GAS_IDENTIFY_API, VENT_REPORT_API
- from service.auth import AuthService
- class VentDataService:
- """外部数据获取服务:瓦斯等级鉴定报告、工作面作业规程"""
- def __init__(self) -> None:
- self.base_url = BASE_URL
- # ── private HTTP helpers ──────────────────────────────────────────
- def _post_json(self, url: str, payload: dict) -> dict:
- """发送 JSON POST 请求并返回解析后的响应 dict(不处理业务字段)。"""
- headers: Dict[str, str] = {
- "X-Access-Token": AuthService.get_token(),
- "Content-Type": "application/json",
- }
- resp = requests.post(url, json=payload, headers=headers, timeout=15)
- resp.raise_for_status()
- return resp.json()
- def _get_json(self, url: str, params: dict | None = None) -> dict:
- """发送 GET 请求并返回解析后的响应 dict(不处理业务字段)。"""
- resp = requests.get(url, params=params or {})
- return resp.json() if resp.status_code == 200 else {}
- # ── data fetching ─────────────────────────────────────────────────
- def get_gas_identify(self, mine_name: str) -> dict:
- """调用接口获取瓦斯等级鉴定数据"""
- try:
- return self._get_json(GAS_IDENTIFY_API, {"mineName": mine_name})
- except Exception as e:
- print(f"[data_service] get_gas_identify 异常: {e}")
- return {}
- def get_coal_working_rule(self, mine_name: str) -> list:
- """调用接口获取工作面作业规程数据"""
- try:
- return self._get_json(COAL_WORKING_API, {"mineName": mine_name})
- except Exception as e:
- print(f"[data_service] get_coal_working_rule 异常: {e}")
- return []
- def get_vent_report_data(self, mine_name: str, date_month: str) -> dict:
- """调用接口获取工作面作业规程数据"""
- try:
- return self._get_json(
- VENT_REPORT_API,
- {"mineName": mine_name, "dateMonth": date_month},
- )
- except Exception as e:
- print(f"[data_service] get_vent_report_data 异常: {e}")
- return {}
- def _get_default_model_id(self) -> Optional[str]:
- """获取默认的模型版本 ID"""
- url = f"{self.base_url}/Vmodel/modelParamPub/list"
- headers = {"X-Access-Token": AuthService.get_token()}
- try:
- resp = requests.get(url, headers=headers, timeout=10)
- resp.raise_for_status()
- data = resp.json()
- records = data.get("result", {}).get("param", {}).get("records", [])
- if records:
- default_id = records[0].get("defaultmodelid")
- return str(default_id) if default_id is not None else None
- return None
- except Exception as e:
- print(f"⚠️ 获取默认模型ID失败: {e},将不过滤数据")
- return None
- def fetch_all_data(self) -> Dict[str, Any]:
- """获取全量数据,并根据默认模型ID过滤,返回原始 JSON 的 'obj' 部分"""
- url = f"{self.base_url}/ventanaly-jingtaifengliang/xufengController/getNeedqAllData"
- data = self._post_json(url, {})
- if not data.get("success"):
- raise Exception(f"数据接口错误: {data.get('msg')}")
- obj = data["obj"]
- default_model_id = self._get_default_model_id()
- if default_model_id is not None:
- for list_key in (
- "retWorkFaceList",
- "headingFaceList",
- "roomList",
- "otherList",
- "ventilationSystemList",
- ):
- if list_key in obj and isinstance(obj[list_key], list):
- obj[list_key] = [
- item
- for item in obj[list_key]
- if str(item.get("nmodelID") or "") == default_model_id
- ]
- return obj
- def get_summary(self) -> Dict[str, float]:
- obj = self.fetch_all_data()
- ret_work_faces: list = obj.get("retWorkFaceList") or []
- heading_faces: list = obj.get("headingFaceList") or []
- rooms: list = obj.get("roomList") or []
- others: list = obj.get("otherList") or []
- vent_systems: list = obj.get("ventilationSystemList") or []
- return {
- "q_cf": sum(x.get("fqneed", 0) for x in ret_work_faces),
- "q_hf": sum(x.get("fqneed", 0) for x in heading_faces),
- "q_room": sum(x.get("fqneed", 0) for x in rooms),
- "q_other": sum(x.get("fqneed", 0) for x in others),
- "q_system_total": sum(x.get("qra", 0) for x in vent_systems),
- }
- def get_translated_data(self) -> Dict[str, Any]:
- """返回完整翻译后的数据,所有 key 均为中文"""
- raw_data = self.fetch_all_data()
- return self._translate_obj(raw_data)
- # ── translation helpers ───────────────────────────────────────────
- # 列表键 → TYPE_MAP 键的查找表
- _LIST_KEY_TO_TYPE_MAP: Dict[str, str] = {
- "retWorkFaceList": "采煤工作面",
- "headingFaceList": "掘进工作面",
- "roomList": "硐室",
- "otherList": "其他用风地点",
- "ventilationSystemList": "回风系统",
- }
- def _translate_obj(self, obj: Dict[str, Any]) -> Dict[str, Any]:
- """将原始 obj 中列表字段的 key 翻译为中文。"""
- result: Dict[str, Any] = {}
- for key, value in obj.items():
- cn_key = TOP_LEVEL_MAP.get(key, key)
- if isinstance(value, list):
- list_type_name = self._LIST_KEY_TO_TYPE_MAP.get(key, "")
- field_map: Dict[str, str] = TYPE_MAP.get(list_type_name, {})
- translated_list = []
- for item in value:
- translated_item: Dict[str, Any] = {}
- for fk, fv in item.items():
- cn_fk = field_map.get(fk, fk)
- translated_item[cn_fk] = fv
- translated_list.append(translated_item)
- result[cn_key] = translated_list
- else:
- result[cn_key] = value
- return result
- # ── ventilation calculators ───────────────────────────────────────
- def _post_calc(self, url_path: str, params: dict) -> str:
- """通用计算请求:POST → 校验 success → 返回 obj 的字符串形式。"""
- url = f"{self.base_url}/{url_path}"
- try:
- data = self._post_json(url, params)
- except Exception as e:
- return f"❌ 计算接口网络请求失败:{e}"
- if not data.get("success"):
- return f"❌ 计算接口调用失败:{data.get('msg') or data.get('message')}"
- return str(data["obj"])
- def calc_heading_face(self, params: dict) -> str:
- return self._post_calc(
- "ventanaly-jingtaifengliang/xufengController/simulateNeedqHeadingFace",
- params,
- )
- def calc_room(self, params: dict) -> str:
- return self._post_calc(
- "ventanaly-jingtaifengliang/xufengController/getNeedqRoom",
- params,
- )
- def calculate_working_face_ventilation(self, params: dict) -> str:
- """计算采煤/备用工作面需风量。原名 clac_ret_work_face。"""
- return self._post_calc(
- "ventanaly-jingtaifengliang/xufengController/simulateNeedqRetWorkFace",
- params,
- )
- def calculate_other_point_ventilation(self, params: dict) -> str:
- """计算其他用风地点需风量。原名 clac_other。"""
- return self._post_calc(
- "ventanaly-jingtaifengliang/xufengController/simulateNeedqOther",
- params,
- )
- def calculate_system_ventilation(self, params: dict) -> str:
- """计算回风系统总需风量。原名 clac_system。"""
- kaq = params.get("kaq")
- if not kaq:
- return "矿井通风需风系数不能为空"
- qcf = params.get("qcf", 0)
- qsc = params.get("qsc", 0)
- qhf = params.get("qhf", 0)
- qur = params.get("qur", 0)
- qrl = params.get("qrl", 0)
- qra = (qcf + qsc + qhf + qur + qrl) * kaq
- result = params.copy()
- result["qra"] = qra
- return str(result)
- # ── backward-compatible aliases for external callers ──────────────
- def clac_ret_work_face(self, params: dict) -> str:
- """[deprecated] 请使用 calculate_working_face_ventilation。"""
- return self.calculate_working_face_ventilation(params)
- def clac_other(self, params: dict) -> str:
- """[deprecated] 请使用 calculate_other_point_ventilation。"""
- return self.calculate_other_point_ventilation(params)
- def clac_system(self, params: dict) -> str:
- """[deprecated] 请使用 calculate_system_ventilation。"""
- return self.calculate_system_ventilation(params)
|