| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- import hashlib
- import logging
- import os
- import re
- import time
- from abc import ABC, abstractmethod
- from typing import List, Any
- from fastapi import UploadFile
- from langchain_core.documents import Document
- from langchain_mineru import MinerULoader
- from pypdf import PdfReader
- from config.settings import MINERU_TOKEN, TXT_CACHE_DIR
# Upper bound on the number of pages requested from MinerU in one loader call;
# longer PDFs are split into consecutive page ranges of at most this size.
MINERU_MAX_PAGES = 200

# Bytes read per iteration while hashing, so a file never has to be held in
# memory in one piece. The digest does not depend on this value.
_HASH_CHUNK_SIZE = 64 * 1024


def _md5_hexdigest(stream) -> str:
    """Return the MD5 hex digest of everything left to read from *stream*.

    *stream* is any binary file-like object with a ``read(size)`` method.
    Both public hash helpers go through this function, so the same bytes
    always yield the same digest no matter where they come from.
    """
    # The digest is only a content fingerprint, not a security control;
    # saying so keeps MD5 usable on FIPS-restricted OpenSSL builds.
    hasher = hashlib.md5(usedforsecurity=False)
    while chunk := stream.read(_HASH_CHUNK_SIZE):
        hasher.update(chunk)
    return hasher.hexdigest()


def get_upload_file_hash(upload_file: UploadFile) -> str:
    """Compute the MD5 of an uploaded file without writing it to disk.

    The underlying stream is rewound to offset 0 before hashing and again
    afterwards, so the caller can read the upload from the start.

    Args:
        upload_file: Upload whose ``file`` attribute is a seekable binary stream.

    Returns:
        Hexadecimal MD5 digest of the full upload content.
    """
    stream = upload_file.file
    stream.seek(0)
    digest = _md5_hexdigest(stream)
    stream.seek(0)
    return digest


def get_file_hash(file_path: str) -> str:
    """Compute the MD5 of a file on disk. Identical content → identical hash.

    Args:
        file_path: Path of the file to hash.

    Returns:
        Hexadecimal MD5 digest of the file content.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(file_path, "rb") as file_handle:
        return _md5_hexdigest(file_handle)
class BaseParser(ABC):
    """Abstract base for PDF parsers with MinerU loading and file-system caching.

    Parsed text is cached under ``TXT_CACHE_DIR`` in a file named after the MD5
    of the PDF content, so re-submitting identical content skips the MinerU
    call. Subclasses implement :meth:`parse`.
    """

    # Clean-up rules applied in order by ``clean_mineru_text``. The ``\b`` after
    # each tag name keeps look-alike tags such as ``<track>`` or ``<tablet>``
    # from being mistaken for ``<tr>`` / ``<table>``.
    _MARKUP_RULES = (
        (re.compile(r"</?table\b[^>]*>", re.IGNORECASE), ""),
        (re.compile(r"</?tr\b[^>]*>", re.IGNORECASE), " - "),
        (re.compile(r"<img\b[^>]*>", re.IGNORECASE), ""),
        (re.compile(r"<td\b[^>]*>", re.IGNORECASE), "|"),
        (re.compile(r"</td>", re.IGNORECASE), "|"),
        (re.compile(r"!\[.*?\]\(.*?\)"), ""),  # Markdown image: ![alt](url)
    )

    def __init__(self) -> None:
        """Make sure the cache directory exists."""
        os.makedirs(TXT_CACHE_DIR, exist_ok=True)

    # ---- cache helpers ----

    def _cache_path_for(self, file_hash: str) -> str:
        """Return the cache file path used for content with hash *file_hash*."""
        return os.path.join(TXT_CACHE_DIR, f"{file_hash}.txt")

    def _read_cache(self, cache_path: str) -> str | None:
        """Return the cached text at *cache_path*, or ``None`` if there is none."""
        # Open directly instead of checking existence first: the file may
        # disappear between a check and the open.
        try:
            with open(cache_path, "r", encoding="utf-8") as file_handle:
                content = file_handle.read()
        except FileNotFoundError:
            return None
        print(f"✅ 找到缓存,直接读取:{cache_path}")
        return content

    def _write_cache(self, cache_path: str, content: str) -> None:
        """Store *content* at *cache_path* atomically.

        The text is written to a temporary sibling file and then moved into
        place, so a reader never sees a half-written cache entry and an
        interrupted write cannot leave a truncated file under the final name.
        """
        temp_path = f"{cache_path}.{os.urandom(4).hex()}.tmp"
        try:
            with open(temp_path, "w", encoding="utf-8") as file_handle:
                file_handle.write(content)
            os.replace(temp_path, cache_path)
        finally:
            # Best-effort clean-up: after a successful replace the temp file
            # is already gone, which is the expected case.
            try:
                os.remove(temp_path)
            except OSError:
                pass
        print(f"✅ 已保存解析结果到缓存:{cache_path}")

    def _count_pdf_pages(self, file_path: str) -> int:
        """Return the number of pages in the PDF, or 1 if it cannot be read.

        NOTE(review): the fallback of 1 is a deliberate best-effort, but it
        means ``load_pdf`` will request only page 1 and cache that partial
        result — confirm this is acceptable for unreadable PDFs.
        """
        try:
            return len(PdfReader(file_path).pages)
        except Exception as exc:
            print(f"获取PDF页数失败:{exc}")
            return 1

    @staticmethod
    def _iter_page_ranges(total_pages: int, max_pages: int) -> List[tuple[int, int]]:
        """Split pages ``1..total_pages`` into inclusive ``(start, end)`` ranges.

        Every range holds at most *max_pages* pages; the last one may be shorter.
        Returns an empty list when *total_pages* is less than 1.
        """
        return [
            (start, min(start + max_pages - 1, total_pages))
            for start in range(1, total_pages + 1, max_pages)
        ]

    # ---- core loading ----

    def load_pdf(self, file_path: str) -> List[Document]:
        """Load a PDF through MinerU, using the text cache when possible.

        On a cache hit a single ``Document`` holding the whole cached text is
        returned (metadata ``from_cache=True``). On a miss the PDF is sent to
        MinerU in ranges of at most ``MINERU_MAX_PAGES`` pages, every returned
        document is cleaned with :meth:`clean_mineru_text`, and the joined
        text is written to the cache.

        Args:
            file_path: Path of the PDF on disk.

        Returns:
            The cached text as one ``Document``, or the cleaned documents
            produced by the loader.
        """
        start_time = time.perf_counter()  # monotonic: safe for measuring elapsed time
        cache_path = self._cache_path_for(get_file_hash(file_path))

        cached_content = self._read_cache(cache_path)
        if cached_content is not None:
            return [Document(
                page_content=cached_content,
                metadata={
                    "parser": "MinerULoader-Cache",
                    "source": file_path,
                    "from_cache": True,
                },
            )]

        total_pages = self._count_pdf_pages(file_path)
        print(f"PDF总页数:{total_pages}")

        all_documents: List[Document] = []
        for start, end in self._iter_page_ranges(total_pages, MINERU_MAX_PAGES):
            print(f"正在加载分片:{start} ~ {end} 页")
            loader = MinerULoader(
                source=file_path,
                mode="precision",
                token=MINERU_TOKEN,
                pages=f"{start}-{end}",
            )
            all_documents.extend(loader.load())

        full_text_pages: List[str] = []
        # NOTE(review): page_number assumes the loader yields one Document per
        # page, in page order — confirm against the loader's splitting mode.
        for page_number, doc in enumerate(all_documents, start=1):
            cleaned = self.clean_mineru_text(doc.page_content)
            doc.page_content = cleaned
            doc.metadata["parser"] = "MinerULoader"
            doc.metadata["page_number"] = page_number
            doc.metadata["total_pages"] = total_pages
            full_text_pages.append(cleaned)

        # Do not cache a run that produced no documents at all: an empty cache
        # entry would be served for this file on every later call.
        if all_documents:
            self._write_cache(cache_path, "\n".join(full_text_pages))

        elapsed = round(time.perf_counter() - start_time, 2)
        print(f"PDF 完整加载完成 | 总页数:{total_pages} | 总耗时:{elapsed} 秒")
        return all_documents

    @staticmethod
    def clean_mineru_text(text: str) -> str:
        """Strip MinerU-specific markup tokens from extracted text.

        ``<table>`` and ``<img>`` tags and Markdown images are removed, row
        tags become ``" - "``, and cell tags become ``"|"``. Tag matching is
        case-insensitive. Leading and trailing whitespace is trimmed.

        Args:
            text: Raw text from the loader; a falsy value yields ``""``.

        Returns:
            The cleaned text.
        """
        if not text:
            return ""
        for pattern, replacement in BaseParser._MARKUP_RULES:
            text = pattern.sub(replacement, text)
        return text.strip()

    @abstractmethod
    def parse(self, file_path: str) -> Any:
        """Parse the PDF at *file_path*; the result type is defined by the subclass."""
        ...
|