493 lines
17 KiB
Python
493 lines
17 KiB
Python
from pathlib import Path
|
|
from typing import Optional, Tuple
|
|
import re
|
|
|
|
import tempfile
|
|
import sys
|
|
from urllib.parse import urlsplit
|
|
from urllib.request import urlopen
|
|
from urllib.error import HTTPError, URLError
|
|
import io
|
|
# Records whether the real docling package could be imported; the fallback
# branch below flips it to False.
_DOC_AVAILABLE = True

# Best-effort: put the "docling" directory found under the third ancestor of
# this file (``parents[2]``) at the front of sys.path. Failures (for example a
# path that is too shallow) are ignored on purpose so the import below still
# gets its chance with the regular search path.
try:
    _DOC_BASE = Path(__file__).resolve().parents[2] / "docling"
    p = str(_DOC_BASE)
    if p not in sys.path:
        sys.path.insert(0, p)
except Exception:
    pass

try:
    from docling.document_converter import DocumentConverter
    from docling.datamodel.base_models import InputFormat
    from docling.document_converter import PdfFormatOption
    from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
    from docling.datamodel.pipeline_options import PdfPipelineOptions
    from docling_core.types.doc import ImageRefMode
except Exception:
    # Any import failure switches the module to inert stand-ins so that it
    # stays importable; conversions through the stand-in converter fail loudly.
    _DOC_AVAILABLE = False

    class DocumentConverter:  # type: ignore
        """Stand-in converter: accepts any constructor arguments, cannot convert."""

        def __init__(self, *args, **kwargs):
            pass

        def convert(self, source):
            raise RuntimeError("docling unavailable")

    class InputFormat:  # type: ignore
        """Stand-in for the input-format constants referenced by this module."""

        PDF = "pdf"
        # Referenced as InputFormat.HTML by FormatConverter.convert; defined
        # here so the stand-in offers every member this module looks up.
        HTML = "html"

    class PdfFormatOption:  # type: ignore
        """Stand-in that swallows whatever options it is constructed with."""

        def __init__(self, *args, **kwargs):
            pass

    class StandardPdfPipeline:  # type: ignore
        """Stand-in pipeline class; only ever passed around by reference."""

        pass

    class PdfPipelineOptions:  # type: ignore
        """Stand-in options object; callers set plain attributes on it."""

        def __init__(self):
            pass

    class ImageRefMode:  # type: ignore
        """Stand-in for the image reference mode constants."""

        EMBEDDED = None
|
|
|
|
"""
|
|
@api Unified Converter Service
|
|
@description Provides core document conversion logic unifying Docling and word2markdown engines
|
|
"""
|
|
|
|
# Optional engine: word2markdown. The flag tells the rest of the module
# whether `_w2m_convert_any` was successfully imported.
try:
    from app.services.word2markdown import convert_any as _w2m_convert_any  # type: ignore
except Exception:
    _W2M_AVAILABLE = False
else:
    _W2M_AVAILABLE = True

# Optional dependency: BeautifulSoup. None means "not importable".
try:
    from bs4 import BeautifulSoup  # type: ignore
except Exception:
    BeautifulSoup = None  # type: ignore

# Optional helpers from the docling adapter. If any one of them cannot be
# imported, all three names are reset to None.
try:
    from app.services.docling_adapter import (  # type: ignore
        normalize_html as _normalize_html,
        resolve_link as _resolve_link,
        _render_markdown_html as _render_md_html,
    )
except Exception:
    _normalize_html = _resolve_link = _render_md_html = None  # type: ignore
|
|
|
|
def _is_http(s: str) -> bool:
    """
    @function _is_http
    @description Tells whether a source string starts with http:// or https:// (case-insensitive)
    @param s Source string; a falsy value is treated as the empty string
    @return True for an http(s) prefix, False otherwise
    """
    lowered = (s or "").lower()
    return lowered.startswith(("http://", "https://"))
|
|
|
|
def _read_bytes(source: str) -> Tuple[bytes, str]:
    """
    @function _read_bytes
    @description Best-effort read of a URL (10 second timeout) or a local file; never raises
    @param source http(s) URL or filesystem path
    @return Tuple of (raw bytes, Content-Type header); bytes are empty on any failure and
            the content type is empty for local files
    """
    content_type = ""
    try:
        if not _is_http(source):
            local = Path(source)
            if local.exists() and local.is_file():
                return local.read_bytes(), content_type
            return b"", content_type
        with urlopen(source, timeout=10) as resp:
            # Captured before the body is read so that it survives a failing read.
            content_type = resp.headers.get("Content-Type") or ""
            payload = resp.read()
            return (payload if payload else b""), content_type
    except Exception:
        # Deliberate best-effort: every failure collapses to "no data".
        return b"", content_type
|
|
|
|
def _decode_to_utf8(raw: bytes, ct: str = "") -> str:
    """
    @function _decode_to_utf8
    @description Decodes raw bytes to text by trying, in order: a byte-order mark, the
                 charset declared in the Content-Type value, a fixed list of candidate
                 encodings, and finally UTF-8 with replacement characters
    @param raw Bytes to decode
    @param ct Content-Type header value that may carry a charset=... parameter
    @return Decoded text ("" for empty input)
    """
    if not raw:
        return ""

    # 1) Byte-order marks; a failed decode falls through to the next strategy.
    bom_table = (
        (b"\xef\xbb\xbf", "utf-8"),
        (b"\xff\xfe", "utf-16le"),
        (b"\xfe\xff", "utf-16be"),
    )
    for bom, codec in bom_table:
        if raw.startswith(bom):
            try:
                return raw[len(bom):].decode(codec)
            except Exception:
                pass

    # 2) Charset announced by the transport; unknown or wrong charsets are ignored.
    try:
        found = re.search(r"charset=([\w-]+)", ct or "", re.IGNORECASE)
        if found:
            return raw.decode(found.group(1).strip().lower())
    except Exception:
        pass

    # 3) Candidate encodings, first successful decode wins.
    for codec in ("utf-8", "gb18030", "gbk", "big5", "shift_jis", "iso-8859-1", "windows-1252"):
        try:
            return raw.decode(codec)
        except Exception:
            continue

    # 4) Last resort.
    return raw.decode("utf-8", errors="replace")
|
|
|
|
def _normalize_newlines(s: str) -> str:
    """
    @function _normalize_newlines
    @description Rewrites CRLF and lone CR line endings as LF
    @param s Input text; a falsy value is treated as the empty string
    @return Text containing only LF line endings
    """
    crlf_folded = "\n".join((s or "").split("\r\n"))
    return "\n".join(crlf_folded.split("\r"))
|
|
|
|
def _html_to_markdown(html: str) -> str:
    """
    @function _html_to_markdown
    @description Converts HTML to Markdown with a small hand-written tree walk over the
                 BeautifulSoup parse (headings, paragraphs, lists, code blocks, block
                 quotes, tables, links, images and basic inline emphasis)
    @param html Input HTML string
    @return Markdown text; "" for empty input, and the HTML unchanged when BeautifulSoup
            is not importable
    """
    if not html:
        return ""
    if BeautifulSoup is None:
        return html
    soup = BeautifulSoup(html, "html.parser")
    out: list[str] = []

    def txt(node) -> str:
        # Whitespace-stripped text of a node; objects without get_text fall back to str().
        if not node:
            return ""
        getter = getattr(node, "get_text", None)
        return getter(strip=True) if getter is not None else str(node)

    def inline(node) -> str:
        # Render a node as inline Markdown.
        if isinstance(node, str):
            return node
        name = getattr(node, "name", None)
        if name is None:
            return str(node)
        if name in {"strong", "b"}:
            return "**" + txt(node) + "**"
        if name in {"em", "i"}:
            return "*" + txt(node) + "*"
        if name == "code":
            return "`" + txt(node) + "`"
        if name == "a":
            href_val = node.get("href")
            extra_val = node.get("data-doc")
            href = href_val if isinstance(href_val, str) else None
            extra = extra_val if isinstance(extra_val, str) else None
            resolved = _resolve_link(href, extra) if _resolve_link else (href or extra)
            url = resolved or ""
            text = txt(node)
            if url:
                return f"[{text}]({url})"
            return text
        if name == "img":
            alt = node.get("alt") or "image"
            src = node.get("src") or ""
            # Previously returned an empty string, which silently dropped every image.
            return f"![{alt}]({src})"
        return "".join(inline(c) for c in getattr(node, "children", []))

    def block(node):
        # Render a node as block-level Markdown, appending lines to `out`.
        name = getattr(node, "name", None)
        if name is None:
            s = str(node).strip()
            if s:
                out.append(s)
            return
        if name in {"h1", "h2", "h3", "h4", "h5", "h6"}:
            lvl = int(name[1])
            out.append("#" * lvl + " " + txt(node))
            out.append("")
            return
        if name == "p":
            out.append("".join(inline(c) for c in node.children))
            out.append("")
            return
        if name == "br":
            out.append("")
            return
        if name == "img":
            # A block-level image has no children, so the generic branch at the
            # bottom would emit nothing for it.
            out.append(inline(node))
            out.append("")
            return
        if name in {"ul", "ol"}:
            is_ol = name == "ol"
            idx = 1
            for li in node.find_all("li", recursive=False):
                text = "".join(inline(c) for c in li.children)
                if is_ol:
                    out.append(f"{idx}. {text}")
                    idx += 1
                else:
                    out.append(f"- {text}")
            out.append("")
            return
        if name == "pre":
            code_node = node.find("code")
            code_text = code_node.get_text() if code_node else node.get_text()
            lang = ""
            cls = (code_node.get("class") if code_node else node.get("class")) or []
            # The fence language comes from the first "language-xxx" CSS class.
            for c in cls:
                s = str(c)
                if s.startswith("language-"):
                    lang = s.split("-", 1)[-1]
                    break
            out.append(f"```{lang}\n{code_text}\n```\n")
            return
        if name == "blockquote":
            for quoted in txt(node).splitlines():
                if quoted.strip():
                    out.append("> " + quoted)
            out.append("")
            return
        if name == "table":
            rows = node.find_all("tr")
            if not rows:
                return
            # The first row is always treated as the header row.
            headers = [h.get_text(strip=True) for h in (rows[0].find_all(["th", "td"]) or [])]
            if headers:
                out.append("|" + "|".join(headers) + "|")
                out.append("|" + "|".join("---" for _ in headers) + "|")
            for tr in rows[1:]:
                # Accept <th> as well as <td>: body rows may start with a row-header
                # cell, and skipping it would shift the remaining columns.
                cells = [cell.get_text(strip=True) for cell in tr.find_all(["th", "td"])]
                if cells:
                    out.append("|" + "|".join(cells) + "|")
            out.append("")
            return
        if name == "div":
            for c in node.children:
                block(c)
            return
        # Any other element: render its children inline as one paragraph.
        segs = [inline(c) for c in node.children]
        if segs:
            out.append("".join(segs))
            out.append("")

    root = soup.body or soup
    for ch in getattr(root, "children", []):
        block(ch)
    return _normalize_newlines("\n".join(out)).strip()
|
|
|
|
|
|
def _lower_html_table_tags(html: str) -> str:
    """
    @function _lower_html_table_tags
    @description Normalizes HTML table tag names (table, thead, tbody, tfoot, tr, th, td)
                 to lowercase, whatever their original casing, and collapses any
                 whitespace run containing a newline that follows a ">" into one newline.
                 Attribute names and values are left untouched.
    @param html Input HTML string
    @return Normalized HTML string (falsy input is returned unchanged)
    """
    if not html:
        return html
    # One case-insensitive pass; the previous per-tag patterns were case-sensitive
    # and therefore skipped mixed-case tags such as <Table> or <Td>. The trailing
    # \b keeps longer names such as <title> or <track> from matching.
    lowered = re.sub(
        r"</?(?:table|thead|tbody|tfoot|tr|th|td)\b",
        lambda m: m.group(0).lower(),
        html,
        flags=re.IGNORECASE,
    )
    return re.sub(r">\s*\n+\s*", ">\n", lowered)
|
|
|
|
|
|
def _replace_admonitions(md: str) -> str:
    """
    @function _replace_admonitions
    @description Rewrites ":::"-delimited admonition markers as "!!!" markers. Marker lines
                 alternate between opening and closing; only an opening marker keeps its
                 label, a closing marker always becomes a bare "!!!"
    @param md Input markdown string
    @return Processed markdown string (falsy input is returned unchanged)
    """
    if not md:
        return md
    converted = []
    inside = False
    for line in md.split("\n"):
        stripped = line.strip()
        if not stripped.startswith(":::"):
            converted.append(line)
            continue
        # Text after ":::" is only honoured on an opening marker.
        label = "" if inside else stripped[3:].strip()
        converted.append("!!! " + label if label else "!!!")
        inside = not inside
    return "\n".join(converted)
|
|
|
|
|
|
def _bracket_delta(text: str) -> int:
    """Return the number of opening minus closing curly/square brackets in *text*."""
    return text.count("{") + text.count("[") - text.count("}") - text.count("]")


def _json_block_end(lines: list, start: int) -> Optional[int]:
    """Return the index of the line closing the bracket block opened at *start*, or None."""
    depth = _bracket_delta(lines[start].strip())
    if depth <= 0:
        # Already balanced on its own line (e.g. a Markdown link "[text](url)" or a
        # one-line object): there is no multi-line block to look for.
        return None
    for j in range(start + 1, len(lines)):
        s = lines[j].strip()
        depth += _bracket_delta(s)
        if depth <= 0 and s.endswith(("}", "]")):
            return j
    return None


def _enhance_codeblocks(md: str) -> str:
    """
    @function _enhance_codeblocks
    @description Heuristically wraps unfenced JSON-like and code-like runs of lines in
                 Markdown code fences. Content inside existing ``` fences is left alone.
                 A JSON-like run starts with "{" or "[" and ends where brackets balance;
                 a code-like run starts at a line containing a code keyword and ends
                 before the next blank line or level 1-3 heading. Runs shorter than
                 three lines are not wrapped.
    @param md Input markdown string
    @return Markdown string with the extra fences added (falsy input is returned unchanged)
    """
    if not md:
        return md
    code_markers = ("public static", "private static", "class ", "return ", "package ", "import ")
    lines = md.split("\n")
    total = len(lines)
    res: list = []
    in_fence = False
    i = 0
    while i < total:
        line = lines[i]
        t = line.strip()
        if t.startswith("```"):
            in_fence = not in_fence
            res.append(line)
            i += 1
            continue
        if in_fence:
            res.append(line)
            i += 1
            continue
        if t.startswith(("{", "[")):
            end = _json_block_end(lines, i)
            if end is not None and end - i + 1 >= 3:
                res.append("```json")
                res.extend(lines[i:end + 1])
                res.append("```")
                i = end + 1
                continue
        if any(marker in t for marker in code_markers):
            j = i + 1
            while j < total:
                s = lines[j].strip()
                if not s or s.startswith(("# ", "## ", "### ")):
                    break
                j += 1
            if j - i >= 3:
                res.append("```")
                res.extend(lines[i:j])
                res.append("```")
                # Resume AT the terminating line: it is not part of the fenced run and
                # must still be emitted (skipping it used to drop the heading or blank
                # line that ended the run).
                i = j
                continue
        res.append(line)
        i += 1
    return "\n".join(res)
|
|
|
|
|
|
class FormatConverter:
    """
    @class FormatConverter
    @description Unified converter class wrapping Docling and word2markdown
    """

    def __init__(self) -> None:
        self._docling = DocumentConverter()

    def convert(self, source: str, export: str = "markdown", engine: Optional[str] = None, mdx_safe_mode_enabled: bool = True) -> Tuple[str, str, Optional[str]]:
        """
        @function convert
        @description Convert a document source to specified format. Plain text, Markdown
                     and HTML sources are handled directly; DOC/DOCX go to word2markdown
                     when it is available (or when the engine is requested explicitly);
                     everything else goes through the Docling pipeline.
        @param source Path or URL to source document
        @param export Output format (markdown, html, json, doctags)
        @param engine Optional engine override (word2markdown/docling)
        @param mdx_safe_mode_enabled Toggle safe mode for MDX
        @return Tuple of (encoding, content, artifacts_dir). artifacts_dir is the temporary
                directory created for the Docling pipeline call and is None on every other
                path; the caller owns it once it has been returned.
        @raises RuntimeError When the Docling path is asked for an unsupported export
        """
        # Function-scope imports: neither module is imported at the top of this file.
        import shutil
        from html import escape as _escape_html

        fmt = (export or "").lower()

        # Prefer custom word2markdown engine for DOC/DOCX when available
        auto_engine = None
        try:
            suf = Path(source).suffix.lower()
            if not engine and suf in {".doc", ".docx"} and _W2M_AVAILABLE:
                auto_engine = "word2markdown"
        except Exception:
            auto_engine = None
        use_engine = (engine or auto_engine or "").lower()

        # For URLs the extension is taken from the path component only, so query
        # strings and fragments do not pollute it.
        try:
            path = source
            if _is_http(source):
                path = urlsplit(source).path or ""
            ext = Path(path).suffix.lower()
        except Exception:
            ext = Path(source).suffix.lower()

        if ext in {".txt", ".md"}:
            raw, ct = _read_bytes(source)
            text = _normalize_newlines(_decode_to_utf8(raw, ct))
            if fmt == "html":
                if _render_md_html is not None:
                    html = _render_md_html(text)
                else:
                    try:
                        import marko
                        html = marko.convert(text)
                    except Exception:
                        if ext == ".txt":
                            # Escape the text: it is embedded verbatim into HTML and
                            # comes from an arbitrary file or URL.
                            html = f"<pre>{_escape_html(text)}</pre>"
                        else:
                            html = text
                return "utf-8", _lower_html_table_tags(html), None
            if ext == ".txt":
                # Only plain text gets the code-fence heuristics; Markdown is
                # returned as written.
                text = _enhance_codeblocks(text)
            return "utf-8", text, None

        if ext in {".html", ".htm"}:
            try:
                conv = DocumentConverter(allowed_formats=[InputFormat.HTML])
                result = conv.convert(source)
                if fmt == "html":
                    html = result.document.export_to_html()
                    html = _lower_html_table_tags(html)
                    return "utf-8", html, None
                md = result.document.export_to_markdown(image_mode=ImageRefMode.EMBEDDED)
                md = _replace_admonitions(md)
                md = _enhance_codeblocks(md)
                return "utf-8", md, None
            except Exception:
                # Docling could not handle the document: fall back to the local
                # HTML helpers.
                raw, ct = _read_bytes(source)
                html_in = _normalize_newlines(_decode_to_utf8(raw, ct))
                if fmt == "html":
                    html = _normalize_html(html_in) if _normalize_html is not None else html_in
                    return "utf-8", _lower_html_table_tags(html), None
                md = _html_to_markdown(html_in)
                md = _replace_admonitions(md)
                md = _enhance_codeblocks(md)
                return "utf-8", md, None

        if use_engine in {"pandoc", "custom", "word2markdown"} and _W2M_AVAILABLE:
            enc, md = _w2m_convert_any(Path(source), mdx_safe_mode_enabled=mdx_safe_mode_enabled)
            md = _replace_admonitions(md)
            md = _enhance_codeblocks(md)
            return enc or "utf-8", md, None

        # Configure PDF pipeline to generate picture images into a per-call artifacts directory
        # NOTE(review): the directory is created and returned here, but nothing visible
        # in this method hands it to docling - confirm how it is meant to be populated.
        artifacts_dir = tempfile.mkdtemp(prefix="docling_artifacts_")
        try:
            pdf_opts = PdfPipelineOptions()
            pdf_opts.generate_picture_images = True
            pdf_opts.generate_page_images = True
            pdf_opts.images_scale = 2.0
            pdf_opts.do_code_enrichment = True
            pdf_opts.do_formula_enrichment = True
            self._docling = DocumentConverter(
                format_options={
                    InputFormat.PDF: PdfFormatOption(
                        pipeline_cls=StandardPdfPipeline,
                        pipeline_options=pdf_opts,
                    )
                }
            )
            result = self._docling.convert(source)
            if fmt == "markdown":
                md = result.document.export_to_markdown(image_mode=ImageRefMode.EMBEDDED)
                md = _replace_admonitions(md)
                md = _enhance_codeblocks(md)
                return "utf-8", md, artifacts_dir
            if fmt == "html":
                html = result.document.export_to_html()
                html = _lower_html_table_tags(html)
                return "utf-8", html, artifacts_dir
            if fmt == "json":
                js = result.document.export_to_json()
                return "utf-8", js, artifacts_dir
            if fmt == "doctags":
                dt = result.document.export_to_doctags()
                return "utf-8", dt, artifacts_dir
            raise RuntimeError("unsupported export")
        except BaseException:
            # Nothing is returned on failure, so nobody else could clean the
            # directory up; remove it before propagating the original error.
            shutil.rmtree(artifacts_dir, ignore_errors=True)
            raise
|