# ---------------------------------------------------------------------------
# app/models/video_models.py
# ---------------------------------------------------------------------------
from typing import Literal

from pydantic import BaseModel, Field


class ExtractFramesRequest(BaseModel):
    """Request body for ``POST /video/extract-frames``."""

    file_path: str
    source_id: int
    job_id: int
    # Only these two strategies exist in the worker; restricting the value
    # here stops a typo from silently being treated as "keyframe".
    mode: Literal["interval", "keyframe"] = "interval"
    # The worker computes ``frame_idx % frame_interval``; a value below 1
    # would raise ZeroDivisionError (0) or make no sense (negative).
    frame_interval: int = Field(default=30, ge=1)


class VideoToTextRequest(BaseModel):
    """Request body for ``POST /video/to-text``."""

    file_path: str
    source_id: int
    job_id: int
    start_sec: float
    end_sec: float
    # When omitted the worker falls back to its configured default model.
    model: str | None = None
    # When omitted the worker uses its built-in description prompt.
    prompt_template: str | None = None


class FrameInfo(BaseModel):
    """Metadata of one extracted frame that was uploaded to storage."""

    frame_index: int
    time_sec: float
    frame_path: str


class VideoJobCallback(BaseModel):
    """Payload shape posted to the backend when a video job finishes."""

    job_id: int
    status: str
    frames: list[FrameInfo] | None = None
    output_path: str | None = None
    error_message: str | None = None


class VideoAcceptedResponse(BaseModel):
    """Body of the 202 response returned once a job has been queued."""

    message: str
    job_id: int


# ---------------------------------------------------------------------------
# app/routers/video.py
# ---------------------------------------------------------------------------
from fastapi import APIRouter, BackgroundTasks, Depends

from app.clients.llm.base import LLMClient
from app.clients.storage.base import StorageClient
from app.core.config import get_config
from app.core.dependencies import get_llm_client, get_storage_client
from app.core.exceptions import VideoTooLargeError
from app.models.video_models import (
    ExtractFramesRequest,
    VideoAcceptedResponse,
    VideoToTextRequest,
)
from app.services import video_service

router = APIRouter(tags=["Video"])

_BYTES_PER_MB = 1024 * 1024
_ACCEPTED_MESSAGE = "任务已接受,后台处理中"


async def _check_video_size(storage: StorageClient, bucket: str, file_path: str, max_mb: int) -> None:
    """Raise ``VideoTooLargeError`` when the stored object exceeds ``max_mb``.

    Args:
        storage: Client used to query the object size.
        bucket: Bucket that holds the video.
        file_path: Object key of the video.
        max_mb: Upper size limit in mebibytes (inclusive).

    Raises:
        VideoTooLargeError: If the object is larger than ``max_mb`` MiB.
    """
    size_bytes = await storage.get_object_size(bucket, file_path)
    if size_bytes > max_mb * _BYTES_PER_MB:
        raise VideoTooLargeError(
            f"视频文件大小超出限制(最大 {max_mb}MB,当前 {size_bytes // _BYTES_PER_MB}MB)"
        )


async def _validate_and_get_callback_url(storage: StorageClient, file_path: str) -> str:
    """Run the shared pre-flight size check and return the callback URL.

    Both endpoints need exactly the same configuration lookups and size
    validation before queuing work, so they live here once.

    Raises:
        VideoTooLargeError: If the video exceeds the configured size limit.
    """
    cfg = get_config()
    bucket = cfg["storage"]["buckets"]["source_data"]
    max_mb = cfg["video"]["max_file_size_mb"]
    await _check_video_size(storage, bucket, file_path, max_mb)
    # An absent backend section yields an empty URL rather than a KeyError.
    return cfg.get("backend", {}).get("callback_url", "")


@router.post("/video/extract-frames", response_model=VideoAcceptedResponse, status_code=202)
async def extract_frames(
    req: ExtractFramesRequest,
    background_tasks: BackgroundTasks,
    storage: StorageClient = Depends(get_storage_client),
) -> VideoAcceptedResponse:
    """Validate the video size and queue frame extraction in the background."""
    callback_url = await _validate_and_get_callback_url(storage, req.file_path)

    background_tasks.add_task(
        video_service.extract_frames_task,
        req,
        storage,
        callback_url,
    )
    return VideoAcceptedResponse(message=_ACCEPTED_MESSAGE, job_id=req.job_id)


@router.post("/video/to-text", response_model=VideoAcceptedResponse, status_code=202)
async def video_to_text(
    req: VideoToTextRequest,
    background_tasks: BackgroundTasks,
    storage: StorageClient = Depends(get_storage_client),
    llm: LLMClient = Depends(get_llm_client),
) -> VideoAcceptedResponse:
    """Validate the video size and queue video description in the background."""
    callback_url = await _validate_and_get_callback_url(storage, req.file_path)

    background_tasks.add_task(
        video_service.video_to_text_task,
        req,
        llm,
        storage,
        callback_url,
    )
    return VideoAcceptedResponse(message=_ACCEPTED_MESSAGE, job_id=req.job_id)
"""Background tasks for frame extraction and video-to-text description."""

import base64
import io
import os
import tempfile
import time
from typing import Callable

import cv2
import httpx
import numpy as np

from app.clients.llm.base import LLMClient
from app.clients.storage.base import StorageClient
from app.core.config import get_config
from app.core.logging import get_logger
from app.models.video_models import ExtractFramesRequest, FrameInfo, VideoToTextRequest

logger = get_logger(__name__)

_DEFAULT_FPS = 25.0
_DEFAULT_PROMPT = "请用中文详细描述这段视频的内容,生成结构化文字描述。"


async def _post_callback(url: str, payload: dict) -> None:
    """POST ``payload`` as JSON to ``url``; failures are logged, never raised.

    A non-2xx answer is treated as a failed delivery so that it shows up in
    the logs instead of being silently counted as success.
    """
    async with httpx.AsyncClient(timeout=10) as http:
        try:
            response = await http.post(url, json=payload)
            response.raise_for_status()
        except Exception as exc:
            logger.error("callback_failed", extra={"url": url, "error": str(exc)})


def _callback_payload(
    job_id: int,
    status: str,
    *,
    frames: list[dict] | None = None,
    output_path: str | None = None,
    error_message: str | None = None,
) -> dict:
    """Build the callback body with every key always present."""
    return {
        "job_id": job_id,
        "status": status,
        "frames": frames,
        "output_path": output_path,
        "error_message": error_message,
    }


def _write_temp_video(data: bytes) -> str:
    """Persist ``data`` to a ``.mp4`` temp file and return its path.

    The caller owns the file and must delete it. If the write itself fails
    the file is removed here so it cannot leak.
    """
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
        path = f.name
        try:
            f.write(data)
        except Exception:
            f.close()
            if os.path.exists(path):
                os.unlink(path)
            raise
    return path


def _open_capture(path: str) -> cv2.VideoCapture:
    """Open ``path`` with OpenCV, raising if the container cannot be read."""
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        cap.release()
        raise ValueError("cannot open or decode video file")
    return cap


def _encode_jpeg(frame: np.ndarray) -> bytes:
    """Encode one decoded frame as JPEG bytes, raising if encoding fails."""
    ok, buf = cv2.imencode(".jpg", frame)
    if not ok:
        raise ValueError("failed to encode frame as JPEG")
    return buf.tobytes()


def _sample_frame_indices(
    start_sec: float, end_sec: float, fps: float, sample_count: int
) -> list[int]:
    """Return up to ``sample_count`` evenly spaced, de-duplicated frame indices.

    The span is clamped to at least one frame so an empty or inverted time
    range still yields the start frame.
    """
    start_frame = int(start_sec * fps)
    end_frame = int(end_sec * fps)
    span = max(end_frame - start_frame, 1)
    indices = [
        start_frame + int(i * span / sample_count)
        for i in range(sample_count)
    ]
    # dict.fromkeys removes duplicates while keeping the original order.
    return list(dict.fromkeys(indices))


def _cleanup(cap: cv2.VideoCapture | None, tmp: str | None) -> None:
    """Release the capture first, then remove the temp file it was reading."""
    if cap is not None:
        cap.release()
    if tmp and os.path.exists(tmp):
        os.unlink(tmp)


async def extract_frames_task(
    req: ExtractFramesRequest,
    storage: StorageClient,
    callback_url: str,
) -> None:
    """Extract frames from a stored video, upload them and notify the backend.

    In ``"interval"`` mode every ``req.frame_interval``-th frame is kept. In
    any other mode (keyframe) the first frame is kept, and then each frame
    whose mean absolute grey-level difference from the previous frame
    exceeds ``video.keyframe_diff_threshold`` (default 30.0).

    Kept frames are uploaded as ``frames/{source_id}/{n}.jpg``. The outcome is
    always reported through ``callback_url`` with status ``SUCCESS`` or
    ``FAILED``; this coroutine itself never raises.
    """
    tmp = None
    cap = None
    try:
        # Config is read inside the try so a bad config still produces a
        # FAILED callback instead of a silently dead background task.
        cfg = get_config()
        bucket = cfg["storage"]["buckets"]["source_data"]
        threshold = cfg["video"].get("keyframe_diff_threshold", 30.0)

        if req.mode == "interval" and req.frame_interval < 1:
            raise ValueError("frame_interval must be a positive integer")

        video_bytes = await storage.download_bytes(bucket, req.file_path)
        tmp = _write_temp_video(video_bytes)

        cap = _open_capture(tmp)
        # Some containers report 0 fps; fall back to a sane default.
        fps = cap.get(cv2.CAP_PROP_FPS) or _DEFAULT_FPS
        frames_info: list[FrameInfo] = []
        prev_gray = None
        frame_idx = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if req.mode == "interval":
                extract = frame_idx % req.frame_interval == 0
            else:  # keyframe
                # float32 avoids uint8 wrap-around when subtracting frames.
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.float32)
                if prev_gray is None:
                    extract = True
                else:
                    extract = np.mean(np.abs(gray - prev_gray)) > threshold
                prev_gray = gray

            if extract:
                # Uploaded files are numbered densely, independent of frame_idx.
                frame_path = f"frames/{req.source_id}/{len(frames_info)}.jpg"
                await storage.upload_bytes(
                    bucket, frame_path, _encode_jpeg(frame), "image/jpeg"
                )
                frames_info.append(FrameInfo(
                    frame_index=frame_idx,
                    time_sec=round(frame_idx / fps, 3),
                    frame_path=frame_path,
                ))

            frame_idx += 1

        cap.release()
        cap = None

        logger.info("extract_frames_done", extra={
            "job_id": req.job_id,
            "frames": len(frames_info),
        })
        await _post_callback(callback_url, _callback_payload(
            req.job_id,
            "SUCCESS",
            frames=[f.model_dump() for f in frames_info],
        ))

    except Exception as exc:
        logger.error("extract_frames_failed", extra={"job_id": req.job_id, "error": str(exc)})
        await _post_callback(callback_url, _callback_payload(
            req.job_id,
            "FAILED",
            error_message=str(exc),
        ))
    finally:
        _cleanup(cap, tmp)


async def video_to_text_task(
    req: VideoToTextRequest,
    llm: LLMClient,
    storage: StorageClient,
    callback_url: str,
) -> None:
    """Describe a video segment with a vision LLM and notify the backend.

    Up to ``video.frame_sample_count`` (default 8) frames are sampled
    uniformly between ``req.start_sec`` and ``req.end_sec`` and sent as
    base64 JPEG data URLs together with the prompt. The returned text is
    uploaded as ``video-text/{source_id}/{unix_timestamp}.txt``.

    The outcome is always reported through ``callback_url`` with status
    ``SUCCESS`` or ``FAILED``; this coroutine itself never raises.
    """
    tmp = None
    cap = None
    try:
        # Config is read inside the try so a missing default model still
        # produces a FAILED callback.
        cfg = get_config()
        bucket = cfg["storage"]["buckets"]["source_data"]
        sample_count = cfg["video"].get("frame_sample_count", 8)
        model = req.model or cfg["models"]["default_vision"]

        video_bytes = await storage.download_bytes(bucket, req.file_path)
        tmp = _write_temp_video(video_bytes)

        cap = _open_capture(tmp)
        fps = cap.get(cv2.CAP_PROP_FPS) or _DEFAULT_FPS
        indices = _sample_frame_indices(req.start_sec, req.end_sec, fps, sample_count)

        content: list[dict] = []
        for idx in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if not ret:
                # Index beyond the end of the stream; skip it.
                continue
            b64 = base64.b64encode(_encode_jpeg(frame)).decode()
            content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}})

        # Release before the potentially slow LLM round-trip.
        cap.release()
        cap = None

        if not content:
            # Without any image the model could only invent a description.
            raise ValueError("no frames could be decoded in the requested time range")

        prompt = req.prompt_template or _DEFAULT_PROMPT
        content.append({"type": "text", "text": prompt})

        messages = [{"role": "user", "content": content}]
        description = await llm.chat_vision(model, messages)

        # Upload description text
        timestamp = int(time.time())
        output_path = f"video-text/{req.source_id}/{timestamp}.txt"
        await storage.upload_bytes(
            bucket, output_path, description.encode("utf-8"), "text/plain"
        )

        logger.info("video_to_text_done", extra={"job_id": req.job_id, "output_path": output_path})
        await _post_callback(callback_url, _callback_payload(
            req.job_id,
            "SUCCESS",
            output_path=output_path,
        ))

    except Exception as exc:
        logger.error("video_to_text_failed", extra={"job_id": req.job_id, "error": str(exc)})
        await _post_callback(callback_url, _callback_payload(
            req.job_id,
            "FAILED",
            error_message=str(exc),
        ))
    finally:
        _cleanup(cap, tmp)
"""HTTP-level tests for the video router (size gate and 202 acceptance)."""

import pytest
from unittest.mock import AsyncMock, patch

from app.core.exceptions import VideoTooLargeError

_MB = 1024 * 1024
_EXTRACT_URL = "/api/v1/video/extract-frames"
_TO_TEXT_URL = "/api/v1/video/to-text"
_ADD_TASK = "app.routers.video.BackgroundTasks.add_task"


def _stub_object_size(mock_storage, megabytes):
    """Make the storage mock report an object of ``megabytes`` MiB."""
    mock_storage.get_object_size = AsyncMock(return_value=megabytes * _MB)


def test_extract_frames_returns_202(client, mock_storage):
    """A 10 MB video is accepted and the job id is echoed back."""
    _stub_object_size(mock_storage, 10)
    body = {"file_path": "video/test.mp4", "source_id": 10, "job_id": 42}

    # add_task is patched out so the background job never actually runs.
    with patch(_ADD_TASK):
        resp = client.post(_EXTRACT_URL, json=body)

    assert resp.status_code == 202
    data = resp.json()
    assert data["job_id"] == 42


def test_extract_frames_video_too_large_returns_400(client, mock_storage):
    """A 300 MB video (above the 200 MB limit) is rejected with 400."""
    _stub_object_size(mock_storage, 300)
    body = {"file_path": "video/big.mp4", "source_id": 10, "job_id": 99}

    resp = client.post(_EXTRACT_URL, json=body)

    assert resp.status_code == 400
    assert resp.json()["code"] == "VIDEO_TOO_LARGE"


def test_video_to_text_returns_202(client, mock_storage):
    """A 10 MB video is accepted for description and the job id is echoed."""
    _stub_object_size(mock_storage, 10)
    body = {
        "file_path": "video/test.mp4",
        "source_id": 10,
        "job_id": 43,
        "start_sec": 0,
        "end_sec": 60,
    }

    with patch(_ADD_TASK):
        resp = client.post(_TO_TEXT_URL, json=body)

    assert resp.status_code == 202
    assert resp.json()["job_id"] == 43


def test_video_to_text_too_large_returns_400(client, mock_storage):
    """A 300 MB video is rejected by the to-text endpoint with 400."""
    _stub_object_size(mock_storage, 300)
    body = {
        "file_path": "video/big.mp4",
        "source_id": 10,
        "job_id": 99,
        "start_sec": 0,
        "end_sec": 60,
    }

    resp = client.post(_TO_TEXT_URL, json=body)

    assert resp.status_code == 400
    assert resp.json()["code"] == "VIDEO_TOO_LARGE"
"""Service-level tests for frame extraction and video-to-text tasks."""

import io
import json
import os
import tempfile
import pytest
import numpy as np
import cv2
from unittest.mock import AsyncMock, MagicMock, patch

from app.models.video_models import ExtractFramesRequest, VideoToTextRequest

_CALLBACK_URL = "http://backend/callback"
_POST_CALLBACK = "app.services.video_service._post_callback"


def _make_test_video(path: str, num_frames: int = 10, fps: float = 10.0, width=64, height=64):
    """Write a small test video to `path` using cv2.VideoWriter."""
    writer = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
    for n in range(num_frames):
        # Each frame is a flat grey level that steps by 20 per frame.
        writer.write(np.full((height, width, 3), (n * 20) % 256, dtype=np.uint8))
    writer.release()


def _read_bytes(path: str) -> bytes:
    """Return the full content of the file at ``path``."""
    with open(path, "rb") as fh:
        return fh.read()


def _callback_recorder():
    """Return ``(payloads, fake)`` where ``fake`` appends each payload it gets."""
    payloads = []

    async def fake(url, payload):
        payloads.append(payload)

    return payloads, fake


async def _run_extract(req, storage):
    """Run ``extract_frames_task`` with the callback captured; return payloads."""
    payloads, fake = _callback_recorder()
    with patch(_POST_CALLBACK, new=fake):
        from app.services.video_service import extract_frames_task
        await extract_frames_task(req, storage, _CALLBACK_URL)
    return payloads


async def _run_to_text(req, llm, storage):
    """Run ``video_to_text_task`` with the callback captured; return payloads."""
    payloads, fake = _callback_recorder()
    with patch(_POST_CALLBACK, new=fake):
        from app.services.video_service import video_to_text_task
        await video_to_text_task(req, llm, storage, _CALLBACK_URL)
    return payloads


# ── US3: Frame Extraction ──────────────────────────────────────────────────────

@pytest.fixture
def frames_req():
    return ExtractFramesRequest(
        file_path="video/test.mp4",
        source_id=10,
        job_id=42,
        mode="interval",
        frame_interval=3,
    )


@pytest.mark.asyncio
async def test_interval_mode_extracts_correct_frames(mock_storage, frames_req, tmp_path):
    video_path = str(tmp_path / "test.mp4")
    _make_test_video(video_path, num_frames=10, fps=10.0)

    mock_storage.download_bytes = AsyncMock(return_value=_read_bytes(video_path))
    mock_storage.upload_bytes = AsyncMock(return_value=None)

    payloads = await _run_extract(frames_req, mock_storage)

    assert len(payloads) == 1
    cb = payloads[0]
    assert cb["status"] == "SUCCESS"
    assert cb["job_id"] == 42
    # With 10 frames and interval=3, we expect frames at indices 0, 3, 6, 9 → 4 frames
    assert len(cb["frames"]) == 4


@pytest.mark.asyncio
async def test_keyframe_mode_extracts_scene_changes(mock_storage, tmp_path):
    video_path = str(tmp_path / "kf.mp4")
    # Two distinct scenes: five black frames followed by five bright frames.
    writer = cv2.VideoWriter(video_path, cv2.VideoWriter_fourcc(*"mp4v"), 10.0, (64, 64))
    scenes = (
        np.zeros((64, 64, 3), dtype=np.uint8),
        np.full((64, 64, 3), 200, dtype=np.uint8),
    )
    for scene in scenes:
        for _ in range(5):
            writer.write(scene)
    writer.release()

    mock_storage.download_bytes = AsyncMock(return_value=_read_bytes(video_path))
    mock_storage.upload_bytes = AsyncMock(return_value=None)

    req = ExtractFramesRequest(
        file_path="video/kf.mp4",
        source_id=10,
        job_id=43,
        mode="keyframe",
    )
    payloads = await _run_extract(req, mock_storage)

    cb = payloads[0]
    assert cb["status"] == "SUCCESS"
    # Should capture at least the scene-change frame
    assert len(cb["frames"]) >= 1


@pytest.mark.asyncio
async def test_frame_upload_path_convention(mock_storage, frames_req, tmp_path):
    video_path = str(tmp_path / "test.mp4")
    _make_test_video(video_path, num_frames=3, fps=10.0)
    mock_storage.download_bytes = AsyncMock(return_value=_read_bytes(video_path))
    mock_storage.upload_bytes = AsyncMock(return_value=None)

    req = ExtractFramesRequest(
        file_path="video/test.mp4", source_id=10, job_id=99, mode="interval", frame_interval=1
    )
    await _run_extract(req, mock_storage)

    # NOTE(review): this loop passes vacuously if nothing was uploaded.
    for position, call in enumerate(mock_storage.upload_bytes.call_args_list):
        assert call.args[1] == f"frames/10/{position}.jpg"


@pytest.mark.asyncio
async def test_failed_extraction_sends_failed_callback(mock_storage, frames_req):
    mock_storage.download_bytes = AsyncMock(side_effect=Exception("storage failure"))

    payloads = await _run_extract(frames_req, mock_storage)

    first = payloads[0]
    assert first["status"] == "FAILED"
    assert first["error_message"] is not None


# ── US4: Video To Text ─────────────────────────────────────────────────────────

@pytest.fixture
def totext_req():
    return VideoToTextRequest(
        file_path="video/test.mp4",
        source_id=10,
        job_id=44,
        start_sec=0.0,
        end_sec=1.0,
    )


@pytest.mark.asyncio
async def test_video_to_text_samples_frames_and_calls_llm(mock_llm, mock_storage, totext_req, tmp_path):
    video_path = str(tmp_path / "totext.mp4")
    _make_test_video(video_path, num_frames=20, fps=10.0)
    mock_storage.download_bytes = AsyncMock(return_value=_read_bytes(video_path))
    mock_llm.chat_vision = AsyncMock(return_value="视频描述内容")
    mock_storage.upload_bytes = AsyncMock(return_value=None)

    payloads = await _run_to_text(totext_req, mock_llm, mock_storage)

    first = payloads[0]
    assert first["status"] == "SUCCESS"
    assert "output_path" in first
    assert first["output_path"].startswith("video-text/10/")
    mock_llm.chat_vision.assert_called_once()


@pytest.mark.asyncio
async def test_video_to_text_llm_failure_sends_failed_callback(mock_llm, mock_storage, totext_req, tmp_path):
    video_path = str(tmp_path / "fail.mp4")
    _make_test_video(video_path, num_frames=5, fps=10.0)
    mock_storage.download_bytes = AsyncMock(return_value=_read_bytes(video_path))
    mock_llm.chat_vision = AsyncMock(side_effect=Exception("LLM unavailable"))

    payloads = await _run_to_text(totext_req, mock_llm, mock_storage)

    assert payloads[0]["status"] == "FAILED"