from pathlib import Path
from typing import Optional, Tuple, Dict, List, Any
from urllib.parse import urlparse, unquote
import os
import re
import io
from bs4 import BeautifulSoup
from bs4.element import PageElement
import marko
import sys

# Prefer a vendored docling checkout two levels up, if one is present.
try:
    _DOC_BASE = Path(__file__).resolve().parents[2] / "docling"
    p = str(_DOC_BASE)
    if p not in sys.path:
        sys.path.insert(0, p)
except Exception:
    pass

try:
    from docling.document_converter import DocumentConverter
except Exception:
    class DocumentConverter:  # type: ignore
        """Stub used when docling is unavailable; convert() always raises."""

        def __init__(self, *args, **kwargs):
            pass

        def convert(self, source):
            raise RuntimeError("docling not available")

from docx import Document
from docx.shared import Mm, Pt
from docx.enum.section import WD_SECTION
from docx.enum.text import WD_PARAGRAPH_ALIGNMENT
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
from urllib.request import urlopen
import json

try:
    from weasyprint import HTML, CSS  # type: ignore
except Exception:
    HTML = None
    CSS = None

_mdit: Any = None
_tasklists_plugin: Any = None
_deflist_plugin: Any = None
_footnote_plugin: Any = None
_attrs_plugin: Any = None
_HAS_MD_IT: bool = False
try:
    import markdown_it as _mdit  # type: ignore
    from mdit_py_plugins.tasklists import tasklists_plugin as _tasklists_plugin  # type: ignore
    from mdit_py_plugins.deflist import deflist_plugin as _deflist_plugin  # type: ignore
    from mdit_py_plugins.footnote import footnote_plugin as _footnote_plugin  # type: ignore
    from mdit_py_plugins.attrs import attrs_plugin as _attrs_plugin  # type: ignore
    _HAS_MD_IT = True
except Exception:
    pass

converter = DocumentConverter()

LINKMAP_PATH = Path(__file__).resolve().parent.parent / "configs" / "linkmap" / "linkmap.json"
_LINKMAP: Dict[str, str] = {}


def load_linkmap() -> Dict[str, str]:
    global _LINKMAP
    try:
        if LINKMAP_PATH.exists():
            _LINKMAP = json.loads(LINKMAP_PATH.read_text("utf-8")) or {}
    except Exception:
        _LINKMAP = {}
    return _LINKMAP


def save_linkmap(mapping: Dict[str, str]) -> None:
    LINKMAP_PATH.parent.mkdir(parents=True, exist_ok=True)
    LINKMAP_PATH.write_text(json.dumps(mapping, ensure_ascii=False, indent=2), "utf-8")


load_linkmap()


def resolve_link(href: Optional[str], data_doc: Optional[str]) -> Optional[str]:
    """Return href if present; otherwise look the data-doc key up in the link map."""
    if href:
        return href
    if not _LINKMAP:
        load_linkmap()
    if data_doc and data_doc in _LINKMAP:
        return _LINKMAP[data_doc]
    return None


def export_payload(doc, fmt: str) -> Tuple[str, str]:
    f = fmt.lower()
    if f == "markdown":
        return doc.export_to_markdown(), "text/markdown"
    if f == "html":
        return doc.export_to_html(), "text/html"
    if f == "json":
        return doc.export_to_json(), "application/json"
    if f == "doctags":
        return doc.export_to_doctags(), "application/json"
    raise ValueError("unsupported export")


def infer_basename(source_url: Optional[str], upload_name: Optional[str]) -> str:
    if source_url:
        path = urlparse(source_url).path
        name = os.path.basename(path) or "document"
        name = unquote(name)
        return os.path.splitext(name)[0] or "document"
    if upload_name:
        return os.path.splitext(os.path.basename(upload_name))[0] or "document"
    return "document"


def sanitize_filename(name: Optional[str]) -> str:
    if not name:
        return "document"
    name = name.strip()[:128]
    name = re.sub(r'[<>:"/\\|?*\x00-\x1F]', "_", name) or "document"
    return name


def convert_source(source: str, export: str) -> Tuple[str, str]:
    result = converter.convert(source)
    return export_payload(result.document, export)
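
# A minimal usage sketch of the conversion entry points above (assumes docling
# is installed; the URL is a placeholder, not an endpoint this project uses):
#
#     payload, mime = convert_source("https://example.com/spec.pdf", "markdown")
#     name = sanitize_filename(infer_basename("https://example.com/spec.pdf", None))
#     Path(name + ".md").write_text(payload, "utf-8")
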
def md_to_docx_bytes(md: str, toc: bool = False, header_text: Optional[str] = None,
                     footer_text: Optional[str] = None, logo_url: Optional[str] = None,
                     copyright_text: Optional[str] = None, filename_text: Optional[str] = None,
                     cover_src: Optional[str] = None, product_name: Optional[str] = None,
                     document_name: Optional[str] = None, product_version: Optional[str] = None,
                     document_version: Optional[str] = None) -> bytes:
    try:
        import logging as _log
        _log.info(f"md_to_docx_bytes start toc={toc} header={bool(header_text)} footer={bool(footer_text)} logo={bool(logo_url)} cover={bool(cover_src)}")
    except Exception:
        pass

    def _add_field(paragraph, instr: str):
        # Insert a Word field (e.g. PAGE, TOC) as a begin/instrText/end run triple.
        r1 = paragraph.add_run()
        b = OxmlElement('w:fldChar')
        b.set(qn('w:fldCharType'), 'begin')
        r1._r.append(b)
        r2 = paragraph.add_run()
        t = OxmlElement('w:instrText')
        t.set(qn('xml:space'), 'preserve')
        t.text = instr
        r2._r.append(t)
        r3 = paragraph.add_run()
        e = OxmlElement('w:fldChar')
        e.set(qn('w:fldCharType'), 'end')
        r3._r.append(e)

    def _available_width(section) -> int:
        # Usable width in EMU: page width minus left and right margins.
        return section.page_width - section.left_margin - section.right_margin

    def _fetch_bytes(u: str) -> Optional[bytes]:
        try:
            if u.lower().startswith('http://') or u.lower().startswith('https://'):
                with urlopen(u, timeout=10) as r:
                    return r.read()
            p = Path(u)
            if p.exists() and p.is_file():
                return p.read_bytes()
        except Exception:
            return None
        return None

    html = normalize_html(md, options={
        "toc": "1" if toc else "",
        "header_text": header_text,
        "footer_text": footer_text,
        "logo_url": logo_url,
        "copyright_text": copyright_text,
        "filename_text": filename_text,
        "cover_src": cover_src,
        "product_name": product_name,
        "document_name": document_name,
        "product_version": product_version,
        "document_version": document_version,
    })
    try:
        import logging as _log
        _log.info(f"md_to_docx_bytes normalize_html length={len(html)}")
    except Exception:
        pass

    soup = BeautifulSoup(html, "html.parser")
    doc = Document()
    sec0 = doc.sections[0]
    # A4 page, 15 mm side margins, 20 mm top/bottom margins.
    sec0.page_width = Mm(210)
    sec0.page_height = Mm(297)
    sec0.left_margin = Mm(15)
    sec0.right_margin = Mm(15)
    sec0.top_margin = Mm(20)
    sec0.bottom_margin = Mm(20)

    has_cover = bool(cover_src or (soup.find('section', class_='cover') is not None))
    if has_cover:
        # Full-bleed cover: drop all margins on the first section.
        sec0.left_margin = Mm(0)
        sec0.right_margin = Mm(0)
        sec0.top_margin = Mm(0)
        sec0.bottom_margin = Mm(0)
        if cover_src:
            b = _fetch_bytes(cover_src)
            if b:
                bio = io.BytesIO(b)
                doc.add_picture(bio, width=_available_width(sec0))
        if product_name:
            p = doc.add_paragraph()
            r = p.add_run(product_name)
            r.font.size = Pt(18)
            r.bold = True
        t = document_name or None
        if not t:
            h1 = soup.body.find('h1') if soup.body else soup.find('h1')
            t = h1.get_text(strip=True) if h1 else '文档'  # '文档' = "document"
        p2 = doc.add_paragraph()
        r2 = p2.add_run(t or '文档')
        r2.font.size = Pt(24)
        r2.bold = True
        if filename_text:
            p3 = doc.add_paragraph()
            r3 = p3.add_run(filename_text)
            r3.font.size = Pt(13)
        meta_parts = []
        if product_version:
            meta_parts.append("产品版本:" + product_version)  # "product version:"
        if document_version:
            meta_parts.append("文档版本:" + document_version)  # "document version:"
        if meta_parts:
            doc.add_paragraph(" ".join(meta_parts))
        # Content starts in a fresh section with normal margins restored.
        doc.add_section(WD_SECTION.NEW_PAGE)
        sec = doc.sections[-1]
        sec.page_width = Mm(210)
        sec.page_height = Mm(297)
        sec.left_margin = Mm(15)
        sec.right_margin = Mm(15)
        sec.top_margin = Mm(20)
        sec.bottom_margin = Mm(20)
    else:
        sec = sec0

    if header_text or logo_url or filename_text:
        # header_text uses a "left||right" (or "left|right") convention to
        # split content between a left-aligned and a right-aligned paragraph;
        # when no right part is given, filename_text fills the right slot.
        hp = sec.header.add_paragraph()
        left = header_text or ''
        right = ''
        if '||' in left:
            parts = left.split('||', 1)
            left, right = parts[0], parts[1]
        elif '|' in left:
            parts = left.split('|', 1)
            left, right = parts[0], parts[1]
        if left.strip():
            hp.add_run(left.strip())
        if right.strip():
            rp = sec.header.add_paragraph()
            rp.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
            rp.add_run(right.strip())
        elif filename_text:
            rp = sec.header.add_paragraph()
            rp.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
            rp.add_run(filename_text)
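    # Footer layout: optional footer text and copyright as separate paragraphs,
    # plus a right-aligned PAGE field so Word renders a live page number.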
    if footer_text or copyright_text:
        fp = sec.footer.add_paragraph()
        if footer_text:
            fp.add_run(footer_text)
        if copyright_text:
            cp = sec.footer.add_paragraph()
            cp.add_run(copyright_text)
    pn = sec.footer.add_paragraph()
    pn.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
    _add_field(pn, 'PAGE')

    if toc:
        doc.add_paragraph('目录')  # '目录' = "Table of Contents"
        _add_field(doc.add_paragraph(), 'TOC \\o "1-3" \\h \\z \\u')
        doc.add_page_break()

    def add_inline(p, node):
        if isinstance(node, str):
            p.add_run(node)
            return
        if node.name in ['strong', 'b']:
            r = p.add_run(node.get_text())
            r.bold = True
            return
        if node.name in ['em', 'i']:
            r = p.add_run(node.get_text())
            r.italic = True
            return
        if node.name == 'code':
            r = p.add_run(node.get_text())
            r.font.name = 'Courier New'
            return
        if node.name == 'a':
            text = node.get_text()
            href = node.get('href')
            extra = node.get('data-doc')
            resolved = resolve_link(href, extra)
            if resolved:
                p.add_run(text + ' [' + resolved + ']')
            else:
                p.add_run(text)
            return
        if node.name == 'img':
            # Inline images are promoted to block-level pictures.
            src = node.get('src') or ''
            b = _fetch_bytes(src)
            if b:
                bio = io.BytesIO(b)
                try:
                    doc.add_picture(bio, width=_available_width(sec))
                except Exception:
                    pass
            return
        for c in getattr(node, 'children', []):
            add_inline(p, c)

    def process_block(el):
        name = getattr(el, 'name', None)
        if name is None:
            return
        cls = el.get('class') or []
        # Skip template-only wrappers: meta block, cover section, and the HTML
        # TOC (the DOCX TOC is emitted as a Word field above).
        if name == 'div' and 'doc-meta' in cls:
            return
        if name == 'section' and 'cover' in cls:
            return
        if name == 'nav' and 'toc' in cls:
            return
        if name == 'div':
            for child in el.children:
                process_block(child)
            return
        if name == 'h1':
            doc.add_heading(el.get_text(), level=1)
            return
        if name == 'h2' or (name == 'strong' and 'subtitle' in cls):
            doc.add_heading(el.get_text(), level=2)
            return
        if name == 'h3':
            doc.add_heading(el.get_text(), level=3)
            return
        if name == 'p':
            p = doc.add_paragraph()
            for c in el.children:
                add_inline(p, c)
            return
        if name in ['ul', 'ol']:
            # Ordered lists are rendered with the bullet style as well.
            for li in el.find_all('li', recursive=False):
                p = doc.add_paragraph(style='List Bullet')
                for c in li.children:
                    add_inline(p, c)
            return
        if name == 'pre':
            code = el.get_text() or ''
            p = doc.add_paragraph()
            run = p.add_run(code)
            run.font.name = 'Courier New'
            return
        if name == 'blockquote':
            p = doc.add_paragraph(el.get_text())
            p.paragraph_format.left_indent = Mm(10)
            return
        if name == 'table':
            rows = []
            thead = el.find('thead')
            tbody = el.find('tbody')
            if thead:
                hdrs = [th.get_text(strip=True) for th in thead.find_all('th')]
            else:
                # Without a thead, treat the first row as the header row.
                hdrs = [cell.get_text(strip=True) for cell in el.find_all('tr')[0].find_all(['th', 'td'])] if el.find_all('tr') else []
            trs = tbody.find_all('tr') if tbody else el.find_all('tr')[1:]
            for tr in trs:
                rows.append([td.get_text(strip=True) for td in tr.find_all('td')])
            tbl = doc.add_table(rows=1 + len(rows), cols=len(hdrs) or 1)
            hdr = tbl.rows[0].cells
            for k, h in enumerate(hdrs or ['']):
                hdr[k].text = h
            for r_idx, row in enumerate(rows):
                cells = tbl.rows[1 + r_idx].cells
                for c_idx in range(len(hdrs) or 1):
                    cells[c_idx].text = row[c_idx] if c_idx < len(row) else ''
            return
        if name == 'img':
            src = el.get('src') or ''
            b = _fetch_bytes(src)
            if b:
                bio = io.BytesIO(b)
                try:
                    doc.add_picture(bio, width=_available_width(sec))
                except Exception:
                    pass
            return

    body = soup.body or soup
    for el in body.children:
        process_block(el)

    bio = io.BytesIO()
    try:
        import logging as _log
        _log.info("md_to_docx_bytes saving doc")
    except Exception:
        pass
    doc.save(bio)
    try:
        import logging as _log
        _log.info(f"md_to_docx_bytes done size={bio.tell()}")
    except Exception:
        pass
    return bio.getvalue()
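
# A minimal usage sketch (assumes the module's normalize_html helper, defined
# elsewhere, is available; the markdown, header values and output path are
# placeholders):
#
#     data = md_to_docx_bytes("# Title\n\nHello.", toc=True,
#                             header_text="Product||Manual",
#                             footer_text="Confidential")
#     Path("out.docx").write_bytes(data)
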
def md_to_pdf_bytes(md: str) -> bytes:
    return md_to_pdf_bytes_with_renderer(md, renderer="weasyprint")


def _md_with_tables_to_html(md_text: str) -> str:
    lines = md_text.splitlines()
    out = []
    i = 0
    while i < len(lines):
        line = lines[i]

        def is_sep(s: str) -> bool:
            # A markdown table separator row: pipes around segments made only
            # of '-', ':' and spaces, e.g. "|---|:--:|".
            s = s.strip()
            if "|" not in s:
                return False
            s = s.strip("|")
            return all(set(seg.strip()) <= set("-: ") and len(seg.strip()) >= 1 for seg in s.split("|"))

        if "|" in line and i + 1 < len(lines) and is_sep(lines[i + 1]):
            headers = [c.strip() for c in line.strip().strip("|").split("|")]
            j = i + 2
            rows = []
            while j < len(lines) and "|" in lines[j]:
                rows.append([c.strip() for c in lines[j].strip().strip("|").split("|")])
                j += 1
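            # For an input of
            #     | a | b |
            #     |---|---|
            #     | 1 | 2 |
            # headers == ['a', 'b'], rows == [['1', '2']], and j points at the
            # first line after the table.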
            tbl = ["<table>", "<tr>"]
            for h in headers:
                tbl.append(f"<th>{h}</th>")
            tbl.append("</tr>")
            for row in rows:
                tbl.append("<tr>")
                for cell in row:
                    tbl.append(f"<td>{cell}</td>")
                tbl.append("</tr>")
            tbl.append("</table>")
            out.append("\n".join(tbl))
            i = j
            continue
        out.append(line)
        i += 1
    # marko passes raw HTML blocks through, so the <table> markup above
    # survives the markdown-to-HTML conversion.
    return marko.convert("\n".join(out))
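
# A minimal usage sketch of the table pre-pass (input is a placeholder
# two-column table):
#
#     html = _md_with_tables_to_html("| a | b |\n|---|---|\n| 1 | 2 |")
#     # html embeds a <table> built from the pipe rows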