Files
FunMD_Convert/docling/app/services/unified_converter.py

493 lines
17 KiB
Python
Raw Permalink Normal View History

2026-01-07 17:18:26 +08:00
from pathlib import Path
from typing import Optional, Tuple
import re
import tempfile
import sys
from urllib.parse import urlsplit
from urllib.request import urlopen
from urllib.error import HTTPError, URLError
import io
_DOC_AVAILABLE = True
try:
_DOC_BASE = Path(__file__).resolve().parents[2] / "docling"
p = str(_DOC_BASE)
if p not in sys.path:
sys.path.insert(0, p)
except Exception:
pass
try:
from docling.document_converter import DocumentConverter
from docling.datamodel.base_models import InputFormat
from docling.document_converter import PdfFormatOption
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling_core.types.doc import ImageRefMode
except Exception:
_DOC_AVAILABLE = False
class DocumentConverter: # type: ignore
def __init__(self, *args, **kwargs):
pass
def convert(self, source):
raise RuntimeError("docling unavailable")
class InputFormat: # type: ignore
PDF = "pdf"
class PdfFormatOption: # type: ignore
def __init__(self, *args, **kwargs):
pass
class StandardPdfPipeline: # type: ignore
pass
class PdfPipelineOptions: # type: ignore
def __init__(self):
pass
class ImageRefMode: # type: ignore
EMBEDDED = None
"""
@api Unified Converter Service
@description Provides core document conversion logic unifying Docling and word2markdown engines
"""
_W2M_AVAILABLE = False
try:
from app.services.word2markdown import convert_any as _w2m_convert_any # type: ignore
_W2M_AVAILABLE = True
except Exception:
_W2M_AVAILABLE = False
try:
from bs4 import BeautifulSoup # type: ignore
except Exception:
BeautifulSoup = None # type: ignore
try:
from app.services.docling_adapter import normalize_html as _normalize_html # type: ignore
from app.services.docling_adapter import resolve_link as _resolve_link # type: ignore
from app.services.docling_adapter import _render_markdown_html as _render_md_html # type: ignore
except Exception:
_normalize_html = None # type: ignore
_resolve_link = None # type: ignore
_render_md_html = None # type: ignore
def _is_http(s: str) -> bool:
t = (s or "").lower()
return t.startswith("http://") or t.startswith("https://")
def _read_bytes(source: str) -> Tuple[bytes, str]:
ct = ""
try:
if _is_http(source):
from urllib.request import urlopen
with urlopen(source, timeout=10) as r:
ct = r.headers.get("Content-Type") or ""
return r.read() or b"", ct
p = Path(source)
if p.exists() and p.is_file():
return p.read_bytes(), ct
except Exception:
return b"", ct
return b"", ct
def _decode_to_utf8(raw: bytes, ct: str = "") -> str:
if not raw:
return ""
if raw.startswith(b"\xef\xbb\xbf"):
try:
return raw[3:].decode("utf-8")
except Exception:
pass
if raw.startswith(b"\xff\xfe"):
try:
return raw[2:].decode("utf-16le")
except Exception:
pass
if raw.startswith(b"\xfe\xff"):
try:
return raw[2:].decode("utf-16be")
except Exception:
pass
try:
m = re.search(r"charset=([\w-]+)", ct or "", re.IGNORECASE)
if m:
enc = m.group(1).strip().lower()
try:
return raw.decode(enc)
except Exception:
pass
except Exception:
pass
candidates = [
"utf-8", "gb18030", "gbk", "big5", "shift_jis", "iso-8859-1", "windows-1252",
]
for enc in candidates:
try:
return raw.decode(enc)
except Exception:
continue
return raw.decode("utf-8", errors="replace")
def _normalize_newlines(s: str) -> str:
return (s or "").replace("\r\n", "\n").replace("\r", "\n")
def _html_to_markdown(html: str) -> str:
if not html:
return ""
if BeautifulSoup is None:
return html
soup = BeautifulSoup(html, "html.parser")
out: list[str] = []
def txt(node) -> str:
return (getattr(node, "get_text", lambda **kwargs: str(node))(strip=True) if node else "")
def inline(node) -> str:
if isinstance(node, str):
return node
name = getattr(node, "name", None)
if name in {None}: # type: ignore
return str(node)
if name in {"strong", "b"}:
return "**" + txt(node) + "**"
if name in {"em", "i"}:
return "*" + txt(node) + "*"
if name == "code":
return "`" + txt(node) + "`"
if name == "a":
href_val = node.get("href")
extra_val = node.get("data-doc")
href = href_val if isinstance(href_val, str) else None
extra = extra_val if isinstance(extra_val, str) else None
resolved = _resolve_link(href, extra) if _resolve_link else (href or extra)
url = resolved or ""
text = txt(node)
if url:
return f"[{text}]({url})"
return text
if name == "img":
alt = node.get("alt") or "image"
src = node.get("src") or ""
return f"![{alt}]({src})"
res = []
for c in getattr(node, "children", []):
res.append(inline(c))
return "".join(res)
def block(node):
name = getattr(node, "name", None)
if name is None:
s = str(node).strip()
if s:
out.append(s)
return
if name in {"h1", "h2", "h3", "h4", "h5", "h6"}:
lvl = int(name[1])
out.append("#" * lvl + " " + txt(node))
out.append("")
return
if name == "p":
segs = [inline(c) for c in node.children]
out.append("".join(segs))
out.append("")
return
if name == "br":
out.append("")
return
if name in {"ul", "ol"}:
is_ol = name == "ol"
idx = 1
for li in node.find_all("li", recursive=False):
text = "".join(inline(c) for c in li.children)
if is_ol:
out.append(f"{idx}. {text}")
idx += 1
else:
out.append(f"- {text}")
out.append("")
return
if name == "pre":
code_node = node.find("code")
code_text = code_node.get_text() if code_node else node.get_text()
lang = ""
cls = (code_node.get("class") if code_node else node.get("class")) or []
for c in cls:
s = str(c)
if s.startswith("language-"):
lang = s.split("-", 1)[-1]
break
out.append(f"```{lang}\n{code_text}\n```\n")
return
if name == "blockquote":
lines = [l for l in txt(node).splitlines() if l.strip()]
for l in lines:
out.append("> " + l)
out.append("")
return
if name == "table":
rows = node.find_all("tr")
if not rows:
return
headers = [h.get_text(strip=True) for h in (rows[0].find_all(["th","td"]) or [])]
if headers:
out.append("|" + "|".join(headers) + "|")
sep = "|" + "|".join(["---" for _ in headers]) + "|"
out.append(sep)
for tr in rows[1:]:
cells = [td.get_text(strip=True) for td in tr.find_all("td")]
if cells:
out.append("|" + "|".join(cells) + "|")
out.append("")
return
if name == "div":
for c in node.children:
block(c)
return
segs = [inline(c) for c in node.children]
if segs:
out.append("".join(segs))
out.append("")
root = soup.body or soup
for ch in getattr(root, "children", []):
block(ch)
return _normalize_newlines("\n".join(out)).strip()
def _lower_html_table_tags(html: str) -> str:
"""
@function _lower_html_table_tags
@description Normalizes HTML table tags to lowercase
@param html Input HTML string
@return Normalized HTML string
"""
if not html:
return html
tags = ["TABLE", "THEAD", "TBODY", "TFOOT", "TR", "TH", "TD"]
out = html
for t in tags:
out = re.sub(r"</?" + t + r"\b", lambda m: m.group(0).lower(), out)
out = re.sub(r">\s*\n+\s*", ">\n", out)
return out
def _replace_admonitions(md: str) -> str:
"""
@function _replace_admonitions
@description Replaces ::: style admonitions with !!! style
@param md Input markdown string
@return Processed markdown string
"""
if not md:
return md
lines = md.split("\n")
out = []
in_block = False
for raw in lines:
t = raw.strip()
if t.startswith(":::"):
if not in_block:
name = t[3:].strip()
if not name:
out.append("!!!")
else:
out.append("!!! " + name)
in_block = True
else:
out.append("!!!")
in_block = False
continue
out.append(raw)
return "\n".join(out)
def _enhance_codeblocks(md: str) -> str:
if not md:
return md
lines = md.split("\n")
res = []
in_fence = False
fence_lang = ""
i = 0
while i < len(lines):
line = lines[i]
t = line.strip()
if t.startswith("```"):
in_fence = not in_fence
try:
fence_lang = (t[3:] or "").strip() if in_fence else ""
except Exception:
fence_lang = ""
res.append(line)
i += 1
continue
if in_fence:
res.append(line)
i += 1
continue
if t.startswith("{") or t.startswith("["):
buf = [line]
j = i + 1
closed = False
depth = t.count("{") - t.count("}")
while j < len(lines):
buf.append(lines[j])
s = lines[j].strip()
depth += s.count("{") - s.count("}")
if depth <= 0 and s.endswith("}"):
closed = True
break
j += 1
if closed and len(buf) >= 3:
lang = "json"
res.append("```" + lang)
res.extend(buf)
res.append("```")
i = j + 1
continue
code_sig = (
("public static" in t) or ("private static" in t) or ("class " in t) or ("return " in t) or ("package " in t) or ("import " in t)
)
if code_sig:
buf = [line]
j = i + 1
while j < len(lines):
s = lines[j].strip()
if not s:
break
if s.startswith("# ") or s.startswith("## ") or s.startswith("### "):
break
buf.append(lines[j])
j += 1
if len(buf) >= 3:
res.append("```")
res.extend(buf)
res.append("```")
i = j + 1
continue
res.append(line)
i += 1
return "\n".join(res)
class FormatConverter:
    """
    @class FormatConverter
    @description Unified converter class wrapping Docling and word2markdown
    """

    def __init__(self) -> None:
        self._docling = DocumentConverter()

    @staticmethod
    def _markdown_to_html(text: str, fallback: str) -> str:
        """
        @function _markdown_to_html
        @description Renders markdown/plain text to HTML with the project renderer when it
                     was importable, otherwise with marko
        @param text Markdown or plain text
        @param fallback HTML to return when marko is missing or fails
        @return Rendered HTML, or the fallback
        """
        if _render_md_html is not None:
            return _render_md_html(text)
        try:
            import marko
            return marko.convert(text)
        except Exception:
            return fallback

    def convert(self, source: str, export: str = "markdown", engine: Optional[str] = None, mdx_safe_mode_enabled: bool = True) -> Tuple[str, str, Optional[str]]:
        """
        @function convert
        @description Convert a document source to specified format. .txt, .md and
                     .html/.htm sources are handled directly; DOC/DOCX prefer the
                     word2markdown engine when it is available; everything else goes
                     through Docling.
        @param source Path or URL to source document
        @param export Output format (markdown, html, json, doctags)
        @param engine Optional engine override (word2markdown/docling)
        @param mdx_safe_mode_enabled Toggle safe mode for MDX (forwarded to word2markdown)
        @return Tuple of (encoding, content, artifacts_dir); artifacts_dir is the per-call
                temporary directory for Docling conversions and None for every other path
        @raises RuntimeError When the Docling path is asked for an unsupported export
        """
        export_kind = export.lower()

        # Prefer custom word2markdown engine for DOC/DOCX when available
        auto_engine = None
        try:
            if not engine and _W2M_AVAILABLE and Path(source).suffix.lower() in {".doc", ".docx"}:
                auto_engine = "word2markdown"
        except Exception:
            auto_engine = None
        use_engine = (engine or auto_engine or "").lower()

        # For URLs the extension comes from the URL path, so query strings and
        # fragments do not hide it.
        try:
            path = source
            if _is_http(source):
                path = urlsplit(source).path or ""
            ext = Path(path).suffix.lower()
        except Exception:
            ext = Path(source).suffix.lower()

        if ext in {".txt", ".md"}:
            raw, ct = _read_bytes(source)
            text = _normalize_newlines(_decode_to_utf8(raw, ct))
            is_plain_text = ext == ".txt"
            if export_kind == "html":
                if is_plain_text:
                    # Plain text is not markup: escape it before wrapping in <pre>
                    # so "<" and "&" in the source cannot corrupt the document.
                    from html import escape as _escape
                    fallback = f"<pre>{_escape(text, quote=False)}</pre>"
                else:
                    fallback = text
                rendered = self._markdown_to_html(text, fallback)
                return "utf-8", _lower_html_table_tags(rendered), None
            # Only plain text gets the code-fence heuristics; markdown is returned as read.
            return "utf-8", (_enhance_codeblocks(text) if is_plain_text else text), None

        if ext in {".html", ".htm"}:
            try:
                conv = DocumentConverter(allowed_formats=[InputFormat.HTML])
                result = conv.convert(source)
                if export_kind == "html":
                    rendered = _lower_html_table_tags(result.document.export_to_html())
                    return "utf-8", rendered, None
                md = result.document.export_to_markdown(image_mode=ImageRefMode.EMBEDDED)
                md = _replace_admonitions(md)
                md = _enhance_codeblocks(md)
                return "utf-8", md, None
            except Exception:
                # Docling missing or failed: fall back to the local HTML handling.
                raw, ct = _read_bytes(source)
                html_in = _normalize_newlines(_decode_to_utf8(raw, ct))
                if export_kind == "html":
                    rendered = _normalize_html(html_in) if _normalize_html is not None else html_in
                    return "utf-8", _lower_html_table_tags(rendered), None
                md = _html_to_markdown(html_in)
                md = _replace_admonitions(md)
                md = _enhance_codeblocks(md)
                return "utf-8", md, None

        if use_engine in {"pandoc", "custom", "word2markdown"} and _W2M_AVAILABLE:
            enc, md = _w2m_convert_any(Path(source), mdx_safe_mode_enabled=mdx_safe_mode_enabled)
            md = _replace_admonitions(md)
            md = _enhance_codeblocks(md)
            return enc or "utf-8", md, None

        # Reject unknown export kinds before doing any expensive work or creating
        # a temporary directory that nobody would clean up.
        if export_kind not in {"markdown", "html", "json", "doctags"}:
            raise RuntimeError("unsupported export")

        import shutil

        # Per-call artifacts directory, returned to the caller on success.
        # NOTE(review): the directory is not passed to the pipeline options here -
        # confirm how callers populate or consume it.
        artifacts_dir = tempfile.mkdtemp(prefix="docling_artifacts_")
        try:
            pdf_opts = PdfPipelineOptions()
            pdf_opts.generate_picture_images = True
            pdf_opts.generate_page_images = True
            pdf_opts.images_scale = 2.0
            pdf_opts.do_code_enrichment = True
            pdf_opts.do_formula_enrichment = True
            self._docling = DocumentConverter(
                format_options={
                    InputFormat.PDF: PdfFormatOption(
                        pipeline_cls=StandardPdfPipeline,
                        pipeline_options=pdf_opts,
                    )
                }
            )
            document = self._docling.convert(source).document
            if export_kind == "markdown":
                content = document.export_to_markdown(image_mode=ImageRefMode.EMBEDDED)
                content = _enhance_codeblocks(_replace_admonitions(content))
            elif export_kind == "html":
                content = _lower_html_table_tags(document.export_to_html())
            elif export_kind == "json":
                content = document.export_to_json()
            else:
                content = document.export_to_doctags()
        except Exception:
            # The caller never learns the directory path on failure, so remove it
            # here rather than leaking it.
            shutil.rmtree(artifacts_dir, ignore_errors=True)
            raise
        return "utf-8", content, artifacts_dir