Files
FunMD_Convert/docling/app/tests/test_batch_upload_edge_cases.py

114 lines
3.9 KiB
Python
Raw Normal View History

2026-01-07 17:18:26 +08:00
import io
import zipfile
from pathlib import Path
from fastapi.testclient import TestClient
import app.server as server
class FakeMinio:
    """In-memory stand-in for a MinIO client, used only by these tests.

    Uploaded payloads are kept in ``objs`` keyed by ``(bucket, object)``.
    Both presigned-URL methods return the same deterministic fake URL and
    never touch the network.
    """

    def __init__(self):
        # Maps (bucket_name, object_name) -> bytes captured by put_object.
        self.objs = {}

    @staticmethod
    def _fake_url(bucket, obj):
        """Build the fixed fake presigned URL for ``bucket``/``obj``."""
        return f"http://minio.test/presigned/{bucket}/{obj}"

    def put_object(self, bucket_name: str, object_name: str, data: io.BytesIO, length: int, content_type: str):
        """Store at most ``length`` bytes read from ``data``.

        ``content_type`` is accepted for signature compatibility and ignored.
        """
        payload = data.read(length)
        key = (bucket_name, object_name)
        self.objs[key] = payload

    def get_presigned_url(self, method: str, bucket: str, obj: str, expires: int):
        """Return the fake URL; ``method`` and ``expires`` are ignored."""
        return self._fake_url(bucket, obj)

    def presigned_get_object(self, bucket: str, obj: str, expires: int):
        """Return the fake URL; ``expires`` is ignored."""
        return self._fake_url(bucket, obj)
def setup_module(module=None):
    """Module-level setup: route the server's MinIO access to a fake client.

    Overwrites the ``minio`` section of ``server.RUNTIME_CONFIG`` with test
    values and replaces ``server._minio_current`` with a callable that always
    returns the same ``FakeMinio`` instance. Nothing is restored afterwards.
    """
    minio_overrides = {
        "endpoint": "127.0.0.1:9000",
        "public": "http://127.0.0.1:9000",
        "access": "ak",
        "secret": "sk",
        "bucket": "test",
        "secure": "false",
        "prefix": "assets",
        "store_final": "true",
        "public_read": "true",
    }
    server.RUNTIME_CONFIG["minio"].update(minio_overrides)

    # One shared fake so every call during the module sees the same store.
    fake_client = FakeMinio()

    def _fake_minio_current():
        # Presumably (client, bucket, public base URL, prefix); the values
        # mirror the config above. Confirm against server._minio_current.
        return fake_client, "test", "http://127.0.0.1:9000", "assets"

    server._minio_current = _fake_minio_current  # type: ignore
def test_process_invalid_id():
    """Processing the id ``"missing"`` yields HTTP 200 with a non-zero ``code``.

    The failure is expected in the JSON body rather than the HTTP status.
    """
    client = TestClient(server.app)
    response = client.post("/api/archive/process", data={"id": "missing"})
    assert response.status_code == 200
    body = response.json()
    assert body["code"] != 0
def test_stage_unsupported_format_and_cleanup(tmp_path: Path):
    """Staging a ``.rar`` upload succeeds, but processing it fails twice.

    A tiny fake ``pkg.rar`` is staged, then the returned id is processed two
    times. Each attempt must return HTTP 200 with a non-zero ``code``.
    """
    client = TestClient(server.app)

    archive_path = tmp_path / "pkg.rar"
    archive_path.write_bytes(b"RAR")
    upload = {"file": ("pkg.rar", archive_path.read_bytes())}

    staged = client.post("/api/archive/stage", files=upload)
    assert staged.status_code == 200
    stage_id = staged.json()["data"]["id"]

    # Two identical attempts. Judging by the test name, the second presumably
    # checks that the staged entry was cleaned up after the first failure.
    # Only the non-zero code is actually asserted.
    for _ in range(2):
        attempt = client.post("/api/archive/process", data={"id": stage_id})
        assert attempt.status_code == 200
        result = attempt.json()
        assert result["code"] != 0
def test_upload_list_empty_lines_comments_and_urls(tmp_path: Path):
    """A list file with a blank line, a comment, a URL and two paths is accepted.

    The upload must return ``code == 0`` and a ``count`` of at least 2.
    """
    client = TestClient(server.app)

    case_dir = tmp_path / "listcase2"
    case_dir.mkdir(parents=True, exist_ok=True)
    (case_dir / "img.png").write_bytes(b"PNG")
    # Both documents reference the same sibling image.
    for doc_name in ("a.md", "b.txt"):
        (case_dir / doc_name).write_text("![](img.png)", "utf-8")

    entries = [
        "",
        "# comment",
        "http://example.com/x.md",
        str(case_dir / "a.md"),
        str(case_dir / "b.txt"),
    ]
    list_payload = "\n".join(entries).encode("utf-8")

    response = client.post(
        "/api/upload-list",
        files={"list_file": ("list.txt", list_payload)},
        data={"prefix": "assets", "versionId": "1005"},
    )
    assert response.status_code == 200
    body = response.json()
    assert body["code"] == 0
    # Lower bound of 2 matches the two local files; how the blank, comment
    # and URL lines are counted is not visible from this test.
    assert body["data"]["count"] >= 2
def test_archive_duplicate_filenames_tree(tmp_path: Path):
    """A zip with ``a.md`` at the root and in ``sub/`` imports with a ``sub`` node.

    The archive is built on disk, staged, then processed. The response must
    have ``code == 0`` and its import tree must contain an entry named ``sub``.
    """
    client = TestClient(server.app)

    zip_path = tmp_path / "dup.zip"
    src_root = tmp_path / "src"
    src_sub = src_root / "sub"
    src_sub.mkdir(parents=True, exist_ok=True)
    (src_root / "a.md").write_text("![](img.png)", "utf-8")
    (src_root / "img.png").write_bytes(b"PNG")
    (src_sub / "a.md").write_text("![](../img.png)", "utf-8")

    # (file on disk, name inside the archive), in the original write order.
    members = [
        (src_root / "a.md", "a.md"),
        (src_root / "img.png", "img.png"),
        (src_sub / "a.md", "sub/a.md"),
    ]
    with zipfile.ZipFile(str(zip_path), "w") as archive:
        for disk_path, arcname in members:
            archive.write(str(disk_path), arcname=arcname)

    upload = {"file": ("dup.zip", zip_path.read_bytes())}
    staged = client.post("/api/archive/stage", files=upload)
    assert staged.status_code == 200
    stage_id = staged.json()["data"]["id"]

    processed = client.post(
        "/api/archive/process",
        data={"id": stage_id, "prefix": "assets", "versionId": "1006"},
    )
    assert processed.status_code == 200
    body = processed.json()
    assert body["code"] == 0

    tree = body["data"]["import"]["tree"]
    names = [node["name"] for node in tree]
    # NOTE(review): the FOLDER check looks subsumed by the name check, since
    # any dict node named "sub" is already in `names`. Kept as in the original.
    has_sub_folder = "sub" in names or any(
        (isinstance(node, dict) and node.get("type") == "FOLDER" and node.get("name") == "sub")
        for node in tree
    )
    assert has_sub_folder