Files
label_ai_service/app/services/video_service.py

190 lines
6.0 KiB
Python
Raw Permalink Normal View History

import base64
import io
import os
import tempfile
import time
from typing import Callable
import cv2
import httpx
import numpy as np
from app.clients.llm.base import LLMClient
from app.clients.storage.base import StorageClient
from app.core.config import get_config
from app.core.logging import get_logger
from app.models.video_models import ExtractFramesRequest, FrameInfo, VideoToTextRequest
# Module-level logger used by the callback helper and the video tasks in this file.
logger = get_logger(__name__)
async def _post_callback(url: str, payload: dict) -> None:
    """POST ``payload`` as JSON to ``url`` on a best-effort basis.

    Any delivery failure is logged as ``callback_failed`` and swallowed, so a
    broken callback endpoint never raises into the calling task.  A non-2xx
    response is treated as a failed delivery as well, instead of being
    silently accepted.

    Args:
        url: Callback endpoint to notify.
        payload: JSON-serialisable body describing the job outcome.
    """
    try:
        async with httpx.AsyncClient(timeout=10) as http:
            response = await http.post(url, json=payload)
            # Surface 4xx/5xx answers; without this a rejected callback would
            # look exactly like a delivered one.
            response.raise_for_status()
    except Exception as exc:
        logger.error("callback_failed", extra={"url": url, "error": str(exc)})
async def extract_frames_task(
    req: ExtractFramesRequest,
    storage: StorageClient,
    callback_url: str,
) -> None:
    """Extract frames from a stored video, upload them as JPEGs and notify the callback.

    The video is downloaded from the configured ``source_data`` bucket into a
    temporary ``.mp4`` file and decoded frame by frame with OpenCV.

    Frame selection depends on ``req.mode``:

    * ``"interval"``: every ``req.frame_interval``-th frame, starting at frame 0.
    * anything else (keyframe mode): the first frame, plus every frame whose
      mean absolute grayscale difference from the immediately preceding frame
      exceeds ``video.keyframe_diff_threshold`` (default ``30.0``).

    Each selected frame is uploaded to ``frames/{source_id}/{n}.jpg`` in the
    same bucket, where ``n`` counts uploaded frames from 0.

    The outcome is reported through ``callback_url``: ``SUCCESS`` with the list
    of extracted frames, or ``FAILED`` with the error message.  Exceptions
    raised while processing are caught and reported rather than propagated.

    Args:
        req: Job description (job id, source id, file path, mode, interval).
        storage: Client used to download the video and upload the frames.
        callback_url: Endpoint that receives the final job status.
    """
    cfg = get_config()
    bucket = cfg["storage"]["buckets"]["source_data"]
    threshold = cfg["video"].get("keyframe_diff_threshold", 30.0)
    tmp = None
    try:
        video_bytes = await storage.download_bytes(bucket, req.file_path)
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
            # Remember the path before writing so the ``finally`` clause can
            # remove the file even when the write itself fails.
            tmp = f.name
            f.write(video_bytes)

        frames_info: list[FrameInfo] = []
        cap = cv2.VideoCapture(tmp)
        try:
            if not cap.isOpened():
                # Without this check an undecodable file would be reported as
                # a successful job with zero frames.
                raise RuntimeError(f"cannot open video: {req.file_path}")

            # Some containers report 0 FPS; fall back to a nominal 25.
            fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
            upload_index = 0
            prev_gray = None
            frame_idx = 0

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                extract = False
                if req.mode == "interval":
                    extract = (frame_idx % req.frame_interval == 0)
                else:  # keyframe
                    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.float32)
                    if prev_gray is None:
                        extract = True
                    else:
                        diff = np.mean(np.abs(gray - prev_gray))
                        extract = diff > threshold
                    prev_gray = gray

                if extract:
                    time_sec = round(frame_idx / fps, 3)
                    ok, buf = cv2.imencode(".jpg", frame)
                    if not ok:
                        raise RuntimeError(f"failed to encode frame {frame_idx} as JPEG")
                    frame_path = f"frames/{req.source_id}/{upload_index}.jpg"
                    await storage.upload_bytes(bucket, frame_path, buf.tobytes(), "image/jpeg")
                    frames_info.append(FrameInfo(
                        frame_index=frame_idx,
                        time_sec=time_sec,
                        frame_path=frame_path,
                    ))
                    upload_index += 1

                frame_idx += 1
        finally:
            # Always release the capture, including when decoding or an upload
            # raised, so the handle on the temp file is dropped before unlink.
            cap.release()

        logger.info("extract_frames_done", extra={
            "job_id": req.job_id,
            "frames": len(frames_info),
        })
        await _post_callback(callback_url, {
            "job_id": req.job_id,
            "status": "SUCCESS",
            "frames": [f.model_dump() for f in frames_info],
            "output_path": None,
            "error_message": None,
        })
    except Exception as exc:
        logger.error("extract_frames_failed", extra={"job_id": req.job_id, "error": str(exc)})
        await _post_callback(callback_url, {
            "job_id": req.job_id,
            "status": "FAILED",
            "frames": None,
            "output_path": None,
            "error_message": str(exc),
        })
    finally:
        if tmp and os.path.exists(tmp):
            os.unlink(tmp)
async def video_to_text_task(
    req: VideoToTextRequest,
    llm: LLMClient,
    storage: StorageClient,
    callback_url: str,
) -> None:
    """Describe a video segment with a vision model and store the text.

    The video is downloaded from the configured ``source_data`` bucket into a
    temporary ``.mp4`` file.  Up to ``video.frame_sample_count`` (default 8)
    frames are sampled uniformly between ``req.start_sec`` and ``req.end_sec``,
    JPEG-encoded and sent as base64 data URLs, followed by the prompt, in a
    single user message to ``llm.chat_vision``.  The returned description is
    uploaded as UTF-8 text to ``video-text/{source_id}/{unix_timestamp}.txt``.

    The outcome is reported through ``callback_url``: ``SUCCESS`` with the
    output path, or ``FAILED`` with the error message.  Exceptions raised while
    processing are caught and reported rather than propagated.

    Args:
        req: Job description (job id, source id, file path, time range,
            optional model and prompt template).
        llm: Client used to run the vision chat completion.
        storage: Client used to download the video and upload the text.
        callback_url: Endpoint that receives the final job status.
    """
    cfg = get_config()
    bucket = cfg["storage"]["buckets"]["source_data"]
    sample_count = cfg["video"].get("frame_sample_count", 8)
    model = req.model or cfg["models"]["default_vision"]
    tmp = None
    try:
        video_bytes = await storage.download_bytes(bucket, req.file_path)
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
            # Remember the path before writing so the ``finally`` clause can
            # remove the file even when the write itself fails.
            tmp = f.name
            f.write(video_bytes)

        content: list[dict] = []
        cap = cv2.VideoCapture(tmp)
        try:
            if not cap.isOpened():
                raise RuntimeError(f"cannot open video: {req.file_path}")

            # Some containers report 0 FPS; fall back to a nominal 25.
            fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
            start_frame = int(req.start_sec * fps)
            end_frame = int(req.end_sec * fps)
            # Guard against an empty or inverted range so at least the start
            # frame is sampled.
            total = max(end_frame - start_frame, 1)

            # Uniform sampling
            indices = [
                start_frame + int(i * total / sample_count)
                for i in range(sample_count)
            ]
            indices = list(dict.fromkeys(indices))  # deduplicate, keep order

            for idx in indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
                ret, frame = cap.read()
                if not ret:
                    # Index past the end of the stream or undecodable frame.
                    continue
                ok, buf = cv2.imencode(".jpg", frame)
                if not ok:
                    continue
                b64 = base64.b64encode(buf.tobytes()).decode()
                content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}})
        finally:
            # Always release the capture, including when sampling raised, so
            # the handle on the temp file is dropped before unlink.
            cap.release()

        if not content:
            # Calling the vision model with no images would store a
            # description that is not based on the video at all.
            raise RuntimeError("no frames could be decoded from the requested video segment")

        prompt = req.prompt_template or "请用中文详细描述这段视频的内容,生成结构化文字描述。"
        content.append({"type": "text", "text": prompt})
        messages = [{"role": "user", "content": content}]
        description = await llm.chat_vision(model, messages)

        # Upload description text
        timestamp = int(time.time())
        output_path = f"video-text/{req.source_id}/{timestamp}.txt"
        await storage.upload_bytes(
            bucket, output_path, description.encode("utf-8"), "text/plain"
        )
        logger.info("video_to_text_done", extra={"job_id": req.job_id, "output_path": output_path})
        await _post_callback(callback_url, {
            "job_id": req.job_id,
            "status": "SUCCESS",
            "frames": None,
            "output_path": output_path,
            "error_message": None,
        })
    except Exception as exc:
        logger.error("video_to_text_failed", extra={"job_id": req.job_id, "error": str(exc)})
        await _post_callback(callback_url, {
            "job_id": req.job_id,
            "status": "FAILED",
            "frames": None,
            "output_path": None,
            "error_message": str(exc),
        })
    finally:
        if tmp and os.path.exists(tmp):
            os.unlink(tmp)