import hashlib
import logging
import os
import re
import time
import uuid
from abc import ABC, abstractmethod
from typing import Any, BinaryIO, List

from fastapi import UploadFile
from langchain_core.documents import Document
from langchain_mineru import MinerULoader
from pypdf import PdfReader

from config.settings import MINERU_TOKEN, TXT_CACHE_DIR

# Number of pages requested from MinerULoader per call in ``load_pdf``.
MINERU_MAX_PAGES = 200
# Bytes read per iteration while hashing.
_HASH_CHUNK_SIZE = 4096

# Ordered (pattern, replacement) pairs applied by ``clean_mineru_text``.
# NOTE(review): the previous patterns were the fragment ``]*>`` (which matches
# every ">" character) and an empty pattern (which inserts "|" between every
# character). They looked like tag patterns that had lost their ``<tag[^>``
# prefix, so they are reconstructed here as HTML table tags while keeping the
# original replacement strings and their order. Confirm the tag names against
# real MinerU output.
_MARKUP_SUBSTITUTIONS = (
    (re.compile(r"</?table[^>]*>", re.IGNORECASE), ""),
    (re.compile(r"<tr[^>]*>", re.IGNORECASE), " - "),
    (re.compile(r"</tr[^>]*>", re.IGNORECASE), ""),
    (re.compile(r"<td[^>]*>", re.IGNORECASE), "|"),
    (re.compile(r"</td>", re.IGNORECASE), "|"),
)
# Markdown image reference: ``![alt](target)``.
_MARKDOWN_IMAGE_RE = re.compile(r"!\[.*?\]\(.*?\)")


def _md5_of_stream(stream: BinaryIO) -> str:
    """Return the hex MD5 digest of everything readable from *stream*.

    The stream is consumed from its current position in
    ``_HASH_CHUNK_SIZE``-byte reads, so the whole payload is never held in
    memory at once.
    """
    # The digest is only used as a cache key, so mark it as non-security use;
    # this keeps MD5 available on builds that restrict it for security use.
    hasher = hashlib.md5(usedforsecurity=False)
    while chunk := stream.read(_HASH_CHUNK_SIZE):
        hasher.update(chunk)
    return hasher.hexdigest()


def get_upload_file_hash(upload_file: UploadFile) -> str:
    """Return the hex MD5 digest of an uploaded file's contents.

    The underlying file object is rewound before hashing and again afterwards,
    so the caller can read the upload from the beginning once this returns.

    Args:
        upload_file: Upload whose ``file`` attribute is a seekable binary
            stream.

    Returns:
        The hexadecimal MD5 digest of the full contents.
    """
    upload_file.file.seek(0)
    digest = _md5_of_stream(upload_file.file)
    upload_file.file.seek(0)
    return digest


def get_file_hash(file_path: str) -> str:
    """Return the hex MD5 digest of the file at *file_path*.

    Files with identical bytes yield identical digests, which is what lets
    the digest serve as a cache key.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(file_path, "rb") as file_handle:
        return _md5_of_stream(file_handle)


class BaseParser(ABC):
    """Abstract base for PDF parsers with MinerU loading and a text cache.

    Parsed text is cached on disk under ``TXT_CACHE_DIR``, keyed by the MD5
    digest of the source file. Subclasses implement ``parse``.
    """

    def __init__(self) -> None:
        """Create the cache directory if it does not exist yet."""
        os.makedirs(TXT_CACHE_DIR, exist_ok=True)

    # ---- cache helpers ----

    def _cache_path_for(self, file_hash: str) -> str:
        """Return the cache file path that corresponds to *file_hash*."""
        return os.path.join(TXT_CACHE_DIR, f"{file_hash}.txt")

    def _read_cache(self, cache_path: str) -> str | None:
        """Return the cached text at *cache_path*, or ``None`` if absent."""
        # Open directly instead of checking existence first: the file could
        # disappear between an ``exists`` check and the ``open`` call.
        try:
            with open(cache_path, "r", encoding="utf-8") as file_handle:
                content = file_handle.read()
        except FileNotFoundError:
            return None
        print(f"✅ 找到缓存,直接读取:{cache_path}")
        return content

    def _write_cache(self, cache_path: str, content: str) -> None:
        """Store *content* at *cache_path* atomically.

        The text is written to a uniquely named temporary file in the same
        directory and then moved into place with ``os.replace``. An
        interrupted write therefore never leaves a truncated cache file that
        ``_read_cache`` would later return as a valid hit.

        Raises:
            OSError: If the temporary file cannot be written or moved.
        """
        # The ".tmp" suffix keeps partial files distinct from "<hash>.txt"
        # cache entries.
        tmp_path = f"{cache_path}.{uuid.uuid4().hex}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as file_handle:
                file_handle.write(content)
            os.replace(tmp_path, cache_path)
        except BaseException:
            # Best-effort cleanup of the partial file; the original error is
            # the one that matters, so a failed removal is ignored.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
        print(f"✅ 已保存解析结果到缓存:{cache_path}")

    def _count_pdf_pages(self, file_path: str) -> int:
        """Return the page count of the PDF, or 1 if it cannot be determined.

        NOTE(review): with the fallback value of 1, ``load_pdf`` requests only
        pages "1-1" and caches that result. Confirm this best-effort behavior
        is intended for unreadable or encrypted files.
        """
        try:
            return len(PdfReader(file_path).pages)
        except Exception as exc:
            print(f"获取PDF页数失败:{exc}")
            return 1

    # ---- core loading ----

    def load_pdf(self, file_path: str) -> List[Document]:
        """Load a PDF through MinerU, using the on-disk text cache if possible.

        On a cache hit a single ``Document`` holding the whole cached text is
        returned, with ``from_cache`` set in its metadata. On a miss the PDF
        is loaded in slices of at most ``MINERU_MAX_PAGES`` pages, every
        returned document is cleaned and annotated in place, and the joined
        text is written to the cache.

        Args:
            file_path: Path of the PDF file on disk.

        Returns:
            One merged document on a cache hit; otherwise the documents
            produced by ``MinerULoader`` in load order.
        """
        # Monotonic clock: the value is only used to measure elapsed time.
        start_time = time.perf_counter()

        cache_path = self._cache_path_for(get_file_hash(file_path))
        cached_content = self._read_cache(cache_path)
        if cached_content is not None:
            return [
                Document(
                    page_content=cached_content,
                    metadata={
                        "parser": "MinerULoader-Cache",
                        "source": file_path,
                        "from_cache": True,
                    },
                )
            ]

        total_pages = self._count_pdf_pages(file_path)
        print(f"PDF总页数:{total_pages}")

        all_documents: List[Document] = []
        # Page ranges passed to the loader are 1-based and inclusive.
        for start in range(1, total_pages + 1, MINERU_MAX_PAGES):
            end = min(start + MINERU_MAX_PAGES - 1, total_pages)
            print(f"正在加载分片:{start} ~ {end} 页")
            loader = MinerULoader(
                source=file_path,
                mode="precision",
                token=MINERU_TOKEN,
                pages=f"{start}-{end}",
            )
            all_documents.extend(loader.load())

        full_text_pages: List[str] = []
        for idx, doc in enumerate(all_documents):
            cleaned = self.clean_mineru_text(doc.page_content)
            doc.page_content = cleaned
            doc.metadata["parser"] = "MinerULoader"
            # NOTE(review): this numbers documents in load order; it equals
            # the PDF page number only if the loader yields one document per
            # page. Confirm against MinerULoader's output.
            doc.metadata["page_number"] = idx + 1
            doc.metadata["total_pages"] = total_pages
            full_text_pages.append(cleaned)

        self._write_cache(cache_path, "\n".join(full_text_pages))

        elapsed = round(time.perf_counter() - start_time, 2)
        print(f"PDF 完整加载完成 | 总页数:{total_pages} | 总耗时:{elapsed} 秒")
        return all_documents

    @staticmethod
    def clean_mineru_text(text: str) -> str:
        """Strip table markup and Markdown image references from *text*.

        Applies ``_MARKUP_SUBSTITUTIONS`` in order, removes Markdown image
        references of the form ``![alt](target)``, and trims surrounding
        whitespace.

        Args:
            text: Raw text extracted by MinerU; may be empty or ``None``.

        Returns:
            The cleaned text, or ``""`` when *text* is falsy.
        """
        if not text:
            return ""
        for pattern, replacement in _MARKUP_SUBSTITUTIONS:
            text = pattern.sub(replacement, text)
        text = _MARKDOWN_IMAGE_RE.sub("", text)
        return text.strip()

    @abstractmethod
    def parse(self, file_path: str) -> Any:
        """Parse the file at *file_path*; implemented by subclasses."""
        ...