- Add submit_finetune and get_finetune_status abstract methods to LLMClient base - Implement both methods in ZhipuAIClient using asyncio.get_running_loop() - Rewrite finetune_service to call llm.submit_finetune / llm.get_finetune_status instead of accessing llm._client directly, restoring interface encapsulation - Replace asyncio.get_event_loop() with get_running_loop() in ZhipuAIClient._call and all four methods in RustFSClient (deprecated in Python 3.10+) - Update test_finetune_service to mock the LLMClient interface methods as AsyncMocks - Add two new tests in test_llm_client for submit_finetune and get_finetune_status
71 lines
2.5 KiB
Python
import asyncio
|
|
import io
|
|
|
|
import boto3
|
|
from botocore.exceptions import ClientError
|
|
|
|
from app.clients.storage.base import StorageClient
|
|
from app.core.exceptions import StorageError
|
|
from app.core.logging import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class RustFSClient(StorageClient):
    """Async storage client for an S3-compatible RustFS endpoint.

    boto3 is synchronous, so every S3 call is dispatched to the default
    thread-pool executor of the running event loop. botocore ``ClientError``
    is translated into the project-level ``StorageError`` with a
    per-operation context message.
    """

    def __init__(self, endpoint: str, access_key: str, secret_key: str) -> None:
        # One boto3 client per instance; boto3 clients are thread-safe for
        # concurrent calls, so sharing it across executor threads is fine.
        self._s3 = boto3.client(
            "s3",
            endpoint_url=endpoint,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )

    async def _run(self, fn, error_prefix: str):
        """Run the blocking zero-arg callable *fn* in the executor.

        Args:
            fn: zero-argument callable performing the boto3 call(s).
            error_prefix: context prepended to the StorageError message.

        Raises:
            StorageError: wrapping any botocore ClientError raised by *fn*.
        """
        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(None, fn)
        except ClientError as exc:
            # Preserve the original message format: "<prefix>: <exc>"
            raise StorageError(f"{error_prefix}: {exc}") from exc

    async def download_bytes(self, bucket: str, path: str) -> bytes:
        """Download the object at ``bucket/path`` and return its raw bytes.

        Raises:
            StorageError: if the download fails with a ClientError.
        """

        def _fetch() -> bytes:
            resp = self._s3.get_object(Bucket=bucket, Key=path)
            # StreamingBody.read() performs blocking network I/O, so it must
            # run inside the executor thread too, not on the event loop.
            return resp["Body"].read()

        return await self._run(_fetch, f"存储下载失败 [{bucket}/{path}]")

    async def upload_bytes(
        self, bucket: str, path: str, data: bytes, content_type: str = "application/octet-stream"
    ) -> None:
        """Upload *data* to ``bucket/path`` with the given content type.

        Raises:
            StorageError: if the upload fails with a ClientError.
        """
        await self._run(
            lambda: self._s3.put_object(
                Bucket=bucket, Key=path, Body=io.BytesIO(data), ContentType=content_type
            ),
            f"存储上传失败 [{bucket}/{path}]",
        )

    async def get_presigned_url(self, bucket: str, path: str, expires: int = 3600) -> str:
        """Return a presigned GET URL for ``bucket/path``.

        Args:
            expires: URL validity in seconds (default 1 hour).

        Raises:
            StorageError: if URL generation fails with a ClientError.
        """
        return await self._run(
            lambda: self._s3.generate_presigned_url(
                "get_object",
                Params={"Bucket": bucket, "Key": path},
                ExpiresIn=expires,
            ),
            f"生成预签名 URL 失败 [{bucket}/{path}]",
        )

    async def get_object_size(self, bucket: str, path: str) -> int:
        """Return the size in bytes of the object at ``bucket/path``.

        Uses a HEAD request, so no object data is transferred.

        Raises:
            StorageError: if the HEAD request fails with a ClientError.
        """
        resp = await self._run(
            lambda: self._s3.head_object(Bucket=bucket, Key=path),
            f"获取文件大小失败 [{bucket}/{path}]",
        )
        return resp["ContentLength"]