feat(US3+4): video frame extraction + video-to-text — POST /api/v1/video/*

- app/models/video_models.py: ExtractFramesRequest, VideoToTextRequest,
  FrameInfo, VideoJobCallback, VideoAcceptedResponse
- app/services/video_service.py: interval+keyframe frame extraction,
  uniform-sample video-to-text, HTTP callback, temp file cleanup
- app/routers/video.py: size check helper (_check_video_size via head_object),
  BackgroundTasks enqueue for both endpoints
- tests: 6 service + 4 router tests, 10/10 passing
This commit is contained in:
wh
2026-04-10 16:00:08 +08:00
parent 2876c179ac
commit 0274bb470a
10 changed files with 560 additions and 1 deletions

Binary file not shown.

View File

@@ -0,0 +1,189 @@
import base64
import io
import os
import tempfile
import time
from typing import Callable
import cv2
import httpx
import numpy as np
from app.clients.llm.base import LLMClient
from app.clients.storage.base import StorageClient
from app.core.config import get_config
from app.core.logging import get_logger
from app.models.video_models import ExtractFramesRequest, FrameInfo, VideoToTextRequest
logger = get_logger(__name__)
async def _post_callback(url: str, payload: dict) -> None:
async with httpx.AsyncClient(timeout=10) as http:
try:
await http.post(url, json=payload)
except Exception as exc:
logger.error("callback_failed", extra={"url": url, "error": str(exc)})
async def extract_frames_task(
req: ExtractFramesRequest,
storage: StorageClient,
callback_url: str,
) -> None:
cfg = get_config()
bucket = cfg["storage"]["buckets"]["source_data"]
threshold = cfg["video"].get("keyframe_diff_threshold", 30.0)
tmp = None
try:
video_bytes = await storage.download_bytes(bucket, req.file_path)
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
f.write(video_bytes)
tmp = f.name
cap = cv2.VideoCapture(tmp)
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
frames_info: list[FrameInfo] = []
upload_index = 0
prev_gray = None
frame_idx = 0
while True:
ret, frame = cap.read()
if not ret:
break
extract = False
if req.mode == "interval":
extract = (frame_idx % req.frame_interval == 0)
else: # keyframe
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.float32)
if prev_gray is None:
extract = True
else:
diff = np.mean(np.abs(gray - prev_gray))
extract = diff > threshold
prev_gray = gray
if extract:
time_sec = round(frame_idx / fps, 3)
_, buf = cv2.imencode(".jpg", frame)
frame_path = f"frames/{req.source_id}/{upload_index}.jpg"
await storage.upload_bytes(bucket, frame_path, buf.tobytes(), "image/jpeg")
frames_info.append(FrameInfo(
frame_index=frame_idx,
time_sec=time_sec,
frame_path=frame_path,
))
upload_index += 1
frame_idx += 1
cap.release()
logger.info("extract_frames_done", extra={
"job_id": req.job_id,
"frames": len(frames_info),
})
await _post_callback(callback_url, {
"job_id": req.job_id,
"status": "SUCCESS",
"frames": [f.model_dump() for f in frames_info],
"output_path": None,
"error_message": None,
})
except Exception as exc:
logger.error("extract_frames_failed", extra={"job_id": req.job_id, "error": str(exc)})
await _post_callback(callback_url, {
"job_id": req.job_id,
"status": "FAILED",
"frames": None,
"output_path": None,
"error_message": str(exc),
})
finally:
if tmp and os.path.exists(tmp):
os.unlink(tmp)
async def video_to_text_task(
req: VideoToTextRequest,
llm: LLMClient,
storage: StorageClient,
callback_url: str,
) -> None:
cfg = get_config()
bucket = cfg["storage"]["buckets"]["source_data"]
sample_count = cfg["video"].get("frame_sample_count", 8)
model = req.model or cfg["models"]["default_vision"]
tmp = None
try:
video_bytes = await storage.download_bytes(bucket, req.file_path)
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
f.write(video_bytes)
tmp = f.name
cap = cv2.VideoCapture(tmp)
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
start_frame = int(req.start_sec * fps)
end_frame = int(req.end_sec * fps)
total = max(end_frame - start_frame, 1)
# Uniform sampling
indices = [
start_frame + int(i * total / sample_count)
for i in range(sample_count)
]
indices = list(dict.fromkeys(indices)) # deduplicate
content: list[dict] = []
for idx in indices:
cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
ret, frame = cap.read()
if not ret:
continue
_, buf = cv2.imencode(".jpg", frame)
b64 = base64.b64encode(buf.tobytes()).decode()
content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}})
cap.release()
prompt = req.prompt_template or "请用中文详细描述这段视频的内容,生成结构化文字描述。"
content.append({"type": "text", "text": prompt})
messages = [{"role": "user", "content": content}]
description = await llm.chat_vision(model, messages)
# Upload description text
timestamp = int(time.time())
output_path = f"video-text/{req.source_id}/{timestamp}.txt"
await storage.upload_bytes(
bucket, output_path, description.encode("utf-8"), "text/plain"
)
logger.info("video_to_text_done", extra={"job_id": req.job_id, "output_path": output_path})
await _post_callback(callback_url, {
"job_id": req.job_id,
"status": "SUCCESS",
"frames": None,
"output_path": output_path,
"error_message": None,
})
except Exception as exc:
logger.error("video_to_text_failed", extra={"job_id": req.job_id, "error": str(exc)})
await _post_callback(callback_url, {
"job_id": req.job_id,
"status": "FAILED",
"frames": None,
"output_path": None,
"error_message": str(exc),
})
finally:
if tmp and os.path.exists(tmp):
os.unlink(tmp)