import io import zipfile from pathlib import Path from fastapi.testclient import TestClient import sys from pathlib import Path as _Path base = _Path(__file__).resolve().parents[2] sys.path.insert(0, str(base)) sys.path.insert(0, str(base / "docling")) import app.server as server class FakeMinio: def __init__(self): self.objs = {} def put_object(self, bucket_name: str, object_name: str, data: io.BytesIO, length: int, content_type: str): self.objs[(bucket_name, object_name)] = data.read(length) def get_presigned_url(self, method: str, bucket: str, obj: str, expires: int): return f"http://minio.test/presigned/{bucket}/{obj}" def presigned_get_object(self, bucket: str, obj: str, expires: int): return f"http://minio.test/presigned/{bucket}/{obj}" def setup(): server.RUNTIME_CONFIG["minio"].update({ "endpoint": "127.0.0.1:9000", "public": "http://127.0.0.1:9000", "access": "ak", "secret": "sk", "bucket": "test", "secure": "false", "prefix": "assets", "store_final": "true", "public_read": "true", }) fake = FakeMinio() def _cur(): return fake, "test", "http://127.0.0.1:9000", "assets" server._minio_current = _cur # type: ignore def main(): setup() app = server.app c = TestClient(app) tmp = Path("/tmp/run_slash_path_debug") tmp.mkdir(parents=True, exist_ok=True) zpath = tmp / "pkg.zip" md_dir = tmp / "docs" img_dir = md_dir / "images" img_dir.mkdir(parents=True, exist_ok=True) (img_dir / "p.png").write_bytes(b"PNG") (md_dir / "a.md").write_text("![](/images/p.png)", "utf-8") with zipfile.ZipFile(str(zpath), "w") as zf: zf.write(str(md_dir / "a.md"), arcname="a.md") zf.write(str(img_dir / "p.png"), arcname="images/p.png") with open(zpath, "rb") as fp: files = {"file": ("pkg.zip", fp.read())} r1 = c.post("/api/archive/stage", files=files) sid = r1.json()["data"]["id"] r2 = c.post("/api/archive/process", data={"id": sid, "prefix": "assets", "versionId": "1007"}) print("process:", r2.status_code) print(r2.json()) if __name__ == "__main__": main()