190 lines
6.0 KiB
Python
190 lines
6.0 KiB
Python
|
|
import base64
|
||
|
|
import io
|
||
|
|
import os
|
||
|
|
import tempfile
|
||
|
|
import time
|
||
|
|
from typing import Callable
|
||
|
|
|
||
|
|
import cv2
|
||
|
|
import httpx
|
||
|
|
import numpy as np
|
||
|
|
|
||
|
|
from app.clients.llm.base import LLMClient
|
||
|
|
from app.clients.storage.base import StorageClient
|
||
|
|
from app.core.config import get_config
|
||
|
|
from app.core.logging import get_logger
|
||
|
|
from app.models.video_models import ExtractFramesRequest, FrameInfo, VideoToTextRequest
|
||
|
|
|
||
|
|
logger = get_logger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
async def _post_callback(url: str, payload: dict) -> None:
|
||
|
|
async with httpx.AsyncClient(timeout=10) as http:
|
||
|
|
try:
|
||
|
|
await http.post(url, json=payload)
|
||
|
|
except Exception as exc:
|
||
|
|
logger.error("callback_failed", extra={"url": url, "error": str(exc)})
|
||
|
|
|
||
|
|
|
||
|
|
async def extract_frames_task(
|
||
|
|
req: ExtractFramesRequest,
|
||
|
|
storage: StorageClient,
|
||
|
|
callback_url: str,
|
||
|
|
) -> None:
|
||
|
|
cfg = get_config()
|
||
|
|
bucket = cfg["storage"]["buckets"]["source_data"]
|
||
|
|
threshold = cfg["video"].get("keyframe_diff_threshold", 30.0)
|
||
|
|
|
||
|
|
tmp = None
|
||
|
|
try:
|
||
|
|
video_bytes = await storage.download_bytes(bucket, req.file_path)
|
||
|
|
|
||
|
|
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
|
||
|
|
f.write(video_bytes)
|
||
|
|
tmp = f.name
|
||
|
|
|
||
|
|
cap = cv2.VideoCapture(tmp)
|
||
|
|
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
|
||
|
|
frames_info: list[FrameInfo] = []
|
||
|
|
upload_index = 0
|
||
|
|
prev_gray = None
|
||
|
|
frame_idx = 0
|
||
|
|
|
||
|
|
while True:
|
||
|
|
ret, frame = cap.read()
|
||
|
|
if not ret:
|
||
|
|
break
|
||
|
|
|
||
|
|
extract = False
|
||
|
|
if req.mode == "interval":
|
||
|
|
extract = (frame_idx % req.frame_interval == 0)
|
||
|
|
else: # keyframe
|
||
|
|
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.float32)
|
||
|
|
if prev_gray is None:
|
||
|
|
extract = True
|
||
|
|
else:
|
||
|
|
diff = np.mean(np.abs(gray - prev_gray))
|
||
|
|
extract = diff > threshold
|
||
|
|
prev_gray = gray
|
||
|
|
|
||
|
|
if extract:
|
||
|
|
time_sec = round(frame_idx / fps, 3)
|
||
|
|
_, buf = cv2.imencode(".jpg", frame)
|
||
|
|
frame_path = f"frames/{req.source_id}/{upload_index}.jpg"
|
||
|
|
await storage.upload_bytes(bucket, frame_path, buf.tobytes(), "image/jpeg")
|
||
|
|
frames_info.append(FrameInfo(
|
||
|
|
frame_index=frame_idx,
|
||
|
|
time_sec=time_sec,
|
||
|
|
frame_path=frame_path,
|
||
|
|
))
|
||
|
|
upload_index += 1
|
||
|
|
|
||
|
|
frame_idx += 1
|
||
|
|
|
||
|
|
cap.release()
|
||
|
|
|
||
|
|
logger.info("extract_frames_done", extra={
|
||
|
|
"job_id": req.job_id,
|
||
|
|
"frames": len(frames_info),
|
||
|
|
})
|
||
|
|
await _post_callback(callback_url, {
|
||
|
|
"job_id": req.job_id,
|
||
|
|
"status": "SUCCESS",
|
||
|
|
"frames": [f.model_dump() for f in frames_info],
|
||
|
|
"output_path": None,
|
||
|
|
"error_message": None,
|
||
|
|
})
|
||
|
|
|
||
|
|
except Exception as exc:
|
||
|
|
logger.error("extract_frames_failed", extra={"job_id": req.job_id, "error": str(exc)})
|
||
|
|
await _post_callback(callback_url, {
|
||
|
|
"job_id": req.job_id,
|
||
|
|
"status": "FAILED",
|
||
|
|
"frames": None,
|
||
|
|
"output_path": None,
|
||
|
|
"error_message": str(exc),
|
||
|
|
})
|
||
|
|
finally:
|
||
|
|
if tmp and os.path.exists(tmp):
|
||
|
|
os.unlink(tmp)
|
||
|
|
|
||
|
|
|
||
|
|
async def video_to_text_task(
|
||
|
|
req: VideoToTextRequest,
|
||
|
|
llm: LLMClient,
|
||
|
|
storage: StorageClient,
|
||
|
|
callback_url: str,
|
||
|
|
) -> None:
|
||
|
|
cfg = get_config()
|
||
|
|
bucket = cfg["storage"]["buckets"]["source_data"]
|
||
|
|
sample_count = cfg["video"].get("frame_sample_count", 8)
|
||
|
|
model = req.model or cfg["models"]["default_vision"]
|
||
|
|
|
||
|
|
tmp = None
|
||
|
|
try:
|
||
|
|
video_bytes = await storage.download_bytes(bucket, req.file_path)
|
||
|
|
|
||
|
|
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
|
||
|
|
f.write(video_bytes)
|
||
|
|
tmp = f.name
|
||
|
|
|
||
|
|
cap = cv2.VideoCapture(tmp)
|
||
|
|
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
|
||
|
|
start_frame = int(req.start_sec * fps)
|
||
|
|
end_frame = int(req.end_sec * fps)
|
||
|
|
total = max(end_frame - start_frame, 1)
|
||
|
|
|
||
|
|
# Uniform sampling
|
||
|
|
indices = [
|
||
|
|
start_frame + int(i * total / sample_count)
|
||
|
|
for i in range(sample_count)
|
||
|
|
]
|
||
|
|
indices = list(dict.fromkeys(indices)) # deduplicate
|
||
|
|
|
||
|
|
content: list[dict] = []
|
||
|
|
for idx in indices:
|
||
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
|
||
|
|
ret, frame = cap.read()
|
||
|
|
if not ret:
|
||
|
|
continue
|
||
|
|
_, buf = cv2.imencode(".jpg", frame)
|
||
|
|
b64 = base64.b64encode(buf.tobytes()).decode()
|
||
|
|
content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}})
|
||
|
|
|
||
|
|
cap.release()
|
||
|
|
|
||
|
|
prompt = req.prompt_template or "请用中文详细描述这段视频的内容,生成结构化文字描述。"
|
||
|
|
content.append({"type": "text", "text": prompt})
|
||
|
|
|
||
|
|
messages = [{"role": "user", "content": content}]
|
||
|
|
description = await llm.chat_vision(model, messages)
|
||
|
|
|
||
|
|
# Upload description text
|
||
|
|
timestamp = int(time.time())
|
||
|
|
output_path = f"video-text/{req.source_id}/{timestamp}.txt"
|
||
|
|
await storage.upload_bytes(
|
||
|
|
bucket, output_path, description.encode("utf-8"), "text/plain"
|
||
|
|
)
|
||
|
|
|
||
|
|
logger.info("video_to_text_done", extra={"job_id": req.job_id, "output_path": output_path})
|
||
|
|
await _post_callback(callback_url, {
|
||
|
|
"job_id": req.job_id,
|
||
|
|
"status": "SUCCESS",
|
||
|
|
"frames": None,
|
||
|
|
"output_path": output_path,
|
||
|
|
"error_message": None,
|
||
|
|
})
|
||
|
|
|
||
|
|
except Exception as exc:
|
||
|
|
logger.error("video_to_text_failed", extra={"job_id": req.job_id, "error": str(exc)})
|
||
|
|
await _post_callback(callback_url, {
|
||
|
|
"job_id": req.job_id,
|
||
|
|
"status": "FAILED",
|
||
|
|
"frames": None,
|
||
|
|
"output_path": None,
|
||
|
|
"error_message": str(exc),
|
||
|
|
})
|
||
|
|
finally:
|
||
|
|
if tmp and os.path.exists(tmp):
|
||
|
|
os.unlink(tmp)
|