提取操作使用千问plus大模型

This commit is contained in:
wh
2026-04-17 01:22:05 +08:00
parent d34f703523
commit 3a60d8cb33
8 changed files with 196 additions and 22 deletions

View File

@@ -0,0 +1,132 @@
from typing import Any
import httpx
from app.clients.llm.base import LLMClient
from app.core.exceptions import LLMCallError
from app.core.logging import get_logger
logger = get_logger(__name__)
class QwenClient(LLMClient):
    """Client for Alibaba Cloud Qwen models via the DashScope API.

    Chat completions go through the OpenAI-compatible endpoint; fine-tuning
    uses DashScope's native API, which lives under a different base path
    (``/api/v1`` instead of ``/compatible-mode/v1``).
    """

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
        fine_tune_base_url: str | None = None,
        transport: httpx.BaseTransport | None = None,
    ) -> None:
        """
        Args:
            api_key: DashScope API key, sent as a Bearer token.
            base_url: Root of the OpenAI-compatible endpoint.
            fine_tune_base_url: Root of the native DashScope API; when omitted
                it is derived from ``base_url`` by swapping the path segment.
            transport: Optional httpx transport (lets tests inject a mock).
        """
        self._api_key = api_key
        self._base_url = base_url.rstrip("/")
        self._fine_tune_base_url = (
            fine_tune_base_url.rstrip("/")
            if fine_tune_base_url
            else self._base_url.replace("/compatible-mode/v1", "/api/v1")
        )
        self._transport = transport

    async def chat(self, model: str, messages: list[dict]) -> str:
        """Run a text chat completion and return the assistant reply."""
        return await self._chat(model, messages)

    async def chat_vision(self, model: str, messages: list[dict]) -> str:
        """Run a multimodal chat completion; the compatible-mode endpoint
        accepts vision messages on the same route as text chat."""
        return await self._chat(model, messages)

    async def submit_finetune(self, jsonl_url: str, base_model: str, hyperparams: dict) -> str:
        """Upload the JSONL training file and create a fine-tune job.

        Args:
            jsonl_url: URL of the prepared JSONL training data.
            base_model: Model to fine-tune.
            hyperparams: Optional hyper-parameters; forwarded as-is when non-empty.

        Returns:
            The DashScope fine-tune job id.

        Raises:
            LLMCallError: On download/upload/submission failure, or when the
                response lacks a job id.
        """
        try:
            file_bytes = await self._download_training_file(jsonl_url)
            file_id = await self._upload_training_file(file_bytes)
            payload: dict[str, Any] = {
                "model": base_model,
                "training_file_ids": [file_id],
            }
            if hyperparams:
                payload["hyper_parameters"] = hyperparams
            data = await self._post_json(self._fine_tune_base_url, "/fine-tunes", payload)
            # The native API nests results under "output"; fall back to the
            # top level defensively.
            output = data.get("output", {})
            job_id = output.get("job_id") or data.get("job_id")
            if not job_id:
                raise LLMCallError("千问微调任务提交失败: 缺少 job_id")
            return job_id
        except LLMCallError:
            raise
        except Exception as exc:
            raise LLMCallError(f"千问微调任务提交失败: {exc}") from exc

    async def get_finetune_status(self, job_id: str) -> dict:
        """Query a fine-tune job and return a normalized status dict.

        Returns:
            Dict with ``job_id``, lower-cased ``status``, ``progress`` and
            ``error_message`` keys.

        Raises:
            LLMCallError: When the status request fails.
        """
        try:
            data = await self._get_json(self._fine_tune_base_url, f"/fine-tunes/{job_id}")
            output = data.get("output", {})
            return {
                "job_id": output.get("job_id") or job_id,
                # `or ""` guards against an explicit null status in the
                # response, which `.get("status", "")` would not catch.
                "status": (output.get("status") or "").lower(),
                "progress": output.get("progress"),
                "error_message": output.get("message"),
            }
        except LLMCallError:
            raise
        except Exception as exc:
            raise LLMCallError(f"查询千问微调任务失败: {exc}") from exc

    async def _chat(self, model: str, messages: list[dict]) -> str:
        """POST a chat completion and return the assistant message content.

        Normalizes both the plain-string content form and the list-of-parts
        form returned for multimodal requests.
        """
        try:
            data = await self._post_json(
                self._base_url,
                "/chat/completions",
                {"model": model, "messages": messages},
            )
            content = data["choices"][0]["message"]["content"]
            if isinstance(content, list):
                # Multimodal replies arrive as a list of content parts.
                content = "".join(
                    part.get("text", "") if isinstance(part, dict) else str(part)
                    for part in content
                )
            # Log after normalization so multimodal calls are recorded too
            # (previously the list branch returned before logging).
            logger.info("llm_call", extra={"model": model, "response_len": len(content)})
            return content
        except LLMCallError:
            raise
        except Exception as exc:
            logger.error("llm_call_error", extra={"model": model, "error": str(exc)})
            raise LLMCallError(f"千问大模型调用失败: {exc}") from exc

    async def _download_training_file(self, jsonl_url: str) -> bytes:
        """Fetch the JSONL training file from external storage.

        Deliberately uses an unauthenticated client: ``jsonl_url`` points at
        an arbitrary (storage) host, and sending the DashScope Bearer token
        there would leak the API key to a third party. Presumably the URL is
        presigned or public — TODO confirm against the caller.
        """
        async with httpx.AsyncClient(transport=self._transport, timeout=60) as client:
            response = await client.get(jsonl_url)
            response.raise_for_status()
            return response.content

    async def _upload_training_file(self, file_bytes: bytes) -> str:
        """Upload training data to the files endpoint and return its file id.

        Raises:
            LLMCallError: When the response has no ``id`` field.
        """
        async with self._build_client(base_url=self._base_url) as client:
            response = await client.post(
                "/files",
                data={"purpose": "fine-tune"},
                files={"file": ("training.jsonl", file_bytes, "application/jsonl")},
            )
            response.raise_for_status()
            data = response.json()
            file_id = data.get("id")
            if not file_id:
                raise LLMCallError("千问训练文件上传失败: 缺少 file id")
            return file_id

    async def _post_json(self, base_url: str, path: str, payload: dict[str, Any]) -> dict[str, Any]:
        """POST ``payload`` as JSON to ``base_url + path`` and decode the reply."""
        async with self._build_client(base_url=base_url) as client:
            response = await client.post(path, json=payload)
            response.raise_for_status()
            return response.json()

    async def _get_json(self, base_url: str, path: str) -> dict[str, Any]:
        """GET ``base_url + path`` and decode the JSON reply."""
        async with self._build_client(base_url=base_url) as client:
            response = await client.get(path)
            response.raise_for_status()
            return response.json()

    def _build_client(self, base_url: str | None = None) -> httpx.AsyncClient:
        """Create an authenticated async client bound to ``base_url``."""
        return httpx.AsyncClient(
            base_url=base_url or self._base_url,
            headers={
                "Authorization": f"Bearer {self._api_key}",
            },
            transport=self._transport,
            timeout=60,
        )

View File

@@ -28,6 +28,7 @@ class RustFSClient(StorageClient):
)
return resp["Body"].read()
except ClientError as exc:
print(exc)
raise StorageError(f"存储下载失败 [{bucket}/{path}]: {exc}") from exc
async def upload_bytes(

View File

@@ -10,6 +10,9 @@ load_dotenv()
# Maps environment variable names to nested YAML key paths
_ENV_OVERRIDES: dict[str, list[str]] = {
"DASHSCOPE_API_KEY": ["dashscope", "api_key"],
"DASHSCOPE_BASE_URL": ["dashscope", "base_url"],
"DASHSCOPE_FINE_TUNE_BASE_URL": ["dashscope", "fine_tune_base_url"],
"ZHIPUAI_API_KEY": ["zhipuai", "api_key"],
"STORAGE_ACCESS_KEY": ["storage", "access_key"],
"STORAGE_SECRET_KEY": ["storage", "secret_key"],

View File

@@ -1,7 +1,7 @@
from functools import lru_cache
from app.clients.llm.base import LLMClient
from app.clients.llm.zhipuai_client import ZhipuAIClient
from app.clients.llm.qwen_client import QwenClient
from app.clients.storage.base import StorageClient
from app.clients.storage.rustfs_client import RustFSClient
from app.core.config import get_config
@@ -10,7 +10,12 @@ from app.core.config import get_config
@lru_cache(maxsize=1)
def get_llm_client() -> LLMClient:
    """Build and memoize the process-wide LLM client (Qwen via DashScope).

    Reads the ``dashscope`` section of the config; ``base_url`` and
    ``fine_tune_base_url`` are optional and fall back to the client defaults.
    """
    # NOTE(review): the rendered diff left the superseded
    # `return ZhipuAIClient(...)` line in place, making every statement after
    # it unreachable; this is the intended post-change body.
    cfg = get_config()
    dashscope_cfg = cfg["dashscope"]
    return QwenClient(
        api_key=dashscope_cfg["api_key"],
        base_url=dashscope_cfg.get("base_url", "https://dashscope.aliyuncs.com/compatible-mode/v1"),
        fine_tune_base_url=dashscope_cfg.get("fine_tune_base_url"),
    )
@lru_cache(maxsize=1)

View File

@@ -6,7 +6,7 @@ import docx
from app.clients.llm.base import LLMClient
from app.clients.storage.base import StorageClient
from app.core.config import get_config
from app.core.exceptions import UnsupportedFileTypeError
from app.core.exceptions import StorageError, UnsupportedFileTypeError
from app.core.json_utils import extract_json
from app.core.logging import get_logger
from app.models.text_models import (
@@ -61,7 +61,21 @@ async def extract_triples(
bucket = cfg["storage"]["buckets"]["source_data"]
model = req.model or cfg["models"]["default_text"]
data = await storage.download_bytes(bucket, req.file_path)
try:
data = await storage.download_bytes(bucket, req.file_path)
logger.info("文件下载成功", extra={
"file_name": req.file_name,
"size_bytes": len(data)
})
except Exception as e:
logger.error("文件下载失败", extra={
"file_name": req.file_name,
"file_path": req.file_path,
"bucket": bucket,
"error_type": type(e).__name__,
"error_message": str(e)
}, exc_info=True)
raise StorageError(f"下载文件失败: {str(e)}") from e
if ext == ".txt":
text = _parse_txt(data)