Files
label_ai_service/app/clients/storage/rustfs_client.py
wh 0880e1018c refactor: finetune through LLMClient interface + get_running_loop
- Add submit_finetune and get_finetune_status abstract methods to LLMClient base
- Implement both methods in ZhipuAIClient using asyncio.get_running_loop()
- Rewrite finetune_service to call llm.submit_finetune / llm.get_finetune_status
  instead of accessing llm._client directly, restoring interface encapsulation
- Replace asyncio.get_event_loop() with get_running_loop() in ZhipuAIClient._call
  and all four methods in RustFSClient (deprecated in Python 3.10+)
- Update test_finetune_service to mock the LLMClient interface methods as AsyncMocks
- Add two new tests in test_llm_client for submit_finetune and get_finetune_status
2026-04-10 16:43:28 +08:00

71 lines
2.5 KiB
Python

import asyncio
import io
import boto3
from botocore.exceptions import ClientError
from app.clients.storage.base import StorageClient
from app.core.exceptions import StorageError
from app.core.logging import get_logger
logger = get_logger(__name__)
class RustFSClient(StorageClient):
    """Async storage client that drives a synchronous boto3 S3 client.

    boto3 calls are blocking, so every method hands the call to the running
    event loop's default executor and awaits the result. ``ClientError``
    raised by boto3 is re-raised as ``StorageError`` with the original
    exception chained as the cause.
    """

    def __init__(self, endpoint: str, access_key: str, secret_key: str) -> None:
        """Create the underlying boto3 S3 client.

        Args:
            endpoint: URL passed to boto3 as ``endpoint_url``.
            access_key: Access key ID used to sign requests.
            secret_key: Secret access key used to sign requests.
        """
        self._s3 = boto3.client(
            "s3",
            endpoint_url=endpoint,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )

    async def download_bytes(self, bucket: str, path: str) -> bytes:
        """Download an object and return its full content.

        Args:
            bucket: Bucket name.
            path: Object key inside the bucket.

        Returns:
            The object's content as bytes.

        Raises:
            StorageError: If boto3 reports a ``ClientError``.
        """

        def _fetch() -> bytes:
            # get_object hands back a streaming body; the payload is only
            # pulled off the connection by read(). Both steps therefore run
            # in the worker thread so the event loop is never blocked.
            resp = self._s3.get_object(Bucket=bucket, Key=path)
            return resp["Body"].read()

        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(None, _fetch)
        except ClientError as exc:
            raise StorageError(f"存储下载失败 [{bucket}/{path}]: {exc}") from exc

    async def upload_bytes(
        self, bucket: str, path: str, data: bytes, content_type: str = "application/octet-stream"
    ) -> None:
        """Upload ``data`` as an object.

        Args:
            bucket: Bucket name.
            path: Object key inside the bucket.
            data: Content to store.
            content_type: Value sent as the object's ``ContentType``.

        Raises:
            StorageError: If boto3 reports a ``ClientError``.
        """

        def _put() -> None:
            self._s3.put_object(
                Bucket=bucket, Key=path, Body=io.BytesIO(data), ContentType=content_type
            )

        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(None, _put)
        except ClientError as exc:
            raise StorageError(f"存储上传失败 [{bucket}/{path}]: {exc}") from exc

    async def get_presigned_url(self, bucket: str, path: str, expires: int = 3600) -> str:
        """Generate a presigned ``get_object`` URL for an object.

        Args:
            bucket: Bucket name.
            path: Object key inside the bucket.
            expires: Value passed to boto3 as ``ExpiresIn`` (seconds).

        Returns:
            The presigned URL.

        Raises:
            StorageError: If boto3 reports a ``ClientError``.
        """

        def _sign() -> str:
            return self._s3.generate_presigned_url(
                "get_object",
                Params={"Bucket": bucket, "Key": path},
                ExpiresIn=expires,
            )

        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(None, _sign)
        except ClientError as exc:
            raise StorageError(f"生成预签名 URL 失败 [{bucket}/{path}]: {exc}") from exc

    async def get_object_size(self, bucket: str, path: str) -> int:
        """Return the ``ContentLength`` reported by ``head_object``.

        Args:
            bucket: Bucket name.
            path: Object key inside the bucket.

        Returns:
            The object's size as reported in the ``ContentLength`` field.

        Raises:
            StorageError: If boto3 reports a ``ClientError``.
        """

        def _head() -> int:
            return self._s3.head_object(Bucket=bucket, Key=path)["ContentLength"]

        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(None, _head)
        except ClientError as exc:
            raise StorageError(f"获取文件大小失败 [{bucket}/{path}]: {exc}") from exc