import asyncio
import io
from collections.abc import Callable
from typing import TypeVar

import boto3
from botocore.exceptions import ClientError

from app.clients.storage.base import StorageClient
from app.core.exceptions import StorageError
from app.core.logging import get_logger

logger = get_logger(__name__)

_T = TypeVar("_T")


class RustFSClient(StorageClient):
    """S3-compatible storage client (RustFS endpoint) backed by boto3.

    boto3 is synchronous, so every S3 call is dispatched to the default
    thread-pool executor of the *running* event loop to avoid blocking it.
    All boto3 errors are translated into the project's ``StorageError``.
    """

    def __init__(self, endpoint: str, access_key: str, secret_key: str) -> None:
        """Create an S3 client bound to the given endpoint and credentials."""
        self._s3 = boto3.client(
            "s3",
            endpoint_url=endpoint,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )

    @staticmethod
    async def _run(call: Callable[[], _T]) -> _T:
        """Run a blocking boto3 call in the default executor.

        Uses ``get_running_loop()`` (not the deprecated ``get_event_loop()``),
        which is correct inside a coroutine and raises clearly if misused.
        """
        return await asyncio.get_running_loop().run_in_executor(None, call)

    async def download_bytes(self, bucket: str, path: str) -> bytes:
        """Download an object and return its full content as bytes.

        Raises:
            StorageError: if the object cannot be fetched.
        """
        try:
            # The StreamingBody read is blocking network I/O too, so it must
            # run inside the executor together with get_object — not on the
            # event-loop thread.
            return await self._run(
                lambda: self._s3.get_object(Bucket=bucket, Key=path)["Body"].read()
            )
        except ClientError as exc:
            raise StorageError(f"存储下载失败 [{bucket}/{path}]: {exc}") from exc

    async def upload_bytes(
        self, bucket: str, path: str, data: bytes, content_type: str = "application/octet-stream"
    ) -> None:
        """Upload raw bytes to ``bucket/path`` with the given content type.

        Raises:
            StorageError: if the upload fails.
        """
        try:
            # put_object accepts bytes directly; no BytesIO wrapper needed.
            await self._run(
                lambda: self._s3.put_object(
                    Bucket=bucket, Key=path, Body=data, ContentType=content_type
                )
            )
        except ClientError as exc:
            raise StorageError(f"存储上传失败 [{bucket}/{path}]: {exc}") from exc

    async def get_presigned_url(self, bucket: str, path: str, expires: int = 3600) -> str:
        """Return a presigned GET URL for the object, valid for ``expires`` seconds.

        Raises:
            StorageError: if URL generation fails.
        """
        try:
            return await self._run(
                lambda: self._s3.generate_presigned_url(
                    "get_object",
                    Params={"Bucket": bucket, "Key": path},
                    ExpiresIn=expires,
                )
            )
        except ClientError as exc:
            raise StorageError(f"生成预签名 URL 失败 [{bucket}/{path}]: {exc}") from exc

    async def get_object_size(self, bucket: str, path: str) -> int:
        """Return the object's size in bytes via a HEAD request.

        Raises:
            StorageError: if the metadata lookup fails.
        """
        try:
            resp = await self._run(lambda: self._s3.head_object(Bucket=bucket, Key=path))
            return resp["ContentLength"]
        except ClientError as exc:
            raise StorageError(f"获取文件大小失败 [{bucket}/{path}]: {exc}") from exc