diff --git a/.env b/.env index 20292a0..6b5bdf9 100644 --- a/.env +++ b/.env @@ -1,10 +1,13 @@ -# Required — fill in before running -ZHIPUAI_API_KEY=your-zhipuai-api-key-here -STORAGE_ACCESS_KEY=your-storage-access-key -STORAGE_SECRET_KEY=your-storage-secret-key -STORAGE_ENDPOINT=http://rustfs:9000 +# Required - fill in before running +ZHIPUAI_API_KEY=your-zhipuai-api-key-here +DASHSCOPE_API_KEY=your-dashscope-api-key-here +STORAGE_ACCESS_KEY=admin +STORAGE_SECRET_KEY=your_strong_password +STORAGE_ENDPOINT=http://39.107.112.174:9000 # Optional overrides -BACKEND_CALLBACK_URL=http://label-backend:8080/api/ai/callback +DASHSCOPE_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1 +DASHSCOPE_FINE_TUNE_BASE_URL=https://dashscope.aliyuncs.com/api/v1 +BACKEND_CALLBACK_URL=http://localhost:18082/api/ai/callback LOG_LEVEL=INFO -# MAX_VIDEO_SIZE_MB=200 +MAX_VIDEO_SIZE_MB=500 diff --git a/README.md b/README.md index 13fca10..9cb828c 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # label_ai_service +> 2026-04-16 update: 默认 LLM 适配器已切换为阿里云百炼/千问(DashScope OpenAI-compatible API)。文本默认模型为 `qwen-plus`,视觉默认模型为 `qwen-vl-plus`;旧的 `ZhipuAIClient` 代码保留在仓库中,但默认依赖注入不再使用。 + `label_ai_service` 是知识图谱智能标注平台的 AI 计算服务,基于 FastAPI 提供独立部署的推理与预处理能力。它不直接访问数据库,而是通过 ZhipuAI GLM 系列模型完成结构化抽取,通过 RustFS 读写原始文件和处理结果,并通过 HTTP 回调把异步视频任务结果通知上游后端。 当前服务覆盖 6 类核心能力: @@ -128,9 +130,14 @@ label_ai_service/ ```yaml server: - port: 8000 + port: 18000 log_level: INFO +dashscope: + api_key: "" + base_url: "https://dashscope.aliyuncs.com/compatible-mode/v1" + fine_tune_base_url: "https://dashscope.aliyuncs.com/api/v1" + storage: buckets: source_data: "source-data" @@ -140,12 +147,12 @@ backend: {} video: frame_sample_count: 8 - max_file_size_mb: 200 + max_file_size_mb: 500 keyframe_diff_threshold: 30.0 models: - default_text: "glm-4-flash" - default_vision: "glm-4v-flash" + default_text: "qwen-plus" + default_vision: "qwen-vl-plus" ``` ### .env @@ -154,7 +161,9 @@ 
models: | 变量名 | 必填 | 说明 | |---|---|---| -| `ZHIPUAI_API_KEY` | 是 | ZhipuAI API Key | +| `DASHSCOPE_API_KEY` | 是 | DashScope API Key | +| `DASHSCOPE_BASE_URL` | 否 | DashScope OpenAI-compatible base URL | +| `DASHSCOPE_FINE_TUNE_BASE_URL` | 否 | DashScope fine-tune API base URL | | `STORAGE_ACCESS_KEY` | 是 | RustFS/S3 Access Key | | `STORAGE_SECRET_KEY` | 是 | RustFS/S3 Secret Key | | `STORAGE_ENDPOINT` | 是 | RustFS/S3 Endpoint,例如 `http://rustfs:9000` | @@ -165,7 +174,9 @@ models: `.env` 示例: ```ini -ZHIPUAI_API_KEY=your-zhipuai-api-key-here +DASHSCOPE_API_KEY=your-dashscope-api-key-here +DASHSCOPE_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1 +DASHSCOPE_FINE_TUNE_BASE_URL=https://dashscope.aliyuncs.com/api/v1 STORAGE_ACCESS_KEY=your-storage-access-key STORAGE_SECRET_KEY=your-storage-secret-key STORAGE_ENDPOINT=http://rustfs:9000 @@ -318,7 +329,7 @@ curl -X POST http://localhost:8000/api/v1/finetune/start \ -H "Content-Type: application/json" \ -d '{ "jsonl_url": "https://example.com/train.jsonl", - "base_model": "glm-4-flash", + "base_model": "qwen3-14b", "hyperparams": { "epochs": 3, "learning_rate": 0.0001 diff --git a/app/clients/llm/qwen_client.py b/app/clients/llm/qwen_client.py new file mode 100644 index 0000000..6fb237a --- /dev/null +++ b/app/clients/llm/qwen_client.py @@ -0,0 +1,132 @@ +from typing import Any + +import httpx + +from app.clients.llm.base import LLMClient +from app.core.exceptions import LLMCallError +from app.core.logging import get_logger + +logger = get_logger(__name__) + + +class QwenClient(LLMClient): + def __init__( + self, + api_key: str, + base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1", + fine_tune_base_url: str | None = None, + transport: httpx.BaseTransport | None = None, + ) -> None: + self._api_key = api_key + self._base_url = base_url.rstrip("/") + self._fine_tune_base_url = ( + fine_tune_base_url.rstrip("/") + if fine_tune_base_url + else self._base_url.replace("/compatible-mode/v1", "/api/v1") + ) 
+ self._transport = transport + + async def chat(self, model: str, messages: list[dict]) -> str: + return await self._chat(model, messages) + + async def chat_vision(self, model: str, messages: list[dict]) -> str: + return await self._chat(model, messages) + + async def submit_finetune(self, jsonl_url: str, base_model: str, hyperparams: dict) -> str: + try: + file_bytes = await self._download_training_file(jsonl_url) + file_id = await self._upload_training_file(file_bytes) + payload = { + "model": base_model, + "training_file_ids": [file_id], + } + if hyperparams: + payload["hyper_parameters"] = hyperparams + data = await self._post_json(self._fine_tune_base_url, "/fine-tunes", payload) + output = data.get("output", {}) + job_id = output.get("job_id") or data.get("job_id") + if not job_id: + raise LLMCallError("千问微调任务提交失败: 缺少 job_id") + return job_id + except LLMCallError: + raise + except Exception as exc: + raise LLMCallError(f"千问微调任务提交失败: {exc}") from exc + + async def get_finetune_status(self, job_id: str) -> dict: + try: + data = await self._get_json(self._fine_tune_base_url, f"/fine-tunes/{job_id}") + output = data.get("output", {}) + return { + "job_id": output.get("job_id") or job_id, + "status": output.get("status", "").lower(), + "progress": output.get("progress"), + "error_message": output.get("message"), + } + except LLMCallError: + raise + except Exception as exc: + raise LLMCallError(f"查询千问微调任务失败: {exc}") from exc + + async def _chat(self, model: str, messages: list[dict]) -> str: + try: + data = await self._post_json( + self._base_url, + "/chat/completions", + {"model": model, "messages": messages}, + ) + content = data["choices"][0]["message"]["content"] + if isinstance(content, list): + return "".join( + part.get("text", "") if isinstance(part, dict) else str(part) + for part in content + ) + logger.info("llm_call", extra={"model": model, "response_len": len(content)}) + return content + except LLMCallError: + raise + except Exception as exc: + 
logger.error("llm_call_error", extra={"model": model, "error": str(exc)}) + raise LLMCallError(f"千问大模型调用失败: {exc}") from exc + + async def _download_training_file(self, jsonl_url: str) -> bytes: + async with self._build_client() as client: + response = await client.get(jsonl_url) + response.raise_for_status() + return response.content + + async def _upload_training_file(self, file_bytes: bytes) -> str: + async with self._build_client(base_url=self._base_url) as client: + response = await client.post( + "/files", + data={"purpose": "fine-tune"}, + files={"file": ("training.jsonl", file_bytes, "application/jsonl")}, + ) + response.raise_for_status() + data = response.json() + file_id = data.get("id") + if not file_id: + raise LLMCallError("千问训练文件上传失败: 缺少 file id") + return file_id + + async def _post_json(self, base_url: str, path: str, payload: dict[str, Any]) -> dict[str, Any]: + async with self._build_client(base_url=base_url) as client: + response = await client.post(path, json=payload) + response.raise_for_status() + return response.json() + + async def _get_json(self, base_url: str, path: str) -> dict[str, Any]: + async with self._build_client(base_url=base_url) as client: + response = await client.get(path) + response.raise_for_status() + return response.json() + + def _build_client(self, base_url: str | None = None) -> httpx.AsyncClient: + return httpx.AsyncClient( + base_url=base_url or self._base_url, + headers={ + "Authorization": f"Bearer {self._api_key}", + }, + transport=self._transport, + timeout=60, + ) diff --git a/app/clients/storage/rustfs_client.py b/app/clients/storage/rustfs_client.py index 19708d6..57e49f1 100644 --- a/app/clients/storage/rustfs_client.py +++ b/app/clients/storage/rustfs_client.py @@ -28,6 +28,7 @@ class RustFSClient(StorageClient): ) return resp["Body"].read() except ClientError as exc: + # NOTE(review): no stdout print here — the exception detail is preserved in the StorageError below raise StorageError(f"存储下载失败 [{bucket}/{path}]: {exc}") from exc async def upload_bytes( diff --git a/app/core/config.py
b/app/core/config.py index ac40a1d..9a0a6f5 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -10,6 +10,9 @@ load_dotenv() # Maps environment variable names to nested YAML key paths _ENV_OVERRIDES: dict[str, list[str]] = { + "DASHSCOPE_API_KEY": ["dashscope", "api_key"], + "DASHSCOPE_BASE_URL": ["dashscope", "base_url"], + "DASHSCOPE_FINE_TUNE_BASE_URL": ["dashscope", "fine_tune_base_url"], "ZHIPUAI_API_KEY": ["zhipuai", "api_key"], "STORAGE_ACCESS_KEY": ["storage", "access_key"], "STORAGE_SECRET_KEY": ["storage", "secret_key"], diff --git a/app/core/dependencies.py b/app/core/dependencies.py index 66a9c72..7f9de08 100644 --- a/app/core/dependencies.py +++ b/app/core/dependencies.py @@ -1,7 +1,7 @@ from functools import lru_cache from app.clients.llm.base import LLMClient -from app.clients.llm.zhipuai_client import ZhipuAIClient +from app.clients.llm.qwen_client import QwenClient from app.clients.storage.base import StorageClient from app.clients.storage.rustfs_client import RustFSClient from app.core.config import get_config @@ -10,7 +10,12 @@ from app.core.config import get_config @lru_cache(maxsize=1) def get_llm_client() -> LLMClient: cfg = get_config() - return ZhipuAIClient(api_key=cfg["zhipuai"]["api_key"]) + dashscope_cfg = cfg["dashscope"] + return QwenClient( + api_key=dashscope_cfg["api_key"], + base_url=dashscope_cfg.get("base_url", "https://dashscope.aliyuncs.com/compatible-mode/v1"), + fine_tune_base_url=dashscope_cfg.get("fine_tune_base_url"), + ) @lru_cache(maxsize=1) diff --git a/app/services/text_service.py b/app/services/text_service.py index 18c2003..caaa257 100644 --- a/app/services/text_service.py +++ b/app/services/text_service.py @@ -6,7 +6,7 @@ import docx from app.clients.llm.base import LLMClient from app.clients.storage.base import StorageClient from app.core.config import get_config -from app.core.exceptions import UnsupportedFileTypeError +from app.core.exceptions import StorageError, UnsupportedFileTypeError from 
app.core.json_utils import extract_json from app.core.logging import get_logger from app.models.text_models import ( @@ -61,7 +61,21 @@ async def extract_triples( bucket = cfg["storage"]["buckets"]["source_data"] model = req.model or cfg["models"]["default_text"] - data = await storage.download_bytes(bucket, req.file_path) + try: + data = await storage.download_bytes(bucket, req.file_path) + logger.info("文件下载成功", extra={ + "file_name": req.file_name, + "size_bytes": len(data) + }) + except Exception as e: + logger.error("文件下载失败", extra={ + "file_name": req.file_name, + "file_path": req.file_path, + "bucket": bucket, + "error_type": type(e).__name__, + "error_message": str(e) + }, exc_info=True) + raise StorageError(f"下载文件失败: {str(e)}") from e if ext == ".txt": text = _parse_txt(data) diff --git a/config.yaml b/config.yaml index 925435a..7e03640 100644 --- a/config.yaml +++ b/config.yaml @@ -2,18 +2,23 @@ server: port: 18000 log_level: INFO +dashscope: + api_key: "" # override with DASHSCOPE_API_KEY in .env or environment + base_url: "https://dashscope.aliyuncs.com/compatible-mode/v1" + fine_tune_base_url: "https://dashscope.aliyuncs.com/api/v1" + storage: buckets: - source_data: "source-data" + source_data: "label-source-data" finetune_export: "finetune-export" backend: {} # callback_url injected via BACKEND_CALLBACK_URL env var video: frame_sample_count: 8 # uniform frames sampled for video-to-text - max_file_size_mb: 200 # video size limit (override with MAX_VIDEO_SIZE_MB) + max_file_size_mb: 500 # video size limit (override with MAX_VIDEO_SIZE_MB) keyframe_diff_threshold: 30.0 # grayscale mean-diff threshold for keyframe detection models: - default_text: "glm-4-flash" - default_vision: "glm-4v-flash" + default_text: "qwen-plus" + default_vision: "qwen-vl-plus"