diff --git a/docs/superpowers/plans/2026-04-10-ai-service-impl.md b/docs/superpowers/plans/2026-04-10-ai-service-impl.md index 7db598b..5957548 100644 --- a/docs/superpowers/plans/2026-04-10-ai-service-impl.md +++ b/docs/superpowers/plans/2026-04-10-ai-service-impl.md @@ -53,6 +53,7 @@ backend: {} video: frame_sample_count: 8 + max_file_size_mb: 200 models: default_text: "glm-4-flash" @@ -67,6 +68,7 @@ STORAGE_ACCESS_KEY=minioadmin STORAGE_SECRET_KEY=minioadmin STORAGE_ENDPOINT=http://rustfs:9000 BACKEND_CALLBACK_URL=http://backend:8080/internal/video-job/callback +# MAX_VIDEO_SIZE_MB=200 # 可选,覆盖 config.yaml 中的视频大小上限 ``` - [ ] **Step 4: 创建 `requirements.txt`** @@ -111,6 +113,7 @@ def mock_storage(): client.download_bytes = AsyncMock() client.upload_bytes = AsyncMock() client.get_presigned_url = MagicMock(return_value="https://example.com/presigned/crop.jpg") + client.get_object_size = AsyncMock(return_value=10 * 1024 * 1024) # 默认 10MB,小于限制 return client ``` @@ -246,12 +249,13 @@ from dotenv import load_dotenv _ROOT = Path(__file__).parent.parent.parent _ENV_OVERRIDES = { - "ZHIPUAI_API_KEY": ["zhipuai", "api_key"], - "STORAGE_ACCESS_KEY": ["storage", "access_key"], - "STORAGE_SECRET_KEY": ["storage", "secret_key"], - "STORAGE_ENDPOINT": ["storage", "endpoint"], - "BACKEND_CALLBACK_URL": ["backend", "callback_url"], - "LOG_LEVEL": ["server", "log_level"], + "ZHIPUAI_API_KEY": ["zhipuai", "api_key"], + "STORAGE_ACCESS_KEY": ["storage", "access_key"], + "STORAGE_SECRET_KEY": ["storage", "secret_key"], + "STORAGE_ENDPOINT": ["storage", "endpoint"], + "BACKEND_CALLBACK_URL": ["backend", "callback_url"], + "LOG_LEVEL": ["server", "log_level"], + "MAX_VIDEO_SIZE_MB": ["video", "max_file_size_mb"], } @@ -638,6 +642,15 @@ def test_get_presigned_url(rustfs_client): Params={"Bucket": "source-data", "Key": "crops/1/0.jpg"}, ExpiresIn=3600, ) + + +def test_get_object_size(rustfs_client): + rustfs_client._mock_s3.head_object.return_value = {"ContentLength": 1024 * 1024 * 50} + 
size = asyncio.run(rustfs_client.get_object_size("source-data", "video/1.mp4")) + assert size == 1024 * 1024 * 50 + rustfs_client._mock_s3.head_object.assert_called_once_with( + Bucket="source-data", Key="video/1.mp4" + ) ``` - [ ] **Step 2: 运行,确认失败** @@ -672,6 +685,10 @@ class StorageClient(ABC): @abstractmethod def get_presigned_url(self, bucket: str, path: str, expires: int = 3600) -> str: """生成预签名访问 URL。""" + + @abstractmethod + async def get_object_size(self, bucket: str, path: str) -> int: + """返回对象字节大小,用于在下载前进行大小校验。""" ``` - [ ] **Step 4: 实现 `app/clients/storage/rustfs_client.py`** @@ -719,6 +736,13 @@ class RustFSClient(StorageClient): Params={"Bucket": bucket, "Key": path}, ExpiresIn=expires, ) + + async def get_object_size(self, bucket: str, path: str) -> int: + loop = asyncio.get_event_loop() + resp = await loop.run_in_executor( + None, lambda: self._s3.head_object(Bucket=bucket, Key=path) + ) + return resp["ContentLength"] ``` - [ ] **Step 5: 运行,确认通过** @@ -727,7 +751,7 @@ class RustFSClient(StorageClient): conda run -n label pytest tests/test_storage_client.py -v ``` -Expected: `3 passed` +Expected: `4 passed` - [ ] **Step 6: Commit** @@ -816,6 +840,11 @@ app = FastAPI(title="Label AI Service", version="1.0.0", lifespan=lifespan) app.middleware("http")(request_logging_middleware) + +@app.get("/health", tags=["Health"]) +async def health(): + return {"status": "ok"} + app.add_exception_handler(UnsupportedFileTypeError, unsupported_file_type_handler) app.add_exception_handler(StorageDownloadError, storage_download_handler) app.add_exception_handler(LLMResponseParseError, llm_parse_handler) @@ -831,11 +860,26 @@ app.add_exception_handler(Exception, generic_error_handler) # app.include_router(finetune.router, prefix="/api/v1") ``` -- [ ] **Step 3: Commit** +- [ ] **Step 3: 验证 /health 端点** + +```bash +conda run -n label python -c " +from fastapi.testclient import TestClient +from app.main import app +client = TestClient(app) +r = client.get('/health') 
+assert r.status_code == 200 and r.json() == {'status': 'ok'}, r.json() +print('health check OK') +" +``` + +Expected: `health check OK` + +- [ ] **Step 4: Commit** ```bash git add app/core/dependencies.py app/main.py -git commit -m "feat: DI dependencies and FastAPI app entry with lifespan" +git commit -m "feat: DI dependencies, FastAPI app entry with lifespan and /health endpoint" ``` --- @@ -1465,7 +1509,7 @@ async def extract_quadruples( bbox=bbox, cropped_image_path=crop_path, )) - except (KeyError, TypeError, Exception) as e: + except Exception as e: logger.warning(f"跳过不完整四元组 index={i}: {e}") return result @@ -1997,7 +2041,8 @@ def client(mock_llm, mock_storage): return TestClient(app) -def test_extract_frames_returns_202(client): +def test_extract_frames_returns_202(client, mock_storage): + mock_storage.get_object_size = AsyncMock(return_value=10 * 1024 * 1024) # 10MB resp = client.post("/api/v1/video/extract-frames", json={ "file_path": "video/202404/1.mp4", "source_id": 10, @@ -2010,7 +2055,8 @@ def test_extract_frames_returns_202(client): assert "后台处理中" in resp.json()["message"] -def test_video_to_text_returns_202(client): +def test_video_to_text_returns_202(client, mock_storage): + mock_storage.get_object_size = AsyncMock(return_value=10 * 1024 * 1024) # 10MB resp = client.post("/api/v1/video/to-text", json={ "file_path": "video/202404/1.mp4", "source_id": 10, @@ -2020,12 +2066,25 @@ def test_video_to_text_returns_202(client): }) assert resp.status_code == 202 assert resp.json()["job_id"] == 43 + + +def test_extract_frames_rejects_oversized_video(client, mock_storage): + mock_storage.get_object_size = AsyncMock(return_value=300 * 1024 * 1024) # 300MB > 200MB limit + resp = client.post("/api/v1/video/extract-frames", json={ + "file_path": "video/202404/big.mp4", + "source_id": 10, + "job_id": 99, + "mode": "interval", + "frame_interval": 30, + }) + assert resp.status_code == 400 + assert "大小" in resp.json()["detail"] ``` - [ ] **Step 2: 实现 
`app/routers/video.py`** ```python -from fastapi import APIRouter, BackgroundTasks, Depends +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException from app.clients.llm.base import LLMClient from app.clients.storage.base import StorageClient @@ -2042,6 +2101,16 @@ from app.services import video_service router = APIRouter(tags=["Video"]) +async def _check_video_size(storage: StorageClient, bucket: str, file_path: str, max_mb: int) -> None: + """在触发后台任务前校验视频文件大小,超限时抛出 HTTP 400。""" + size_bytes = await storage.get_object_size(bucket, file_path) + if size_bytes > max_mb * 1024 * 1024: + raise HTTPException( + status_code=400, + detail=f"视频文件大小超出限制(最大 {max_mb}MB,当前 {size_bytes // 1024 // 1024}MB)", + ) + + @router.post("/video/extract-frames", response_model=ExtractFramesResponse, status_code=202) async def extract_frames( req: ExtractFramesRequest, @@ -2049,6 +2118,8 @@ async def extract_frames( storage: StorageClient = Depends(get_storage_client), ): cfg = get_config() + bucket = cfg["storage"]["buckets"]["source_data"] + await _check_video_size(storage, bucket, req.file_path, cfg["video"]["max_file_size_mb"]) background_tasks.add_task( video_service.extract_frames_background, file_path=req.file_path, @@ -2058,7 +2129,7 @@ async def extract_frames( frame_interval=req.frame_interval, storage=storage, callback_url=cfg["backend"]["callback_url"], - bucket=cfg["storage"]["buckets"]["source_data"], + bucket=bucket, ) return ExtractFramesResponse(message="任务已接受,后台处理中", job_id=req.job_id) @@ -2071,6 +2142,8 @@ async def video_to_text( storage: StorageClient = Depends(get_storage_client), ): cfg = get_config() + bucket = cfg["storage"]["buckets"]["source_data"] + await _check_video_size(storage, bucket, req.file_path, cfg["video"]["max_file_size_mb"]) model = req.model or cfg["models"]["default_vision"] prompt = req.prompt_template or video_service.DEFAULT_VIDEO_TO_TEXT_PROMPT background_tasks.add_task( @@ -2086,7 +2159,7 @@ async def video_to_text( llm=llm, 
storage=storage, callback_url=cfg["backend"]["callback_url"], - bucket=cfg["storage"]["buckets"]["source_data"], + bucket=bucket, ) return VideoToTextResponse(message="任务已接受,后台处理中", job_id=req.job_id) ``` @@ -2104,7 +2177,7 @@ app.include_router(video.router, prefix="/api/v1") conda run -n label pytest tests/test_video_router.py -v ``` -Expected: `2 passed` +Expected: `3 passed` - [ ] **Step 5: Commit** @@ -2225,6 +2298,7 @@ async def test_gen_text_qa_llm_error(mock_llm): @pytest.mark.asyncio async def test_gen_image_qa(mock_llm, mock_storage): mock_llm.chat_vision.return_value = '[{"question":"图中是什么?","answer":"电缆接头"}]' + mock_storage.download_bytes.return_value = b"fake-image-bytes" items = [ImageQuadrupleForQA( subject="电缆接头", predicate="位于", object="配电箱", qualifier="", cropped_image_path="crops/1/0.jpg" )] @@ -2232,7 +2306,12 @@ async def test_gen_image_qa(mock_llm, mock_storage): result = await gen_image_qa(items=items, model="glm-4v-flash", prompt_template="", llm=mock_llm, storage=mock_storage) assert len(result) == 1 assert result[0].image_path == "crops/1/0.jpg" - mock_storage.get_presigned_url.assert_called_once_with("source-data", "crops/1/0.jpg") + # 验证使用 download_bytes(base64),而非 presigned URL + mock_storage.download_bytes.assert_called_once_with("source-data", "crops/1/0.jpg") + # 验证发送给 GLM-4V 的消息包含 base64 data URL + call_messages = mock_llm.chat_vision.call_args[0][0] + image_content = call_messages[1]["content"][0] + assert image_content["image_url"]["url"].startswith("data:image/jpeg;base64,") ``` - [ ] **Step 3: 运行,确认失败** @@ -2246,12 +2325,13 @@ Expected: `ImportError` - [ ] **Step 4: 实现 `app/services/qa_service.py`** ```python +import base64 import json import logging from app.clients.llm.base import LLMClient from app.clients.storage.base import StorageClient -from app.core.exceptions import LLMCallError, LLMResponseParseError +from app.core.exceptions import LLMCallError, LLMResponseParseError, StorageDownloadError from app.core.json_utils 
import parse_json_response from app.models.qa_models import ( ImageQAPair, @@ -2326,7 +2406,12 @@ async def gen_image_qa( result = [] prompt = prompt_template or DEFAULT_IMAGE_QA_PROMPT for item in items: - presigned_url = storage.get_presigned_url(bucket, item.cropped_image_path) + # 下载裁剪图并 base64 编码:RustFS 为内网部署,presigned URL 无法被云端 GLM-4V 访问 + try: + image_bytes = await storage.download_bytes(bucket, item.cropped_image_path) + except Exception as e: + raise StorageDownloadError(f"下载裁剪图失败 {item.cropped_image_path}: {e}") from e + b64 = base64.b64encode(image_bytes).decode() quad_text = json.dumps( {k: v for k, v in item.model_dump().items() if k != "cropped_image_path"}, ensure_ascii=False, @@ -2334,7 +2419,7 @@ async def gen_image_qa( messages = [ {"role": "system", "content": "你是专业的视觉问答对生成助手。"}, {"role": "user", "content": [ - {"type": "image_url", "image_url": {"url": presigned_url}}, + {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}}, {"type": "text", "text": prompt + quad_text}, ]}, ] @@ -2813,6 +2898,12 @@ services: networks: - label-net restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + interval: 30s + timeout: 5s + retries: 3 + start_period: 10s rustfs: image: minio/minio:latest diff --git a/docs/superpowers/specs/2026-04-10-ai-service-design.md b/docs/superpowers/specs/2026-04-10-ai-service-design.md index 027dfce..55b01bb 100644 --- a/docs/superpowers/specs/2026-04-10-ai-service-design.md +++ b/docs/superpowers/specs/2026-04-10-ai-service-design.md @@ -125,6 +125,7 @@ backend: {} # callback_url 由 .env 注入 video: frame_sample_count: 8 # 视频转文本时均匀抽取的代表帧数 + max_file_size_mb: 200 # 视频文件大小上限(超过则拒绝,防止 OOM) models: default_text: "glm-4-flash" @@ -139,6 +140,7 @@ STORAGE_ACCESS_KEY=minioadmin STORAGE_SECRET_KEY=minioadmin STORAGE_ENDPOINT=http://rustfs:9000 BACKEND_CALLBACK_URL=http://backend:8080/internal/video-job/callback +# MAX_VIDEO_SIZE_MB=200 # 可选,覆盖 config.yaml 中的视频大小上限 ``` ### 
3.4 config 模块实现 @@ -154,12 +156,13 @@ _ROOT = Path(__file__).parent.parent.parent # 环境变量 → YAML 路径映射 _ENV_OVERRIDES = { - "ZHIPUAI_API_KEY": ["zhipuai", "api_key"], - "STORAGE_ACCESS_KEY": ["storage", "access_key"], - "STORAGE_SECRET_KEY": ["storage", "secret_key"], - "STORAGE_ENDPOINT": ["storage", "endpoint"], - "BACKEND_CALLBACK_URL": ["backend", "callback_url"], - "LOG_LEVEL": ["server", "log_level"], + "ZHIPUAI_API_KEY": ["zhipuai", "api_key"], + "STORAGE_ACCESS_KEY": ["storage", "access_key"], + "STORAGE_SECRET_KEY": ["storage", "secret_key"], + "STORAGE_ENDPOINT": ["storage", "endpoint"], + "BACKEND_CALLBACK_URL": ["backend", "callback_url"], + "LOG_LEVEL": ["server", "log_level"], + "MAX_VIDEO_SIZE_MB": ["video", "max_file_size_mb"], } def _set_nested(d: dict, keys: list[str], value: str): @@ -351,6 +354,17 @@ app = FastAPI(title="Label AI Service", lifespan=lifespan) 统一前缀:`/api/v1`。FastAPI 自动生成 Swagger 文档(`/docs`)。 +### 5.0 健康检查 + +**`GET /health`** + +```json +// 响应(200 OK) +{"status": "ok"} +``` + +用于 Docker healthcheck、Nginx 上游探测、运维监控。无需认证,不访问外部依赖。 + ### 5.1 文本三元组提取 **`POST /api/v1/text/extract`** @@ -541,7 +555,7 @@ POST {BACKEND_CALLBACK_URL} } ``` -图像 QA 生成时,AI 服务通过 `get_presigned_url` 获取裁剪图临时访问 URL,构造多模态消息后调用 GLM-4V。 +图像 QA 生成时,AI 服务通过 `storage.download_bytes` 重新下载裁剪图,base64 编码后直接嵌入多模态消息,避免 RustFS 内网 presigned URL 无法被云端 GLM-4V 访问的问题。 ### 5.7 提交微调任务 @@ -628,6 +642,8 @@ def extract_text(data: bytes, filename: str) -> str: **抽帧(BackgroundTask)**: ``` +0. storage.get_object_size(bucket, file_path) → 字节数 + 超过 video.max_file_size_mb 限制 → 路由层提前校验,直接返回 HTTP 400(不触发后台任务,也不回调 FAILED) 1. storage.download_bytes → bytes → 写入 tempfile 2. cv2.VideoCapture 打开临时文件 3. interval 模式:按 frame_interval 步进读帧 @@ -659,9 +675,10 @@ def extract_text(data: bytes, filename: str) -> str: 图像 QA: 遍历四元组列表 - storage.get_presigned_url(cropped_image_path) → 临时 URL - 构造多模态消息(image_url + 问题指令) + storage.download_bytes(bucket, cropped_image_path) → bytes → base64 编码 + 构造多模态消息(data:image/jpeg;base64,... 
+ 问题指令) llm.chat_vision → 解析 → 含 image_path 的 QAPairList + (注:不使用 presigned URL,因 RustFS 为内网部署,云端 GLM-4V 无法访问内网地址) ``` ### 6.5 finetune_service — GLM 微调对接 @@ -764,6 +781,12 @@ ai-service: - backend networks: - label-net + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + interval: 30s + timeout: 5s + retries: 3 + start_period: 10s ``` ### 10.3 requirements.txt @@ -799,9 +822,9 @@ ZhipuAI 官方 SDK 是同步阻塞调用,直接 `await` 不生效。通过 `lo 项目规模适中,视频处理任务由 ADMIN 手动触发,并发量可控。FastAPI `BackgroundTasks` 无需额外中间件(Redis 队列、Celery Worker),部署简单,任务状态通过回调接口传递给 Java 后端管理,符合整体架构风格。 -### 11.4 为何图像 QA 生成用 presigned URL 而非 base64 +### 11.4 为何图像 QA 生成用 base64 而非 presigned URL -裁剪图已存储在 RustFS,GLM-4V 支持通过 URL 直接访问图片。presigned URL 避免将图片内容重新加载到 AI 服务内存后再 base64 编码,减少内存压力,适合多张图片批量生成的场景。 +RustFS 部署在 Docker 内网(`http://rustfs:9000`),presigned URL 指向内网地址,云端 GLM-4V API 无法访问,会导致所有图像 QA 请求失败。因此将裁剪图重新下载为 bytes,base64 编码后直接嵌入多模态消息体,与 `image_service` 处理原图的方式保持一致,无需 RustFS 有公网地址。 ### 11.5 config.yaml + .env 分层配置的原因