- app/models/video_models.py: ExtractFramesRequest, VideoToTextRequest, FrameInfo, VideoJobCallback, VideoAcceptedResponse
- app/services/video_service.py: interval+keyframe frame extraction, uniform-sample video-to-text, HTTP callback, temp file cleanup
- app/routers/video.py: size check helper (_check_video_size via head_object), BackgroundTasks enqueue for both endpoints
- tests: 6 service + 4 router tests, 10/10 passing
196 lines
7.3 KiB
Python
196 lines
7.3 KiB
Python
import io
|
|
import json
|
|
import os
|
|
import tempfile
|
|
import pytest
|
|
import numpy as np
|
|
import cv2
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
from app.models.video_models import ExtractFramesRequest, VideoToTextRequest
|
|
|
|
|
|
def _make_test_video(
    path: str,
    num_frames: int = 10,
    fps: float = 10.0,
    width: int = 64,
    height: int = 64,
) -> None:
    """Write a small test video to `path` using cv2.VideoWriter.

    Every frame is a solid-colour image whose channel value is
    ``(i * 20) % 256`` for frame index ``i``.

    Args:
        path: Destination file path handed to ``cv2.VideoWriter``.
        num_frames: Number of frames to write.
        fps: Frame rate passed to the writer.
        width: Frame width in pixels.
        height: Frame height in pixels.

    Raises:
        RuntimeError: If the writer cannot be opened for `path`.
    """
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(path, fourcc, fps, (width, height))
    # Fail loudly here: an unopened writer silently drops every frame, which
    # would otherwise surface later as a confusing failure in the test body.
    if not out.isOpened():
        out.release()
        raise RuntimeError(f"cv2.VideoWriter could not be opened for {path!r}")
    try:
        for i in range(num_frames):
            frame = np.full((height, width, 3), (i * 20) % 256, dtype=np.uint8)
            out.write(frame)
    finally:
        # Always release so the file is closed even if a write raises.
        out.release()
|
|
|
|
|
|
# ── US3: Frame Extraction ──────────────────────────────────────────────────────
|
|
|
|
@pytest.fixture
def frames_req():
    """Provide an interval-mode ``ExtractFramesRequest`` with ``frame_interval=3``."""
    params = {
        "file_path": "video/test.mp4",
        "source_id": 10,
        "job_id": 42,
        "mode": "interval",
        "frame_interval": 3,
    }
    return ExtractFramesRequest(**params)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_interval_mode_extracts_correct_frames(mock_storage, frames_req, tmp_path):
    """Interval mode sends exactly one SUCCESS callback listing four frames."""
    from app.services.video_service import extract_frames_task

    clip = tmp_path / "test.mp4"
    _make_test_video(str(clip), num_frames=10, fps=10.0)

    # Storage hands back the generated clip and accepts any upload.
    mock_storage.download_bytes = AsyncMock(return_value=clip.read_bytes())
    mock_storage.upload_bytes = AsyncMock(return_value=None)

    received = []

    async def record_callback(url, payload):
        received.append(payload)

    with patch("app.services.video_service._post_callback", new=record_callback):
        await extract_frames_task(frames_req, mock_storage, "http://backend/callback")

    assert len(received) == 1
    payload = received[0]
    assert payload["status"] == "SUCCESS"
    assert payload["job_id"] == 42
    # With 10 frames and interval=3, we expect frames at indices 0, 3, 6, 9 → 4 frames
    assert len(payload["frames"]) == 4
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_keyframe_mode_extracts_scene_changes(mock_storage, tmp_path):
    """Keyframe mode reports SUCCESS with at least one frame for a two-scene clip."""
    from app.services.video_service import extract_frames_task

    clip = tmp_path / "kf.mp4"
    # Two distinct scenes separated by a sudden colour change:
    # five black frames followed by five bright frames.
    writer = cv2.VideoWriter(str(clip), cv2.VideoWriter_fourcc(*"mp4v"), 10.0, (64, 64))
    for shade in [0] * 5 + [200] * 5:
        writer.write(np.full((64, 64, 3), shade, dtype=np.uint8))
    writer.release()

    mock_storage.download_bytes = AsyncMock(return_value=clip.read_bytes())
    mock_storage.upload_bytes = AsyncMock(return_value=None)

    received = []

    async def record_callback(url, payload):
        received.append(payload)

    request = ExtractFramesRequest(
        file_path="video/kf.mp4",
        source_id=10,
        job_id=43,
        mode="keyframe",
    )
    with patch("app.services.video_service._post_callback", new=record_callback):
        await extract_frames_task(request, mock_storage, "http://backend/callback")

    payload = received[0]
    assert payload["status"] == "SUCCESS"
    # Should capture at least the scene-change frame
    assert len(payload["frames"]) >= 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_frame_upload_path_convention(mock_storage, frames_req, tmp_path):
    """Uploaded frames are stored as ``frames/<source_id>/<index>.jpg``.

    The ``frames_req`` fixture is requested but not used; this test builds its
    own request with ``frame_interval=1``.
    """
    video_path = str(tmp_path / "test.mp4")
    _make_test_video(video_path, num_frames=3, fps=10.0)
    with open(video_path, "rb") as f:
        mock_storage.download_bytes = AsyncMock(return_value=f.read())
    mock_storage.upload_bytes = AsyncMock(return_value=None)

    callback_payloads = []

    async def fake_callback(url, payload):
        callback_payloads.append(payload)

    req = ExtractFramesRequest(
        file_path="video/test.mp4", source_id=10, job_id=99, mode="interval", frame_interval=1
    )
    with patch("app.services.video_service._post_callback", new=fake_callback):
        from app.services.video_service import extract_frames_task

        await extract_frames_task(req, mock_storage, "http://backend/callback")

    # The destination path is read from the second positional argument of
    # each recorded upload_bytes call.
    uploaded_paths = [call.args[1] for call in mock_storage.upload_bytes.call_args_list]

    # Without this guard the path check would pass vacuously when the task
    # never uploads anything.
    assert uploaded_paths, "expected at least one frame upload"
    assert uploaded_paths == [f"frames/10/{i}.jpg" for i in range(len(uploaded_paths))]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_failed_extraction_sends_failed_callback(mock_storage, frames_req):
    """A download error results in a FAILED callback carrying an error message."""
    from app.services.video_service import extract_frames_task

    # Make the storage download blow up before any frame work can happen.
    mock_storage.download_bytes = AsyncMock(side_effect=Exception("storage failure"))

    received = []

    async def record_callback(url, payload):
        received.append(payload)

    with patch("app.services.video_service._post_callback", new=record_callback):
        await extract_frames_task(frames_req, mock_storage, "http://backend/callback")

    first = received[0]
    assert first["status"] == "FAILED"
    assert first["error_message"] is not None
|
|
|
|
|
|
# ── US4: Video To Text ─────────────────────────────────────────────────────────
|
|
|
|
@pytest.fixture
def totext_req():
    """Provide a ``VideoToTextRequest`` with ``start_sec=0.0`` and ``end_sec=1.0``."""
    params = {
        "file_path": "video/test.mp4",
        "source_id": 10,
        "job_id": 44,
        "start_sec": 0.0,
        "end_sec": 1.0,
    }
    return VideoToTextRequest(**params)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_video_to_text_samples_frames_and_calls_llm(mock_llm, mock_storage, totext_req, tmp_path):
    """Video-to-text calls the vision LLM once and reports a SUCCESS output path."""
    from app.services.video_service import video_to_text_task

    clip = tmp_path / "totext.mp4"
    _make_test_video(str(clip), num_frames=20, fps=10.0)

    mock_storage.download_bytes = AsyncMock(return_value=clip.read_bytes())
    mock_llm.chat_vision = AsyncMock(return_value="视频描述内容")
    mock_storage.upload_bytes = AsyncMock(return_value=None)

    received = []

    async def record_callback(url, payload):
        received.append(payload)

    with patch("app.services.video_service._post_callback", new=record_callback):
        await video_to_text_task(totext_req, mock_llm, mock_storage, "http://backend/callback")

    first = received[0]
    assert first["status"] == "SUCCESS"
    assert "output_path" in first
    assert first["output_path"].startswith("video-text/10/")
    mock_llm.chat_vision.assert_called_once()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_video_to_text_llm_failure_sends_failed_callback(mock_llm, mock_storage, totext_req, tmp_path):
    """An exception from the vision LLM results in a FAILED callback."""
    from app.services.video_service import video_to_text_task

    clip = tmp_path / "fail.mp4"
    _make_test_video(str(clip), num_frames=5, fps=10.0)

    mock_storage.download_bytes = AsyncMock(return_value=clip.read_bytes())
    # The LLM call raises so the task has to take its failure path.
    mock_llm.chat_vision = AsyncMock(side_effect=Exception("LLM unavailable"))

    received = []

    async def record_callback(url, payload):
        received.append(payload)

    with patch("app.services.video_service._post_callback", new=record_callback):
        await video_to_text_task(totext_req, mock_llm, mock_storage, "http://backend/callback")

    assert received[0]["status"] == "FAILED"
|