import io import zipfile from pathlib import Path from fastapi.testclient import TestClient import app.server as server class FakeMinio: def __init__(self): self.objs = {} def put_object(self, bucket_name: str, object_name: str, data: io.BytesIO, length: int, content_type: str): self.objs[(bucket_name, object_name)] = data.read(length) def get_presigned_url(self, method: str, bucket: str, obj: str, expires: int): return f"http://minio.test/presigned/{bucket}/{obj}" def presigned_get_object(self, bucket: str, obj: str, expires: int): return f"http://minio.test/presigned/{bucket}/{obj}" def setup_module(module=None): server.RUNTIME_CONFIG["minio"].update({ "endpoint": "127.0.0.1:9000", "public": "http://127.0.0.1:9000", "access": "ak", "secret": "sk", "bucket": "test", "secure": "false", "prefix": "assets", "store_final": "true", "public_read": "true", }) fake = FakeMinio() def _cur(): return fake, "test", "http://127.0.0.1:9000", "assets" server._minio_current = _cur # type: ignore def test_process_invalid_id(): app = server.app c = TestClient(app) r = c.post("/api/archive/process", data={"id": "missing"}) assert r.status_code == 200 j = r.json() assert j["code"] != 0 def test_stage_unsupported_format_and_cleanup(tmp_path: Path): app = server.app c = TestClient(app) rar_path = tmp_path / "pkg.rar" rar_path.write_bytes(b"RAR") with open(rar_path, "rb") as fp: files = {"file": ("pkg.rar", fp.read())} r1 = c.post("/api/archive/stage", files=files) assert r1.status_code == 200 sid = r1.json()["data"]["id"] r2 = c.post("/api/archive/process", data={"id": sid}) assert r2.status_code == 200 j2 = r2.json() assert j2["code"] != 0 r3 = c.post("/api/archive/process", data={"id": sid}) assert r3.status_code == 200 j3 = r3.json() assert j3["code"] != 0 def test_upload_list_empty_lines_comments_and_urls(tmp_path: Path): app = server.app c = TestClient(app) root = tmp_path / "listcase2" root.mkdir(parents=True, exist_ok=True) (root / "img.png").write_bytes(b"PNG") (root / "a.md").write_text("![](img.png)", "utf-8") (root / "b.txt").write_text("![](img.png)", "utf-8") lines = ["", "# comment", "http://example.com/x.md", str(root / "a.md"), str(root / "b.txt")] data_bytes = "\n".join(lines).encode("utf-8") files = {"list_file": ("list.txt", data_bytes)} r = c.post("/api/upload-list", files=files, data={"prefix": "assets", "versionId": "1005"}) assert r.status_code == 200 j = r.json() assert j["code"] == 0 assert j["data"]["count"] >= 2 def test_archive_duplicate_filenames_tree(tmp_path: Path): app = server.app c = TestClient(app) zpath = tmp_path / "dup.zip" base = tmp_path / "src" sub = base / "sub" sub.mkdir(parents=True, exist_ok=True) (base / "a.md").write_text("![](img.png)", "utf-8") (base / "img.png").write_bytes(b"PNG") (sub / "a.md").write_text("![](../img.png)", "utf-8") with zipfile.ZipFile(str(zpath), "w") as zf: zf.write(str(base / "a.md"), arcname="a.md") zf.write(str(base / "img.png"), arcname="img.png") zf.write(str(sub / "a.md"), arcname="sub/a.md") with open(zpath, "rb") as fp: files = {"file": ("dup.zip", fp.read())} r1 = c.post("/api/archive/stage", files=files) assert r1.status_code == 200 sid = r1.json()["data"]["id"] r2 = c.post("/api/archive/process", data={"id": sid, "prefix": "assets", "versionId": "1006"}) assert r2.status_code == 200 j = r2.json() assert j["code"] == 0 tree = j["data"]["import"]["tree"] names = [n["name"] for n in tree] assert "sub" in names or any((isinstance(n, dict) and n.get("type") == "FOLDER" and n.get("name") == "sub") for n in tree)