base_parser.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. import hashlib
  2. import logging
  3. import os
  4. import re
  5. import time
  6. from abc import ABC, abstractmethod
  7. from typing import List, Any
  8. from fastapi import UploadFile
  9. from langchain_core.documents import Document
  10. from langchain_mineru import MinerULoader
  11. from pypdf import PdfReader
  12. from config.settings import MINERU_TOKEN, TXT_CACHE_DIR
# Size of each page range requested from MinerULoader: BaseParser.load_pdf
# splits a PDF into consecutive "start-end" ranges of at most this many pages.
# NOTE(review): presumably mirrors a per-request page limit of the MinerU
# service -- confirm against the MinerU documentation.
MINERU_MAX_PAGES = 200
# Number of bytes read per iteration by the hashing helpers below, so a file is
# digested incrementally instead of being read in a single call.
_HASH_CHUNK_SIZE = 4096
  15. def get_upload_file_hash(upload_file: UploadFile) -> str:
  16. """Compute MD5 from in-memory UploadFile bytes without writing to disk."""
  17. upload_file.file.seek(0)
  18. hasher = hashlib.md5()
  19. for chunk in iter(lambda: upload_file.file.read(_HASH_CHUNK_SIZE), b""):
  20. hasher.update(chunk)
  21. upload_file.file.seek(0)
  22. return hasher.hexdigest()
  23. def get_file_hash(file_path: str) -> str:
  24. """Compute MD5 of a file on disk. Identical content → identical hash."""
  25. hasher = hashlib.md5()
  26. with open(file_path, "rb") as file_handle:
  27. for chunk in iter(lambda: file_handle.read(_HASH_CHUNK_SIZE), b""):
  28. hasher.update(chunk)
  29. return hasher.hexdigest()
  30. class BaseParser(ABC):
  31. """Abstract base for PDF parsers with MinerU loading and file-system caching."""
  32. def __init__(self) -> None:
  33. os.makedirs(TXT_CACHE_DIR, exist_ok=True)
  34. # ---- cache helpers ----
  35. def _cache_path_for(self, file_hash: str) -> str:
  36. return os.path.join(TXT_CACHE_DIR, f"{file_hash}.txt")
  37. def _read_cache(self, cache_path: str) -> str | None:
  38. if os.path.exists(cache_path):
  39. print(f"✅ 找到缓存,直接读取:{cache_path}")
  40. with open(cache_path, "r", encoding="utf-8") as file_handle:
  41. return file_handle.read()
  42. return None
  43. def _write_cache(self, cache_path: str, content: str) -> None:
  44. with open(cache_path, "w", encoding="utf-8") as file_handle:
  45. file_handle.write(content)
  46. print(f"✅ 已保存解析结果到缓存:{cache_path}")
  47. def _count_pdf_pages(self, file_path: str) -> int:
  48. try:
  49. return len(PdfReader(file_path).pages)
  50. except Exception as exc:
  51. print(f"获取PDF页数失败:{exc}")
  52. return 1
  53. # ---- core loading ----
  54. def load_pdf(self, file_path: str) -> List[Document]:
  55. start_time = time.time()
  56. file_hash = get_file_hash(file_path)
  57. cache_path = self._cache_path_for(file_hash)
  58. cached_content = self._read_cache(cache_path)
  59. if cached_content is not None:
  60. return [Document(
  61. page_content=cached_content,
  62. metadata={
  63. "parser": "MinerULoader-Cache",
  64. "source": file_path,
  65. "from_cache": True,
  66. },
  67. )]
  68. total_pages = self._count_pdf_pages(file_path)
  69. print(f"PDF总页数:{total_pages}")
  70. all_documents: List[Document] = []
  71. for start in range(1, total_pages + 1, MINERU_MAX_PAGES):
  72. end = min(start + MINERU_MAX_PAGES - 1, total_pages)
  73. print(f"正在加载分片:{start} ~ {end} 页")
  74. loader = MinerULoader(
  75. source=file_path,
  76. mode="precision",
  77. token=MINERU_TOKEN,
  78. pages=f"{start}-{end}",
  79. )
  80. all_documents.extend(loader.load())
  81. full_text_pages: List[str] = []
  82. for idx, doc in enumerate(all_documents):
  83. cleaned = self.clean_mineru_text(doc.page_content)
  84. doc.page_content = cleaned
  85. doc.metadata["parser"] = "MinerULoader"
  86. doc.metadata["page_number"] = idx + 1
  87. doc.metadata["total_pages"] = total_pages
  88. full_text_pages.append(cleaned)
  89. all_content = "\n".join(full_text_pages)
  90. self._write_cache(cache_path, all_content)
  91. elapsed = round(time.time() - start_time, 2)
  92. print(f"PDF 完整加载完成 | 总页数:{total_pages} | 总耗时:{elapsed} 秒")
  93. return all_documents
  94. @staticmethod
  95. def clean_mineru_text(text: str) -> str:
  96. """Strip MinerU-specific markup tokens from extracted text."""
  97. if not text:
  98. return ""
  99. text = re.sub(r"</?table[^>]*>", "", text, flags=re.IGNORECASE)
  100. text = re.sub(r"</?tr[^>]*>", " - ", text, flags=re.IGNORECASE)
  101. text = re.sub(r"<img\s+[^>]*>", "", text, flags=re.IGNORECASE)
  102. text = re.sub(r"<td\s*[^>]*>", "|", text, flags=re.IGNORECASE)
  103. text = re.sub(r"</td>", "|", text, flags=re.IGNORECASE)
  104. text = re.sub(r"!\[.*?\]\(.*?\)", "", text)
  105. return text.strip()
  106. @abstractmethod
  107. def parse(self, file_path: str) -> Any:
  108. ...