import hashlib
import logging
import os
import re
import time
from abc import ABC, abstractmethod
from typing import List, Any
from fastapi import UploadFile
from langchain_core.documents import Document
from langchain_mineru import MinerULoader
from pypdf import PdfReader
from config.settings import MINERU_TOKEN, TXT_CACHE_DIR
# Size of each page range requested from MinerULoader; load_pdf splits a PDF
# into consecutive ranges of at most this many pages.
# NOTE(review): presumably mirrors a MinerU per-request page limit — confirm.
MINERU_MAX_PAGES = 200
# Number of bytes read per iteration when hashing file content.
_HASH_CHUNK_SIZE = 4096
def get_upload_file_hash(upload_file: UploadFile) -> str:
    """Return the hex MD5 digest of an uploaded file's content.

    The upload's underlying file object is hashed incrementally in
    ``_HASH_CHUNK_SIZE``-byte reads; nothing is written to disk. The stream is
    rewound to offset 0 before hashing and again afterwards, so the caller can
    read the upload from the beginning once this function returns.

    Args:
        upload_file: Upload whose ``file`` attribute is a seekable binary stream.

    Returns:
        The hexadecimal MD5 digest of the full stream content.
    """
    stream = upload_file.file
    stream.seek(0)
    # MD5 is only a content fingerprint here, not a security control. Declaring
    # that keeps the constructor usable on FIPS-restricted builds; the digest
    # value itself is unchanged.
    hasher = hashlib.md5(usedforsecurity=False)
    while chunk := stream.read(_HASH_CHUNK_SIZE):
        hasher.update(chunk)
    # Leave the stream positioned at the start for whoever consumes it next.
    stream.seek(0)
    return hasher.hexdigest()
def get_file_hash(file_path: str) -> str:
    """Return the hex MD5 digest of the file at *file_path*.

    The file is read in ``_HASH_CHUNK_SIZE``-byte chunks, so identical content
    always yields an identical digest regardless of file name or location.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The hexadecimal MD5 digest of the file's bytes.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # MD5 is only a content fingerprint here, not a security control. Declaring
    # that keeps the constructor usable on FIPS-restricted builds; the digest
    # value itself is unchanged.
    hasher = hashlib.md5(usedforsecurity=False)
    with open(file_path, "rb") as file_handle:
        while chunk := file_handle.read(_HASH_CHUNK_SIZE):
            hasher.update(chunk)
    return hasher.hexdigest()
class BaseParser(ABC):
"""Abstract base for PDF parsers with MinerU loading and file-system caching."""
def __init__(self) -> None:
    """Create the cache directory ``TXT_CACHE_DIR`` if it does not exist yet."""
    os.makedirs(TXT_CACHE_DIR, exist_ok=True)
# ---- cache helpers ----
def _cache_path_for(self, file_hash: str) -> str:
    """Return the cache file path for *file_hash*.

    The result is ``<file_hash>.txt`` located inside ``TXT_CACHE_DIR``.
    """
    cache_filename = f"{file_hash}.txt"
    return os.path.join(TXT_CACHE_DIR, cache_filename)
def _read_cache(self, cache_path: str) -> str | None:
    """Return the cached text stored at *cache_path*, or ``None`` on a cache miss.

    The file is opened directly instead of being probed with
    ``os.path.exists`` first, so a cache file that disappears between the
    check and the read is reported as a miss rather than raising.

    Args:
        cache_path: Path of the cache file to read.

    Returns:
        The file's full UTF-8 decoded content, or ``None`` when the file does
        not exist.

    Raises:
        OSError: For failures other than a missing file (e.g. the path is a
            directory or is not readable).
    """
    try:
        with open(cache_path, "r", encoding="utf-8") as file_handle:
            content = file_handle.read()
    except FileNotFoundError:
        return None
    print(f"✅ 找到缓存,直接读取:{cache_path}")
    return content
def _write_cache(self, cache_path: str, content: str) -> None:
    """Store *content* at *cache_path* as UTF-8, replacing any previous entry.

    The text is first written to a uniquely named sibling file and then moved
    over ``cache_path`` with ``os.replace``. Writing straight to the final
    path would leave a truncated file behind if the write were interrupted,
    and a later read of that path would return the partial text as if it
    were a complete result.

    Args:
        cache_path: Destination path of the cache entry.
        content: Text to store.

    Raises:
        OSError: If the temporary file cannot be written or moved into place.
    """
    import uuid  # local import: only needed to build the temporary file name

    # Unique per call so concurrent writers never share a temporary file.
    tmp_path = f"{cache_path}.{uuid.uuid4().hex}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as file_handle:
            file_handle.write(content)
        os.replace(tmp_path, cache_path)
    except BaseException:
        # Do not leave a stray temporary file behind when the write or the
        # rename fails; the original error is re-raised unchanged.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
    print(f"✅ 已保存解析结果到缓存:{cache_path}")
def _count_pdf_pages(self, file_path: str) -> int:
    """Return the number of pages in the PDF at *file_path*.

    Any exception raised while opening or inspecting the PDF is printed to
    stdout and the method falls back to a page count of 1 instead of raising.
    """
    try:
        reader = PdfReader(file_path)
        page_total = len(reader.pages)
    except Exception as exc:
        print(f"获取PDF页数失败:{exc}")
        page_total = 1
    return page_total
# ---- core loading ----
def load_pdf(self, file_path: str) -> List[Document]:
    """Parse a PDF with MinerU, reusing a cached result when one exists.

    The cache entry is looked up by the hash of the file's content. On a hit,
    a single ``Document`` holding the entire cached text is returned, tagged
    with ``parser="MinerULoader-Cache"`` and ``from_cache=True``. On a miss,
    the PDF is loaded in consecutive page ranges of at most
    ``MINERU_MAX_PAGES`` pages; every loaded document is cleaned with
    ``clean_mineru_text`` and tagged with ``parser``, ``page_number`` and
    ``total_pages`` metadata, and the newline-joined cleaned text is written
    to the cache.

    Args:
        file_path: Path of the PDF on disk.

    Returns:
        A one-element list on a cache hit; otherwise the cleaned documents
        produced by the loader, in load order.
    """
    started_at = time.time()
    cache_path = self._cache_path_for(get_file_hash(file_path))

    cached_text = self._read_cache(cache_path)
    if cached_text is not None:
        # Cache hit: the stored text is returned as one document; per-page
        # boundaries are not kept in the cache file.
        cache_document = Document(
            page_content=cached_text,
            metadata={
                "parser": "MinerULoader-Cache",
                "source": file_path,
                "from_cache": True,
            },
        )
        return [cache_document]

    total_pages = self._count_pdf_pages(file_path)
    print(f"PDF总页数:{total_pages}")

    # Request the PDF from MinerU one page range at a time.
    all_documents: List[Document] = []
    for first_page in range(1, total_pages + 1, MINERU_MAX_PAGES):
        last_page = min(first_page + MINERU_MAX_PAGES - 1, total_pages)
        print(f"正在加载分片:{first_page} ~ {last_page} 页")
        chunk_loader = MinerULoader(
            source=file_path,
            mode="precision",
            token=MINERU_TOKEN,
            pages=f"{first_page}-{last_page}",
        )
        all_documents.extend(chunk_loader.load())

    # Clean each document in place and collect the cleaned text for the cache.
    # page_number is the document's 1-based position in load order.
    cleaned_texts: List[str] = []
    for page_number, document in enumerate(all_documents, start=1):
        cleaned_text = self.clean_mineru_text(document.page_content)
        document.page_content = cleaned_text
        document.metadata.update(
            {
                "parser": "MinerULoader",
                "page_number": page_number,
                "total_pages": total_pages,
            }
        )
        cleaned_texts.append(cleaned_text)

    self._write_cache(cache_path, "\n".join(cleaned_texts))

    elapsed = round(time.time() - started_at, 2)
    print(f"PDF 完整加载完成 | 总页数:{total_pages} | 总耗时:{elapsed} 秒")
    return all_documents
@staticmethod
def clean_mineru_text(text: str) -> str:
"""Strip MinerU-specific markup tokens from extracted text."""
if not text:
return ""
text = re.sub(r"?table[^>]*>", "", text, flags=re.IGNORECASE)
text = re.sub(r"?tr[^>]*>", " - ", text, flags=re.IGNORECASE)
text = re.sub(r"]*>", "", text, flags=re.IGNORECASE)
text = re.sub(r"