114 lines
3.9 KiB
Python
114 lines
3.9 KiB
Python
import io
|
|
import zipfile
|
|
from pathlib import Path
|
|
from fastapi.testclient import TestClient
|
|
|
|
import app.server as server
|
|
|
|
|
|
class FakeMinio:
    """In-memory stand-in for the MinIO client used by these tests.

    Uploaded payloads are kept in ``self.objs`` keyed by
    ``(bucket_name, object_name)``. The presign methods return a fixed,
    deterministic URL and never touch the network.
    """

    def __init__(self):
        # (bucket_name, object_name) -> bytes captured by put_object.
        self.objs = {}

    @staticmethod
    def _fake_url(bucket: str, obj: str) -> str:
        """Build the fake presigned URL shared by both presign methods."""
        return f"http://minio.test/presigned/{bucket}/{obj}"

    def put_object(
        self,
        bucket_name: str,
        object_name: str,
        data: io.BytesIO,
        length: int,
        content_type: str = "application/octet-stream",
    ):
        """Store up to *length* bytes read from *data* under the object key.

        Args:
            bucket_name: Bucket part of the storage key.
            object_name: Object part of the storage key.
            data: Readable binary stream holding the payload.
            length: Number of bytes to read from *data*.
            content_type: Accepted and ignored. It is optional so that callers
                which omit it (as the real client allows) do not fail here.
        """
        self.objs[(bucket_name, object_name)] = data.read(length)

    def get_presigned_url(
        self, method: str, bucket: str, obj: str, expires: int = 7 * 24 * 3600
    ):
        """Return the fake URL for *bucket*/*obj*; *method* and *expires* are ignored."""
        return self._fake_url(bucket, obj)

    def presigned_get_object(self, bucket: str, obj: str, expires: int = 7 * 24 * 3600):
        """Return the fake URL for *bucket*/*obj*; *expires* is ignored."""
        return self._fake_url(bucket, obj)
|
|
|
|
|
|
def setup_module(module=None):
    """Point the server at an in-memory MinIO fake for this test module.

    Named after pytest's module-level setup hook. It overwrites the
    ``"minio"`` section of ``server.RUNTIME_CONFIG`` with test settings and
    replaces ``server._minio_current`` with a callable that returns one
    shared ``FakeMinio`` plus fixed bucket, public-URL and prefix values.

    Args:
        module: Unused; accepted so the hook can be called with or without it.
    """
    minio_settings = {
        "endpoint": "127.0.0.1:9000",
        "public": "http://127.0.0.1:9000",
        "access": "ak",
        "secret": "sk",
        "bucket": "test",
        "secure": "false",
        "prefix": "assets",
        "store_final": "true",
        "public_read": "true",
    }
    server.RUNTIME_CONFIG["minio"].update(minio_settings)

    # One fake instance is captured by the closure, so every call shares it.
    stub_client = FakeMinio()

    def _fake_current():
        return stub_client, "test", "http://127.0.0.1:9000", "assets"

    server._minio_current = _fake_current  # type: ignore
|
|
|
|
|
|
def test_process_invalid_id():
    """Processing the id ``"missing"`` must answer HTTP 200 with a non-zero ``code``."""
    client = TestClient(server.app)

    response = client.post("/api/archive/process", data={"id": "missing"})

    assert response.status_code == 200
    payload = response.json()
    assert payload["code"] != 0
|
|
|
|
|
|
def test_stage_unsupported_format_and_cleanup(tmp_path: Path):
    """Stage a ``.rar`` upload and check that processing it fails twice.

    Staging must answer HTTP 200 and return an id. Both later process calls
    for that id must answer HTTP 200 with a non-zero ``code``.
    """
    client = TestClient(server.app)

    archive = tmp_path / "pkg.rar"
    archive.write_bytes(b"RAR")
    upload = {"file": ("pkg.rar", archive.read_bytes())}

    staged = client.post("/api/archive/stage", files=upload)
    assert staged.status_code == 200
    stage_id = staged.json()["data"]["id"]

    # Two identical process requests in a row; each must report an error code.
    for _ in range(2):
        processed = client.post("/api/archive/process", data={"id": stage_id})
        assert processed.status_code == 200
        body = processed.json()
        assert body["code"] != 0
|
|
|
|
|
|
def test_upload_list_empty_lines_comments_and_urls(tmp_path: Path):
    """Upload a list file mixing a blank line, a comment, a URL and two paths.

    The endpoint must answer HTTP 200 with ``code == 0`` and a
    ``data.count`` of at least 2.
    """
    client = TestClient(server.app)

    case_dir = tmp_path / "listcase2"
    case_dir.mkdir(parents=True, exist_ok=True)
    (case_dir / "img.png").write_bytes(b"PNG")
    markdown_file = case_dir / "a.md"
    text_file = case_dir / "b.txt"
    markdown_file.write_text("", "utf-8")
    text_file.write_text("", "utf-8")

    entries = [
        "",
        "# comment",
        "http://example.com/x.md",
        str(markdown_file),
        str(text_file),
    ]
    list_payload = "\n".join(entries).encode("utf-8")

    response = client.post(
        "/api/upload-list",
        files={"list_file": ("list.txt", list_payload)},
        data={"prefix": "assets", "versionId": "1005"},
    )

    assert response.status_code == 200
    body = response.json()
    assert body["code"] == 0
    assert body["data"]["count"] >= 2
|
|
|
|
|
|
def test_archive_duplicate_filenames_tree(tmp_path: Path):
    """Process a zip holding ``a.md`` at the root and again under ``sub/``.

    After staging and processing, the response must have ``code == 0`` and
    the top level of ``data.import.tree`` must contain a node named ``sub``.
    """
    client = TestClient(server.app)

    src_root = tmp_path / "src"
    nested = src_root / "sub"
    nested.mkdir(parents=True, exist_ok=True)
    (src_root / "a.md").write_text("", "utf-8")
    (src_root / "img.png").write_bytes(b"PNG")
    (nested / "a.md").write_text("", "utf-8")

    zip_file = tmp_path / "dup.zip"
    members = (
        (src_root / "a.md", "a.md"),
        (src_root / "img.png", "img.png"),
        (nested / "a.md", "sub/a.md"),
    )
    with zipfile.ZipFile(str(zip_file), "w") as archive:
        for disk_path, arcname in members:
            archive.write(str(disk_path), arcname=arcname)

    upload = {"file": ("dup.zip", zip_file.read_bytes())}
    staged = client.post("/api/archive/stage", files=upload)
    assert staged.status_code == 200
    stage_id = staged.json()["data"]["id"]

    processed = client.post(
        "/api/archive/process",
        data={"id": stage_id, "prefix": "assets", "versionId": "1006"},
    )
    assert processed.status_code == 200
    body = processed.json()
    assert body["code"] == 0

    tree = body["data"]["import"]["tree"]
    top_level_names = [node["name"] for node in tree]
    # NOTE(review): any node matching the FOLDER check also puts "sub" in
    # top_level_names, so the second condition never changes the outcome.
    has_sub_folder = any(
        isinstance(node, dict)
        and node.get("type") == "FOLDER"
        and node.get("name") == "sub"
        for node in tree
    )
    assert "sub" in top_level_names or has_sub_folder
|