Files
FunMD_Convert/docling/app/services/docling_adapter.py

1248 lines
46 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from pathlib import Path
from typing import Optional, Tuple, Dict, List, Any
from urllib.parse import urlparse, unquote
import os
import re
import io
from bs4 import BeautifulSoup
from bs4.element import PageElement
import marko
import sys
try:
_DOC_BASE = Path(__file__).resolve().parents[2] / "docling"
p = str(_DOC_BASE)
if p not in sys.path:
sys.path.insert(0, p)
except Exception:
pass
try:
from docling.document_converter import DocumentConverter
except Exception:
class DocumentConverter: # type: ignore
def __init__(self, *args, **kwargs):
pass
def convert(self, source):
raise RuntimeError("docling not available")
from docx import Document
from docx.shared import Mm, Pt
from docx.enum.section import WD_SECTION
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from urllib.request import urlopen
import json
try:
from weasyprint import HTML, CSS # type: ignore
except Exception:
HTML = None
CSS = None
try:
from xhtml2pdf import pisa as _pisa # type: ignore
_HAS_XHTML2PDF: bool = True
except Exception:
_pisa = None # type: ignore
_HAS_XHTML2PDF: bool = False
# reportlab 用于生成支持中文的 PDF
try:
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import mm, cm
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak, HRFlowable
from reportlab.lib import colors
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
from reportlab.lib.enums import TA_LEFT, TA_CENTER, TA_RIGHT
from reportlab.platypus import KeepInFrame
from reportlab.pdfgen import canvas
from reportlab.lib.colors import HexColor
_HAS_REPORTLAB: bool = True
except Exception as e:
import traceback
print(f"[ERROR] reportlab import failed: {e}")
traceback.print_exc()
A4 = None
_HAS_REPORTLAB: bool = False
_mdit: Any = None
_tasklists_plugin: Any = None
_deflist_plugin: Any = None
_footnote_plugin: Any = None
_attrs_plugin: Any = None
_HAS_MD_IT: bool = False
try:
import markdown_it as _mdit # type: ignore
from mdit_py_plugins.tasklists import tasklists_plugin as _tasklists_plugin # type: ignore
from mdit_py_plugins.deflist import deflist_plugin as _deflist_plugin # type: ignore
from mdit_py_plugins.footnote import footnote_plugin as _footnote_plugin # type: ignore
from mdit_py_plugins.attrs import attrs_plugin as _attrs_plugin # type: ignore
_HAS_MD_IT = True
except Exception:
pass
# Shared converter instance, created once at import time and used by convert_source().
converter = DocumentConverter()
# Location of the JSON link map, resolved relative to this file:
# <parent of this file's directory>/configs/linkmap/linkmap.json
LINKMAP_PATH = Path(__file__).resolve().parent.parent / "configs" / "linkmap" / "linkmap.json"
# In-memory copy of the link map; replaced by load_linkmap().
_LINKMAP: Dict[str, str] = {}
def load_linkmap() -> Dict[str, str]:
    """Reload the link map from ``LINKMAP_PATH`` into the module-level cache.

    Returns:
        The cached mapping. If the file does not exist the previous cache is
        returned unchanged; if the file cannot be read or parsed, or does not
        contain a JSON object, the cache is reset to an empty dict.
    """
    global _LINKMAP
    try:
        if LINKMAP_PATH.exists():
            loaded = json.loads(LINKMAP_PATH.read_text("utf-8"))
            # Only a JSON object is usable as a mapping; anything else (list,
            # string, null, ...) would break the key lookups in resolve_link().
            _LINKMAP = loaded if isinstance(loaded, dict) else {}
    except (OSError, ValueError):
        # OSError: unreadable file. ValueError: invalid JSON or bad encoding
        # (JSONDecodeError and UnicodeDecodeError are both ValueError subclasses).
        _LINKMAP = {}
    return _LINKMAP
def save_linkmap(mapping: Dict[str, str]) -> None:
    """Write *mapping* to ``LINKMAP_PATH`` as indented UTF-8 JSON.

    The parent directory is created when missing. The in-memory cache is not
    touched by this function.
    """
    target_dir = LINKMAP_PATH.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(mapping, ensure_ascii=False, indent=2)
    LINKMAP_PATH.write_text(serialized, "utf-8")
# Populate the in-memory link map once when the module is imported.
load_linkmap()
def resolve_link(href: Optional[str], data_doc: Optional[str]) -> Optional[str]:
    """Return the effective target for a link.

    A non-empty *href* always wins. Otherwise *data_doc* is looked up in the
    link map (which is loaded on demand when the cache is empty). ``None`` is
    returned when neither yields a target.
    """
    if not href:
        if not _LINKMAP:
            load_linkmap()
        if data_doc and data_doc in _LINKMAP:
            return _LINKMAP[data_doc]
        return None
    return href
def export_payload(doc, fmt: str) -> Tuple[str, str]:
    """Export *doc* in the requested format.

    Args:
        doc: Object exposing the ``export_to_*`` methods used below.
        fmt: One of ``markdown``, ``html``, ``json``, ``doctags`` (case-insensitive).

    Returns:
        ``(payload, mime_type)``.

    Raises:
        ValueError: If *fmt* is not one of the supported formats.
    """
    # format name -> (export method on doc, MIME type reported to the caller)
    # NOTE(review): doctags is reported as application/json — confirm this is intended.
    exporters = {
        "markdown": ("export_to_markdown", "text/markdown"),
        "html": ("export_to_html", "text/html"),
        "json": ("export_to_json", "application/json"),
        "doctags": ("export_to_doctags", "application/json"),
    }
    entry = exporters.get(fmt.lower())
    if entry is None:
        raise ValueError("unsupported export")
    method_name, mime_type = entry
    return getattr(doc, method_name)(), mime_type
def infer_basename(source_url: Optional[str], upload_name: Optional[str]) -> str:
    """Derive an extension-less base name for a converted document.

    The URL path takes precedence (percent-decoded); otherwise the upload
    file name is used. ``"document"`` is the fallback whenever no usable
    name can be derived.
    """
    fallback = "document"
    if source_url:
        leaf = os.path.basename(urlparse(source_url).path) or fallback
        stem, _ext = os.path.splitext(unquote(leaf))
        return stem or fallback
    if not upload_name:
        return fallback
    stem, _ext = os.path.splitext(os.path.basename(upload_name))
    return stem or fallback
def sanitize_filename(name: Optional[str]) -> str:
    """Return *name* made safe for use as a file name.

    The value is stripped, cut to 128 characters, and every character in
    ``<>:"/\\|?*`` or the control range 0x00-0x1F is replaced by ``_``.
    ``"document"`` is returned when nothing usable remains.
    """
    fallback = "document"
    if not name:
        return fallback
    trimmed = name.strip()[:128]
    cleaned = re.sub(r'[<>:"/\\|?*\x00-\x1F]', "_", trimmed)
    return cleaned if cleaned else fallback
def convert_source(source: str, export: str) -> Tuple[str, str]:
    """Convert *source* with the shared converter and export the result.

    Returns the ``(payload, mime_type)`` pair produced by ``export_payload``
    for the requested *export* format.
    """
    document = converter.convert(source).document
    return export_payload(document, export)
def md_to_docx_bytes(md: str, toc: bool = False, header_text: Optional[str] = None, footer_text: Optional[str] = None, logo_url: Optional[str] = None, copyright_text: Optional[str] = None, filename_text: Optional[str] = None, cover_src: Optional[str] = None, product_name: Optional[str] = None, document_name: Optional[str] = None, product_version: Optional[str] = None, document_version: Optional[str] = None) -> bytes:
    """Render Markdown to a DOCX file and return its bytes.

    The Markdown is first passed through ``normalize_html`` and the resulting
    HTML is walked block by block to build a python-docx document on A4 pages.

    Args:
        md: Markdown source.
        toc: When true, insert a "目录" paragraph, a Word TOC field and a page break.
        header_text: Header text; ``left||right`` or ``left|right`` splits it
            into a left part and a right-aligned part.
        footer_text: Footer text.
        logo_url: Forwarded to ``normalize_html``; in the DOCX itself it only
            triggers creation of the header paragraph (no image is inserted).
        copyright_text: Extra footer paragraph.
        filename_text: Cover subtitle; also used as the right-aligned header
            text when *header_text* has no right part.
        cover_src: URL or local path of a cover image.
        product_name: Brand line on the cover page.
        document_name: Cover title; falls back to the first ``<h1>``, then "文档".
        product_version: Shown on the cover as "产品版本:...".
        document_version: Shown on the cover as "文档版本:...".

    Returns:
        The serialized ``.docx`` document.
    """
    import logging as _log

    _log.info(f"md_to_docx_bytes start toc={toc} header={bool(header_text)} footer={bool(footer_text)} logo={bool(logo_url)} cover={bool(cover_src)}")

    def _add_field(paragraph, instr: str):
        # A Word complex field is three runs: begin marker, instruction text, end marker.
        r1 = paragraph.add_run()
        b = OxmlElement('w:fldChar')
        b.set(qn('w:fldCharType'), 'begin')
        r1._r.append(b)
        r2 = paragraph.add_run()
        t = OxmlElement('w:instrText')
        t.set(qn('xml:space'), 'preserve')
        t.text = instr
        r2._r.append(t)
        r3 = paragraph.add_run()
        e = OxmlElement('w:fldChar')
        e.set(qn('w:fldCharType'), 'end')
        r3._r.append(e)

    def _available_width(section) -> int:
        # Page width minus the horizontal margins of the given section.
        return section.page_width - section.left_margin - section.right_margin

    def _fetch_bytes(u: str) -> Optional[bytes]:
        # Best effort: returns None on any failure so a missing image never
        # aborts the conversion.
        # NOTE(review): `u` comes from document content/options; this fetches
        # arbitrary http(s) URLs and reads arbitrary local paths. Restrict it
        # if the input is untrusted.
        try:
            if u.lower().startswith('http://') or u.lower().startswith('https://'):
                with urlopen(u, timeout=10) as r:
                    return r.read()
            p = Path(u)
            if p.exists() and p.is_file():
                return p.read_bytes()
        except Exception:
            return None
        return None

    html = normalize_html(md, options={
        "toc": "1" if toc else "",
        "header_text": header_text,
        "footer_text": footer_text,
        "logo_url": logo_url,
        "copyright_text": copyright_text,
        "filename_text": filename_text,
        "cover_src": cover_src,
        "product_name": product_name,
        "document_name": document_name,
        "product_version": product_version,
        "document_version": document_version,
    })
    _log.info(f"md_to_docx_bytes normalize_html length={len(html)}")
    soup = BeautifulSoup(html, "html.parser")
    doc = Document()

    # First section: A4 with default margins.
    sec0 = doc.sections[0]
    sec0.page_width = Mm(210)
    sec0.page_height = Mm(297)
    sec0.left_margin = Mm(15)
    sec0.right_margin = Mm(15)
    sec0.top_margin = Mm(20)
    sec0.bottom_margin = Mm(20)

    has_cover = bool(cover_src or (soup.find('section', class_='cover') is not None))
    if has_cover:
        # The cover page is borderless; content continues in a new section.
        sec0.left_margin = Mm(0)
        sec0.right_margin = Mm(0)
        sec0.top_margin = Mm(0)
        sec0.bottom_margin = Mm(0)
        if cover_src:
            b = _fetch_bytes(cover_src)
            if b:
                bio = io.BytesIO(b)
                doc.add_picture(bio, width=_available_width(sec0))
        if product_name:
            p = doc.add_paragraph()
            r = p.add_run(product_name)
            r.font.size = Pt(18)
            r.bold = True
        t = document_name or None
        if not t:
            h1 = soup.body.find('h1') if soup.body else soup.find('h1')
            t = h1.get_text(strip=True) if h1 else '文档'
        p2 = doc.add_paragraph()
        r2 = p2.add_run(t or '文档')
        r2.font.size = Pt(24)
        r2.bold = True
        if filename_text:
            p3 = doc.add_paragraph()
            r3 = p3.add_run(filename_text)
            r3.font.size = Pt(13)
        meta_parts = []
        if product_version:
            meta_parts.append("产品版本:" + product_version)
        if document_version:
            meta_parts.append("文档版本:" + document_version)
        if meta_parts:
            doc.add_paragraph(" ".join(meta_parts))
        doc.add_section(WD_SECTION.NEW_PAGE)
        sec = doc.sections[-1]
        sec.page_width = Mm(210)
        sec.page_height = Mm(297)
        sec.left_margin = Mm(15)
        sec.right_margin = Mm(15)
        sec.top_margin = Mm(20)
        sec.bottom_margin = Mm(20)
    else:
        sec = sec0

    if header_text or logo_url or filename_text:
        hp = sec.header.add_paragraph()
        left = header_text or ''
        right = ''
        # "||" is checked before "|" so a double bar is not split at its first bar.
        if '||' in left:
            parts = left.split('||', 1)
            left, right = parts[0], parts[1]
        elif '|' in left:
            parts = left.split('|', 1)
            left, right = parts[0], parts[1]
        if left.strip():
            hp.add_run(left.strip())
        if right.strip():
            rp = sec.header.add_paragraph()
            rp.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
            rp.add_run(right.strip())
        elif filename_text:
            rp = sec.header.add_paragraph()
            rp.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
            rp.add_run(filename_text)

    if footer_text or copyright_text:
        fp = sec.footer.add_paragraph()
        if footer_text:
            fp.add_run(footer_text)
        if copyright_text:
            cp = sec.footer.add_paragraph()
            cp.add_run(copyright_text)
    # Right-aligned page number field, added regardless of footer text.
    pn = sec.footer.add_paragraph()
    pn.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
    _add_field(pn, 'PAGE')

    if toc:
        doc.add_paragraph('目录')
        _add_field(doc.add_paragraph(), 'TOC \\o "1-3" \\h \\z \\u')
        doc.add_page_break()

    def add_inline(p, node):
        # Text nodes (bs4 NavigableString subclasses str) become plain runs.
        if isinstance(node, str):
            p.add_run(node)
            return
        if node.name in ['strong', 'b']:
            r = p.add_run(node.get_text())
            r.bold = True
            return
        if node.name in ['em', 'i']:
            r = p.add_run(node.get_text())
            r.italic = True
            return
        if node.name == 'code':
            r = p.add_run(node.get_text())
            r.font.name = 'Courier New'
            return
        if node.name == 'a':
            # Links are flattened to "text [target]".
            text = node.get_text()
            href = node.get('href')
            extra = node.get('data-doc')
            resolved = resolve_link(href, extra)
            if resolved:
                p.add_run(text + ' [' + resolved + ']')
            else:
                p.add_run(text)
            return
        if node.name == 'img':
            src = node.get('src') or ''
            b = _fetch_bytes(src)
            if b:
                bio = io.BytesIO(b)
                try:
                    doc.add_picture(bio, width=_available_width(sec))
                except Exception:
                    # Unsupported or corrupt image data: skip the picture.
                    pass
            return
        for c in getattr(node, 'children', []):
            add_inline(p, c)

    heading_levels = {'h1': 1, 'h2': 2, 'h3': 3, 'h4': 4, 'h5': 5, 'h6': 6}

    def process_block(el):
        name = getattr(el, 'name', None)
        if name is None:
            return
        cls = el.get('class') or []
        # Skip the helper elements normalize_html adds for the PDF/HTML
        # renderers; cover, header/footer and TOC are built natively above.
        if name == 'div' and 'doc-meta' in cls:
            return
        if name == 'section' and 'cover' in cls:
            return
        if name == 'nav' and 'toc' in cls:
            return
        if name == 'div':
            for child in el.children:
                process_block(child)
            return
        if name == 'strong' and 'subtitle' in cls:
            doc.add_heading(el.get_text(), level=2)
            return
        if name in heading_levels:
            # h4-h6 are emitted too; previously they were silently dropped.
            doc.add_heading(el.get_text(), level=heading_levels[name])
            return
        if name == 'p':
            p = doc.add_paragraph()
            for c in el.children:
                add_inline(p, c)
            return
        if name in ['ul', 'ol']:
            # Ordered lists use the numbered style rather than bullets.
            list_style = 'List Number' if name == 'ol' else 'List Bullet'
            for li in el.find_all('li', recursive=False):
                p = doc.add_paragraph(style=list_style)
                for c in li.children:
                    add_inline(p, c)
            return
        if name == 'pre':
            code = el.get_text() or ''
            p = doc.add_paragraph()
            run = p.add_run(code)
            run.font.name = 'Courier New'
            return
        if name == 'blockquote':
            p = doc.add_paragraph(el.get_text())
            p.paragraph_format.left_indent = Mm(10)
            return
        if name == 'table':
            rows = []
            thead = el.find('thead')
            tbody = el.find('tbody')
            if thead:
                hdrs = [th.get_text(strip=True) for th in thead.find_all('th')]
            else:
                # No <thead>: the first row acts as the header.
                hdrs = [cell.get_text(strip=True) for cell in el.find_all('tr')[0].find_all(['th', 'td'])] if el.find_all('tr') else []
            trs = tbody.find_all('tr') if tbody else el.find_all('tr')[1:]
            for tr in trs:
                tds = [td.get_text(strip=True) for td in tr.find_all('td')]
                rows.append(tds)
            tbl = doc.add_table(rows=1 + len(rows), cols=len(hdrs) or 1)
            hdr = tbl.rows[0].cells
            for k, h in enumerate(hdrs or ['']):
                hdr[k].text = h
            for r_idx, row in enumerate(rows):
                cells = tbl.rows[1 + r_idx].cells
                # Short rows are padded with empty cells; extra cells are ignored.
                for c_idx in range(len(hdrs) or 1):
                    cells[c_idx].text = (row[c_idx] if c_idx < len(row) else '')
            return
        if name == 'img':
            src = el.get('src') or ''
            b = _fetch_bytes(src)
            if b:
                bio = io.BytesIO(b)
                try:
                    doc.add_picture(bio, width=_available_width(sec))
                except Exception:
                    # Unsupported or corrupt image data: skip the picture.
                    pass
            return

    body = soup.body or soup
    for el in body.children:
        process_block(el)

    bio = io.BytesIO()
    _log.info("md_to_docx_bytes saving doc")
    doc.save(bio)
    _log.info(f"md_to_docx_bytes done size={bio.tell()}")
    return bio.getvalue()
def md_to_pdf_bytes(md: str) -> bytes:
    """Convert Markdown to PDF bytes with default options.

    Thin wrapper over ``md_to_pdf_bytes_with_renderer`` that requests the
    ``"weasyprint"`` renderer and passes no styling or cover options.
    """
    pdf_bytes = md_to_pdf_bytes_with_renderer(md, renderer="weasyprint")
    return pdf_bytes
def _md_with_tables_to_html(md_text: str) -> str:
    """Render Markdown to HTML with ``marko``, pre-converting pipe tables.

    A line containing ``|`` that is followed by a separator line is treated
    as a table header. The header, the separator and all following lines that
    contain ``|`` are replaced by a single-line HTML ``<table>`` before the
    text is handed to ``marko.convert``.
    """
    def split_cells(raw: str) -> List[str]:
        # Drop outer pipes, then split into trimmed cell texts.
        return [cell.strip() for cell in raw.strip().strip("|").split("|")]

    def is_separator(raw: str) -> bool:
        # Every segment must be non-empty and consist only of '-', ':' and spaces.
        candidate = raw.strip()
        if "|" not in candidate:
            return False
        allowed = set("-: ")
        for segment in candidate.strip("|").split("|"):
            core = segment.strip()
            if len(core) < 1 or not set(core) <= allowed:
                return False
        return True

    source = md_text.splitlines()
    total = len(source)
    rendered: List[str] = []
    pos = 0
    while pos < total:
        current = source[pos]
        starts_table = "|" in current and pos + 1 < total and is_separator(source[pos + 1])
        if not starts_table:
            rendered.append(current)
            pos += 1
            continue
        headers = split_cells(current)
        end = pos + 2
        body_rows = []
        while end < total and "|" in source[end]:
            body_rows.append(split_cells(source[end]))
            end += 1
        pieces = ["<table>", "<thead><tr>"]
        pieces.extend(f"<th>{title}</th>" for title in headers)
        pieces.append("</tr></thead><tbody>")
        for cells in body_rows:
            pieces.append("<tr>")
            # Rows are normalized to the header width: missing cells become empty.
            for col in range(len(headers)):
                value = cells[col] if col < len(cells) else ""
                pieces.append(f"<td>{value}</td>")
            pieces.append("</tr>")
        pieces.append("</tbody></table>")
        rendered.append("".join(pieces))
        pos = end
    return marko.convert("\n".join(rendered))
def _render_markdown_html(md_text: str) -> str:
    """Render Markdown to an HTML fragment.

    Uses markdown-it (CommonMark plus tables and strikethrough, with whichever
    optional plugins were imported) when available. If it is missing, or
    rendering raises, the marko-based fallback is used instead.
    """
    if _HAS_MD_IT and _mdit is not None:
        try:
            parser = _mdit.MarkdownIt("commonmark").enable(["table", "strikethrough"])
            optional_plugins = (
                _tasklists_plugin,
                _deflist_plugin,
                _footnote_plugin,
                _attrs_plugin,
            )
            for plugin in optional_plugins:
                if plugin:
                    parser.use(plugin)
            return parser.render(md_text)
        except Exception:
            # Any markdown-it failure falls through to the fallback renderer.
            pass
    return _md_with_tables_to_html(md_text)
def normalize_html(md_or_html: str, options: Optional[Dict[str, Optional[str]]] = None) -> str:
    """Render Markdown to HTML and decorate it for paged output.

    Steps, in order:
      1. Render the input with ``_render_markdown_html`` and parse it.
      2. Turn ``<strong class="subtitle">`` into ``<h2 data-origin="subtitle">``.
      3. Resolve links through ``resolve_link``; an ``<a>`` with no href and an
         unresolved ``data-doc`` is replaced by the text ``"label [data-doc]"``.
      4. Build a ``div.doc-meta`` holding header/footer helper elements and
         insert it at the start of the body (or of the fragment).
      5. Add a ``<style>`` with running page header/footer rules.
      6. When any cover option is set, add fallback cover CSS and a
         ``section.cover`` element.
      7. When ``toc`` is set, build a ``nav.toc`` from h1-h3 headings (assigning
         ``sec-N`` ids where missing).
      8. When the document has a ``<body>``, wrap each heading, its following
         p/blockquote/ul/ol blocks and the first table after them in
         ``div.table-block``.

    Args:
        md_or_html: Source text handed to the Markdown renderer.
        options: Optional string options. Keys read here: ``header_text``,
            ``footer_text``, ``logo_url``, ``copyright_text``, ``cover_src``,
            ``product_name``, ``document_name``, ``product_version``,
            ``document_version``, ``filename_text``, ``toc``.

    Returns:
        The serialized HTML.
    """
    html = _render_markdown_html(md_or_html)
    soup = BeautifulSoup(html, "html.parser")
    # Subtitles become real headings; data-origin lets the TOC skip them later.
    for s in soup.find_all("strong", class_="subtitle"):
        s.name = "h2"
        s.attrs = {"data-origin": "subtitle"}
    for a in soup.find_all("a"):
        href_val = a.get("href")
        extra_val = a.get("data-doc")
        # Attribute values are only used when they are plain strings.
        href = href_val if isinstance(href_val, str) else None
        extra = extra_val if isinstance(extra_val, str) else None
        resolved = resolve_link(href, extra)
        if resolved:
            a["href"] = resolved
        elif not href and extra:
            a.replace_with(a.get_text() + " [" + extra + "]")
    opts = options or {}
    # Empty strings are normalized to None.
    header_text = opts.get("header_text") or None
    footer_text = opts.get("footer_text") or None
    logo_url = opts.get("logo_url") or None
    copyright_text = opts.get("copyright_text") or None
    cover_src = opts.get("cover_src") or None
    product_name_opt = opts.get("product_name") or None
    document_name_opt = opts.get("document_name") or None
    product_version_opt = opts.get("product_version") or None
    document_version_opt = opts.get("document_version") or None
    toc_flag = bool(opts.get("toc"))
    meta = soup.new_tag("div", attrs={"class": "doc-meta"})
    if header_text:
        ht = soup.new_tag("div", attrs={"class": "doc-header-text"})
        text = header_text
        left = text
        right = ""
        # "left||right" or "left|right" splits the header into two spans.
        if "||" in text:
            parts = text.split("||", 1)
            left, right = parts[0], parts[1]
        elif "|" in text:
            parts = text.split("|", 1)
            left, right = parts[0], parts[1]
        if logo_url:
            img = soup.new_tag("img", attrs={"class": "logo-inline", "src": logo_url})
            ht.append(img)
        hl = soup.new_tag("span", attrs={"class": "doc-header-left"})
        hl.string = left
        ht.append(hl)
        if right.strip():
            hr = soup.new_tag("span", attrs={"class": "doc-header-right"})
            hr.string = right
            ht.append(hr)
        meta.append(ht)
    else:
        # No explicit header: first <h1> (or "文档") on the left, filename_text on the right.
        first_h1 = None
        if soup.body:
            first_h1 = soup.body.find("h1")
        else:
            first_h1 = soup.find("h1")
        left = (first_h1.get_text(strip=True) if first_h1 else "文档")
        right = opts.get("filename_text") or ""
        ht = soup.new_tag("div", attrs={"class": "doc-header-text"})
        if logo_url:
            img = soup.new_tag("img", attrs={"class": "logo-inline", "src": logo_url})
            ht.append(img)
        hl = soup.new_tag("span", attrs={"class": "doc-header-left"})
        hl.string = left
        ht.append(hl)
        if right:
            hr = soup.new_tag("span", attrs={"class": "doc-header-right"})
            hr.string = right
            ht.append(hr)
        meta.append(ht)
    if footer_text:
        ft = soup.new_tag("div", attrs={"class": "doc-footer-text"})
        ft.string = footer_text
        meta.append(ft)
    # Running page header text: header_text, else document_name, else first <h1>, else "文档".
    page_header_val = (header_text or (document_name_opt or None))
    if not page_header_val:
        first_h1_for_header = None
        if soup.body:
            first_h1_for_header = soup.body.find("h1")
        else:
            first_h1_for_header = soup.find("h1")
        page_header_val = (first_h1_for_header.get_text(strip=True) if first_h1_for_header else "文档")
    # Running page footer text defaults to "FunMD".
    page_footer_val = (footer_text or "FunMD")
    ph = soup.new_tag("div", attrs={"class": "doc-page-header"})
    if logo_url:
        logo_inline = soup.new_tag("img", attrs={"src": logo_url, "class": "doc-page-header-logo"})
        ph.append(logo_inline)
    ht_inline = soup.new_tag("span", attrs={"class": "doc-page-header-text"})
    ht_inline.string = page_header_val
    ph.append(ht_inline)
    meta.append(ph)
    pf = soup.new_tag("div", attrs={"class": "doc-page-footer"})
    pf.string = page_footer_val
    meta.append(pf)
    if copyright_text:
        cp = soup.new_tag("div", attrs={"class": "doc-copyright"})
        cp.string = copyright_text
        meta.append(cp)
    # brand logo is rendered inline within header; no separate top-left element
    if soup.body:
        soup.body.insert(0, meta)
    else:
        soup.insert(0, meta)
    if not soup.head:
        head = soup.new_tag("head")
        soup.insert(0, head)
    else:
        head = soup.head
    # CSS paged-media rules: the doc-page-header/footer elements are taken out
    # of the flow via position: running(...) and placed in the page margin boxes.
    style_run = soup.new_tag("style")
    style_run.string = "@page{margin:20mm}@page{\n @top-center{content: element(page-header)}\n @bottom-center{content: element(page-footer)}\n}\n.doc-page-header{position: running(page-header); font-size:10pt; color:#666; display:block; text-align:center; width:100%}\n.doc-page-header::after{content:''; display:block; width:80%; border-bottom:1px solid #d9d9d9; margin:4px auto 0}\n.doc-page-header-logo{height:20px; vertical-align:middle; margin-right:4px}\n.doc-page-header-text{vertical-align:middle}\n.doc-page-footer{position: running(page-footer); font-size:10pt; color:#666}\n.doc-page-footer::before{content:''; display:block; width:80%; border-top:1px solid #d9d9d9; margin:0 auto 4px}"
    head.append(style_run)
    # Fallback inline styles for cover to ensure visibility even if external CSS isn't loaded
    if (cover_src or product_name_opt or document_name_opt or product_version_opt or document_version_opt):
        if not soup.head:
            head = soup.new_tag("head")
            soup.insert(0, head)
        else:
            head = soup.head
        style = soup.new_tag("style")
        style.string = "@page:first{margin:0} html,body{margin:0;padding:0}.cover{position:relative;width:210mm;height:297mm;overflow:hidden;page-break-after:always}.cover .cover-bg{position:absolute;left:0;top:0;right:0;bottom:0;width:100%;height:100%;object-fit:cover;display:block}.cover .cover-brand{position:absolute;top:20mm;left:20mm;font-size:18pt;font-weight:700;color:#1d4ed8}.cover .cover-footer{position:absolute;left:0;right:0;bottom:0;background:#1d4ed8;color:#fff;padding:12mm 20mm}.cover .cover-title{font-size:24pt;font-weight:700;margin:0}.cover .cover-subtitle{font-size:13pt;margin-top:4pt}.cover .cover-meta{margin-top:8pt;font-size:11pt;display:flex;gap:20mm}"
        head.append(style)
    # Cover element: optional background image and brand, then a footer band
    # with title, optional subtitle (filename_text) and version metadata.
    if cover_src or product_name_opt or document_name_opt or product_version_opt or document_version_opt:
        cov = soup.new_tag("section", attrs={"class": "cover"})
        if cover_src:
            bg = soup.new_tag("img", attrs={"class": "cover-bg", "src": cover_src})
            cov.append(bg)
        if product_name_opt:
            brand_el = soup.new_tag("div", attrs={"class": "cover-brand"})
            brand_el.string = product_name_opt
            cov.append(brand_el)
        footer = soup.new_tag("div", attrs={"class": "cover-footer"})
        title_text = document_name_opt or None
        if not title_text:
            first_h1 = soup.body.find("h1") if soup.body else soup.find("h1")
            if first_h1:
                title_text = first_h1.get_text(strip=True)
        title_el = soup.new_tag("div", attrs={"class": "cover-title"})
        title_el.string = title_text or "文档"
        footer.append(title_el)
        subtitle_val = opts.get("filename_text") or ""
        if subtitle_val:
            subtitle_el = soup.new_tag("div", attrs={"class": "cover-subtitle"})
            subtitle_el.string = subtitle_val
            footer.append(subtitle_el)
        meta_el = soup.new_tag("div", attrs={"class": "cover-meta"})
        if product_version_opt:
            pv = soup.new_tag("span")
            pv.string = f"产品版本:{product_version_opt}"
            meta_el.append(pv)
        if document_version_opt:
            dv = soup.new_tag("span")
            dv.string = f"文档版本:{document_version_opt}"
            meta_el.append(dv)
        footer.append(meta_el)
        cov.append(footer)
        # Index 1 places the cover right after the element inserted at index 0 above.
        if soup.body:
            soup.body.insert(1, cov)
        else:
            soup.insert(1, cov)
    if toc_flag:
        # Headings that originated from subtitles are left out of the TOC.
        headings = [
            el for el in (soup.find_all(["h1", "h2", "h3"]) or [])
            if el.get("data-origin") != "subtitle"
        ]
        if headings:
            ul = soup.new_tag("ul")
            idx = 1
            for el in headings:
                text = el.get_text(strip=True)
                if not text:
                    continue
                # Headings without an id get a generated "sec-N" anchor.
                hid = el.get("id")
                if not hid:
                    hid = f"sec-{idx}"
                    el["id"] = hid
                    idx += 1
                li = soup.new_tag("li", attrs={"class": f"toc-{el.name}"})
                a = soup.new_tag("a", attrs={"href": f"#{hid}", "class": "toc-text"})
                a.string = text
                dots = soup.new_tag("span", attrs={"class": "toc-dots"})
                page = soup.new_tag("span", attrs={"class": "toc-page", "data-target": f"#{hid}"})
                li.append(a)
                li.append(dots)
                li.append(page)
                ul.append(li)
            nav = soup.new_tag("nav", attrs={"class": "toc"})
            h = soup.new_tag("h1")
            h.string = "目录"
            nav.append(h)
            nav.append(ul)
            if soup.body:
                soup.body.insert(2, nav)
            else:
                soup.insert(2, nav)
    # Group "heading + explanatory blocks + first table" into one wrapper div.
    # This only runs when the parsed document has a <body> element.
    if soup.body:
        for h in soup.body.find_all(["h1", "h2", "h3"]):
            sib: Optional[PageElement] = h.find_next_sibling()
            blocks: List[Any] = []
            first_table: Optional[Any] = None
            while sib is not None:
                # Skip pure whitespace nodes
                if getattr(sib, "name", None) is None:
                    try:
                        if str(sib).strip() == "":
                            sib = sib.next_sibling
                            continue
                    except Exception:
                        break
                # Stop if next heading encountered
                name = getattr(sib, "name", None)
                if name in ["h1", "h2", "h3"]:
                    break
                # Collect explanatory blocks until first table
                if name == "table":
                    first_table = sib
                    break
                if name in ["p", "blockquote", "ul", "ol"]:
                    blocks.append(sib)
                    sib = sib.next_sibling
                    continue
                # Unknown block: stop grouping to avoid wrapping unrelated content
                break
            if first_table is not None:
                # The wrapper takes the heading's place, then adopts the
                # heading, the collected blocks and the table in order.
                wrap = soup.new_tag("div", attrs={"class": "table-block"})
                h.insert_before(wrap)
                wrap.append(h.extract())
                for el in blocks:
                    wrap.append(el.extract())
                wrap.append(first_table.extract())
    return str(soup)
def _stylesheets_for(css_name: Optional[str], css_text: Optional[str]):
    """Collect WeasyPrint stylesheets for a render call.

    Returns an empty list when WeasyPrint's ``CSS`` class is unavailable.
    Otherwise the inline *css_text* (if any) comes first, followed by
    ``configs/styles/<css_name>.css`` when that file exists.
    """
    collected: List[Any] = []
    if CSS is not None:
        if css_text:
            collected.append(CSS(string=css_text))
        if css_name:
            styles_dir = Path(__file__).resolve().parent.parent / "configs" / "styles"
            candidate = styles_dir / f"{css_name}.css"
            if candidate.exists():
                collected.append(CSS(filename=str(candidate)))
    return collected
def _render_pdf_with_reportlab(md: str) -> bytes:
    """Render Markdown directly to PDF with reportlab (pure Python, CJK capable).

    A small line-oriented Markdown reader is used. Supported constructs:
    ATX headings (# to ####), fenced code blocks, pipe tables, block quotes,
    unordered lists (including task lists), ordered lists, horizontal rules
    and paragraphs, plus inline code, bold, italic and links.

    Args:
        md: Markdown source.

    Returns:
        The PDF file content.
    """
    print(f"[DEBUG] _render_pdf_with_reportlab 被调用, md 长度: {len(md)}")
    bio = io.BytesIO()
    doc = SimpleDocTemplate(
        bio,
        pagesize=A4,
        rightMargin=20*mm,
        leftMargin=20*mm,
        topMargin=20*mm,
        bottomMargin=20*mm,
    )
    # Flowables that make up the document, in order.
    story = []
    styles = getSampleStyleSheet()

    # Register the first usable CJK-capable font; fall back to Helvetica.
    # Every candidate is tried, so one broken font file does not prevent the
    # use of another that works.
    chinese_font = 'Helvetica'
    font_candidates = [
        r"C:\Windows\Fonts\msyh.ttc",    # Microsoft YaHei
        r"C:\Windows\Fonts\simhei.ttf",  # SimHei
        r"C:\Windows\Fonts\simsun.ttc",  # SimSun
        "/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",  # Linux
        "/System/Library/Fonts/PingFang.ttc",  # macOS
    ]
    for font_path in font_candidates:
        try:
            if not Path(font_path).exists():
                continue
            pdfmetrics.registerFont(TTFont('ChineseFont', font_path, subfontIndex=0))
            chinese_font = 'ChineseFont'
            break
        except Exception:
            continue

    # Paragraph styles using the selected font.
    title_style = ParagraphStyle(
        'ChineseTitle',
        parent=styles['Heading1'],
        fontName=chinese_font,
        fontSize=18,
        textColor=colors.black,
        spaceAfter=12,
        spaceBefore=12,
        leading=22,
    )
    heading2_style = ParagraphStyle(
        'ChineseHeading2',
        parent=styles['Heading2'],
        fontName=chinese_font,
        fontSize=14,
        textColor=colors.black,
        spaceAfter=10,
        spaceBefore=10,
        leading=18,
    )
    heading3_style = ParagraphStyle(
        'ChineseHeading3',
        parent=styles['Heading3'],
        fontName=chinese_font,
        fontSize=12,
        textColor=colors.black,
        spaceAfter=8,
        spaceBefore=8,
        leading=16,
    )
    h4_style = ParagraphStyle(
        'ChineseHeading4',
        parent=heading3_style,
        fontSize=11,
    )
    normal_style = ParagraphStyle(
        'ChineseNormal',
        parent=styles['Normal'],
        fontName=chinese_font,
        fontSize=10,
        textColor=colors.black,
        spaceAfter=8,
        wordWrap='CJK',  # line breaking suitable for Chinese text
        leading=14,
    )
    blockquote_style = ParagraphStyle(
        'ChineseBlockquote',
        parent=normal_style,
        fontName=chinese_font,
        leftIndent=10*mm,
        textColor=colors.Color(0.4, 0.4, 0.4),
        spaceAfter=8,
        backColor=colors.Color(0.95, 0.95, 0.95),
    )
    code_block_style = ParagraphStyle(
        'ChineseCodeBlock',
        parent=normal_style,
        fontName='Courier',
        fontSize=8,
        textColor=colors.black,
        backColor=colors.Color(0.98, 0.98, 0.98),
        leftIndent=5*mm,
        rightIndent=5*mm,
        spaceAfter=10,
        spaceBefore=10,
        leading=12,
    )

    def process_inline_markdown(text: str) -> str:
        """Convert inline Markdown (code, bold, italic, links) to reportlab markup."""
        # Generated tags are parked behind placeholders so later passes do
        # not re-process them.
        placeholders = {}

        def save_placeholder(content):
            key = f"__PLACEHOLDER_{len(placeholders)}__"
            placeholders[key] = content
            return key

        # Escape angle brackets coming from the user's text first.
        text = text.replace('<', '&lt;').replace('>', '&gt;')
        # Inline code goes first so its content is not treated as emphasis.
        text = re.sub(
            r'`([^`]+)`',
            lambda m: save_placeholder(f'<font face="Courier" color="#d63384">{m.group(1)}</font>'),
            text,
        )
        text = re.sub(
            r'\*\*([^*]+)\*\*',
            lambda m: save_placeholder(f'<b>{m.group(1)}</b>'),
            text,
        )
        text = re.sub(
            r'\*([^*]+)\*',
            lambda m: save_placeholder(f'<i>{m.group(1)}</i>'),
            text,
        )
        # Links become clickable, blue and underlined.
        text = re.sub(
            r'\[([^\]]+)\]\(([^\)]+)\)',
            lambda m: save_placeholder(f'<a href="{m.group(2)}" color="blue"><u>{m.group(1)}</u></a>'),
            text,
        )
        # Restore newest-first: a later placeholder may wrap an earlier one
        # (e.g. inline code inside bold), and expanding in creation order
        # would leave the inner key in the output.
        for key in reversed(list(placeholders)):
            text = text.replace(key, placeholders[key])
        return text

    def split_table_row(raw: str) -> list:
        """Split a pipe-table row into trimmed cells; outer pipes are optional."""
        row = raw.strip()
        if row.startswith('|'):
            row = row[1:]
        if row.endswith('|'):
            row = row[:-1]
        return [cell.strip() for cell in row.split('|')]

    def parse_table(table_lines: list) -> None:
        """Turn collected table lines (header, separator, rows) into a Table flowable."""
        if not table_lines:
            return
        header_cells = split_table_row(table_lines[0])
        num_cols = len(header_cells)

        # Column alignment from the separator row (":---:", "---:", default left).
        alignments = []
        if len(table_lines) > 1:
            for part in split_table_row(table_lines[1]):
                if part.startswith(':') and part.endswith(':'):
                    alignments.append('CENTER')
                elif part.endswith(':'):
                    alignments.append('RIGHT')
                else:
                    alignments.append('LEFT')

        row_data = [[Paragraph(process_inline_markdown(cell), normal_style) for cell in header_cells]]
        for line in table_lines[2:]:
            if '|' not in line:
                continue
            cells = split_table_row(line)
            # Normalize every row to the header width so ragged rows cannot
            # produce an inconsistent table.
            cells = (cells + [''] * num_cols)[:num_cols]
            row_data.append([Paragraph(process_inline_markdown(cell), normal_style) for cell in cells])

        # Equal column widths across the printable width (page minus side margins).
        col_widths = [(A4[0] - 40*mm) / num_cols] * num_cols

        table_style = TableStyle([
            # Header row
            ('BACKGROUND', (0, 0), (-1, 0), colors.Color(0.4, 0.6, 0.9)),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.white),
            ('FONTNAME', (0, 0), (-1, 0), chinese_font),
            ('FONTSIZE', (0, 0), (-1, 0), 10),
            ('BOTTOMPADDING', (0, 0), (-1, 0), 8),
            ('TOPPADDING', (0, 0), (-1, 0), 8),
            ('LEFTPADDING', (0, 0), (-1, -1), 6),
            ('RIGHTPADDING', (0, 0), (-1, -1), 6),
            # Header borders
            ('LINEABOVE', (0, 0), (-1, 0), 1, colors.black),
            ('LINEBELOW', (0, 0), (-1, 0), 1, colors.black),
            ('LINEBEFORE', (0, 0), (0, -1), 0.5, colors.grey),
            ('LINEAFTER', (-1, 0), (-1, -1), 0.5, colors.grey),
            # Body rows
            ('BACKGROUND', (0, 1), (-1, -1), colors.white),
            ('FONTNAME', (0, 1), (-1, -1), chinese_font),
            ('FONTSIZE', (0, 1), (-1, -1), 9),
            ('TOPPADDING', (0, 1), (-1, -1), 6),
            ('BOTTOMPADDING', (0, 1), (-1, -1), 6),
            # Zebra striping
            ('ROWBACKGROUNDS', (0, 1), (-1, -1), [colors.white, colors.Color(0.95, 0.95, 0.98)]),
            # Grid
            ('GRID', (0, 0), (-1, -1), 0.5, colors.grey),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
        ])
        # Ignore alignment entries beyond the header width.
        for col_idx, align in enumerate(alignments[:num_cols]):
            table_style.add('ALIGN', (col_idx, 0), (col_idx, -1), align)

        t = Table(row_data, colWidths=col_widths)
        t.setStyle(table_style)
        story.append(t)
        story.append(Spacer(1, 8*mm))

    def flush_code_block(block_lines: list) -> None:
        """Append a fenced code block, keeping line breaks and leading indentation."""
        rendered = []
        for raw in block_lines:
            expanded = raw.expandtabs(4)
            body = expanded.lstrip(' ')
            indent = len(expanded) - len(body)
            escaped = body.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
            # Paragraph collapses whitespace, so indentation is carried by
            # non-breaking spaces and line ends by explicit <br/> tags.
            rendered.append('&nbsp;' * indent + escaped)
        markup = '<br/>'.join(rendered)
        story.append(Paragraph(f'<font face="Courier" size="8">{markup}</font>', code_block_style))
        story.append(Spacer(1, 3*mm))

    # The separator row of a pipe table; spaces are allowed around every pipe.
    separator_re = re.compile(r'^\|?\s*:?-+:?\s*(\|\s*:?-+:?\s*)*\|?$')
    ordered_item_re = re.compile(r'^(\d+)\.\s(.*)$')
    # Longest prefix first so "#### " is not mistaken for a shorter heading.
    heading_styles = (
        ('#### ', h4_style),
        ('### ', heading3_style),
        ('## ', heading2_style),
        ('# ', title_style),
    )

    lines = md.split('\n')
    i = 0
    in_code_block = False
    code_lines = []
    while i < len(lines):
        line = lines[i]
        stripped = line.strip()

        # Fenced code blocks
        if stripped.startswith('```'):
            if in_code_block:
                flush_code_block(code_lines)
                code_lines = []
            in_code_block = not in_code_block
            i += 1
            continue
        if in_code_block:
            code_lines.append(line)
            i += 1
            continue

        # Pipe tables: a header line followed by a separator line
        if '|' in line and i + 1 < len(lines) and '|' in lines[i + 1]:
            next_line = lines[i + 1].strip()
            if separator_re.match(next_line):
                table_lines = [line, next_line]
                i += 2
                while i < len(lines) and '|' in lines[i] and not lines[i].strip().startswith('```'):
                    table_lines.append(lines[i])
                    i += 1
                parse_table(table_lines)
                continue

        heading = next(((prefix, style) for prefix, style in heading_styles if line.startswith(prefix)), None)
        ordered = ordered_item_re.match(stripped)

        if heading is not None:
            prefix, style = heading
            story.append(Paragraph(process_inline_markdown(line[len(prefix):].strip()), style))
        elif stripped.startswith('>'):
            # Block quote
            story.append(Paragraph(process_inline_markdown(stripped[1:].strip()), blockquote_style))
        elif stripped.startswith('- ') or stripped.startswith('* '):
            # Unordered list item, possibly a task-list entry
            content = stripped[2:].strip()
            if content.startswith('[ ]'):
                # The marker is three characters long: "[ ]"
                marker, item_text = '[ ] ', content[3:].strip()
            elif content.startswith('[x]') or content.startswith('[X]'):
                marker, item_text = '[x] ', content[3:].strip()
            else:
                marker, item_text = '\u2022 ', content
            story.append(Paragraph(marker + process_inline_markdown(item_text), normal_style))
        elif ordered:
            # Ordered list item
            num = ordered.group(1)
            text = process_inline_markdown(ordered.group(2))
            story.append(Paragraph(f'{num}. {text}', normal_style))
        elif stripped in ['---', '***', '___']:
            # Horizontal rule
            story.append(Spacer(1, 3*mm))
            story.append(HRFlowable(
                width="100%",
                thickness=0.5,
                lineCap='round',
                color=colors.grey,
                spaceBefore=1*mm,
                spaceAfter=3*mm,
            ))
        elif not stripped:
            # Blank line
            story.append(Spacer(1, 2*mm))
        else:
            # Paragraph: merge following lines until a blank line or the
            # start of another block construct.
            paragraph_lines = [stripped]
            i += 1
            while i < len(lines):
                next_line = lines[i].strip()
                if (not next_line or
                        next_line.startswith(('#', '>', '-', '*', '```')) or
                        re.match(r'^\d+\.\s', next_line) or
                        ('|' in next_line and i + 1 < len(lines) and '|' in lines[i + 1])):
                    break
                paragraph_lines.append(next_line)
                i += 1
            story.append(Paragraph(process_inline_markdown(' '.join(paragraph_lines)), normal_style))
            # i already points at the first line after the paragraph.
            continue
        i += 1

    # A fence left open at end of input still gets its content rendered.
    if in_code_block and code_lines:
        flush_code_block(code_lines)

    doc.build(story)
    return bio.getvalue()
def _render_pdf_with_xhtml2pdf(md: str, html: str, css_name: Optional[str], css_text: Optional[str]) -> bytes:
    """Render Markdown to PDF with xhtml2pdf (pure Python).

    The Markdown is rendered with ``_render_markdown_html`` and wrapped in a
    minimal HTML page with built-in styles.

    Args:
        md: Markdown source; this is what gets rendered.
        html: Not used by this renderer.
        css_name: Not used by this renderer.
        css_text: Not used by this renderer.

    Returns:
        The PDF file content.

    Raises:
        RuntimeError: If xhtml2pdf is not installed or reports a conversion error.
    """
    if _pisa is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise RuntimeError("xhtml2pdf not available")
    # Plain Markdown-to-HTML conversion; the heavier normalize_html is skipped.
    simple_html = _render_markdown_html(md)
    # Build a complete, well-formed HTML document.
    full_html = f'''<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<style>
@page {{
margin: 20mm;
}}
body {{
font-family: "Microsoft YaHei", "SimSun", Arial, sans-serif;
font-size: 12pt;
line-height: 1.6;
}}
h1, h2, h3, h4, h5, h6 {{
color: #333;
margin-top: 1em;
margin-bottom: 0.5em;
}}
h1 {{ font-size: 24pt; font-weight: bold; }}
h2 {{ font-size: 20pt; font-weight: bold; }}
h3 {{ font-size: 16pt; font-weight: bold; }}
p {{ margin-bottom: 1em; }}
ul, ol {{ margin-left: 2em; }}
table {{
border-collapse: collapse;
width: 100%;
margin: 1em 0;
}}
th, td {{
border: 1px solid #ddd;
padding: 8px;
}}
th {{
background-color: #f2f2f2;
}}
a {{ color: #1d4ed8; text-decoration: underline; }}
</style>
</head>
<body>
{simple_html}
</body>
</html>'''
    # The PDF is written into an in-memory buffer.
    bio = io.BytesIO()
    status = _pisa.CreatePDF(
        full_html,
        dest=bio,
        encoding='utf-8'
    )
    # CreatePDF reports problems through the returned status object.
    if getattr(status, "err", 0):
        raise RuntimeError("xhtml2pdf conversion failed")
    return bio.getvalue()
def md_to_pdf_bytes_with_renderer(md: str, renderer: str = "weasyprint", css_name: Optional[str] = None, css_text: Optional[str] = None, toc: bool = False, header_text: Optional[str] = None, footer_text: Optional[str] = None, logo_url: Optional[str] = None, copyright_text: Optional[str] = None, filename_text: Optional[str] = None, cover_src: Optional[str] = None, product_name: Optional[str] = None, document_name: Optional[str] = None, product_version: Optional[str] = None, document_version: Optional[str] = None) -> bytes:
    """Convert Markdown to PDF bytes.

    Renderer priority:
      1. reportlab (preferred): pure Python, renders the Markdown directly.
      2. WeasyPrint (fallback): renders the HTML from ``normalize_html``
         with the stylesheets selected by *css_name* / *css_text*.

    Args:
        md: Markdown source.
        renderer: Accepted for interface compatibility; the priority above is
            applied regardless of its value.
        css_name: Name of a stylesheet under ``configs/styles`` (WeasyPrint only).
        css_text: Inline CSS (WeasyPrint only).
        toc, header_text, footer_text, logo_url, copyright_text, filename_text,
        cover_src, product_name, document_name, product_version,
        document_version: Forwarded to ``normalize_html``; they only affect
            the WeasyPrint path.

    Returns:
        The PDF file content.

    Raises:
        RuntimeError: If no renderer is available or every attempt failed.
            The last renderer error is attached as ``__cause__``.
    """
    import traceback

    print(f"[DEBUG] 开始 PDF 转换, _HAS_REPORTLAB={_HAS_REPORTLAB}, HTML is None={HTML is None}")
    last_error: Optional[BaseException] = None

    # Preferred: reportlab
    if _HAS_REPORTLAB:
        try:
            print("[DEBUG] 尝试使用 reportlab...")
            return _render_pdf_with_reportlab(md)
        except Exception as e:
            # Record the failure and move on to the next renderer.
            last_error = e
            error_detail = traceback.format_exc()
            print(f"[DEBUG] reportlab 失败: {str(e)}")
            print(f"[DEBUG] 错误详情:\n{error_detail}")

    # Fallback: WeasyPrint
    if HTML is not None:
        try:
            print("[DEBUG] 尝试使用 WeasyPrint...")
            # The normalized HTML is only needed on this path, so it is built
            # here rather than before the reportlab attempt.
            html = normalize_html(md, options={
                "toc": "1" if toc else "",
                "header_text": header_text,
                "footer_text": footer_text,
                "logo_url": logo_url,
                "copyright_text": copyright_text,
                "filename_text": filename_text,
                "cover_src": cover_src,
                "product_name": product_name,
                "document_name": document_name,
                "product_version": product_version,
                "document_version": document_version,
            })
            stylesheets = _stylesheets_for(css_name, css_text)
            return HTML(string=html).write_pdf(stylesheets=stylesheets or None)
        except Exception as e:
            last_error = e
            error_detail = traceback.format_exc()
            print(f"[DEBUG] WeasyPrint 失败: {str(e)}")
            print(f"[DEBUG] 错误详情:\n{error_detail}")

    if _HAS_REPORTLAB:
        message = "PDF 转换失败。reportlab 已安装但转换失败,请检查 markdown 格式"
    elif HTML is not None:
        message = "PDF 转换失败。reportlab 未安装,WeasyPrint 转换失败"
    else:
        message = "PDF 转换失败。未找到可用的 PDF 渲染器(reportlab 或 WeasyPrint)"
    raise RuntimeError(message) from last_error